diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 45524e7b6..3772dca4a 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -162,13 +162,12 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq, spa_loop_add_source(loop, &this->source); } - SPA_ATOMIC_STORE(this->rt.target.activation->status, PW_NODE_ACTIVATION_FINISHED); - spa_list_for_each(t, &this->rt.target_list, link) { /* we can now trigger ourself */ if (t->id == this->info.id || this->server_prepare) activate_target(this, t); } + SPA_ATOMIC_STORE(this->rt.target.activation->status, PW_NODE_ACTIVATION_FINISHED); this->rt.trigger_targets = trigger_activate_targets; this->rt.prepared = true; @@ -1283,7 +1282,7 @@ static const char *str_status(uint32_t status) return "unknown"; } -static void update_xrun_stats(struct pw_node_activation *a, uint32_t count, uint64_t trigger, uint64_t delay) +static inline void update_xrun_stats(struct pw_node_activation *a, uint32_t count, uint64_t trigger, uint64_t delay) { a->xrun_count += count; a->xrun_time = trigger; @@ -1291,16 +1290,40 @@ static void update_xrun_stats(struct pw_node_activation *a, uint32_t count, uint a->max_delay = SPA_MAX(a->max_delay, delay); } -static void check_states(struct pw_impl_node *driver, uint64_t nsec) +static inline void debug_xrun_target(struct pw_impl_node *driver, + struct pw_node_target *t, int status, uint64_t nsec) { - struct pw_node_target *t; - struct pw_node_activation *na = driver->rt.target.activation; - struct spa_io_clock *cl = &na->position.clock; - enum spa_log_level level = SPA_LOG_LEVEL_DEBUG; + struct pw_node_activation *a = t->activation; + struct pw_node_activation_state *state = &a->state[0]; int suppressed; + enum spa_log_level level = SPA_LOG_LEVEL_DEBUG; if ((suppressed = spa_ratelimit_test(&driver->rt.rate_limit, nsec)) >= 0) - level = SPA_LOG_LEVEL_INFO; + level = SPA_LOG_LEVEL_WARN; + + pw_log(level, "(%s-%u) xrun state:%p pending:%d/%d s:%"PRIu64" a:%"PRIu64" f:%"PRIu64 + " waiting:%"PRIu64" process:%"PRIu64" status:%s (%d suppressed)", + t->name, t->id, state, + state->pending, state->required, + a->signal_time, + a->awake_time, + a->finish_time, + a->awake_time - a->signal_time, + a->finish_time - a->awake_time, + str_status(status), suppressed); +} + +static inline void debug_xrun_graph(struct pw_impl_node *driver, uint64_t nsec) +{ + int suppressed; + enum spa_log_level level = SPA_LOG_LEVEL_DEBUG; + struct pw_node_target *t; + + if ((suppressed = spa_ratelimit_test(&driver->rt.rate_limit, nsec)) >= 0) + level = SPA_LOG_LEVEL_WARN; + + pw_log(level, "(%s-%u) graph xrun (%d suppressed)", + driver->name, driver->info.id, suppressed); spa_list_for_each(t, &driver->rt.target_list, link) { struct pw_node_activation *a = t->activation; @@ -1311,16 +1334,42 @@ static void check_states(struct pw_impl_node *driver, uint64_t nsec) if (a->status == PW_NODE_ACTIVATION_TRIGGERED || a->status == PW_NODE_ACTIVATION_AWAKE) { - update_xrun_stats(a, 1, nsec / 1000, 0); + pw_log(level, "(%s-%u) xrun state:%p pending:%d/%d s:%"PRIu64" a:%"PRIu64" f:%"PRIu64 + " waiting:%"PRIu64" process:%"PRIu64" status:%s", + t->name, t->id, state, + state->pending, state->required, + a->signal_time, + a->awake_time, + a->finish_time, + a->awake_time - a->signal_time, + a->finish_time - a->awake_time, + str_status(a->status)); - pw_log(level, "(%s-%u) client too slow! rate:%u/%u pos:%"PRIu64" status:%s (%u suppressed)", - t->name, t->id, - (uint32_t)(cl->rate.num * cl->duration), cl->rate.denom, - cl->position, str_status(a->status), - suppressed); } - pw_log_debug("(%s-%u) state:%p pending:%d/%d s:%"PRIu64" a:%"PRIu64" f:%"PRIu64 - " waiting:%"PRIu64" process:%"PRIu64" status:%s sync:%d", + } +} + +static void debug_sync_timeout(struct pw_impl_node *driver, uint64_t nsec) +{ + struct pw_node_target *t; + enum spa_log_level level = SPA_LOG_LEVEL_DEBUG; + int suppressed; + + if ((suppressed = spa_ratelimit_test(&driver->rt.rate_limit, nsec)) >= 0) + level = SPA_LOG_LEVEL_WARN; + + pw_log(level, "(%s-%u) sync timeout, going to RUNNING (%d suppressed)", + driver->name, driver->info.id, suppressed); + + spa_list_for_each(t, &driver->rt.target_list, link) { + struct pw_node_activation *a = t->activation; + struct pw_node_activation_state *state = &a->state[0]; + + if (!a->pending_sync) + continue; + + pw_log(level, "(%s-%u) sync state:%p pending:%d/%d s:%"PRIu64" a:%"PRIu64" f:%"PRIu64 + " waiting:%"PRIu64" process:%"PRIu64" status:%s", t->name, t->id, state, state->pending, state->required, a->signal_time, @@ -1328,7 +1377,7 @@ static void check_states(struct pw_impl_node *driver, uint64_t nsec) a->finish_time, a->awake_time - a->signal_time, a->finish_time - a->awake_time, - str_status(a->status), a->pending_sync); + str_status(a->status)); } } @@ -1358,25 +1407,23 @@ static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_ac * * This code runs on the client and the server, depending on where the node is. */ -static inline int process_node(void *data) +static inline int process_node(void *data, uint64_t nsec) { struct pw_impl_node *this = data; struct pw_impl_port *p; struct pw_node_activation *a = this->rt.target.activation; struct spa_system *data_system = this->rt.target.system; int status, old_status; - uint64_t nsec; if (!SPA_ATOMIC_CAS(a->status, PW_NODE_ACTIVATION_TRIGGERED, PW_NODE_ACTIVATION_AWAKE)) return 0; - nsec = get_time_ns(data_system); + a->awake_time = nsec; pw_log_trace_fp("%p: %s process remote:%u exported:%u %"PRIu64" %"PRIu64, this, this->name, this->remote, this->exported, a->signal_time, nsec); - a->awake_time = nsec; /* when transport sync is not supported, just clear the flag */ if (SPA_UNLIKELY(!this->transport_sync)) @@ -1405,11 +1452,11 @@ static inline int process_node(void *data) a->state[0].status = status; nsec = get_time_ns(data_system); - - pw_log_trace_fp("%p: finished status:%d %"PRIu64, this, status, nsec); old_status = SPA_ATOMIC_XCHG(a->status, PW_NODE_ACTIVATION_FINISHED); a->finish_time = nsec; + pw_log_trace_fp("%p: finished status:%d %"PRIu64, this, status, nsec); + /* we don't need to trigger targets when the node was driving the * graph because that means we finished the graph. */ if (SPA_LIKELY(!this->driving)) { @@ -1444,21 +1491,24 @@ static void node_on_fd_events(struct spa_source *source) return; } if (SPA_LIKELY(source->rmask & SPA_IO_IN)) { - uint64_t cmd; + uint64_t cmd, nsec; struct spa_system *data_system = this->rt.target.system; + nsec = get_time_ns(data_system); + if (SPA_UNLIKELY(spa_system_eventfd_read(data_system, this->source.fd, &cmd) < 0)) pw_log_warn("%p: read failed %m", this); else if (SPA_UNLIKELY(cmd > 1)) { pw_log_info("(%s-%u) client missed %"PRIu64" wakeups", this->name, this->info.id, cmd - 1); update_xrun_stats(this->rt.target.activation, cmd - 1, - get_time_ns(data_system) / 1000, 0); + nsec / 1000, 0); } - pw_log_trace_fp("%p: remote:%u exported:%u %s got process", this, this->remote, - this->exported, this->name); - process_node(this); + pw_log_trace_fp("%p: remote:%u exported:%u %s got process %"PRIu64, + this, this->remote, this->exported, this->name, nsec); + + process_node(this, nsec); } } @@ -1900,10 +1950,8 @@ static inline void update_position(struct pw_impl_node *node, int all_ready, uin if (SPA_UNLIKELY(a->position.state == SPA_IO_POSITION_STATE_STARTING)) { if (!all_ready && --a->sync_left == 0) { - pw_log_warn("(%s-%u) sync timeout, going to RUNNING", - node->name, node->info.id); - check_states(node, nsec); pw_impl_node_rt_emit_timeout(node); + debug_sync_timeout(node, nsec); all_ready = true; } if (all_ready) @@ -1949,16 +1997,16 @@ static int node_ready(void *data, int status) uint32_t owner[2], reposition_owner; uint64_t min_timeout = UINT64_MAX; int32_t pending; + int old_status; if (SPA_UNLIKELY((pending = pw_node_activation_state_xchg(state)) > 0)) { - pw_log_debug("(%s-%u) graph not finished: state:%p quantum:%"PRIu64 - " pending %d/%d cycle:%u", node->name, node->info.id, - state, a->position.clock.duration, - pending, state->required, a->position.clock.cycle); - SPA_ATOMIC_STORE(a->status, PW_NODE_ACTIVATION_TRIGGERED); - process_node(node); - check_states(node, nsec); - pw_impl_node_rt_emit_incomplete(node); + pw_impl_node_rt_emit_incomplete(driver); + old_status = SPA_ATOMIC_LOAD(a->status); + if (old_status != PW_NODE_ACTIVATION_FINISHED) { + SPA_ATOMIC_STORE(a->status, PW_NODE_ACTIVATION_TRIGGERED); + process_node(node, nsec); + debug_xrun_graph(node, nsec); + } } /* This update is done too late, the driver should do this @@ -1988,17 +2036,26 @@ again: uint32_t id = t->id; ta->driver_id = driver->info.id; - +retry_status: pw_node_activation_state_reset(&ta->state[0]); /* we don't change the state of inactive nodes and don't use them * for reposition. The pending will be at least 1 and they might * get decremented to 0 but since the status is inactive, we don't * do the atomic CAS from NOT_TRIGGERED to TRIGGERED and we don't * write the eventfd. */ - if (SPA_ATOMIC_LOAD(ta->status) == PW_NODE_ACTIVATION_INACTIVE) + old_status = SPA_ATOMIC_LOAD(ta->status); + if (SPA_UNLIKELY(old_status == PW_NODE_ACTIVATION_INACTIVE)) continue; - SPA_ATOMIC_STORE(ta->status, PW_NODE_ACTIVATION_NOT_TRIGGERED); + /* if this fails, the node might just have stopped and we need to retry */ + if (SPA_UNLIKELY(!SPA_ATOMIC_CAS(ta->status, old_status, PW_NODE_ACTIVATION_NOT_TRIGGERED))) + goto retry_status; + + if (old_status == PW_NODE_ACTIVATION_TRIGGERED || + old_status == PW_NODE_ACTIVATION_AWAKE) { + update_xrun_stats(ta, 1, nsec / 1000, 0); + debug_xrun_target(node, t, old_status, nsec); + } /* this is the node with reposition info */ if (SPA_UNLIKELY(id == reposition_owner))