diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index b4ba91984..dda7cd05a 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -3406,7 +3406,7 @@ jack_client_t * jack_client_open (const char *client_name, SPA_VERSION_THREAD_UTILS, &thread_utils_impl, client); - client->loop = client->context.context->data_loop_impl; + client->loop = pw_context_get_data_loop(client->context.context); pw_data_loop_stop(client->loop); pw_context_set_object(client->context.context, diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index 8f582e2e9..6c09386f7 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -47,6 +47,8 @@ struct mix { struct node_data { struct pw_context *context; + struct pw_loop *data_loop; + struct spa_system *data_system; struct pw_mempool *pool; @@ -108,12 +110,11 @@ do_deactivate_link(struct spa_loop *loop, static void clear_link(struct node_data *data, struct link *link) { - struct pw_context *context = data->context; pw_log_debug("link %p", link); - pw_loop_invoke(context->data_loop, + pw_loop_invoke(data->data_loop, do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, link); pw_memmap_free(link->map); - spa_system_close(context->data_system, link->signalfd); + spa_system_close(data->data_system, link->signalfd); spa_list_remove(&link->link); free(link); } @@ -140,7 +141,7 @@ static void clean_transport(struct node_data *data) pw_memmap_free(data->activation); data->node->rt.activation = data->node->activation->map->ptr; - spa_system_close(data->context->data_system, data->rtwritefd); + spa_system_close(data->data_system, data->rtwritefd); data->have_transport = false; } @@ -169,7 +170,7 @@ deactivate_mix(struct node_data *data, struct mix *mix) { if (mix->active) { pw_log_debug("node %p: mix %p deactivate", data, mix); - pw_loop_invoke(data->context->data_loop, + pw_loop_invoke(data->data_loop, do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix); mix->active = false; } @@ -191,7 +192,7 @@ activate_mix(struct node_data *data, struct mix *mix) { if (!mix->active) { pw_log_debug("node %p: mix %p activate", data, mix); - pw_loop_invoke(data->context->data_loop, + pw_loop_invoke(data->data_loop, do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix); mix->active = true; } @@ -263,7 +264,7 @@ static int client_node_transport(void *_data, proxy, readfd, writefd, data->remote_id, data->activation->ptr); data->rtwritefd = writefd; - spa_system_close(data->context->data_system, data->node->source.fd); + spa_system_close(data->data_system, data->node->source.fd); data->node->source.fd = readfd; data->have_transport = true; @@ -849,7 +850,7 @@ exit: static int link_signal_func(void *user_data) { struct link *link = user_data; - struct spa_system *data_system = link->data->context->data_system; + struct spa_system *data_system = link->data->data_system; pw_log_trace_fp("link %p: signal %p", link, link->target.activation); if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, link->signalfd, 1) < 0)) @@ -888,7 +889,7 @@ client_node_set_activation(void *_data, if (data->remote_id == node_id) { pw_log_debug("node %p: our activation %u: %u %u %u", node, node_id, memid, offset, size); - spa_system_close(data->context->data_system, signalfd); + spa_system_close(data->data_system, signalfd); return 0; } @@ -922,7 +923,7 @@ client_node_set_activation(void *_data, link->target.node = NULL; spa_list_append(&data->links, &link->link); - pw_loop_invoke(data->context->data_loop, + pw_loop_invoke(data->data_loop, do_activate_link, SPA_ID_INVALID, NULL, 0, false, link); pw_log_debug("node %p: link %p: fd:%d id:%u state %p required %d, pending %d", @@ -1177,7 +1178,7 @@ static int node_ready(void *d, int status) struct node_data *data = d; struct pw_impl_node *node = data->node; struct pw_node_activation *a = node->rt.activation; - struct spa_system *data_system = data->context->data_system; + struct spa_system *data_system = data->data_system; struct timespec ts; struct pw_impl_port *p; @@ -1254,6 +1255,8 @@ static struct pw_proxy *node_export(struct pw_core *core, void *object, bool do_ data->node = node; data->do_free = do_free; data->context = pw_impl_node_get_context(node); + data->data_loop = node->data_loop; + data->data_system = data->data_loop->system; data->client_node = (struct pw_client_node *)client_node; data->remote_id = SPA_ID_INVALID; diff --git a/src/modules/module-profiler.c b/src/modules/module-profiler.c index 336fe2f79..8a45f2b2d 100644 --- a/src/modules/module-profiler.c +++ b/src/modules/module-profiler.c @@ -74,6 +74,8 @@ struct impl { struct pw_context *context; struct pw_properties *properties; + struct pw_loop *data_loop; + struct spa_hook context_listener; struct spa_hook module_listener; @@ -284,7 +286,7 @@ static int do_stop(struct spa_loop *loop, static void stop_listener(struct impl *impl) { if (impl->listening) { - pw_loop_invoke(impl->context->data_loop, + pw_loop_invoke(impl->data_loop, do_stop, SPA_ID_INVALID, NULL, 0, true, impl); impl->listening = false; } @@ -338,7 +340,7 @@ global_bind(void *object, struct pw_impl_client *client, uint32_t permissions, if (++impl->busy == 1) { pw_log_info("%p: starting profiler", impl); - pw_loop_invoke(impl->context->data_loop, + pw_loop_invoke(impl->data_loop, do_start, SPA_ID_INVALID, NULL, 0, false, impl); impl->listening = true; } @@ -411,6 +413,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->context = context; impl->properties = props; + impl->data_loop = pw_context_get_data_loop(impl->context)->loop; spa_ringbuffer_init(&impl->buffer); diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index 5365c8c17..cc564400f 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -107,6 +107,8 @@ struct filter { const char *path; struct pw_context *context; + struct pw_loop *main_loop; + struct pw_loop *data_loop; enum pw_filter_flags flags; @@ -467,7 +469,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size) impl->position = data; else impl->position = NULL; - pw_loop_invoke(impl->context->data_loop, + pw_loop_invoke(impl->data_loop, do_set_position, 1, NULL, 0, true, impl); break; } @@ -486,7 +488,7 @@ static int impl_send_command(void *object, const struct spa_command *command) case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Flush: case SPA_NODE_COMMAND_Pause: - pw_loop_invoke(impl->context->main_loop, + pw_loop_invoke(impl->main_loop, NULL, 0, NULL, 0, false, impl); if (filter->state == PW_FILTER_STATE_STREAMING) { pw_log_debug("%p: pause", filter); @@ -987,7 +989,7 @@ static void call_process(struct filter *impl) process, 0, impl->rt.position); } else { - pw_loop_invoke(impl->context->main_loop, + pw_loop_invoke(impl->main_loop, do_call_process, 1, NULL, 0, false, impl); } } @@ -1006,7 +1008,7 @@ do_call_drained(struct spa_loop *loop, static void call_drained(struct filter *impl) { - pw_loop_invoke(impl->context->main_loop, + pw_loop_invoke(impl->main_loop, do_call_drained, 1, NULL, 0, false, impl); } @@ -1200,6 +1202,10 @@ filter_new(struct pw_context *context, const char *name, goto error_cleanup; } + impl->main_loop = pw_context_get_main_loop(context); + impl->data_loop = pw_data_loop_get_loop( + pw_context_get_data_loop(context)); + this = &impl->this; pw_log_debug("%p: new", impl); @@ -1399,7 +1405,7 @@ void pw_filter_destroy(struct pw_filter *filter) struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); struct port *p; - ensure_loop(impl->context->main_loop, return); + ensure_loop(impl->main_loop, return); pw_log_debug("%p: destroy", filter); @@ -1453,7 +1459,7 @@ void pw_filter_add_listener(struct pw_filter *filter, { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); - ensure_loop(impl->context->main_loop); + ensure_loop(impl->main_loop); spa_hook_list_append(&filter->listener_list, listener, events, data); if (events->process && impl->rt_callbacks.funcs == NULL) { @@ -1503,7 +1509,7 @@ int pw_filter_update_properties(struct pw_filter *filter, void *port_data, const struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); int changed = 0; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); if (port_data) { changed = pw_properties_update(port->props, dict); @@ -1542,7 +1548,7 @@ pw_filter_connect(struct pw_filter *filter, uint32_t i; struct spa_dict_item items[1]; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); if (filter->proxy != NULL || filter->state != PW_FILTER_STATE_UNCONNECTED) return -EBUSY; @@ -1637,7 +1643,7 @@ SPA_EXPORT int pw_filter_disconnect(struct pw_filter *filter) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); return filter_disconnect(impl); } @@ -1720,7 +1726,7 @@ void *pw_filter_add_port(struct pw_filter *filter, struct port *p; const char *str; - ensure_loop(impl->context->main_loop, return NULL); + ensure_loop(impl->main_loop, return NULL); if (props == NULL) props = pw_properties_new(NULL, NULL); @@ -1784,7 +1790,7 @@ int pw_filter_remove_port(void *port_data) struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); struct filter *impl = port->filter; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); free_port(impl, port); return 0; @@ -1796,7 +1802,7 @@ int pw_filter_set_error(struct pw_filter *filter, { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); if (res < 0) { va_list args; @@ -1828,7 +1834,7 @@ int pw_filter_update_params(struct pw_filter *filter, struct port *port; int res; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); pw_log_debug("%p: update params", filter); @@ -1851,7 +1857,7 @@ int pw_filter_set_active(struct pw_filter *filter, bool active) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); pw_log_debug("%p: active:%d", filter, active); return 0; @@ -1969,7 +1975,7 @@ SPA_EXPORT int pw_filter_flush(struct pw_filter *filter, bool drain) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); - pw_loop_invoke(impl->context->data_loop, + pw_loop_invoke(impl->data_loop, drain ? do_drain : do_flush, 1, NULL, 0, true, impl); return 0; } @@ -2013,10 +2019,10 @@ int pw_filter_trigger_process(struct pw_filter *filter) pw_log_trace_fp("%p", impl); if (!impl->driving) { - res = pw_loop_invoke(impl->context->main_loop, + res = pw_loop_invoke(impl->main_loop, do_trigger_request_process, 1, NULL, 0, false, impl); } else { - res = pw_loop_invoke(impl->context->data_loop, + res = pw_loop_invoke(impl->data_loop, do_trigger_process, 1, NULL, 0, false, impl); } return res; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index f611bdd5d..3f05d758b 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -146,7 +146,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, const void *data, s this->added = true; if (this->source.loop == NULL) { - struct spa_system *data_system = this->context->data_system; + struct spa_system *data_system = this->data_loop->system; uint64_t dummy; int res; @@ -1126,7 +1126,7 @@ static inline int process_node(void *data) struct pw_impl_node *this = data; struct pw_impl_port *p; struct pw_node_activation *a = this->rt.activation; - struct spa_system *data_system = this->context->data_system; + struct spa_system *data_system = this->data_loop->system; int status; uint64_t nsec; @@ -1177,7 +1177,7 @@ static inline int process_node(void *data) static void node_on_fd_events(struct spa_source *source) { struct pw_impl_node *this = source->data; - struct spa_system *data_system = this->context->data_system; + struct spa_system *data_system = this->data_system; if (SPA_UNLIKELY(source->rmask & (SPA_IO_ERR | SPA_IO_HUP))) { pw_log_warn("%p: got socket error %08x", this, source->rmask); @@ -1235,7 +1235,6 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, struct impl *impl; struct pw_impl_node *this; size_t size; - struct spa_system *data_system = context->data_system; int res; impl = calloc(1, sizeof(struct impl) + user_data_size); @@ -1251,6 +1250,9 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, this->context = context; this->name = strdup("node"); + this->data_loop = pw_context_get_data_loop(context)->loop; + this->data_system = this->data_loop->system; + if (user_data_size > 0) this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void); @@ -1263,7 +1265,8 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, this->properties = properties; - if ((res = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0) + if ((res = spa_system_eventfd_create(this->data_system, + SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0) goto error_clean; pw_log_debug("%p: new fd:%d", this, res); @@ -1289,8 +1292,6 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, impl->work = pw_context_get_work_queue(this->context); impl->pending_id = SPA_ID_INVALID; - this->data_loop = context->data_loop; - spa_list_init(&this->follower_list); spa_hook_list_init(&this->listener_list); @@ -1334,7 +1335,7 @@ error_clean: if (this->activation) pw_memblock_unref(this->activation); if (this->source.fd != -1) - spa_system_close(this->context->data_system, this->source.fd); + spa_system_close(this->data_system, this->source.fd); free(impl); error_exit: pw_properties_free(properties); @@ -1642,7 +1643,7 @@ static int node_ready(void *data, int status) struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); struct pw_impl_node *driver = node->driver_node; struct pw_node_activation *a = node->rt.activation; - struct spa_system *data_system = node->context->data_system; + struct spa_system *data_system = node->data_system; struct pw_node_target *t; struct pw_impl_port *p; uint64_t nsec; @@ -1961,7 +1962,7 @@ void pw_impl_node_destroy(struct pw_impl_node *node) clear_info(node); - spa_system_close(context->data_system, node->source.fd); + spa_system_close(node->data_system, node->source.fd); free(impl); } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 502468f97..d582c352a 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -750,6 +750,7 @@ struct pw_impl_node { struct spa_hook_list listener_list; struct pw_loop *data_loop; /**< the data loop for this node */ + struct spa_system *data_system; struct spa_fraction latency; /**< requested latency */ struct spa_fraction max_latency; /**< maximum latency */ diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index d4a24513b..18f2f7985 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -85,6 +85,9 @@ struct stream { struct pw_context *context; struct spa_hook context_listener; + struct pw_loop *main_loop; + struct pw_loop *data_loop; + enum spa_direction direction; enum pw_stream_flags flags; @@ -426,7 +429,7 @@ static inline void call_process(struct stream *impl) if (impl->process_rt) spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0); else - pw_loop_invoke(impl->context->main_loop, + pw_loop_invoke(impl->main_loop, do_call_process, 1, NULL, 0, false, impl); } @@ -443,7 +446,7 @@ do_call_drained(struct spa_loop *loop, static void call_drained(struct stream *impl) { - pw_loop_invoke(impl->context->main_loop, + pw_loop_invoke(impl->main_loop, do_call_drained, 1, NULL, 0, false, impl); } @@ -460,7 +463,7 @@ do_call_trigger_done(struct spa_loop *loop, static void call_trigger_done(struct stream *impl) { - pw_loop_invoke(impl->context->main_loop, + pw_loop_invoke(impl->main_loop, do_call_trigger_done, 1, NULL, 0, false, impl); } @@ -494,7 +497,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size) else impl->position = NULL; - pw_loop_invoke(impl->context->data_loop, + pw_loop_invoke(impl->data_loop, do_set_position, 1, NULL, 0, true, impl); break; default: @@ -607,7 +610,7 @@ static int impl_send_command(void *object, const struct spa_command *command) case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Flush: case SPA_NODE_COMMAND_Pause: - pw_loop_invoke(impl->context->main_loop, + pw_loop_invoke(impl->main_loop, NULL, 0, NULL, 0, false, impl); if (stream->state == PW_STREAM_STATE_STREAMING) { @@ -1448,6 +1451,7 @@ stream_new(struct pw_context *context, const char *name, res = -errno; goto error_properties; } + impl->main_loop = pw_context_get_main_loop(context); this = &impl->this; pw_log_debug("%p: new \"%s\"", impl, name); @@ -1649,7 +1653,7 @@ void pw_stream_destroy(struct pw_stream *stream) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct control *c; - ensure_loop(impl->context->main_loop, return); + ensure_loop(impl->main_loop, return); pw_log_debug("%p: destroy", stream); @@ -1706,7 +1710,7 @@ void pw_stream_add_listener(struct pw_stream *stream, { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - ensure_loop(impl->context->main_loop); + ensure_loop(impl->main_loop); spa_hook_list_append(&stream->listener_list, listener, events, data); @@ -1744,7 +1748,7 @@ int pw_stream_update_properties(struct pw_stream *stream, const struct spa_dict int changed, res = 0; struct match match; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); changed = pw_properties_update(stream->properties, dict); if (!changed) @@ -1855,7 +1859,7 @@ pw_stream_connect(struct pw_stream *stream, uint32_t i; int res; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); pw_log_debug("%p: connect target:%d", stream, target_id); @@ -2047,6 +2051,8 @@ pw_stream_connect(struct pw_stream *stream, pw_impl_node_set_active(impl->node, !SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_INACTIVE)); + impl->data_loop = impl->node->data_loop; + pw_log_debug("%p: export node %p", stream, impl->node); stream->proxy = pw_core_export(stream->core, PW_TYPE_INTERFACE_Node, NULL, impl->node, 0); @@ -2086,7 +2092,7 @@ SPA_EXPORT int pw_stream_disconnect(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); return stream_disconnect(impl); } @@ -2096,7 +2102,7 @@ int pw_stream_set_error(struct pw_stream *stream, { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); if (res < 0) { va_list args; @@ -2126,7 +2132,7 @@ int pw_stream_update_params(struct pw_stream *stream, struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); int res; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); pw_log_debug("%p: update params", stream); if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0) @@ -2151,7 +2157,7 @@ SPA_EXPORT int pw_stream_set_param(struct pw_stream *stream, uint32_t id, const struct spa_pod *param) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); if (impl->node == NULL) return -EIO; @@ -2170,7 +2176,7 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_valu struct spa_pod *pod; struct control *c; - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); if (impl->node == NULL) return -EIO; @@ -2239,7 +2245,7 @@ int pw_stream_set_active(struct pw_stream *stream, bool active) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - ensure_loop(impl->context->main_loop, return -EIO); + ensure_loop(impl->main_loop, return -EIO); pw_log_debug("%p: active:%d", stream, active); @@ -2360,7 +2366,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) if (impl->direction == SPA_DIRECTION_OUTPUT && impl->driving && !impl->using_trigger) { pw_log_debug("deprecated: use pw_stream_trigger_process() to drive the stream."); - res = pw_loop_invoke(impl->context->data_loop, + res = pw_loop_invoke(impl->data_loop, do_trigger_deprecated, 1, NULL, 0, false, impl); } return res; @@ -2405,7 +2411,7 @@ int pw_stream_flush(struct pw_stream *stream, bool drain) if (impl->node == NULL) return -EIO; - pw_loop_invoke(impl->context->data_loop, + pw_loop_invoke(impl->data_loop, drain ? do_drain : do_flush, 1, NULL, 0, true, impl); if (!drain) @@ -2463,13 +2469,13 @@ int pw_stream_trigger_process(struct pw_stream *stream) impl->using_trigger = true; if (!impl->driving && !impl->trigger) { - res = pw_loop_invoke(impl->context->main_loop, + res = pw_loop_invoke(impl->main_loop, do_trigger_request_process, 1, NULL, 0, false, impl); } else { if (!impl->process_rt) call_process(impl); - res = pw_loop_invoke(impl->context->data_loop, + res = pw_loop_invoke(impl->data_loop, do_trigger_process, 1, NULL, 0, false, impl); } return res;