From f0bc0d068ecaaea5482d2753985aabcca5733de1 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 7 Sep 2021 12:20:36 +0200 Subject: [PATCH] link: avoid multiple concurrent negotiations When multiple links are created at the same time for the same port, we get into a race where multiple links will try to set a format asynchronously and eventually break the links. Avoid this by marking the port as busy for as long as an async format or buffer is pending and avoid starting new link negotiation when one of the ports is busy. This problem was observed when ardour6 tries to link all device capture ports to its single monitor port. --- src/pipewire/impl-link.c | 109 ++++++++++++++++++++++++++++++++------- src/pipewire/private.h | 1 + 2 files changed, 92 insertions(+), 18 deletions(-) diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index d79cb934b..455b9e328 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -54,6 +54,9 @@ struct impl { struct pw_work_queue *work; + uint32_t output_busy_id; + uint32_t input_busy_id; + struct spa_pod *format_filter; struct pw_properties *properties; @@ -137,7 +140,15 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat } else if (state == PW_LINK_STATE_INIT) { link->prepared = false; link->preparing = false; + if (impl->output_busy_id != SPA_ID_INVALID) { + impl->output_busy_id = SPA_ID_INVALID; + link->output->busy_count--; + } pw_work_queue_cancel(impl->work, link->output, SPA_ID_INVALID); + if (impl->input_busy_id != SPA_ID_INVALID) { + impl->input_busy_id = SPA_ID_INVALID; + link->input->busy_count--; + } pw_work_queue_cancel(impl->work, link->input, SPA_ID_INVALID); } } @@ -146,12 +157,24 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id) { struct pw_impl_port *port = obj; struct pw_impl_link *this = data; + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - pw_log_debug(NAME" %p: obj:%p port %p complete READY: %s", this, obj, port, spa_strerror(res)); + if (id == impl->input_busy_id) { + impl->input_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id == impl->output_busy_id) { + impl->output_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id != SPA_ID_INVALID) + return; + + pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port, + port->state, spa_strerror(res)); if (SPA_RESULT_IS_OK(res)) { - pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, - 0, NULL); + if (port->state < PW_IMPL_PORT_STATE_READY) + pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, + 0, NULL); } else { pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR, res, spa_aprintf("port error going to READY: %s", spa_strerror(res))); @@ -165,13 +188,25 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) { struct pw_impl_port *port = obj; struct pw_impl_link *this = data; + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_impl_port_mix *mix = port == this->input ? &this->rt.in_mix : &this->rt.out_mix; - pw_log_debug(NAME" %p: obj:%p port %p complete PAUSED: %s", this, obj, port, spa_strerror(res)); + if (id == impl->input_busy_id) { + impl->input_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id == impl->output_busy_id) { + impl->output_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id != SPA_ID_INVALID) + return; + + pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port, + port->state, spa_strerror(res)); if (SPA_RESULT_IS_OK(res)) { - pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED, - 0, NULL); + if (port->state < PW_IMPL_PORT_STATE_PAUSED) + pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED, + 0, NULL); mix->have_buffers = true; } else { pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR, @@ -182,6 +217,14 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL); } +static void complete_sync(void *obj, void *data, int res, uint32_t id) +{ + struct pw_impl_port *port = obj; + struct pw_impl_link *this = data; + pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port, + port->state, spa_strerror(res)); +} + static int do_negotiate(struct pw_impl_link *this) { struct pw_context *context = this->context; @@ -311,11 +354,12 @@ static int do_negotiate(struct pw_impl_link *this) goto error; } if (SPA_RESULT_IS_ASYNC(res)) { - res = spa_node_sync(output->node->node, res), - pw_work_queue_add(impl->work, output, res, + output->busy_count++; + res = spa_node_sync(output->node->node, res); + impl->output_busy_id = pw_work_queue_add(impl->work, output, res, complete_ready, this); } else { - complete_ready(output, this, res, 0); + complete_ready(output, this, res, SPA_ID_INVALID); } } if (in_state == PW_IMPL_PORT_STATE_CONFIGURE) { @@ -329,13 +373,14 @@ static int do_negotiate(struct pw_impl_link *this) goto error; } if (SPA_RESULT_IS_ASYNC(res2)) { - res2 = spa_node_sync(input->node->node, res2), - pw_work_queue_add(impl->work, input, res2, + input->busy_count++; + res2 = spa_node_sync(input->node->node, res2); + impl->input_busy_id = pw_work_queue_add(impl->work, input, res2, complete_ready, this); if (res == 0) res = res2; } else { - complete_ready(input, this, res2, 0); + complete_ready(input, this, res2, SPA_ID_INVALID); } } @@ -472,13 +517,14 @@ static int do_allocation(struct pw_impl_link *this) goto error_clear; } if (SPA_RESULT_IS_ASYNC(res)) { - res = spa_node_sync(output->node->node, res), - pw_work_queue_add(impl->work, output, res, + output->busy_count++; + res = spa_node_sync(output->node->node, res); + impl->output_busy_id = pw_work_queue_add(impl->work, output, res, complete_paused, this); if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) return 0; } else { - complete_paused(output, this, res, 0); + complete_paused(output, this, res, SPA_ID_INVALID); } } @@ -494,11 +540,12 @@ static int do_allocation(struct pw_impl_link *this) } if (SPA_RESULT_IS_ASYNC(res)) { - res = spa_node_sync(input->node->node, res), - pw_work_queue_add(impl->work, input, res, + input->busy_count++; + res = spa_node_sync(input->node->node, res); + impl->input_busy_id = pw_work_queue_add(impl->work, input, res, complete_paused, this); } else { - complete_paused(input, this, res, 0); + complete_paused(input, this, res, SPA_ID_INVALID); } return 0; @@ -615,6 +662,19 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id) link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL); } + if (output->busy_count > 0) { + pw_log_debug(NAME" %p: output port %p was busy", this, output); + res = spa_node_sync(output->node->node, 0); + pw_work_queue_add(impl->work, output, res, complete_sync, this); + goto exit; + } + else if (input->busy_count > 0) { + pw_log_debug(NAME" %p: input port %p was busy", this, input); + res = spa_node_sync(input->node->node, 0); + pw_work_queue_add(impl->work, input, res, complete_sync, this); + goto exit; + } + if ((res = do_negotiate(this)) != 0) goto exit; @@ -638,6 +698,11 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port) int res; pw_log_debug(NAME" %p: remove input port %p", this, port); + + if (impl->input_busy_id != SPA_ID_INVALID) { + impl->input_busy_id = SPA_ID_INVALID; + port->busy_count--; + } spa_hook_remove(&impl->input_port_listener); spa_hook_remove(&impl->input_node_listener); spa_hook_remove(&impl->input_global_listener); @@ -660,6 +725,11 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port) struct pw_impl_port_mix *mix = &this->rt.out_mix; pw_log_debug(NAME" %p: remove output port %p", this, port); + + if (impl->output_busy_id != SPA_ID_INVALID) { + impl->output_busy_id = SPA_ID_INVALID; + port->busy_count--; + } spa_hook_remove(&impl->output_port_listener); spa_hook_remove(&impl->output_node_listener); spa_hook_remove(&impl->output_global_listener); @@ -1100,6 +1170,9 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, if (impl == NULL) goto error_no_mem; + impl->input_busy_id = SPA_ID_INVALID; + impl->output_busy_id = SPA_ID_INVALID; + this = &impl->this; this->feedback = pw_impl_node_can_reach(input_node, output_node, 0); pw_properties_set(properties, PW_KEY_LINK_FEEDBACK, this->feedback ? "true" : NULL); diff --git a/src/pipewire/private.h b/src/pipewire/private.h index e7c9978fc..638cb3e97 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -836,6 +836,7 @@ struct pw_impl_port { } rt; /**< data only accessed from the data thread */ unsigned int added:1; unsigned int destroying:1; + int busy_count; struct spa_latency_info latency[2]; /**< latencies */ unsigned int have_latency_param:1;