diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index d79cb934b..455b9e328 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -54,6 +54,9 @@ struct impl { struct pw_work_queue *work; + uint32_t output_busy_id; + uint32_t input_busy_id; + struct spa_pod *format_filter; struct pw_properties *properties; @@ -137,7 +140,15 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat } else if (state == PW_LINK_STATE_INIT) { link->prepared = false; link->preparing = false; + if (impl->output_busy_id != SPA_ID_INVALID) { + impl->output_busy_id = SPA_ID_INVALID; + link->output->busy_count--; + } pw_work_queue_cancel(impl->work, link->output, SPA_ID_INVALID); + if (impl->input_busy_id != SPA_ID_INVALID) { + impl->input_busy_id = SPA_ID_INVALID; + link->input->busy_count--; + } pw_work_queue_cancel(impl->work, link->input, SPA_ID_INVALID); } } @@ -146,12 +157,24 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id) { struct pw_impl_port *port = obj; struct pw_impl_link *this = data; + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - pw_log_debug(NAME" %p: obj:%p port %p complete READY: %s", this, obj, port, spa_strerror(res)); + if (id == impl->input_busy_id) { + impl->input_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id == impl->output_busy_id) { + impl->output_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id != SPA_ID_INVALID) + return; + + pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port, + port->state, spa_strerror(res)); if (SPA_RESULT_IS_OK(res)) { - pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, - 0, NULL); + if (port->state < PW_IMPL_PORT_STATE_READY) + pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, + 0, NULL); } else { pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR, res, spa_aprintf("port error going to READY: %s", spa_strerror(res))); @@ -165,13 +188,25 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) { struct pw_impl_port *port = obj; struct pw_impl_link *this = data; + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_impl_port_mix *mix = port == this->input ? &this->rt.in_mix : &this->rt.out_mix; - pw_log_debug(NAME" %p: obj:%p port %p complete PAUSED: %s", this, obj, port, spa_strerror(res)); + if (id == impl->input_busy_id) { + impl->input_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id == impl->output_busy_id) { + impl->output_busy_id = SPA_ID_INVALID; + port->busy_count--; + } else if (id != SPA_ID_INVALID) + return; + + pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port, + port->state, spa_strerror(res)); if (SPA_RESULT_IS_OK(res)) { - pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED, - 0, NULL); + if (port->state < PW_IMPL_PORT_STATE_PAUSED) + pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED, + 0, NULL); mix->have_buffers = true; } else { pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR, @@ -182,6 +217,14 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL); } +static void complete_sync(void *obj, void *data, int res, uint32_t id) +{ + struct pw_impl_port *port = obj; + struct pw_impl_link *this = data; + pw_log_debug(NAME" %p: obj:%p port %p complete state:%d: %s", this, obj, port, + port->state, spa_strerror(res)); +} + static int do_negotiate(struct pw_impl_link *this) { struct pw_context *context = this->context; @@ -311,11 +354,12 @@ static int do_negotiate(struct pw_impl_link *this) goto error; } if (SPA_RESULT_IS_ASYNC(res)) { - res = spa_node_sync(output->node->node, res), - pw_work_queue_add(impl->work, output, res, + output->busy_count++; + res = spa_node_sync(output->node->node, res); + impl->output_busy_id = pw_work_queue_add(impl->work, output, res, complete_ready, this); } else { - complete_ready(output, this, res, 0); + complete_ready(output, this, res, SPA_ID_INVALID); } } if (in_state == PW_IMPL_PORT_STATE_CONFIGURE) { @@ -329,13 +373,14 @@ static int do_negotiate(struct pw_impl_link *this) goto error; } if (SPA_RESULT_IS_ASYNC(res2)) { - res2 = spa_node_sync(input->node->node, res2), - pw_work_queue_add(impl->work, input, res2, + input->busy_count++; + res2 = spa_node_sync(input->node->node, res2); + impl->input_busy_id = pw_work_queue_add(impl->work, input, res2, complete_ready, this); if (res == 0) res = res2; } else { - complete_ready(input, this, res2, 0); + complete_ready(input, this, res2, SPA_ID_INVALID); } } @@ -472,13 +517,14 @@ static int do_allocation(struct pw_impl_link *this) goto error_clear; } if (SPA_RESULT_IS_ASYNC(res)) { - res = spa_node_sync(output->node->node, res), - pw_work_queue_add(impl->work, output, res, + output->busy_count++; + res = spa_node_sync(output->node->node, res); + impl->output_busy_id = pw_work_queue_add(impl->work, output, res, complete_paused, this); if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) return 0; } else { - complete_paused(output, this, res, 0); + complete_paused(output, this, res, SPA_ID_INVALID); } } @@ -494,11 +540,12 @@ static int do_allocation(struct pw_impl_link *this) } if (SPA_RESULT_IS_ASYNC(res)) { - res = spa_node_sync(input->node->node, res), - pw_work_queue_add(impl->work, input, res, + input->busy_count++; + res = spa_node_sync(input->node->node, res); + impl->input_busy_id = pw_work_queue_add(impl->work, input, res, complete_paused, this); } else { - complete_paused(input, this, res, 0); + complete_paused(input, this, res, SPA_ID_INVALID); } return 0; @@ -615,6 +662,19 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id) link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL); } + if (output->busy_count > 0) { + pw_log_debug(NAME" %p: output port %p was busy", this, output); + res = spa_node_sync(output->node->node, 0); + pw_work_queue_add(impl->work, output, res, complete_sync, this); + goto exit; + } + else if (input->busy_count > 0) { + pw_log_debug(NAME" %p: input port %p was busy", this, input); + res = spa_node_sync(input->node->node, 0); + pw_work_queue_add(impl->work, input, res, complete_sync, this); + goto exit; + } + if ((res = do_negotiate(this)) != 0) goto exit; @@ -638,6 +698,11 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port) int res; pw_log_debug(NAME" %p: remove input port %p", this, port); + + if (impl->input_busy_id != SPA_ID_INVALID) { + impl->input_busy_id = SPA_ID_INVALID; + port->busy_count--; + } spa_hook_remove(&impl->input_port_listener); spa_hook_remove(&impl->input_node_listener); spa_hook_remove(&impl->input_global_listener); @@ -660,6 +725,11 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port) struct pw_impl_port_mix *mix = &this->rt.out_mix; pw_log_debug(NAME" %p: remove output port %p", this, port); + + if (impl->output_busy_id != SPA_ID_INVALID) { + impl->output_busy_id = SPA_ID_INVALID; + port->busy_count--; + } spa_hook_remove(&impl->output_port_listener); spa_hook_remove(&impl->output_node_listener); spa_hook_remove(&impl->output_global_listener); @@ -1100,6 +1170,9 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, if (impl == NULL) goto error_no_mem; + impl->input_busy_id = SPA_ID_INVALID; + impl->output_busy_id = SPA_ID_INVALID; + this = &impl->this; this->feedback = pw_impl_node_can_reach(input_node, output_node, 0); pw_properties_set(properties, PW_KEY_LINK_FEEDBACK, this->feedback ? "true" : NULL); diff --git a/src/pipewire/private.h b/src/pipewire/private.h index e7c9978fc..638cb3e97 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -836,6 +836,7 @@ struct pw_impl_port { } rt; /**< data only accessed from the data thread */ unsigned int added:1; unsigned int destroying:1; + int busy_count; struct spa_latency_info latency[2]; /**< latencies */ unsigned int have_latency_param:1;