pulse-server: emulate synchronous MOVE_* commands

Make MOVE_SINK_INPUT/MOVE_SOURCE_OUTPUT change the linked peer
immediately in subsequent GET_SINK_INPUT_INFO/GET_SOURCE_OUTPUT_INFO
commands.  Do this by keeping track of the sink/source where the client
moved the stream to, and temporarily replying so in future GET_INFO (but
only in messages for that client).

We discard the temporary override when we either get an update event for
the stream (i.e. SM moved the stream), or a 1sec timer runs out. If the
timer runs out, we emit a sink-input/source-output change event, as in
that case what we claimed in the earlier GET_INFO messages might not be
true, so clients need to update their information.

This gets rid of race conditions where an application moves a stream,
and expects the move to be visible in future GET_INFO replies, which may
fail to happen because it takes some time for the session manager to
re-link the streams.

Fixes pasystray behavior.
This commit is contained in:
Pauli Virtanen 2022-05-14 12:44:14 +03:00 committed by Wim Taymans
parent 83be5d866f
commit 98aa7ccff0

View file

@ -88,11 +88,18 @@
#define MAX_FORMATS 32
#define TEMPORARY_MOVE_TIMEOUT (SPA_NSEC_PER_SEC)
bool debug_messages = false;
struct latency_offset_data {
int64_t prev_latency_offset;
unsigned int initialized:1;
uint8_t initialized:1;
};
struct temporary_move_data {
uint32_t peer_index;
uint8_t used:1;
};
static struct sample *find_sample(struct impl *impl, uint32_t index, const char *name)
@ -256,6 +263,75 @@ static int send_object_event(struct client *client, struct pw_manager_object *o,
return 0;
}
static uint32_t get_temporary_move_target(struct client *client, struct pw_manager_object *o)
{
struct temporary_move_data *d;
d = pw_manager_object_get_data(o, "temporary_move_data");
if (d == NULL || d->peer_index == SPA_ID_INVALID)
return SPA_ID_INVALID;
pw_log_debug("[%s] using temporary move target for index:%d -> index:%d",
client->name, o->index, d->peer_index);
d->used = true;
return d->peer_index;
}
static void set_temporary_move_target(struct client *client, struct pw_manager_object *o, uint32_t index)
{
struct temporary_move_data *d;
if (!pw_manager_object_is_sink_input(o) && !pw_manager_object_is_source_output(o))
return;
if (index == SPA_ID_INVALID) {
d = pw_manager_object_get_data(o, "temporary_move_data");
if (d == NULL)
return;
if (d->peer_index != SPA_ID_INVALID)
pw_log_debug("cleared temporary move target for index:%d", o->index);
d->peer_index = SPA_ID_INVALID;
d->used = false;
return;
}
d = pw_manager_object_add_temporary_data(o, "temporary_move_data",
sizeof(struct temporary_move_data),
TEMPORARY_MOVE_TIMEOUT);
if (d == NULL)
return;
pw_log_debug("[%s] set temporary move target for index:%d to index:%d",
client->name, o->index, index);
d->peer_index = index;
d->used = false;
}
static void temporary_move_target_timeout(struct client *client, struct pw_manager_object *o)
{
struct temporary_move_data *d = pw_manager_object_get_data(o, "temporary_move_data");
struct pw_manager_object *peer;
/*
* Send change event if the temporary data was used, and the peer
* is not what we claimed.
*/
if (d == NULL || d->peer_index == SPA_ID_INVALID || !d->used)
goto done;
peer = find_linked(client->manager, o->id, pw_manager_object_is_sink_input(o) ?
PW_DIRECTION_OUTPUT : PW_DIRECTION_INPUT);
if (peer == NULL || peer->index != d->peer_index) {
pw_log_debug("[%s] temporary move timeout for index:%d, send change event",
client->name, o->index);
send_object_event(client, o, SUBSCRIPTION_EVENT_CHANGE);
}
done:
set_temporary_move_target(client, o, SPA_ID_INVALID);
}
static struct pw_manager_object *find_device(struct client *client,
uint32_t index, const char *name, bool sink, bool *is_monitor);
@ -773,6 +849,8 @@ static void manager_updated(void *data, struct pw_manager_object *o)
send_object_event(client, o, SUBSCRIPTION_EVENT_CHANGE);
set_temporary_move_target(client, o, SPA_ID_INVALID);
send_latency_offset_subscribe_event(client, o);
send_default_change_subscribe_event(client, pw_manager_object_is_sink(o), pw_manager_object_is_source_or_monitor(o));
}
@ -793,6 +871,14 @@ static void manager_removed(void *data, struct pw_manager_object *o)
}
}
static void manager_object_data_timeout(void *data, struct pw_manager_object *o, const char *key)
{
struct client *client = data;
if (spa_streq(key, "temporary_move_data"))
temporary_move_target_timeout(client, o);
}
static int json_object_find(const char *obj, const char *key, char *value, size_t len)
{
struct spa_json it[2];
@ -887,7 +973,8 @@ static const struct pw_manager_events manager_events = {
.updated = manager_updated,
.removed = manager_removed,
.metadata = manager_metadata,
.disconnect = manager_disconnect
.disconnect = manager_disconnect,
.object_data_timeout = manager_object_data_timeout,
};
static int do_set_client_name(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -3940,7 +4027,6 @@ static int fill_sink_input_info(struct client *client, struct message *m,
struct impl *impl = client->impl;
struct pw_node_info *info = o->info;
struct pw_manager *manager = client->manager;
struct pw_manager_object *peer;
const char *str;
uint32_t module_id = SPA_ID_INVALID, client_id = SPA_ID_INVALID;
uint32_t peer_index;
@ -3966,11 +4052,15 @@ static int fill_sink_input_info(struct client *client, struct message *m,
!volume_valid(&dev_info.volume_info.volume))
return -ENOENT;
peer = find_linked(manager, o->id, PW_DIRECTION_OUTPUT);
if (peer && pw_manager_object_is_sink(peer))
peer_index = peer->index;
else
peer_index = SPA_ID_INVALID;
peer_index = get_temporary_move_target(client, o);
if (peer_index == SPA_ID_INVALID) {
struct pw_manager_object *peer;
peer = find_linked(manager, o->id, PW_DIRECTION_OUTPUT);
if (peer && pw_manager_object_is_sink(peer))
peer_index = peer->index;
else
peer_index = SPA_ID_INVALID;
}
message_put(m,
TAG_U32, o->index, /* sink_input index */
@ -4020,7 +4110,6 @@ static int fill_source_output_info(struct client *client, struct message *m,
struct impl *impl = client->impl;
struct pw_node_info *info = o->info;
struct pw_manager *manager = client->manager;
struct pw_manager_object *peer;
const char *str;
uint32_t module_id = SPA_ID_INVALID, client_id = SPA_ID_INVALID;
uint32_t peer_index;
@ -4046,11 +4135,15 @@ static int fill_source_output_info(struct client *client, struct message *m,
!volume_valid(&dev_info.volume_info.volume))
return -ENOENT;
peer = find_linked(manager, o->id, PW_DIRECTION_INPUT);
if (peer && pw_manager_object_is_source_or_monitor(peer))
peer_index = peer->index;
else
peer_index = SPA_ID_INVALID;
peer_index = get_temporary_move_target(client, o);
if (peer_index == SPA_ID_INVALID) {
struct pw_manager_object *peer;
peer = find_linked(manager, o->id, PW_DIRECTION_INPUT);
if (peer && pw_manager_object_is_source_or_monitor(peer))
peer_index = peer->index;
else
peer_index = SPA_ID_INVALID;
}
message_put(m,
TAG_U32, o->index, /* source_output index */
@ -4695,6 +4788,10 @@ static int do_move_stream(struct client *client, uint32_t command, uint32_t tag,
SPA_TYPE_INFO_BASE"Id", "%"PRIi64, target_serial)) < 0)
return res;
/* We will temporarily claim the stream was already moved */
set_temporary_move_target(client, o, dev->index);
send_object_event(client, o, SUBSCRIPTION_EVENT_CHANGE);
return reply_simple_ack(client, tag);
}