Merge branch 'mini-eq/export-notify-controls' into 'master'

filter-chain: export notify controls

See merge request pipewire/pipewire!2823
This commit is contained in:
bhack 2026-05-27 10:37:11 +00:00
commit b8117cb7a7
6 changed files with 1618 additions and 11 deletions

View file

@ -64,7 +64,7 @@ struct spa_filter_graph_events {
};
struct spa_filter_graph_methods {
#define SPA_VERSION_FILTER_GRAPH_METHODS 0
#define SPA_VERSION_FILTER_GRAPH_METHODS 1
uint32_t version;
int (*add_listener) (void *object,
@ -83,6 +83,10 @@ struct spa_filter_graph_methods {
int (*reset) (void *object);
int (*process) (void *object, const void *in[], void *out[], uint32_t n_samples);
int (*enum_notify_info) (void *object, uint32_t idx, struct spa_pod_builder *b,
struct spa_pod **param);
int (*get_notify) (void *object, struct spa_pod_builder *b, struct spa_pod **props);
};
SPA_API_FILTER_GRAPH int spa_filter_graph_add_listener(struct spa_filter_graph *object,
@ -138,6 +142,19 @@ SPA_API_FILTER_GRAPH int spa_filter_graph_process(struct spa_filter_graph *objec
spa_filter_graph, &object->iface, process, 0, in, out, n_samples);
}
SPA_API_FILTER_GRAPH int spa_filter_graph_enum_notify_info(struct spa_filter_graph *object,
uint32_t idx, struct spa_pod_builder *b, struct spa_pod **param)
{
return spa_api_method_r(int, -ENOTSUP,
spa_filter_graph, &object->iface, enum_notify_info, 1, idx, b, param);
}
SPA_API_FILTER_GRAPH int spa_filter_graph_get_notify(struct spa_filter_graph *object,
struct spa_pod_builder *b, struct spa_pod **props)
{
return spa_api_method_r(int, -ENOTSUP,
spa_filter_graph, &object->iface, get_notify, 1, b, props);
}
/**
* \}
*/
@ -147,4 +164,3 @@ SPA_API_FILTER_GRAPH int spa_filter_graph_process(struct spa_filter_graph *objec
#endif
#endif /* SPA_FILTER_GRAPH_H */

View file

@ -44,6 +44,7 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.filter-graph");
#define MAX_CHANNELS SPA_AUDIO_MAX_CHANNELS
#define DEFAULT_RATE 48000
#define DEFAULT_EXPORT_INTERVAL_MS 33u
#define spa_filter_graph_emit(hooks,method,version,...) \
spa_hook_list_call_simple(hooks, struct spa_filter_graph_events, \
@ -189,12 +190,19 @@ struct graph {
uint32_t n_control;
struct port **control_port;
uint32_t n_notify;
struct port **notify_port;
uint32_t n_input_names;
char **input_names;
uint32_t n_output_names;
char **output_names;
uint32_t n_export_control_names;
char **export_control_names;
uint32_t notify_interval_ms;
struct volume volume[2];
uint32_t default_inputs;
@ -267,7 +275,8 @@ static void emit_filter_graph_info(struct impl *impl, bool full)
if (impl->info.change_mask || full) {
char n_inputs[64], n_outputs[64], latency[64];
char n_default_inputs[64], n_default_outputs[64];
struct spa_dict_item items[6];
char export_interval[64];
struct spa_dict_item items[8];
struct spa_dict dict = SPA_DICT(items, 0);
char in_pos[MAX_CHANNELS * 8];
char out_pos[MAX_CHANNELS * 8];
@ -293,6 +302,11 @@ static void emit_filter_graph_info(struct impl *impl, bool full)
graph->n_outputs_position, graph->outputs_position);
items[dict.n_items++] = SPA_DICT_ITEM("outputs.audio.position", out_pos);
}
if (graph->n_export_control_names > 0) {
snprintf(export_interval, sizeof(export_interval), "%u",
graph->notify_interval_ms);
items[dict.n_items++] = SPA_DICT_ITEM("export.interval.ms", export_interval);
}
items[dict.n_items++] = SPA_DICT_ITEM("latency",
spa_dtoa(latency, sizeof(latency),
(graph->min_latency + graph->max_latency) / 2.0f));
@ -457,6 +471,18 @@ static void get_ranges(struct impl *impl, struct spa_fga_port *p,
}
}
static void port_get_name(struct port *port, char *name, size_t size)
{
struct node *node = port->node;
const struct spa_fga_descriptor *d = node->desc->desc;
struct spa_fga_port *p = &d->ports[port->p];
if (node->name[0] != '\0')
snprintf(name, size, "%s:%s", node->name, p->name);
else
snprintf(name, size, "%s", p->name);
}
static int impl_enum_prop_info(void *object, uint32_t idx, struct spa_pod_builder *b,
struct spa_pod **param)
{
@ -483,10 +509,7 @@ static int impl_enum_prop_info(void *object, uint32_t idx, struct spa_pod_builde
get_ranges(impl, p, &def, &min, &max);
if (node->name[0] != '\0')
snprintf(name, sizeof(name), "%s:%s", node->name, p->name);
else
snprintf(name, sizeof(name), "%s", p->name);
port_get_name(port, name, sizeof(name));
spa_pod_builder_push_object(b, &f[0],
SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo);
@ -558,10 +581,7 @@ static int impl_get_props(void *object, struct spa_pod_builder *b, struct spa_po
struct spa_fga_port *p = &d->ports[port->p];
float v, min, max;
if (node->name[0] != '\0')
snprintf(name, sizeof(name), "%s:%s", node->name, p->name);
else
snprintf(name, sizeof(name), "%s", p->name);
port_get_name(port, name, sizeof(name));
if (port->control_initialized)
v = port->control_current;
@ -586,6 +606,119 @@ static int impl_get_props(void *object, struct spa_pod_builder *b, struct spa_po
return 1;
}
static int impl_enum_notify_info(void *object, uint32_t idx, struct spa_pod_builder *b,
struct spa_pod **param)
{
struct impl *impl = object;
struct graph *graph = &impl->graph;
struct spa_pod *pod;
struct spa_pod_frame f[2];
struct port *port;
struct node *node;
const struct spa_fga_descriptor *d;
struct spa_fga_port *p;
float def, min, max;
char name[512];
if (idx >= graph->n_notify)
return 0;
port = graph->notify_port[idx];
node = port->node;
d = node->desc->desc;
p = &d->ports[port->p];
get_ranges(impl, p, &def, &min, &max);
port_get_name(port, name, sizeof(name));
spa_pod_builder_push_object(b, &f[0],
SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo);
spa_pod_builder_add (b,
SPA_PROP_INFO_name, SPA_POD_String(name),
0);
spa_pod_builder_prop(b, SPA_PROP_INFO_type, 0);
if (p->hint & SPA_FGA_HINT_BOOLEAN) {
if (min == max) {
spa_pod_builder_bool(b, def <= 0.0f ? false : true);
} else {
spa_pod_builder_push_choice(b, &f[1], SPA_CHOICE_Enum, 0);
spa_pod_builder_bool(b, def <= 0.0f ? false : true);
spa_pod_builder_bool(b, false);
spa_pod_builder_bool(b, true);
spa_pod_builder_pop(b, &f[1]);
}
} else if (p->hint & SPA_FGA_HINT_INTEGER) {
if (min == max) {
spa_pod_builder_int(b, (int32_t)def);
} else {
spa_pod_builder_push_choice(b, &f[1], SPA_CHOICE_Range, 0);
spa_pod_builder_int(b, (int32_t)def);
spa_pod_builder_int(b, (int32_t)min);
spa_pod_builder_int(b, (int32_t)max);
spa_pod_builder_pop(b, &f[1]);
}
} else {
if (min == max) {
spa_pod_builder_float(b, def);
} else {
spa_pod_builder_push_choice(b, &f[1], SPA_CHOICE_Range, 0);
spa_pod_builder_float(b, def);
spa_pod_builder_float(b, min);
spa_pod_builder_float(b, max);
spa_pod_builder_pop(b, &f[1]);
}
}
spa_pod_builder_prop(b, SPA_PROP_INFO_params, 0);
spa_pod_builder_bool(b, true);
pod = spa_pod_builder_pop(b, &f[0]);
if (pod == NULL)
return -ENOSPC;
if (param)
*param = pod;
return 1;
}
static int impl_get_notify(void *object, struct spa_pod_builder *b, struct spa_pod **props)
{
struct impl *impl = object;
struct graph *graph = &impl->graph;
struct spa_pod_frame f[2];
struct spa_pod *res;
uint32_t i;
char name[512];
spa_pod_builder_push_object(b, &f[0],
SPA_TYPE_OBJECT_Props, SPA_PARAM_Props);
spa_pod_builder_prop(b, SPA_PROP_params, 0);
spa_pod_builder_push_struct(b, &f[1]);
for (i = 0; i < graph->n_notify; i++) {
struct port *port = graph->notify_port[i];
const struct spa_fga_descriptor *d = port->node->desc->desc;
struct spa_fga_port *p = &d->ports[port->p];
float v = port->control_data[0];
port_get_name(port, name, sizeof(name));
spa_pod_builder_string(b, name);
if (p->hint & SPA_FGA_HINT_BOOLEAN) {
spa_pod_builder_bool(b, v <= 0.0f ? false : true);
} else if (p->hint & SPA_FGA_HINT_INTEGER) {
spa_pod_builder_int(b, (int32_t)v);
} else {
spa_pod_builder_float(b, v);
}
}
spa_pod_builder_pop(b, &f[1]);
res = spa_pod_builder_pop(b, &f[0]);
if (res == NULL)
return -ENOSPC;
if (props)
*props = res;
return 1;
}
static int port_id_set_control_value(struct port *port, uint32_t id, float value)
{
struct node *node = port->node;
@ -2161,6 +2294,34 @@ static int setup_graph_controls(struct graph *graph)
return 0;
}
static int setup_graph_notify(struct graph *graph)
{
struct impl *impl = graph->impl;
struct node *first;
uint32_t i;
if (graph->n_export_control_names == 0)
return 0;
graph->notify_port = calloc(graph->n_export_control_names, sizeof(struct port *));
if (graph->notify_port == NULL)
return -errno;
first = spa_list_first(&graph->node_list, struct node, link);
for (i = 0; i < graph->n_export_control_names; i++) {
const char *name = graph->export_control_names[i];
struct port *port;
port = find_port(first, name, SPA_FGA_PORT_OUTPUT | SPA_FGA_PORT_CONTROL);
if (port == NULL) {
spa_log_error(impl->log, "export control port %s not found", name);
return -ENOENT;
}
graph->notify_port[graph->n_notify++] = port;
}
return 0;
}
/**
* filter.graph = {
* nodes = [
@ -2186,6 +2347,7 @@ static int load_graph(struct graph *graph, const struct spa_dict *props)
struct spa_json inputs, outputs, *pinputs = NULL, *poutputs = NULL;
struct spa_json ivolumes, ovolumes, *pivolumes = NULL, *povolumes = NULL;
struct spa_json nodes, *pnodes = NULL, links, *plinks = NULL;
struct spa_json export_controls, *pexport_controls = NULL;
struct node *first, *last;
const char *json, *val;
char key[256];
@ -2193,6 +2355,7 @@ static int load_graph(struct graph *graph, const struct spa_dict *props)
spa_list_init(&graph->node_list);
spa_list_init(&graph->link_list);
graph->notify_interval_ms = DEFAULT_EXPORT_INTERVAL_MS;
if ((json = spa_dict_lookup(props, "filter.graph")) == NULL) {
spa_log_error(impl->log, "missing filter.graph property");
@ -2290,6 +2453,21 @@ static int load_graph(struct graph *graph, const struct spa_dict *props)
}
spa_json_enter(&it[0], &ovolumes);
povolumes = &ovolumes;
}
else if (spa_streq("export.controls", key)) {
if (!spa_json_is_array(val, len)) {
spa_log_error(impl->log, "%s expects an array", key);
return -EINVAL;
}
spa_json_enter(&it[0], &export_controls);
pexport_controls = &export_controls;
}
else if (spa_streq("export.interval.ms", key)) {
if (spa_json_parse_int(val, len, &res) <= 0 || res < 0) {
spa_log_error(impl->log, "%s expects a non-negative integer", key);
return -EINVAL;
}
graph->notify_interval_ms = res;
} else {
spa_log_warn(impl->log, "unexpected graph key '%s'", key);
}
@ -2350,8 +2528,23 @@ static int load_graph(struct graph *graph, const struct spa_dict *props)
graph->n_output_names++;
}
}
if (pexport_controls != NULL) {
graph->n_export_control_names = count_array(pexport_controls);
graph->export_control_names = calloc(graph->n_export_control_names, sizeof(char *));
if (graph->export_control_names == NULL)
return -ENOMEM;
graph->n_export_control_names = 0;
while (spa_json_get_string(pexport_controls, key, sizeof(key)) > 0) {
graph->export_control_names[graph->n_export_control_names] = strdup(key);
if (graph->export_control_names[graph->n_export_control_names] == NULL)
return -ENOMEM;
graph->n_export_control_names++;
}
}
if ((res = setup_graph_controls(graph)) < 0)
return res;
if ((res = setup_graph_notify(graph)) < 0)
return res;
first = spa_list_first(&graph->node_list, struct node, link);
last = spa_list_last(&graph->node_list, struct node, link);
@ -2391,8 +2584,13 @@ static void graph_free(struct graph *graph)
for (i = 0; i < graph->n_output_names; i++)
free(graph->output_names[i]);
free(graph->output_names);
for (i = 0; i < graph->n_export_control_names; i++)
free(graph->export_control_names[i]);
free(graph->export_control_names);
free(graph->control_port);
graph->control_port = NULL;
free(graph->notify_port);
graph->notify_port = NULL;
}
static const struct spa_filter_graph_methods impl_filter_graph = {
@ -2405,6 +2603,8 @@ static const struct spa_filter_graph_methods impl_filter_graph = {
.deactivate = impl_deactivate,
.reset = impl_reset,
.process = impl_process,
.enum_notify_info = impl_enum_notify_info,
.get_notify = impl_get_notify,
};
static int impl_get_interface(struct spa_handle *handle, const char *type, void **interface)

View file

@ -7,6 +7,7 @@
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
@ -18,6 +19,7 @@
#include <spa/utils/overflow.h>
#include <spa/param/audio/raw-json.h>
#include <spa/pod/dynamic.h>
#include <spa/control/control.h>
#include <spa/filter-graph/filter-graph.h>
#include <pipewire/impl.h>
@ -88,6 +90,8 @@ extern struct spa_handle_factory spa_filter_graph_factory;
* playback.volumes = [
* { control = <portname> min = <value> max = <value> scale = <scale> } ...
* ]
* export.controls = [ <portname> ... ]
* export.interval.ms = 33
* }
*\endcode
*
@ -217,6 +221,29 @@ extern struct spa_handle_factory spa_filter_graph_factory;
* default this is linear but it can be set to cubic when the control applies a
* cubic transformation.
*
* ### Exported Controls
*
* Output control ports can be exported as a PipeWire control stream for metering
* and status monitoring. The port names use the same `<node_name>:<port_name>`
* syntax as links. Exported values are emitted as `SPA_CONTROL_Properties` on a
* read-only `application/control` output stream. Each exported port is emitted
* as a scalar value. When the graph is duplicated to match the channel count,
* the value from the first graph instance is exported.
*
* The `export.interval.ms` value controls how often values are emitted. The default
* is 33ms. A value of 0 emits on every processed quantum when a control stream
* buffer is available.
*
*\code{.unparsed}
* filter.graph = {
* nodes = [
* { type = builtin name = ramp label = ramp }
* ]
* export.controls = [ "ramp:Current" ]
* export.interval.ms = 33
* }
*\endcode
*
* ## Builtin filters
*
* There are some useful builtin filters available. The type should be `builtin` and
@ -1234,6 +1261,7 @@ static const struct spa_dict_item module_props[] = {
#define DEFAULT_RATE 48000
#define MAX_DATAS 1024u
#define CONTROL_STREAM_MIN_BUFFER_SIZE 4096u
struct impl {
struct pw_context *context;
@ -1257,6 +1285,14 @@ struct impl {
struct spa_hook playback_listener;
struct spa_audio_info_raw playback_info;
struct pw_properties *control_props;
struct pw_stream *control;
struct spa_hook control_listener;
uint32_t control_buffer_size;
uint64_t control_interval;
uint64_t control_last_time;
bool control_buffer_size_warned;
struct spa_audio_info_raw info;
struct spa_io_position *position;
@ -1283,6 +1319,80 @@ static void capture_destroy(void *d)
impl->capture = NULL;
}
static int emit_control_stream(struct impl *impl)
{
struct pw_buffer *buf;
struct spa_buffer *buffer;
struct spa_data *bd;
struct spa_pod_builder b;
struct spa_pod_frame f;
uint64_t now = 0;
int res;
if (impl->control == NULL || !impl->graph_active)
return 0;
if (impl->position)
now = impl->position->clock.nsec;
if (impl->control_interval > 0 &&
now > 0 &&
impl->control_last_time > 0 &&
now >= impl->control_last_time &&
now - impl->control_last_time < impl->control_interval)
return 0;
buf = pw_stream_dequeue_buffer(impl->control);
if (buf == NULL)
return 0;
buffer = buf->buffer;
if (buffer->n_datas == 0)
goto queue_empty;
bd = &buffer->datas[0];
if (bd->data == NULL || bd->chunk == NULL ||
bd->maxsize < impl->control_buffer_size) {
if (!impl->control_buffer_size_warned) {
pw_log_warn("%p: control stream buffer too small: %u < %u",
impl, bd->maxsize, impl->control_buffer_size);
impl->control_buffer_size_warned = true;
}
goto queue_empty;
}
spa_pod_builder_init(&b, bd->data, bd->maxsize);
spa_pod_builder_push_sequence(&b, &f, 0);
spa_pod_builder_control(&b, 0, SPA_CONTROL_Properties);
res = spa_filter_graph_get_notify(impl->graph, &b, NULL);
if (res < 0) {
if (!impl->control_buffer_size_warned) {
pw_log_warn("%p: can't build control stream buffer: %s",
impl, spa_strerror(res));
impl->control_buffer_size_warned = true;
}
goto queue_empty;
}
spa_pod_builder_pop(&b, &f);
bd->chunk->offset = 0;
bd->chunk->size = b.state.offset;
bd->chunk->stride = 0;
pw_stream_queue_buffer(impl->control, buf);
impl->control_last_time = now;
impl->control_buffer_size_warned = false;
return 0;
queue_empty:
if (buffer->n_datas > 0 && buffer->datas[0].chunk != NULL) {
bd = &buffer->datas[0];
bd->chunk->offset = 0;
bd->chunk->size = 0;
bd->chunk->stride = 0;
}
pw_stream_queue_buffer(impl->control, buf);
return 0;
}
static void do_process(struct impl *impl)
{
struct pw_buffer *in, *out;
@ -1354,6 +1464,7 @@ static void do_process(struct impl *impl)
cout[n_out++] = NULL;
spa_filter_graph_process(impl->graph, cin, cout, data_size / sizeof(float));
emit_control_stream(impl);
}
if (in != NULL)
@ -1682,6 +1793,169 @@ static const struct pw_stream_events out_stream_events = {
.param_changed = playback_param_changed,
};
static void control_destroy(void *d)
{
struct impl *impl = d;
spa_hook_remove(&impl->control_listener);
impl->control = NULL;
}
static void control_state_changed(void *data, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct impl *impl = data;
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
pw_log_info("module %p: control stream unconnected", impl);
break;
case PW_STREAM_STATE_ERROR:
pw_log_info("module %p: control stream error: %s", impl, error);
break;
default:
break;
}
}
static const struct pw_stream_events control_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = control_destroy,
.state_changed = control_state_changed,
};
static int get_control_stream_buffer_size(struct impl *impl, uint32_t *size)
{
struct spa_pod_dynamic_builder b;
struct spa_pod_frame f;
struct spa_pod *pod;
int res;
spa_pod_dynamic_builder_init(&b, NULL, 0, CONTROL_STREAM_MIN_BUFFER_SIZE);
res = spa_pod_builder_push_sequence(&b.b, &f, 0);
if (res >= 0)
res = spa_pod_builder_control(&b.b, 0, SPA_CONTROL_Properties);
if (res >= 0)
res = spa_filter_graph_get_notify(impl->graph, &b.b, NULL);
pod = spa_pod_builder_pop(&b.b, &f);
if (res >= 0 && pod == NULL)
res = -ENOSPC;
if (res >= 0 && b.b.state.offset > INT_MAX)
res = -EOVERFLOW;
if (res >= 0)
*size = SPA_MAX((uint32_t)b.b.state.offset,
CONTROL_STREAM_MIN_BUFFER_SIZE);
spa_pod_dynamic_builder_clean(&b);
return res;
}
static int setup_control_stream(struct impl *impl)
{
struct spa_pod_dynamic_builder b;
struct pw_array offsets;
const struct spa_pod **params = NULL;
uint32_t i, n_params, *offs;
int res = 0;
spa_pod_dynamic_builder_init(&b, NULL, 0, CONTROL_STREAM_MIN_BUFFER_SIZE);
pw_array_init(&offsets, 512);
for (i = 0;; i++) {
uint32_t save = b.b.state.offset;
res = spa_filter_graph_enum_notify_info(impl->graph, i, &b.b, NULL);
if (res != 1) {
if (res < 0)
goto done;
break;
}
if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) {
res = -errno;
goto done;
}
*offs = save;
}
if (i == 0)
goto done;
if ((res = get_control_stream_buffer_size(impl, &impl->control_buffer_size)) < 0)
goto done;
if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) {
res = -errno;
goto done;
}
*offs = b.b.state.offset;
spa_pod_builder_add_object(&b.b,
SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application),
SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control));
if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) {
res = -errno;
goto done;
}
*offs = b.b.state.offset;
spa_pod_builder_add_object(&b.b,
SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, 32),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
impl->control_buffer_size, impl->control_buffer_size, INT_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(1));
n_params = pw_array_get_len(&offsets, uint32_t);
params = calloc(n_params, sizeof(struct spa_pod *));
if (params == NULL) {
res = -errno;
goto done;
}
offs = offsets.data;
for (i = 0; i < n_params; i++)
params[i] = spa_pod_builder_deref(&b.b, offs[i]);
impl->control_props = pw_properties_new(
PW_KEY_MEDIA_TYPE, "Control",
PW_KEY_MEDIA_CATEGORY, "Monitor",
PW_KEY_MEDIA_ROLE, "DSP",
PW_KEY_MEDIA_CLASS, "Stream/Output/Data",
PW_KEY_NODE_AUTOCONNECT, "false",
PW_KEY_FORMAT_DSP, "8 bit raw control",
NULL);
if (impl->control_props == NULL) {
res = -errno;
goto done;
}
pw_properties_setf(impl->control_props, PW_KEY_NODE_NAME, "notify.%s",
pw_properties_get(impl->props, PW_KEY_NODE_NAME));
pw_properties_setf(impl->control_props, PW_KEY_MEDIA_NAME, "%s notify",
pw_properties_get(impl->props, PW_KEY_NODE_DESCRIPTION));
impl->control = pw_stream_new(impl->core,
"filter notify", impl->control_props);
impl->control_props = NULL;
if (impl->control == NULL) {
res = -errno;
goto done;
}
pw_stream_add_listener(impl->control,
&impl->control_listener,
&control_stream_events, impl);
res = pw_stream_connect(impl->control,
PW_DIRECTION_OUTPUT,
PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS,
params, n_params);
done:
free(params);
pw_array_clear(&offsets);
spa_pod_dynamic_builder_clean(&b);
return res < 0 ? res : 0;
}
static int setup_streams(struct impl *impl)
{
int res = 0;
@ -1805,6 +2079,8 @@ static int setup_streams(struct impl *impl)
flags,
params, n_params);
}
if (res >= 0)
res = setup_control_stream(impl);
spa_pod_dynamic_builder_clean(&b);
done:
@ -1871,6 +2147,10 @@ static void graph_info(void *object, const struct spa_filter_graph_info *info)
pw_log_info("using default outputs %d", val);
impl->playback_info.channels = val;
}
else if (spa_streq(k, "export.interval.ms") &&
spa_atou32(s, &val, 0)) {
impl->control_interval = (uint64_t)val * SPA_NSEC_PER_MSEC;
}
}
if (impl->capture_info.channels == impl->playback_info.channels) {
copy_position(&impl->capture_info, &impl->playback_info);
@ -1952,11 +2232,15 @@ static void impl_destroy(struct impl *impl)
pw_stream_disconnect(impl->capture);
if (impl->playback)
pw_stream_disconnect(impl->playback);
if (impl->control)
pw_stream_disconnect(impl->control);
if (impl->capture)
pw_stream_destroy(impl->capture);
if (impl->playback)
pw_stream_destroy(impl->playback);
if (impl->control)
pw_stream_destroy(impl->control);
if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core);
@ -1966,6 +2250,7 @@ static void impl_destroy(struct impl *impl)
pw_properties_free(impl->capture_props);
pw_properties_free(impl->playback_props);
pw_properties_free(impl->control_props);
pw_properties_free(impl->props);
free(impl);

View file

@ -108,6 +108,55 @@ test('test-support',
dependencies: [spa_dep, systemd_dep, spa_support_dep, spa_journal_dep],
link_with: [pwtest_lib])
)
test('test-filter-graph',
executable('test-filter-graph',
'test-filter-graph.c',
include_directories: pwtest_inc,
dependencies: [spa_dep, pipewire_dep],
link_with: [pwtest_lib]),
env : [
'SPA_PLUGIN_DIR=@0@'.format(meson.project_build_root() / 'spa/plugins'),
'SPA_DATA_DIR=@0@'.format(meson.project_source_root() / 'spa/plugins'),
],
depends : [
spa_support_lib,
spa_filter_graph,
spa_filter_graph_plugin_builtin,
],
)
if get_option('audioconvert').allowed()
test('test-filter-chain',
executable('test-filter-chain',
'test-filter-chain.c',
c_args: pwtest_c_args,
include_directories: pwtest_inc,
dependencies: [spa_dep, pipewire_dep],
link_with: [pwtest_lib]),
env : [
'SPA_PLUGIN_DIR=@0@'.format(meson.project_build_root() / 'spa/plugins'),
'SPA_DATA_DIR=@0@'.format(meson.project_source_root() / 'spa/plugins'),
'PIPEWIRE_MODULE_DIR=@0@'.format(meson.project_build_root() / 'src/modules'),
'LD_LIBRARY_PATH=@0@'.format(meson.project_build_root() / 'src/pipewire'),
],
depends : [
pipewire_exec,
pipewire_module_protocol_native,
pipewire_module_scheduler_v1,
pipewire_module_access,
pipewire_module_client_node,
pipewire_module_adapter,
pipewire_module_spa_node_factory,
pipewire_module_link_factory,
pipewire_module_filter_chain,
spa_support_lib,
spa_audioconvert_lib,
spa_filter_graph,
spa_filter_graph_plugin_builtin,
],
)
endif
endif
test('test-spa',
executable('test-spa',

713
test/test-filter-chain.c Normal file
View file

@ -0,0 +1,713 @@
/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2026 PipeWire authors */
/* SPDX-License-Identifier: MIT */
#include "config.h"
#include "pwtest.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pipewire/pipewire.h>
#include <pipewire/link.h>
#include <spa/control/control.h>
#include <spa/param/audio/raw.h>
#include <spa/param/audio/raw-utils.h>
#include <spa/param/format.h>
#include <spa/param/props.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include <spa/utils/result.h>
#include <spa/utils/string.h>
struct data {
struct pw_main_loop *loop;
struct pw_context *context;
struct pw_core *core;
struct pw_registry *registry;
struct pw_stream *source;
struct pw_stream *sink;
struct pw_stream *control;
struct spa_hook registry_listener;
struct spa_hook source_listener;
struct spa_hook sink_listener;
struct spa_hook control_listener;
struct pw_proxy *source_link;
struct pw_proxy *sink_link;
struct pw_proxy *control_link;
struct spa_hook source_link_listener;
struct spa_hook sink_link_listener;
struct spa_hook control_link_listener;
bool source_ready;
bool sink_ready;
bool control_ready;
bool source_streaming;
bool sink_streaming;
bool control_streaming;
bool audio_source_ready;
bool audio_sink_ready;
bool control_monitor_ready;
bool effect_input_ready;
bool effect_output_ready;
bool notify_ready;
bool source_link_ready;
bool sink_link_ready;
bool control_link_ready;
bool got_notify;
float notify_value;
};
static pid_t cleanup_daemon_pid;
static const char daemon_conf[] =
"context.properties = {\n"
" core.daemon = true\n"
" core.name = pipewire-0\n"
" support.dbus = false\n"
" default.clock.rate = 48000\n"
" default.clock.quantum = 64\n"
" default.clock.min-quantum = 64\n"
" default.clock.max-quantum = 64\n"
" settings.check-quantum = false\n"
" settings.check-rate = false\n"
"}\n"
"context.spa-libs = {\n"
" support.* = support/libspa-support\n"
" audio.convert.* = audioconvert/libspa-audioconvert\n"
" audio.adapt = audioconvert/libspa-audioconvert\n"
" filter.graph.plugin.builtin = filter-graph/libspa-filter-graph-plugin-builtin\n"
" filter.graph = filter-graph/libspa-filter-graph\n"
"}\n"
"context.modules = [\n"
" { name = libpipewire-module-protocol-native }\n"
" { name = libpipewire-module-scheduler-v1 }\n"
" { name = libpipewire-module-access args = { access.socket = { pipewire-0 = unrestricted } } }\n"
" { name = libpipewire-module-client-node }\n"
" { name = libpipewire-module-adapter }\n"
" { name = libpipewire-module-spa-node-factory }\n"
" { name = libpipewire-module-link-factory }\n"
" { name = libpipewire-module-filter-chain\n"
" args = {\n"
" node.name = \"test-filter-chain\"\n"
" node.description = \"test filter chain\"\n"
" capture.props = {\n"
" node.name = \"effect_input.test-filter-chain\"\n"
" media.class = \"Audio/Sink\"\n"
" audio.channels = 1\n"
" audio.position = [ MONO ]\n"
" }\n"
" playback.props = {\n"
" node.name = \"effect_output.test-filter-chain\"\n"
" media.class = \"Audio/Source\"\n"
" audio.channels = 1\n"
" audio.position = [ MONO ]\n"
" }\n"
" filter.graph = {\n"
" nodes = [\n"
" { type = builtin name = copy label = copy }\n"
" { type = builtin name = ramp label = ramp\n"
" control = {\n"
" Start = 0.0\n"
" Stop = 1.0\n"
" \"Duration (s)\" = 0.001\n"
" }\n"
" }\n"
" ]\n"
" inputs = [ \"copy:In\" ]\n"
" outputs = [ \"copy:Out\" ]\n"
" export.controls = [ \"ramp:Current\" ]\n"
" export.interval.ms = 0\n"
" }\n"
" }\n"
" }\n"
"]\n"
"stream.properties = {\n"
" adapter.auto-port-config = { mode = dsp }\n"
"}\n"
"context.objects = [\n"
" { factory = spa-node-factory\n"
" args = {\n"
" factory.name = support.node.driver\n"
" node.name = Dummy-Driver\n"
" node.group = pipewire.dummy\n"
" priority.driver = 20000\n"
" }\n"
" }\n"
"]\n";
static const char client_conf[] =
"context.properties = {\n"
" support.dbus = false\n"
"}\n"
"context.spa-libs = {\n"
" support.* = support/libspa-support\n"
" audio.convert.* = audioconvert/libspa-audioconvert\n"
" audio.adapt = audioconvert/libspa-audioconvert\n"
"}\n"
"context.modules = [\n"
" { name = libpipewire-module-protocol-native }\n"
" { name = libpipewire-module-client-node }\n"
" { name = libpipewire-module-adapter }\n"
"]\n"
"stream.properties = {\n"
" adapter.auto-port-config = { mode = dsp }\n"
"}\n";
static int write_file(const char *path, const char *content)
{
FILE *f;
f = fopen(path, "w");
if (f == NULL)
return -errno;
if (fputs(content, f) < 0) {
int res = errno > 0 ? -errno : -EIO;
fclose(f);
return res;
}
if (fclose(f) < 0)
return -errno;
return 0;
}
static bool parse_params_struct(const struct spa_pod *pod, float *value)
{
struct spa_pod_parser prs;
struct spa_pod_frame f;
if (pod == NULL || !spa_pod_is_struct(pod))
return false;
spa_pod_parser_pod(&prs, pod);
if (spa_pod_parser_push_struct(&prs, &f) < 0)
return false;
while (true) {
const char *name;
float v;
if (spa_pod_parser_get_string(&prs, &name) < 0)
break;
if (spa_pod_parser_get_float(&prs, &v) < 0)
break;
if (spa_streq(name, "ramp:Current")) {
*value = v;
return true;
}
}
return false;
}
static bool parse_control_buffer(const void *data, uint32_t size, float *value)
{
struct spa_pod_parser parser;
struct spa_pod_frame frame;
struct spa_pod_sequence seq;
const void *seq_body, *c_body;
struct spa_pod_control c;
spa_pod_parser_init_from_data(&parser, data, size, 0, size);
if (spa_pod_parser_push_sequence_body(&parser, &frame, &seq, &seq_body) < 0)
return false;
while (spa_pod_parser_get_control_body(&parser, &c, &c_body) >= 0) {
const struct spa_pod *params = NULL;
if (c.type != SPA_CONTROL_Properties)
continue;
if (spa_pod_body_parse_object(&c.value, c_body,
SPA_TYPE_OBJECT_Props, NULL,
SPA_PROP_params, SPA_POD_OPT_Pod(&params)) < 0)
continue;
if (parse_params_struct(params, value))
return true;
}
return false;
}
static void source_process(void *userdata)
{
struct data *d = userdata;
struct pw_buffer *b;
while ((b = pw_stream_dequeue_buffer(d->source)) != NULL) {
struct spa_buffer *buf = b->buffer;
if (buf->n_datas > 0) {
struct spa_data *bd = &buf->datas[0];
uint32_t n_frames;
n_frames = b->requested;
if (n_frames == 0)
n_frames = bd->maxsize / sizeof(float);
n_frames = SPA_MIN(n_frames, bd->maxsize / sizeof(float));
if (bd->data != NULL)
memset(bd->data, 0, n_frames * sizeof(float));
bd->chunk->offset = 0;
bd->chunk->size = n_frames * sizeof(float);
bd->chunk->stride = sizeof(float);
}
pw_stream_queue_buffer(d->source, b);
}
}
static void sink_process(void *userdata)
{
struct data *d = userdata;
struct pw_buffer *b;
while ((b = pw_stream_dequeue_buffer(d->sink)) != NULL)
pw_stream_queue_buffer(d->sink, b);
}
static void control_process(void *userdata)
{
struct data *d = userdata;
struct pw_buffer *b;
while ((b = pw_stream_dequeue_buffer(d->control)) != NULL) {
struct spa_buffer *buf = b->buffer;
if (buf->n_datas > 0) {
struct spa_data *bd = &buf->datas[0];
uint32_t offset, size;
void *ptr;
offset = SPA_MIN(bd->chunk->offset, bd->maxsize);
size = SPA_MIN(bd->chunk->size, bd->maxsize - offset);
ptr = SPA_PTROFF(bd->data, offset, void);
if (ptr != NULL && size > 0 &&
parse_control_buffer(ptr, size, &d->notify_value)) {
d->got_notify = true;
pw_main_loop_quit(d->loop);
}
}
pw_stream_queue_buffer(d->control, b);
}
}
static void source_state_changed(void *userdata, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct data *d = userdata;
if (state == PW_STREAM_STATE_ERROR)
pwtest_fail_with_msg("source stream error: %s", error);
if (state == PW_STREAM_STATE_PAUSED ||
state == PW_STREAM_STATE_STREAMING)
d->source_ready = true;
d->source_streaming = state == PW_STREAM_STATE_STREAMING;
}
static void sink_state_changed(void *userdata, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct data *d = userdata;
if (state == PW_STREAM_STATE_ERROR)
pwtest_fail_with_msg("sink stream error: %s", error);
if (state == PW_STREAM_STATE_PAUSED ||
state == PW_STREAM_STATE_STREAMING)
d->sink_ready = true;
d->sink_streaming = state == PW_STREAM_STATE_STREAMING;
}
static void control_state_changed(void *userdata, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct data *d = userdata;
if (state == PW_STREAM_STATE_ERROR)
pwtest_fail_with_msg("control stream error: %s", error);
if (state == PW_STREAM_STATE_PAUSED ||
state == PW_STREAM_STATE_STREAMING)
d->control_ready = true;
d->control_streaming = state == PW_STREAM_STATE_STREAMING;
}
static const struct pw_stream_events source_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = source_state_changed,
.process = source_process,
};
static const struct pw_stream_events sink_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = sink_state_changed,
.process = sink_process,
};
static const struct pw_stream_events control_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = control_state_changed,
.process = control_process,
};
static void registry_global(void *userdata, uint32_t id, uint32_t permissions,
const char *type, uint32_t version, const struct spa_dict *props)
{
struct data *d = userdata;
const char *name;
if (!spa_streq(type, PW_TYPE_INTERFACE_Node) || props == NULL)
return;
name = spa_dict_lookup(props, PW_KEY_NODE_NAME);
if (name == NULL)
return;
if (spa_streq(name, "audio-source"))
d->audio_source_ready = true;
else if (spa_streq(name, "audio-sink"))
d->audio_sink_ready = true;
else if (spa_streq(name, "control-monitor"))
d->control_monitor_ready = true;
else if (spa_streq(name, "effect_input.test-filter-chain"))
d->effect_input_ready = true;
else if (spa_streq(name, "effect_output.test-filter-chain"))
d->effect_output_ready = true;
else if (spa_streq(name, "notify.test-filter-chain"))
d->notify_ready = true;
}
static const struct pw_registry_events registry_events = {
PW_VERSION_REGISTRY_EVENTS,
.global = registry_global,
};
static void link_info(void *userdata, const struct pw_link_info *info)
{
bool *ready = userdata;
if (info->state == PW_LINK_STATE_ACTIVE)
*ready = true;
}
static const struct pw_link_events link_events = {
PW_VERSION_LINK_EVENTS,
.info = link_info,
};
static int create_link(struct data *d, const char *output, const char *input,
struct pw_proxy **link)
{
struct pw_properties *props;
props = pw_properties_new(
PW_KEY_LINK_OUTPUT_NODE, output,
PW_KEY_LINK_INPUT_NODE, input,
NULL);
if (props == NULL)
return -errno;
*link = pw_core_create_object(d->core,
"link-factory",
PW_TYPE_INTERFACE_Link,
PW_VERSION_LINK,
&props->dict,
0);
pw_properties_free(props);
return *link == NULL ? -errno : 0;
}
static void iterate_streams_ready(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 200 &&
(!d->source_ready || !d->sink_ready || !d->control_ready); i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static void iterate_nodes_ready(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 400 &&
(!d->audio_source_ready || !d->audio_sink_ready ||
!d->control_monitor_ready || !d->effect_input_ready ||
!d->effect_output_ready || !d->notify_ready); i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static void iterate_links_ready(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 200 &&
(!d->source_link_ready || !d->sink_link_ready ||
!d->control_link_ready || !d->source_streaming ||
!d->sink_streaming || !d->control_streaming); i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static void iterate_notify(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 200 && !d->got_notify; i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static int setup_streams(struct data *d)
{
struct spa_pod_builder b;
uint8_t buffer[1024];
const struct spa_pod *audio_params[1];
const struct spa_pod *control_params[1];
struct spa_audio_info_raw audio_info = SPA_AUDIO_INFO_RAW_INIT(
.format = SPA_AUDIO_FORMAT_F32P,
.rate = 48000,
.channels = 1,
.position = { SPA_AUDIO_CHANNEL_MONO });
int res;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
audio_params[0] = spa_format_audio_raw_build(&b,
SPA_PARAM_EnumFormat, &audio_info);
d->source = pw_stream_new(d->core, "audio source",
pw_properties_new(PW_KEY_NODE_NAME, "audio-source", NULL));
if (d->source == NULL)
return -errno;
pw_stream_add_listener(d->source, &d->source_listener,
&source_events, d);
res = pw_stream_connect(d->source, PW_DIRECTION_OUTPUT, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS,
audio_params, 1);
if (res < 0)
return res;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
audio_params[0] = spa_format_audio_raw_build(&b,
SPA_PARAM_EnumFormat, &audio_info);
d->sink = pw_stream_new(d->core, "audio sink",
pw_properties_new(PW_KEY_NODE_NAME, "audio-sink", NULL));
if (d->sink == NULL)
return -errno;
pw_stream_add_listener(d->sink, &d->sink_listener,
&sink_events, d);
res = pw_stream_connect(d->sink, PW_DIRECTION_INPUT, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS,
audio_params, 1);
if (res < 0)
return res;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
control_params[0] = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application),
SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control));
d->control = pw_stream_new(d->core, "control monitor",
pw_properties_new(
PW_KEY_NODE_NAME, "control-monitor",
PW_KEY_MEDIA_CLASS, "Stream/Input/Data",
PW_KEY_FORMAT_DSP, "8 bit raw control",
NULL));
if (d->control == NULL)
return -errno;
pw_stream_add_listener(d->control, &d->control_listener,
&control_events, d);
return pw_stream_connect(d->control, PW_DIRECTION_INPUT, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS,
control_params, 1);
}
static int setup_links(struct data *d)
{
int res;
res = create_link(d, "audio-source", "effect_input.test-filter-chain",
&d->source_link);
if (res < 0)
return res;
res = create_link(d, "effect_output.test-filter-chain", "audio-sink",
&d->sink_link);
if (res < 0)
return res;
res = create_link(d, "notify.test-filter-chain", "control-monitor",
&d->control_link);
if (res < 0)
return res;
pw_proxy_add_object_listener(d->source_link, &d->source_link_listener,
&link_events, &d->source_link_ready);
pw_proxy_add_object_listener(d->sink_link, &d->sink_link_listener,
&link_events, &d->sink_link_ready);
pw_proxy_add_object_listener(d->control_link, &d->control_link_listener,
&link_events, &d->control_link_ready);
return 0;
}
static pid_t start_daemon(const char *conf_dir, const char *runtime_dir)
{
pid_t pid = fork();
if (pid == 0) {
setenv("PIPEWIRE_CONFIG_DIR", conf_dir, 1);
setenv("PIPEWIRE_RUNTIME_DIR", runtime_dir, 1);
unsetenv("PIPEWIRE_REMOTE");
unsetenv("PIPEWIRE_CORE");
execl(BUILD_ROOT "/src/daemon/pipewire",
BUILD_ROOT "/src/daemon/pipewire",
"-c", "pipewire.conf",
(char *)NULL);
_exit(127);
}
return pid;
}
static void stop_daemon(pid_t pid)
{
int status;
if (pid <= 0)
return;
kill(pid, SIGTERM);
waitpid(pid, &status, 0);
if (cleanup_daemon_pid == pid)
cleanup_daemon_pid = 0;
}
static void cleanup_daemon(void)
{
stop_daemon(cleanup_daemon_pid);
}
PWTEST(filter_chain_exported_control_stream)
{
const char *tmpdir = getenv("TMPDIR");
char *template = NULL, *dir, *runtime_dir = NULL, *daemon_path = NULL,
*client_path = NULL, *socket_path = NULL;
struct data d;
pid_t daemon;
int res;
if (getenv("MESON_EXE_WRAPPER") != NULL) {
fprintf(stderr, "skipping live daemon test under Meson exe wrapper\n");
return PWTEST_SKIP;
}
if (tmpdir == NULL)
tmpdir = "/tmp";
pwtest_int_ge(asprintf(&template, "%s/pw-filter-chain-test-XXXXXX", tmpdir), 0);
dir = mkdtemp(template);
pwtest_ptr_notnull(dir);
pwtest_int_ge(asprintf(&runtime_dir, "%s/run", dir), 0);
pwtest_errno_ok(mkdir(runtime_dir, 0700));
pwtest_int_ge(asprintf(&daemon_path, "%s/pipewire.conf", dir), 0);
pwtest_int_ge(asprintf(&client_path, "%s/client.conf", dir), 0);
pwtest_neg_errno_ok(write_file(daemon_path, daemon_conf));
pwtest_neg_errno_ok(write_file(client_path, client_conf));
daemon = start_daemon(dir, runtime_dir);
pwtest_int_gt(daemon, 0);
cleanup_daemon_pid = daemon;
atexit(cleanup_daemon);
pwtest_int_ge(asprintf(&socket_path, "%s/pipewire-0", runtime_dir), 0);
for (uint32_t i = 0; i < 50 && access(socket_path, F_OK) < 0; i++)
usleep(100000);
pwtest_errno_ok(access(socket_path, F_OK));
setenv("PIPEWIRE_CONFIG_DIR", dir, 1);
setenv("PIPEWIRE_RUNTIME_DIR", runtime_dir, 1);
setenv("PIPEWIRE_REMOTE", "pipewire-0", 1);
spa_zero(d);
pw_init(0, NULL);
d.loop = pw_main_loop_new(NULL);
pwtest_ptr_notnull(d.loop);
d.context = pw_context_new(pw_main_loop_get_loop(d.loop), NULL, 0);
pwtest_ptr_notnull(d.context);
d.core = pw_context_connect(d.context, NULL, 0);
pwtest_ptr_notnull(d.core);
d.registry = pw_core_get_registry(d.core, PW_VERSION_REGISTRY, 0);
pwtest_ptr_notnull(d.registry);
pw_registry_add_listener(d.registry, &d.registry_listener,
&registry_events, &d);
res = setup_streams(&d);
pwtest_neg_errno_ok(res);
iterate_streams_ready(&d);
pwtest_bool_true(d.source_ready);
pwtest_bool_true(d.sink_ready);
pwtest_bool_true(d.control_ready);
iterate_nodes_ready(&d);
pwtest_bool_true(d.audio_source_ready);
pwtest_bool_true(d.audio_sink_ready);
pwtest_bool_true(d.control_monitor_ready);
pwtest_bool_true(d.effect_input_ready);
pwtest_bool_true(d.effect_output_ready);
pwtest_bool_true(d.notify_ready);
res = setup_links(&d);
pwtest_neg_errno_ok(res);
iterate_links_ready(&d);
pwtest_bool_true(d.source_link_ready);
pwtest_bool_true(d.sink_link_ready);
pwtest_bool_true(d.control_link_ready);
pwtest_bool_true(d.source_streaming);
pwtest_bool_true(d.sink_streaming);
pwtest_bool_true(d.control_streaming);
iterate_notify(&d);
pwtest_bool_true(d.got_notify);
pwtest_double_gt(d.notify_value, 0.0);
if (d.source_link)
pw_proxy_destroy(d.source_link);
if (d.sink_link)
pw_proxy_destroy(d.sink_link);
if (d.control_link)
pw_proxy_destroy(d.control_link);
if (d.source)
pw_stream_destroy(d.source);
if (d.sink)
pw_stream_destroy(d.sink);
if (d.control)
pw_stream_destroy(d.control);
if (d.registry)
pw_proxy_destroy((struct pw_proxy *)d.registry);
if (d.core)
pw_core_disconnect(d.core);
if (d.context)
pw_context_destroy(d.context);
if (d.loop)
pw_main_loop_destroy(d.loop);
pw_deinit();
stop_daemon(daemon);
unlink(daemon_path);
unlink(client_path);
rmdir(runtime_dir);
rmdir(dir);
free(socket_path);
free(client_path);
free(daemon_path);
free(runtime_dir);
free(template);
return PWTEST_PASS;
}
PWTEST_SUITE(filter_chain)
{
pwtest_add(filter_chain_exported_control_stream, PWTEST_NOARG);
return PWTEST_PASS;
}

344
test/test-filter-graph.c Normal file
View file

@ -0,0 +1,344 @@
/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2026 PipeWire authors */
/* SPDX-License-Identifier: MIT */
#include "config.h"
#include "pwtest.h"
#include <string.h>
#include <pipewire/pipewire.h>
#include <spa/filter-graph/filter-graph.h>
#include <spa/param/props.h>
#include <spa/pod/dynamic.h>
#include <spa/pod/iter.h>
#include <spa/pod/parser.h>
#include <spa/utils/keys.h>
#define EXPORT_RAMP_CURRENT "\"ramp:Current\" "
#define EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \
EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \
EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \
EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \
EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT
#define EXPORT_RAMP_CURRENT_100 EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \
EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \
EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \
EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \
EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10
#define EXPORT_RAMP_CURRENT_160 EXPORT_RAMP_CURRENT_100 EXPORT_RAMP_CURRENT_10 \
EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \
EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \
EXPORT_RAMP_CURRENT_10
static bool is_current_notify_info(struct spa_pod *pod)
{
const char *name = NULL;
if (pod == NULL || !spa_pod_is_object(pod))
return false;
if (SPA_POD_OBJECT_ID(pod) != SPA_PARAM_PropInfo)
return false;
if (spa_pod_parse_object(pod, SPA_TYPE_OBJECT_PropInfo, NULL,
SPA_PROP_INFO_name, SPA_POD_OPT_String(&name)) < 0)
return false;
return spa_streq(name, "ramp:Current");
}
static bool has_current_notify(struct spa_pod *pod)
{
struct spa_pod_object *obj = (struct spa_pod_object *)pod;
const struct spa_pod_prop *prop;
SPA_POD_OBJECT_FOREACH(obj, prop) {
struct spa_pod_parser prs;
struct spa_pod_frame f;
if (prop->key != SPA_PROP_params)
continue;
spa_pod_parser_pod(&prs, &prop->value);
if (spa_pod_parser_push_struct(&prs, &f) < 0)
return false;
while (true) {
const char *name;
float value;
if (spa_pod_parser_get_string(&prs, &name) < 0)
break;
if (spa_pod_parser_get_float(&prs, &value) < 0)
break;
if (spa_streq(name, "ramp:Current") && value > 0.0f)
return true;
}
}
return false;
}
PWTEST(export_controls)
{
struct pw_loop *loop;
struct pw_context *context;
struct pw_properties *props;
struct spa_handle *handle;
struct spa_filter_graph *graph;
struct spa_pod_dynamic_builder b;
struct spa_pod *pod = NULL;
float out[64];
void *outputs[1] = { out };
int res;
pw_init(0, NULL);
loop = pw_loop_new(NULL);
pwtest_ptr_notnull(loop);
context = pw_context_new(loop, pw_properties_new(
PW_KEY_CONFIG_NAME, "null",
"context.modules.allow-empty", "true",
"support.dbus", "false",
NULL), 0);
pwtest_ptr_notnull(context);
pw_context_add_spa_lib(context,
"^filter\\.graph\\.plugin\\.builtin$",
"filter-graph/libspa-filter-graph-plugin-builtin");
pw_context_add_spa_lib(context,
"^filter\\.graph$",
"filter-graph/libspa-filter-graph");
props = pw_properties_new(
"clock.quantum-limit", "128",
"filter-graph.n_inputs", "0",
"filter-graph.n_outputs", "1",
SPA_KEY_LIBRARY_NAME, "filter-graph/libspa-filter-graph",
"filter.graph",
"{ nodes = [ { type = builtin name = ramp label = ramp "
"control = { Start = 0.0 Stop = 1.0 \"Duration (s)\" = 0.001 } } ] "
"outputs = [ \"ramp:Out\" ] "
"export.controls = [ \"ramp:Current\" ] }",
NULL);
pwtest_ptr_notnull(props);
handle = pw_context_load_spa_handle(context, "filter.graph", &props->dict);
pwtest_ptr_notnull(handle);
res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_FilterGraph,
(void **)&graph);
pwtest_neg_errno_ok(res);
pwtest_ptr_notnull(graph);
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
res = spa_filter_graph_enum_notify_info(graph, 0, &b.b, &pod);
pwtest_int_eq(res, 1);
pwtest_ptr_notnull(pod);
pwtest_bool_true(is_current_notify_info(pod));
spa_pod_dynamic_builder_clean(&b);
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
res = spa_filter_graph_enum_notify_info(graph, 1, &b.b, &pod);
pwtest_int_eq(res, 0);
spa_pod_dynamic_builder_clean(&b);
res = spa_filter_graph_activate(graph,
&SPA_DICT_ITEMS(SPA_DICT_ITEM(PW_KEY_AUDIO_RATE, "48000")));
pwtest_neg_errno_ok(res);
spa_zero(out);
res = spa_filter_graph_process(graph, NULL, outputs, SPA_N_ELEMENTS(out));
pwtest_neg_errno_ok(res);
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
res = spa_filter_graph_get_notify(graph, &b.b, &pod);
pwtest_int_eq(res, 1);
pwtest_ptr_notnull(pod);
pwtest_bool_true(has_current_notify(pod));
spa_pod_dynamic_builder_clean(&b);
spa_filter_graph_deactivate(graph);
pw_unload_spa_handle(handle);
pw_properties_free(props);
pw_context_destroy(context);
pw_loop_destroy(loop);
pw_deinit();
return PWTEST_PASS;
}
PWTEST(export_controls_large_payload)
{
struct pw_loop *loop;
struct pw_context *context;
struct pw_properties *props;
struct spa_handle *handle;
struct spa_filter_graph *graph;
struct spa_pod_dynamic_builder b;
struct spa_pod *pod = NULL;
float out[64];
void *outputs[1] = { out };
int res;
pw_init(0, NULL);
loop = pw_loop_new(NULL);
pwtest_ptr_notnull(loop);
context = pw_context_new(loop, pw_properties_new(
PW_KEY_CONFIG_NAME, "null",
"context.modules.allow-empty", "true",
"support.dbus", "false",
NULL), 0);
pwtest_ptr_notnull(context);
pw_context_add_spa_lib(context,
"^filter\\.graph\\.plugin\\.builtin$",
"filter-graph/libspa-filter-graph-plugin-builtin");
pw_context_add_spa_lib(context,
"^filter\\.graph$",
"filter-graph/libspa-filter-graph");
props = pw_properties_new(
"clock.quantum-limit", "128",
"filter-graph.n_inputs", "0",
"filter-graph.n_outputs", "1",
SPA_KEY_LIBRARY_NAME, "filter-graph/libspa-filter-graph",
"filter.graph",
"{ nodes = [ { type = builtin name = ramp label = ramp "
"control = { Start = 0.0 Stop = 1.0 \"Duration (s)\" = 0.001 } } ] "
"outputs = [ \"ramp:Out\" ] "
"export.controls = [ " EXPORT_RAMP_CURRENT_160 "] }",
NULL);
pwtest_ptr_notnull(props);
handle = pw_context_load_spa_handle(context, "filter.graph", &props->dict);
pwtest_ptr_notnull(handle);
res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_FilterGraph,
(void **)&graph);
pwtest_neg_errno_ok(res);
pwtest_ptr_notnull(graph);
res = spa_filter_graph_activate(graph,
&SPA_DICT_ITEMS(SPA_DICT_ITEM(PW_KEY_AUDIO_RATE, "48000")));
pwtest_neg_errno_ok(res);
spa_zero(out);
res = spa_filter_graph_process(graph, NULL, outputs, SPA_N_ELEMENTS(out));
pwtest_neg_errno_ok(res);
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
res = spa_filter_graph_get_notify(graph, &b.b, &pod);
pwtest_int_eq(res, 1);
pwtest_ptr_notnull(pod);
pwtest_int_gt((int)b.b.state.offset, 4096);
pwtest_bool_true(has_current_notify(pod));
spa_pod_dynamic_builder_clean(&b);
spa_filter_graph_deactivate(graph);
pw_unload_spa_handle(handle);
pw_properties_free(props);
pw_context_destroy(context);
pw_loop_destroy(loop);
pw_deinit();
return PWTEST_PASS;
}
PWTEST(export_controls_duplicated_graph_uses_scalar_values)
{
struct pw_loop *loop;
struct pw_context *context;
struct pw_properties *props;
struct spa_handle *handle;
struct spa_filter_graph *graph;
struct spa_pod_dynamic_builder b;
struct spa_pod *pod = NULL;
float in[2][64], out[2][64];
const void *inputs[2] = { in[0], in[1] };
void *outputs[2] = { out[0], out[1] };
int res;
pw_init(0, NULL);
loop = pw_loop_new(NULL);
pwtest_ptr_notnull(loop);
context = pw_context_new(loop, pw_properties_new(
PW_KEY_CONFIG_NAME, "null",
"context.modules.allow-empty", "true",
"support.dbus", "false",
NULL), 0);
pwtest_ptr_notnull(context);
pw_context_add_spa_lib(context,
"^filter\\.graph\\.plugin\\.builtin$",
"filter-graph/libspa-filter-graph-plugin-builtin");
pw_context_add_spa_lib(context,
"^filter\\.graph$",
"filter-graph/libspa-filter-graph");
props = pw_properties_new(
"clock.quantum-limit", "128",
"filter-graph.n_inputs", "2",
"filter-graph.n_outputs", "2",
SPA_KEY_LIBRARY_NAME, "filter-graph/libspa-filter-graph",
"filter.graph",
"{ nodes = [ "
"{ type = builtin name = copy label = copy } "
"{ type = builtin name = ramp label = ramp "
"control = { Start = 0.0 Stop = 1.0 \"Duration (s)\" = 0.001 } } ] "
"inputs = [ \"copy:In\" ] "
"outputs = [ \"copy:Out\" ] "
"export.controls = [ \"ramp:Current\" ] }",
NULL);
pwtest_ptr_notnull(props);
handle = pw_context_load_spa_handle(context, "filter.graph", &props->dict);
pwtest_ptr_notnull(handle);
res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_FilterGraph,
(void **)&graph);
pwtest_neg_errno_ok(res);
pwtest_ptr_notnull(graph);
res = spa_filter_graph_activate(graph,
&SPA_DICT_ITEMS(SPA_DICT_ITEM(PW_KEY_AUDIO_RATE, "48000")));
pwtest_neg_errno_ok(res);
spa_zero(in);
spa_zero(out);
res = spa_filter_graph_process(graph, inputs, outputs, SPA_N_ELEMENTS(out[0]));
pwtest_neg_errno_ok(res);
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
res = spa_filter_graph_get_notify(graph, &b.b, &pod);
pwtest_int_eq(res, 1);
pwtest_ptr_notnull(pod);
pwtest_bool_true(has_current_notify(pod));
spa_pod_dynamic_builder_clean(&b);
spa_filter_graph_deactivate(graph);
pw_unload_spa_handle(handle);
pw_properties_free(props);
pw_context_destroy(context);
pw_loop_destroy(loop);
pw_deinit();
return PWTEST_PASS;
}
PWTEST_SUITE(filter_graph)
{
pwtest_add(export_controls, PWTEST_NOARG);
pwtest_add(export_controls_large_payload, PWTEST_NOARG);
pwtest_add(export_controls_duplicated_graph_uses_scalar_values, PWTEST_NOARG);
return PWTEST_PASS;
}
#undef EXPORT_RAMP_CURRENT_160
#undef EXPORT_RAMP_CURRENT_100
#undef EXPORT_RAMP_CURRENT_10
#undef EXPORT_RAMP_CURRENT