pipewire/test/test-filter-chain.c
2026-05-10 05:33:47 +02:00

713 lines
20 KiB
C

/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2026 PipeWire authors */
/* SPDX-License-Identifier: MIT */
#include "config.h"
#include "pwtest.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pipewire/pipewire.h>
#include <pipewire/link.h>
#include <spa/control/control.h>
#include <spa/param/audio/raw.h>
#include <spa/param/audio/raw-utils.h>
#include <spa/param/format.h>
#include <spa/param/props.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include <spa/utils/result.h>
#include <spa/utils/string.h>
struct data {
struct pw_main_loop *loop;
struct pw_context *context;
struct pw_core *core;
struct pw_registry *registry;
struct pw_stream *source;
struct pw_stream *sink;
struct pw_stream *control;
struct spa_hook registry_listener;
struct spa_hook source_listener;
struct spa_hook sink_listener;
struct spa_hook control_listener;
struct pw_proxy *source_link;
struct pw_proxy *sink_link;
struct pw_proxy *control_link;
struct spa_hook source_link_listener;
struct spa_hook sink_link_listener;
struct spa_hook control_link_listener;
bool source_ready;
bool sink_ready;
bool control_ready;
bool source_streaming;
bool sink_streaming;
bool control_streaming;
bool audio_source_ready;
bool audio_sink_ready;
bool control_monitor_ready;
bool effect_input_ready;
bool effect_output_ready;
bool notify_ready;
bool source_link_ready;
bool sink_link_ready;
bool control_link_ready;
bool got_notify;
float notify_value;
};
static pid_t cleanup_daemon_pid;
static const char daemon_conf[] =
"context.properties = {\n"
" core.daemon = true\n"
" core.name = pipewire-0\n"
" support.dbus = false\n"
" default.clock.rate = 48000\n"
" default.clock.quantum = 64\n"
" default.clock.min-quantum = 64\n"
" default.clock.max-quantum = 64\n"
" settings.check-quantum = false\n"
" settings.check-rate = false\n"
"}\n"
"context.spa-libs = {\n"
" support.* = support/libspa-support\n"
" audio.convert.* = audioconvert/libspa-audioconvert\n"
" audio.adapt = audioconvert/libspa-audioconvert\n"
" filter.graph.plugin.builtin = filter-graph/libspa-filter-graph-plugin-builtin\n"
" filter.graph = filter-graph/libspa-filter-graph\n"
"}\n"
"context.modules = [\n"
" { name = libpipewire-module-protocol-native }\n"
" { name = libpipewire-module-scheduler-v1 }\n"
" { name = libpipewire-module-access args = { access.socket = { pipewire-0 = unrestricted } } }\n"
" { name = libpipewire-module-client-node }\n"
" { name = libpipewire-module-adapter }\n"
" { name = libpipewire-module-spa-node-factory }\n"
" { name = libpipewire-module-link-factory }\n"
" { name = libpipewire-module-filter-chain\n"
" args = {\n"
" node.name = \"test-filter-chain\"\n"
" node.description = \"test filter chain\"\n"
" capture.props = {\n"
" node.name = \"effect_input.test-filter-chain\"\n"
" media.class = \"Audio/Sink\"\n"
" audio.channels = 1\n"
" audio.position = [ MONO ]\n"
" }\n"
" playback.props = {\n"
" node.name = \"effect_output.test-filter-chain\"\n"
" media.class = \"Audio/Source\"\n"
" audio.channels = 1\n"
" audio.position = [ MONO ]\n"
" }\n"
" filter.graph = {\n"
" nodes = [\n"
" { type = builtin name = copy label = copy }\n"
" { type = builtin name = ramp label = ramp\n"
" control = {\n"
" Start = 0.0\n"
" Stop = 1.0\n"
" \"Duration (s)\" = 0.001\n"
" }\n"
" }\n"
" ]\n"
" inputs = [ \"copy:In\" ]\n"
" outputs = [ \"copy:Out\" ]\n"
" export.controls = [ \"ramp:Current\" ]\n"
" export.interval.ms = 0\n"
" }\n"
" }\n"
" }\n"
"]\n"
"stream.properties = {\n"
" adapter.auto-port-config = { mode = dsp }\n"
"}\n"
"context.objects = [\n"
" { factory = spa-node-factory\n"
" args = {\n"
" factory.name = support.node.driver\n"
" node.name = Dummy-Driver\n"
" node.group = pipewire.dummy\n"
" priority.driver = 20000\n"
" }\n"
" }\n"
"]\n";
static const char client_conf[] =
"context.properties = {\n"
" support.dbus = false\n"
"}\n"
"context.spa-libs = {\n"
" support.* = support/libspa-support\n"
" audio.convert.* = audioconvert/libspa-audioconvert\n"
" audio.adapt = audioconvert/libspa-audioconvert\n"
"}\n"
"context.modules = [\n"
" { name = libpipewire-module-protocol-native }\n"
" { name = libpipewire-module-client-node }\n"
" { name = libpipewire-module-adapter }\n"
"]\n"
"stream.properties = {\n"
" adapter.auto-port-config = { mode = dsp }\n"
"}\n";
static int write_file(const char *path, const char *content)
{
FILE *f;
f = fopen(path, "w");
if (f == NULL)
return -errno;
if (fputs(content, f) < 0) {
int res = errno > 0 ? -errno : -EIO;
fclose(f);
return res;
}
if (fclose(f) < 0)
return -errno;
return 0;
}
static bool parse_params_struct(const struct spa_pod *pod, float *value)
{
struct spa_pod_parser prs;
struct spa_pod_frame f;
if (pod == NULL || !spa_pod_is_struct(pod))
return false;
spa_pod_parser_pod(&prs, pod);
if (spa_pod_parser_push_struct(&prs, &f) < 0)
return false;
while (true) {
const char *name;
float v;
if (spa_pod_parser_get_string(&prs, &name) < 0)
break;
if (spa_pod_parser_get_float(&prs, &v) < 0)
break;
if (spa_streq(name, "ramp:Current")) {
*value = v;
return true;
}
}
return false;
}
static bool parse_control_buffer(const void *data, uint32_t size, float *value)
{
struct spa_pod_parser parser;
struct spa_pod_frame frame;
struct spa_pod_sequence seq;
const void *seq_body, *c_body;
struct spa_pod_control c;
spa_pod_parser_init_from_data(&parser, data, size, 0, size);
if (spa_pod_parser_push_sequence_body(&parser, &frame, &seq, &seq_body) < 0)
return false;
while (spa_pod_parser_get_control_body(&parser, &c, &c_body) >= 0) {
const struct spa_pod *params = NULL;
if (c.type != SPA_CONTROL_Properties)
continue;
if (spa_pod_body_parse_object(&c.value, c_body,
SPA_TYPE_OBJECT_Props, NULL,
SPA_PROP_params, SPA_POD_OPT_Pod(&params)) < 0)
continue;
if (parse_params_struct(params, value))
return true;
}
return false;
}
static void source_process(void *userdata)
{
struct data *d = userdata;
struct pw_buffer *b;
while ((b = pw_stream_dequeue_buffer(d->source)) != NULL) {
struct spa_buffer *buf = b->buffer;
if (buf->n_datas > 0) {
struct spa_data *bd = &buf->datas[0];
uint32_t n_frames;
n_frames = b->requested;
if (n_frames == 0)
n_frames = bd->maxsize / sizeof(float);
n_frames = SPA_MIN(n_frames, bd->maxsize / sizeof(float));
if (bd->data != NULL)
memset(bd->data, 0, n_frames * sizeof(float));
bd->chunk->offset = 0;
bd->chunk->size = n_frames * sizeof(float);
bd->chunk->stride = sizeof(float);
}
pw_stream_queue_buffer(d->source, b);
}
}
static void sink_process(void *userdata)
{
struct data *d = userdata;
struct pw_buffer *b;
while ((b = pw_stream_dequeue_buffer(d->sink)) != NULL)
pw_stream_queue_buffer(d->sink, b);
}
static void control_process(void *userdata)
{
struct data *d = userdata;
struct pw_buffer *b;
while ((b = pw_stream_dequeue_buffer(d->control)) != NULL) {
struct spa_buffer *buf = b->buffer;
if (buf->n_datas > 0) {
struct spa_data *bd = &buf->datas[0];
uint32_t offset, size;
void *ptr;
offset = SPA_MIN(bd->chunk->offset, bd->maxsize);
size = SPA_MIN(bd->chunk->size, bd->maxsize - offset);
ptr = SPA_PTROFF(bd->data, offset, void);
if (ptr != NULL && size > 0 &&
parse_control_buffer(ptr, size, &d->notify_value)) {
d->got_notify = true;
pw_main_loop_quit(d->loop);
}
}
pw_stream_queue_buffer(d->control, b);
}
}
static void source_state_changed(void *userdata, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct data *d = userdata;
if (state == PW_STREAM_STATE_ERROR)
pwtest_fail_with_msg("source stream error: %s", error);
if (state == PW_STREAM_STATE_PAUSED ||
state == PW_STREAM_STATE_STREAMING)
d->source_ready = true;
d->source_streaming = state == PW_STREAM_STATE_STREAMING;
}
static void sink_state_changed(void *userdata, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct data *d = userdata;
if (state == PW_STREAM_STATE_ERROR)
pwtest_fail_with_msg("sink stream error: %s", error);
if (state == PW_STREAM_STATE_PAUSED ||
state == PW_STREAM_STATE_STREAMING)
d->sink_ready = true;
d->sink_streaming = state == PW_STREAM_STATE_STREAMING;
}
static void control_state_changed(void *userdata, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct data *d = userdata;
if (state == PW_STREAM_STATE_ERROR)
pwtest_fail_with_msg("control stream error: %s", error);
if (state == PW_STREAM_STATE_PAUSED ||
state == PW_STREAM_STATE_STREAMING)
d->control_ready = true;
d->control_streaming = state == PW_STREAM_STATE_STREAMING;
}
static const struct pw_stream_events source_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = source_state_changed,
.process = source_process,
};
static const struct pw_stream_events sink_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = sink_state_changed,
.process = sink_process,
};
static const struct pw_stream_events control_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = control_state_changed,
.process = control_process,
};
static void registry_global(void *userdata, uint32_t id, uint32_t permissions,
const char *type, uint32_t version, const struct spa_dict *props)
{
struct data *d = userdata;
const char *name;
if (!spa_streq(type, PW_TYPE_INTERFACE_Node) || props == NULL)
return;
name = spa_dict_lookup(props, PW_KEY_NODE_NAME);
if (name == NULL)
return;
if (spa_streq(name, "audio-source"))
d->audio_source_ready = true;
else if (spa_streq(name, "audio-sink"))
d->audio_sink_ready = true;
else if (spa_streq(name, "control-monitor"))
d->control_monitor_ready = true;
else if (spa_streq(name, "effect_input.test-filter-chain"))
d->effect_input_ready = true;
else if (spa_streq(name, "effect_output.test-filter-chain"))
d->effect_output_ready = true;
else if (spa_streq(name, "notify.test-filter-chain"))
d->notify_ready = true;
}
static const struct pw_registry_events registry_events = {
PW_VERSION_REGISTRY_EVENTS,
.global = registry_global,
};
static void link_info(void *userdata, const struct pw_link_info *info)
{
bool *ready = userdata;
if (info->state == PW_LINK_STATE_ACTIVE)
*ready = true;
}
static const struct pw_link_events link_events = {
PW_VERSION_LINK_EVENTS,
.info = link_info,
};
static int create_link(struct data *d, const char *output, const char *input,
struct pw_proxy **link)
{
struct pw_properties *props;
props = pw_properties_new(
PW_KEY_LINK_OUTPUT_NODE, output,
PW_KEY_LINK_INPUT_NODE, input,
NULL);
if (props == NULL)
return -errno;
*link = pw_core_create_object(d->core,
"link-factory",
PW_TYPE_INTERFACE_Link,
PW_VERSION_LINK,
&props->dict,
0);
pw_properties_free(props);
return *link == NULL ? -errno : 0;
}
static void iterate_streams_ready(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 200 &&
(!d->source_ready || !d->sink_ready || !d->control_ready); i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static void iterate_nodes_ready(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 400 &&
(!d->audio_source_ready || !d->audio_sink_ready ||
!d->control_monitor_ready || !d->effect_input_ready ||
!d->effect_output_ready || !d->notify_ready); i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static void iterate_links_ready(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 200 &&
(!d->source_link_ready || !d->sink_link_ready ||
!d->control_link_ready || !d->source_streaming ||
!d->sink_streaming || !d->control_streaming); i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static void iterate_notify(struct data *d)
{
struct pw_loop *loop = pw_main_loop_get_loop(d->loop);
pw_loop_enter(loop);
for (uint32_t i = 0; i < 200 && !d->got_notify; i++)
pw_loop_iterate(loop, 50);
pw_loop_leave(loop);
}
static int setup_streams(struct data *d)
{
struct spa_pod_builder b;
uint8_t buffer[1024];
const struct spa_pod *audio_params[1];
const struct spa_pod *control_params[1];
struct spa_audio_info_raw audio_info = SPA_AUDIO_INFO_RAW_INIT(
.format = SPA_AUDIO_FORMAT_F32P,
.rate = 48000,
.channels = 1,
.position = { SPA_AUDIO_CHANNEL_MONO });
int res;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
audio_params[0] = spa_format_audio_raw_build(&b,
SPA_PARAM_EnumFormat, &audio_info);
d->source = pw_stream_new(d->core, "audio source",
pw_properties_new(PW_KEY_NODE_NAME, "audio-source", NULL));
if (d->source == NULL)
return -errno;
pw_stream_add_listener(d->source, &d->source_listener,
&source_events, d);
res = pw_stream_connect(d->source, PW_DIRECTION_OUTPUT, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS,
audio_params, 1);
if (res < 0)
return res;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
audio_params[0] = spa_format_audio_raw_build(&b,
SPA_PARAM_EnumFormat, &audio_info);
d->sink = pw_stream_new(d->core, "audio sink",
pw_properties_new(PW_KEY_NODE_NAME, "audio-sink", NULL));
if (d->sink == NULL)
return -errno;
pw_stream_add_listener(d->sink, &d->sink_listener,
&sink_events, d);
res = pw_stream_connect(d->sink, PW_DIRECTION_INPUT, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS,
audio_params, 1);
if (res < 0)
return res;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
control_params[0] = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application),
SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control));
d->control = pw_stream_new(d->core, "control monitor",
pw_properties_new(
PW_KEY_NODE_NAME, "control-monitor",
PW_KEY_MEDIA_CLASS, "Stream/Input/Data",
PW_KEY_FORMAT_DSP, "8 bit raw control",
NULL));
if (d->control == NULL)
return -errno;
pw_stream_add_listener(d->control, &d->control_listener,
&control_events, d);
return pw_stream_connect(d->control, PW_DIRECTION_INPUT, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS,
control_params, 1);
}
static int setup_links(struct data *d)
{
int res;
res = create_link(d, "audio-source", "effect_input.test-filter-chain",
&d->source_link);
if (res < 0)
return res;
res = create_link(d, "effect_output.test-filter-chain", "audio-sink",
&d->sink_link);
if (res < 0)
return res;
res = create_link(d, "notify.test-filter-chain", "control-monitor",
&d->control_link);
if (res < 0)
return res;
pw_proxy_add_object_listener(d->source_link, &d->source_link_listener,
&link_events, &d->source_link_ready);
pw_proxy_add_object_listener(d->sink_link, &d->sink_link_listener,
&link_events, &d->sink_link_ready);
pw_proxy_add_object_listener(d->control_link, &d->control_link_listener,
&link_events, &d->control_link_ready);
return 0;
}
static pid_t start_daemon(const char *conf_dir, const char *runtime_dir)
{
pid_t pid = fork();
if (pid == 0) {
setenv("PIPEWIRE_CONFIG_DIR", conf_dir, 1);
setenv("PIPEWIRE_RUNTIME_DIR", runtime_dir, 1);
unsetenv("PIPEWIRE_REMOTE");
unsetenv("PIPEWIRE_CORE");
execl(BUILD_ROOT "/src/daemon/pipewire",
BUILD_ROOT "/src/daemon/pipewire",
"-c", "pipewire.conf",
(char *)NULL);
_exit(127);
}
return pid;
}
static void stop_daemon(pid_t pid)
{
int status;
if (pid <= 0)
return;
kill(pid, SIGTERM);
waitpid(pid, &status, 0);
if (cleanup_daemon_pid == pid)
cleanup_daemon_pid = 0;
}
static void cleanup_daemon(void)
{
stop_daemon(cleanup_daemon_pid);
}
PWTEST(filter_chain_exported_control_stream)
{
const char *tmpdir = getenv("TMPDIR");
char *template = NULL, *dir, *runtime_dir = NULL, *daemon_path = NULL,
*client_path = NULL, *socket_path = NULL;
struct data d;
pid_t daemon;
int res;
if (getenv("MESON_EXE_WRAPPER") != NULL) {
fprintf(stderr, "skipping live daemon test under Meson exe wrapper\n");
return PWTEST_SKIP;
}
if (tmpdir == NULL)
tmpdir = "/tmp";
pwtest_int_ge(asprintf(&template, "%s/pw-filter-chain-test-XXXXXX", tmpdir), 0);
dir = mkdtemp(template);
pwtest_ptr_notnull(dir);
pwtest_int_ge(asprintf(&runtime_dir, "%s/run", dir), 0);
pwtest_errno_ok(mkdir(runtime_dir, 0700));
pwtest_int_ge(asprintf(&daemon_path, "%s/pipewire.conf", dir), 0);
pwtest_int_ge(asprintf(&client_path, "%s/client.conf", dir), 0);
pwtest_neg_errno_ok(write_file(daemon_path, daemon_conf));
pwtest_neg_errno_ok(write_file(client_path, client_conf));
daemon = start_daemon(dir, runtime_dir);
pwtest_int_gt(daemon, 0);
cleanup_daemon_pid = daemon;
atexit(cleanup_daemon);
pwtest_int_ge(asprintf(&socket_path, "%s/pipewire-0", runtime_dir), 0);
for (uint32_t i = 0; i < 50 && access(socket_path, F_OK) < 0; i++)
usleep(100000);
pwtest_errno_ok(access(socket_path, F_OK));
setenv("PIPEWIRE_CONFIG_DIR", dir, 1);
setenv("PIPEWIRE_RUNTIME_DIR", runtime_dir, 1);
setenv("PIPEWIRE_REMOTE", "pipewire-0", 1);
spa_zero(d);
pw_init(0, NULL);
d.loop = pw_main_loop_new(NULL);
pwtest_ptr_notnull(d.loop);
d.context = pw_context_new(pw_main_loop_get_loop(d.loop), NULL, 0);
pwtest_ptr_notnull(d.context);
d.core = pw_context_connect(d.context, NULL, 0);
pwtest_ptr_notnull(d.core);
d.registry = pw_core_get_registry(d.core, PW_VERSION_REGISTRY, 0);
pwtest_ptr_notnull(d.registry);
pw_registry_add_listener(d.registry, &d.registry_listener,
&registry_events, &d);
res = setup_streams(&d);
pwtest_neg_errno_ok(res);
iterate_streams_ready(&d);
pwtest_bool_true(d.source_ready);
pwtest_bool_true(d.sink_ready);
pwtest_bool_true(d.control_ready);
iterate_nodes_ready(&d);
pwtest_bool_true(d.audio_source_ready);
pwtest_bool_true(d.audio_sink_ready);
pwtest_bool_true(d.control_monitor_ready);
pwtest_bool_true(d.effect_input_ready);
pwtest_bool_true(d.effect_output_ready);
pwtest_bool_true(d.notify_ready);
res = setup_links(&d);
pwtest_neg_errno_ok(res);
iterate_links_ready(&d);
pwtest_bool_true(d.source_link_ready);
pwtest_bool_true(d.sink_link_ready);
pwtest_bool_true(d.control_link_ready);
pwtest_bool_true(d.source_streaming);
pwtest_bool_true(d.sink_streaming);
pwtest_bool_true(d.control_streaming);
iterate_notify(&d);
pwtest_bool_true(d.got_notify);
pwtest_double_gt(d.notify_value, 0.0);
if (d.source_link)
pw_proxy_destroy(d.source_link);
if (d.sink_link)
pw_proxy_destroy(d.sink_link);
if (d.control_link)
pw_proxy_destroy(d.control_link);
if (d.source)
pw_stream_destroy(d.source);
if (d.sink)
pw_stream_destroy(d.sink);
if (d.control)
pw_stream_destroy(d.control);
if (d.registry)
pw_proxy_destroy((struct pw_proxy *)d.registry);
if (d.core)
pw_core_disconnect(d.core);
if (d.context)
pw_context_destroy(d.context);
if (d.loop)
pw_main_loop_destroy(d.loop);
pw_deinit();
stop_daemon(daemon);
unlink(daemon_path);
unlink(client_path);
rmdir(runtime_dir);
rmdir(dir);
free(socket_path);
free(client_path);
free(daemon_path);
free(runtime_dir);
free(template);
return PWTEST_PASS;
}
PWTEST_SUITE(filter_chain)
{
pwtest_add(filter_chain_exported_control_stream, PWTEST_NOARG);
return PWTEST_PASS;
}