From b2844201c2e1f40d3f4dc7d5298b40f8bc6a7001 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 8 May 2024 17:14:32 +0200 Subject: [PATCH] impl-node: update rt flags from rt threads Update the added and prepared flags from the rt thread. We also need to check if the node was prepared before we can schedule it. --- src/pipewire/impl-node.c | 71 +++++++++++++++++++++++++--------------- src/pipewire/private.h | 1 + 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index a2851f0f0..39e0477d3 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -88,7 +88,10 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq, uint64_t dummy; int res; - pw_log_trace("%p: prepare driver %p", this, driver); + pw_log_trace("%p: prepare driver %p %d", this, driver, this->rt.prepared); + + if (this->rt.prepared) + return 0; /* clear the eventfd in case it was written to while the node was stopped */ res = spa_system_eventfd_read(this->rt.target.system, this->source.fd, &dummy); @@ -96,14 +99,18 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq, pw_log_warn("%p: read failed %m", this); /* remote nodes have their source added in client-node instead */ - if (!this->remote) + if (!this->remote) { + pw_log_trace("%p %p %p", this, loop, this->source.loop); spa_loop_add_source(loop, &this->source); + } if (!this->exported) { /* trigger the driver when we complete */ copy_target(&this->rt.driver_target, &driver->rt.target); spa_list_append(&this->rt.target_list, &this->rt.driver_target.link); } + this->rt.prepared = true; + return 0; } @@ -114,15 +121,22 @@ do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq, { struct pw_impl_node *this = user_data; - pw_log_trace("%p: unprepare driver %s", this, this->rt.driver_target.name); + pw_log_trace("%p: unprepare driver %s %d", this, this->rt.driver_target.name, + this->rt.prepared); - if (!this->remote) + if (!this->rt.prepared) + return 0; + + if (!this->remote) { + pw_log_trace("%p %p %p", this, loop, this->source.loop); spa_loop_remove_source(loop, &this->source); + } if (!this->exported) { spa_list_remove(&this->rt.driver_target.link); spa_zero(this->rt.driver_target); } + this->rt.prepared = false; return 0; } @@ -145,8 +159,12 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, struct pw_node_activation_state *dstate, *nstate; struct pw_node_target *t; - pw_log_trace("%p: add to driver %p %p %p", this, driver, - driver->rt.target.activation, this->rt.target.activation); + pw_log_trace("%p: add to driver %p %p %p %d", this, driver, + driver->rt.target.activation, this->rt.target.activation, + this->rt.added); + + if (this->rt.added) + return 0; /* let the driver trigger us as part of the processing cycle */ spa_list_append(&driver->rt.target_list, &this->rt.target.link); @@ -168,6 +186,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, this, dstate, dstate->pending, dstate->required, nstate, nstate->pending, nstate->required); } + this->rt.added = true; return 0; } @@ -180,9 +199,13 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, struct pw_node_activation_state *dstate, *nstate; struct pw_node_target *t; - pw_log_trace("%p: remove from driver %s %p %p", + pw_log_trace("%p: remove from driver %s %p %p %d", this, this->rt.driver_target.name, - this->rt.driver_target.activation, this->rt.target.activation); + this->rt.driver_target.activation, this->rt.target.activation, + this->rt.added); + + if (!this->rt.added) + return 0; /* remove from driver */ spa_list_remove(&this->rt.target.link); @@ -203,6 +226,7 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, this, dstate, dstate->pending, dstate->required, nstate, nstate->pending, nstate->required); } + this->rt.added = false; return 0; } @@ -221,8 +245,7 @@ static void add_node_to_graph(struct pw_impl_node *node) { struct pw_impl_node *driver = node->driver_node; - if (node->rt.added) - return; + pw_log_debug("%p: driver:%p", node, driver); if (node->data_loop == driver->data_loop) { pw_loop_invoke(node->data_loop, do_node_prepare_add, 1, NULL, 0, true, node); @@ -231,7 +254,6 @@ static void add_node_to_graph(struct pw_impl_node *node) if (!node->exported) pw_loop_invoke(driver->data_loop, do_node_add, 1, NULL, 0, true, node); } - node->rt.added = true; } static int @@ -249,8 +271,7 @@ static void remove_node_from_graph(struct pw_impl_node *node) { struct pw_impl_node *driver = node->driver_node; - if (!node->rt.added) - return; + pw_log_debug("%p: driver:%p", node, driver); if (node->data_loop == driver->data_loop) { pw_loop_invoke(node->data_loop, do_node_remove_unprepare, 1, NULL, 0, true, node); @@ -259,7 +280,6 @@ static void remove_node_from_graph(struct pw_impl_node *node) pw_loop_invoke(driver->data_loop, do_node_remove, 1, NULL, 0, true, node); pw_loop_invoke(node->data_loop, do_node_unprepare, 1, NULL, 0, true, node); } - node->rt.added = false; } static int @@ -275,8 +295,7 @@ static void move_node_to_graph(struct pw_impl_node *node) { struct pw_impl_node *driver = node->driver_node; - if (!node->rt.added) - return; + pw_log_debug("%p: driver:%p", node, driver); if (node->data_loop == driver->data_loop) { pw_loop_invoke(node->data_loop, do_node_move, 1, NULL, 0, true, node); @@ -359,8 +378,8 @@ static int start_node(struct pw_impl_node *this) if (impl->pending_state >= PW_NODE_STATE_RUNNING) return 0; - pw_log_debug("%p: start node driving:%d driver:%d added:%d", this, - this->driving, this->driver, this->rt.added); + pw_log_debug("%p: start node driving:%d driver:%d prepared:%d", this, + this->driving, this->driver, this->rt.prepared); if (!(this->driving && this->driver)) { impl->pending_play = true; @@ -459,8 +478,8 @@ static void node_update_state(struct pw_impl_node *node, enum pw_node_state stat switch (state) { case PW_NODE_STATE_RUNNING: - pw_log_debug("%p: start node driving:%d driver:%d added:%d", node, - node->driving, node->driver, node->rt.added); + pw_log_debug("%p: start node driving:%d driver:%d prepared:%d", node, + node->driving, node->driver, node->rt.prepared); if (res >= 0) { add_node_to_graph(node); @@ -533,8 +552,8 @@ static int suspend_node(struct pw_impl_node *this) node_deactivate(this); - pw_log_debug("%p: suspend node driving:%d driver:%d added:%d", this, - this->driving, this->driver, this->rt.added); + pw_log_debug("%p: suspend node driving:%d driver:%d prepared:%d", this, + this->driving, this->driver, this->rt.prepared); res = spa_node_send_command(this->node, &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Suspend)); @@ -1403,7 +1422,7 @@ static inline int process_node(void *data) if (SPA_UNLIKELY(!this->transport_sync)) a->pending_sync = false; - if (SPA_LIKELY(this->rt.added)) { + if (SPA_LIKELY(this->rt.prepared)) { /* process input mixers */ spa_list_for_each(p, &this->rt.input_mix, rt.node_link) spa_node_process_fast(p->mix); @@ -1948,10 +1967,10 @@ static int node_ready(void *data, int status) struct pw_impl_port *p; uint64_t nsec; - pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d added:%d", node, - node->driver, node->exported, driver, status, node->rt.added); + pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d prepared:%d", node, + node->driver, node->exported, driver, status, node->rt.prepared); - if (SPA_UNLIKELY(!node->rt.added)) { + if (SPA_UNLIKELY(!node->rt.prepared)) { /* This can happen when we are stopping a node and removed it from the * graph but we still have not completed the Pause/Suspend command on * the node. In that case, the node might still emit ready events, diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 9df1e6019..8a5b676dd 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -739,6 +739,7 @@ struct pw_impl_node { struct spa_ratelimit rate_limit; + bool prepared; /**< the node was added to loop */ bool added; /**< the node was added to driver */ } rt; struct spa_fraction target_rate;