diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index fc5d28bc2..52668e544 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -626,8 +626,8 @@ do_mix_set_io(struct spa_loop *loop, bool async, uint32_t seq, static inline void mix_set_io(struct mix *mix, void *data, size_t size) { struct io_info info = { .mix = mix, .data = data, .size = size }; - pw_data_loop_invoke(mix->port->client->loop, - do_mix_set_io, SPA_ID_INVALID, &info, sizeof(info), false, NULL); + pw_loop_locked(mix->port->client->loop->loop, + do_mix_set_io, SPA_ID_INVALID, &info, sizeof(info), NULL); } static void init_mix(struct mix *mix, uint32_t mix_id, struct port *port, uint32_t peer_id) @@ -2496,16 +2496,16 @@ static int client_node_command(void *data, const struct spa_command *command) case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Pause: if (c->started) { - pw_data_loop_invoke(c->loop, - do_unprepare_client, SPA_ID_INVALID, NULL, 0, false, c); + pw_loop_locked(c->loop->loop, + do_unprepare_client, SPA_ID_INVALID, NULL, 0, c); c->started = false; } break; case SPA_NODE_COMMAND_Start: if (!c->started) { - pw_data_loop_invoke(c->loop, - do_prepare_client, SPA_ID_INVALID, NULL, 0, false, c); + pw_loop_locked(c->loop->loop, + do_prepare_client, SPA_ID_INVALID, NULL, 0, c); c->started = true; } break; @@ -3279,8 +3279,8 @@ static int client_node_set_activation(void *data, link->trigger = link->activation->server_version < 1 ? trigger_link_v0 : trigger_link_v1; spa_list_append(&c->links, &link->link); - pw_data_loop_invoke(c->loop, - do_add_link, SPA_ID_INVALID, NULL, 0, false, link); + pw_loop_locked(c->loop->loop, + do_add_link, SPA_ID_INVALID, NULL, 0, link); } else { link = find_activation(&c->links, node_id); @@ -3290,8 +3290,8 @@ static int client_node_set_activation(void *data, } spa_list_remove(&link->link); - pw_data_loop_invoke(c->loop, - do_remove_link, SPA_ID_INVALID, NULL, 0, false, link); + pw_loop_locked(c->loop->loop, + do_remove_link, SPA_ID_INVALID, NULL, 0, link); queue_free_link(c, link); } diff --git a/spa/include/spa/support/loop.h b/spa/include/spa/support/loop.h index 1c842c7bd..97813bb14 100644 --- a/spa/include/spa/support/loop.h +++ b/spa/include/spa/support/loop.h @@ -133,6 +133,24 @@ struct spa_loop_methods { size_t size, bool block, void *user_data); + + /** Call a function with the loop lock acquired + * May be called from any thread and multiple threads at the same time. + * + * \param[in] object The callbacks data. + * \param func The function to be called. + * \param seq An opaque sequence number. This will be made + * available to func. + * \param[in] data Data that will be passed to func. + * \param size The size of data. + * \param user_data An opaque pointer passed to func. + * \return the return value of func. */ + int (*locked) (void *object, + spa_invoke_func_t func, + uint32_t seq, + const void *data, + size_t size, + void *user_data); }; SPA_API_LOOP int spa_loop_add_source(struct spa_loop *object, struct spa_source *source) @@ -158,6 +176,15 @@ SPA_API_LOOP int spa_loop_invoke(struct spa_loop *object, spa_loop, &object->iface, invoke, 0, func, seq, data, size, block, user_data); } +SPA_API_LOOP int spa_loop_locked(struct spa_loop *object, + spa_invoke_func_t func, uint32_t seq, const void *data, + size_t size, void *user_data) +{ + return spa_api_method_r(int, -ENOTSUP, + spa_loop, &object->iface, locked, 0, func, seq, data, + size, user_data); +} + /** Control hooks. These hooks can't be removed from their * callbacks and must be removed from a safe place (when the loop diff --git a/spa/plugins/alsa/alsa-compress-offload-sink.c b/spa/plugins/alsa/alsa-compress-offload-sink.c index d10d9c071..1477b5e4d 100644 --- a/spa/plugins/alsa/alsa-compress-offload-sink.c +++ b/spa/plugins/alsa/alsa-compress-offload-sink.c @@ -684,7 +684,7 @@ static void stop_driver_timer(struct impl *this) /* Perform the actual stop within * the dataloop to avoid data races. */ - spa_loop_invoke(this->data_loop, do_remove_driver_timer_source, 0, NULL, 0, true, this); + spa_loop_locked(this->data_loop, do_remove_driver_timer_source, 0, NULL, 0, this); } static void on_driver_timeout(struct spa_source *source) @@ -795,7 +795,7 @@ static void reevaluate_following_state(struct impl *this) if (following != this->following) { spa_log_debug(this->log, "%p: following state changed: %d->%d", this, this->following, following); this->following = following; - spa_loop_invoke(this->data_loop, do_reevaluate_following_state, 0, NULL, 0, true, this); + spa_loop_locked(this->data_loop, do_reevaluate_following_state, 0, NULL, 0, this); } } diff --git a/spa/plugins/alsa/alsa-pcm.c b/spa/plugins/alsa/alsa-pcm.c index bc5a0ae63..222a88d0b 100644 --- a/spa/plugins/alsa/alsa-pcm.c +++ b/spa/plugins/alsa/alsa-pcm.c @@ -3739,7 +3739,7 @@ int spa_alsa_start(struct state *state) } state->started = true; - spa_loop_invoke(state->data_loop, do_state_sync, 0, NULL, 0, true, state); + spa_loop_locked(state->data_loop, do_state_sync, 0, NULL, 0, state); return 0; } @@ -3783,7 +3783,7 @@ int spa_alsa_reassign_follower(struct state *state) } setup_matching(state); if (state->started) - spa_loop_invoke(state->data_loop, do_state_sync, 0, NULL, 0, true, state); + spa_loop_locked(state->data_loop, do_state_sync, 0, NULL, 0, state); else if (state->want_started) spa_alsa_start(state); @@ -3812,7 +3812,7 @@ int spa_alsa_pause(struct state *state) spa_log_debug(state->log, "%p: pause", state); state->started = false; - spa_loop_invoke(state->data_loop, do_state_sync, 0, NULL, 0, true, state); + spa_loop_locked(state->data_loop, do_state_sync, 0, NULL, 0, state); spa_list_for_each(follower, &state->followers, driver_link) spa_alsa_pause(follower); diff --git a/spa/plugins/alsa/alsa-seq.c b/spa/plugins/alsa/alsa-seq.c index 9cb707814..86621a1d2 100644 --- a/spa/plugins/alsa/alsa-seq.c +++ b/spa/plugins/alsa/alsa-seq.c @@ -1176,7 +1176,7 @@ int spa_alsa_seq_reassign_follower(struct seq_state *state) if (following != state->following) { spa_log_debug(state->log, "alsa %p: reassign follower %d->%d", state, state->following, following); state->following = following; - spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state); + spa_loop_locked(state->data_loop, do_reassign_follower, 0, NULL, 0, state); } return 0; } @@ -1205,7 +1205,7 @@ int spa_alsa_seq_pause(struct seq_state *state) spa_log_debug(state->log, "alsa %p: pause", state); - spa_loop_invoke(state->data_loop, do_remove_source, 0, NULL, 0, true, state); + spa_loop_locked(state->data_loop, do_remove_source, 0, NULL, 0, state); if ((res = snd_seq_stop_queue(state->event.hndl, state->event.queue_id, NULL)) < 0) { spa_log_warn(state->log, "failed to stop queue: %s", snd_strerror(res)); diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index e5c0daf9f..04069a44a 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -1386,7 +1386,7 @@ static int load_filter_graph(struct impl *impl, const char *graph, int order) } res = setup_filter_graphs(impl); - spa_loop_invoke(impl->data_loop, do_sync_filter_graph, 0, NULL, 0, true, impl); + spa_loop_locked(impl->data_loop, do_sync_filter_graph, 0, NULL, 0, impl); if (impl->in_filter_props == 0) clean_filter_handles(impl, false); @@ -3283,7 +3283,7 @@ impl_node_port_set_io(void *object, case SPA_IO_Buffers: if (this->data_loop) { struct io_data d = { .port = port, .data = data, .size = size }; - spa_loop_invoke(this->data_loop, do_set_port_io, 0, NULL, 0, true, &d); + spa_loop_locked(this->data_loop, do_set_port_io, 0, NULL, 0, &d); } else port->io = data; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index ad1e8fdba..80e514490 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -744,8 +744,8 @@ impl_node_port_set_io(void *object, switch (id) { case SPA_IO_Buffers: case SPA_IO_AsyncBuffers: - spa_loop_invoke(this->data_loop, - do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); + spa_loop_locked(this->data_loop, + do_port_set_io, SPA_ID_INVALID, NULL, 0, &info); break; default: return -ENOENT; diff --git a/spa/plugins/audiomixer/mixer-dsp.c b/spa/plugins/audiomixer/mixer-dsp.c index fb396c14e..b4ee7d18d 100644 --- a/spa/plugins/audiomixer/mixer-dsp.c +++ b/spa/plugins/audiomixer/mixer-dsp.c @@ -680,8 +680,8 @@ impl_node_port_set_io(void *object, switch (id) { case SPA_IO_Buffers: case SPA_IO_AsyncBuffers: - spa_loop_invoke(this->data_loop, - do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); + spa_loop_locked(this->data_loop, + do_port_set_io, SPA_ID_INVALID, NULL, 0, &info); break; default: return -ENOENT; diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 273e4a5a3..6dc0db07f 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -1013,7 +1013,7 @@ static int impl_clear(struct spa_handle *handle) this = (struct impl *) handle; if (this->data_loop) - spa_loop_invoke(this->data_loop, do_remove_timer, 0, NULL, 0, true, this); + spa_loop_locked(this->data_loop, do_remove_timer, 0, NULL, 0, this); spa_system_close(this->data_system, this->timer_source.fd); return 0; diff --git a/spa/plugins/avb/avb-pcm.c b/spa/plugins/avb/avb-pcm.c index 3f608d7f4..59b05a3a5 100644 --- a/spa/plugins/avb/avb-pcm.c +++ b/spa/plugins/avb/avb-pcm.c @@ -1131,7 +1131,7 @@ int spa_avb_reassign_follower(struct state *state) if (following != state->following) { spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following); state->following = following; - spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state); + spa_loop_locked(state->data_loop, do_reassign_follower, 0, NULL, 0, state); } freewheel = state->position && @@ -1208,7 +1208,7 @@ int spa_avb_pause(struct state *state) spa_log_debug(state->log, "%p: pause", state); - spa_loop_invoke(state->data_loop, do_remove_source, 0, NULL, 0, true, state); + spa_loop_locked(state->data_loop, do_remove_source, 0, NULL, 0, state); state->started = false; diff --git a/spa/plugins/control/mixer.c b/spa/plugins/control/mixer.c index e97072d45..9085da5c5 100644 --- a/spa/plugins/control/mixer.c +++ b/spa/plugins/control/mixer.c @@ -605,8 +605,8 @@ impl_node_port_set_io(void *object, switch (id) { case SPA_IO_Buffers: case SPA_IO_AsyncBuffers: - spa_loop_invoke(this->data_loop, - do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); + spa_loop_locked(this->data_loop, + do_port_set_io, SPA_ID_INVALID, NULL, 0, &info); break; default: return -ENOENT; diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 290fc7e68..07e0bffff 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -195,7 +195,6 @@ static int remove_from_poll(struct impl *impl, struct spa_source *source) static int loop_remove_source(void *object, struct spa_source *source) { struct impl *impl = object; - spa_assert(!impl->polling); int res = remove_from_poll(impl, source); detach_source(source); @@ -553,6 +552,16 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, } return res; } +static int loop_locked(void *object, spa_invoke_func_t func, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct impl *impl = object; + int res; + pthread_mutex_lock(&impl->lock); + res = func(&impl->loop, false, seq, data, size, user_data); + pthread_mutex_unlock(&impl->lock); + return res; +} static int loop_get_fd(void *object) { @@ -1214,6 +1223,7 @@ static const struct spa_loop_methods impl_loop = { .update_source = loop_update_source, .remove_source = loop_remove_source, .invoke = loop_invoke, + .locked = loop_locked, }; static const struct spa_loop_control_methods impl_loop_control_cancel = { diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 134511b35..21f47538a 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -1239,12 +1239,11 @@ static void client_node_resource_destroy(void *data) spa_hook_remove(&impl->object_listener); if (impl->data_source.fd != -1) { - spa_loop_invoke(impl->data_loop, + spa_loop_locked(impl->data_loop, do_remove_source, SPA_ID_INVALID, NULL, 0, - true, &impl->data_source); } if (this->node) diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index 4c1bb6870..6c6bfc74a 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -1449,7 +1449,7 @@ static void hook_removed(struct spa_hook *hook) { struct filter *impl = hook->priv; if (impl->data_loop) - pw_loop_invoke(impl->data_loop, do_remove_callbacks, 1, NULL, 0, true, impl); + pw_loop_locked(impl->data_loop, do_remove_callbacks, 1, NULL, 0, impl); else spa_zero(impl->rt_callbacks); hook->priv = NULL; @@ -2081,8 +2081,8 @@ SPA_EXPORT int pw_filter_flush(struct pw_filter *filter, bool drain) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); - pw_loop_invoke(impl->data_loop, - drain ? do_drain : do_flush, 1, NULL, 0, true, impl); + pw_loop_locked(impl->data_loop, + drain ? do_drain : do_flush, 1, NULL, 0, impl); return 0; } diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index e5636d561..f37bfd411 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -209,7 +209,7 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq, static void add_node_to_graph(struct pw_impl_node *node) { - pw_loop_invoke(node->data_loop, do_node_prepare, 1, NULL, 0, true, node); + pw_loop_locked(node->data_loop, do_node_prepare, 1, NULL, 0, node); } /* called from the node data loop and undoes the changes done in do_node_prepare. */ @@ -248,7 +248,7 @@ do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq, static void remove_node_from_graph(struct pw_impl_node *node) { - pw_loop_invoke(node->data_loop, do_node_unprepare, 1, NULL, 0, true, node); + pw_loop_locked(node->data_loop, do_node_unprepare, 1, NULL, 0, node); } static void node_deactivate(struct pw_impl_node *this) @@ -810,8 +810,8 @@ int pw_impl_node_set_io(struct pw_impl_node *this, uint32_t id, void *data, size if (data != NULL && size < sizeof(struct spa_io_position)) return -EINVAL; pw_log_debug("%p: set position %p", this, data); - pw_loop_invoke(this->data_loop, - do_update_position, SPA_ID_INVALID, &data, sizeof(void*), true, this); + pw_loop_locked(this->data_loop, + do_update_position, SPA_ID_INVALID, &data, sizeof(void*), this); break; case SPA_IO_Clock: if (data != NULL && size < sizeof(struct spa_io_clock)) @@ -869,8 +869,8 @@ do_add_target(struct spa_loop *loop, SPA_EXPORT int pw_impl_node_add_target(struct pw_impl_node *node, struct pw_node_target *t) { - pw_loop_invoke(node->data_loop, - do_add_target, SPA_ID_INVALID, &node, sizeof(void *), true, t); + pw_loop_locked(node->data_loop, + do_add_target, SPA_ID_INVALID, &node, sizeof(void *), t); if (t->node) pw_impl_node_emit_peer_added(node, t->node); @@ -905,8 +905,8 @@ int pw_impl_node_remove_target(struct pw_impl_node *node, struct pw_node_target { /* we also update the target list for remote nodes so that the profiler * can inspect the nodes as well */ - pw_loop_invoke(node->data_loop, - do_remove_target, SPA_ID_INVALID, &node, sizeof(void *), true, t); + pw_loop_locked(node->data_loop, + do_remove_target, SPA_ID_INVALID, &node, sizeof(void *), t); if (t->node) pw_impl_node_emit_peer_removed(node, t->node); @@ -2359,8 +2359,8 @@ void pw_impl_node_add_rt_listener(struct pw_impl_node *node, void *data) { struct listener_data d = { .listener = listener, .events = events, .data = data }; - pw_loop_invoke(node->data_loop, - do_add_rt_listener, SPA_ID_INVALID, &d, sizeof(d), false, node); + pw_loop_locked(node->data_loop, + do_add_rt_listener, SPA_ID_INVALID, &d, sizeof(d), node); } static int do_remove_listener(struct spa_loop *loop, @@ -2375,8 +2375,8 @@ SPA_EXPORT void pw_impl_node_remove_rt_listener(struct pw_impl_node *node, struct spa_hook *listener) { - pw_loop_invoke(node->data_loop, - do_remove_listener, SPA_ID_INVALID, NULL, 0, true, listener); + pw_loop_locked(node->data_loop, + do_remove_listener, SPA_ID_INVALID, NULL, 0, listener); } /** Destroy a node diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index 4356604af..0bcff9f4c 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -241,8 +241,8 @@ static int port_set_io(void *object, case SPA_IO_Buffers: case SPA_IO_AsyncBuffers: if (data == NULL || size == 0) { - pw_loop_invoke(this->node->data_loop, - do_remove_mix, SPA_ID_INVALID, NULL, 0, true, mix); + pw_loop_locked(this->node->data_loop, + do_remove_mix, SPA_ID_INVALID, NULL, 0, mix); mix->io_data = mix->io[0] = mix->io[1] = NULL; } else if (data != NULL && size >= sizeof(struct spa_io_buffers)) { if (size >= sizeof(struct spa_io_async_buffers)) { @@ -253,8 +253,8 @@ static int port_set_io(void *object, } else { mix->io_data = mix->io[0] = mix->io[1] = data; } - pw_loop_invoke(this->node->data_loop, - do_add_mix, SPA_ID_INVALID, NULL, 0, false, mix); + pw_loop_locked(this->node->data_loop, + do_add_mix, SPA_ID_INVALID, NULL, 0, mix); } } return 0; @@ -1388,7 +1388,7 @@ static void pw_impl_port_remove(struct pw_impl_port *port) pw_log_debug("%p: remove", port); - pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port); + pw_loop_locked(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, port); if (SPA_FLAG_IS_SET(port->flags, PW_IMPL_PORT_FLAG_TO_REMOVE)) { if ((res = spa_node_remove_port(node->node, port->direction, port->port_id)) < 0) @@ -1817,7 +1817,7 @@ int pw_impl_port_set_param(struct pw_impl_port *port, uint32_t id, uint32_t flag pw_log_debug("%p: %d set param %d %p", port, port->state, id, param); if (id == SPA_PARAM_Format) { - pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port); + pw_loop_locked(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, port); spa_node_port_set_io(node->node, port->direction, port->port_id, SPA_IO_Buffers, NULL, 0); @@ -1903,7 +1903,7 @@ static int negotiate_mixer_buffers(struct pw_impl_port *port, uint32_t flags, port->direction, port->port_id, SPA_IO_Buffers, NULL, 0); - pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port); + pw_loop_locked(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, port); pw_buffers_clear(&port->mix_buffers); @@ -1943,7 +1943,7 @@ static int negotiate_mixer_buffers(struct pw_impl_port *port, uint32_t flags, pw_direction_reverse(port->direction), 0, SPA_IO_Buffers, &port->rt.io, sizeof(port->rt.io)); - pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port); + pw_loop_locked(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, port); } return res; } diff --git a/src/pipewire/loop.h b/src/pipewire/loop.h index 2190344c1..d4f9be076 100644 --- a/src/pipewire/loop.h +++ b/src/pipewire/loop.h @@ -64,6 +64,12 @@ PW_API_LOOP_IMPL int pw_loop_invoke(struct pw_loop *object, { return spa_loop_invoke(object->loop, func, seq, data, size, block, user_data); } +PW_API_LOOP_IMPL int pw_loop_locked(struct pw_loop *object, + spa_invoke_func_t func, uint32_t seq, const void *data, + size_t size, void *user_data) +{ + return spa_loop_locked(object->loop, func, seq, data, size, user_data); +} PW_API_LOOP_IMPL int pw_loop_get_fd(struct pw_loop *object) { diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 2ee63fae1..0feeabe50 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -1748,7 +1748,7 @@ static void hook_removed(struct spa_hook *hook) { struct stream *impl = hook->priv; if (impl->data_loop) - pw_loop_invoke(impl->data_loop, do_remove_callbacks, 1, NULL, 0, true, impl); + pw_loop_locked(impl->data_loop, do_remove_callbacks, 1, NULL, 0, impl); else spa_zero(impl->rt_callbacks); hook->priv = NULL; @@ -2590,8 +2590,8 @@ int pw_stream_flush(struct pw_stream *stream, bool drain) if (stream->node == NULL) return -EIO; - pw_loop_invoke(impl->data_loop, - drain ? do_drain : do_flush, 1, NULL, 0, true, impl); + pw_loop_locked(impl->data_loop, + drain ? do_drain : do_flush, 1, NULL, 0, impl); if (!drain) spa_node_send_command(stream->node->node,