diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index a3b057d49..a496f8d31 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -67,6 +67,19 @@ struct resource_data { /** \endcond */ +/* Called from the node data loop when a node needs to be scheduled by + * the given driver. 3 things needs to happen: + * + * - the node is added to the driver target list and the required state + * is incremented. This makes sure the node is woken up when the driver + * starts a new cycle. + * - the node needs to trigger the driver when it completes. This means + * the driver is added to the target list. + * - the node targets (including the driver we added above) have their + * required state incremented. + * + * This code is called from the data-loop to ensure synchronization + */ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver) { struct pw_node_activation_state *dstate, *nstate; @@ -78,13 +91,7 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver) pw_log_trace("%p: add to driver %p %p %p", this, driver, driver->rt.activation, this->rt.activation); - /* signal the driver */ - this->rt.driver_target.activation = driver->rt.activation; - this->rt.driver_target.node = driver; - this->rt.driver_target.system = driver->data_system; - this->rt.driver_target.fd = driver->source.fd; - spa_list_append(&this->rt.target_list, &this->rt.driver_target.link); - + /* let the driver trigger us as part of the processing cycle */ spa_list_append(&driver->rt.target_list, &this->rt.target.link); nstate = &this->rt.activation->state[0]; if (!this->rt.target.active) { @@ -92,6 +99,15 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver) this->rt.target.active = true; } + /* trigger the driver when we complete */ + this->rt.driver_target.activation = driver->rt.activation; + this->rt.driver_target.node = driver; + this->rt.driver_target.system = driver->data_system; + this->rt.driver_target.fd = driver->source.fd; + spa_list_append(&this->rt.target_list, &this->rt.driver_target.link); + + /* now increment the required states of all this node targets, including + * the driver we added above */ spa_list_for_each(t, &this->rt.target_list, link) { dstate = &t->activation->state[0]; if (!t->active) { @@ -104,6 +120,7 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver) } } +/* called from the data loop and undoes the changes done in add_node. */ static void remove_node(struct pw_impl_node *this) { struct pw_node_activation_state *dstate, *nstate; @@ -156,6 +173,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, pw_log_warn("%p: read failed %m", this); this->added = true; + /* remote nodes have their source added in client-node instead */ if (!this->remote) spa_loop_add_source(loop, &this->source); add_node(this, driver); @@ -1097,18 +1115,19 @@ static inline uint64_t get_time_ns(struct spa_system *system) return SPA_TIMESPEC_TO_NSEC(&ts); } -static inline void node_signal(struct pw_impl_node *this) +static inline void node_trigger(struct pw_impl_node *this) { pw_log_trace_fp("node %p %s", this, this->name); if (SPA_UNLIKELY(spa_system_eventfd_write(this->data_system, this->source.fd, 1) < 0)) pw_log_warn("node %p: write failed %m", this); } -static inline int resume_node(struct pw_impl_node *this, int status, uint64_t nsec) +/* called from data-loop when all the targets of a node need to be triggered */ +static inline int trigger_targets(struct pw_impl_node *this, int status, uint64_t nsec) { struct pw_node_target *t; - pw_log_trace_fp("%p: %s trigger peers %"PRIu64, this, this->name, nsec); + pw_log_trace_fp("%p: %s trigger targets %"PRIu64, this, this->name, nsec); spa_list_for_each(t, &this->rt.target_list, link) { struct pw_node_activation *a = t->activation; @@ -1127,20 +1146,11 @@ static inline int resume_node(struct pw_impl_node *this, int status, uint64_t ns return 0; } -static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_activation *a) -{ - uint64_t signal_time = a->signal_time; - uint64_t prev_signal_time = a->prev_signal_time; - if (SPA_LIKELY(signal_time > prev_signal_time)) { - uint64_t process_time = a->finish_time - a->signal_time; - uint64_t period_time = signal_time - prev_signal_time; - float load = (float) process_time / (float) period_time; - a->cpu_load[0] = (a->cpu_load[0] + load) / 2.0f; - a->cpu_load[1] = (a->cpu_load[1] * 7.0f + load) / 8.0f; - a->cpu_load[2] = (a->cpu_load[2] * 31.0f + load) / 32.0f; - } -} - +/* The main processing entry point of a node. This is called from the data-loop and usually + * as a result of signaling the eventfd of the node. + * + * This code runs on the client and the server, depending on where the node is. + */ static inline int process_node(void *data) { struct pw_impl_node *this = data; @@ -1161,11 +1171,14 @@ static inline int process_node(void *data) a->pending_sync = false; if (SPA_LIKELY(this->added)) { + /* process input mixers */ spa_list_for_each(p, &this->rt.input_mix, rt.node_link) spa_node_process_fast(p->mix); + /* process the actual node */ status = spa_node_process_fast(this->node); + /* process output tee */ if (status & SPA_STATUS_HAVE_DATA) { spa_list_for_each(p, &this->rt.output_mix, rt.node_link) spa_node_process_fast(p->mix); @@ -1173,7 +1186,7 @@ static inline int process_node(void *data) } else { /* This can happen when we deactivated the node but some links are * still not shut down. We simply don't schedule the node and make - * sure we trigger the peers in resume_node below. */ + * sure we trigger the peers in trigger_targets below. */ pw_log_debug("%p: scheduling non-active node %s", this, this->name); status = SPA_STATUS_HAVE_DATA; } @@ -1185,8 +1198,12 @@ static inline int process_node(void *data) a->status = PW_NODE_ACTIVATION_FINISHED; a->finish_time = nsec; + /* we don't need to trigger targets when the node was driving the + * graph because that means we finished the graph. Also don't schedule + * peers when the node returns OK, because that means the resume will + * happen asynchronously later (unimplemented though). */ if (SPA_LIKELY(!this->driving && status != SPA_STATUS_OK)) - resume_node(this, status, nsec); + trigger_targets(this, status, nsec); if (SPA_UNLIKELY(status & SPA_STATUS_DRAINED)) pw_context_driver_emit_drained(this->context, this); @@ -1203,7 +1220,7 @@ int pw_impl_node_trigger(struct pw_impl_node *node) uint64_t nsec = get_time_ns(node->data_system); a->status = PW_NODE_ACTIVATION_TRIGGERED; a->signal_time = nsec; - node_signal(node); + node_trigger(node); } return 0; } @@ -1298,6 +1315,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, this->properties = properties; + /* the eventfd used to signal the node */ if ((res = spa_system_eventfd_create(this->data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0) goto error_clean; @@ -1584,6 +1602,20 @@ static const struct spa_node_events node_events = { .event = node_event, }; +static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_activation *a) +{ + uint64_t signal_time = a->signal_time; + uint64_t prev_signal_time = a->prev_signal_time; + if (SPA_LIKELY(signal_time > prev_signal_time)) { + uint64_t process_time = a->finish_time - a->signal_time; + uint64_t period_time = signal_time - prev_signal_time; + float load = (float) process_time / (float) period_time; + a->cpu_load[0] = (a->cpu_load[0] + load) / 2.0f; + a->cpu_load[1] = (a->cpu_load[1] * 7.0f + load) / 8.0f; + a->cpu_load[2] = (a->cpu_load[2] * 31.0f + load) / 32.0f; + } +} + #define SYNC_CHECK 0 #define SYNC_START 1 #define SYNC_STOP 2 @@ -1672,6 +1704,9 @@ static inline void update_position(struct pw_impl_node *node, int all_ready) a->position.offset += a->position.clock.duration; } +/* Called from the data-loop and it is the starting point for driver nodes. + * Most of the logic here is to check for reposition updates and transport changes. + */ static int node_ready(void *data, int status) { struct pw_impl_node *node = data, *reposition_node = NULL; @@ -1712,7 +1747,7 @@ static int node_ready(void *data, int status) state->pending, state->required); dump_states(node); } - node_signal(node); + node_trigger(node); } else { uint64_t signal_time = a->signal_time; /* old nodes set the TRIGGERED status on node_ready, patch this @@ -1788,6 +1823,8 @@ again: } a->status = PW_NODE_ACTIVATION_TRIGGERED; + /* remote nodes set the signal_time before writing the ready + * eventfd */ if (!node->remote) a->signal_time = nsec; impl->prev_signal_time = a->prev_signal_time; @@ -1807,6 +1844,8 @@ again: pw_context_driver_emit_start(node->context, node); } + /* this should not happen, driver nodes that are not currently driving + * should not emit the ready callback */ if (SPA_UNLIKELY(node->driver && !node->driving)) return 0; @@ -1818,11 +1857,12 @@ again: } if (!node->remote && (status & SPA_STATUS_HAVE_DATA)) { /* remote nodes have done the output mix already before - * they triggered the ready event */ + * they wrote the ready eventfd */ spa_list_for_each(p, &node->rt.output_mix, rt.node_link) spa_node_process_fast(p->mix); } - return resume_node(node, status, nsec); + /* now signal all the nodes we drive */ + return trigger_targets(node, status, nsec); } static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)