From 694409443f91bfa0db0ec71169b10539408d0a2f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 25 Apr 2024 16:18:03 +0200 Subject: [PATCH] impl-node: avoid bitfield races Move the bits that are used in the realtime thread away from the bits from the main thread to avoid bitfield races. Move some fields in rt structs to make it explicit that they are only to be modified from the realtime threads. --- src/pipewire/impl-node.c | 22 +++++++++++----------- src/pipewire/impl-port.c | 24 +++++++++++++----------- src/pipewire/private.h | 12 ++++++++---- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index a49005497..5c4879107 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -163,7 +163,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, struct pw_impl_node *this = user_data; struct pw_impl_node *driver = this->driver_node; - if (!this->added) { + if (!this->rt.added) { uint64_t dummy; int res; @@ -172,7 +172,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, if (SPA_UNLIKELY(res != -EAGAIN && res != 0)) pw_log_warn("%p: read failed %m", this); - this->added = true; + this->rt.added = true; /* remote nodes have their source added in client-node instead */ if (!this->remote) spa_loop_add_source(loop, &this->source); @@ -186,11 +186,11 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_impl_node *this = user_data; - if (this->added) { + if (this->rt.added) { if (!this->remote) spa_loop_remove_source(loop, &this->source); remove_node(this); - this->added = false; + this->rt.added = false; } return 0; } @@ -269,7 +269,7 @@ static int start_node(struct pw_impl_node *this) return 0; pw_log_debug("%p: start node driving:%d driver:%d added:%d", this, - this->driving, this->driver, this->added); + this->driving, this->driver, this->rt.added); if (!(this->driving && this->driver)) { impl->pending_play = true; @@ -369,7 +369,7 @@ static void node_update_state(struct pw_impl_node *node, enum pw_node_state stat switch (state) { case PW_NODE_STATE_RUNNING: pw_log_debug("%p: start node driving:%d driver:%d added:%d", node, - node->driving, node->driver, node->added); + node->driving, node->driver, node->rt.added); if (res >= 0) { pw_loop_invoke(node->data_loop, do_node_add, 1, NULL, 0, true, node); @@ -443,7 +443,7 @@ static int suspend_node(struct pw_impl_node *this) node_deactivate(this); pw_log_debug("%p: suspend node driving:%d driver:%d added:%d", this, - this->driving, this->driver, this->added); + this->driving, this->driver, this->rt.added); res = spa_node_send_command(this->node, &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Suspend)); @@ -842,7 +842,7 @@ do_move_nodes(struct spa_loop *loop, node->target_rate = node->rt.position->clock.target_rate; node->target_quantum = node->rt.position->clock.target_duration; - if (node->added) { + if (node->rt.added) { remove_node(node); add_node(node, driver); } @@ -1226,7 +1226,7 @@ static inline int process_node(void *data) if (SPA_UNLIKELY(!this->transport_sync)) a->pending_sync = false; - if (SPA_LIKELY(this->added)) { + if (SPA_LIKELY(this->rt.added)) { /* process input mixers */ spa_list_for_each(p, &this->rt.input_mix, rt.node_link) spa_node_process_fast(p->mix); @@ -1768,9 +1768,9 @@ static int node_ready(void *data, int status) uint64_t nsec; pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d added:%d", node, - node->driver, node->exported, driver, status, node->added); + node->driver, node->exported, driver, status, node->rt.added); - if (SPA_UNLIKELY(!node->added)) { + if (SPA_UNLIKELY(!node->rt.added)) { /* This can happen when we are stopping a node and removed it from the * graph but we still have not completed the Pause/Suspend command on * the node. In that case, the node might still emit ready events, diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index 25d3f3f09..201fb9871 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -29,7 +29,9 @@ PW_LOG_TOPIC_EXTERN(log_port); struct impl { struct pw_impl_port this; struct spa_node mix_node; /**< mix node implementation */ - struct spa_list mix_list; + struct { + struct spa_list mix_list; + } rt; struct spa_list param_list; struct spa_list pending_list; @@ -134,9 +136,9 @@ do_add_mix(struct spa_loop *loop, struct pw_impl_port *this = mix->p; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); pw_log_trace("%p: add mix %p", this, mix); - if (!mix->active) { - spa_list_append(&impl->mix_list, &mix->rt_link); - mix->active = true; + if (!mix->rt.active) { + spa_list_append(&impl->rt.mix_list, &mix->rt.link); + mix->rt.active = true; } return 0; } @@ -148,9 +150,9 @@ do_remove_mix(struct spa_loop *loop, struct pw_impl_port_mix *mix = user_data; struct pw_impl_port *this = mix->p; pw_log_trace("%p: remove mix %p", this, mix); - if (mix->active) { - spa_list_remove(&mix->rt_link); - mix->active = false; + if (mix->rt.active) { + spa_list_remove(&mix->rt.link); + mix->rt.active = false; } return 0; } @@ -189,7 +191,7 @@ static int tee_process(void *object) struct spa_io_buffers *io = &this->rt.io; pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id); - spa_list_for_each(mix, &impl->mix_list, rt_link) { + spa_list_for_each(mix, &impl->rt.mix_list, rt.link) { pw_log_trace_fp("%p: port %d %p->%p %d", this, mix->port.port_id, io, mix->io, mix->io->buffer_id); *mix->io = *io; @@ -226,7 +228,7 @@ static int schedule_mix_input(void *object) if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this))) return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; - spa_list_for_each(mix, &impl->mix_list, rt_link) { + spa_list_for_each(mix, &impl->rt.mix_list, rt.link) { pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this, mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id); *io = *mix->io; @@ -241,7 +243,7 @@ static int schedule_mix_reuse_buffer(void *object, uint32_t port_id, uint32_t bu struct impl *impl = object; struct pw_impl_port_mix *mix; - spa_list_for_each(mix, &impl->mix_list, rt_link) { + spa_list_for_each(mix, &impl->rt.mix_list, rt.link) { pw_log_trace_fp("%p: reuse buffer %d %d", impl, port_id, buffer_id); /* FIXME send reuse buffer to peer */ break; @@ -585,7 +587,7 @@ struct pw_impl_port *pw_context_create_port( spa_list_init(&impl->param_list); spa_list_init(&impl->pending_list); impl->cache_params = true; - spa_list_init(&impl->mix_list); + spa_list_init(&impl->rt.mix_list); this = &impl->this; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 187f78ea0..916b23428 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -670,7 +670,6 @@ struct pw_impl_node { unsigned int transport_sync:1; /**< supports transport sync */ unsigned int target_pending:1; /**< a quantum/rate update is pending */ unsigned int moved:1; /**< the node was moved drivers */ - unsigned int added:1; /**< the node was add to graph */ unsigned int pause_on_idle:1; /**< Pause processing when IDLE */ unsigned int suspend_on_idle:1; unsigned int need_resume:1; @@ -729,6 +728,8 @@ struct pw_impl_node { struct spa_list driver_link; /* our link in driver */ struct spa_ratelimit rate_limit; + + bool added; /**< the node was add to graph */ } rt; struct spa_fraction target_rate; uint64_t target_quantum; @@ -741,7 +742,6 @@ struct pw_impl_node { struct pw_impl_port_mix { struct spa_list link; - struct spa_list rt_link; struct pw_impl_port *p; struct { enum spa_direction direction; @@ -750,8 +750,12 @@ struct pw_impl_port_mix { struct spa_io_buffers *io; uint32_t id; uint32_t peer_id; - unsigned int have_buffers:1; - unsigned int active:1; + bool have_buffers; + + struct { + bool active; + struct spa_list link; + } rt; }; struct pw_impl_port_implementation {