loop: add method to run a function with the lock

Convert some _invoke to _locked
This commit is contained in:
Wim Taymans 2025-03-10 17:24:13 +01:00
parent fb49e0795c
commit f7fdafc203
18 changed files with 99 additions and 57 deletions

View file

@ -626,8 +626,8 @@ do_mix_set_io(struct spa_loop *loop, bool async, uint32_t seq,
static inline void mix_set_io(struct mix *mix, void *data, size_t size) static inline void mix_set_io(struct mix *mix, void *data, size_t size)
{ {
struct io_info info = { .mix = mix, .data = data, .size = size }; struct io_info info = { .mix = mix, .data = data, .size = size };
pw_data_loop_invoke(mix->port->client->loop, pw_loop_locked(mix->port->client->loop->loop,
do_mix_set_io, SPA_ID_INVALID, &info, sizeof(info), false, NULL); do_mix_set_io, SPA_ID_INVALID, &info, sizeof(info), NULL);
} }
static void init_mix(struct mix *mix, uint32_t mix_id, struct port *port, uint32_t peer_id) static void init_mix(struct mix *mix, uint32_t mix_id, struct port *port, uint32_t peer_id)
@ -2496,16 +2496,16 @@ static int client_node_command(void *data, const struct spa_command *command)
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
if (c->started) { if (c->started) {
pw_data_loop_invoke(c->loop, pw_loop_locked(c->loop->loop,
do_unprepare_client, SPA_ID_INVALID, NULL, 0, false, c); do_unprepare_client, SPA_ID_INVALID, NULL, 0, c);
c->started = false; c->started = false;
} }
break; break;
case SPA_NODE_COMMAND_Start: case SPA_NODE_COMMAND_Start:
if (!c->started) { if (!c->started) {
pw_data_loop_invoke(c->loop, pw_loop_locked(c->loop->loop,
do_prepare_client, SPA_ID_INVALID, NULL, 0, false, c); do_prepare_client, SPA_ID_INVALID, NULL, 0, c);
c->started = true; c->started = true;
} }
break; break;
@ -3279,8 +3279,8 @@ static int client_node_set_activation(void *data,
link->trigger = link->activation->server_version < 1 ? trigger_link_v0 : trigger_link_v1; link->trigger = link->activation->server_version < 1 ? trigger_link_v0 : trigger_link_v1;
spa_list_append(&c->links, &link->link); spa_list_append(&c->links, &link->link);
pw_data_loop_invoke(c->loop, pw_loop_locked(c->loop->loop,
do_add_link, SPA_ID_INVALID, NULL, 0, false, link); do_add_link, SPA_ID_INVALID, NULL, 0, link);
} }
else { else {
link = find_activation(&c->links, node_id); link = find_activation(&c->links, node_id);
@ -3290,8 +3290,8 @@ static int client_node_set_activation(void *data,
} }
spa_list_remove(&link->link); spa_list_remove(&link->link);
pw_data_loop_invoke(c->loop, pw_loop_locked(c->loop->loop,
do_remove_link, SPA_ID_INVALID, NULL, 0, false, link); do_remove_link, SPA_ID_INVALID, NULL, 0, link);
queue_free_link(c, link); queue_free_link(c, link);
} }

View file

@ -133,6 +133,24 @@ struct spa_loop_methods {
size_t size, size_t size,
bool block, bool block,
void *user_data); void *user_data);
/** Call a function with the loop lock acquired
* May be called from any thread and multiple threads at the same time.
*
* \param[in] object The callbacks data.
* \param func The function to be called.
* \param seq An opaque sequence number. This will be made
* available to func.
* \param[in] data Data that will be passed to func.
* \param size The size of data.
* \param user_data An opaque pointer passed to func.
* \return the return value of func. */
int (*locked) (void *object,
spa_invoke_func_t func,
uint32_t seq,
const void *data,
size_t size,
void *user_data);
}; };
SPA_API_LOOP int spa_loop_add_source(struct spa_loop *object, struct spa_source *source) SPA_API_LOOP int spa_loop_add_source(struct spa_loop *object, struct spa_source *source)
@ -158,6 +176,15 @@ SPA_API_LOOP int spa_loop_invoke(struct spa_loop *object,
spa_loop, &object->iface, invoke, 0, func, seq, data, spa_loop, &object->iface, invoke, 0, func, seq, data,
size, block, user_data); size, block, user_data);
} }
SPA_API_LOOP int spa_loop_locked(struct spa_loop *object,
spa_invoke_func_t func, uint32_t seq, const void *data,
size_t size, void *user_data)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop, &object->iface, locked, 0, func, seq, data,
size, user_data);
}
/** Control hooks. These hooks can't be removed from their /** Control hooks. These hooks can't be removed from their
* callbacks and must be removed from a safe place (when the loop * callbacks and must be removed from a safe place (when the loop

View file

@ -684,7 +684,7 @@ static void stop_driver_timer(struct impl *this)
/* Perform the actual stop within /* Perform the actual stop within
* the dataloop to avoid data races. */ * the dataloop to avoid data races. */
spa_loop_invoke(this->data_loop, do_remove_driver_timer_source, 0, NULL, 0, true, this); spa_loop_locked(this->data_loop, do_remove_driver_timer_source, 0, NULL, 0, this);
} }
static void on_driver_timeout(struct spa_source *source) static void on_driver_timeout(struct spa_source *source)
@ -795,7 +795,7 @@ static void reevaluate_following_state(struct impl *this)
if (following != this->following) { if (following != this->following) {
spa_log_debug(this->log, "%p: following state changed: %d->%d", this, this->following, following); spa_log_debug(this->log, "%p: following state changed: %d->%d", this, this->following, following);
this->following = following; this->following = following;
spa_loop_invoke(this->data_loop, do_reevaluate_following_state, 0, NULL, 0, true, this); spa_loop_locked(this->data_loop, do_reevaluate_following_state, 0, NULL, 0, this);
} }
} }

View file

@ -3739,7 +3739,7 @@ int spa_alsa_start(struct state *state)
} }
state->started = true; state->started = true;
spa_loop_invoke(state->data_loop, do_state_sync, 0, NULL, 0, true, state); spa_loop_locked(state->data_loop, do_state_sync, 0, NULL, 0, state);
return 0; return 0;
} }
@ -3783,7 +3783,7 @@ int spa_alsa_reassign_follower(struct state *state)
} }
setup_matching(state); setup_matching(state);
if (state->started) if (state->started)
spa_loop_invoke(state->data_loop, do_state_sync, 0, NULL, 0, true, state); spa_loop_locked(state->data_loop, do_state_sync, 0, NULL, 0, state);
else if (state->want_started) else if (state->want_started)
spa_alsa_start(state); spa_alsa_start(state);
@ -3812,7 +3812,7 @@ int spa_alsa_pause(struct state *state)
spa_log_debug(state->log, "%p: pause", state); spa_log_debug(state->log, "%p: pause", state);
state->started = false; state->started = false;
spa_loop_invoke(state->data_loop, do_state_sync, 0, NULL, 0, true, state); spa_loop_locked(state->data_loop, do_state_sync, 0, NULL, 0, state);
spa_list_for_each(follower, &state->followers, driver_link) spa_list_for_each(follower, &state->followers, driver_link)
spa_alsa_pause(follower); spa_alsa_pause(follower);

View file

@ -1176,7 +1176,7 @@ int spa_alsa_seq_reassign_follower(struct seq_state *state)
if (following != state->following) { if (following != state->following) {
spa_log_debug(state->log, "alsa %p: reassign follower %d->%d", state, state->following, following); spa_log_debug(state->log, "alsa %p: reassign follower %d->%d", state, state->following, following);
state->following = following; state->following = following;
spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state); spa_loop_locked(state->data_loop, do_reassign_follower, 0, NULL, 0, state);
} }
return 0; return 0;
} }
@ -1205,7 +1205,7 @@ int spa_alsa_seq_pause(struct seq_state *state)
spa_log_debug(state->log, "alsa %p: pause", state); spa_log_debug(state->log, "alsa %p: pause", state);
spa_loop_invoke(state->data_loop, do_remove_source, 0, NULL, 0, true, state); spa_loop_locked(state->data_loop, do_remove_source, 0, NULL, 0, state);
if ((res = snd_seq_stop_queue(state->event.hndl, state->event.queue_id, NULL)) < 0) { if ((res = snd_seq_stop_queue(state->event.hndl, state->event.queue_id, NULL)) < 0) {
spa_log_warn(state->log, "failed to stop queue: %s", snd_strerror(res)); spa_log_warn(state->log, "failed to stop queue: %s", snd_strerror(res));

View file

@ -1386,7 +1386,7 @@ static int load_filter_graph(struct impl *impl, const char *graph, int order)
} }
res = setup_filter_graphs(impl); res = setup_filter_graphs(impl);
spa_loop_invoke(impl->data_loop, do_sync_filter_graph, 0, NULL, 0, true, impl); spa_loop_locked(impl->data_loop, do_sync_filter_graph, 0, NULL, 0, impl);
if (impl->in_filter_props == 0) if (impl->in_filter_props == 0)
clean_filter_handles(impl, false); clean_filter_handles(impl, false);
@ -3283,7 +3283,7 @@ impl_node_port_set_io(void *object,
case SPA_IO_Buffers: case SPA_IO_Buffers:
if (this->data_loop) { if (this->data_loop) {
struct io_data d = { .port = port, .data = data, .size = size }; struct io_data d = { .port = port, .data = data, .size = size };
spa_loop_invoke(this->data_loop, do_set_port_io, 0, NULL, 0, true, &d); spa_loop_locked(this->data_loop, do_set_port_io, 0, NULL, 0, &d);
} }
else else
port->io = data; port->io = data;

View file

@ -744,8 +744,8 @@ impl_node_port_set_io(void *object,
switch (id) { switch (id) {
case SPA_IO_Buffers: case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers: case SPA_IO_AsyncBuffers:
spa_loop_invoke(this->data_loop, spa_loop_locked(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); do_port_set_io, SPA_ID_INVALID, NULL, 0, &info);
break; break;
default: default:
return -ENOENT; return -ENOENT;

View file

@ -680,8 +680,8 @@ impl_node_port_set_io(void *object,
switch (id) { switch (id) {
case SPA_IO_Buffers: case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers: case SPA_IO_AsyncBuffers:
spa_loop_invoke(this->data_loop, spa_loop_locked(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); do_port_set_io, SPA_ID_INVALID, NULL, 0, &info);
break; break;
default: default:
return -ENOENT; return -ENOENT;

View file

@ -1013,7 +1013,7 @@ static int impl_clear(struct spa_handle *handle)
this = (struct impl *) handle; this = (struct impl *) handle;
if (this->data_loop) if (this->data_loop)
spa_loop_invoke(this->data_loop, do_remove_timer, 0, NULL, 0, true, this); spa_loop_locked(this->data_loop, do_remove_timer, 0, NULL, 0, this);
spa_system_close(this->data_system, this->timer_source.fd); spa_system_close(this->data_system, this->timer_source.fd);
return 0; return 0;

View file

@ -1131,7 +1131,7 @@ int spa_avb_reassign_follower(struct state *state)
if (following != state->following) { if (following != state->following) {
spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following); spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following);
state->following = following; state->following = following;
spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state); spa_loop_locked(state->data_loop, do_reassign_follower, 0, NULL, 0, state);
} }
freewheel = state->position && freewheel = state->position &&
@ -1208,7 +1208,7 @@ int spa_avb_pause(struct state *state)
spa_log_debug(state->log, "%p: pause", state); spa_log_debug(state->log, "%p: pause", state);
spa_loop_invoke(state->data_loop, do_remove_source, 0, NULL, 0, true, state); spa_loop_locked(state->data_loop, do_remove_source, 0, NULL, 0, state);
state->started = false; state->started = false;

View file

@ -605,8 +605,8 @@ impl_node_port_set_io(void *object,
switch (id) { switch (id) {
case SPA_IO_Buffers: case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers: case SPA_IO_AsyncBuffers:
spa_loop_invoke(this->data_loop, spa_loop_locked(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); do_port_set_io, SPA_ID_INVALID, NULL, 0, &info);
break; break;
default: default:
return -ENOENT; return -ENOENT;

View file

@ -195,7 +195,6 @@ static int remove_from_poll(struct impl *impl, struct spa_source *source)
static int loop_remove_source(void *object, struct spa_source *source) static int loop_remove_source(void *object, struct spa_source *source)
{ {
struct impl *impl = object; struct impl *impl = object;
spa_assert(!impl->polling);
int res = remove_from_poll(impl, source); int res = remove_from_poll(impl, source);
detach_source(source); detach_source(source);
@ -553,6 +552,16 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
} }
return res; return res;
} }
static int loop_locked(void *object, spa_invoke_func_t func, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct impl *impl = object;
int res;
pthread_mutex_lock(&impl->lock);
res = func(&impl->loop, false, seq, data, size, user_data);
pthread_mutex_unlock(&impl->lock);
return res;
}
static int loop_get_fd(void *object) static int loop_get_fd(void *object)
{ {
@ -1214,6 +1223,7 @@ static const struct spa_loop_methods impl_loop = {
.update_source = loop_update_source, .update_source = loop_update_source,
.remove_source = loop_remove_source, .remove_source = loop_remove_source,
.invoke = loop_invoke, .invoke = loop_invoke,
.locked = loop_locked,
}; };
static const struct spa_loop_control_methods impl_loop_control_cancel = { static const struct spa_loop_control_methods impl_loop_control_cancel = {

View file

@ -1239,12 +1239,11 @@ static void client_node_resource_destroy(void *data)
spa_hook_remove(&impl->object_listener); spa_hook_remove(&impl->object_listener);
if (impl->data_source.fd != -1) { if (impl->data_source.fd != -1) {
spa_loop_invoke(impl->data_loop, spa_loop_locked(impl->data_loop,
do_remove_source, do_remove_source,
SPA_ID_INVALID, SPA_ID_INVALID,
NULL, NULL,
0, 0,
true,
&impl->data_source); &impl->data_source);
} }
if (this->node) if (this->node)

View file

@ -1449,7 +1449,7 @@ static void hook_removed(struct spa_hook *hook)
{ {
struct filter *impl = hook->priv; struct filter *impl = hook->priv;
if (impl->data_loop) if (impl->data_loop)
pw_loop_invoke(impl->data_loop, do_remove_callbacks, 1, NULL, 0, true, impl); pw_loop_locked(impl->data_loop, do_remove_callbacks, 1, NULL, 0, impl);
else else
spa_zero(impl->rt_callbacks); spa_zero(impl->rt_callbacks);
hook->priv = NULL; hook->priv = NULL;
@ -2081,8 +2081,8 @@ SPA_EXPORT
int pw_filter_flush(struct pw_filter *filter, bool drain) int pw_filter_flush(struct pw_filter *filter, bool drain)
{ {
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
pw_loop_invoke(impl->data_loop, pw_loop_locked(impl->data_loop,
drain ? do_drain : do_flush, 1, NULL, 0, true, impl); drain ? do_drain : do_flush, 1, NULL, 0, impl);
return 0; return 0;
} }

View file

@ -209,7 +209,7 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq,
static void add_node_to_graph(struct pw_impl_node *node) static void add_node_to_graph(struct pw_impl_node *node)
{ {
pw_loop_invoke(node->data_loop, do_node_prepare, 1, NULL, 0, true, node); pw_loop_locked(node->data_loop, do_node_prepare, 1, NULL, 0, node);
} }
/* called from the node data loop and undoes the changes done in do_node_prepare. */ /* called from the node data loop and undoes the changes done in do_node_prepare. */
@ -248,7 +248,7 @@ do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq,
static void remove_node_from_graph(struct pw_impl_node *node) static void remove_node_from_graph(struct pw_impl_node *node)
{ {
pw_loop_invoke(node->data_loop, do_node_unprepare, 1, NULL, 0, true, node); pw_loop_locked(node->data_loop, do_node_unprepare, 1, NULL, 0, node);
} }
static void node_deactivate(struct pw_impl_node *this) static void node_deactivate(struct pw_impl_node *this)
@ -810,8 +810,8 @@ int pw_impl_node_set_io(struct pw_impl_node *this, uint32_t id, void *data, size
if (data != NULL && size < sizeof(struct spa_io_position)) if (data != NULL && size < sizeof(struct spa_io_position))
return -EINVAL; return -EINVAL;
pw_log_debug("%p: set position %p", this, data); pw_log_debug("%p: set position %p", this, data);
pw_loop_invoke(this->data_loop, pw_loop_locked(this->data_loop,
do_update_position, SPA_ID_INVALID, &data, sizeof(void*), true, this); do_update_position, SPA_ID_INVALID, &data, sizeof(void*), this);
break; break;
case SPA_IO_Clock: case SPA_IO_Clock:
if (data != NULL && size < sizeof(struct spa_io_clock)) if (data != NULL && size < sizeof(struct spa_io_clock))
@ -869,8 +869,8 @@ do_add_target(struct spa_loop *loop,
SPA_EXPORT SPA_EXPORT
int pw_impl_node_add_target(struct pw_impl_node *node, struct pw_node_target *t) int pw_impl_node_add_target(struct pw_impl_node *node, struct pw_node_target *t)
{ {
pw_loop_invoke(node->data_loop, pw_loop_locked(node->data_loop,
do_add_target, SPA_ID_INVALID, &node, sizeof(void *), true, t); do_add_target, SPA_ID_INVALID, &node, sizeof(void *), t);
if (t->node) if (t->node)
pw_impl_node_emit_peer_added(node, t->node); pw_impl_node_emit_peer_added(node, t->node);
@ -905,8 +905,8 @@ int pw_impl_node_remove_target(struct pw_impl_node *node, struct pw_node_target
{ {
/* we also update the target list for remote nodes so that the profiler /* we also update the target list for remote nodes so that the profiler
* can inspect the nodes as well */ * can inspect the nodes as well */
pw_loop_invoke(node->data_loop, pw_loop_locked(node->data_loop,
do_remove_target, SPA_ID_INVALID, &node, sizeof(void *), true, t); do_remove_target, SPA_ID_INVALID, &node, sizeof(void *), t);
if (t->node) if (t->node)
pw_impl_node_emit_peer_removed(node, t->node); pw_impl_node_emit_peer_removed(node, t->node);
@ -2359,8 +2359,8 @@ void pw_impl_node_add_rt_listener(struct pw_impl_node *node,
void *data) void *data)
{ {
struct listener_data d = { .listener = listener, .events = events, .data = data }; struct listener_data d = { .listener = listener, .events = events, .data = data };
pw_loop_invoke(node->data_loop, pw_loop_locked(node->data_loop,
do_add_rt_listener, SPA_ID_INVALID, &d, sizeof(d), false, node); do_add_rt_listener, SPA_ID_INVALID, &d, sizeof(d), node);
} }
static int do_remove_listener(struct spa_loop *loop, static int do_remove_listener(struct spa_loop *loop,
@ -2375,8 +2375,8 @@ SPA_EXPORT
void pw_impl_node_remove_rt_listener(struct pw_impl_node *node, void pw_impl_node_remove_rt_listener(struct pw_impl_node *node,
struct spa_hook *listener) struct spa_hook *listener)
{ {
pw_loop_invoke(node->data_loop, pw_loop_locked(node->data_loop,
do_remove_listener, SPA_ID_INVALID, NULL, 0, true, listener); do_remove_listener, SPA_ID_INVALID, NULL, 0, listener);
} }
/** Destroy a node /** Destroy a node

View file

@ -241,8 +241,8 @@ static int port_set_io(void *object,
case SPA_IO_Buffers: case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers: case SPA_IO_AsyncBuffers:
if (data == NULL || size == 0) { if (data == NULL || size == 0) {
pw_loop_invoke(this->node->data_loop, pw_loop_locked(this->node->data_loop,
do_remove_mix, SPA_ID_INVALID, NULL, 0, true, mix); do_remove_mix, SPA_ID_INVALID, NULL, 0, mix);
mix->io_data = mix->io[0] = mix->io[1] = NULL; mix->io_data = mix->io[0] = mix->io[1] = NULL;
} else if (data != NULL && size >= sizeof(struct spa_io_buffers)) { } else if (data != NULL && size >= sizeof(struct spa_io_buffers)) {
if (size >= sizeof(struct spa_io_async_buffers)) { if (size >= sizeof(struct spa_io_async_buffers)) {
@ -253,8 +253,8 @@ static int port_set_io(void *object,
} else { } else {
mix->io_data = mix->io[0] = mix->io[1] = data; mix->io_data = mix->io[0] = mix->io[1] = data;
} }
pw_loop_invoke(this->node->data_loop, pw_loop_locked(this->node->data_loop,
do_add_mix, SPA_ID_INVALID, NULL, 0, false, mix); do_add_mix, SPA_ID_INVALID, NULL, 0, mix);
} }
} }
return 0; return 0;
@ -1388,7 +1388,7 @@ static void pw_impl_port_remove(struct pw_impl_port *port)
pw_log_debug("%p: remove", port); pw_log_debug("%p: remove", port);
pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port); pw_loop_locked(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, port);
if (SPA_FLAG_IS_SET(port->flags, PW_IMPL_PORT_FLAG_TO_REMOVE)) { if (SPA_FLAG_IS_SET(port->flags, PW_IMPL_PORT_FLAG_TO_REMOVE)) {
if ((res = spa_node_remove_port(node->node, port->direction, port->port_id)) < 0) if ((res = spa_node_remove_port(node->node, port->direction, port->port_id)) < 0)
@ -1817,7 +1817,7 @@ int pw_impl_port_set_param(struct pw_impl_port *port, uint32_t id, uint32_t flag
pw_log_debug("%p: %d set param %d %p", port, port->state, id, param); pw_log_debug("%p: %d set param %d %p", port, port->state, id, param);
if (id == SPA_PARAM_Format) { if (id == SPA_PARAM_Format) {
pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port); pw_loop_locked(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, port);
spa_node_port_set_io(node->node, spa_node_port_set_io(node->node,
port->direction, port->port_id, port->direction, port->port_id,
SPA_IO_Buffers, NULL, 0); SPA_IO_Buffers, NULL, 0);
@ -1903,7 +1903,7 @@ static int negotiate_mixer_buffers(struct pw_impl_port *port, uint32_t flags,
port->direction, port->port_id, port->direction, port->port_id,
SPA_IO_Buffers, NULL, 0); SPA_IO_Buffers, NULL, 0);
pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port); pw_loop_locked(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, port);
pw_buffers_clear(&port->mix_buffers); pw_buffers_clear(&port->mix_buffers);
@ -1943,7 +1943,7 @@ static int negotiate_mixer_buffers(struct pw_impl_port *port, uint32_t flags,
pw_direction_reverse(port->direction), 0, pw_direction_reverse(port->direction), 0,
SPA_IO_Buffers, SPA_IO_Buffers,
&port->rt.io, sizeof(port->rt.io)); &port->rt.io, sizeof(port->rt.io));
pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port); pw_loop_locked(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, port);
} }
return res; return res;
} }

View file

@ -64,6 +64,12 @@ PW_API_LOOP_IMPL int pw_loop_invoke(struct pw_loop *object,
{ {
return spa_loop_invoke(object->loop, func, seq, data, size, block, user_data); return spa_loop_invoke(object->loop, func, seq, data, size, block, user_data);
} }
PW_API_LOOP_IMPL int pw_loop_locked(struct pw_loop *object,
spa_invoke_func_t func, uint32_t seq, const void *data,
size_t size, void *user_data)
{
return spa_loop_locked(object->loop, func, seq, data, size, user_data);
}
PW_API_LOOP_IMPL int pw_loop_get_fd(struct pw_loop *object) PW_API_LOOP_IMPL int pw_loop_get_fd(struct pw_loop *object)
{ {

View file

@ -1748,7 +1748,7 @@ static void hook_removed(struct spa_hook *hook)
{ {
struct stream *impl = hook->priv; struct stream *impl = hook->priv;
if (impl->data_loop) if (impl->data_loop)
pw_loop_invoke(impl->data_loop, do_remove_callbacks, 1, NULL, 0, true, impl); pw_loop_locked(impl->data_loop, do_remove_callbacks, 1, NULL, 0, impl);
else else
spa_zero(impl->rt_callbacks); spa_zero(impl->rt_callbacks);
hook->priv = NULL; hook->priv = NULL;
@ -2590,8 +2590,8 @@ int pw_stream_flush(struct pw_stream *stream, bool drain)
if (stream->node == NULL) if (stream->node == NULL)
return -EIO; return -EIO;
pw_loop_invoke(impl->data_loop, pw_loop_locked(impl->data_loop,
drain ? do_drain : do_flush, 1, NULL, 0, true, impl); drain ? do_drain : do_flush, 1, NULL, 0, impl);
if (!drain) if (!drain)
spa_node_send_command(stream->node->node, spa_node_send_command(stream->node->node,