pipewire-pulse: delay create stream reply if the peer is not found

When the client adapter is configured in passthrough mode, the stream param
changed event in pipewire-pulse is emitted before the session manager creates
the link, and not after. Therfore, the peer can never be found when replying
create stream, and the pulseaudio application receives a stream error.

This patch delays the create stream reply until the link is added if the peer
cannot be found, fixing the above race conditon to allow passthrough mode to
work with pulseaudio applications.
This commit is contained in:
Julian Bouzas 2021-08-26 15:53:33 -04:00 committed by Wim Taymans
parent 60480f422e
commit b136bb8ca5
5 changed files with 353 additions and 326 deletions

View file

@ -357,6 +357,329 @@ static void handle_metadata(struct client *client, struct pw_manager_object *old
}
}
static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss)
{
uint64_t u;
u = (uint64_t) (val.num * 1000000UL * (uint64_t) ss->rate) / val.denom;
u = (u + 1000000UL - 1) / 1000000UL;
u *= sample_spec_frame_size(ss);
return (uint32_t) u;
}
static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr)
{
uint32_t frame_size, max_prebuf, minreq;
struct defs *defs = &s->impl->defs;
frame_size = s->frame_size;
minreq = frac_to_bytes_round_up(defs->min_req, &s->ss);
if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
attr->maxlength = MAXLENGTH;
attr->maxlength -= attr->maxlength % frame_size;
attr->maxlength = SPA_MAX(attr->maxlength, frame_size);
if (attr->tlength == (uint32_t) -1)
attr->tlength = frac_to_bytes_round_up(defs->default_tlength, &s->ss);
if (attr->tlength > attr->maxlength)
attr->tlength = attr->maxlength;
attr->tlength -= attr->tlength % frame_size;
attr->tlength = SPA_MAX(attr->tlength, frame_size);
attr->tlength = SPA_MAX(attr->tlength, minreq);
if (attr->minreq == (uint32_t) -1) {
uint32_t process = frac_to_bytes_round_up(defs->default_req, &s->ss);
/* With low-latency, tlength/4 gives a decent default in all of traditional,
* adjust latency and early request modes. */
uint32_t m = attr->tlength / 4;
m -= m % frame_size;
attr->minreq = SPA_MIN(process, m);
}
attr->minreq = SPA_MAX(attr->minreq, minreq);
if (attr->tlength < attr->minreq+frame_size)
attr->tlength = attr->minreq + frame_size;
attr->minreq -= attr->minreq % frame_size;
if (attr->minreq <= 0) {
attr->minreq = frame_size;
attr->tlength += frame_size*2;
}
if (attr->tlength <= attr->minreq)
attr->tlength = attr->minreq*2 + frame_size;
max_prebuf = attr->tlength + frame_size - attr->minreq;
if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf)
attr->prebuf = max_prebuf;
attr->prebuf -= attr->prebuf % frame_size;
s->missing = attr->tlength;
attr->fragsize = 0;
pw_log_info(NAME" %p: [%s] maxlength:%u tlength:%u minreq:%u prebuf:%u", s,
s->client->name, attr->maxlength, attr->tlength,
attr->minreq, attr->prebuf);
}
static int reply_create_playback_stream(struct stream *stream, struct pw_manager_object *peer)
{
struct client *client = stream->client;
struct message *reply;
uint32_t missing, peer_id;
struct spa_dict_item items[5];
char latency[32];
char attr_maxlength[32];
char attr_tlength[32];
char attr_prebuf[32];
char attr_minreq[32];
const char *peer_name;
struct spa_fraction lat;
uint64_t lat_usec;
struct defs *defs = &stream->impl->defs;
fix_playback_buffer_attr(stream, &stream->attr);
stream->buffer = calloc(1, stream->attr.maxlength);
if (stream->buffer == NULL)
return -errno;
spa_ringbuffer_init(&stream->ring);
if (stream->early_requests) {
lat.num = stream->attr.minreq;
} else if (stream->adjust_latency) {
if (stream->attr.tlength > stream->attr.minreq * 2)
lat.num = (stream->attr.tlength - stream->attr.minreq * 2) / 2;
else
lat.num = stream->attr.minreq;
} else {
if (stream->attr.tlength > stream->attr.minreq * 2)
lat.num = stream->attr.tlength - stream->attr.minreq * 2;
else
lat.num = stream->attr.minreq;
}
lat.denom = stream->ss.rate;
lat.num /= stream->frame_size;
if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num)
lat.num = (defs->min_quantum.num * lat.denom +
(defs->min_quantum.denom -1)) / defs->min_quantum.denom;
lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom);
snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength);
snprintf(attr_tlength, sizeof(attr_tlength), "%u", stream->attr.tlength);
snprintf(attr_prebuf, sizeof(attr_prebuf), "%u", stream->attr.prebuf);
snprintf(attr_minreq, sizeof(attr_minreq), "%u", stream->attr.minreq);
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength);
items[2] = SPA_DICT_ITEM_INIT("pulse.attr.tlength", attr_tlength);
items[3] = SPA_DICT_ITEM_INIT("pulse.attr.prebuf", attr_prebuf);
items[4] = SPA_DICT_ITEM_INIT("pulse.attr.minreq", attr_minreq);
pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 5));
missing = stream_pop_missing(stream);
pw_log_info(NAME" %p: [%s] reply CREATE_PLAYBACK_STREAM tag:%u missing:%u latency:%s",
stream, client->name, stream->create_tag, missing, latency);
reply = reply_new(client, stream->create_tag);
message_put(reply,
TAG_U32, stream->channel, /* stream index/channel */
TAG_U32, stream->id, /* sink_input/stream index */
TAG_U32, missing, /* missing/requested bytes */
TAG_INVALID);
if (peer && pw_manager_object_is_sink(peer)) {
peer_id = peer->id;
peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
} else {
peer_id = SPA_ID_INVALID;
peer_name = NULL;
}
if (client->version >= 9) {
message_put(reply,
TAG_U32, stream->attr.maxlength,
TAG_U32, stream->attr.tlength,
TAG_U32, stream->attr.prebuf,
TAG_U32, stream->attr.minreq,
TAG_INVALID);
}
if (client->version >= 12) {
message_put(reply,
TAG_SAMPLE_SPEC, &stream->ss,
TAG_CHANNEL_MAP, &stream->map,
TAG_U32, peer_id, /* sink index */
TAG_STRING, peer_name, /* sink name */
TAG_BOOLEAN, false, /* sink suspended state */
TAG_INVALID);
}
if (client->version >= 13) {
message_put(reply,
TAG_USEC, lat_usec, /* sink configured latency */
TAG_INVALID);
}
if (client->version >= 21) {
struct format_info info;
spa_zero(info);
info.encoding = ENCODING_PCM;
message_put(reply,
TAG_FORMAT_INFO, &info, /* sink_input format */
TAG_INVALID);
}
stream->create_tag = SPA_ID_INVALID;
return client_queue_message(client, reply);
}
static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr)
{
uint32_t frame_size, minfrag;
struct defs *defs = &s->impl->defs;
frame_size = s->frame_size;
if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
attr->maxlength = MAXLENGTH;
attr->maxlength -= attr->maxlength % frame_size;
attr->maxlength = SPA_MAX(attr->maxlength, frame_size);
minfrag = frac_to_bytes_round_up(defs->min_frag, &s->ss);
if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0)
attr->fragsize = frac_to_bytes_round_up(defs->default_frag, &s->ss);
attr->fragsize -= attr->fragsize % frame_size;
attr->fragsize = SPA_MAX(attr->fragsize, minfrag);
attr->fragsize = SPA_MAX(attr->fragsize, frame_size);
if (attr->fragsize > attr->maxlength)
attr->fragsize = attr->maxlength;
attr->tlength = attr->minreq = attr->prebuf = 0;
pw_log_info(NAME" %p: [%s] maxlength:%u fragsize:%u minfrag:%u", s,
s->client->name, attr->maxlength, attr->fragsize, minfrag);
}
static int reply_create_record_stream(struct stream *stream, struct pw_manager_object *peer)
{
struct client *client = stream->client;
struct pw_manager *manager = client->manager;
struct message *reply;
struct spa_dict_item items[3];
char latency[32], *tmp;
char attr_maxlength[32];
char attr_fragsize[32];
const char *peer_name, *name;
uint32_t peer_id;
struct spa_fraction lat;
uint64_t lat_usec;
struct defs *defs = &stream->impl->defs;
fix_record_buffer_attr(stream, &stream->attr);
stream->buffer = calloc(1, stream->attr.maxlength);
if (stream->buffer == NULL)
return -errno;
spa_ringbuffer_init(&stream->ring);
if (stream->early_requests) {
lat.num = stream->attr.fragsize;
} else if (stream->adjust_latency) {
lat.num = stream->attr.fragsize;
} else {
lat.num = stream->attr.fragsize;
}
lat.num /= stream->frame_size;
lat.denom = stream->ss.rate;
if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num)
lat.num = (defs->min_quantum.num * lat.denom +
(defs->min_quantum.denom -1)) / defs->min_quantum.denom;
lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom);
snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength);
snprintf(attr_fragsize, sizeof(attr_fragsize), "%u", stream->attr.fragsize);
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength);
items[2] = SPA_DICT_ITEM_INIT("pulse.attr.fragsize", attr_fragsize);
pw_stream_update_properties(stream->stream,
&SPA_DICT_INIT(items, 3));
pw_log_info(NAME" %p: [%s] reply CREATE_RECORD_STREAM tag:%u latency:%s",
stream, client->name, stream->create_tag, latency);
reply = reply_new(client, stream->create_tag);
message_put(reply,
TAG_U32, stream->channel, /* stream index/channel */
TAG_U32, stream->id, /* source_output/stream index */
TAG_INVALID);
if (peer && pw_manager_object_is_sink_input(peer))
peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT);
if (peer && pw_manager_object_is_source_or_monitor(peer)) {
name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
if (!pw_manager_object_is_source(peer)) {
size_t len = (name ? strlen(name) : 5) + 10;
peer_id = peer->id | MONITOR_FLAG;
peer_name = tmp = alloca(len);
snprintf(tmp, len, "%s.monitor", name ? name : "sink");
} else {
peer_id = peer->id;
peer_name = name;
}
} else {
peer_id = SPA_ID_INVALID;
peer_name = NULL;
}
if (client->version >= 9) {
message_put(reply,
TAG_U32, stream->attr.maxlength,
TAG_U32, stream->attr.fragsize,
TAG_INVALID);
}
if (client->version >= 12) {
message_put(reply,
TAG_SAMPLE_SPEC, &stream->ss,
TAG_CHANNEL_MAP, &stream->map,
TAG_U32, peer_id, /* source index */
TAG_STRING, peer_name, /* source name */
TAG_BOOLEAN, false, /* source suspended state */
TAG_INVALID);
}
if (client->version >= 13) {
message_put(reply,
TAG_USEC, lat_usec, /* source configured latency */
TAG_INVALID);
}
if (client->version >= 22) {
struct format_info info;
spa_zero(info);
info.encoding = ENCODING_PCM;
message_put(reply,
TAG_FORMAT_INFO, &info, /* source_output format */
TAG_INVALID);
}
stream->create_tag = SPA_ID_INVALID;
return client_queue_message(client, reply);
}
static int reply_create_stream(struct stream *stream, struct pw_manager_object *peer)
{
return stream->direction == PW_DIRECTION_OUTPUT ?
reply_create_playback_stream(stream, peer) :
reply_create_record_stream(stream, peer);
}
static void manager_added(void *data, struct pw_manager_object *o)
{
struct client *client = data;
@ -370,6 +693,20 @@ static void manager_added(void *data, struct pw_manager_object *o)
handle_metadata(client, NULL, o, str);
}
if (spa_streq(o->type, PW_TYPE_INTERFACE_Link)) {
struct stream *s;
struct pw_manager_object *peer = NULL;
spa_list_for_each(s, &client->pending_streams, link) {
peer = find_linked(s->client->manager, s->id, s->direction);
if (peer)
break;
}
if (peer) {
reply_create_stream(s, peer);
spa_list_remove(&s->link);
}
}
send_object_event(client, o, SUBSCRIPTION_EVENT_NEW);
/* Adding sinks etc. may also change defaults */
@ -559,327 +896,6 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s
return reply_simple_ack(client, tag);
}
static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss)
{
uint64_t u;
u = (uint64_t) (val.num * 1000000UL * (uint64_t) ss->rate) / val.denom;
u = (u + 1000000UL - 1) / 1000000UL;
u *= sample_spec_frame_size(ss);
return (uint32_t) u;
}
static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr)
{
uint32_t frame_size, max_prebuf, minreq;
struct defs *defs = &s->impl->defs;
frame_size = s->frame_size;
minreq = frac_to_bytes_round_up(defs->min_req, &s->ss);
if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
attr->maxlength = MAXLENGTH;
attr->maxlength -= attr->maxlength % frame_size;
attr->maxlength = SPA_MAX(attr->maxlength, frame_size);
if (attr->tlength == (uint32_t) -1)
attr->tlength = frac_to_bytes_round_up(defs->default_tlength, &s->ss);
if (attr->tlength > attr->maxlength)
attr->tlength = attr->maxlength;
attr->tlength -= attr->tlength % frame_size;
attr->tlength = SPA_MAX(attr->tlength, frame_size);
attr->tlength = SPA_MAX(attr->tlength, minreq);
if (attr->minreq == (uint32_t) -1) {
uint32_t process = frac_to_bytes_round_up(defs->default_req, &s->ss);
/* With low-latency, tlength/4 gives a decent default in all of traditional,
* adjust latency and early request modes. */
uint32_t m = attr->tlength / 4;
m -= m % frame_size;
attr->minreq = SPA_MIN(process, m);
}
attr->minreq = SPA_MAX(attr->minreq, minreq);
if (attr->tlength < attr->minreq+frame_size)
attr->tlength = attr->minreq + frame_size;
attr->minreq -= attr->minreq % frame_size;
if (attr->minreq <= 0) {
attr->minreq = frame_size;
attr->tlength += frame_size*2;
}
if (attr->tlength <= attr->minreq)
attr->tlength = attr->minreq*2 + frame_size;
max_prebuf = attr->tlength + frame_size - attr->minreq;
if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf)
attr->prebuf = max_prebuf;
attr->prebuf -= attr->prebuf % frame_size;
s->missing = attr->tlength;
attr->fragsize = 0;
pw_log_info(NAME" %p: [%s] maxlength:%u tlength:%u minreq:%u prebuf:%u", s,
s->client->name, attr->maxlength, attr->tlength,
attr->minreq, attr->prebuf);
}
static int reply_create_playback_stream(struct stream *stream)
{
struct client *client = stream->client;
struct pw_manager *manager = client->manager;
struct message *reply;
uint32_t missing, peer_id;
struct spa_dict_item items[5];
char latency[32];
char attr_maxlength[32];
char attr_tlength[32];
char attr_prebuf[32];
char attr_minreq[32];
struct pw_manager_object *peer;
const char *peer_name;
struct spa_fraction lat;
uint64_t lat_usec;
struct defs *defs = &stream->impl->defs;
fix_playback_buffer_attr(stream, &stream->attr);
stream->buffer = calloc(1, stream->attr.maxlength);
if (stream->buffer == NULL)
return -errno;
spa_ringbuffer_init(&stream->ring);
if (stream->early_requests) {
lat.num = stream->attr.minreq;
} else if (stream->adjust_latency) {
if (stream->attr.tlength > stream->attr.minreq * 2)
lat.num = (stream->attr.tlength - stream->attr.minreq * 2) / 2;
else
lat.num = stream->attr.minreq;
} else {
if (stream->attr.tlength > stream->attr.minreq * 2)
lat.num = stream->attr.tlength - stream->attr.minreq * 2;
else
lat.num = stream->attr.minreq;
}
lat.denom = stream->ss.rate;
lat.num /= stream->frame_size;
if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num)
lat.num = (defs->min_quantum.num * lat.denom +
(defs->min_quantum.denom -1)) / defs->min_quantum.denom;
lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom);
snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength);
snprintf(attr_tlength, sizeof(attr_tlength), "%u", stream->attr.tlength);
snprintf(attr_prebuf, sizeof(attr_prebuf), "%u", stream->attr.prebuf);
snprintf(attr_minreq, sizeof(attr_minreq), "%u", stream->attr.minreq);
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength);
items[2] = SPA_DICT_ITEM_INIT("pulse.attr.tlength", attr_tlength);
items[3] = SPA_DICT_ITEM_INIT("pulse.attr.prebuf", attr_prebuf);
items[4] = SPA_DICT_ITEM_INIT("pulse.attr.minreq", attr_minreq);
pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 5));
missing = stream_pop_missing(stream);
pw_log_info(NAME" %p: [%s] reply CREATE_PLAYBACK_STREAM tag:%u missing:%u latency:%s",
stream, client->name, stream->create_tag, missing, latency);
reply = reply_new(client, stream->create_tag);
message_put(reply,
TAG_U32, stream->channel, /* stream index/channel */
TAG_U32, stream->id, /* sink_input/stream index */
TAG_U32, missing, /* missing/requested bytes */
TAG_INVALID);
peer = find_linked(manager, stream->id, stream->direction);
if (peer && pw_manager_object_is_sink(peer)) {
peer_id = peer->id;
peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
} else {
peer_id = SPA_ID_INVALID;
peer_name = NULL;
}
if (client->version >= 9) {
message_put(reply,
TAG_U32, stream->attr.maxlength,
TAG_U32, stream->attr.tlength,
TAG_U32, stream->attr.prebuf,
TAG_U32, stream->attr.minreq,
TAG_INVALID);
}
if (client->version >= 12) {
message_put(reply,
TAG_SAMPLE_SPEC, &stream->ss,
TAG_CHANNEL_MAP, &stream->map,
TAG_U32, peer_id, /* sink index */
TAG_STRING, peer_name, /* sink name */
TAG_BOOLEAN, false, /* sink suspended state */
TAG_INVALID);
}
if (client->version >= 13) {
message_put(reply,
TAG_USEC, lat_usec, /* sink configured latency */
TAG_INVALID);
}
if (client->version >= 21) {
struct format_info info;
spa_zero(info);
info.encoding = ENCODING_PCM;
message_put(reply,
TAG_FORMAT_INFO, &info, /* sink_input format */
TAG_INVALID);
}
stream->create_tag = SPA_ID_INVALID;
return client_queue_message(client, reply);
}
static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr)
{
uint32_t frame_size, minfrag;
struct defs *defs = &s->impl->defs;
frame_size = s->frame_size;
if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
attr->maxlength = MAXLENGTH;
attr->maxlength -= attr->maxlength % frame_size;
attr->maxlength = SPA_MAX(attr->maxlength, frame_size);
minfrag = frac_to_bytes_round_up(defs->min_frag, &s->ss);
if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0)
attr->fragsize = frac_to_bytes_round_up(defs->default_frag, &s->ss);
attr->fragsize -= attr->fragsize % frame_size;
attr->fragsize = SPA_MAX(attr->fragsize, minfrag);
attr->fragsize = SPA_MAX(attr->fragsize, frame_size);
if (attr->fragsize > attr->maxlength)
attr->fragsize = attr->maxlength;
attr->tlength = attr->minreq = attr->prebuf = 0;
pw_log_info(NAME" %p: [%s] maxlength:%u fragsize:%u minfrag:%u", s,
s->client->name, attr->maxlength, attr->fragsize, minfrag);
}
static int reply_create_record_stream(struct stream *stream)
{
struct client *client = stream->client;
struct pw_manager *manager = client->manager;
struct message *reply;
struct spa_dict_item items[3];
char latency[32], *tmp;
char attr_maxlength[32];
char attr_fragsize[32];
struct pw_manager_object *peer;
const char *peer_name, *name;
uint32_t peer_id;
struct spa_fraction lat;
uint64_t lat_usec;
struct defs *defs = &stream->impl->defs;
fix_record_buffer_attr(stream, &stream->attr);
stream->buffer = calloc(1, stream->attr.maxlength);
if (stream->buffer == NULL)
return -errno;
spa_ringbuffer_init(&stream->ring);
if (stream->early_requests) {
lat.num = stream->attr.fragsize;
} else if (stream->adjust_latency) {
lat.num = stream->attr.fragsize;
} else {
lat.num = stream->attr.fragsize;
}
lat.num /= stream->frame_size;
lat.denom = stream->ss.rate;
if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num)
lat.num = (defs->min_quantum.num * lat.denom +
(defs->min_quantum.denom -1)) / defs->min_quantum.denom;
lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom);
snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength);
snprintf(attr_fragsize, sizeof(attr_fragsize), "%u", stream->attr.fragsize);
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength);
items[2] = SPA_DICT_ITEM_INIT("pulse.attr.fragsize", attr_fragsize);
pw_stream_update_properties(stream->stream,
&SPA_DICT_INIT(items, 3));
pw_log_info(NAME" %p: [%s] reply CREATE_RECORD_STREAM tag:%u latency:%s",
stream, client->name, stream->create_tag, latency);
reply = reply_new(client, stream->create_tag);
message_put(reply,
TAG_U32, stream->channel, /* stream index/channel */
TAG_U32, stream->id, /* source_output/stream index */
TAG_INVALID);
peer = find_linked(manager, stream->id, stream->direction);
if (peer && pw_manager_object_is_sink_input(peer))
peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT);
if (peer && pw_manager_object_is_source_or_monitor(peer)) {
name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
if (!pw_manager_object_is_source(peer)) {
size_t len = (name ? strlen(name) : 5) + 10;
peer_id = peer->id | MONITOR_FLAG;
peer_name = tmp = alloca(len);
snprintf(tmp, len, "%s.monitor", name ? name : "sink");
} else {
peer_id = peer->id;
peer_name = name;
}
} else {
peer_id = SPA_ID_INVALID;
peer_name = NULL;
}
if (client->version >= 9) {
message_put(reply,
TAG_U32, stream->attr.maxlength,
TAG_U32, stream->attr.fragsize,
TAG_INVALID);
}
if (client->version >= 12) {
message_put(reply,
TAG_SAMPLE_SPEC, &stream->ss,
TAG_CHANNEL_MAP, &stream->map,
TAG_U32, peer_id, /* source index */
TAG_STRING, peer_name, /* source name */
TAG_BOOLEAN, false, /* source suspended state */
TAG_INVALID);
}
if (client->version >= 13) {
message_put(reply,
TAG_USEC, lat_usec, /* source configured latency */
TAG_INVALID);
}
if (client->version >= 22) {
struct format_info info;
spa_zero(info);
info.encoding = ENCODING_PCM;
message_put(reply,
TAG_FORMAT_INFO, &info, /* source_output format */
TAG_INVALID);
}
stream->create_tag = SPA_ID_INVALID;
return client_queue_message(client, reply);
}
static void stream_control_info(void *data, uint32_t id,
const struct pw_stream_control *control)
{
@ -996,6 +1012,7 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
stream->rate = stream->ss.rate;
if (stream->create_tag != SPA_ID_INVALID) {
struct pw_manager_object *peer;
stream->id = pw_stream_get_node_id(stream->stream);
if (stream->volume_set) {
@ -1010,11 +1027,12 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
if (stream->corked)
pw_stream_set_active(stream->stream, false);
if (stream->direction == PW_DIRECTION_OUTPUT) {
reply_create_playback_stream(stream);
} else {
reply_create_record_stream(stream);
}
/* if peer exists, reply immediately, otherwise reply when the link is created */
peer = find_linked(stream->client->manager, stream->id, stream->direction);
if (peer)
reply_create_stream(stream, peer);
else
spa_list_append(&stream->client->pending_streams, &stream->link);
}
params[n_params++] = get_buffers_param(stream, &stream->attr, &b);
@ -1456,6 +1474,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
if (stream == NULL)
goto error_errno;
spa_list_init(&stream->link);
stream->impl = impl;
stream->client = client;
stream->corked = corked;
@ -1701,6 +1720,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
if (stream == NULL)
goto error_errno;
spa_list_init(&stream->link);
stream->type = STREAM_TYPE_RECORD;
stream->direction = PW_DIRECTION_INPUT;
stream->impl = impl;
@ -1966,6 +1986,7 @@ static int do_create_upload_stream(struct client *client, uint32_t command, uint
if (stream == NULL)
goto error_errno;
spa_list_init(&stream->link);
stream->type = STREAM_TYPE_UPLOAD;
stream->direction = PW_DIRECTION_OUTPUT;
stream->impl = impl;