pulse-server: reply async to some requests

Trigger a roundtrip before we send the reply to the client for many
methods like set_volume/set_mute etc.

this ensure that we get the updated information from the server before
we reply so that a subsequent query returns the up-to-data information.

Fixes #868
This commit is contained in:
Wim Taymans 2021-03-09 09:53:39 +01:00
parent 808b54bc19
commit d29bdf64ee

View file

@ -117,6 +117,13 @@ struct client;
#include "sample.c" #include "sample.c"
struct operation {
struct spa_list link;
struct client *client;
int seq;
uint32_t tag;
};
struct client { struct client {
struct spa_list link; struct spa_list link;
struct impl *impl; struct impl *impl;
@ -133,6 +140,7 @@ struct client {
struct pw_properties *props; struct pw_properties *props;
struct pw_core *core; struct pw_core *core;
struct spa_hook core_listener;
struct pw_manager *manager; struct pw_manager *manager;
struct spa_hook manager_listener; struct spa_hook manager_listener;
@ -275,6 +283,7 @@ struct impl {
#include "collect.c" #include "collect.c"
#include "module.c" #include "module.c"
static void sample_free(struct sample *sample) static void sample_free(struct sample *sample)
{ {
struct impl *impl = sample->impl; struct impl *impl = sample->impl;
@ -480,6 +489,35 @@ static int reply_error(struct client *client, uint32_t command, uint32_t tag, in
return send_message(client, reply); return send_message(client, reply);
} }
static int operation_new(struct client *client, uint32_t tag)
{
struct operation *o;
if ((o = calloc(1, sizeof(*o))) == NULL)
return -errno;
o->client = client;
o->tag = tag;
o->seq = pw_core_sync(client->core, PW_ID_CORE, 0);
spa_list_append(&client->operations, &o->link);
return 0;
}
static void operation_free(struct operation *o)
{
spa_list_remove(&o->link);
free(o);
}
static void operation_complete(struct operation *o)
{
struct client *client = o->client;
pw_log_info(NAME" %p: [%s] tag:%u complete", client, client->name, o->tag);
reply_simple_ack(o->client, o->tag);
operation_free(o);
}
#include "extension.c" #include "extension.c"
static int send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for) static int send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for)
@ -980,6 +1018,22 @@ static void manager_metadata(void *data, struct pw_manager_object *o,
} }
} }
static void client_core_done(void *data, uint32_t id, int seq)
{
struct client *client = data;
struct operation *o, *t;
spa_list_for_each_safe(o, t, &client->operations, link) {
if (o->seq == seq)
operation_complete(o);
}
}
static const struct pw_core_events client_core_events = {
PW_VERSION_CORE_EVENTS,
.done = client_core_done,
};
static const struct pw_manager_events manager_events = { static const struct pw_manager_events manager_events = {
PW_VERSION_MANAGER_EVENTS, PW_VERSION_MANAGER_EVENTS,
.sync = manager_sync, .sync = manager_sync,
@ -1028,6 +1082,8 @@ static int do_set_client_name(struct client *client, uint32_t command, uint32_t
goto error; goto error;
} }
client->connect_tag = tag; client->connect_tag = tag;
pw_core_add_listener(client->core, &client->core_listener,
&client_core_events, client);
pw_manager_add_listener(client->manager, &client->manager_listener, pw_manager_add_listener(client->manager, &client->manager_listener,
&manager_events, client); &manager_events, client);
} else { } else {
@ -3108,7 +3164,7 @@ static int do_set_stream_volume(struct client *client, uint32_t command, uint32_
return res; return res;
} }
done: done:
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -3159,7 +3215,7 @@ static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t
return res; return res;
} }
done: done:
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
static int do_set_volume(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_set_volume(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -3224,7 +3280,7 @@ static int do_set_volume(struct client *client, uint32_t command, uint32_t tag,
return res; return res;
done: done:
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
static int do_set_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_set_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -3288,7 +3344,7 @@ static int do_set_mute(struct client *client, uint32_t command, uint32_t tag, st
if (res < 0) if (res < 0)
return res; return res;
done: done:
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
static int do_set_port(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_set_port(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -3344,7 +3400,7 @@ static int do_set_port(struct client *client, uint32_t command, uint32_t tag, st
if ((res = set_card_port(card, device_id, port_id)) < 0) if ((res = set_card_port(card, device_id, port_id)) < 0)
return res; return res;
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
static int do_set_port_latency_offset(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_set_port_latency_offset(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -3412,7 +3468,7 @@ static int do_set_port_latency_offset(struct client *client, uint32_t command, u
if (res < 0) if (res < 0)
break; break;
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
return res; return res;
@ -4827,7 +4883,7 @@ static int do_set_profile(struct client *client, uint32_t command, uint32_t tag,
SPA_PARAM_PROFILE_index, SPA_POD_Int(profile_id), SPA_PARAM_PROFILE_index, SPA_POD_Int(profile_id),
SPA_PARAM_PROFILE_save, SPA_POD_Bool(true))); SPA_PARAM_PROFILE_save, SPA_POD_Bool(true)));
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
static int do_set_default(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_set_default(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -4900,7 +4956,7 @@ static int do_suspend(struct client *client, uint32_t command, uint32_t tag, str
cmd = SPA_NODE_COMMAND_Suspend; cmd = SPA_NODE_COMMAND_Suspend;
pw_node_send_command((struct pw_node*)o->proxy, &SPA_NODE_COMMAND_INIT(cmd)); pw_node_send_command((struct pw_node*)o->proxy, &SPA_NODE_COMMAND_INIT(cmd));
} }
return reply_simple_ack(client, tag); return operation_new(client, tag);
} }
static int do_move_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_move_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -5330,6 +5386,7 @@ static void client_free(struct client *client)
struct impl *impl = client->impl; struct impl *impl = client->impl;
struct message *msg; struct message *msg;
struct pending_sample *p; struct pending_sample *p;
struct operation *o;
pw_log_info(NAME" %p: client %p free", impl, client); pw_log_info(NAME" %p: client %p free", impl, client);
@ -5343,6 +5400,9 @@ static void client_free(struct client *client)
spa_list_consume(msg, &client->out_messages, link) spa_list_consume(msg, &client->out_messages, link)
message_free(impl, msg, true, false); message_free(impl, msg, true, false);
spa_list_consume(o, &client->operations, link)
operation_free(o);
if (client->core) { if (client->core) {
client->disconnecting = true; client->disconnecting = true;
pw_core_disconnect(client->core); pw_core_disconnect(client->core);