From cfd3bcd6b269c3e9c4e30ad5de2ae9eef2b03d34 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 17 Jul 2023 11:41:03 +0200 Subject: [PATCH] impl-node: add rt_events Remove the context_driver events and replace them with realtime node events. The problem is that the realtime node events are emitted from the node data thread, which can be different for each node and aggregating them into context_driver events is not a good idea. It's also nice for the stream drained event, which no longer needs to go through the context_driver events. --- src/modules/module-client-node/client-node.c | 2 +- src/modules/module-client-node/remote-node.c | 23 ++++--- src/modules/module-profiler.c | 19 +++--- src/pipewire/context.c | 66 +++++++++----------- src/pipewire/context.h | 10 ++- src/pipewire/impl-client.c | 4 +- src/pipewire/impl-node.c | 58 +++++++++++++++-- src/pipewire/impl-node.h | 25 ++++++++ src/pipewire/private.h | 47 +++++--------- src/pipewire/stream.c | 21 +++---- 10 files changed, 161 insertions(+), 114 deletions(-) diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 18ddfa6bc..fbaa8ecee 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -1092,7 +1092,7 @@ static void node_on_data_fd_events(struct spa_source *source) spa_node_call_ready(&impl->callbacks, status); } else { spa_log_trace_fp(impl->log, "%p: got complete", impl); - pw_context_driver_emit_complete(node->context, node); + pw_impl_node_rt_emit_complete(node); } } } diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index af91548ba..f5d116f59 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -45,7 +45,6 @@ struct mix { struct node_data { struct pw_context *context; - struct spa_hook context_listener; struct pw_loop *data_loop; struct spa_system *data_system; @@ -61,6 +60,7 @@ struct node_data { struct pw_impl_node *node; struct spa_hook node_listener; + struct spa_hook node_rt_listener; unsigned int do_free:1; unsigned int have_transport:1; unsigned int allow_mlock:1; @@ -1129,11 +1129,10 @@ static void client_node_removed(void *_data) spa_hook_remove(&data->proxy_client_node_listener); spa_hook_remove(&data->client_node_listener); - pw_context_driver_remove_listener(data->context, - &data->context_listener); - if (data->node) { spa_hook_remove(&data->node_listener); + pw_impl_node_remove_rt_listener(data->node, + &data->node_rt_listener); pw_impl_node_set_state(data->node, PW_NODE_STATE_SUSPENDED); clean_node(data); @@ -1168,12 +1167,13 @@ static const struct pw_proxy_events proxy_client_node_events = { .bound_props = client_node_bound_props, }; -static void context_complete(void *data, struct pw_impl_node *node) +static void node_rt_complete(void *data) { struct node_data *d = data; + struct pw_impl_node *node = d->node; struct spa_system *data_system = d->data_system; - if (node != d->node || !node->driving || + if (!node->driving || !SPA_FLAG_IS_SET(node->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER)) return; @@ -1181,9 +1181,9 @@ static void context_complete(void *data, struct pw_impl_node *node) pw_log_warn("node %p: write failed %m", node); } -static const struct pw_context_driver_events context_events = { - PW_VERSION_CONTEXT_DRIVER_EVENTS, - .complete = context_complete, +static const struct pw_impl_node_rt_events node_rt_events = { + PW_VERSION_IMPL_NODE_RT_EVENTS, + .complete = node_rt_complete, }; static struct pw_proxy *node_export(struct pw_core *core, void *object, bool do_free, @@ -1238,14 +1238,13 @@ static struct pw_proxy *node_export(struct pw_core *core, void *object, bool do_ &proxy_client_node_events, data); pw_impl_node_add_listener(node, &data->node_listener, &node_events, data); + pw_impl_node_add_rt_listener(node, &data->node_rt_listener, + &node_rt_events, data); pw_client_node_add_listener(data->client_node, &data->client_node_listener, &client_node_events, data); - pw_context_driver_add_listener(data->context, - &data->context_listener, - &context_events, data); do_node_init(data); diff --git a/src/modules/module-profiler.c b/src/modules/module-profiler.c index 97a1c37c0..5f084af8a 100644 --- a/src/modules/module-profiler.c +++ b/src/modules/module-profiler.c @@ -278,17 +278,15 @@ done: impl->count++; } -static const struct pw_context_driver_events context_events = { - PW_VERSION_CONTEXT_DRIVER_EVENTS, - .incomplete = context_do_profile, - .complete = context_do_profile, +static const struct pw_context_events context_events = { + PW_VERSION_CONTEXT_EVENTS, + .profiler = context_do_profile, }; static void stop_listener(struct impl *impl) { if (impl->listening) { - pw_context_driver_remove_listener(impl->context, - &impl->context_listener); + pw_context_stop_profiler(impl->context); impl->listening = false; } } @@ -331,9 +329,7 @@ global_bind(void *object, struct pw_impl_client *client, uint32_t permissions, if (++impl->busy == 1) { pw_log_info("%p: starting profiler", impl); - pw_context_driver_add_listener(impl->context, - &impl->context_listener, - &context_events, impl); + pw_context_start_profiler(impl->context); impl->listening = true; } return 0; @@ -346,6 +342,7 @@ static void module_destroy(void *data) if (impl->global != NULL) pw_global_destroy(impl->global); + spa_hook_remove(&impl->context_listener); spa_hook_remove(&impl->module_listener); pw_properties_free(impl->properties); @@ -430,6 +427,10 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props)); + pw_context_add_listener(impl->context, + &impl->context_listener, + &context_events, impl); + pw_global_register(impl->global); pw_global_add_listener(impl->global, &impl->global_listener, &global_events, impl); diff --git a/src/pipewire/context.c b/src/pipewire/context.c index d7822c176..fec6f634c 100644 --- a/src/pipewire/context.c +++ b/src/pipewire/context.c @@ -479,59 +479,51 @@ void pw_context_add_listener(struct pw_context *context, spa_hook_list_append(&context->listener_list, listener, events, data); } -struct listener_data { - struct spa_hook *listener; - const struct pw_context_driver_events *events; - void *data; +static void node_complete(void *data) +{ + struct pw_impl_node *node = data; + pw_log_info("complete"); + pw_context_emit_profiler(node->context, node); +} + +static struct pw_impl_node_rt_events node_rt_events = { + PW_VERSION_IMPL_NODE_RT_EVENTS, + .complete = node_complete, }; -static int -do_add_listener(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct pw_context *context = user_data; - const struct listener_data *d = data; - spa_hook_list_append(&context->driver_listener_list, - d->listener, d->events, d->data); - return 0; -} - SPA_EXPORT -void pw_context_driver_add_listener(struct pw_context *context, - struct spa_hook *listener, - const struct pw_context_driver_events *events, - void *data) +void pw_context_start_profiler(struct pw_context *context) { - struct listener_data d = { - .listener = listener, - .events = events, - .data = data }; struct pw_impl_node *n; + + pw_log_info("%d", context->profiling); + if (context->profiling++ > 0) + return; + spa_list_for_each(n, &context->driver_list, driver_link) { + if (SPA_FLAG_IS_SET(n->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER)) + continue; + SPA_FLAG_SET(n->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER); + pw_impl_node_add_rt_listener(n, &n->profiler_listener, &node_rt_events, n); } - pw_loop_invoke(context->data_loop, - do_add_listener, SPA_ID_INVALID, &d, sizeof(d), false, context); -} - -static int do_remove_listener(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct spa_hook *listener = user_data; - spa_hook_remove(listener); - return 0; } SPA_EXPORT -void pw_context_driver_remove_listener(struct pw_context *context, - struct spa_hook *listener) +void pw_context_stop_profiler(struct pw_context *context) { struct pw_impl_node *n; + + pw_log_info("%d", context->profiling); + if (--context->profiling > 0) + return; + spa_list_for_each(n, &context->driver_list, driver_link) { + if (!SPA_FLAG_IS_SET(n->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER)) + continue; SPA_FLAG_CLEAR(n->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER); + pw_impl_node_remove_rt_listener(n, &n->profiler_listener); } - pw_loop_invoke(context->data_loop, - do_remove_listener, SPA_ID_INVALID, NULL, 0, true, listener); } SPA_EXPORT diff --git a/src/pipewire/context.h b/src/pipewire/context.h index 3971d78b4..5c4386728 100644 --- a/src/pipewire/context.h +++ b/src/pipewire/context.h @@ -43,6 +43,7 @@ struct pw_context; struct pw_global; struct pw_impl_client; +struct pw_impl_node; #include #include @@ -50,7 +51,7 @@ struct pw_impl_client; /** context events emitted by the context object added with \ref pw_context_add_listener */ struct pw_context_events { -#define PW_VERSION_CONTEXT_EVENTS 0 +#define PW_VERSION_CONTEXT_EVENTS 1 uint32_t version; /** The context is being destroyed */ @@ -63,6 +64,8 @@ struct pw_context_events { void (*global_added) (void *data, struct pw_global *global); /** a global object was removed */ void (*global_removed) (void *data, struct pw_global *global); + /** a driver completed */ + void (*profiler) (void *data, struct pw_impl_node *node); }; /** Make a new context object for a given main_loop. Ownership of the properties is taken */ @@ -168,6 +171,11 @@ int pw_context_set_object(struct pw_context *context, const char *type, void *va /** get an object from the context */ void *pw_context_get_object(struct pw_context *context, const char *type); +/** start the profiler, driver_completed events will be emited */ +void pw_context_start_profiler(struct pw_context *context); +/** stop the profiler */ +void pw_context_stop_profiler(struct pw_context *context); + /** * \} */ diff --git a/src/pipewire/impl-client.c b/src/pipewire/impl-client.c index 556567dac..d7d46ca0d 100644 --- a/src/pipewire/impl-client.c +++ b/src/pipewire/impl-client.c @@ -713,7 +713,7 @@ int pw_impl_client_update_permissions(struct pw_impl_client *client, if (context->current_client == client) new_perm &= old_perm; - pw_log_debug("%p: set default permissions %08x -> %08x", + pw_log_info("%p: set default permissions %08x -> %08x", client, old_perm, new_perm); def->permissions = new_perm; @@ -748,7 +748,7 @@ int pw_impl_client_update_permissions(struct pw_impl_client *client, if (context->current_client == client) new_perm &= old_perm; - pw_log_debug("%p: set global %d permissions %08x -> %08x", + pw_log_info("%p: set global %d permissions %08x -> %08x", client, global->id, old_perm, new_perm); p->permissions = new_perm; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index a3a9ae905..ecf343bb5 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -1249,11 +1249,12 @@ static inline int process_node(void *data) /* calculate CPU time when finished */ a->signal_time = this->driver_start; calculate_stats(this, a); - pw_context_driver_emit_complete(this->context, this); + pw_impl_node_rt_emit_complete(this); +// pw_context_driver_emit_complete(this->context, this); } if (SPA_UNLIKELY(status & SPA_STATUS_DRAINED)) - pw_context_driver_emit_drained(this->context, this); + pw_impl_node_rt_emit_drained(this); return status; } @@ -1394,6 +1395,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, spa_list_init(&this->peer_list); spa_hook_list_init(&this->listener_list); + spa_hook_list_init(&this->rt_listener_list); this->info.state = PW_NODE_STATE_CREATING; this->info.props = &this->properties->dict; @@ -1726,7 +1728,7 @@ static inline void update_position(struct pw_impl_node *node, int all_ready, uin pw_log_warn("(%s-%u) sync timeout, going to RUNNING", node->name, node->info.id); check_states(node, nsec); - pw_context_driver_emit_timeout(node->context, node); + pw_impl_node_rt_emit_timeout(node); all_ready = true; } if (all_ready) @@ -1775,7 +1777,7 @@ static int node_ready(void *data, int status) state, a->position.clock.duration, state->pending, state->required); check_states(node, nsec); - pw_context_driver_emit_incomplete(node->context, node); + pw_impl_node_rt_emit_incomplete(node); } /* This update is done too late, the driver should do this @@ -1841,7 +1843,7 @@ again: update_position(node, all_ready, nsec); - pw_context_driver_emit_start(node->context, node); + pw_impl_node_rt_emit_start(node); } /* this should not happen, driver nodes that are not currently driving * should not emit the ready callback */ @@ -1904,7 +1906,7 @@ static int node_xrun(void *data, uint64_t trigger, uint64_t delay, struct spa_po missed); } - pw_context_driver_emit_xrun(this->context, this); + pw_impl_node_rt_emit_xrun(this); return 0; } @@ -1954,6 +1956,50 @@ void pw_impl_node_add_listener(struct pw_impl_node *node, spa_hook_list_append(&node->listener_list, listener, events, data); } +struct listener_data { + struct spa_hook *listener; + const struct pw_impl_node_rt_events *events; + void *data; +}; + +static int +do_add_rt_listener(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct pw_impl_node *node = user_data; + const struct listener_data *d = data; + spa_hook_list_append(&node->rt_listener_list, + d->listener, d->events, d->data); + return 0; +} + +SPA_EXPORT +void pw_impl_node_add_rt_listener(struct pw_impl_node *node, + struct spa_hook *listener, + const struct pw_impl_node_rt_events *events, + void *data) +{ + struct listener_data d = { .listener = listener, .events = events, .data = data }; + pw_loop_invoke(node->data_loop, + do_add_rt_listener, SPA_ID_INVALID, &d, sizeof(d), false, node); +} + +static int do_remove_listener(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct spa_hook *listener = user_data; + spa_hook_remove(listener); + return 0; +} + +SPA_EXPORT +void pw_impl_node_remove_rt_listener(struct pw_impl_node *node, + struct spa_hook *listener) +{ + pw_loop_invoke(node->data_loop, + do_remove_listener, SPA_ID_INVALID, NULL, 0, true, listener); +} + /** Destroy a node * \param node a node to destroy * diff --git a/src/pipewire/impl-node.h b/src/pipewire/impl-node.h index 1d0919621..091e58531 100644 --- a/src/pipewire/impl-node.h +++ b/src/pipewire/impl-node.h @@ -75,6 +75,23 @@ struct pw_impl_node_events { void (*peer_removed) (void *data, struct pw_impl_node *peer); }; +struct pw_impl_node_rt_events { +#define PW_VERSION_IMPL_NODE_RT_EVENTS 0 + uint32_t version; + /** the node is drained */ + void (*drained) (void *data); + /** the node had an xrun */ + void (*xrun) (void *data); + /** the driver node starts processing */ + void (*start) (void *data); + /** the driver node completed processing */ + void (*complete) (void *data); + /** the driver node did not complete processing */ + void (*incomplete) (void *data); + /** the node had */ + void (*timeout) (void *data); +}; + /** Create a new node */ struct pw_impl_node * pw_context_create_node(struct pw_context *context, /**< the context */ @@ -118,6 +135,14 @@ void pw_impl_node_add_listener(struct pw_impl_node *node, const struct pw_impl_node_events *events, void *data); +/** Add an rt_event listener */ +void pw_impl_node_add_rt_listener(struct pw_impl_node *node, + struct spa_hook *listener, + const struct pw_impl_node_rt_events *events, + void *data); +void pw_impl_node_remove_rt_listener(struct pw_impl_node *node, + struct spa_hook *listener); + /** Iterate the ports in the given direction. The callback should return * 0 to fetch the next item, any other value stops the iteration and returns * the value. When all callbacks return 0, this function returns 0 when all diff --git a/src/pipewire/private.h b/src/pipewire/private.h index d8977173e..e53b4065a 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -360,39 +360,6 @@ int pw_loop_check(struct pw_loop *loop); } \ }) -#define pw_context_driver_emit(c,m,v,...) spa_hook_list_call_simple(&c->driver_listener_list, struct pw_context_driver_events, m, v, ##__VA_ARGS__) -#define pw_context_driver_emit_start(c,n) pw_context_driver_emit(c, start, 0, n) -#define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n) -#define pw_context_driver_emit_incomplete(c,n) pw_context_driver_emit(c, incomplete, 0, n) -#define pw_context_driver_emit_timeout(c,n) pw_context_driver_emit(c, timeout, 0, n) -#define pw_context_driver_emit_drained(c,n) pw_context_driver_emit(c, drained, 0, n) -#define pw_context_driver_emit_complete(c,n) pw_context_driver_emit(c, complete, 0, n) - -struct pw_context_driver_events { -#define PW_VERSION_CONTEXT_DRIVER_EVENTS 0 - uint32_t version; - - /** The driver graph is started */ - void (*start) (void *data, struct pw_impl_node *node); - /** The driver under/overruns */ - void (*xrun) (void *data, struct pw_impl_node *node); - /** The driver could not complete the graph */ - void (*incomplete) (void *data, struct pw_impl_node *node); - /** The driver got a sync timeout */ - void (*timeout) (void *data, struct pw_impl_node *node); - /** a node drained */ - void (*drained) (void *data, struct pw_impl_node *node); - /** The driver completed the graph */ - void (*complete) (void *data, struct pw_impl_node *node); -}; - -void pw_context_driver_add_listener(struct pw_context *context, - struct spa_hook *listener, - const struct pw_context_driver_events *events, - void *data); -void pw_context_driver_remove_listener(struct pw_context *context, - struct spa_hook *listener); - #define pw_registry_resource(r,m,v,...) pw_resource_call(r, struct pw_registry_events,m,v,##__VA_ARGS__) #define pw_registry_resource_global(r,...) pw_registry_resource(r,global,0,__VA_ARGS__) #define pw_registry_resource_global_remove(r,...) pw_registry_resource(r,global_remove,0,__VA_ARGS__) @@ -404,6 +371,7 @@ void pw_context_driver_remove_listener(struct pw_context *context, #define pw_context_emit_check_access(c,cl) pw_context_emit(c, check_access, 0, cl) #define pw_context_emit_global_added(c,g) pw_context_emit(c, global_added, 0, g) #define pw_context_emit_global_removed(c,g) pw_context_emit(c, global_removed, 0, g) +#define pw_context_emit_profiler(c,n) pw_context_emit(c, profiler, 1, n) struct pw_context { struct pw_impl_core *core; /**< core object */ @@ -459,6 +427,8 @@ struct pw_context { long sc_pagesize; unsigned int freewheeling:1; + int profiling; + void *user_data; /**< extra user data */ }; @@ -651,6 +621,14 @@ struct pw_node_activation { #define pw_impl_node_emit_peer_added(n,p) pw_impl_node_emit(n, peer_added, 0, p) #define pw_impl_node_emit_peer_removed(n,p) pw_impl_node_emit(n, peer_removed, 0, p) +#define pw_impl_node_rt_emit(o,m,v,...) spa_hook_list_call(&o->rt_listener_list, struct pw_impl_node_rt_events, m, v, ##__VA_ARGS__) +#define pw_impl_node_rt_emit_drained(n) pw_impl_node_rt_emit(n, drained, 0) +#define pw_impl_node_rt_emit_xrun(n) pw_impl_node_rt_emit(n, xrun, 0) +#define pw_impl_node_rt_emit_start(n) pw_impl_node_rt_emit(n, start, 0) +#define pw_impl_node_rt_emit_complete(n) pw_impl_node_rt_emit(n, complete, 0) +#define pw_impl_node_rt_emit_incomplete(n) pw_impl_node_rt_emit(n, incomplete, 0) +#define pw_impl_node_rt_emit_timeout(n) pw_impl_node_rt_emit(n, timeout, 0) + struct pw_impl_node { struct pw_context *context; /**< context object */ struct spa_list link; /**< link in context node_list */ @@ -719,6 +697,7 @@ struct pw_impl_node { struct pw_map output_port_map; /**< map from port_id to port */ struct spa_hook_list listener_list; + struct spa_hook_list rt_listener_list; struct pw_loop *data_loop; /**< the data loop for this node */ struct spa_system *data_system; @@ -751,6 +730,7 @@ struct pw_impl_node { uint64_t target_quantum; uint64_t driver_start; + struct spa_hook profiler_listener; void *user_data; /**< extra user data */ }; @@ -1067,6 +1047,7 @@ struct pw_stream { struct pw_impl_node *node; struct spa_hook node_listener; + struct spa_hook node_rt_listener; struct spa_list controls; }; diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 067ecad4a..befef74b7 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -83,7 +83,6 @@ struct stream { const char *path; struct pw_context *context; - struct spa_hook context_listener; struct pw_loop *main_loop; struct pw_loop *data_loop; @@ -479,6 +478,7 @@ do_call_drained(struct spa_loop *loop, static void call_drained(struct stream *impl) { + pw_log_info("%p: drained", impl); pw_loop_invoke(impl->main_loop, do_call_drained, 1, NULL, 0, false, impl); } @@ -1388,6 +1388,7 @@ static void node_event_destroy(void *data) struct pw_stream *stream = data; struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); spa_hook_remove(&stream->node_listener); + pw_impl_node_remove_rt_listener(stream->node, &stream->node_rt_listener); stream->node = NULL; impl->data_loop = NULL; } @@ -1439,11 +1440,9 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; -static void context_drained(void *data, struct pw_impl_node *node) +static void node_drained(void *data) { struct stream *impl = data; - if (impl->this.node != node) - return; if (impl->draining && impl->drained) { impl->draining = false; if (impl->io != NULL) @@ -1452,9 +1451,9 @@ static void context_drained(void *data, struct pw_impl_node *node) } } -static const struct pw_context_driver_events context_events = { - PW_VERSION_CONTEXT_DRIVER_EVENTS, - .drained = context_drained, +static const struct pw_impl_node_rt_events node_rt_events = { + PW_VERSION_IMPL_NODE_RT_EVENTS, + .drained = node_drained, }; struct match { @@ -1542,9 +1541,6 @@ stream_new(struct pw_context *context, const char *name, impl->allow_mlock = context->settings.mem_allow_mlock; impl->warn_mlock = context->settings.mem_warn_mlock; - pw_context_driver_add_listener(impl->context, - &impl->context_listener, - &context_events, impl); return impl; error_properties: @@ -1710,9 +1706,6 @@ void pw_stream_destroy(struct pw_stream *stream) spa_hook_list_clean(&impl->hooks); spa_hook_list_clean(&stream->listener_list); - pw_context_driver_remove_listener(impl->context, - &impl->context_listener); - if (impl->data.context) pw_context_destroy(impl->data.context); @@ -2121,6 +2114,8 @@ pw_stream_connect(struct pw_stream *stream, pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream); pw_impl_node_add_listener(stream->node, &stream->node_listener, &node_events, stream); + pw_impl_node_add_rt_listener(stream->node, &stream->node_rt_listener, + &node_rt_events, stream); return 0;