From 0fe7f9765dd99aa768e4f7a418f4fa8cf7711ea7 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 10 Sep 2019 09:42:23 +0200 Subject: [PATCH] link: use the port as the work-queue object Use the port as the object/seq identifier of the defered work. This way we can handle feedback links between the same node and identify what port completed. --- src/pipewire/link.c | 47 +++++++++++++++++++-------------------- src/pipewire/work-queue.c | 4 ++-- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 3f6fb8258..3291061bf 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -162,11 +162,10 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state, static void complete_ready(void *obj, void *data, int res, uint32_t id) { + struct pw_port *port = obj; struct pw_link *this = data; - struct pw_port_mix *mix = obj == this->input->node ? &this->rt.in_mix : &this->rt.out_mix; - struct pw_port *port = mix->p; - pw_log_debug(NAME" %p: port %p complete READY: %s", this, port, spa_strerror(res)); + pw_log_debug(NAME" %p: obj:%p port %p complete READY: %s", this, obj, port, spa_strerror(res)); if (SPA_RESULT_IS_OK(res)) { pw_port_update_state(port, PW_PORT_STATE_READY, NULL); @@ -180,11 +179,11 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id) static void complete_paused(void *obj, void *data, int res, uint32_t id) { + struct pw_port *port = obj; struct pw_link *this = data; - struct pw_port_mix *mix = obj == this->input->node ? &this->rt.in_mix : &this->rt.out_mix; - struct pw_port *port = mix->p; + struct pw_port_mix *mix = port == this->input ? &this->rt.in_mix : &this->rt.out_mix; - pw_log_debug(NAME" %p: port %p: complete PAUSED: %s", this, port, spa_strerror(res)); + pw_log_debug(NAME" %p: obj:%p port %p complete PAUSED: %s", this, obj, port, spa_strerror(res)); if (SPA_RESULT_IS_OK(res)) { pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL); @@ -327,10 +326,10 @@ static int do_negotiate(struct pw_link *this) } if (SPA_RESULT_IS_ASYNC(res)) { res = spa_node_sync(output->node->node, res), - pw_work_queue_add(impl->work, output->node, res, + pw_work_queue_add(impl->work, output, res, complete_ready, this); } else { - complete_ready(output->node, this, res, 0); + complete_ready(output, this, res, 0); } } if (in_state == PW_PORT_STATE_CONFIGURE) { @@ -343,12 +342,12 @@ static int do_negotiate(struct pw_link *this) } if (SPA_RESULT_IS_ASYNC(res2)) { res2 = spa_node_sync(input->node->node, res2), - pw_work_queue_add(impl->work, input->node, res2, + pw_work_queue_add(impl->work, input, res2, complete_ready, this); if (res == 0) res = res2; } else { - complete_ready(input->node, this, res2, 0); + complete_ready(input, this, res2, 0); } } @@ -686,12 +685,12 @@ static int do_allocation(struct pw_link *this) if (SPA_RESULT_IS_ASYNC(res)) { res = spa_node_sync(output->node->node, res), - pw_work_queue_add(impl->work, output->node, res, + pw_work_queue_add(impl->work, output, res, complete_paused, this); if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) return 0; } else { - complete_paused(output->node, this, res, 0); + complete_paused(output, this, res, 0); } } @@ -708,10 +707,10 @@ static int do_allocation(struct pw_link *this) if (SPA_RESULT_IS_ASYNC(res)) { res = spa_node_sync(input->node->node, res), - pw_work_queue_add(impl->work, input->node, res, + pw_work_queue_add(impl->work, input, res, complete_paused, this); } else { - complete_paused(input->node, this, res, 0); + complete_paused(input, this, res, 0); } return 0; @@ -1085,30 +1084,30 @@ static const struct pw_port_events output_port_events = { .destroy = output_port_destroy, }; -static void node_result(struct impl *impl, struct pw_node *node, +static void node_result(struct impl *impl, struct pw_port *port, int seq, int res, uint32_t type, const void *result) { if (SPA_RESULT_IS_ASYNC(seq)) - pw_work_queue_complete(impl->work, node, SPA_RESULT_ASYNC_SEQ(seq), res); + pw_work_queue_complete(impl->work, port, SPA_RESULT_ASYNC_SEQ(seq), res); } static void input_node_result(void *data, int seq, int res, uint32_t type, const void *result) { struct impl *impl = data; - struct pw_node *node = impl->this.input->node; - pw_log_debug(NAME" %p: input node %p result seq:%d res:%d type:%u", - impl, node, seq, res, type); - node_result(impl, node, seq, res, type, result); + struct pw_port *port = impl->this.input; + pw_log_debug(NAME" %p: input port %p result seq:%d res:%d type:%u", + impl, port, seq, res, type); + node_result(impl, port, seq, res, type, result); } static void output_node_result(void *data, int seq, int res, uint32_t type, const void *result) { struct impl *impl = data; - struct pw_node *node = impl->this.output->node; - pw_log_debug(NAME" %p: output node %p result seq:%d res:%d type:%u", - impl, node, seq, res, type); + struct pw_port *port = impl->this.output; + pw_log_debug(NAME" %p: output port %p result seq:%d res:%d type:%u", + impl, port, seq, res, type); - node_result(impl, node, seq, res, type, result); + node_result(impl, port, seq, res, type, result); } static const struct pw_node_events input_node_events = { diff --git a/src/pipewire/work-queue.c b/src/pipewire/work-queue.c index d649f77af..c32fc3aea 100644 --- a/src/pipewire/work-queue.c +++ b/src/pipewire/work-queue.c @@ -247,8 +247,8 @@ int pw_work_queue_complete(struct pw_work_queue *queue, void *obj, uint32_t seq, spa_list_for_each(item, &queue->work_list, link) { if (item->obj == obj && item->seq == seq) { - pw_log_debug(NAME" %p: found defered %d for object %p", queue, seq, - obj); + pw_log_debug(NAME" %p: found defered %d for object %p res:%d", + queue, seq, obj, res); item->seq = SPA_ID_INVALID; item->res = res; have_work = true;