diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index 6dc2e1a59..d61b1820b 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -1321,8 +1321,7 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, if (this->passive && str == NULL) pw_properties_set(properties, PW_KEY_LINK_PASSIVE, "true"); - impl->async = !output_node->driver && - (output_node->async || input_node->async) && + impl->async = (output_node->async || input_node->async) && SPA_FLAG_IS_SET(output->flags, PW_IMPL_PORT_FLAG_ASYNC) && SPA_FLAG_IS_SET(input->flags, PW_IMPL_PORT_FLAG_ASYNC); diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 9df27ab75..b04f6bd1a 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -2003,10 +2003,14 @@ static int node_ready(void *data, int status) struct pw_impl_node *node = data; struct pw_impl_node *driver = node->driver_node; struct pw_node_activation *a = node->rt.target.activation; + struct pw_node_activation_state *state = &a->state[0]; struct spa_system *data_system = node->rt.target.system; struct pw_node_target *t, *reposition_target = NULL;; struct pw_impl_port *p; - uint64_t nsec; + struct spa_io_clock *cl = &node->rt.position->clock; + int sync_type, all_ready, update_sync, target_sync, old_status; + uint32_t owner[2], reposition_owner, pending; + uint64_t min_timeout = UINT64_MAX, nsec; pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d prepared:%d", node, node->driver, node->exported, driver, status, node->rt.prepared); @@ -2019,131 +2023,118 @@ static int node_ready(void *data, int status) pw_log_info("%p: ready non-active node %s in state %d", node, node->name, node->info.state); return -EIO; } + if (SPA_UNLIKELY(node != driver)) { + pw_log_warn("%p: ready non-driver node %s", node, node->name); + return -EIO; + } nsec = get_time_ns(data_system); - if (SPA_LIKELY(node == driver)) { - struct pw_node_activation_state *state = &a->state[0]; - struct spa_io_clock *cl = &node->rt.position->clock; - int sync_type, all_ready, update_sync, target_sync; - uint32_t owner[2], reposition_owner; - uint64_t min_timeout = UINT64_MAX; - int32_t pending; - int old_status; - - if (SPA_UNLIKELY((pending = pw_node_activation_state_xchg(state)) > 0)) { - pw_impl_node_rt_emit_incomplete(driver); - old_status = SPA_ATOMIC_LOAD(a->status); - if (old_status != PW_NODE_ACTIVATION_FINISHED) { - SPA_ATOMIC_STORE(a->status, PW_NODE_ACTIVATION_TRIGGERED); - SPA_FLAG_SET(cl->flags, SPA_IO_CLOCK_FLAG_XRUN_RECOVER); - process_node(node, nsec); - SPA_FLAG_CLEAR(cl->flags, SPA_IO_CLOCK_FLAG_XRUN_RECOVER); - debug_xrun_graph(node, nsec); - } + if (SPA_UNLIKELY((pending = pw_node_activation_state_xchg(state)) > 0)) { + pw_impl_node_rt_emit_incomplete(driver); + old_status = SPA_ATOMIC_LOAD(a->status); + if (old_status != PW_NODE_ACTIVATION_FINISHED) { + SPA_ATOMIC_STORE(a->status, PW_NODE_ACTIVATION_TRIGGERED); + SPA_FLAG_SET(cl->flags, SPA_IO_CLOCK_FLAG_XRUN_RECOVER); + process_node(node, nsec); + SPA_FLAG_CLEAR(cl->flags, SPA_IO_CLOCK_FLAG_XRUN_RECOVER); + debug_xrun_graph(node, nsec); } + } - /* This update is done too late, the driver should do this - * before calling the ready callback so that it can use the new target - * duration and rate to schedule the next update. We do this here to - * help drivers that don't support this yet */ - if (SPA_UNLIKELY(cl->duration != cl->target_duration || - cl->rate.denom != cl->target_rate.denom)) { - pw_log_warn("driver %s did not update duration/rate (%"PRIu64"/%"PRIu64" %u/%u)", - node->name, - cl->duration, cl->target_duration, - cl->rate.denom, cl->target_rate.denom); - cl->duration = cl->target_duration; - cl->rate = cl->target_rate; - } + /* This update is done too late, the driver should do this + * before calling the ready callback so that it can use the new target + * duration and rate to schedule the next update. We do this here to + * help drivers that don't support this yet */ + if (SPA_UNLIKELY(cl->duration != cl->target_duration || + cl->rate.denom != cl->target_rate.denom)) { + pw_log_warn("driver %s did not update duration/rate (%"PRIu64"/%"PRIu64" %u/%u)", + node->name, + cl->duration, cl->target_duration, + cl->rate.denom, cl->target_rate.denom); + cl->duration = cl->target_duration; + cl->rate = cl->target_rate; + } - sync_type = check_updates(node, &reposition_owner); - owner[0] = SPA_ATOMIC_LOAD(a->segment_owner[0]); - owner[1] = SPA_ATOMIC_LOAD(a->segment_owner[1]); + sync_type = check_updates(node, &reposition_owner); + owner[0] = SPA_ATOMIC_LOAD(a->segment_owner[0]); + owner[1] = SPA_ATOMIC_LOAD(a->segment_owner[1]); again: - all_ready = sync_type == SYNC_CHECK; - update_sync = !all_ready; - target_sync = sync_type == SYNC_START ? true : false; + all_ready = sync_type == SYNC_CHECK; + update_sync = !all_ready; + target_sync = sync_type == SYNC_START ? true : false; - spa_list_for_each(t, &driver->rt.target_list, link) { - struct pw_node_activation *ta = t->activation; - uint32_t id = t->id; + spa_list_for_each(t, &driver->rt.target_list, link) { + struct pw_node_activation *ta = t->activation; + uint32_t id = t->id; - ta->driver_id = driver->info.id; + ta->driver_id = driver->info.id; retry_status: - pw_node_activation_state_reset(&ta->state[0]); - /* we don't change the state of inactive nodes and don't use them - * for reposition. The pending will be at least 1 and they might - * get decremented to 0 but since the status is inactive, we don't - * do the atomic CAS from NOT_TRIGGERED to TRIGGERED and we don't - * write the eventfd. */ - old_status = SPA_ATOMIC_LOAD(ta->status); - if (SPA_UNLIKELY(old_status == PW_NODE_ACTIVATION_INACTIVE)) - continue; + pw_node_activation_state_reset(&ta->state[0]); + /* we don't change the state of inactive nodes and don't use them + * for reposition. The pending will be at least 1 and they might + * get decremented to 0 but since the status is inactive, we don't + * do the atomic CAS from NOT_TRIGGERED to TRIGGERED and we don't + * write the eventfd. */ + old_status = SPA_ATOMIC_LOAD(ta->status); + if (SPA_UNLIKELY(old_status == PW_NODE_ACTIVATION_INACTIVE)) + continue; - /* if this fails, the node might just have stopped and we need to retry */ - if (SPA_UNLIKELY(!SPA_ATOMIC_CAS(ta->status, old_status, PW_NODE_ACTIVATION_NOT_TRIGGERED))) - goto retry_status; + /* if this fails, the node might just have stopped and we need to retry */ + if (SPA_UNLIKELY(!SPA_ATOMIC_CAS(ta->status, old_status, PW_NODE_ACTIVATION_NOT_TRIGGERED))) + goto retry_status; - if (old_status == PW_NODE_ACTIVATION_TRIGGERED || - old_status == PW_NODE_ACTIVATION_AWAKE) { - update_xrun_stats(ta, 1, nsec / 1000, 0); - debug_xrun_target(node, t, old_status, nsec); - } - - /* this is the node with reposition info */ - if (SPA_UNLIKELY(id == reposition_owner)) - reposition_target = t; - - /* update extra segment info if it is the owner */ - if (SPA_UNLIKELY(id == owner[0])) - a->position.segments[0].bar = ta->segment.bar; - if (SPA_UNLIKELY(id == owner[1])) - a->position.segments[0].video = ta->segment.video; - - min_timeout = SPA_MIN(min_timeout, ta->sync_timeout); - - if (SPA_UNLIKELY(update_sync)) { - ta->pending_sync = target_sync; - ta->pending_new_pos = target_sync; - } else { - all_ready &= ta->pending_sync == false; - } + if (old_status == PW_NODE_ACTIVATION_TRIGGERED || + old_status == PW_NODE_ACTIVATION_AWAKE) { + update_xrun_stats(ta, 1, nsec / 1000, 0); + debug_xrun_target(node, t, old_status, nsec); } - a->prev_signal_time = a->signal_time; - node->driver_start = nsec; + /* this is the node with reposition info */ + if (SPA_UNLIKELY(id == reposition_owner)) + reposition_target = t; - a->sync_timeout = SPA_MIN(min_timeout, DEFAULT_SYNC_TIMEOUT); + /* update extra segment info if it is the owner */ + if (SPA_UNLIKELY(id == owner[0])) + a->position.segments[0].bar = ta->segment.bar; + if (SPA_UNLIKELY(id == owner[1])) + a->position.segments[0].video = ta->segment.video; - if (SPA_UNLIKELY(reposition_target != NULL)) { - do_reposition(node, reposition_target); - sync_type = SYNC_START; - reposition_owner = 0; - reposition_target = NULL; - goto again; + min_timeout = SPA_MIN(min_timeout, ta->sync_timeout); + + if (SPA_UNLIKELY(update_sync)) { + ta->pending_sync = target_sync; + ta->pending_new_pos = target_sync; + } else { + all_ready &= ta->pending_sync == false; } - - update_position(node, all_ready, nsec); - - a->position.clock.cycle++; - pw_impl_node_rt_emit_start(node); } - /* this should not happen, driver nodes that are not currently driving - * should not emit the ready callback */ - if (SPA_UNLIKELY(node->driver && !node->driving)) - return 0; - if (SPA_UNLIKELY(!node->driver)) { - /* legacy, nodes should directly resume the graph by calling - * the peer eventfd directly, node_ready is only for drivers */ - a->status = PW_NODE_ACTIVATION_FINISHED; - a->finish_time = nsec; + a->prev_signal_time = a->signal_time; + node->driver_start = nsec; + + a->sync_timeout = SPA_MIN(min_timeout, DEFAULT_SYNC_TIMEOUT); + + if (SPA_UNLIKELY(reposition_target != NULL)) { + do_reposition(node, reposition_target); + sync_type = SYNC_START; + reposition_owner = 0; + reposition_target = NULL; + goto again; } + + update_position(node, all_ready, nsec); + + /* move output with previous cycle, this makes the async nodes + * pick up the new data immediately */ if (status & SPA_STATUS_HAVE_DATA) { spa_list_for_each(p, &node->rt.output_mix, rt.node_link) spa_node_process_fast(p->mix); } + + a->position.clock.cycle++; + pw_impl_node_rt_emit_start(node); + /* now signal all the nodes we drive */ trigger_targets(node, status, nsec); return 0;