loop: spa_loop_invoke -> spa_loop_locked where possible

When we simply need to change some state for the code executed in the
loop, we can use locked() instead of invoke(). This is more efficient
and avoids some context switches in the normal case.
This commit is contained in:
Wim Taymans 2025-05-30 11:59:35 +02:00
parent b943c31fd8
commit c45d667934
29 changed files with 75 additions and 75 deletions

View file

@ -86,7 +86,7 @@ static void stream_link(struct group *group, struct stream *stream)
struct modify_info info = { .stream = stream, .streams = &group->streams };
int res;
res = spa_loop_invoke(group->data_loop, do_modify, 0, NULL, 0, true, &info);
res = spa_loop_locked(group->data_loop, do_modify, 0, NULL, 0, &info);
spa_assert_se(res == 0);
}
@ -95,7 +95,7 @@ static void stream_unlink(struct stream *stream)
struct modify_info info = { .stream = stream, .streams = NULL };
int res;
res = spa_loop_invoke(stream->group->data_loop, do_modify, 0, NULL, 0, true, &info);
res = spa_loop_locked(stream->group->data_loop, do_modify, 0, NULL, 0, &info);
spa_assert_se(res == 0);
}
@ -393,7 +393,7 @@ static void group_destroy(struct group *group)
spa_assert(spa_list_is_empty(&group->streams));
res = spa_loop_invoke(group->data_loop, do_remove_source, 0, NULL, 0, true, group);
res = spa_loop_locked(group->data_loop, do_remove_source, 0, NULL, 0, group);
spa_assert_se(res == 0);
close(group->timerfd);

View file

@ -452,7 +452,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
}
if (this->started) {
spa_loop_invoke(this->data_loop, do_reassign_io, 0, NULL, 0, true, &info);
spa_loop_locked(this->data_loop, do_reassign_io, 0, NULL, 0, &info);
} else {
this->clock = info.clock;
this->position = info.position;
@ -1548,7 +1548,7 @@ static int transport_start(struct impl *this)
this->transport_started = true;
if (this->transport->iso_io)
spa_loop_invoke(this->data_loop, do_start_iso_io, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_start_iso_io, 0, NULL, 0, this);
if (is_asha) {
struct spa_bt_asha *asha = this->asha;
@ -1678,7 +1678,7 @@ static void transport_stop(struct impl *this)
spa_log_trace(this->log, "%p: stop transport", this);
spa_loop_invoke(this->data_loop, do_remove_transport_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_transport_source, 0, NULL, 0, this);
if (this->codec_data && this->own_codec_data)
this->codec->deinit(this->codec_data);
@ -1696,7 +1696,7 @@ static int do_stop(struct impl *this)
this->start_ready = false;
spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_source, 0, NULL, 0, this);
transport_stop(this);
@ -2310,7 +2310,7 @@ static void transport_destroy(void *data)
{
struct impl *this = data;
spa_log_debug(this->log, "transport %p destroy", this->transport);
spa_loop_invoke(this->data_loop, do_transport_destroy, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_transport_destroy, 0, NULL, 0, this);
}
static void transport_state_changed(void *data,

View file

@ -315,7 +315,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
if (this->started && following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_reassign_follower, 0, NULL, 0, this);
}
return 0;
}
@ -799,7 +799,7 @@ static int transport_start(struct impl *this)
spa_strerror(res));
if (this->transport->iso_io)
spa_loop_invoke(this->data_loop, do_start_iso_io, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_start_iso_io, 0, NULL, 0, this);
this->transport_started = true;
@ -899,7 +899,7 @@ static void transport_stop(struct impl *this)
spa_log_debug(this->log, "%p: transport stop", this);
spa_loop_invoke(this->data_loop, do_remove_transport_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_transport_source, 0, NULL, 0, this);
if (this->fd >= 0) {
close(this->fd);
@ -924,7 +924,7 @@ static int do_stop(struct impl *this)
this->start_ready = false;
spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_source, 0, NULL, 0, this);
transport_stop(this);
@ -1736,7 +1736,7 @@ static void transport_destroy(void *data)
{
struct impl *this = data;
spa_log_debug(this->log, "transport %p destroy", this->transport);
spa_loop_invoke(this->data_loop, do_transport_destroy, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_transport_destroy, 0, NULL, 0, this);
}
static const struct spa_bt_transport_events transport_events = {

View file

@ -1135,7 +1135,7 @@ static int do_stop(struct impl *this)
spa_log_debug(this->log, "%p: stop", this);
spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_source, 0, NULL, 0, this);
this->started = false;
@ -1149,7 +1149,7 @@ static int do_release(struct impl *this)
spa_log_debug(this->log, "%p: release", this);
spa_loop_invoke(this->data_loop, do_remove_port_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_port_source, 0, NULL, 0, this);
for (i = 0; i < N_PORTS; ++i) {
struct port *port = &this->ports[i];
@ -1260,7 +1260,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
if (this->started && following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_reassign_follower, 0, NULL, 0, this);
}
return 0;

View file

@ -271,7 +271,7 @@ static int do_remove_source(struct spa_loop *loop,
void spa_bt_sco_io_destroy(struct spa_bt_sco_io *io)
{
if (io->started)
spa_loop_invoke(io->data_loop, do_remove_source, 0, NULL, 0, true, io);
spa_loop_locked(io->data_loop, do_remove_source, 0, NULL, 0, io);
io->started = false;
free(io);

View file

@ -308,7 +308,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
if (this->started && following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_reassign_follower, 0, NULL, 0, this);
}
return 0;
}
@ -954,7 +954,7 @@ static void transport_stop(struct impl *this)
spa_log_trace(this->log, "sco-sink %p: transport stop", this);
spa_loop_invoke(this->data_loop, do_remove_transport_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_transport_source, 0, NULL, 0, this);
if (this->buffer) {
free(this->buffer);
@ -978,7 +978,7 @@ static int do_stop(struct impl *this)
this->start_ready = false;
spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_source, 0, NULL, 0, this);
transport_stop(this);
@ -1571,7 +1571,7 @@ static void transport_destroy(void *data)
{
struct impl *this = data;
spa_log_debug(this->log, "transport %p destroy", this->transport);
spa_loop_invoke(this->data_loop, do_transport_destroy, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_transport_destroy, 0, NULL, 0, this);
}
static const struct spa_bt_transport_events transport_events = {

View file

@ -285,7 +285,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
if (this->started && following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_reassign_follower, 0, NULL, 0, this);
}
return 0;
@ -764,7 +764,7 @@ static int transport_start(struct impl *this)
/* Start socket i/o */
if ((res = spa_bt_transport_ensure_sco_io(this->transport, this->data_loop, this->data_system)) < 0)
goto fail;
spa_loop_invoke(this->data_loop, do_add_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_add_source, 0, NULL, 0, this);
/* Set the started flag */
this->transport_started = true;
@ -862,7 +862,7 @@ static void transport_stop(struct impl *this)
spa_log_debug(this->log, "sco-source %p: transport stop", this);
spa_loop_invoke(this->data_loop, do_remove_transport_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_transport_source, 0, NULL, 0, this);
spa_bt_decode_buffer_clear(&port->buffer);
@ -882,7 +882,7 @@ static int do_stop(struct impl *this)
this->start_ready = false;
spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_source, 0, NULL, 0, this);
transport_stop(this);
@ -1589,7 +1589,7 @@ static void transport_destroy(void *data)
{
struct impl *this = data;
spa_log_debug(this->log, "transport %p destroy", this->transport);
spa_loop_invoke(this->data_loop, do_transport_destroy, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_transport_destroy, 0, NULL, 0, this);
}
static const struct spa_bt_transport_events transport_events = {

View file

@ -262,7 +262,7 @@ static void spatializer_reload(void * Instance)
spa_log_error(impl->log, "reloading left or right convolver failed");
return;
}
spa_loop_invoke(impl->plugin->data_loop, do_switch, 1, NULL, 0, true, impl);
spa_loop_locked(impl->plugin->data_loop, do_switch, 1, NULL, 0, impl);
}
struct free_data {

View file

@ -1059,7 +1059,7 @@ static int spa_libcamera_stream_off(struct impl *impl)
impl->camera->requestCompleted.disconnect(impl, &impl::requestComplete);
spa_loop_invoke(impl->data_loop, do_remove_source, 0, NULL, 0, true, impl);
spa_loop_locked(impl->data_loop, do_remove_source, 0, NULL, 0, impl);
if (impl->source.fd >= 0) {
spa_system_close(impl->system, impl->source.fd);
impl->source.fd = -1;

View file

@ -708,7 +708,7 @@ static int impl_clear(struct spa_handle *handle)
this = (struct impl *) handle;
if (this->data_loop)
spa_loop_invoke(this->data_loop, do_remove_timer, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_timer, 0, NULL, 0, this);
spa_system_close(this->data_system, this->timer_source.fd);
return 0;

View file

@ -739,7 +739,7 @@ static int impl_clear(struct spa_handle *handle)
this = (struct impl *) handle;
if (this->data_loop)
spa_loop_invoke(this->data_loop, do_remove_timer, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_timer, 0, NULL, 0, this);
spa_system_close(this->data_system, this->timer_source.fd);
return 0;

View file

@ -2220,7 +2220,7 @@ impl_node_port_set_io(void *object,
case SPA_IO_Buffers:
if (this->data_loop) {
struct io_data d = { .port = port, .data = data, .size = size };
spa_loop_invoke(this->data_loop, do_set_port_io, 0, NULL, 0, true, &d);
spa_loop_locked(this->data_loop, do_set_port_io, 0, NULL, 0, &d);
}
else
port->io = data;

View file

@ -852,7 +852,7 @@ static int impl_clear(struct spa_handle *handle)
this = (struct impl *) handle;
if (this->data_loop)
spa_loop_invoke(this->data_loop, do_remove_timer, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_timer, 0, NULL, 0, this);
spa_loop_utils_destroy_source(this->loop_utils, this->timer_source);
return 0;

View file

@ -933,7 +933,7 @@ static int impl_clear(struct spa_handle *handle)
spa_vulkan_compute_deinit(&this->state);
if (this->data_loop)
spa_loop_invoke(this->data_loop, do_remove_timer, 0, NULL, 0, true, this);
spa_loop_locked(this->data_loop, do_remove_timer, 0, NULL, 0, this);
spa_system_close(this->data_system, this->timer_source.fd);
return 0;

View file

@ -1229,12 +1229,11 @@ static void client_node0_resource_destroy(void *data)
spa_hook_remove(&impl->object_listener);
if (node->data_source.fd != -1) {
spa_loop_invoke(node->data_loop,
spa_loop_locked(node->data_loop,
do_remove_source,
SPA_ID_INVALID,
NULL,
0,
true,
&node->data_source);
}
if (this->node)

View file

@ -553,7 +553,7 @@ static void resize_delay(struct stream *stream, uint32_t size)
for (i = 0; i < channels; ++i)
ringbuffer_init(&info.delay[i], SPA_PTROFF(info.buf, i*size, void), size);
pw_loop_invoke(stream->impl->data_loop, do_replace_delay, 0, NULL, 0, true, &info);
pw_loop_locked(stream->impl->data_loop, do_replace_delay, 0, NULL, 0, &info);
free(info.buf);
}
@ -616,7 +616,7 @@ static int do_clear_delaybuf(struct spa_loop *loop, bool async, uint32_t seq,
static void clear_delaybuf(struct impl *impl)
{
pw_loop_invoke(impl->data_loop, do_clear_delaybuf, 0, NULL, 0, true, impl);
pw_loop_locked(impl->data_loop, do_clear_delaybuf, 0, NULL, 0, impl);
}
static int do_add_stream(struct spa_loop *loop, bool async, uint32_t seq,
@ -669,7 +669,7 @@ static void remove_stream(struct stream *s, bool destroy)
{
pw_log_debug("destroy stream %d", s->id);
pw_loop_invoke(s->impl->data_loop, do_remove_stream, 0, NULL, 0, true, s);
pw_loop_locked(s->impl->data_loop, do_remove_stream, 0, NULL, 0, s);
if (destroy && s->stream) {
spa_hook_remove(&s->stream_listener);
@ -896,7 +896,7 @@ static int create_stream(struct stream_info *info)
direction, PW_ID_ANY, flags, params, n_params)) < 0)
goto error;
pw_loop_invoke(impl->data_loop, do_add_stream, 0, NULL, 0, true, s);
pw_loop_locked(impl->data_loop, do_add_stream, 0, NULL, 0, s);
update_delay(impl);
return 0;

View file

@ -87,7 +87,7 @@ bool client_detach(struct client *client)
if (server->wait_clients > 0 && --server->wait_clients == 0) {
int mask = server->source->mask;
SPA_FLAG_SET(mask, SPA_IO_IN);
pw_loop_update_io(impl->loop, server->source, mask);
pw_loop_update_io(impl->main_loop, server->source, mask);
}
client->server = NULL;
@ -112,7 +112,7 @@ void client_disconnect(struct client *client)
pw_map_for_each(&client->streams, client_free_stream, client);
if (client->source) {
pw_loop_destroy_source(impl->loop, client->source);
pw_loop_destroy_source(impl->main_loop, client->source);
client->source = NULL;
}
@ -207,7 +207,7 @@ int client_queue_message(struct client *client, struct message *msg)
uint32_t mask = client->source->mask;
if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
SPA_FLAG_SET(mask, SPA_IO_OUT);
pw_loop_update_io(impl->loop, client->source, mask);
pw_loop_update_io(impl->main_loop, client->source, mask);
}
client->new_msg_since_last_flush = true;
@ -278,7 +278,7 @@ int client_flush_messages(struct client *client)
if (SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
pw_loop_update_io(client->impl->loop, client->source, mask);
pw_loop_update_io(client->impl->main_loop, client->source, mask);
}
} else {
if (res != -EAGAIN && res != -EWOULDBLOCK)

View file

@ -47,7 +47,7 @@ struct stats {
};
struct impl {
struct pw_loop *loop;
struct pw_loop *main_loop;
struct pw_context *context;
struct spa_hook context_listener;

View file

@ -214,12 +214,12 @@ static int module_combine_sink_load(struct module *module)
pw_manager_add_listener(data->manager, &data->manager_listener,
&manager_events, data);
data->sinks_timeout = pw_loop_add_timer(module->impl->loop, on_sinks_timeout, data);
data->sinks_timeout = pw_loop_add_timer(module->impl->main_loop, on_sinks_timeout, data);
if (data->sinks_timeout) {
struct timespec timeout = {0};
timeout.tv_sec = TIMEOUT_SINKS_MSEC / 1000;
timeout.tv_nsec = (TIMEOUT_SINKS_MSEC % 1000) * SPA_NSEC_PER_MSEC;
pw_loop_update_timer(module->impl->loop, data->sinks_timeout, &timeout, NULL, false);
pw_loop_update_timer(module->impl->main_loop, data->sinks_timeout, &timeout, NULL, false);
}
return data->load_emitted ? 0 : SPA_RESULT_RETURN_ASYNC(0);
}
@ -229,7 +229,7 @@ static int module_combine_sink_unload(struct module *module)
struct module_combine_sink_data *d = module->user_data;
if (d->sinks_timeout != NULL)
pw_loop_destroy_source(module->impl->loop, d->sinks_timeout);
pw_loop_destroy_source(module->impl->main_loop, d->sinks_timeout);
if (d->mod != NULL) {
spa_hook_remove(&d->mod_listener);

View file

@ -197,7 +197,7 @@ static void handle_module_group(struct module_gsettings_data *d, gchar *name)
snprintf(p, sizeof(p), "args%i", i);
info.args[i] = g_settings_get_string(settings, p);
}
pw_loop_invoke(impl->loop, do_handle_info, 0,
pw_loop_invoke(impl->main_loop, do_handle_info, 0,
&info, sizeof(info), false, d);
g_object_unref(G_OBJECT(settings));

View file

@ -1484,7 +1484,7 @@ static void stream_process(void *data)
pw_stream_get_time_n(stream->stream, &pd.pwt, sizeof(pd.pwt));
pw_loop_invoke(impl->loop,
pw_loop_invoke(impl->main_loop,
do_process_done, 1, &pd, sizeof(pd), false, stream);
}
@ -5492,7 +5492,7 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
spa_list_init(&impl->cleanup_clients);
spa_list_init(&impl->free_messages);
impl->loop = pw_context_get_main_loop(context);
impl->main_loop = pw_context_get_main_loop(context);
impl->work_queue = pw_context_get_work_queue(context);
if (props == NULL)

View file

@ -384,7 +384,7 @@ on_connect(void *data, int fd, uint32_t mask)
if (server->n_clients > 0) {
int m = server->source->mask;
SPA_FLAG_CLEAR(m, SPA_IO_IN);
pw_loop_update_io(impl->loop, server->source, m);
pw_loop_update_io(impl->main_loop, server->source, m);
server->wait_clients++;
}
}
@ -404,7 +404,7 @@ on_connect(void *data, int fd, uint32_t mask)
pw_log_debug("server %p: new client %p fd:%d", server, client, client_fd);
client->source = pw_loop_add_io(impl->loop,
client->source = pw_loop_add_io(impl->main_loop,
client_fd,
SPA_IO_ERR | SPA_IO_HUP | SPA_IO_IN,
true, on_client_data, client);
@ -949,7 +949,7 @@ static int server_start(struct server *server, const struct sockaddr_storage *ad
if (fd < 0)
return fd;
server->source = pw_loop_add_io(impl->loop, fd, SPA_IO_IN, true, on_connect, server);
server->source = pw_loop_add_io(impl->main_loop, fd, SPA_IO_IN, true, on_connect, server);
if (server->source == NULL) {
res = -errno;
pw_log_error("server %p: can't create server source: %m", impl);
@ -1100,7 +1100,7 @@ void server_free(struct server *server)
spa_hook_list_call(&impl->hooks, struct impl_events, server_stopped, 0, server);
if (server->source)
pw_loop_destroy_source(impl->loop, server->source);
pw_loop_destroy_source(impl->main_loop, server->source);
if (server->addr.ss_family == AF_UNIX && !server->activated)
unlink(((const struct sockaddr_un *) &server->addr)->sun_path);

View file

@ -118,7 +118,7 @@ void stream_free(struct stream *stream)
/* force processing of all pending messages before we destroy
* the stream */
pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client);
pw_loop_invoke(impl->main_loop, NULL, 0, NULL, 0, false, client);
pw_stream_destroy(stream->stream);
}

View file

@ -584,7 +584,7 @@ static void free_session(struct session *sess)
{
struct impl *impl = sess->impl;
pw_loop_invoke(impl->data_loop, do_unlink_session, 1, NULL, 0, true, sess);
pw_loop_locked(impl->data_loop, do_unlink_session, 1, NULL, 0, sess);
sess->impl->n_sessions--;

View file

@ -151,7 +151,7 @@ struct impl {
struct pw_properties *props;
struct pw_context *context;
struct pw_loop *loop;
struct pw_loop *main_loop;
struct pw_loop *data_loop;
struct pw_core *core;
@ -226,7 +226,7 @@ on_rtp_io(void *data, int fd, uint32_t mask)
if (!impl->receiving) {
impl->receiving = true;
pw_loop_invoke(impl->loop, do_start, 1, NULL, 0, false, impl);
pw_loop_invoke(impl->main_loop, do_start, 1, NULL, 0, false, impl);
}
}
return;
@ -349,7 +349,7 @@ static void on_stream_start_retry_timer_event(void *data, uint64_t expirations)
static void destroy_stream_start_retry_timer(struct impl *impl)
{
if (impl->stream_start_retry_timer != NULL) {
pw_loop_destroy_source(impl->loop, impl->stream_start_retry_timer);
pw_loop_destroy_source(impl->main_loop, impl->stream_start_retry_timer);
impl->stream_start_retry_timer = NULL;
}
}
@ -382,7 +382,7 @@ static int stream_start(struct impl *impl)
if (impl->stream_start_retry_timer == NULL) {
struct timespec value, interval;
impl->stream_start_retry_timer = pw_loop_add_timer(impl->loop,
impl->stream_start_retry_timer = pw_loop_add_timer(impl->main_loop,
on_stream_start_retry_timer_event, impl);
/* Use a 1-second retry interval. The network interfaces
* are likely to be up and running then. */
@ -390,7 +390,7 @@ static int stream_start(struct impl *impl)
value.tv_nsec = 0;
interval.tv_sec = 1;
interval.tv_nsec = 0;
pw_loop_update_timer(impl->loop, impl->stream_start_retry_timer, &value,
pw_loop_update_timer(impl->main_loop, impl->stream_start_retry_timer, &value,
&interval, false);
}
/* Do nothing if the timer is already up. */
@ -579,7 +579,7 @@ static void impl_destroy(struct impl *impl)
pw_core_disconnect(impl->core);
if (impl->standby_timer)
pw_loop_destroy_source(impl->loop, impl->standby_timer);
pw_loop_destroy_source(impl->main_loop, impl->standby_timer);
destroy_stream_start_retry_timer(impl);
@ -663,7 +663,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
impl->loop = pw_context_get_main_loop(context);
impl->main_loop = pw_context_get_main_loop(context);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
if ((sess_name = pw_properties_get(props, "sess.name")) == NULL)
@ -769,7 +769,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
&impl->core_listener,
&core_events, impl);
impl->standby_timer = pw_loop_add_timer(impl->loop, on_standby_timer_event, impl);
impl->standby_timer = pw_loop_add_timer(impl->main_loop, on_standby_timer_event, impl);
if (impl->standby_timer == NULL) {
res = -errno;
pw_log_error("can't create timer source: %m");
@ -779,7 +779,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
value.tv_nsec = 0;
interval.tv_sec = impl->cleanup_interval;
interval.tv_nsec = 0;
pw_loop_update_timer(impl->loop, impl->standby_timer, &value, &interval, false);
pw_loop_update_timer(impl->main_loop, impl->standby_timer, &value, &interval, false);
impl->stream = rtp_stream_new(impl->core,
PW_DIRECTION_OUTPUT, pw_properties_copy(stream_props),

View file

@ -166,7 +166,7 @@ struct impl {
struct pw_properties *props;
struct pw_context *context;
struct pw_loop *loop;
struct pw_loop *main_loop;
struct pw_loop *data_loop;
struct pw_core *core;
@ -457,7 +457,7 @@ static struct stream *make_stream(struct impl *impl, const struct vban_header *h
stream->salen = salen;
spa_list_append(&impl->streams, &stream->link);
pw_loop_invoke(impl->loop, do_setup_stream, 1, NULL, 0, false, stream);
pw_loop_invoke(impl->main_loop, do_setup_stream, 1, NULL, 0, false, stream);
return stream;
}
@ -603,7 +603,7 @@ static void impl_destroy(struct impl *impl)
pw_core_disconnect(impl->core);
if (impl->timer)
pw_loop_destroy_source(impl->loop, impl->timer);
pw_loop_destroy_source(impl->main_loop, impl->timer);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
@ -681,7 +681,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
impl->loop = pw_context_get_main_loop(context);
impl->main_loop = pw_context_get_main_loop(context);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
spa_list_init(&impl->streams);
@ -747,7 +747,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
&impl->core_listener,
&core_events, impl);
impl->timer = pw_loop_add_timer(impl->loop, on_timer_event, impl);
impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl);
if (impl->timer == NULL) {
res = -errno;
pw_log_error("can't create timer source: %m");
@ -757,7 +757,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
value.tv_nsec = 0;
interval.tv_sec = impl->cleanup_interval;
interval.tv_nsec = 0;
pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false);
pw_loop_update_timer(impl->main_loop, impl->timer, &value, &interval, false);
if ((res = listen_start(impl)) < 0) {
pw_log_error("failed to start VBAN stream: %s", spa_strerror(res));

View file

@ -280,7 +280,7 @@ int pw_data_loop_stop(struct pw_data_loop *loop)
pthread_cancel(loop->thread);
} else {
pw_log_debug("%p signal", loop);
pw_loop_invoke(loop->loop, do_stop, 1, NULL, 0, false, loop);
pw_loop_locked(loop->loop, do_stop, 1, NULL, 0, loop);
}
pw_log_debug("%p join", loop);
if ((utils = loop->thread_utils) == NULL)

View file

@ -79,8 +79,9 @@ bool pw_data_loop_in_thread(struct pw_data_loop *loop);
struct spa_thread *pw_data_loop_get_thread(struct pw_data_loop *loop);
/** invoke func in the context of the thread or in the caller thread when
* the loop is not running. May be called from the loop's thread, but otherwise
* can only be called by a single thread at a time.
* the loop is not running. May be called from the loop's thread, and
* can be called by multiple threads at the same time.
*
* If called from the loop's thread, all callbacks previously queued with
* pw_data_loop_invoke() will be run synchronously, which might cause
* unexpected reentrancy problems.

View file

@ -107,7 +107,7 @@ SPA_EXPORT
int pw_main_loop_quit(struct pw_main_loop *loop)
{
pw_log_debug("%p: quit", loop);
return pw_loop_invoke(loop->loop, do_stop, 1, NULL, 0, false, loop);
return pw_loop_locked(loop->loop, do_stop, 1, NULL, 0, loop);
}
/** Start a main loop