From f8725009bab065e4c37fc90e66a9e864e3c2e4cd Mon Sep 17 00:00:00 2001 From: bhack <14409-bhack@users.noreply.gitlab.freedesktop.org> Date: Sun, 10 May 2026 04:36:50 +0200 Subject: [PATCH] filter-chain: export notify controls --- spa/include/spa/filter-graph/filter-graph.h | 20 +- spa/plugins/filter-graph/filter-graph.c | 218 +++++- src/modules/module-filter-chain.c | 285 ++++++++ test/meson.build | 49 ++ test/test-filter-chain.c | 713 ++++++++++++++++++++ test/test-filter-graph.c | 344 ++++++++++ 6 files changed, 1618 insertions(+), 11 deletions(-) create mode 100644 test/test-filter-chain.c create mode 100644 test/test-filter-graph.c diff --git a/spa/include/spa/filter-graph/filter-graph.h b/spa/include/spa/filter-graph/filter-graph.h index 481085c5f..0b3136532 100644 --- a/spa/include/spa/filter-graph/filter-graph.h +++ b/spa/include/spa/filter-graph/filter-graph.h @@ -64,7 +64,7 @@ struct spa_filter_graph_events { }; struct spa_filter_graph_methods { -#define SPA_VERSION_FILTER_GRAPH_METHODS 0 +#define SPA_VERSION_FILTER_GRAPH_METHODS 1 uint32_t version; int (*add_listener) (void *object, @@ -83,6 +83,10 @@ struct spa_filter_graph_methods { int (*reset) (void *object); int (*process) (void *object, const void *in[], void *out[], uint32_t n_samples); + + int (*enum_notify_info) (void *object, uint32_t idx, struct spa_pod_builder *b, + struct spa_pod **param); + int (*get_notify) (void *object, struct spa_pod_builder *b, struct spa_pod **props); }; SPA_API_FILTER_GRAPH int spa_filter_graph_add_listener(struct spa_filter_graph *object, @@ -138,6 +142,19 @@ SPA_API_FILTER_GRAPH int spa_filter_graph_process(struct spa_filter_graph *objec spa_filter_graph, &object->iface, process, 0, in, out, n_samples); } +SPA_API_FILTER_GRAPH int spa_filter_graph_enum_notify_info(struct spa_filter_graph *object, + uint32_t idx, struct spa_pod_builder *b, struct spa_pod **param) +{ + return spa_api_method_r(int, -ENOTSUP, + spa_filter_graph, &object->iface, enum_notify_info, 1, idx, b, param); +} +SPA_API_FILTER_GRAPH int spa_filter_graph_get_notify(struct spa_filter_graph *object, + struct spa_pod_builder *b, struct spa_pod **props) +{ + return spa_api_method_r(int, -ENOTSUP, + spa_filter_graph, &object->iface, get_notify, 1, b, props); +} + /** * \} */ @@ -147,4 +164,3 @@ SPA_API_FILTER_GRAPH int spa_filter_graph_process(struct spa_filter_graph *objec #endif #endif /* SPA_FILTER_GRAPH_H */ - diff --git a/spa/plugins/filter-graph/filter-graph.c b/spa/plugins/filter-graph/filter-graph.c index cab91e11e..669cabf58 100644 --- a/spa/plugins/filter-graph/filter-graph.c +++ b/spa/plugins/filter-graph/filter-graph.c @@ -44,6 +44,7 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.filter-graph"); #define MAX_CHANNELS SPA_AUDIO_MAX_CHANNELS #define DEFAULT_RATE 48000 +#define DEFAULT_EXPORT_INTERVAL_MS 33u #define spa_filter_graph_emit(hooks,method,version,...) \ spa_hook_list_call_simple(hooks, struct spa_filter_graph_events, \ @@ -189,12 +190,19 @@ struct graph { uint32_t n_control; struct port **control_port; + uint32_t n_notify; + struct port **notify_port; + uint32_t n_input_names; char **input_names; uint32_t n_output_names; char **output_names; + uint32_t n_export_control_names; + char **export_control_names; + uint32_t notify_interval_ms; + struct volume volume[2]; uint32_t default_inputs; @@ -266,7 +274,8 @@ static void emit_filter_graph_info(struct impl *impl, bool full) if (impl->info.change_mask || full) { char n_inputs[64], n_outputs[64], latency[64]; char n_default_inputs[64], n_default_outputs[64]; - struct spa_dict_item items[6]; + char export_interval[64]; + struct spa_dict_item items[8]; struct spa_dict dict = SPA_DICT(items, 0); char in_pos[MAX_CHANNELS * 8]; char out_pos[MAX_CHANNELS * 8]; @@ -292,6 +301,11 @@ static void emit_filter_graph_info(struct impl *impl, bool full) graph->n_outputs_position, graph->outputs_position); items[dict.n_items++] = SPA_DICT_ITEM("outputs.audio.position", out_pos); } + if (graph->n_export_control_names > 0) { + snprintf(export_interval, sizeof(export_interval), "%u", + graph->notify_interval_ms); + items[dict.n_items++] = SPA_DICT_ITEM("export.interval.ms", export_interval); + } items[dict.n_items++] = SPA_DICT_ITEM("latency", spa_dtoa(latency, sizeof(latency), (graph->min_latency + graph->max_latency) / 2.0f)); @@ -456,6 +470,18 @@ static void get_ranges(struct impl *impl, struct spa_fga_port *p, } } +static void port_get_name(struct port *port, char *name, size_t size) +{ + struct node *node = port->node; + const struct spa_fga_descriptor *d = node->desc->desc; + struct spa_fga_port *p = &d->ports[port->p]; + + if (node->name[0] != '\0') + snprintf(name, size, "%s:%s", node->name, p->name); + else + snprintf(name, size, "%s", p->name); +} + static int impl_enum_prop_info(void *object, uint32_t idx, struct spa_pod_builder *b, struct spa_pod **param) { @@ -482,10 +508,7 @@ static int impl_enum_prop_info(void *object, uint32_t idx, struct spa_pod_builde get_ranges(impl, p, &def, &min, &max); - if (node->name[0] != '\0') - snprintf(name, sizeof(name), "%s:%s", node->name, p->name); - else - snprintf(name, sizeof(name), "%s", p->name); + port_get_name(port, name, sizeof(name)); spa_pod_builder_push_object(b, &f[0], SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo); @@ -557,10 +580,7 @@ static int impl_get_props(void *object, struct spa_pod_builder *b, struct spa_po struct spa_fga_port *p = &d->ports[port->p]; float v, min, max; - if (node->name[0] != '\0') - snprintf(name, sizeof(name), "%s:%s", node->name, p->name); - else - snprintf(name, sizeof(name), "%s", p->name); + port_get_name(port, name, sizeof(name)); if (port->control_initialized) v = port->control_current; @@ -585,6 +605,119 @@ static int impl_get_props(void *object, struct spa_pod_builder *b, struct spa_po return 1; } +static int impl_enum_notify_info(void *object, uint32_t idx, struct spa_pod_builder *b, + struct spa_pod **param) +{ + struct impl *impl = object; + struct graph *graph = &impl->graph; + struct spa_pod *pod; + struct spa_pod_frame f[2]; + struct port *port; + struct node *node; + const struct spa_fga_descriptor *d; + struct spa_fga_port *p; + float def, min, max; + char name[512]; + + if (idx >= graph->n_notify) + return 0; + + port = graph->notify_port[idx]; + node = port->node; + d = node->desc->desc; + p = &d->ports[port->p]; + + get_ranges(impl, p, &def, &min, &max); + port_get_name(port, name, sizeof(name)); + + spa_pod_builder_push_object(b, &f[0], + SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo); + spa_pod_builder_add (b, + SPA_PROP_INFO_name, SPA_POD_String(name), + 0); + spa_pod_builder_prop(b, SPA_PROP_INFO_type, 0); + if (p->hint & SPA_FGA_HINT_BOOLEAN) { + if (min == max) { + spa_pod_builder_bool(b, def <= 0.0f ? false : true); + } else { + spa_pod_builder_push_choice(b, &f[1], SPA_CHOICE_Enum, 0); + spa_pod_builder_bool(b, def <= 0.0f ? false : true); + spa_pod_builder_bool(b, false); + spa_pod_builder_bool(b, true); + spa_pod_builder_pop(b, &f[1]); + } + } else if (p->hint & SPA_FGA_HINT_INTEGER) { + if (min == max) { + spa_pod_builder_int(b, (int32_t)def); + } else { + spa_pod_builder_push_choice(b, &f[1], SPA_CHOICE_Range, 0); + spa_pod_builder_int(b, (int32_t)def); + spa_pod_builder_int(b, (int32_t)min); + spa_pod_builder_int(b, (int32_t)max); + spa_pod_builder_pop(b, &f[1]); + } + } else { + if (min == max) { + spa_pod_builder_float(b, def); + } else { + spa_pod_builder_push_choice(b, &f[1], SPA_CHOICE_Range, 0); + spa_pod_builder_float(b, def); + spa_pod_builder_float(b, min); + spa_pod_builder_float(b, max); + spa_pod_builder_pop(b, &f[1]); + } + } + spa_pod_builder_prop(b, SPA_PROP_INFO_params, 0); + spa_pod_builder_bool(b, true); + pod = spa_pod_builder_pop(b, &f[0]); + if (pod == NULL) + return -ENOSPC; + if (param) + *param = pod; + + return 1; +} + +static int impl_get_notify(void *object, struct spa_pod_builder *b, struct spa_pod **props) +{ + struct impl *impl = object; + struct graph *graph = &impl->graph; + struct spa_pod_frame f[2]; + struct spa_pod *res; + uint32_t i; + char name[512]; + + spa_pod_builder_push_object(b, &f[0], + SPA_TYPE_OBJECT_Props, SPA_PARAM_Props); + spa_pod_builder_prop(b, SPA_PROP_params, 0); + spa_pod_builder_push_struct(b, &f[1]); + + for (i = 0; i < graph->n_notify; i++) { + struct port *port = graph->notify_port[i]; + const struct spa_fga_descriptor *d = port->node->desc->desc; + struct spa_fga_port *p = &d->ports[port->p]; + float v = port->control_data[0]; + + port_get_name(port, name, sizeof(name)); + spa_pod_builder_string(b, name); + + if (p->hint & SPA_FGA_HINT_BOOLEAN) { + spa_pod_builder_bool(b, v <= 0.0f ? false : true); + } else if (p->hint & SPA_FGA_HINT_INTEGER) { + spa_pod_builder_int(b, (int32_t)v); + } else { + spa_pod_builder_float(b, v); + } + } + spa_pod_builder_pop(b, &f[1]); + res = spa_pod_builder_pop(b, &f[0]); + if (res == NULL) + return -ENOSPC; + if (props) + *props = res; + return 1; +} + static int port_id_set_control_value(struct port *port, uint32_t id, float value) { struct node *node = port->node; @@ -2149,6 +2282,34 @@ static int setup_graph_controls(struct graph *graph) return 0; } +static int setup_graph_notify(struct graph *graph) +{ + struct impl *impl = graph->impl; + struct node *first; + uint32_t i; + + if (graph->n_export_control_names == 0) + return 0; + + graph->notify_port = calloc(graph->n_export_control_names, sizeof(struct port *)); + if (graph->notify_port == NULL) + return -errno; + + first = spa_list_first(&graph->node_list, struct node, link); + for (i = 0; i < graph->n_export_control_names; i++) { + const char *name = graph->export_control_names[i]; + struct port *port; + + port = find_port(first, name, SPA_FGA_PORT_OUTPUT | SPA_FGA_PORT_CONTROL); + if (port == NULL) { + spa_log_error(impl->log, "export control port %s not found", name); + return -ENOENT; + } + graph->notify_port[graph->n_notify++] = port; + } + return 0; +} + /** * filter.graph = { * nodes = [ @@ -2174,6 +2335,7 @@ static int load_graph(struct graph *graph, const struct spa_dict *props) struct spa_json inputs, outputs, *pinputs = NULL, *poutputs = NULL; struct spa_json ivolumes, ovolumes, *pivolumes = NULL, *povolumes = NULL; struct spa_json nodes, *pnodes = NULL, links, *plinks = NULL; + struct spa_json export_controls, *pexport_controls = NULL; struct node *first, *last; const char *json, *val; char key[256]; @@ -2181,6 +2343,7 @@ static int load_graph(struct graph *graph, const struct spa_dict *props) spa_list_init(&graph->node_list); spa_list_init(&graph->link_list); + graph->notify_interval_ms = DEFAULT_EXPORT_INTERVAL_MS; if ((json = spa_dict_lookup(props, "filter.graph")) == NULL) { spa_log_error(impl->log, "missing filter.graph property"); @@ -2278,6 +2441,21 @@ static int load_graph(struct graph *graph, const struct spa_dict *props) } spa_json_enter(&it[0], &ovolumes); povolumes = &ovolumes; + } + else if (spa_streq("export.controls", key)) { + if (!spa_json_is_array(val, len)) { + spa_log_error(impl->log, "%s expects an array", key); + return -EINVAL; + } + spa_json_enter(&it[0], &export_controls); + pexport_controls = &export_controls; + } + else if (spa_streq("export.interval.ms", key)) { + if (spa_json_parse_int(val, len, &res) <= 0 || res < 0) { + spa_log_error(impl->log, "%s expects a non-negative integer", key); + return -EINVAL; + } + graph->notify_interval_ms = res; } else { spa_log_warn(impl->log, "unexpected graph key '%s'", key); } @@ -2338,8 +2516,23 @@ static int load_graph(struct graph *graph, const struct spa_dict *props) graph->n_output_names++; } } + if (pexport_controls != NULL) { + graph->n_export_control_names = count_array(pexport_controls); + graph->export_control_names = calloc(graph->n_export_control_names, sizeof(char *)); + if (graph->export_control_names == NULL) + return -ENOMEM; + graph->n_export_control_names = 0; + while (spa_json_get_string(pexport_controls, key, sizeof(key)) > 0) { + graph->export_control_names[graph->n_export_control_names] = strdup(key); + if (graph->export_control_names[graph->n_export_control_names] == NULL) + return -ENOMEM; + graph->n_export_control_names++; + } + } if ((res = setup_graph_controls(graph)) < 0) return res; + if ((res = setup_graph_notify(graph)) < 0) + return res; first = spa_list_first(&graph->node_list, struct node, link); last = spa_list_last(&graph->node_list, struct node, link); @@ -2379,8 +2572,13 @@ static void graph_free(struct graph *graph) for (i = 0; i < graph->n_output_names; i++) free(graph->output_names[i]); free(graph->output_names); + for (i = 0; i < graph->n_export_control_names; i++) + free(graph->export_control_names[i]); + free(graph->export_control_names); free(graph->control_port); graph->control_port = NULL; + free(graph->notify_port); + graph->notify_port = NULL; } static const struct spa_filter_graph_methods impl_filter_graph = { @@ -2393,6 +2591,8 @@ static const struct spa_filter_graph_methods impl_filter_graph = { .deactivate = impl_deactivate, .reset = impl_reset, .process = impl_process, + .enum_notify_info = impl_enum_notify_info, + .get_notify = impl_get_notify, }; static int impl_get_interface(struct spa_handle *handle, const char *type, void **interface) diff --git a/src/modules/module-filter-chain.c b/src/modules/module-filter-chain.c index f4f96bcb2..34bea2f05 100644 --- a/src/modules/module-filter-chain.c +++ b/src/modules/module-filter-chain.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -18,6 +19,7 @@ #include #include #include +#include #include #include @@ -88,6 +90,8 @@ extern struct spa_handle_factory spa_filter_graph_factory; * playback.volumes = [ * { control = min = max = scale = } ... * ] + * export.controls = [ ... ] + * export.interval.ms = 33 * } *\endcode * @@ -217,6 +221,29 @@ extern struct spa_handle_factory spa_filter_graph_factory; * default this is linear but it can be set to cubic when the control applies a * cubic transformation. * + * ### Exported Controls + * + * Output control ports can be exported as a PipeWire control stream for metering + * and status monitoring. The port names use the same `:` + * syntax as links. Exported values are emitted as `SPA_CONTROL_Properties` on a + * read-only `application/control` output stream. Each exported port is emitted + * as a scalar value. When the graph is duplicated to match the channel count, + * the value from the first graph instance is exported. + * + * The `export.interval.ms` value controls how often values are emitted. The default + * is 33ms. A value of 0 emits on every processed quantum when a control stream + * buffer is available. + * + *\code{.unparsed} + * filter.graph = { + * nodes = [ + * { type = builtin name = ramp label = ramp } + * ] + * export.controls = [ "ramp:Current" ] + * export.interval.ms = 33 + * } + *\endcode + * * ## Builtin filters * * There are some useful builtin filters available. The type should be `builtin` and @@ -1225,6 +1252,7 @@ static const struct spa_dict_item module_props[] = { #define DEFAULT_RATE 48000 #define MAX_DATAS 1024u +#define CONTROL_STREAM_MIN_BUFFER_SIZE 4096u struct impl { struct pw_context *context; @@ -1248,6 +1276,14 @@ struct impl { struct spa_hook playback_listener; struct spa_audio_info_raw playback_info; + struct pw_properties *control_props; + struct pw_stream *control; + struct spa_hook control_listener; + uint32_t control_buffer_size; + uint64_t control_interval; + uint64_t control_last_time; + bool control_buffer_size_warned; + struct spa_audio_info_raw info; struct spa_io_position *position; @@ -1274,6 +1310,80 @@ static void capture_destroy(void *d) impl->capture = NULL; } +static int emit_control_stream(struct impl *impl) +{ + struct pw_buffer *buf; + struct spa_buffer *buffer; + struct spa_data *bd; + struct spa_pod_builder b; + struct spa_pod_frame f; + uint64_t now = 0; + int res; + + if (impl->control == NULL || !impl->graph_active) + return 0; + + if (impl->position) + now = impl->position->clock.nsec; + if (impl->control_interval > 0 && + now > 0 && + impl->control_last_time > 0 && + now >= impl->control_last_time && + now - impl->control_last_time < impl->control_interval) + return 0; + + buf = pw_stream_dequeue_buffer(impl->control); + if (buf == NULL) + return 0; + + buffer = buf->buffer; + if (buffer->n_datas == 0) + goto queue_empty; + + bd = &buffer->datas[0]; + if (bd->data == NULL || bd->chunk == NULL || + bd->maxsize < impl->control_buffer_size) { + if (!impl->control_buffer_size_warned) { + pw_log_warn("%p: control stream buffer too small: %u < %u", + impl, bd->maxsize, impl->control_buffer_size); + impl->control_buffer_size_warned = true; + } + goto queue_empty; + } + + spa_pod_builder_init(&b, bd->data, bd->maxsize); + spa_pod_builder_push_sequence(&b, &f, 0); + spa_pod_builder_control(&b, 0, SPA_CONTROL_Properties); + res = spa_filter_graph_get_notify(impl->graph, &b, NULL); + if (res < 0) { + if (!impl->control_buffer_size_warned) { + pw_log_warn("%p: can't build control stream buffer: %s", + impl, spa_strerror(res)); + impl->control_buffer_size_warned = true; + } + goto queue_empty; + } + spa_pod_builder_pop(&b, &f); + + bd->chunk->offset = 0; + bd->chunk->size = b.state.offset; + bd->chunk->stride = 0; + pw_stream_queue_buffer(impl->control, buf); + impl->control_last_time = now; + impl->control_buffer_size_warned = false; + return 0; + +queue_empty: + if (buffer->n_datas > 0 && buffer->datas[0].chunk != NULL) { + bd = &buffer->datas[0]; + bd->chunk->offset = 0; + bd->chunk->size = 0; + bd->chunk->stride = 0; + } + pw_stream_queue_buffer(impl->control, buf); + return 0; +} + static void do_process(struct impl *impl) { struct pw_buffer *in, *out; @@ -1345,6 +1455,7 @@ static void do_process(struct impl *impl) cout[n_out++] = NULL; spa_filter_graph_process(impl->graph, cin, cout, data_size / sizeof(float)); + emit_control_stream(impl); } if (in != NULL) @@ -1673,6 +1784,169 @@ static const struct pw_stream_events out_stream_events = { .param_changed = playback_param_changed, }; +static void control_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->control_listener); + impl->control = NULL; +} + +static void control_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct impl *impl = data; + + switch (state) { + case PW_STREAM_STATE_UNCONNECTED: + pw_log_info("module %p: control stream unconnected", impl); + break; + case PW_STREAM_STATE_ERROR: + pw_log_info("module %p: control stream error: %s", impl, error); + break; + default: + break; + } +} + +static const struct pw_stream_events control_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = control_destroy, + .state_changed = control_state_changed, +}; + +static int get_control_stream_buffer_size(struct impl *impl, uint32_t *size) +{ + struct spa_pod_dynamic_builder b; + struct spa_pod_frame f; + struct spa_pod *pod; + int res; + + spa_pod_dynamic_builder_init(&b, NULL, 0, CONTROL_STREAM_MIN_BUFFER_SIZE); + res = spa_pod_builder_push_sequence(&b.b, &f, 0); + if (res >= 0) + res = spa_pod_builder_control(&b.b, 0, SPA_CONTROL_Properties); + if (res >= 0) + res = spa_filter_graph_get_notify(impl->graph, &b.b, NULL); + pod = spa_pod_builder_pop(&b.b, &f); + if (res >= 0 && pod == NULL) + res = -ENOSPC; + if (res >= 0 && b.b.state.offset > INT_MAX) + res = -EOVERFLOW; + if (res >= 0) + *size = SPA_MAX((uint32_t)b.b.state.offset, + CONTROL_STREAM_MIN_BUFFER_SIZE); + spa_pod_dynamic_builder_clean(&b); + return res; +} + +static int setup_control_stream(struct impl *impl) +{ + struct spa_pod_dynamic_builder b; + struct pw_array offsets; + const struct spa_pod **params = NULL; + uint32_t i, n_params, *offs; + int res = 0; + + spa_pod_dynamic_builder_init(&b, NULL, 0, CONTROL_STREAM_MIN_BUFFER_SIZE); + pw_array_init(&offsets, 512); + + for (i = 0;; i++) { + uint32_t save = b.b.state.offset; + res = spa_filter_graph_enum_notify_info(impl->graph, i, &b.b, NULL); + if (res != 1) { + if (res < 0) + goto done; + break; + } + if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) { + res = -errno; + goto done; + } + *offs = save; + } + if (i == 0) + goto done; + + if ((res = get_control_stream_buffer_size(impl, &impl->control_buffer_size)) < 0) + goto done; + + if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) { + res = -errno; + goto done; + } + *offs = b.b.state.offset; + spa_pod_builder_add_object(&b.b, + SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application), + SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); + + if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) { + res = -errno; + goto done; + } + *offs = b.b.state.offset; + spa_pod_builder_add_object(&b.b, + SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, 32), + SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( + impl->control_buffer_size, impl->control_buffer_size, INT_MAX), + SPA_PARAM_BUFFERS_stride, SPA_POD_Int(1)); + + n_params = pw_array_get_len(&offsets, uint32_t); + params = calloc(n_params, sizeof(struct spa_pod *)); + if (params == NULL) { + res = -errno; + goto done; + } + + offs = offsets.data; + for (i = 0; i < n_params; i++) + params[i] = spa_pod_builder_deref(&b.b, offs[i]); + + impl->control_props = pw_properties_new( + PW_KEY_MEDIA_TYPE, "Control", + PW_KEY_MEDIA_CATEGORY, "Monitor", + PW_KEY_MEDIA_ROLE, "DSP", + PW_KEY_MEDIA_CLASS, "Stream/Output/Data", + PW_KEY_NODE_AUTOCONNECT, "false", + PW_KEY_FORMAT_DSP, "8 bit raw control", + NULL); + if (impl->control_props == NULL) { + res = -errno; + goto done; + } + pw_properties_setf(impl->control_props, PW_KEY_NODE_NAME, "notify.%s", + pw_properties_get(impl->props, PW_KEY_NODE_NAME)); + pw_properties_setf(impl->control_props, PW_KEY_MEDIA_NAME, "%s notify", + pw_properties_get(impl->props, PW_KEY_NODE_DESCRIPTION)); + + impl->control = pw_stream_new(impl->core, + "filter notify", impl->control_props); + impl->control_props = NULL; + if (impl->control == NULL) { + res = -errno; + goto done; + } + + pw_stream_add_listener(impl->control, + &impl->control_listener, + &control_stream_events, impl); + + res = pw_stream_connect(impl->control, + PW_DIRECTION_OUTPUT, + PW_ID_ANY, + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params); + +done: + free(params); + pw_array_clear(&offsets); + spa_pod_dynamic_builder_clean(&b); + return res < 0 ? res : 0; +} + static int setup_streams(struct impl *impl) { int res = 0; @@ -1796,6 +2070,8 @@ static int setup_streams(struct impl *impl) flags, params, n_params); } + if (res >= 0) + res = setup_control_stream(impl); spa_pod_dynamic_builder_clean(&b); done: @@ -1862,6 +2138,10 @@ static void graph_info(void *object, const struct spa_filter_graph_info *info) pw_log_info("using default outputs %d", val); impl->playback_info.channels = val; } + else if (spa_streq(k, "export.interval.ms") && + spa_atou32(s, &val, 0)) { + impl->control_interval = (uint64_t)val * SPA_NSEC_PER_MSEC; + } } if (impl->capture_info.channels == impl->playback_info.channels) { copy_position(&impl->capture_info, &impl->playback_info); @@ -1943,11 +2223,15 @@ static void impl_destroy(struct impl *impl) pw_stream_disconnect(impl->capture); if (impl->playback) pw_stream_disconnect(impl->playback); + if (impl->control) + pw_stream_disconnect(impl->control); if (impl->capture) pw_stream_destroy(impl->capture); if (impl->playback) pw_stream_destroy(impl->playback); + if (impl->control) + pw_stream_destroy(impl->control); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); @@ -1957,6 +2241,7 @@ static void impl_destroy(struct impl *impl) pw_properties_free(impl->capture_props); pw_properties_free(impl->playback_props); + pw_properties_free(impl->control_props); pw_properties_free(impl->props); free(impl); diff --git a/test/meson.build b/test/meson.build index a9ca3fc77..86f3655fa 100644 --- a/test/meson.build +++ b/test/meson.build @@ -107,6 +107,55 @@ test('test-support', dependencies: [spa_dep, systemd_dep, spa_support_dep, spa_journal_dep], link_with: [pwtest_lib]) ) + +test('test-filter-graph', + executable('test-filter-graph', + 'test-filter-graph.c', + include_directories: pwtest_inc, + dependencies: [spa_dep, pipewire_dep], + link_with: [pwtest_lib]), + env : [ + 'SPA_PLUGIN_DIR=@0@'.format(meson.project_build_root() / 'spa/plugins'), + 'SPA_DATA_DIR=@0@'.format(meson.project_source_root() / 'spa/plugins'), + ], + depends : [ + spa_support_lib, + spa_filter_graph, + spa_filter_graph_plugin_builtin, + ], +) + +if get_option('audioconvert').allowed() +test('test-filter-chain', + executable('test-filter-chain', + 'test-filter-chain.c', + c_args: pwtest_c_args, + include_directories: pwtest_inc, + dependencies: [spa_dep, pipewire_dep], + link_with: [pwtest_lib]), + env : [ + 'SPA_PLUGIN_DIR=@0@'.format(meson.project_build_root() / 'spa/plugins'), + 'SPA_DATA_DIR=@0@'.format(meson.project_source_root() / 'spa/plugins'), + 'PIPEWIRE_MODULE_DIR=@0@'.format(meson.project_build_root() / 'src/modules'), + 'LD_LIBRARY_PATH=@0@'.format(meson.project_build_root() / 'src/pipewire'), + ], + depends : [ + pipewire_exec, + pipewire_module_protocol_native, + pipewire_module_scheduler_v1, + pipewire_module_access, + pipewire_module_client_node, + pipewire_module_adapter, + pipewire_module_spa_node_factory, + pipewire_module_link_factory, + pipewire_module_filter_chain, + spa_support_lib, + spa_audioconvert_lib, + spa_filter_graph, + spa_filter_graph_plugin_builtin, + ], +) +endif endif test('test-spa', executable('test-spa', diff --git a/test/test-filter-chain.c b/test/test-filter-chain.c new file mode 100644 index 000000000..575888de1 --- /dev/null +++ b/test/test-filter-chain.c @@ -0,0 +1,713 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 PipeWire authors */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" +#include "pwtest.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct data { + struct pw_main_loop *loop; + struct pw_context *context; + struct pw_core *core; + struct pw_registry *registry; + struct pw_stream *source; + struct pw_stream *sink; + struct pw_stream *control; + struct spa_hook registry_listener; + struct spa_hook source_listener; + struct spa_hook sink_listener; + struct spa_hook control_listener; + struct pw_proxy *source_link; + struct pw_proxy *sink_link; + struct pw_proxy *control_link; + struct spa_hook source_link_listener; + struct spa_hook sink_link_listener; + struct spa_hook control_link_listener; + bool source_ready; + bool sink_ready; + bool control_ready; + bool source_streaming; + bool sink_streaming; + bool control_streaming; + bool audio_source_ready; + bool audio_sink_ready; + bool control_monitor_ready; + bool effect_input_ready; + bool effect_output_ready; + bool notify_ready; + bool source_link_ready; + bool sink_link_ready; + bool control_link_ready; + bool got_notify; + float notify_value; +}; + +static pid_t cleanup_daemon_pid; + +static const char daemon_conf[] = +"context.properties = {\n" +" core.daemon = true\n" +" core.name = pipewire-0\n" +" support.dbus = false\n" +" default.clock.rate = 48000\n" +" default.clock.quantum = 64\n" +" default.clock.min-quantum = 64\n" +" default.clock.max-quantum = 64\n" +" settings.check-quantum = false\n" +" settings.check-rate = false\n" +"}\n" +"context.spa-libs = {\n" +" support.* = support/libspa-support\n" +" audio.convert.* = audioconvert/libspa-audioconvert\n" +" audio.adapt = audioconvert/libspa-audioconvert\n" +" filter.graph.plugin.builtin = filter-graph/libspa-filter-graph-plugin-builtin\n" +" filter.graph = filter-graph/libspa-filter-graph\n" +"}\n" +"context.modules = [\n" +" { name = libpipewire-module-protocol-native }\n" +" { name = libpipewire-module-scheduler-v1 }\n" +" { name = libpipewire-module-access args = { access.socket = { pipewire-0 = unrestricted } } }\n" +" { name = libpipewire-module-client-node }\n" +" { name = libpipewire-module-adapter }\n" +" { name = libpipewire-module-spa-node-factory }\n" +" { name = libpipewire-module-link-factory }\n" +" { name = libpipewire-module-filter-chain\n" +" args = {\n" +" node.name = \"test-filter-chain\"\n" +" node.description = \"test filter chain\"\n" +" capture.props = {\n" +" node.name = \"effect_input.test-filter-chain\"\n" +" media.class = \"Audio/Sink\"\n" +" audio.channels = 1\n" +" audio.position = [ MONO ]\n" +" }\n" +" playback.props = {\n" +" node.name = \"effect_output.test-filter-chain\"\n" +" media.class = \"Audio/Source\"\n" +" audio.channels = 1\n" +" audio.position = [ MONO ]\n" +" }\n" +" filter.graph = {\n" +" nodes = [\n" +" { type = builtin name = copy label = copy }\n" +" { type = builtin name = ramp label = ramp\n" +" control = {\n" +" Start = 0.0\n" +" Stop = 1.0\n" +" \"Duration (s)\" = 0.001\n" +" }\n" +" }\n" +" ]\n" +" inputs = [ \"copy:In\" ]\n" +" outputs = [ \"copy:Out\" ]\n" +" export.controls = [ \"ramp:Current\" ]\n" +" export.interval.ms = 0\n" +" }\n" +" }\n" +" }\n" +"]\n" +"stream.properties = {\n" +" adapter.auto-port-config = { mode = dsp }\n" +"}\n" +"context.objects = [\n" +" { factory = spa-node-factory\n" +" args = {\n" +" factory.name = support.node.driver\n" +" node.name = Dummy-Driver\n" +" node.group = pipewire.dummy\n" +" priority.driver = 20000\n" +" }\n" +" }\n" +"]\n"; + +static const char client_conf[] = +"context.properties = {\n" +" support.dbus = false\n" +"}\n" +"context.spa-libs = {\n" +" support.* = support/libspa-support\n" +" audio.convert.* = audioconvert/libspa-audioconvert\n" +" audio.adapt = audioconvert/libspa-audioconvert\n" +"}\n" +"context.modules = [\n" +" { name = libpipewire-module-protocol-native }\n" +" { name = libpipewire-module-client-node }\n" +" { name = libpipewire-module-adapter }\n" +"]\n" +"stream.properties = {\n" +" adapter.auto-port-config = { mode = dsp }\n" +"}\n"; + +static int write_file(const char *path, const char *content) +{ + FILE *f; + + f = fopen(path, "w"); + if (f == NULL) + return -errno; + if (fputs(content, f) < 0) { + int res = errno > 0 ? -errno : -EIO; + fclose(f); + return res; + } + if (fclose(f) < 0) + return -errno; + return 0; +} + +static bool parse_params_struct(const struct spa_pod *pod, float *value) +{ + struct spa_pod_parser prs; + struct spa_pod_frame f; + + if (pod == NULL || !spa_pod_is_struct(pod)) + return false; + + spa_pod_parser_pod(&prs, pod); + if (spa_pod_parser_push_struct(&prs, &f) < 0) + return false; + + while (true) { + const char *name; + float v; + + if (spa_pod_parser_get_string(&prs, &name) < 0) + break; + if (spa_pod_parser_get_float(&prs, &v) < 0) + break; + if (spa_streq(name, "ramp:Current")) { + *value = v; + return true; + } + } + return false; +} + +static bool parse_control_buffer(const void *data, uint32_t size, float *value) +{ + struct spa_pod_parser parser; + struct spa_pod_frame frame; + struct spa_pod_sequence seq; + const void *seq_body, *c_body; + struct spa_pod_control c; + + spa_pod_parser_init_from_data(&parser, data, size, 0, size); + if (spa_pod_parser_push_sequence_body(&parser, &frame, &seq, &seq_body) < 0) + return false; + + while (spa_pod_parser_get_control_body(&parser, &c, &c_body) >= 0) { + const struct spa_pod *params = NULL; + + if (c.type != SPA_CONTROL_Properties) + continue; + if (spa_pod_body_parse_object(&c.value, c_body, + SPA_TYPE_OBJECT_Props, NULL, + SPA_PROP_params, SPA_POD_OPT_Pod(¶ms)) < 0) + continue; + if (parse_params_struct(params, value)) + return true; + } + return false; +} + +static void source_process(void *userdata) +{ + struct data *d = userdata; + struct pw_buffer *b; + + while ((b = pw_stream_dequeue_buffer(d->source)) != NULL) { + struct spa_buffer *buf = b->buffer; + + if (buf->n_datas > 0) { + struct spa_data *bd = &buf->datas[0]; + uint32_t n_frames; + + n_frames = b->requested; + if (n_frames == 0) + n_frames = bd->maxsize / sizeof(float); + n_frames = SPA_MIN(n_frames, bd->maxsize / sizeof(float)); + if (bd->data != NULL) + memset(bd->data, 0, n_frames * sizeof(float)); + bd->chunk->offset = 0; + bd->chunk->size = n_frames * sizeof(float); + bd->chunk->stride = sizeof(float); + } + pw_stream_queue_buffer(d->source, b); + } +} + +static void sink_process(void *userdata) +{ + struct data *d = userdata; + struct pw_buffer *b; + + while ((b = pw_stream_dequeue_buffer(d->sink)) != NULL) + pw_stream_queue_buffer(d->sink, b); +} + +static void control_process(void *userdata) +{ + struct data *d = userdata; + struct pw_buffer *b; + + while ((b = pw_stream_dequeue_buffer(d->control)) != NULL) { + struct spa_buffer *buf = b->buffer; + + if (buf->n_datas > 0) { + struct spa_data *bd = &buf->datas[0]; + uint32_t offset, size; + void *ptr; + + offset = SPA_MIN(bd->chunk->offset, bd->maxsize); + size = SPA_MIN(bd->chunk->size, bd->maxsize - offset); + ptr = SPA_PTROFF(bd->data, offset, void); + if (ptr != NULL && size > 0 && + parse_control_buffer(ptr, size, &d->notify_value)) { + d->got_notify = true; + pw_main_loop_quit(d->loop); + } + } + pw_stream_queue_buffer(d->control, b); + } +} + +static void source_state_changed(void *userdata, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct data *d = userdata; + + if (state == PW_STREAM_STATE_ERROR) + pwtest_fail_with_msg("source stream error: %s", error); + if (state == PW_STREAM_STATE_PAUSED || + state == PW_STREAM_STATE_STREAMING) + d->source_ready = true; + d->source_streaming = state == PW_STREAM_STATE_STREAMING; +} + +static void sink_state_changed(void *userdata, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct data *d = userdata; + + if (state == PW_STREAM_STATE_ERROR) + pwtest_fail_with_msg("sink stream error: %s", error); + if (state == PW_STREAM_STATE_PAUSED || + state == PW_STREAM_STATE_STREAMING) + d->sink_ready = true; + d->sink_streaming = state == PW_STREAM_STATE_STREAMING; +} + +static void control_state_changed(void *userdata, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct data *d = userdata; + + if (state == PW_STREAM_STATE_ERROR) + pwtest_fail_with_msg("control stream error: %s", error); + if (state == PW_STREAM_STATE_PAUSED || + state == PW_STREAM_STATE_STREAMING) + d->control_ready = true; + d->control_streaming = state == PW_STREAM_STATE_STREAMING; +} + +static const struct pw_stream_events source_events = { + PW_VERSION_STREAM_EVENTS, + .state_changed = source_state_changed, + .process = source_process, +}; + +static const struct pw_stream_events sink_events = { + PW_VERSION_STREAM_EVENTS, + .state_changed = sink_state_changed, + .process = sink_process, +}; + +static const struct pw_stream_events control_events = { + PW_VERSION_STREAM_EVENTS, + .state_changed = control_state_changed, + .process = control_process, +}; + +static void registry_global(void *userdata, uint32_t id, uint32_t permissions, + const char *type, uint32_t version, const struct spa_dict *props) +{ + struct data *d = userdata; + const char *name; + + if (!spa_streq(type, PW_TYPE_INTERFACE_Node) || props == NULL) + return; + + name = spa_dict_lookup(props, PW_KEY_NODE_NAME); + if (name == NULL) + return; + + if (spa_streq(name, "audio-source")) + d->audio_source_ready = true; + else if (spa_streq(name, "audio-sink")) + d->audio_sink_ready = true; + else if (spa_streq(name, "control-monitor")) + d->control_monitor_ready = true; + else if (spa_streq(name, "effect_input.test-filter-chain")) + d->effect_input_ready = true; + else if (spa_streq(name, "effect_output.test-filter-chain")) + d->effect_output_ready = true; + else if (spa_streq(name, "notify.test-filter-chain")) + d->notify_ready = true; +} + +static const struct pw_registry_events registry_events = { + PW_VERSION_REGISTRY_EVENTS, + .global = registry_global, +}; + +static void link_info(void *userdata, const struct pw_link_info *info) +{ + bool *ready = userdata; + + if (info->state == PW_LINK_STATE_ACTIVE) + *ready = true; +} + +static const struct pw_link_events link_events = { + PW_VERSION_LINK_EVENTS, + .info = link_info, +}; + +static int create_link(struct data *d, const char *output, const char *input, + struct pw_proxy **link) +{ + struct pw_properties *props; + + props = pw_properties_new( + PW_KEY_LINK_OUTPUT_NODE, output, + PW_KEY_LINK_INPUT_NODE, input, + NULL); + if (props == NULL) + return -errno; + + *link = pw_core_create_object(d->core, + "link-factory", + PW_TYPE_INTERFACE_Link, + PW_VERSION_LINK, + &props->dict, + 0); + pw_properties_free(props); + return *link == NULL ? -errno : 0; +} + +static void iterate_streams_ready(struct data *d) +{ + struct pw_loop *loop = pw_main_loop_get_loop(d->loop); + + pw_loop_enter(loop); + for (uint32_t i = 0; i < 200 && + (!d->source_ready || !d->sink_ready || !d->control_ready); i++) + pw_loop_iterate(loop, 50); + pw_loop_leave(loop); +} + +static void iterate_nodes_ready(struct data *d) +{ + struct pw_loop *loop = pw_main_loop_get_loop(d->loop); + + pw_loop_enter(loop); + for (uint32_t i = 0; i < 400 && + (!d->audio_source_ready || !d->audio_sink_ready || + !d->control_monitor_ready || !d->effect_input_ready || + !d->effect_output_ready || !d->notify_ready); i++) + pw_loop_iterate(loop, 50); + pw_loop_leave(loop); +} + +static void iterate_links_ready(struct data *d) +{ + struct pw_loop *loop = pw_main_loop_get_loop(d->loop); + + pw_loop_enter(loop); + for (uint32_t i = 0; i < 200 && + (!d->source_link_ready || !d->sink_link_ready || + !d->control_link_ready || !d->source_streaming || + !d->sink_streaming || !d->control_streaming); i++) + pw_loop_iterate(loop, 50); + pw_loop_leave(loop); +} + +static void iterate_notify(struct data *d) +{ + struct pw_loop *loop = pw_main_loop_get_loop(d->loop); + + pw_loop_enter(loop); + for (uint32_t i = 0; i < 200 && !d->got_notify; i++) + pw_loop_iterate(loop, 50); + pw_loop_leave(loop); +} + +static int setup_streams(struct data *d) +{ + struct spa_pod_builder b; + uint8_t buffer[1024]; + const struct spa_pod *audio_params[1]; + const struct spa_pod *control_params[1]; + struct spa_audio_info_raw audio_info = SPA_AUDIO_INFO_RAW_INIT( + .format = SPA_AUDIO_FORMAT_F32P, + .rate = 48000, + .channels = 1, + .position = { SPA_AUDIO_CHANNEL_MONO }); + int res; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + audio_params[0] = spa_format_audio_raw_build(&b, + SPA_PARAM_EnumFormat, &audio_info); + d->source = pw_stream_new(d->core, "audio source", + pw_properties_new(PW_KEY_NODE_NAME, "audio-source", NULL)); + if (d->source == NULL) + return -errno; + pw_stream_add_listener(d->source, &d->source_listener, + &source_events, d); + res = pw_stream_connect(d->source, PW_DIRECTION_OUTPUT, PW_ID_ANY, + PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS, + audio_params, 1); + if (res < 0) + return res; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + audio_params[0] = spa_format_audio_raw_build(&b, + SPA_PARAM_EnumFormat, &audio_info); + d->sink = pw_stream_new(d->core, "audio sink", + pw_properties_new(PW_KEY_NODE_NAME, "audio-sink", NULL)); + if (d->sink == NULL) + return -errno; + pw_stream_add_listener(d->sink, &d->sink_listener, + &sink_events, d); + res = pw_stream_connect(d->sink, PW_DIRECTION_INPUT, PW_ID_ANY, + PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS, + audio_params, 1); + if (res < 0) + return res; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + control_params[0] = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application), + SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); + d->control = pw_stream_new(d->core, "control monitor", + pw_properties_new( + PW_KEY_NODE_NAME, "control-monitor", + PW_KEY_MEDIA_CLASS, "Stream/Input/Data", + PW_KEY_FORMAT_DSP, "8 bit raw control", + NULL)); + if (d->control == NULL) + return -errno; + pw_stream_add_listener(d->control, &d->control_listener, + &control_events, d); + return pw_stream_connect(d->control, PW_DIRECTION_INPUT, PW_ID_ANY, + PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS, + control_params, 1); +} + +static int setup_links(struct data *d) +{ + int res; + + res = create_link(d, "audio-source", "effect_input.test-filter-chain", + &d->source_link); + if (res < 0) + return res; + res = create_link(d, "effect_output.test-filter-chain", "audio-sink", + &d->sink_link); + if (res < 0) + return res; + res = create_link(d, "notify.test-filter-chain", "control-monitor", + &d->control_link); + if (res < 0) + return res; + + pw_proxy_add_object_listener(d->source_link, &d->source_link_listener, + &link_events, &d->source_link_ready); + pw_proxy_add_object_listener(d->sink_link, &d->sink_link_listener, + &link_events, &d->sink_link_ready); + pw_proxy_add_object_listener(d->control_link, &d->control_link_listener, + &link_events, &d->control_link_ready); + return 0; +} + +static pid_t start_daemon(const char *conf_dir, const char *runtime_dir) +{ + pid_t pid = fork(); + + if (pid == 0) { + setenv("PIPEWIRE_CONFIG_DIR", conf_dir, 1); + setenv("PIPEWIRE_RUNTIME_DIR", runtime_dir, 1); + unsetenv("PIPEWIRE_REMOTE"); + unsetenv("PIPEWIRE_CORE"); + execl(BUILD_ROOT "/src/daemon/pipewire", + BUILD_ROOT "/src/daemon/pipewire", + "-c", "pipewire.conf", + (char *)NULL); + _exit(127); + } + return pid; +} + +static void stop_daemon(pid_t pid) +{ + int status; + + if (pid <= 0) + return; + kill(pid, SIGTERM); + waitpid(pid, &status, 0); + if (cleanup_daemon_pid == pid) + cleanup_daemon_pid = 0; +} + +static void cleanup_daemon(void) +{ + stop_daemon(cleanup_daemon_pid); +} + +PWTEST(filter_chain_exported_control_stream) +{ + const char *tmpdir = getenv("TMPDIR"); + char *template = NULL, *dir, *runtime_dir = NULL, *daemon_path = NULL, + *client_path = NULL, *socket_path = NULL; + struct data d; + pid_t daemon; + int res; + + if (getenv("MESON_EXE_WRAPPER") != NULL) { + fprintf(stderr, "skipping live daemon test under Meson exe wrapper\n"); + return PWTEST_SKIP; + } + + if (tmpdir == NULL) + tmpdir = "/tmp"; + pwtest_int_ge(asprintf(&template, "%s/pw-filter-chain-test-XXXXXX", tmpdir), 0); + dir = mkdtemp(template); + pwtest_ptr_notnull(dir); + pwtest_int_ge(asprintf(&runtime_dir, "%s/run", dir), 0); + pwtest_errno_ok(mkdir(runtime_dir, 0700)); + pwtest_int_ge(asprintf(&daemon_path, "%s/pipewire.conf", dir), 0); + pwtest_int_ge(asprintf(&client_path, "%s/client.conf", dir), 0); + pwtest_neg_errno_ok(write_file(daemon_path, daemon_conf)); + pwtest_neg_errno_ok(write_file(client_path, client_conf)); + + daemon = start_daemon(dir, runtime_dir); + pwtest_int_gt(daemon, 0); + cleanup_daemon_pid = daemon; + atexit(cleanup_daemon); + pwtest_int_ge(asprintf(&socket_path, "%s/pipewire-0", runtime_dir), 0); + for (uint32_t i = 0; i < 50 && access(socket_path, F_OK) < 0; i++) + usleep(100000); + pwtest_errno_ok(access(socket_path, F_OK)); + + setenv("PIPEWIRE_CONFIG_DIR", dir, 1); + setenv("PIPEWIRE_RUNTIME_DIR", runtime_dir, 1); + setenv("PIPEWIRE_REMOTE", "pipewire-0", 1); + + spa_zero(d); + pw_init(0, NULL); + d.loop = pw_main_loop_new(NULL); + pwtest_ptr_notnull(d.loop); + d.context = pw_context_new(pw_main_loop_get_loop(d.loop), NULL, 0); + pwtest_ptr_notnull(d.context); + d.core = pw_context_connect(d.context, NULL, 0); + pwtest_ptr_notnull(d.core); + d.registry = pw_core_get_registry(d.core, PW_VERSION_REGISTRY, 0); + pwtest_ptr_notnull(d.registry); + pw_registry_add_listener(d.registry, &d.registry_listener, + ®istry_events, &d); + + res = setup_streams(&d); + pwtest_neg_errno_ok(res); + iterate_streams_ready(&d); + pwtest_bool_true(d.source_ready); + pwtest_bool_true(d.sink_ready); + pwtest_bool_true(d.control_ready); + iterate_nodes_ready(&d); + pwtest_bool_true(d.audio_source_ready); + pwtest_bool_true(d.audio_sink_ready); + pwtest_bool_true(d.control_monitor_ready); + pwtest_bool_true(d.effect_input_ready); + pwtest_bool_true(d.effect_output_ready); + pwtest_bool_true(d.notify_ready); + + res = setup_links(&d); + pwtest_neg_errno_ok(res); + iterate_links_ready(&d); + pwtest_bool_true(d.source_link_ready); + pwtest_bool_true(d.sink_link_ready); + pwtest_bool_true(d.control_link_ready); + pwtest_bool_true(d.source_streaming); + pwtest_bool_true(d.sink_streaming); + pwtest_bool_true(d.control_streaming); + + iterate_notify(&d); + pwtest_bool_true(d.got_notify); + pwtest_double_gt(d.notify_value, 0.0); + + if (d.source_link) + pw_proxy_destroy(d.source_link); + if (d.sink_link) + pw_proxy_destroy(d.sink_link); + if (d.control_link) + pw_proxy_destroy(d.control_link); + if (d.source) + pw_stream_destroy(d.source); + if (d.sink) + pw_stream_destroy(d.sink); + if (d.control) + pw_stream_destroy(d.control); + if (d.registry) + pw_proxy_destroy((struct pw_proxy *)d.registry); + if (d.core) + pw_core_disconnect(d.core); + if (d.context) + pw_context_destroy(d.context); + if (d.loop) + pw_main_loop_destroy(d.loop); + pw_deinit(); + stop_daemon(daemon); + unlink(daemon_path); + unlink(client_path); + rmdir(runtime_dir); + rmdir(dir); + free(socket_path); + free(client_path); + free(daemon_path); + free(runtime_dir); + free(template); + + return PWTEST_PASS; +} + +PWTEST_SUITE(filter_chain) +{ + pwtest_add(filter_chain_exported_control_stream, PWTEST_NOARG); + + return PWTEST_PASS; +} diff --git a/test/test-filter-graph.c b/test/test-filter-graph.c new file mode 100644 index 000000000..e76533769 --- /dev/null +++ b/test/test-filter-graph.c @@ -0,0 +1,344 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 PipeWire authors */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" +#include "pwtest.h" + +#include + +#include + +#include +#include +#include +#include +#include +#include + +#define EXPORT_RAMP_CURRENT "\"ramp:Current\" " +#define EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \ + EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \ + EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \ + EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT \ + EXPORT_RAMP_CURRENT EXPORT_RAMP_CURRENT +#define EXPORT_RAMP_CURRENT_100 EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \ + EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \ + EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \ + EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \ + EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 +#define EXPORT_RAMP_CURRENT_160 EXPORT_RAMP_CURRENT_100 EXPORT_RAMP_CURRENT_10 \ + EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \ + EXPORT_RAMP_CURRENT_10 EXPORT_RAMP_CURRENT_10 \ + EXPORT_RAMP_CURRENT_10 + +static bool is_current_notify_info(struct spa_pod *pod) +{ + const char *name = NULL; + + if (pod == NULL || !spa_pod_is_object(pod)) + return false; + if (SPA_POD_OBJECT_ID(pod) != SPA_PARAM_PropInfo) + return false; + if (spa_pod_parse_object(pod, SPA_TYPE_OBJECT_PropInfo, NULL, + SPA_PROP_INFO_name, SPA_POD_OPT_String(&name)) < 0) + return false; + return spa_streq(name, "ramp:Current"); +} + +static bool has_current_notify(struct spa_pod *pod) +{ + struct spa_pod_object *obj = (struct spa_pod_object *)pod; + const struct spa_pod_prop *prop; + + SPA_POD_OBJECT_FOREACH(obj, prop) { + struct spa_pod_parser prs; + struct spa_pod_frame f; + + if (prop->key != SPA_PROP_params) + continue; + + spa_pod_parser_pod(&prs, &prop->value); + if (spa_pod_parser_push_struct(&prs, &f) < 0) + return false; + + while (true) { + const char *name; + float value; + + if (spa_pod_parser_get_string(&prs, &name) < 0) + break; + if (spa_pod_parser_get_float(&prs, &value) < 0) + break; + if (spa_streq(name, "ramp:Current") && value > 0.0f) + return true; + } + } + return false; +} + +PWTEST(export_controls) +{ + struct pw_loop *loop; + struct pw_context *context; + struct pw_properties *props; + struct spa_handle *handle; + struct spa_filter_graph *graph; + struct spa_pod_dynamic_builder b; + struct spa_pod *pod = NULL; + float out[64]; + void *outputs[1] = { out }; + int res; + + pw_init(0, NULL); + + loop = pw_loop_new(NULL); + pwtest_ptr_notnull(loop); + + context = pw_context_new(loop, pw_properties_new( + PW_KEY_CONFIG_NAME, "null", + "context.modules.allow-empty", "true", + "support.dbus", "false", + NULL), 0); + pwtest_ptr_notnull(context); + + pw_context_add_spa_lib(context, + "^filter\\.graph\\.plugin\\.builtin$", + "filter-graph/libspa-filter-graph-plugin-builtin"); + pw_context_add_spa_lib(context, + "^filter\\.graph$", + "filter-graph/libspa-filter-graph"); + + props = pw_properties_new( + "clock.quantum-limit", "128", + "filter-graph.n_inputs", "0", + "filter-graph.n_outputs", "1", + SPA_KEY_LIBRARY_NAME, "filter-graph/libspa-filter-graph", + "filter.graph", + "{ nodes = [ { type = builtin name = ramp label = ramp " + "control = { Start = 0.0 Stop = 1.0 \"Duration (s)\" = 0.001 } } ] " + "outputs = [ \"ramp:Out\" ] " + "export.controls = [ \"ramp:Current\" ] }", + NULL); + pwtest_ptr_notnull(props); + + handle = pw_context_load_spa_handle(context, "filter.graph", &props->dict); + pwtest_ptr_notnull(handle); + + res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_FilterGraph, + (void **)&graph); + pwtest_neg_errno_ok(res); + pwtest_ptr_notnull(graph); + + spa_pod_dynamic_builder_init(&b, NULL, 0, 4096); + res = spa_filter_graph_enum_notify_info(graph, 0, &b.b, &pod); + pwtest_int_eq(res, 1); + pwtest_ptr_notnull(pod); + pwtest_bool_true(is_current_notify_info(pod)); + spa_pod_dynamic_builder_clean(&b); + + spa_pod_dynamic_builder_init(&b, NULL, 0, 4096); + res = spa_filter_graph_enum_notify_info(graph, 1, &b.b, &pod); + pwtest_int_eq(res, 0); + spa_pod_dynamic_builder_clean(&b); + + res = spa_filter_graph_activate(graph, + &SPA_DICT_ITEMS(SPA_DICT_ITEM(PW_KEY_AUDIO_RATE, "48000"))); + pwtest_neg_errno_ok(res); + + spa_zero(out); + res = spa_filter_graph_process(graph, NULL, outputs, SPA_N_ELEMENTS(out)); + pwtest_neg_errno_ok(res); + + spa_pod_dynamic_builder_init(&b, NULL, 0, 4096); + res = spa_filter_graph_get_notify(graph, &b.b, &pod); + pwtest_int_eq(res, 1); + pwtest_ptr_notnull(pod); + pwtest_bool_true(has_current_notify(pod)); + spa_pod_dynamic_builder_clean(&b); + + spa_filter_graph_deactivate(graph); + pw_unload_spa_handle(handle); + pw_properties_free(props); + pw_context_destroy(context); + pw_loop_destroy(loop); + pw_deinit(); + + return PWTEST_PASS; +} + +PWTEST(export_controls_large_payload) +{ + struct pw_loop *loop; + struct pw_context *context; + struct pw_properties *props; + struct spa_handle *handle; + struct spa_filter_graph *graph; + struct spa_pod_dynamic_builder b; + struct spa_pod *pod = NULL; + float out[64]; + void *outputs[1] = { out }; + int res; + + pw_init(0, NULL); + + loop = pw_loop_new(NULL); + pwtest_ptr_notnull(loop); + + context = pw_context_new(loop, pw_properties_new( + PW_KEY_CONFIG_NAME, "null", + "context.modules.allow-empty", "true", + "support.dbus", "false", + NULL), 0); + pwtest_ptr_notnull(context); + + pw_context_add_spa_lib(context, + "^filter\\.graph\\.plugin\\.builtin$", + "filter-graph/libspa-filter-graph-plugin-builtin"); + pw_context_add_spa_lib(context, + "^filter\\.graph$", + "filter-graph/libspa-filter-graph"); + + props = pw_properties_new( + "clock.quantum-limit", "128", + "filter-graph.n_inputs", "0", + "filter-graph.n_outputs", "1", + SPA_KEY_LIBRARY_NAME, "filter-graph/libspa-filter-graph", + "filter.graph", + "{ nodes = [ { type = builtin name = ramp label = ramp " + "control = { Start = 0.0 Stop = 1.0 \"Duration (s)\" = 0.001 } } ] " + "outputs = [ \"ramp:Out\" ] " + "export.controls = [ " EXPORT_RAMP_CURRENT_160 "] }", + NULL); + pwtest_ptr_notnull(props); + + handle = pw_context_load_spa_handle(context, "filter.graph", &props->dict); + pwtest_ptr_notnull(handle); + + res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_FilterGraph, + (void **)&graph); + pwtest_neg_errno_ok(res); + pwtest_ptr_notnull(graph); + + res = spa_filter_graph_activate(graph, + &SPA_DICT_ITEMS(SPA_DICT_ITEM(PW_KEY_AUDIO_RATE, "48000"))); + pwtest_neg_errno_ok(res); + + spa_zero(out); + res = spa_filter_graph_process(graph, NULL, outputs, SPA_N_ELEMENTS(out)); + pwtest_neg_errno_ok(res); + + spa_pod_dynamic_builder_init(&b, NULL, 0, 4096); + res = spa_filter_graph_get_notify(graph, &b.b, &pod); + pwtest_int_eq(res, 1); + pwtest_ptr_notnull(pod); + pwtest_int_gt((int)b.b.state.offset, 4096); + pwtest_bool_true(has_current_notify(pod)); + spa_pod_dynamic_builder_clean(&b); + + spa_filter_graph_deactivate(graph); + pw_unload_spa_handle(handle); + pw_properties_free(props); + pw_context_destroy(context); + pw_loop_destroy(loop); + pw_deinit(); + + return PWTEST_PASS; +} + +PWTEST(export_controls_duplicated_graph_uses_scalar_values) +{ + struct pw_loop *loop; + struct pw_context *context; + struct pw_properties *props; + struct spa_handle *handle; + struct spa_filter_graph *graph; + struct spa_pod_dynamic_builder b; + struct spa_pod *pod = NULL; + float in[2][64], out[2][64]; + const void *inputs[2] = { in[0], in[1] }; + void *outputs[2] = { out[0], out[1] }; + int res; + + pw_init(0, NULL); + + loop = pw_loop_new(NULL); + pwtest_ptr_notnull(loop); + + context = pw_context_new(loop, pw_properties_new( + PW_KEY_CONFIG_NAME, "null", + "context.modules.allow-empty", "true", + "support.dbus", "false", + NULL), 0); + pwtest_ptr_notnull(context); + + pw_context_add_spa_lib(context, + "^filter\\.graph\\.plugin\\.builtin$", + "filter-graph/libspa-filter-graph-plugin-builtin"); + pw_context_add_spa_lib(context, + "^filter\\.graph$", + "filter-graph/libspa-filter-graph"); + + props = pw_properties_new( + "clock.quantum-limit", "128", + "filter-graph.n_inputs", "2", + "filter-graph.n_outputs", "2", + SPA_KEY_LIBRARY_NAME, "filter-graph/libspa-filter-graph", + "filter.graph", + "{ nodes = [ " + "{ type = builtin name = copy label = copy } " + "{ type = builtin name = ramp label = ramp " + "control = { Start = 0.0 Stop = 1.0 \"Duration (s)\" = 0.001 } } ] " + "inputs = [ \"copy:In\" ] " + "outputs = [ \"copy:Out\" ] " + "export.controls = [ \"ramp:Current\" ] }", + NULL); + pwtest_ptr_notnull(props); + + handle = pw_context_load_spa_handle(context, "filter.graph", &props->dict); + pwtest_ptr_notnull(handle); + + res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_FilterGraph, + (void **)&graph); + pwtest_neg_errno_ok(res); + pwtest_ptr_notnull(graph); + + res = spa_filter_graph_activate(graph, + &SPA_DICT_ITEMS(SPA_DICT_ITEM(PW_KEY_AUDIO_RATE, "48000"))); + pwtest_neg_errno_ok(res); + + spa_zero(in); + spa_zero(out); + res = spa_filter_graph_process(graph, inputs, outputs, SPA_N_ELEMENTS(out[0])); + pwtest_neg_errno_ok(res); + + spa_pod_dynamic_builder_init(&b, NULL, 0, 4096); + res = spa_filter_graph_get_notify(graph, &b.b, &pod); + pwtest_int_eq(res, 1); + pwtest_ptr_notnull(pod); + pwtest_bool_true(has_current_notify(pod)); + spa_pod_dynamic_builder_clean(&b); + + spa_filter_graph_deactivate(graph); + pw_unload_spa_handle(handle); + pw_properties_free(props); + pw_context_destroy(context); + pw_loop_destroy(loop); + pw_deinit(); + + return PWTEST_PASS; +} + +PWTEST_SUITE(filter_graph) +{ + pwtest_add(export_controls, PWTEST_NOARG); + pwtest_add(export_controls_large_payload, PWTEST_NOARG); + pwtest_add(export_controls_duplicated_graph_uses_scalar_values, PWTEST_NOARG); + + return PWTEST_PASS; +} + +#undef EXPORT_RAMP_CURRENT_160 +#undef EXPORT_RAMP_CURRENT_100 +#undef EXPORT_RAMP_CURRENT_10 +#undef EXPORT_RAMP_CURRENT