pulse-server: send STREAM_MOVED messages

Keep track of the last known peer of a stream and send a moved message
when it changes.

Fixes #2407
This commit is contained in:
Wim Taymans 2022-06-03 16:59:08 +02:00
parent be9c738661
commit efe30d5075
3 changed files with 75 additions and 0 deletions

View file

@ -798,6 +798,7 @@ static int reply_create_record_stream(struct stream *stream, struct pw_manager_o
static int reply_create_stream(struct stream *stream, struct pw_manager_object *peer) static int reply_create_stream(struct stream *stream, struct pw_manager_object *peer)
{ {
stream->peer_index = peer->index;
return stream->direction == PW_DIRECTION_OUTPUT ? return stream->direction == PW_DIRECTION_OUTPUT ?
reply_create_playback_stream(stream, peer) : reply_create_playback_stream(stream, peer) :
reply_create_record_stream(stream, peer); reply_create_record_stream(stream, peer);
@ -830,6 +831,27 @@ static void manager_added(void *data, struct pw_manager_object *o)
if (spa_streq(o->type, PW_TYPE_INTERFACE_Link)) { if (spa_streq(o->type, PW_TYPE_INTERFACE_Link)) {
struct stream *s, *t; struct stream *s, *t;
struct pw_manager_object *peer = NULL; struct pw_manager_object *peer = NULL;
union pw_map_item *item;
pw_array_for_each(item, &client->streams.items) {
struct stream *s = item->data;
const char *peer_name;
if (pw_map_item_is_free(item) || s->pending)
continue;
if (s->peer_index == SPA_ID_INVALID)
continue;
peer = find_peer_for_link(manager, o, s->id, s->direction);
if (peer == NULL || peer->props == NULL ||
peer->index == s->peer_index)
continue;
s->peer_index = peer->index;
peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
if (peer_name != NULL)
stream_send_moved(s, peer->index, peer_name);
}
spa_list_for_each_safe(s, t, &client->pending_streams, link) { spa_list_for_each_safe(s, t, &client->pending_streams, link) {
peer = find_peer_for_link(manager, o, s->id, s->direction); peer = find_peer_for_link(manager, o, s->id, s->direction);
if (peer) { if (peer) {

View file

@ -82,6 +82,8 @@ struct stream *stream_new(struct client *client, enum stream_type type, uint32_t
stream->attr = *attr; stream->attr = *attr;
spa_ringbuffer_init(&stream->ring); spa_ringbuffer_init(&stream->ring);
stream->peer_index = SPA_ID_INVALID;
parse_frac(client->props, "pulse.min.req", &defs->min_req, &stream->min_req); parse_frac(client->props, "pulse.min.req", &defs->min_req, &stream->min_req);
parse_frac(client->props, "pulse.min.frag", &defs->min_frag, &stream->min_frag); parse_frac(client->props, "pulse.min.frag", &defs->min_frag, &stream->min_frag);
parse_frac(client->props, "pulse.min.quantum", &defs->min_quantum, &stream->min_quantum); parse_frac(client->props, "pulse.min.quantum", &defs->min_quantum, &stream->min_quantum);
@ -359,3 +361,51 @@ int stream_update_minreq(struct stream *stream, uint32_t minreq)
} }
return 0; return 0;
} }
int stream_send_moved(struct stream *stream, uint32_t peer_index, const char *peer_name)
{
struct client *client = stream->client;
struct impl *impl = client->impl;
struct message *reply;
uint32_t command;
command = stream->direction == PW_DIRECTION_OUTPUT ?
COMMAND_PLAYBACK_STREAM_MOVED :
COMMAND_RECORD_STREAM_MOVED;
pw_log_info("client %p [%s]: stream %p %s channel:%u",
client, client->name, stream, commands[command].name,
stream->channel);
if (client->version < 12)
return 0;
reply = message_alloc(impl, -1, 0);
message_put(reply,
TAG_U32, command,
TAG_U32, -1,
TAG_U32, stream->channel,
TAG_U32, peer_index,
TAG_STRING, peer_name,
TAG_BOOLEAN, false, /* suspended */
TAG_INVALID);
if (client->version >= 13) {
if (command == COMMAND_PLAYBACK_STREAM_MOVED) {
message_put(reply,
TAG_U32, stream->attr.maxlength,
TAG_U32, stream->attr.tlength,
TAG_U32, stream->attr.prebuf,
TAG_U32, stream->attr.minreq,
TAG_USEC, stream->lat_usec,
TAG_INVALID);
} else {
message_put(reply,
TAG_U32, stream->attr.maxlength,
TAG_U32, stream->attr.fragsize,
TAG_USEC, stream->lat_usec,
TAG_INVALID);
}
}
return client_queue_message(client, reply);
}

View file

@ -60,6 +60,8 @@ struct stream {
uint32_t id; /* id of global */ uint32_t id; /* id of global */
uint32_t index; /* index */ uint32_t index; /* index */
uint32_t peer_index;
struct impl *impl; struct impl *impl;
struct client *client; struct client *client;
enum stream_type type; enum stream_type type;
@ -128,5 +130,6 @@ int stream_send_killed(struct stream *stream);
int stream_send_started(struct stream *stream); int stream_send_started(struct stream *stream);
int stream_send_request(struct stream *stream); int stream_send_request(struct stream *stream);
int stream_update_minreq(struct stream *stream, uint32_t minreq); int stream_update_minreq(struct stream *stream, uint32_t minreq);
int stream_send_moved(struct stream *stream, uint32_t peer_index, const char *peer_name);
#endif /* PULSER_SERVER_STREAM_H */ #endif /* PULSER_SERVER_STREAM_H */