mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-07 13:30:09 -05:00
link: avoid multiple concurrent negotiations
When multiple links are created at the same time for the same port, we get into a race where multiple links will try to set a format asynchronously and eventually break the links. Avoid this by marking the port as busy for as long as an async format or buffer is pending and avoid starting new link negotiation when one of the ports is busy. This problem was observed when ardour6 tries to link all device capture ports to its single monitor port.
This commit is contained in:
parent
2d88ad179a
commit
f0bc0d068e
2 changed files with 92 additions and 18 deletions
|
|
@ -54,6 +54,9 @@ struct impl {
|
||||||
|
|
||||||
struct pw_work_queue *work;
|
struct pw_work_queue *work;
|
||||||
|
|
||||||
|
uint32_t output_busy_id;
|
||||||
|
uint32_t input_busy_id;
|
||||||
|
|
||||||
struct spa_pod *format_filter;
|
struct spa_pod *format_filter;
|
||||||
struct pw_properties *properties;
|
struct pw_properties *properties;
|
||||||
|
|
||||||
|
|
@ -137,7 +140,15 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat
|
||||||
} else if (state == PW_LINK_STATE_INIT) {
|
} else if (state == PW_LINK_STATE_INIT) {
|
||||||
link->prepared = false;
|
link->prepared = false;
|
||||||
link->preparing = false;
|
link->preparing = false;
|
||||||
|
if (impl->output_busy_id != SPA_ID_INVALID) {
|
||||||
|
impl->output_busy_id = SPA_ID_INVALID;
|
||||||
|
link->output->busy_count--;
|
||||||
|
}
|
||||||
pw_work_queue_cancel(impl->work, link->output, SPA_ID_INVALID);
|
pw_work_queue_cancel(impl->work, link->output, SPA_ID_INVALID);
|
||||||
|
if (impl->input_busy_id != SPA_ID_INVALID) {
|
||||||
|
impl->input_busy_id = SPA_ID_INVALID;
|
||||||
|
link->input->busy_count--;
|
||||||
|
}
|
||||||
pw_work_queue_cancel(impl->work, link->input, SPA_ID_INVALID);
|
pw_work_queue_cancel(impl->work, link->input, SPA_ID_INVALID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -146,12 +157,24 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id)
|
||||||
{
|
{
|
||||||
struct pw_impl_port *port = obj;
|
struct pw_impl_port *port = obj;
|
||||||
struct pw_impl_link *this = data;
|
struct pw_impl_link *this = data;
|
||||||
|
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
|
||||||
|
|
||||||
pw_log_debug(NAME" %p: obj:%p port %p complete READY: %s", this, obj, port, spa_strerror(res));
|
if (id == impl->input_busy_id) {
|
||||||
|
impl->input_busy_id = SPA_ID_INVALID;
|
||||||
|
port->busy_count--;
|
||||||
|
} else if (id == impl->output_busy_id) {
|
||||||
|
impl->output_busy_id = SPA_ID_INVALID;
|
||||||
|
port->busy_count--;
|
||||||
|
} else if (id != SPA_ID_INVALID)
|
||||||
|
return;
|
||||||
|
|
||||||
|
pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port,
|
||||||
|
port->state, spa_strerror(res));
|
||||||
|
|
||||||
if (SPA_RESULT_IS_OK(res)) {
|
if (SPA_RESULT_IS_OK(res)) {
|
||||||
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY,
|
if (port->state < PW_IMPL_PORT_STATE_READY)
|
||||||
0, NULL);
|
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY,
|
||||||
|
0, NULL);
|
||||||
} else {
|
} else {
|
||||||
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR,
|
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR,
|
||||||
res, spa_aprintf("port error going to READY: %s", spa_strerror(res)));
|
res, spa_aprintf("port error going to READY: %s", spa_strerror(res)));
|
||||||
|
|
@ -165,13 +188,25 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
|
||||||
{
|
{
|
||||||
struct pw_impl_port *port = obj;
|
struct pw_impl_port *port = obj;
|
||||||
struct pw_impl_link *this = data;
|
struct pw_impl_link *this = data;
|
||||||
|
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
|
||||||
struct pw_impl_port_mix *mix = port == this->input ? &this->rt.in_mix : &this->rt.out_mix;
|
struct pw_impl_port_mix *mix = port == this->input ? &this->rt.in_mix : &this->rt.out_mix;
|
||||||
|
|
||||||
pw_log_debug(NAME" %p: obj:%p port %p complete PAUSED: %s", this, obj, port, spa_strerror(res));
|
if (id == impl->input_busy_id) {
|
||||||
|
impl->input_busy_id = SPA_ID_INVALID;
|
||||||
|
port->busy_count--;
|
||||||
|
} else if (id == impl->output_busy_id) {
|
||||||
|
impl->output_busy_id = SPA_ID_INVALID;
|
||||||
|
port->busy_count--;
|
||||||
|
} else if (id != SPA_ID_INVALID)
|
||||||
|
return;
|
||||||
|
|
||||||
|
pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port,
|
||||||
|
port->state, spa_strerror(res));
|
||||||
|
|
||||||
if (SPA_RESULT_IS_OK(res)) {
|
if (SPA_RESULT_IS_OK(res)) {
|
||||||
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED,
|
if (port->state < PW_IMPL_PORT_STATE_PAUSED)
|
||||||
0, NULL);
|
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED,
|
||||||
|
0, NULL);
|
||||||
mix->have_buffers = true;
|
mix->have_buffers = true;
|
||||||
} else {
|
} else {
|
||||||
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR,
|
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR,
|
||||||
|
|
@ -182,6 +217,14 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
|
||||||
link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL);
|
link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void complete_sync(void *obj, void *data, int res, uint32_t id)
|
||||||
|
{
|
||||||
|
struct pw_impl_port *port = obj;
|
||||||
|
struct pw_impl_link *this = data;
|
||||||
|
pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port,
|
||||||
|
port->state, spa_strerror(res));
|
||||||
|
}
|
||||||
|
|
||||||
static int do_negotiate(struct pw_impl_link *this)
|
static int do_negotiate(struct pw_impl_link *this)
|
||||||
{
|
{
|
||||||
struct pw_context *context = this->context;
|
struct pw_context *context = this->context;
|
||||||
|
|
@ -311,11 +354,12 @@ static int do_negotiate(struct pw_impl_link *this)
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
if (SPA_RESULT_IS_ASYNC(res)) {
|
if (SPA_RESULT_IS_ASYNC(res)) {
|
||||||
res = spa_node_sync(output->node->node, res),
|
output->busy_count++;
|
||||||
pw_work_queue_add(impl->work, output, res,
|
res = spa_node_sync(output->node->node, res);
|
||||||
|
impl->output_busy_id = pw_work_queue_add(impl->work, output, res,
|
||||||
complete_ready, this);
|
complete_ready, this);
|
||||||
} else {
|
} else {
|
||||||
complete_ready(output, this, res, 0);
|
complete_ready(output, this, res, SPA_ID_INVALID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (in_state == PW_IMPL_PORT_STATE_CONFIGURE) {
|
if (in_state == PW_IMPL_PORT_STATE_CONFIGURE) {
|
||||||
|
|
@ -329,13 +373,14 @@ static int do_negotiate(struct pw_impl_link *this)
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
if (SPA_RESULT_IS_ASYNC(res2)) {
|
if (SPA_RESULT_IS_ASYNC(res2)) {
|
||||||
res2 = spa_node_sync(input->node->node, res2),
|
input->busy_count++;
|
||||||
pw_work_queue_add(impl->work, input, res2,
|
res2 = spa_node_sync(input->node->node, res2);
|
||||||
|
impl->input_busy_id = pw_work_queue_add(impl->work, input, res2,
|
||||||
complete_ready, this);
|
complete_ready, this);
|
||||||
if (res == 0)
|
if (res == 0)
|
||||||
res = res2;
|
res = res2;
|
||||||
} else {
|
} else {
|
||||||
complete_ready(input, this, res2, 0);
|
complete_ready(input, this, res2, SPA_ID_INVALID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -472,13 +517,14 @@ static int do_allocation(struct pw_impl_link *this)
|
||||||
goto error_clear;
|
goto error_clear;
|
||||||
}
|
}
|
||||||
if (SPA_RESULT_IS_ASYNC(res)) {
|
if (SPA_RESULT_IS_ASYNC(res)) {
|
||||||
res = spa_node_sync(output->node->node, res),
|
output->busy_count++;
|
||||||
pw_work_queue_add(impl->work, output, res,
|
res = spa_node_sync(output->node->node, res);
|
||||||
|
impl->output_busy_id = pw_work_queue_add(impl->work, output, res,
|
||||||
complete_paused, this);
|
complete_paused, this);
|
||||||
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
|
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
complete_paused(output, this, res, 0);
|
complete_paused(output, this, res, SPA_ID_INVALID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -494,11 +540,12 @@ static int do_allocation(struct pw_impl_link *this)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SPA_RESULT_IS_ASYNC(res)) {
|
if (SPA_RESULT_IS_ASYNC(res)) {
|
||||||
res = spa_node_sync(input->node->node, res),
|
input->busy_count++;
|
||||||
pw_work_queue_add(impl->work, input, res,
|
res = spa_node_sync(input->node->node, res);
|
||||||
|
impl->input_busy_id = pw_work_queue_add(impl->work, input, res,
|
||||||
complete_paused, this);
|
complete_paused, this);
|
||||||
} else {
|
} else {
|
||||||
complete_paused(input, this, res, 0);
|
complete_paused(input, this, res, SPA_ID_INVALID);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
|
@ -615,6 +662,19 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
|
||||||
link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL);
|
link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (output->busy_count > 0) {
|
||||||
|
pw_log_debug(NAME" %p: output port %p was busy", this, output);
|
||||||
|
res = spa_node_sync(output->node->node, 0);
|
||||||
|
pw_work_queue_add(impl->work, output, res, complete_sync, this);
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
else if (input->busy_count > 0) {
|
||||||
|
pw_log_debug(NAME" %p: input port %p was busy", this, input);
|
||||||
|
res = spa_node_sync(input->node->node, 0);
|
||||||
|
pw_work_queue_add(impl->work, input, res, complete_sync, this);
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
if ((res = do_negotiate(this)) != 0)
|
if ((res = do_negotiate(this)) != 0)
|
||||||
goto exit;
|
goto exit;
|
||||||
|
|
||||||
|
|
@ -638,6 +698,11 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port)
|
||||||
int res;
|
int res;
|
||||||
|
|
||||||
pw_log_debug(NAME" %p: remove input port %p", this, port);
|
pw_log_debug(NAME" %p: remove input port %p", this, port);
|
||||||
|
|
||||||
|
if (impl->input_busy_id != SPA_ID_INVALID) {
|
||||||
|
impl->input_busy_id = SPA_ID_INVALID;
|
||||||
|
port->busy_count--;
|
||||||
|
}
|
||||||
spa_hook_remove(&impl->input_port_listener);
|
spa_hook_remove(&impl->input_port_listener);
|
||||||
spa_hook_remove(&impl->input_node_listener);
|
spa_hook_remove(&impl->input_node_listener);
|
||||||
spa_hook_remove(&impl->input_global_listener);
|
spa_hook_remove(&impl->input_global_listener);
|
||||||
|
|
@ -660,6 +725,11 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port)
|
||||||
struct pw_impl_port_mix *mix = &this->rt.out_mix;
|
struct pw_impl_port_mix *mix = &this->rt.out_mix;
|
||||||
|
|
||||||
pw_log_debug(NAME" %p: remove output port %p", this, port);
|
pw_log_debug(NAME" %p: remove output port %p", this, port);
|
||||||
|
|
||||||
|
if (impl->output_busy_id != SPA_ID_INVALID) {
|
||||||
|
impl->output_busy_id = SPA_ID_INVALID;
|
||||||
|
port->busy_count--;
|
||||||
|
}
|
||||||
spa_hook_remove(&impl->output_port_listener);
|
spa_hook_remove(&impl->output_port_listener);
|
||||||
spa_hook_remove(&impl->output_node_listener);
|
spa_hook_remove(&impl->output_node_listener);
|
||||||
spa_hook_remove(&impl->output_global_listener);
|
spa_hook_remove(&impl->output_global_listener);
|
||||||
|
|
@ -1100,6 +1170,9 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
|
||||||
if (impl == NULL)
|
if (impl == NULL)
|
||||||
goto error_no_mem;
|
goto error_no_mem;
|
||||||
|
|
||||||
|
impl->input_busy_id = SPA_ID_INVALID;
|
||||||
|
impl->output_busy_id = SPA_ID_INVALID;
|
||||||
|
|
||||||
this = &impl->this;
|
this = &impl->this;
|
||||||
this->feedback = pw_impl_node_can_reach(input_node, output_node, 0);
|
this->feedback = pw_impl_node_can_reach(input_node, output_node, 0);
|
||||||
pw_properties_set(properties, PW_KEY_LINK_FEEDBACK, this->feedback ? "true" : NULL);
|
pw_properties_set(properties, PW_KEY_LINK_FEEDBACK, this->feedback ? "true" : NULL);
|
||||||
|
|
|
||||||
|
|
@ -836,6 +836,7 @@ struct pw_impl_port {
|
||||||
} rt; /**< data only accessed from the data thread */
|
} rt; /**< data only accessed from the data thread */
|
||||||
unsigned int added:1;
|
unsigned int added:1;
|
||||||
unsigned int destroying:1;
|
unsigned int destroying:1;
|
||||||
|
int busy_count;
|
||||||
|
|
||||||
struct spa_latency_info latency[2]; /**< latencies */
|
struct spa_latency_info latency[2]; /**< latencies */
|
||||||
unsigned int have_latency_param:1;
|
unsigned int have_latency_param:1;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue