filter-chain: implement filter-graph latency

Collect the latency of the graph in filter-chain. We do this by first
inspecting the LATENCY ports on the plugins and us the notify value as
the latency on the node.

We then walk the graph from source to sink and for each node take the
max latency of all linked upstream peer nodes. We end up with the max
latency of the graph and emit this in the graph properties.

We then listen for the graph latency property and use that to update the
process_latency of the filter-chain, which will then update the latency
on the filter-chain ports.

Fixes #4678
This commit is contained in:
Wim Taymans 2025-05-07 15:56:12 +02:00
parent d277b3b62e
commit e545efdb6e
2 changed files with 118 additions and 49 deletions

View file

@ -117,6 +117,9 @@ struct node {
unsigned int n_deps;
uint32_t latency_index;
float latency;
unsigned int disabled:1;
unsigned int control_changed:1;
@ -138,6 +141,7 @@ struct graph_port {
const struct spa_fga_descriptor *desc;
void **hndl;
uint32_t port;
struct node *node;
unsigned next:1;
};
@ -194,6 +198,8 @@ struct graph {
uint32_t outputs_position[SPA_AUDIO_MAX_CHANNELS];
uint32_t n_outputs_position;
float latency;
unsigned activated:1;
unsigned setup:1;
};
@ -245,7 +251,7 @@ static void emit_filter_graph_info(struct impl *impl, bool full)
if (full)
impl->info.change_mask = impl->info_all;
if (impl->info.change_mask || full) {
char n_inputs[64], n_outputs[64];
char n_inputs[64], n_outputs[64], latency[64];
struct spa_dict_item items[6];
struct spa_dict dict = SPA_DICT(items, 0);
char in_pos[SPA_AUDIO_MAX_CHANNELS * 8];
@ -266,6 +272,8 @@ static void emit_filter_graph_info(struct impl *impl, bool full)
graph->n_outputs_position, graph->outputs_position);
items[dict.n_items++] = SPA_DICT_ITEM("outputs.audio.position", out_pos);
}
items[dict.n_items++] = SPA_DICT_ITEM("latency",
spa_dtoa(latency, sizeof(latency), graph->latency));
impl->info.props = &dict;
spa_filter_graph_emit_info(&impl->hooks, &impl->info);
impl->info.props = NULL;
@ -1344,6 +1352,7 @@ static int load_node(struct graph *graph, struct spa_json *json)
node->graph = graph;
node->desc = desc;
snprintf(node->name, sizeof(node->name), "%s", name);
node->latency_index = SPA_IDX_INVALID;
node->input_port = calloc(desc->n_input, sizeof(struct port));
node->output_port = calloc(desc->n_output, sizeof(struct port));
@ -1385,6 +1394,8 @@ static int load_node(struct graph *graph, struct spa_json *json)
port->idx = i;
port->external = SPA_ID_INVALID;
port->p = desc->notify[i];
if (desc->desc->ports[port->p].hint & SPA_FGA_HINT_LATENCY)
node->latency_index = i;
spa_list_init(&port->link_list);
}
if (have_config)
@ -1480,6 +1491,36 @@ static int impl_deactivate(void *object)
return 0;
}
static void sort_reset(struct graph *graph)
{
struct node *node;
spa_list_for_each(node, &graph->node_list, link) {
node->sorted = false;
node->n_sort_deps = node->n_deps;
}
}
static struct node *sort_next_node(struct graph *graph)
{
struct node *node;
spa_list_for_each(node, &graph->node_list, link) {
if (node->n_sort_deps == 0 && !node->sorted) {
uint32_t i;
struct link *link;
node->sorted = true;
for (i = 0; i < node->desc->n_output; i++) {
spa_list_for_each(link, &node->output_port[i].link_list, output_link)
link->input->node->n_sort_deps--;
}
for (i = 0; i < node->desc->n_notify; i++) {
spa_list_for_each(link, &node->notify_port[i].link_list, output_link)
link->input->node->n_sort_deps--;
}
return node;
}
}
return NULL;
}
static int setup_graph(struct graph *graph);
static int impl_activate(void *object, const struct spa_dict *props)
@ -1494,7 +1535,7 @@ static int impl_activate(void *object, const struct spa_dict *props)
const struct spa_fga_plugin *p;
uint32_t i, j, max_samples = impl->quantum_limit, n_ports;
int res;
float *sd, *dd, *data;
float *sd, *dd, *data, latency;
const char *rate, *str;
if (graph->activated)
@ -1527,7 +1568,6 @@ static int impl_activate(void *object, const struct spa_dict *props)
if ((res = setup_graph(graph)) < 0)
return res;
graph->setup = true;
emit_filter_graph_info(impl, false);
}
/* first make instances */
@ -1618,7 +1658,35 @@ static int impl_activate(void *object, const struct spa_dict *props)
d->control_changed(node->hndl[i]);
}
}
/* calculate latency */
sort_reset(graph);
while ((node = sort_next_node(graph)) != NULL) {
latency = 0.0f;
for (i = 0; i < node->desc->n_input; i++) {
spa_list_for_each(link, &node->input_port[i].link_list, input_link)
latency = fmaxf(latency, link->output->node->latency);
}
if (node->latency_index != SPA_IDX_INVALID) {
port = &node->notify_port[node->latency_index];
latency += port->control_data[0];
}
node->latency = latency;
spa_log_debug(impl->log, "%s latency:%f", node->name, latency);
}
latency = 0.0f;
for (i = 0; i < graph->n_outputs; i++) {
struct graph_port *port = &graph->output[i];
latency = fmaxf(latency, port->node->latency);
}
if (graph->latency != latency) {
graph->latency = latency;
impl->info.change_mask |= SPA_FILTER_GRAPH_CHANGE_MASK_PROPS;
spa_log_info(impl->log, "graph latency:%f", latency);
}
emit_filter_graph_info(impl, false);
spa_filter_graph_emit_props_changed(&impl->hooks, SPA_DIRECTION_INPUT);
return 0;
error:
@ -1626,18 +1694,6 @@ error:
return res;
}
static struct node *find_next_node(struct graph *graph)
{
struct node *node;
spa_list_for_each(node, &graph->node_list, link) {
if (node->n_sort_deps == 0 && !node->sorted) {
node->sorted = true;
return node;
}
}
return NULL;
}
static void unsetup_graph(struct graph *graph)
{
free(graph->input);
@ -1653,7 +1709,6 @@ static int setup_graph(struct graph *graph)
struct impl *impl = graph->impl;
struct node *node, *first, *last;
struct port *port;
struct link *link;
struct graph_port *gp;
struct graph_hndl *gh;
uint32_t i, j, n, n_input, n_output, n_hndl = 0;
@ -1758,6 +1813,7 @@ static int setup_graph(struct graph *graph)
spa_log_info(impl->log, "input port %s[%d]:%s",
first->name, i, d->ports[desc->input[j]].name);
gp->desc = d;
gp->node = first;
gp->hndl = &first->hndl[i];
gp->port = desc->input[j];
}
@ -1806,6 +1862,7 @@ static int setup_graph(struct graph *graph)
peer->external = graph->n_input;
gp = &graph->input[graph->n_input++];
gp->desc = peer->node->desc->desc;
gp->node = peer->node;
gp->hndl = &peer->node->hndl[i];
gp->port = peer->p;
gp->next = true;
@ -1822,6 +1879,7 @@ static int setup_graph(struct graph *graph)
port->external = graph->n_input;
gp = &graph->input[graph->n_input++];
gp->desc = d;
gp->node = port->node;
gp->hndl = &port->node->hndl[i];
gp->port = port->p;
gp->next = false;
@ -1837,6 +1895,7 @@ static int setup_graph(struct graph *graph)
spa_log_info(impl->log, "output port %s[%d]:%s",
last->name, i, d->ports[desc->output[j]].name);
gp->desc = d;
gp->node = last;
gp->hndl = &last->hndl[i];
gp->port = desc->output[j];
}
@ -1871,6 +1930,7 @@ static int setup_graph(struct graph *graph)
port->node->name, i, d->ports[port->p].name);
port->external = graph->n_output;
gp->desc = d;
gp->node = port->node;
gp->hndl = &port->node->hndl[i];
gp->port = port->p;
}
@ -1882,14 +1942,8 @@ static int setup_graph(struct graph *graph)
graph->n_hndl = 0;
graph->hndl = calloc(graph->n_nodes * n_hndl, sizeof(struct graph_hndl));
/* order all nodes based on dependencies, first reset fields */
spa_list_for_each(node, &graph->node_list, link) {
node->sorted = false;
node->n_sort_deps = node->n_deps;
}
while (true) {
if ((node = find_next_node(graph)) == NULL)
break;
sort_reset(graph);
while ((node = sort_next_node(graph)) != NULL) {
node->n_hndl = n_hndl;
desc = node->desc;
d = desc->desc;
@ -1901,14 +1955,6 @@ static int setup_graph(struct graph *graph)
gh->desc = d;
}
}
for (i = 0; i < desc->n_output; i++) {
spa_list_for_each(link, &node->output_port[i].link_list, output_link)
link->input->node->n_sort_deps--;
}
for (i = 0; i < desc->n_notify; i++) {
spa_list_for_each(link, &node->notify_port[i].link_list, output_link)
link->input->node->n_sort_deps--;
}
for (i = 0; i < desc->n_control; i++) {
/* any default values for the controls are set in the first instance
* of the control data. Duplicate this to the other instances now. */

View file

@ -975,14 +975,34 @@ static void param_latency_changed(struct impl *impl, const struct spa_pod *param
pw_stream_update_params(impl->playback, params, 1);
}
static void param_process_latency_changed(struct impl *impl, const struct spa_pod *param,
enum spa_direction direction)
static void update_process_latency(struct impl *impl)
{
uint8_t buffer[1024];
struct spa_pod_builder b;
struct spa_process_latency_info process_latency;
struct spa_latency_info latency;
const struct spa_pod *params[1];
const struct spa_pod *params[2];
spa_pod_builder_init(&b, buffer, sizeof(buffer));
latency = impl->latency[SPA_DIRECTION_INPUT];
spa_process_latency_info_add(&impl->process_latency, &latency);
params[0] = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
if (impl->playback)
pw_stream_update_params(impl->playback, params, 1);
latency = impl->latency[SPA_DIRECTION_OUTPUT];
spa_process_latency_info_add(&impl->process_latency, &latency);
params[0] = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
params[1] = spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, &impl->process_latency);
if (impl->capture)
pw_stream_update_params(impl->capture, params, 2);
}
static void param_process_latency_changed(struct impl *impl, const struct spa_pod *param,
enum spa_direction direction)
{
struct spa_process_latency_info process_latency;
if (param == NULL)
spa_zero(process_latency);
@ -990,19 +1010,7 @@ static void param_process_latency_changed(struct impl *impl, const struct spa_po
return;
impl->process_latency = process_latency;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
latency = impl->latency[SPA_DIRECTION_INPUT];
spa_process_latency_info_add(&process_latency, &latency);
params[0] = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
pw_stream_update_params(impl->playback, params, 1);
latency = impl->latency[SPA_DIRECTION_OUTPUT];
spa_process_latency_info_add(&process_latency, &latency);
params[0] = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
pw_stream_update_params(impl->capture, params, 1);
update_process_latency(impl);
}
static void param_tag_changed(struct impl *impl, const struct spa_pod *param,
@ -1282,6 +1290,9 @@ static void copy_position(struct spa_audio_info_raw *dst, const struct spa_audio
static void graph_info(void *object, const struct spa_filter_graph_info *info)
{
struct impl *impl = object;
struct spa_dict *props = info->props;
uint32_t i;
if (impl->capture_info.channels == 0)
impl->capture_info.channels = info->n_inputs;
if (impl->playback_info.channels == 0)
@ -1294,6 +1305,18 @@ static void graph_info(void *object, const struct spa_filter_graph_info *info)
copy_position(&impl->capture_info, &impl->playback_info);
copy_position(&impl->playback_info, &impl->capture_info);
}
for (i = 0; props && i < props->n_items; i++) {
const char *k = props->items[i].key;
const char *s = props->items[i].value;
pw_log_info("%s %s", k, s);
if (spa_streq(k, "latency")) {
double latency;
if (spa_atod(s, &latency)) {
impl->process_latency.rate = (int32_t)latency;
update_process_latency(impl);
}
}
}
}
static void graph_apply_props(void *object, enum spa_direction direction, const struct spa_pod *props)