From b136bb8ca5675ad67c338f88f49f1f93a9ab88f0 Mon Sep 17 00:00:00 2001 From: Julian Bouzas Date: Thu, 26 Aug 2021 15:53:33 -0400 Subject: [PATCH] pipewire-pulse: delay create stream reply if the peer is not found When the client adapter is configured in passthrough mode, the stream param changed event in pipewire-pulse is emitted before the session manager creates the link, and not after. Therfore, the peer can never be found when replying create stream, and the pulseaudio application receives a stream error. This patch delays the create stream reply until the link is added if the peer cannot be found, fixing the above race conditon to allow passthrough mode to work with pulseaudio applications. --- src/modules/module-protocol-pulse/client.h | 2 + .../module-protocol-pulse/pulse-server.c | 673 +++++++++--------- src/modules/module-protocol-pulse/server.c | 1 + src/modules/module-protocol-pulse/stream.c | 2 + src/modules/module-protocol-pulse/stream.h | 1 + 5 files changed, 353 insertions(+), 326 deletions(-) diff --git a/src/modules/module-protocol-pulse/client.h b/src/modules/module-protocol-pulse/client.h index 88712eaa8..1b923d281 100644 --- a/src/modules/module-protocol-pulse/client.h +++ b/src/modules/module-protocol-pulse/client.h @@ -92,6 +92,8 @@ struct client { struct spa_list pending_samples; + struct spa_list pending_streams; + unsigned int disconnect:1; unsigned int disconnecting:1; unsigned int need_flush:1; diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index e88b58847..e07dc3290 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -357,6 +357,329 @@ static void handle_metadata(struct client *client, struct pw_manager_object *old } } +static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss) +{ + uint64_t u; + u = (uint64_t) (val.num * 1000000UL * (uint64_t) ss->rate) / val.denom; + u = (u + 1000000UL - 1) / 1000000UL; + u *= sample_spec_frame_size(ss); + return (uint32_t) u; +} + +static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr) +{ + uint32_t frame_size, max_prebuf, minreq; + struct defs *defs = &s->impl->defs; + + frame_size = s->frame_size; + minreq = frac_to_bytes_round_up(defs->min_req, &s->ss); + + if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) + attr->maxlength = MAXLENGTH; + attr->maxlength -= attr->maxlength % frame_size; + attr->maxlength = SPA_MAX(attr->maxlength, frame_size); + + if (attr->tlength == (uint32_t) -1) + attr->tlength = frac_to_bytes_round_up(defs->default_tlength, &s->ss); + if (attr->tlength > attr->maxlength) + attr->tlength = attr->maxlength; + attr->tlength -= attr->tlength % frame_size; + attr->tlength = SPA_MAX(attr->tlength, frame_size); + attr->tlength = SPA_MAX(attr->tlength, minreq); + + if (attr->minreq == (uint32_t) -1) { + uint32_t process = frac_to_bytes_round_up(defs->default_req, &s->ss); + /* With low-latency, tlength/4 gives a decent default in all of traditional, + * adjust latency and early request modes. */ + uint32_t m = attr->tlength / 4; + m -= m % frame_size; + attr->minreq = SPA_MIN(process, m); + } + attr->minreq = SPA_MAX(attr->minreq, minreq); + + if (attr->tlength < attr->minreq+frame_size) + attr->tlength = attr->minreq + frame_size; + + attr->minreq -= attr->minreq % frame_size; + if (attr->minreq <= 0) { + attr->minreq = frame_size; + attr->tlength += frame_size*2; + } + if (attr->tlength <= attr->minreq) + attr->tlength = attr->minreq*2 + frame_size; + + max_prebuf = attr->tlength + frame_size - attr->minreq; + if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf) + attr->prebuf = max_prebuf; + attr->prebuf -= attr->prebuf % frame_size; + + s->missing = attr->tlength; + attr->fragsize = 0; + + pw_log_info(NAME" %p: [%s] maxlength:%u tlength:%u minreq:%u prebuf:%u", s, + s->client->name, attr->maxlength, attr->tlength, + attr->minreq, attr->prebuf); +} + +static int reply_create_playback_stream(struct stream *stream, struct pw_manager_object *peer) +{ + struct client *client = stream->client; + struct message *reply; + uint32_t missing, peer_id; + struct spa_dict_item items[5]; + char latency[32]; + char attr_maxlength[32]; + char attr_tlength[32]; + char attr_prebuf[32]; + char attr_minreq[32]; + const char *peer_name; + struct spa_fraction lat; + uint64_t lat_usec; + struct defs *defs = &stream->impl->defs; + + fix_playback_buffer_attr(stream, &stream->attr); + + stream->buffer = calloc(1, stream->attr.maxlength); + if (stream->buffer == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + + if (stream->early_requests) { + lat.num = stream->attr.minreq; + } else if (stream->adjust_latency) { + if (stream->attr.tlength > stream->attr.minreq * 2) + lat.num = (stream->attr.tlength - stream->attr.minreq * 2) / 2; + else + lat.num = stream->attr.minreq; + } else { + if (stream->attr.tlength > stream->attr.minreq * 2) + lat.num = stream->attr.tlength - stream->attr.minreq * 2; + else + lat.num = stream->attr.minreq; + } + lat.denom = stream->ss.rate; + lat.num /= stream->frame_size; + if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) + lat.num = (defs->min_quantum.num * lat.denom + + (defs->min_quantum.denom -1)) / defs->min_quantum.denom; + lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; + + snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); + snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); + snprintf(attr_tlength, sizeof(attr_tlength), "%u", stream->attr.tlength); + snprintf(attr_prebuf, sizeof(attr_prebuf), "%u", stream->attr.prebuf); + snprintf(attr_minreq, sizeof(attr_minreq), "%u", stream->attr.minreq); + + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); + items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); + items[2] = SPA_DICT_ITEM_INIT("pulse.attr.tlength", attr_tlength); + items[3] = SPA_DICT_ITEM_INIT("pulse.attr.prebuf", attr_prebuf); + items[4] = SPA_DICT_ITEM_INIT("pulse.attr.minreq", attr_minreq); + pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 5)); + + missing = stream_pop_missing(stream); + + pw_log_info(NAME" %p: [%s] reply CREATE_PLAYBACK_STREAM tag:%u missing:%u latency:%s", + stream, client->name, stream->create_tag, missing, latency); + + reply = reply_new(client, stream->create_tag); + message_put(reply, + TAG_U32, stream->channel, /* stream index/channel */ + TAG_U32, stream->id, /* sink_input/stream index */ + TAG_U32, missing, /* missing/requested bytes */ + TAG_INVALID); + + if (peer && pw_manager_object_is_sink(peer)) { + peer_id = peer->id; + peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); + } else { + peer_id = SPA_ID_INVALID; + peer_name = NULL; + } + + if (client->version >= 9) { + message_put(reply, + TAG_U32, stream->attr.maxlength, + TAG_U32, stream->attr.tlength, + TAG_U32, stream->attr.prebuf, + TAG_U32, stream->attr.minreq, + TAG_INVALID); + } + if (client->version >= 12) { + message_put(reply, + TAG_SAMPLE_SPEC, &stream->ss, + TAG_CHANNEL_MAP, &stream->map, + TAG_U32, peer_id, /* sink index */ + TAG_STRING, peer_name, /* sink name */ + TAG_BOOLEAN, false, /* sink suspended state */ + TAG_INVALID); + } + if (client->version >= 13) { + message_put(reply, + TAG_USEC, lat_usec, /* sink configured latency */ + TAG_INVALID); + } + if (client->version >= 21) { + struct format_info info; + spa_zero(info); + info.encoding = ENCODING_PCM; + message_put(reply, + TAG_FORMAT_INFO, &info, /* sink_input format */ + TAG_INVALID); + } + + stream->create_tag = SPA_ID_INVALID; + + return client_queue_message(client, reply); +} + +static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr) +{ + uint32_t frame_size, minfrag; + struct defs *defs = &s->impl->defs; + + frame_size = s->frame_size; + + if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) + attr->maxlength = MAXLENGTH; + attr->maxlength -= attr->maxlength % frame_size; + attr->maxlength = SPA_MAX(attr->maxlength, frame_size); + + minfrag = frac_to_bytes_round_up(defs->min_frag, &s->ss); + + if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) + attr->fragsize = frac_to_bytes_round_up(defs->default_frag, &s->ss); + attr->fragsize -= attr->fragsize % frame_size; + attr->fragsize = SPA_MAX(attr->fragsize, minfrag); + attr->fragsize = SPA_MAX(attr->fragsize, frame_size); + + if (attr->fragsize > attr->maxlength) + attr->fragsize = attr->maxlength; + + attr->tlength = attr->minreq = attr->prebuf = 0; + + pw_log_info(NAME" %p: [%s] maxlength:%u fragsize:%u minfrag:%u", s, + s->client->name, attr->maxlength, attr->fragsize, minfrag); +} + +static int reply_create_record_stream(struct stream *stream, struct pw_manager_object *peer) +{ + struct client *client = stream->client; + struct pw_manager *manager = client->manager; + struct message *reply; + struct spa_dict_item items[3]; + char latency[32], *tmp; + char attr_maxlength[32]; + char attr_fragsize[32]; + const char *peer_name, *name; + uint32_t peer_id; + struct spa_fraction lat; + uint64_t lat_usec; + struct defs *defs = &stream->impl->defs; + + fix_record_buffer_attr(stream, &stream->attr); + + stream->buffer = calloc(1, stream->attr.maxlength); + if (stream->buffer == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + + if (stream->early_requests) { + lat.num = stream->attr.fragsize; + } else if (stream->adjust_latency) { + lat.num = stream->attr.fragsize; + } else { + lat.num = stream->attr.fragsize; + } + + lat.num /= stream->frame_size; + lat.denom = stream->ss.rate; + if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) + lat.num = (defs->min_quantum.num * lat.denom + + (defs->min_quantum.denom -1)) / defs->min_quantum.denom; + lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; + + snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); + + snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); + snprintf(attr_fragsize, sizeof(attr_fragsize), "%u", stream->attr.fragsize); + + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); + items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); + items[2] = SPA_DICT_ITEM_INIT("pulse.attr.fragsize", attr_fragsize); + pw_stream_update_properties(stream->stream, + &SPA_DICT_INIT(items, 3)); + + pw_log_info(NAME" %p: [%s] reply CREATE_RECORD_STREAM tag:%u latency:%s", + stream, client->name, stream->create_tag, latency); + + reply = reply_new(client, stream->create_tag); + message_put(reply, + TAG_U32, stream->channel, /* stream index/channel */ + TAG_U32, stream->id, /* source_output/stream index */ + TAG_INVALID); + + if (peer && pw_manager_object_is_sink_input(peer)) + peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT); + if (peer && pw_manager_object_is_source_or_monitor(peer)) { + name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); + if (!pw_manager_object_is_source(peer)) { + size_t len = (name ? strlen(name) : 5) + 10; + peer_id = peer->id | MONITOR_FLAG; + peer_name = tmp = alloca(len); + snprintf(tmp, len, "%s.monitor", name ? name : "sink"); + } else { + peer_id = peer->id; + peer_name = name; + } + } else { + peer_id = SPA_ID_INVALID; + peer_name = NULL; + } + + if (client->version >= 9) { + message_put(reply, + TAG_U32, stream->attr.maxlength, + TAG_U32, stream->attr.fragsize, + TAG_INVALID); + } + if (client->version >= 12) { + message_put(reply, + TAG_SAMPLE_SPEC, &stream->ss, + TAG_CHANNEL_MAP, &stream->map, + TAG_U32, peer_id, /* source index */ + TAG_STRING, peer_name, /* source name */ + TAG_BOOLEAN, false, /* source suspended state */ + TAG_INVALID); + } + if (client->version >= 13) { + message_put(reply, + TAG_USEC, lat_usec, /* source configured latency */ + TAG_INVALID); + } + if (client->version >= 22) { + struct format_info info; + spa_zero(info); + info.encoding = ENCODING_PCM; + message_put(reply, + TAG_FORMAT_INFO, &info, /* source_output format */ + TAG_INVALID); + } + + stream->create_tag = SPA_ID_INVALID; + + return client_queue_message(client, reply); +} + +static int reply_create_stream(struct stream *stream, struct pw_manager_object *peer) +{ + return stream->direction == PW_DIRECTION_OUTPUT ? + reply_create_playback_stream(stream, peer) : + reply_create_record_stream(stream, peer); +} + static void manager_added(void *data, struct pw_manager_object *o) { struct client *client = data; @@ -370,6 +693,20 @@ static void manager_added(void *data, struct pw_manager_object *o) handle_metadata(client, NULL, o, str); } + if (spa_streq(o->type, PW_TYPE_INTERFACE_Link)) { + struct stream *s; + struct pw_manager_object *peer = NULL; + spa_list_for_each(s, &client->pending_streams, link) { + peer = find_linked(s->client->manager, s->id, s->direction); + if (peer) + break; + } + if (peer) { + reply_create_stream(s, peer); + spa_list_remove(&s->link); + } + } + send_object_event(client, o, SUBSCRIPTION_EVENT_NEW); /* Adding sinks etc. may also change defaults */ @@ -559,327 +896,6 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s return reply_simple_ack(client, tag); } -static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss) -{ - uint64_t u; - u = (uint64_t) (val.num * 1000000UL * (uint64_t) ss->rate) / val.denom; - u = (u + 1000000UL - 1) / 1000000UL; - u *= sample_spec_frame_size(ss); - return (uint32_t) u; -} - -static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr) -{ - uint32_t frame_size, max_prebuf, minreq; - struct defs *defs = &s->impl->defs; - - frame_size = s->frame_size; - minreq = frac_to_bytes_round_up(defs->min_req, &s->ss); - - if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) - attr->maxlength = MAXLENGTH; - attr->maxlength -= attr->maxlength % frame_size; - attr->maxlength = SPA_MAX(attr->maxlength, frame_size); - - if (attr->tlength == (uint32_t) -1) - attr->tlength = frac_to_bytes_round_up(defs->default_tlength, &s->ss); - if (attr->tlength > attr->maxlength) - attr->tlength = attr->maxlength; - attr->tlength -= attr->tlength % frame_size; - attr->tlength = SPA_MAX(attr->tlength, frame_size); - attr->tlength = SPA_MAX(attr->tlength, minreq); - - if (attr->minreq == (uint32_t) -1) { - uint32_t process = frac_to_bytes_round_up(defs->default_req, &s->ss); - /* With low-latency, tlength/4 gives a decent default in all of traditional, - * adjust latency and early request modes. */ - uint32_t m = attr->tlength / 4; - m -= m % frame_size; - attr->minreq = SPA_MIN(process, m); - } - attr->minreq = SPA_MAX(attr->minreq, minreq); - - if (attr->tlength < attr->minreq+frame_size) - attr->tlength = attr->minreq + frame_size; - - attr->minreq -= attr->minreq % frame_size; - if (attr->minreq <= 0) { - attr->minreq = frame_size; - attr->tlength += frame_size*2; - } - if (attr->tlength <= attr->minreq) - attr->tlength = attr->minreq*2 + frame_size; - - max_prebuf = attr->tlength + frame_size - attr->minreq; - if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf) - attr->prebuf = max_prebuf; - attr->prebuf -= attr->prebuf % frame_size; - - s->missing = attr->tlength; - attr->fragsize = 0; - - pw_log_info(NAME" %p: [%s] maxlength:%u tlength:%u minreq:%u prebuf:%u", s, - s->client->name, attr->maxlength, attr->tlength, - attr->minreq, attr->prebuf); -} - -static int reply_create_playback_stream(struct stream *stream) -{ - struct client *client = stream->client; - struct pw_manager *manager = client->manager; - struct message *reply; - uint32_t missing, peer_id; - struct spa_dict_item items[5]; - char latency[32]; - char attr_maxlength[32]; - char attr_tlength[32]; - char attr_prebuf[32]; - char attr_minreq[32]; - struct pw_manager_object *peer; - const char *peer_name; - struct spa_fraction lat; - uint64_t lat_usec; - struct defs *defs = &stream->impl->defs; - - fix_playback_buffer_attr(stream, &stream->attr); - - stream->buffer = calloc(1, stream->attr.maxlength); - if (stream->buffer == NULL) - return -errno; - - spa_ringbuffer_init(&stream->ring); - - if (stream->early_requests) { - lat.num = stream->attr.minreq; - } else if (stream->adjust_latency) { - if (stream->attr.tlength > stream->attr.minreq * 2) - lat.num = (stream->attr.tlength - stream->attr.minreq * 2) / 2; - else - lat.num = stream->attr.minreq; - } else { - if (stream->attr.tlength > stream->attr.minreq * 2) - lat.num = stream->attr.tlength - stream->attr.minreq * 2; - else - lat.num = stream->attr.minreq; - } - lat.denom = stream->ss.rate; - lat.num /= stream->frame_size; - if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) - lat.num = (defs->min_quantum.num * lat.denom + - (defs->min_quantum.denom -1)) / defs->min_quantum.denom; - lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; - - snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); - snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); - snprintf(attr_tlength, sizeof(attr_tlength), "%u", stream->attr.tlength); - snprintf(attr_prebuf, sizeof(attr_prebuf), "%u", stream->attr.prebuf); - snprintf(attr_minreq, sizeof(attr_minreq), "%u", stream->attr.minreq); - - items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); - items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); - items[2] = SPA_DICT_ITEM_INIT("pulse.attr.tlength", attr_tlength); - items[3] = SPA_DICT_ITEM_INIT("pulse.attr.prebuf", attr_prebuf); - items[4] = SPA_DICT_ITEM_INIT("pulse.attr.minreq", attr_minreq); - pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 5)); - - missing = stream_pop_missing(stream); - - pw_log_info(NAME" %p: [%s] reply CREATE_PLAYBACK_STREAM tag:%u missing:%u latency:%s", - stream, client->name, stream->create_tag, missing, latency); - - reply = reply_new(client, stream->create_tag); - message_put(reply, - TAG_U32, stream->channel, /* stream index/channel */ - TAG_U32, stream->id, /* sink_input/stream index */ - TAG_U32, missing, /* missing/requested bytes */ - TAG_INVALID); - - peer = find_linked(manager, stream->id, stream->direction); - if (peer && pw_manager_object_is_sink(peer)) { - peer_id = peer->id; - peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); - } else { - peer_id = SPA_ID_INVALID; - peer_name = NULL; - } - - if (client->version >= 9) { - message_put(reply, - TAG_U32, stream->attr.maxlength, - TAG_U32, stream->attr.tlength, - TAG_U32, stream->attr.prebuf, - TAG_U32, stream->attr.minreq, - TAG_INVALID); - } - if (client->version >= 12) { - message_put(reply, - TAG_SAMPLE_SPEC, &stream->ss, - TAG_CHANNEL_MAP, &stream->map, - TAG_U32, peer_id, /* sink index */ - TAG_STRING, peer_name, /* sink name */ - TAG_BOOLEAN, false, /* sink suspended state */ - TAG_INVALID); - } - if (client->version >= 13) { - message_put(reply, - TAG_USEC, lat_usec, /* sink configured latency */ - TAG_INVALID); - } - if (client->version >= 21) { - struct format_info info; - spa_zero(info); - info.encoding = ENCODING_PCM; - message_put(reply, - TAG_FORMAT_INFO, &info, /* sink_input format */ - TAG_INVALID); - } - - stream->create_tag = SPA_ID_INVALID; - - return client_queue_message(client, reply); -} - -static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr) -{ - uint32_t frame_size, minfrag; - struct defs *defs = &s->impl->defs; - - frame_size = s->frame_size; - - if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) - attr->maxlength = MAXLENGTH; - attr->maxlength -= attr->maxlength % frame_size; - attr->maxlength = SPA_MAX(attr->maxlength, frame_size); - - minfrag = frac_to_bytes_round_up(defs->min_frag, &s->ss); - - if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) - attr->fragsize = frac_to_bytes_round_up(defs->default_frag, &s->ss); - attr->fragsize -= attr->fragsize % frame_size; - attr->fragsize = SPA_MAX(attr->fragsize, minfrag); - attr->fragsize = SPA_MAX(attr->fragsize, frame_size); - - if (attr->fragsize > attr->maxlength) - attr->fragsize = attr->maxlength; - - attr->tlength = attr->minreq = attr->prebuf = 0; - - pw_log_info(NAME" %p: [%s] maxlength:%u fragsize:%u minfrag:%u", s, - s->client->name, attr->maxlength, attr->fragsize, minfrag); -} - -static int reply_create_record_stream(struct stream *stream) -{ - struct client *client = stream->client; - struct pw_manager *manager = client->manager; - struct message *reply; - struct spa_dict_item items[3]; - char latency[32], *tmp; - char attr_maxlength[32]; - char attr_fragsize[32]; - struct pw_manager_object *peer; - const char *peer_name, *name; - uint32_t peer_id; - struct spa_fraction lat; - uint64_t lat_usec; - struct defs *defs = &stream->impl->defs; - - fix_record_buffer_attr(stream, &stream->attr); - - stream->buffer = calloc(1, stream->attr.maxlength); - if (stream->buffer == NULL) - return -errno; - - spa_ringbuffer_init(&stream->ring); - - if (stream->early_requests) { - lat.num = stream->attr.fragsize; - } else if (stream->adjust_latency) { - lat.num = stream->attr.fragsize; - } else { - lat.num = stream->attr.fragsize; - } - - lat.num /= stream->frame_size; - lat.denom = stream->ss.rate; - if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) - lat.num = (defs->min_quantum.num * lat.denom + - (defs->min_quantum.denom -1)) / defs->min_quantum.denom; - lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; - - snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); - - snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); - snprintf(attr_fragsize, sizeof(attr_fragsize), "%u", stream->attr.fragsize); - - items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); - items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); - items[2] = SPA_DICT_ITEM_INIT("pulse.attr.fragsize", attr_fragsize); - pw_stream_update_properties(stream->stream, - &SPA_DICT_INIT(items, 3)); - - pw_log_info(NAME" %p: [%s] reply CREATE_RECORD_STREAM tag:%u latency:%s", - stream, client->name, stream->create_tag, latency); - - reply = reply_new(client, stream->create_tag); - message_put(reply, - TAG_U32, stream->channel, /* stream index/channel */ - TAG_U32, stream->id, /* source_output/stream index */ - TAG_INVALID); - - peer = find_linked(manager, stream->id, stream->direction); - if (peer && pw_manager_object_is_sink_input(peer)) - peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT); - if (peer && pw_manager_object_is_source_or_monitor(peer)) { - name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); - if (!pw_manager_object_is_source(peer)) { - size_t len = (name ? strlen(name) : 5) + 10; - peer_id = peer->id | MONITOR_FLAG; - peer_name = tmp = alloca(len); - snprintf(tmp, len, "%s.monitor", name ? name : "sink"); - } else { - peer_id = peer->id; - peer_name = name; - } - } else { - peer_id = SPA_ID_INVALID; - peer_name = NULL; - } - - if (client->version >= 9) { - message_put(reply, - TAG_U32, stream->attr.maxlength, - TAG_U32, stream->attr.fragsize, - TAG_INVALID); - } - if (client->version >= 12) { - message_put(reply, - TAG_SAMPLE_SPEC, &stream->ss, - TAG_CHANNEL_MAP, &stream->map, - TAG_U32, peer_id, /* source index */ - TAG_STRING, peer_name, /* source name */ - TAG_BOOLEAN, false, /* source suspended state */ - TAG_INVALID); - } - if (client->version >= 13) { - message_put(reply, - TAG_USEC, lat_usec, /* source configured latency */ - TAG_INVALID); - } - if (client->version >= 22) { - struct format_info info; - spa_zero(info); - info.encoding = ENCODING_PCM; - message_put(reply, - TAG_FORMAT_INFO, &info, /* source_output format */ - TAG_INVALID); - } - - stream->create_tag = SPA_ID_INVALID; - - return client_queue_message(client, reply); -} - static void stream_control_info(void *data, uint32_t id, const struct pw_stream_control *control) { @@ -996,6 +1012,7 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * stream->rate = stream->ss.rate; if (stream->create_tag != SPA_ID_INVALID) { + struct pw_manager_object *peer; stream->id = pw_stream_get_node_id(stream->stream); if (stream->volume_set) { @@ -1010,11 +1027,12 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * if (stream->corked) pw_stream_set_active(stream->stream, false); - if (stream->direction == PW_DIRECTION_OUTPUT) { - reply_create_playback_stream(stream); - } else { - reply_create_record_stream(stream); - } + /* if peer exists, reply immediately, otherwise reply when the link is created */ + peer = find_linked(stream->client->manager, stream->id, stream->direction); + if (peer) + reply_create_stream(stream, peer); + else + spa_list_append(&stream->client->pending_streams, &stream->link); } params[n_params++] = get_buffers_param(stream, &stream->attr, &b); @@ -1456,6 +1474,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui if (stream == NULL) goto error_errno; + spa_list_init(&stream->link); stream->impl = impl; stream->client = client; stream->corked = corked; @@ -1701,6 +1720,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint if (stream == NULL) goto error_errno; + spa_list_init(&stream->link); stream->type = STREAM_TYPE_RECORD; stream->direction = PW_DIRECTION_INPUT; stream->impl = impl; @@ -1966,6 +1986,7 @@ static int do_create_upload_stream(struct client *client, uint32_t command, uint if (stream == NULL) goto error_errno; + spa_list_init(&stream->link); stream->type = STREAM_TYPE_UPLOAD; stream->direction = PW_DIRECTION_OUTPUT; stream->impl = impl; diff --git a/src/modules/module-protocol-pulse/server.c b/src/modules/module-protocol-pulse/server.c index 813930fdd..3fe73ffe1 100644 --- a/src/modules/module-protocol-pulse/server.c +++ b/src/modules/module-protocol-pulse/server.c @@ -391,6 +391,7 @@ on_connect(void *data, int fd, uint32_t mask) spa_list_init(&client->out_messages); spa_list_init(&client->operations); spa_list_init(&client->pending_samples); + spa_list_init(&client->pending_streams); pw_log_debug("server %p: new client %p fd:%d", server, client, client_fd); diff --git a/src/modules/module-protocol-pulse/stream.c b/src/modules/module-protocol-pulse/stream.c index 2dae6ede3..f3c9fd977 100644 --- a/src/modules/module-protocol-pulse/stream.c +++ b/src/modules/module-protocol-pulse/stream.c @@ -50,6 +50,8 @@ void stream_free(struct stream *stream) pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel); + spa_list_remove(&stream->link); + if (stream->drain_tag) reply_error(client, -1, stream->drain_tag, -ENOENT); diff --git a/src/modules/module-protocol-pulse/stream.h b/src/modules/module-protocol-pulse/stream.h index 2b8f07077..e094ce402 100644 --- a/src/modules/module-protocol-pulse/stream.h +++ b/src/modules/module-protocol-pulse/stream.h @@ -48,6 +48,7 @@ struct buffer_attr { }; struct stream { + struct spa_list link; uint32_t create_tag; uint32_t channel; /* index in map */ uint32_t id; /* id of global */