From 28744fc5ed08c8ff5f6e43d3d053863011901beb Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 4 Oct 2021 19:36:45 +0200 Subject: [PATCH] work-queue: share one work queue for links and nodes --- src/pipewire/impl-link.c | 67 +++++++++++++++++++++++++-------------- src/pipewire/impl-node.c | 6 ++-- src/pipewire/work-queue.c | 2 +- 3 files changed, 48 insertions(+), 27 deletions(-) diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index a2fad5abd..c970b4ffa 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -145,21 +145,26 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat impl->output_busy_id = SPA_ID_INVALID; link->output->busy_count--; } - pw_work_queue_cancel(impl->work, link->output, SPA_ID_INVALID); + pw_work_queue_cancel(impl->work, &link->output_link, SPA_ID_INVALID); if (impl->input_busy_id != SPA_ID_INVALID) { impl->input_busy_id = SPA_ID_INVALID; link->input->busy_count--; } - pw_work_queue_cancel(impl->work, link->input, SPA_ID_INVALID); + pw_work_queue_cancel(impl->work, &link->input_link, SPA_ID_INVALID); } } static void complete_ready(void *obj, void *data, int res, uint32_t id) { - struct pw_impl_port *port = obj; + struct pw_impl_port *port; struct pw_impl_link *this = data; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + if (obj == &this->input_link) + port = this->input; + else + port = this->output; + if (id == impl->input_busy_id) { impl->input_busy_id = SPA_ID_INVALID; port->busy_count--; @@ -187,10 +192,18 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id) static void complete_paused(void *obj, void *data, int res, uint32_t id) { - struct pw_impl_port *port = obj; + struct pw_impl_port *port; struct pw_impl_link *this = data; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - struct pw_impl_port_mix *mix = port == this->input ? &this->rt.in_mix : &this->rt.out_mix; + struct pw_impl_port_mix *mix; + + if (obj == &this->input_link) { + port = this->input; + mix = &this->rt.in_mix; + } else { + port = this->output; + mix = &this->rt.out_mix; + } if (id == impl->input_busy_id) { impl->input_busy_id = SPA_ID_INVALID; @@ -220,8 +233,14 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) static void complete_sync(void *obj, void *data, int res, uint32_t id) { - struct pw_impl_port *port = obj; + struct pw_impl_port *port; struct pw_impl_link *this = data; + + if (obj == &this->input_link) + port = this->input; + else + port = this->output; + pw_log_debug("%p: obj:%p port %p complete state:%d: %s", this, obj, port, port->state, spa_strerror(res)); } @@ -357,10 +376,10 @@ static int do_negotiate(struct pw_impl_link *this) if (SPA_RESULT_IS_ASYNC(res)) { output->busy_count++; res = spa_node_sync(output->node->node, res); - impl->output_busy_id = pw_work_queue_add(impl->work, output, res, + impl->output_busy_id = pw_work_queue_add(impl->work, &this->output_link, res, complete_ready, this); } else { - complete_ready(output, this, res, SPA_ID_INVALID); + complete_ready(&this->output_link, this, res, SPA_ID_INVALID); } } if (in_state == PW_IMPL_PORT_STATE_CONFIGURE) { @@ -376,12 +395,12 @@ static int do_negotiate(struct pw_impl_link *this) if (SPA_RESULT_IS_ASYNC(res2)) { input->busy_count++; res2 = spa_node_sync(input->node->node, res2); - impl->input_busy_id = pw_work_queue_add(impl->work, input, res2, + impl->input_busy_id = pw_work_queue_add(impl->work, &this->input_link, res2, complete_ready, this); if (res == 0) res = res2; } else { - complete_ready(input, this, res2, SPA_ID_INVALID); + complete_ready(&this->input_link, this, res2, SPA_ID_INVALID); } } @@ -520,12 +539,12 @@ static int do_allocation(struct pw_impl_link *this) if (SPA_RESULT_IS_ASYNC(res)) { output->busy_count++; res = spa_node_sync(output->node->node, res); - impl->output_busy_id = pw_work_queue_add(impl->work, output, res, + impl->output_busy_id = pw_work_queue_add(impl->work, &this->output_link, res, complete_paused, this); if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) return 0; } else { - complete_paused(output, this, res, SPA_ID_INVALID); + complete_paused(&this->output_link, this, res, SPA_ID_INVALID); } } @@ -543,10 +562,10 @@ static int do_allocation(struct pw_impl_link *this) if (SPA_RESULT_IS_ASYNC(res)) { input->busy_count++; res = spa_node_sync(input->node->node, res); - impl->input_busy_id = pw_work_queue_add(impl->work, input, res, + impl->input_busy_id = pw_work_queue_add(impl->work, &this->input_link, res, complete_paused, this); } else { - complete_paused(input, this, res, SPA_ID_INVALID); + complete_paused(&this->input_link, this, res, SPA_ID_INVALID); } return 0; @@ -666,13 +685,13 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id) if (output->busy_count > 0) { pw_log_debug("%p: output port %p was busy", this, output); res = spa_node_sync(output->node->node, 0); - pw_work_queue_add(impl->work, output, res, complete_sync, this); + pw_work_queue_add(impl->work, &this->output_link, res, complete_sync, this); goto exit; } else if (input->busy_count > 0) { pw_log_debug("%p: input port %p was busy", this, input); res = spa_node_sync(input->node->node, 0); - pw_work_queue_add(impl->work, input, res, complete_sync, this); + pw_work_queue_add(impl->work, &this->input_link, res, complete_sync, this); goto exit; } @@ -717,6 +736,8 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port) pw_log_warn("%p: port %p clear error %s", this, port, spa_strerror(res)); } pw_impl_port_release_mix(port, mix); + + pw_work_queue_cancel(impl->work, &this->input_link, SPA_ID_INVALID); this->input = NULL; } @@ -743,6 +764,8 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port) /* we don't clear output buffers when the link goes away. They will get * cleared when the node goes to suspend */ pw_impl_port_release_mix(port, mix); + + pw_work_queue_cancel(impl->work, &this->output_link, SPA_ID_INVALID); this->output = NULL; } @@ -962,11 +985,11 @@ static const struct pw_impl_port_events output_port_events = { .latency_changed = output_port_latency_changed, }; -static void node_result(struct impl *impl, struct pw_impl_port *port, +static void node_result(struct impl *impl, void *obj, int seq, int res, uint32_t type, const void *result) { if (SPA_RESULT_IS_ASYNC(seq)) - pw_work_queue_complete(impl->work, port, SPA_RESULT_ASYNC_SEQ(seq), res); + pw_work_queue_complete(impl->work, obj, SPA_RESULT_ASYNC_SEQ(seq), res); } static void input_node_result(void *data, int seq, int res, uint32_t type, const void *result) @@ -975,7 +998,7 @@ static void input_node_result(void *data, int seq, int res, uint32_t type, const struct pw_impl_port *port = impl->this.input; pw_log_trace("%p: input port %p result seq:%d res:%d type:%u", impl, port, seq, res, type); - node_result(impl, port, seq, res, type, result); + node_result(impl, &impl->this.input_link, seq, res, type, result); } static void output_node_result(void *data, int seq, int res, uint32_t type, const void *result) @@ -984,7 +1007,7 @@ static void output_node_result(void *data, int seq, int res, uint32_t type, cons struct pw_impl_port *port = impl->this.output; pw_log_trace("%p: output port %p result seq:%d res:%d type:%u", impl, port, seq, res, type); - node_result(impl, port, seq, res, type, result); + node_result(impl, &impl->this.output_link, seq, res, type, result); } static void node_active_changed(void *data, bool active) @@ -1183,7 +1206,7 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, if (user_data_size > 0) this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void); - impl->work = pw_work_queue_new(context->main_loop); + impl->work = pw_context_get_work_queue(context); if (impl->work == NULL) goto error_work_queue; @@ -1411,8 +1434,6 @@ void pw_impl_link_destroy(struct pw_impl_link *link) spa_hook_list_clean(&link->listener_list); - pw_work_queue_destroy(impl->work); - pw_properties_free(link->properties); free(link->name); diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index ff67018f0..c44bdce35 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -1186,7 +1186,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, goto error_clean; } - impl->work = pw_work_queue_new(this->context->main_loop); + impl->work = pw_context_get_work_queue(this->context); if (impl->work == NULL) { res = -errno; goto error_clean; @@ -1785,14 +1785,14 @@ void pw_impl_node_destroy(struct pw_impl_node *node) pw_memblock_unref(node->activation); - pw_work_queue_destroy(impl->work); - pw_param_clear(&impl->param_list, SPA_ID_INVALID); pw_param_clear(&impl->pending_list, SPA_ID_INVALID); pw_map_clear(&node->input_port_map); pw_map_clear(&node->output_port_map); + pw_work_queue_cancel(impl->work, node, SPA_ID_INVALID); + pw_properties_free(node->properties); clear_info(node); diff --git a/src/pipewire/work-queue.c b/src/pipewire/work-queue.c index de36a4eb1..190d29f42 100644 --- a/src/pipewire/work-queue.c +++ b/src/pipewire/work-queue.c @@ -65,7 +65,7 @@ static void process_work_queue(void *data, uint64_t count) spa_list_for_each_safe(item, tmp, &this->work_list, link) { if (item->seq != SPA_ID_INVALID) { - pw_log_debug("%p: %d waiting for item %p seq:%d id:%u", this, + pw_log_debug("%p: n_queued:%d waiting for item %p seq:%d id:%u", this, this->n_queued, item->obj, item->seq, item->id); continue; }