diff --git a/src/modules/module-protocol-pulse/client.h b/src/modules/module-protocol-pulse/client.h index 88712eaa8..1b923d281 100644 --- a/src/modules/module-protocol-pulse/client.h +++ b/src/modules/module-protocol-pulse/client.h @@ -92,6 +92,8 @@ struct client { struct spa_list pending_samples; + struct spa_list pending_streams; + unsigned int disconnect:1; unsigned int disconnecting:1; unsigned int need_flush:1; diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index e88b58847..e07dc3290 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -357,6 +357,329 @@ static void handle_metadata(struct client *client, struct pw_manager_object *old } } +static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss) +{ + uint64_t u; + u = (uint64_t) (val.num * 1000000UL * (uint64_t) ss->rate) / val.denom; + u = (u + 1000000UL - 1) / 1000000UL; + u *= sample_spec_frame_size(ss); + return (uint32_t) u; +} + +static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr) +{ + uint32_t frame_size, max_prebuf, minreq; + struct defs *defs = &s->impl->defs; + + frame_size = s->frame_size; + minreq = frac_to_bytes_round_up(defs->min_req, &s->ss); + + if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) + attr->maxlength = MAXLENGTH; + attr->maxlength -= attr->maxlength % frame_size; + attr->maxlength = SPA_MAX(attr->maxlength, frame_size); + + if (attr->tlength == (uint32_t) -1) + attr->tlength = frac_to_bytes_round_up(defs->default_tlength, &s->ss); + if (attr->tlength > attr->maxlength) + attr->tlength = attr->maxlength; + attr->tlength -= attr->tlength % frame_size; + attr->tlength = SPA_MAX(attr->tlength, frame_size); + attr->tlength = SPA_MAX(attr->tlength, minreq); + + if (attr->minreq == (uint32_t) -1) { + uint32_t process = frac_to_bytes_round_up(defs->default_req, &s->ss); + /* With low-latency, tlength/4 gives a decent default in all of traditional, + * adjust latency and early request modes. */ + uint32_t m = attr->tlength / 4; + m -= m % frame_size; + attr->minreq = SPA_MIN(process, m); + } + attr->minreq = SPA_MAX(attr->minreq, minreq); + + if (attr->tlength < attr->minreq+frame_size) + attr->tlength = attr->minreq + frame_size; + + attr->minreq -= attr->minreq % frame_size; + if (attr->minreq <= 0) { + attr->minreq = frame_size; + attr->tlength += frame_size*2; + } + if (attr->tlength <= attr->minreq) + attr->tlength = attr->minreq*2 + frame_size; + + max_prebuf = attr->tlength + frame_size - attr->minreq; + if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf) + attr->prebuf = max_prebuf; + attr->prebuf -= attr->prebuf % frame_size; + + s->missing = attr->tlength; + attr->fragsize = 0; + + pw_log_info(NAME" %p: [%s] maxlength:%u tlength:%u minreq:%u prebuf:%u", s, + s->client->name, attr->maxlength, attr->tlength, + attr->minreq, attr->prebuf); +} + +static int reply_create_playback_stream(struct stream *stream, struct pw_manager_object *peer) +{ + struct client *client = stream->client; + struct message *reply; + uint32_t missing, peer_id; + struct spa_dict_item items[5]; + char latency[32]; + char attr_maxlength[32]; + char attr_tlength[32]; + char attr_prebuf[32]; + char attr_minreq[32]; + const char *peer_name; + struct spa_fraction lat; + uint64_t lat_usec; + struct defs *defs = &stream->impl->defs; + + fix_playback_buffer_attr(stream, &stream->attr); + + stream->buffer = calloc(1, stream->attr.maxlength); + if (stream->buffer == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + + if (stream->early_requests) { + lat.num = stream->attr.minreq; + } else if (stream->adjust_latency) { + if (stream->attr.tlength > stream->attr.minreq * 2) + lat.num = (stream->attr.tlength - stream->attr.minreq * 2) / 2; + else + lat.num = stream->attr.minreq; + } else { + if (stream->attr.tlength > stream->attr.minreq * 2) + lat.num = stream->attr.tlength - stream->attr.minreq * 2; + else + lat.num = stream->attr.minreq; + } + lat.denom = stream->ss.rate; + lat.num /= stream->frame_size; + if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) + lat.num = (defs->min_quantum.num * lat.denom + + (defs->min_quantum.denom -1)) / defs->min_quantum.denom; + lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; + + snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); + snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); + snprintf(attr_tlength, sizeof(attr_tlength), "%u", stream->attr.tlength); + snprintf(attr_prebuf, sizeof(attr_prebuf), "%u", stream->attr.prebuf); + snprintf(attr_minreq, sizeof(attr_minreq), "%u", stream->attr.minreq); + + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); + items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); + items[2] = SPA_DICT_ITEM_INIT("pulse.attr.tlength", attr_tlength); + items[3] = SPA_DICT_ITEM_INIT("pulse.attr.prebuf", attr_prebuf); + items[4] = SPA_DICT_ITEM_INIT("pulse.attr.minreq", attr_minreq); + pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 5)); + + missing = stream_pop_missing(stream); + + pw_log_info(NAME" %p: [%s] reply CREATE_PLAYBACK_STREAM tag:%u missing:%u latency:%s", + stream, client->name, stream->create_tag, missing, latency); + + reply = reply_new(client, stream->create_tag); + message_put(reply, + TAG_U32, stream->channel, /* stream index/channel */ + TAG_U32, stream->id, /* sink_input/stream index */ + TAG_U32, missing, /* missing/requested bytes */ + TAG_INVALID); + + if (peer && pw_manager_object_is_sink(peer)) { + peer_id = peer->id; + peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); + } else { + peer_id = SPA_ID_INVALID; + peer_name = NULL; + } + + if (client->version >= 9) { + message_put(reply, + TAG_U32, stream->attr.maxlength, + TAG_U32, stream->attr.tlength, + TAG_U32, stream->attr.prebuf, + TAG_U32, stream->attr.minreq, + TAG_INVALID); + } + if (client->version >= 12) { + message_put(reply, + TAG_SAMPLE_SPEC, &stream->ss, + TAG_CHANNEL_MAP, &stream->map, + TAG_U32, peer_id, /* sink index */ + TAG_STRING, peer_name, /* sink name */ + TAG_BOOLEAN, false, /* sink suspended state */ + TAG_INVALID); + } + if (client->version >= 13) { + message_put(reply, + TAG_USEC, lat_usec, /* sink configured latency */ + TAG_INVALID); + } + if (client->version >= 21) { + struct format_info info; + spa_zero(info); + info.encoding = ENCODING_PCM; + message_put(reply, + TAG_FORMAT_INFO, &info, /* sink_input format */ + TAG_INVALID); + } + + stream->create_tag = SPA_ID_INVALID; + + return client_queue_message(client, reply); +} + +static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr) +{ + uint32_t frame_size, minfrag; + struct defs *defs = &s->impl->defs; + + frame_size = s->frame_size; + + if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) + attr->maxlength = MAXLENGTH; + attr->maxlength -= attr->maxlength % frame_size; + attr->maxlength = SPA_MAX(attr->maxlength, frame_size); + + minfrag = frac_to_bytes_round_up(defs->min_frag, &s->ss); + + if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) + attr->fragsize = frac_to_bytes_round_up(defs->default_frag, &s->ss); + attr->fragsize -= attr->fragsize % frame_size; + attr->fragsize = SPA_MAX(attr->fragsize, minfrag); + attr->fragsize = SPA_MAX(attr->fragsize, frame_size); + + if (attr->fragsize > attr->maxlength) + attr->fragsize = attr->maxlength; + + attr->tlength = attr->minreq = attr->prebuf = 0; + + pw_log_info(NAME" %p: [%s] maxlength:%u fragsize:%u minfrag:%u", s, + s->client->name, attr->maxlength, attr->fragsize, minfrag); +} + +static int reply_create_record_stream(struct stream *stream, struct pw_manager_object *peer) +{ + struct client *client = stream->client; + struct pw_manager *manager = client->manager; + struct message *reply; + struct spa_dict_item items[3]; + char latency[32], *tmp; + char attr_maxlength[32]; + char attr_fragsize[32]; + const char *peer_name, *name; + uint32_t peer_id; + struct spa_fraction lat; + uint64_t lat_usec; + struct defs *defs = &stream->impl->defs; + + fix_record_buffer_attr(stream, &stream->attr); + + stream->buffer = calloc(1, stream->attr.maxlength); + if (stream->buffer == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + + if (stream->early_requests) { + lat.num = stream->attr.fragsize; + } else if (stream->adjust_latency) { + lat.num = stream->attr.fragsize; + } else { + lat.num = stream->attr.fragsize; + } + + lat.num /= stream->frame_size; + lat.denom = stream->ss.rate; + if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) + lat.num = (defs->min_quantum.num * lat.denom + + (defs->min_quantum.denom -1)) / defs->min_quantum.denom; + lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; + + snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); + + snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); + snprintf(attr_fragsize, sizeof(attr_fragsize), "%u", stream->attr.fragsize); + + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); + items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); + items[2] = SPA_DICT_ITEM_INIT("pulse.attr.fragsize", attr_fragsize); + pw_stream_update_properties(stream->stream, + &SPA_DICT_INIT(items, 3)); + + pw_log_info(NAME" %p: [%s] reply CREATE_RECORD_STREAM tag:%u latency:%s", + stream, client->name, stream->create_tag, latency); + + reply = reply_new(client, stream->create_tag); + message_put(reply, + TAG_U32, stream->channel, /* stream index/channel */ + TAG_U32, stream->id, /* source_output/stream index */ + TAG_INVALID); + + if (peer && pw_manager_object_is_sink_input(peer)) + peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT); + if (peer && pw_manager_object_is_source_or_monitor(peer)) { + name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); + if (!pw_manager_object_is_source(peer)) { + size_t len = (name ? strlen(name) : 5) + 10; + peer_id = peer->id | MONITOR_FLAG; + peer_name = tmp = alloca(len); + snprintf(tmp, len, "%s.monitor", name ? name : "sink"); + } else { + peer_id = peer->id; + peer_name = name; + } + } else { + peer_id = SPA_ID_INVALID; + peer_name = NULL; + } + + if (client->version >= 9) { + message_put(reply, + TAG_U32, stream->attr.maxlength, + TAG_U32, stream->attr.fragsize, + TAG_INVALID); + } + if (client->version >= 12) { + message_put(reply, + TAG_SAMPLE_SPEC, &stream->ss, + TAG_CHANNEL_MAP, &stream->map, + TAG_U32, peer_id, /* source index */ + TAG_STRING, peer_name, /* source name */ + TAG_BOOLEAN, false, /* source suspended state */ + TAG_INVALID); + } + if (client->version >= 13) { + message_put(reply, + TAG_USEC, lat_usec, /* source configured latency */ + TAG_INVALID); + } + if (client->version >= 22) { + struct format_info info; + spa_zero(info); + info.encoding = ENCODING_PCM; + message_put(reply, + TAG_FORMAT_INFO, &info, /* source_output format */ + TAG_INVALID); + } + + stream->create_tag = SPA_ID_INVALID; + + return client_queue_message(client, reply); +} + +static int reply_create_stream(struct stream *stream, struct pw_manager_object *peer) +{ + return stream->direction == PW_DIRECTION_OUTPUT ? + reply_create_playback_stream(stream, peer) : + reply_create_record_stream(stream, peer); +} + static void manager_added(void *data, struct pw_manager_object *o) { struct client *client = data; @@ -370,6 +693,20 @@ static void manager_added(void *data, struct pw_manager_object *o) handle_metadata(client, NULL, o, str); } + if (spa_streq(o->type, PW_TYPE_INTERFACE_Link)) { + struct stream *s; + struct pw_manager_object *peer = NULL; + spa_list_for_each(s, &client->pending_streams, link) { + peer = find_linked(s->client->manager, s->id, s->direction); + if (peer) + break; + } + if (peer) { + reply_create_stream(s, peer); + spa_list_remove(&s->link); + } + } + send_object_event(client, o, SUBSCRIPTION_EVENT_NEW); /* Adding sinks etc. may also change defaults */ @@ -559,327 +896,6 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s return reply_simple_ack(client, tag); } -static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss) -{ - uint64_t u; - u = (uint64_t) (val.num * 1000000UL * (uint64_t) ss->rate) / val.denom; - u = (u + 1000000UL - 1) / 1000000UL; - u *= sample_spec_frame_size(ss); - return (uint32_t) u; -} - -static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr) -{ - uint32_t frame_size, max_prebuf, minreq; - struct defs *defs = &s->impl->defs; - - frame_size = s->frame_size; - minreq = frac_to_bytes_round_up(defs->min_req, &s->ss); - - if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) - attr->maxlength = MAXLENGTH; - attr->maxlength -= attr->maxlength % frame_size; - attr->maxlength = SPA_MAX(attr->maxlength, frame_size); - - if (attr->tlength == (uint32_t) -1) - attr->tlength = frac_to_bytes_round_up(defs->default_tlength, &s->ss); - if (attr->tlength > attr->maxlength) - attr->tlength = attr->maxlength; - attr->tlength -= attr->tlength % frame_size; - attr->tlength = SPA_MAX(attr->tlength, frame_size); - attr->tlength = SPA_MAX(attr->tlength, minreq); - - if (attr->minreq == (uint32_t) -1) { - uint32_t process = frac_to_bytes_round_up(defs->default_req, &s->ss); - /* With low-latency, tlength/4 gives a decent default in all of traditional, - * adjust latency and early request modes. */ - uint32_t m = attr->tlength / 4; - m -= m % frame_size; - attr->minreq = SPA_MIN(process, m); - } - attr->minreq = SPA_MAX(attr->minreq, minreq); - - if (attr->tlength < attr->minreq+frame_size) - attr->tlength = attr->minreq + frame_size; - - attr->minreq -= attr->minreq % frame_size; - if (attr->minreq <= 0) { - attr->minreq = frame_size; - attr->tlength += frame_size*2; - } - if (attr->tlength <= attr->minreq) - attr->tlength = attr->minreq*2 + frame_size; - - max_prebuf = attr->tlength + frame_size - attr->minreq; - if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf) - attr->prebuf = max_prebuf; - attr->prebuf -= attr->prebuf % frame_size; - - s->missing = attr->tlength; - attr->fragsize = 0; - - pw_log_info(NAME" %p: [%s] maxlength:%u tlength:%u minreq:%u prebuf:%u", s, - s->client->name, attr->maxlength, attr->tlength, - attr->minreq, attr->prebuf); -} - -static int reply_create_playback_stream(struct stream *stream) -{ - struct client *client = stream->client; - struct pw_manager *manager = client->manager; - struct message *reply; - uint32_t missing, peer_id; - struct spa_dict_item items[5]; - char latency[32]; - char attr_maxlength[32]; - char attr_tlength[32]; - char attr_prebuf[32]; - char attr_minreq[32]; - struct pw_manager_object *peer; - const char *peer_name; - struct spa_fraction lat; - uint64_t lat_usec; - struct defs *defs = &stream->impl->defs; - - fix_playback_buffer_attr(stream, &stream->attr); - - stream->buffer = calloc(1, stream->attr.maxlength); - if (stream->buffer == NULL) - return -errno; - - spa_ringbuffer_init(&stream->ring); - - if (stream->early_requests) { - lat.num = stream->attr.minreq; - } else if (stream->adjust_latency) { - if (stream->attr.tlength > stream->attr.minreq * 2) - lat.num = (stream->attr.tlength - stream->attr.minreq * 2) / 2; - else - lat.num = stream->attr.minreq; - } else { - if (stream->attr.tlength > stream->attr.minreq * 2) - lat.num = stream->attr.tlength - stream->attr.minreq * 2; - else - lat.num = stream->attr.minreq; - } - lat.denom = stream->ss.rate; - lat.num /= stream->frame_size; - if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) - lat.num = (defs->min_quantum.num * lat.denom + - (defs->min_quantum.denom -1)) / defs->min_quantum.denom; - lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; - - snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); - snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); - snprintf(attr_tlength, sizeof(attr_tlength), "%u", stream->attr.tlength); - snprintf(attr_prebuf, sizeof(attr_prebuf), "%u", stream->attr.prebuf); - snprintf(attr_minreq, sizeof(attr_minreq), "%u", stream->attr.minreq); - - items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); - items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); - items[2] = SPA_DICT_ITEM_INIT("pulse.attr.tlength", attr_tlength); - items[3] = SPA_DICT_ITEM_INIT("pulse.attr.prebuf", attr_prebuf); - items[4] = SPA_DICT_ITEM_INIT("pulse.attr.minreq", attr_minreq); - pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 5)); - - missing = stream_pop_missing(stream); - - pw_log_info(NAME" %p: [%s] reply CREATE_PLAYBACK_STREAM tag:%u missing:%u latency:%s", - stream, client->name, stream->create_tag, missing, latency); - - reply = reply_new(client, stream->create_tag); - message_put(reply, - TAG_U32, stream->channel, /* stream index/channel */ - TAG_U32, stream->id, /* sink_input/stream index */ - TAG_U32, missing, /* missing/requested bytes */ - TAG_INVALID); - - peer = find_linked(manager, stream->id, stream->direction); - if (peer && pw_manager_object_is_sink(peer)) { - peer_id = peer->id; - peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); - } else { - peer_id = SPA_ID_INVALID; - peer_name = NULL; - } - - if (client->version >= 9) { - message_put(reply, - TAG_U32, stream->attr.maxlength, - TAG_U32, stream->attr.tlength, - TAG_U32, stream->attr.prebuf, - TAG_U32, stream->attr.minreq, - TAG_INVALID); - } - if (client->version >= 12) { - message_put(reply, - TAG_SAMPLE_SPEC, &stream->ss, - TAG_CHANNEL_MAP, &stream->map, - TAG_U32, peer_id, /* sink index */ - TAG_STRING, peer_name, /* sink name */ - TAG_BOOLEAN, false, /* sink suspended state */ - TAG_INVALID); - } - if (client->version >= 13) { - message_put(reply, - TAG_USEC, lat_usec, /* sink configured latency */ - TAG_INVALID); - } - if (client->version >= 21) { - struct format_info info; - spa_zero(info); - info.encoding = ENCODING_PCM; - message_put(reply, - TAG_FORMAT_INFO, &info, /* sink_input format */ - TAG_INVALID); - } - - stream->create_tag = SPA_ID_INVALID; - - return client_queue_message(client, reply); -} - -static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr) -{ - uint32_t frame_size, minfrag; - struct defs *defs = &s->impl->defs; - - frame_size = s->frame_size; - - if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) - attr->maxlength = MAXLENGTH; - attr->maxlength -= attr->maxlength % frame_size; - attr->maxlength = SPA_MAX(attr->maxlength, frame_size); - - minfrag = frac_to_bytes_round_up(defs->min_frag, &s->ss); - - if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) - attr->fragsize = frac_to_bytes_round_up(defs->default_frag, &s->ss); - attr->fragsize -= attr->fragsize % frame_size; - attr->fragsize = SPA_MAX(attr->fragsize, minfrag); - attr->fragsize = SPA_MAX(attr->fragsize, frame_size); - - if (attr->fragsize > attr->maxlength) - attr->fragsize = attr->maxlength; - - attr->tlength = attr->minreq = attr->prebuf = 0; - - pw_log_info(NAME" %p: [%s] maxlength:%u fragsize:%u minfrag:%u", s, - s->client->name, attr->maxlength, attr->fragsize, minfrag); -} - -static int reply_create_record_stream(struct stream *stream) -{ - struct client *client = stream->client; - struct pw_manager *manager = client->manager; - struct message *reply; - struct spa_dict_item items[3]; - char latency[32], *tmp; - char attr_maxlength[32]; - char attr_fragsize[32]; - struct pw_manager_object *peer; - const char *peer_name, *name; - uint32_t peer_id; - struct spa_fraction lat; - uint64_t lat_usec; - struct defs *defs = &stream->impl->defs; - - fix_record_buffer_attr(stream, &stream->attr); - - stream->buffer = calloc(1, stream->attr.maxlength); - if (stream->buffer == NULL) - return -errno; - - spa_ringbuffer_init(&stream->ring); - - if (stream->early_requests) { - lat.num = stream->attr.fragsize; - } else if (stream->adjust_latency) { - lat.num = stream->attr.fragsize; - } else { - lat.num = stream->attr.fragsize; - } - - lat.num /= stream->frame_size; - lat.denom = stream->ss.rate; - if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num) - lat.num = (defs->min_quantum.num * lat.denom + - (defs->min_quantum.denom -1)) / defs->min_quantum.denom; - lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; - - snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom); - - snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength); - snprintf(attr_fragsize, sizeof(attr_fragsize), "%u", stream->attr.fragsize); - - items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); - items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength); - items[2] = SPA_DICT_ITEM_INIT("pulse.attr.fragsize", attr_fragsize); - pw_stream_update_properties(stream->stream, - &SPA_DICT_INIT(items, 3)); - - pw_log_info(NAME" %p: [%s] reply CREATE_RECORD_STREAM tag:%u latency:%s", - stream, client->name, stream->create_tag, latency); - - reply = reply_new(client, stream->create_tag); - message_put(reply, - TAG_U32, stream->channel, /* stream index/channel */ - TAG_U32, stream->id, /* source_output/stream index */ - TAG_INVALID); - - peer = find_linked(manager, stream->id, stream->direction); - if (peer && pw_manager_object_is_sink_input(peer)) - peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT); - if (peer && pw_manager_object_is_source_or_monitor(peer)) { - name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); - if (!pw_manager_object_is_source(peer)) { - size_t len = (name ? strlen(name) : 5) + 10; - peer_id = peer->id | MONITOR_FLAG; - peer_name = tmp = alloca(len); - snprintf(tmp, len, "%s.monitor", name ? name : "sink"); - } else { - peer_id = peer->id; - peer_name = name; - } - } else { - peer_id = SPA_ID_INVALID; - peer_name = NULL; - } - - if (client->version >= 9) { - message_put(reply, - TAG_U32, stream->attr.maxlength, - TAG_U32, stream->attr.fragsize, - TAG_INVALID); - } - if (client->version >= 12) { - message_put(reply, - TAG_SAMPLE_SPEC, &stream->ss, - TAG_CHANNEL_MAP, &stream->map, - TAG_U32, peer_id, /* source index */ - TAG_STRING, peer_name, /* source name */ - TAG_BOOLEAN, false, /* source suspended state */ - TAG_INVALID); - } - if (client->version >= 13) { - message_put(reply, - TAG_USEC, lat_usec, /* source configured latency */ - TAG_INVALID); - } - if (client->version >= 22) { - struct format_info info; - spa_zero(info); - info.encoding = ENCODING_PCM; - message_put(reply, - TAG_FORMAT_INFO, &info, /* source_output format */ - TAG_INVALID); - } - - stream->create_tag = SPA_ID_INVALID; - - return client_queue_message(client, reply); -} - static void stream_control_info(void *data, uint32_t id, const struct pw_stream_control *control) { @@ -996,6 +1012,7 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * stream->rate = stream->ss.rate; if (stream->create_tag != SPA_ID_INVALID) { + struct pw_manager_object *peer; stream->id = pw_stream_get_node_id(stream->stream); if (stream->volume_set) { @@ -1010,11 +1027,12 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * if (stream->corked) pw_stream_set_active(stream->stream, false); - if (stream->direction == PW_DIRECTION_OUTPUT) { - reply_create_playback_stream(stream); - } else { - reply_create_record_stream(stream); - } + /* if peer exists, reply immediately, otherwise reply when the link is created */ + peer = find_linked(stream->client->manager, stream->id, stream->direction); + if (peer) + reply_create_stream(stream, peer); + else + spa_list_append(&stream->client->pending_streams, &stream->link); } params[n_params++] = get_buffers_param(stream, &stream->attr, &b); @@ -1456,6 +1474,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui if (stream == NULL) goto error_errno; + spa_list_init(&stream->link); stream->impl = impl; stream->client = client; stream->corked = corked; @@ -1701,6 +1720,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint if (stream == NULL) goto error_errno; + spa_list_init(&stream->link); stream->type = STREAM_TYPE_RECORD; stream->direction = PW_DIRECTION_INPUT; stream->impl = impl; @@ -1966,6 +1986,7 @@ static int do_create_upload_stream(struct client *client, uint32_t command, uint if (stream == NULL) goto error_errno; + spa_list_init(&stream->link); stream->type = STREAM_TYPE_UPLOAD; stream->direction = PW_DIRECTION_OUTPUT; stream->impl = impl; diff --git a/src/modules/module-protocol-pulse/server.c b/src/modules/module-protocol-pulse/server.c index 813930fdd..3fe73ffe1 100644 --- a/src/modules/module-protocol-pulse/server.c +++ b/src/modules/module-protocol-pulse/server.c @@ -391,6 +391,7 @@ on_connect(void *data, int fd, uint32_t mask) spa_list_init(&client->out_messages); spa_list_init(&client->operations); spa_list_init(&client->pending_samples); + spa_list_init(&client->pending_streams); pw_log_debug("server %p: new client %p fd:%d", server, client, client_fd); diff --git a/src/modules/module-protocol-pulse/stream.c b/src/modules/module-protocol-pulse/stream.c index 2dae6ede3..f3c9fd977 100644 --- a/src/modules/module-protocol-pulse/stream.c +++ b/src/modules/module-protocol-pulse/stream.c @@ -50,6 +50,8 @@ void stream_free(struct stream *stream) pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel); + spa_list_remove(&stream->link); + if (stream->drain_tag) reply_error(client, -1, stream->drain_tag, -ENOENT); diff --git a/src/modules/module-protocol-pulse/stream.h b/src/modules/module-protocol-pulse/stream.h index 2b8f07077..e094ce402 100644 --- a/src/modules/module-protocol-pulse/stream.h +++ b/src/modules/module-protocol-pulse/stream.h @@ -48,6 +48,7 @@ struct buffer_attr { }; struct stream { + struct spa_list link; uint32_t create_tag; uint32_t channel; /* index in map */ uint32_t id; /* id of global */