tools: add Latency reporting to pw-link

Rework how the monitor mode works. Instead of having separate paths for
the list and monitor mode, reuse the list mode. We simply mark all
changes and then list the changes in a loop.

This makes it possible to accumulate some updates and print them
together.

Add a -t option to list the latency params on a port.
This commit is contained in:
Wim Taymans 2025-09-03 14:38:03 +02:00
parent 0f1e4f8706
commit 0877eba761
2 changed files with 168 additions and 110 deletions

View file

@ -4,7 +4,7 @@ The PipeWire Link Command
# SYNOPSIS
**pw-link** \[*options*\] -o-l \[*out-pattern*\] \[*in-pattern*\]
**pw-link** \[*options*\] -o|-i|-l|-t \[*out-pattern*\] \[*in-pattern*\]
**pw-link** \[*options*\] *output* *input*
@ -42,6 +42,9 @@ List input ports
\par -l | \--links
List links
\par -t | \--latency
List port latencies
\par -m | \--monitor
Monitor links and ports. **pw-link** will not exit but monitor and print
new and destroyed ports or links.

View file

@ -14,6 +14,7 @@
#include <spa/utils/string.h>
#include <spa/utils/defs.h>
#include <spa/debug/file.h>
#include <spa/param/latency-utils.h>
#include <pipewire/pipewire.h>
#include <pipewire/filter.h>
@ -38,11 +39,24 @@ union object_data {
struct object {
struct spa_list link;
struct data *d;
uint32_t id;
enum object_type type;
struct pw_properties *props;
union object_data data;
#define STATE_NONE 0
#define STATE_NEW 1
#define STATE_CHANGED 2
#define STATE_DELETE 3
int state;
struct pw_proxy *proxy;
struct spa_hook proxy_listener;
struct spa_hook object_listener;
struct spa_latency_info latency[2];
bool latency_changed[2];
};
struct target_link {
@ -65,6 +79,7 @@ enum list_target {
LIST_INPUT = 1 << 1,
LIST_PORTS = LIST_OUTPUT | LIST_INPUT,
LIST_LINKS = 1 << 2,
LIST_LATENCY = 1 << 3,
};
struct data {
@ -98,12 +113,20 @@ struct data {
bool monitoring;
bool list_inputs;
bool list_outputs;
const char *prefix;
regex_t out_port_regex, *out_regex;
regex_t in_port_regex, *in_regex;
};
static void destroy_object(struct object *obj)
{
spa_list_remove(&obj->link);
pw_properties_free(obj->props);
if (obj->proxy)
pw_proxy_destroy(obj->proxy);
free(obj);
}
static void link_event(struct target_link *tl, enum pw_link_state state, int result)
{
/* Ignore non definitive states (negotiating, allocating, etc). */
@ -183,6 +206,23 @@ static void core_sync(struct data *data)
data->sync = pw_core_sync(data->core, PW_ID_CORE, data->sync);
}
static const char *state_name(struct data *d, struct object *o)
{
if (!d->opt_monitor)
return "";
switch (o->state) {
case STATE_NONE:
return " ";
case STATE_NEW:
return "+";
case STATE_CHANGED:
return "*";
case STATE_DELETE:
return "-";
}
return " ";
}
static struct object *find_object(struct data *data, enum object_type type, uint32_t id)
{
struct object *o;
@ -274,37 +314,58 @@ static char *port_alias(char *buffer, int size, struct object *n, struct object
return buffer;
}
static void print_port(struct data *data, const char *prefix, struct object *n,
struct object *p, bool verbose)
static void print_port(struct data *data, const char *prefix, const char *state,
struct object *n, struct object *p, bool verbose)
{
char buffer[1024], id[64] = "";
const char *prefix2 = "";
if (state == NULL)
state = state_name(data, p);
if (data->opt_id) {
snprintf(id, sizeof(id), "%4d ", p->id);
prefix2 = " ";
}
printf("%s%s%s%s\n", data->prefix, prefix,
printf("%s%s%s%s\n", state, prefix,
id, port_name(buffer, sizeof(buffer), n, p));
if (verbose) {
port_path(buffer, sizeof(buffer), n, p);
if (buffer[0] != '\0')
printf("%s %s%s%s\n", data->prefix, prefix2, prefix, buffer);
printf("%s %s%s%s\n", state, prefix2, prefix, buffer);
port_alias(buffer, sizeof(buffer), n, p);
if (buffer[0] != '\0')
printf("%s %s%s%s\n", data->prefix, prefix2, prefix, buffer);
printf("%s %s%s%s\n", state, prefix2, prefix, buffer);
}
}
static void print_port_id(struct data *data, const char *prefix, uint32_t peer)
static void print_port_id(struct data *data, const char *prefix, uint32_t peer, struct object *l)
{
struct object *n, *p;
if ((p = find_object(data, OBJECT_PORT, peer)) == NULL)
return;
if ((n = find_object(data, OBJECT_NODE, p->data.port.node)) == NULL)
return;
print_port(data, prefix, n, p, false);
print_port(data, prefix, state_name(data, l), n, p, false);
}
static void print_port_latency(struct data *data, const char *prefix,
struct object *p, enum spa_direction direction)
{
const char *state;
struct spa_latency_info *info = &p->latency[direction];
if (p->state == STATE_NONE || p->state == STATE_CHANGED)
state = p->latency_changed[direction] ? "*" : "=";
else
state = state_name(data, p);
printf("%s%s %s latency: { quantum=[ %f %f ], rate=[ %d %d ], ns=[ %"PRIi64" %"PRIi64" ] }\n",
state, prefix, direction == SPA_DIRECTION_INPUT ? "input ": "output",
info->min_quantum, info->max_quantum,
info->min_rate, info->max_rate, info->min_ns, info->max_ns);
p->latency_changed[direction] = false;
}
static void do_list_port_links(struct data *data, struct object *node, struct object *port)
@ -339,10 +400,10 @@ static void do_list_port_links(struct data *data, struct object *node, struct ob
continue;
if (first) {
print_port(data, "", node, port, data->opt_verbose);
print_port(data, "", NULL, node, port, data->opt_verbose);
first = false;
}
print_port_id(data, prefix, peer);
print_port_id(data, prefix, peer, o);
}
}
@ -389,6 +450,8 @@ static void do_list_ports(struct data *data, struct object *node,
spa_list_for_each(o, &data->objects, link) {
if (o->type != OBJECT_PORT)
continue;
if (o->state == STATE_NONE)
continue;
if (o->data.port.node != node->id)
continue;
if (o->data.port.direction != direction)
@ -398,7 +461,11 @@ static void do_list_ports(struct data *data, struct object *node,
continue;
if (data->opt_list & LIST_PORTS)
print_port(data, "", node, o, data->opt_verbose);
print_port(data, "", NULL, node, o, data->opt_verbose);
if (data->opt_list & LIST_LATENCY) {
print_port_latency(data, "", o, SPA_DIRECTION_INPUT);
print_port_latency(data, "", o, SPA_DIRECTION_OUTPUT);
}
if (data->opt_list & LIST_LINKS)
do_list_port_links(data, node, o);
}
@ -406,7 +473,7 @@ static void do_list_ports(struct data *data, struct object *node,
static void do_list(struct data *data)
{
struct object *n;
struct object *n, *t;
spa_list_for_each(n, &data->objects, link) {
if (n->type != OBJECT_NODE)
@ -415,6 +482,13 @@ static void do_list(struct data *data)
do_list_ports(data, n, PW_DIRECTION_OUTPUT, data->out_regex);
if (data->list_inputs)
do_list_ports(data, n, PW_DIRECTION_INPUT, data->in_regex);
}
spa_list_for_each_safe(n, t, &data->objects, link) {
if (n->state == STATE_DELETE)
destroy_object(n);
else
n->state = STATE_NONE;
}
}
@ -598,64 +672,49 @@ static int do_unlink_ports(struct data *data)
return 0;
}
static int do_monitor_port(struct data *data, struct object *port)
static void
removed_proxy (void *data)
{
regex_t *regex = NULL;
bool do_print = false;
struct object *node;
if (port->data.port.direction == PW_DIRECTION_OUTPUT && data->list_outputs) {
regex = data->out_regex;
do_print = true;
}
if (port->data.port.direction == PW_DIRECTION_INPUT && data->list_inputs) {
regex = data->in_regex;
do_print = true;
}
if (!do_print)
return 0;
if ((node = find_object(data, OBJECT_NODE, port->data.port.node)) == NULL)
return -ENOENT;
if (regex && !port_regex(data, node, port, regex))
return 0;
print_port(data, "", node, port, data->opt_verbose);
return 0;
struct object *obj = data;
pw_proxy_destroy(obj->proxy);
}
static int do_monitor_link(struct data *data, struct object *link)
static void
destroy_proxy (void *data)
{
char buffer1[1024], buffer2[1024], id[64] = "";
struct object *n1, *n2, *p1, *p2;
if (!(data->opt_list & LIST_LINKS))
return 0;
if ((p1 = find_object(data, OBJECT_PORT, link->data.link.output_port)) == NULL)
return -ENOENT;
if ((n1 = find_object(data, OBJECT_NODE, p1->data.port.node)) == NULL)
return -ENOENT;
if (data->out_regex && !port_regex(data, n1, p1, data->out_regex))
return 0;
if ((p2 = find_object(data, OBJECT_PORT, link->data.link.input_port)) == NULL)
return -ENOENT;
if ((n2 = find_object(data, OBJECT_NODE, p2->data.port.node)) == NULL)
return -ENOENT;
if (data->in_regex && !port_regex(data, n2, p2, data->in_regex))
return 0;
if (data->opt_id)
snprintf(id, sizeof(id), "%4d ", link->id);
printf("%s%s%s -> %s\n", data->prefix, id,
port_name(buffer1, sizeof(buffer1), n1, p1),
port_name(buffer2, sizeof(buffer2), n2, p2));
return 0;
struct object *obj = data;
spa_hook_remove(&obj->proxy_listener);
spa_hook_remove(&obj->object_listener);
obj->proxy = NULL;
}
static const struct pw_proxy_events proxy_events = {
PW_VERSION_PROXY_EVENTS,
.removed = removed_proxy,
.destroy = destroy_proxy,
};
static void port_event_param(void *_data, int seq, uint32_t id,
uint32_t index, uint32_t next, const struct spa_pod *param)
{
struct object *obj = _data;
struct spa_latency_info info;
if (id != SPA_PARAM_Latency ||
spa_latency_parse(param, &info) < 0)
return;
obj->latency[info.direction] = info;
if (obj->state == STATE_NONE)
obj->state = STATE_CHANGED;
obj->latency_changed[info.direction] = true;
core_sync(obj->d);
}
static const struct pw_port_events port_events = {
PW_VERSION_PORT_EVENTS,
.param = port_event_param
};
static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
const char *type, uint32_t version,
const struct spa_dict *props)
@ -663,7 +722,7 @@ static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
struct data *d = data;
enum object_type t;
union object_data extra = {0};
struct object *obj;
struct object *obj, *p;
const char *str;
if (props == NULL)
@ -698,6 +757,12 @@ static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
if ((str = spa_dict_lookup(props, PW_KEY_LINK_INPUT_PORT)) == NULL)
return;
extra.link.input_port = atoi(str);
if ((p = find_object(d, OBJECT_PORT, extra.link.output_port)) != NULL)
if (p->state == STATE_NONE)
p->state = STATE_CHANGED;
if ((p = find_object(d, OBJECT_PORT, extra.link.input_port)) != NULL)
if (p->state == STATE_NONE)
p->state = STATE_CHANGED;
} else
return;
@ -706,57 +771,42 @@ static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
obj->id = id;
obj->props = pw_properties_new_dict(props);
obj->data = extra;
obj->state = STATE_NEW;
obj->d = d;
spa_list_append(&d->objects, &obj->link);
if (d->monitoring) {
d->prefix = "+ ";
switch (obj->type) {
case OBJECT_ANY:
spa_assert_not_reached();
case OBJECT_NODE:
break;
case OBJECT_PORT:
do_monitor_port(d, obj);
break;
case OBJECT_LINK:
do_monitor_link(d, obj);
break;
}
switch (obj->type) {
case OBJECT_PORT:
{
uint32_t subs[] = { SPA_PARAM_Latency };
obj->proxy = pw_registry_bind(d->registry, id, type, PW_VERSION_PORT, 0);
pw_proxy_add_object_listener(obj->proxy, &obj->object_listener, &port_events, obj);
pw_proxy_add_listener(obj->proxy, &obj->proxy_listener, &proxy_events, obj);
pw_port_subscribe_params((struct pw_port*)obj->proxy, subs, SPA_N_ELEMENTS(subs));
break;
}
}
static void destroy_object(struct object *obj)
{
spa_list_remove(&obj->link);
pw_properties_free(obj->props);
free(obj);
default:
break;
}
core_sync(d);
}
static void registry_event_global_remove(void *data, uint32_t id)
{
struct data *d = data;
struct object *obj;
struct object *obj, *p;
if ((obj = find_object(d, OBJECT_ANY, id)) == NULL)
return;
if (d->monitoring) {
d->prefix = "- ";
switch (obj->type) {
case OBJECT_ANY:
spa_assert_not_reached();
case OBJECT_NODE:
break;
case OBJECT_PORT:
do_monitor_port(d, obj);
break;
case OBJECT_LINK:
do_monitor_link(d, obj);
break;
}
if (obj->type == OBJECT_LINK) {
if ((p = find_object(d, OBJECT_PORT, obj->data.link.output_port)) != NULL)
p->state = STATE_CHANGED;
if ((p = find_object(d, OBJECT_PORT, obj->data.link.input_port)) != NULL)
p->state = STATE_CHANGED;
}
destroy_object(obj);
obj->state = STATE_DELETE;
core_sync(d);
}
static const struct pw_registry_events registry_events = {
@ -806,6 +856,7 @@ static const struct pw_core_events core_events = {
static void do_quit(void *userdata, int signal_number)
{
struct data *data = userdata;
data->opt_monitor = false;
pw_main_loop_quit(data->loop);
}
@ -896,6 +947,7 @@ static int run(int argc, char *argv[])
{ "props", required_argument, NULL, 'p' },
{ "wait", no_argument, NULL, 'w' },
{ "disconnect", no_argument, NULL, 'd' },
{ "latency", no_argument, NULL, 't' },
{ NULL, 0, NULL, 0}
};
@ -905,7 +957,7 @@ static int run(int argc, char *argv[])
return -1;
}
while ((c = getopt_long(argc, argv, "hVr:oilmIvLPp:wd", long_options, NULL)) != -1) {
while ((c = getopt_long(argc, argv, "hVr:oilmIvLPp:wdt", long_options, NULL)) != -1) {
switch (c) {
case 'h':
show_help(&data, argv[0], false);
@ -933,6 +985,10 @@ static int run(int argc, char *argv[])
data.opt_mode = MODE_LIST;
data.opt_list |= LIST_LINKS;
break;
case 't':
data.opt_mode = MODE_LIST;
data.opt_list |= LIST_LATENCY;
break;
case 'm':
data.opt_monitor = true;
break;
@ -1033,8 +1089,6 @@ static int run(int argc, char *argv[])
&data.registry_listener,
&registry_events, &data);
data.prefix = data.opt_monitor ? "= " : "";
core_sync(&data);
pw_main_loop_run(data.loop);
@ -1071,6 +1125,7 @@ static int run(int argc, char *argv[])
}
if (data.nb_links > 0) {
core_sync(&data);
pw_main_loop_run(data.loop);
struct target_link *tl;
@ -1085,10 +1140,10 @@ static int run(int argc, char *argv[])
break;
}
if (data.opt_monitor) {
data.monitoring = true;
while (data.opt_monitor) {
pw_main_loop_run(data.loop);
data.monitoring = false;
if (data.opt_monitor)
do_list(&data);
}
return 0;