impl-link: add 1 quantum latency for async links

Update the scheduling doc with some information about how async
scheduling works. Also add something about the latency.

Async links add 1 quantum of latency so take that into account when
aggregating latencies.

Also a source directly linked to an async node does not add latency
(we evaluate the tee before incrementing the cycle so that it effectively
is executed in the previous cycle and consumed immediately by async
nodes). We can do this because the driver source always provides data
before the async node, and never concurrently.

Add a listener to the link for the node driver change as well because
that can now influence the latency for async nodes.
This commit is contained in:
Wim Taymans 2025-09-15 17:36:20 +02:00
parent f89428d9f8
commit 4dccddd564
4 changed files with 151 additions and 10 deletions

View file

@ -52,8 +52,6 @@ struct impl {
struct pw_properties *properties;
struct spa_io_buffers io[2];
bool async;
};
/** \endcond */
@ -799,7 +797,7 @@ int pw_impl_link_activate(struct pw_impl_link *this)
!impl->input.node->runnable || !impl->output.node->runnable)
return 0;
if (impl->async) {
if (this->async) {
io_type = SPA_IO_AsyncBuffers;
io_size = sizeof(struct spa_io_async_buffers);
} else {
@ -1200,16 +1198,29 @@ static void node_active_changed(void *data, bool active)
pw_impl_link_prepare(&impl->this);
}
static void node_driver_changed(void *data, struct pw_impl_node *old, struct pw_impl_node *driver)
{
struct impl *impl = data;
if (impl->this.async) {
/* for async links, input and output port latency depends on if the
* output node is directly driving the input node. */
pw_impl_port_recalc_latency(impl->output.port);
pw_impl_port_recalc_latency(impl->input.port);
}
}
static const struct pw_impl_node_events input_node_events = {
PW_VERSION_IMPL_NODE_EVENTS,
.result = input_node_result,
.active_changed = node_active_changed,
.driver_changed = node_driver_changed,
};
static const struct pw_impl_node_events output_node_events = {
PW_VERSION_IMPL_NODE_EVENTS,
.result = output_node_result,
.active_changed = node_active_changed,
.driver_changed = node_driver_changed,
};
static bool pw_impl_node_can_reach(struct pw_impl_node *output, struct pw_impl_node *input, int hop)
@ -1496,11 +1507,11 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
if (this->passive && str == NULL)
pw_properties_set(properties, PW_KEY_LINK_PASSIVE, "true");
impl->async = (output_node->async || input_node->async) &&
this->async = (output_node->async || input_node->async) &&
SPA_FLAG_IS_SET(output->flags, PW_IMPL_PORT_FLAG_ASYNC) &&
SPA_FLAG_IS_SET(input->flags, PW_IMPL_PORT_FLAG_ASYNC);
if (impl->async)
if (this->async)
pw_properties_set(properties, PW_KEY_LINK_ASYNC, "true");
spa_hook_list_init(&this->listener_list);
@ -1551,7 +1562,7 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
pw_log_info("(%s) (%s) -> (%s) async:%d:%d:%d:%04x:%04x:%d", this->name, output_node->name,
input_node->name, output_node->driving,
output_node->async, input_node->async,
output->flags, input->flags, impl->async);
output->flags, input->flags, this->async);
pw_impl_port_emit_link_added(output, this);
pw_impl_port_emit_link_added(input, this);

View file

@ -1679,7 +1679,7 @@ int pw_impl_port_for_each_link(struct pw_impl_port *port,
int pw_impl_port_recalc_latency(struct pw_impl_port *port)
{
struct pw_impl_link *l;
struct spa_latency_info latency, *current;
struct spa_latency_info latency, *current, other_latency;
struct pw_impl_port *other;
struct spa_pod *param;
struct spa_pod_builder b = { 0 };
@ -1702,7 +1702,14 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port)
port->info.id, other->info.id);
continue;
}
spa_latency_info_combine(&latency, &other->latency[other->direction]);
other_latency = other->latency[other->direction];
if (l->async && other->node->driver_node != port->node) {
/* we add 1 cycle delay from async links */
other_latency.min_quantum++;
other_latency.max_quantum++;
}
spa_latency_info_combine(&latency, &other_latency);
pw_log_debug("port %d: peer %d: latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
port->info.id, other->info.id,
latency.min_quantum, latency.max_quantum,
@ -1718,7 +1725,15 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port)
port->info.id, other->info.id);
continue;
}
spa_latency_info_combine(&latency, &other->latency[other->direction]);
other_latency = other->latency[other->direction];
if (l->async && other->node != port->node->driver_node) {
/* we only add 1 cycle delay for async links that
* are not from our driver */
other_latency.min_quantum++;
other_latency.max_quantum++;
}
spa_latency_info_combine(&latency, &other_latency);
pw_log_debug("port %d: peer %d: latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
port->info.id, other->info.id,
latency.min_quantum, latency.max_quantum,

View file

@ -1022,6 +1022,7 @@ struct pw_impl_link {
void *user_data;
unsigned int async:1;
unsigned int registered:1;
unsigned int feedback:1;
unsigned int preparing:1;