Merge branch 'mini-eq/export-notify-controls' into 'master'

filter-chain: export notify controls

See merge request pipewire/pipewire!2823
This commit is contained in:
bhack 2026-06-11 02:22:16 +00:00
commit b6a90fe3c1
6 changed files with 1618 additions and 11 deletions

View file

@ -7,6 +7,7 @@
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
@ -18,6 +19,7 @@
#include <spa/utils/overflow.h>
#include <spa/param/audio/raw-json.h>
#include <spa/pod/dynamic.h>
#include <spa/control/control.h>
#include <spa/filter-graph/filter-graph.h>
#include <pipewire/impl.h>
@ -88,6 +90,8 @@ extern struct spa_handle_factory spa_filter_graph_factory;
* playback.volumes = [
* { control = <portname> min = <value> max = <value> scale = <scale> } ...
* ]
* export.controls = [ <portname> ... ]
* export.interval.ms = 33
* }
*\endcode
*
@ -217,6 +221,29 @@ extern struct spa_handle_factory spa_filter_graph_factory;
* default this is linear but it can be set to cubic when the control applies a
* cubic transformation.
*
* ### Exported Controls
*
* Output control ports can be exported as a PipeWire control stream for metering
* and status monitoring. The port names use the same `<node_name>:<port_name>`
* syntax as links. Exported values are emitted as `SPA_CONTROL_Properties` on a
* read-only `application/control` output stream. Each exported port is emitted
* as a scalar value. When the graph is duplicated to match the channel count,
* the value from the first graph instance is exported.
*
* The `export.interval.ms` value controls how often values are emitted. The default
* is 33ms. A value of 0 emits on every processed quantum when a control stream
* buffer is available.
*
*\code{.unparsed}
* filter.graph = {
* nodes = [
* { type = builtin name = ramp label = ramp }
* ]
* export.controls = [ "ramp:Current" ]
* export.interval.ms = 33
* }
*\endcode
*
* ## Builtin filters
*
* There are some useful builtin filters available. The type should be `builtin` and
@ -1234,6 +1261,7 @@ static const struct spa_dict_item module_props[] = {
#define DEFAULT_RATE 48000
#define MAX_DATAS 1024u
#define CONTROL_STREAM_MIN_BUFFER_SIZE 4096u
struct impl {
struct pw_context *context;
@ -1257,6 +1285,14 @@ struct impl {
struct spa_hook playback_listener;
struct spa_audio_info_raw playback_info;
struct pw_properties *control_props;
struct pw_stream *control;
struct spa_hook control_listener;
uint32_t control_buffer_size;
uint64_t control_interval;
uint64_t control_last_time;
bool control_buffer_size_warned;
struct spa_audio_info_raw info;
struct spa_io_position *position;
@ -1283,6 +1319,80 @@ static void capture_destroy(void *d)
impl->capture = NULL;
}
static int emit_control_stream(struct impl *impl)
{
struct pw_buffer *buf;
struct spa_buffer *buffer;
struct spa_data *bd;
struct spa_pod_builder b;
struct spa_pod_frame f;
uint64_t now = 0;
int res;
if (impl->control == NULL || !impl->graph_active)
return 0;
if (impl->position)
now = impl->position->clock.nsec;
if (impl->control_interval > 0 &&
now > 0 &&
impl->control_last_time > 0 &&
now >= impl->control_last_time &&
now - impl->control_last_time < impl->control_interval)
return 0;
buf = pw_stream_dequeue_buffer(impl->control);
if (buf == NULL)
return 0;
buffer = buf->buffer;
if (buffer->n_datas == 0)
goto queue_empty;
bd = &buffer->datas[0];
if (bd->data == NULL || bd->chunk == NULL ||
bd->maxsize < impl->control_buffer_size) {
if (!impl->control_buffer_size_warned) {
pw_log_warn("%p: control stream buffer too small: %u < %u",
impl, bd->maxsize, impl->control_buffer_size);
impl->control_buffer_size_warned = true;
}
goto queue_empty;
}
spa_pod_builder_init(&b, bd->data, bd->maxsize);
spa_pod_builder_push_sequence(&b, &f, 0);
spa_pod_builder_control(&b, 0, SPA_CONTROL_Properties);
res = spa_filter_graph_get_notify(impl->graph, &b, NULL);
if (res < 0) {
if (!impl->control_buffer_size_warned) {
pw_log_warn("%p: can't build control stream buffer: %s",
impl, spa_strerror(res));
impl->control_buffer_size_warned = true;
}
goto queue_empty;
}
spa_pod_builder_pop(&b, &f);
bd->chunk->offset = 0;
bd->chunk->size = b.state.offset;
bd->chunk->stride = 0;
pw_stream_queue_buffer(impl->control, buf);
impl->control_last_time = now;
impl->control_buffer_size_warned = false;
return 0;
queue_empty:
if (buffer->n_datas > 0 && buffer->datas[0].chunk != NULL) {
bd = &buffer->datas[0];
bd->chunk->offset = 0;
bd->chunk->size = 0;
bd->chunk->stride = 0;
}
pw_stream_queue_buffer(impl->control, buf);
return 0;
}
static void do_process(struct impl *impl)
{
struct pw_buffer *in, *out;
@ -1354,6 +1464,7 @@ static void do_process(struct impl *impl)
cout[n_out++] = NULL;
spa_filter_graph_process(impl->graph, cin, cout, data_size / sizeof(float));
emit_control_stream(impl);
}
if (in != NULL)
@ -1682,6 +1793,169 @@ static const struct pw_stream_events out_stream_events = {
.param_changed = playback_param_changed,
};
static void control_destroy(void *d)
{
struct impl *impl = d;
spa_hook_remove(&impl->control_listener);
impl->control = NULL;
}
static void control_state_changed(void *data, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct impl *impl = data;
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
pw_log_info("module %p: control stream unconnected", impl);
break;
case PW_STREAM_STATE_ERROR:
pw_log_info("module %p: control stream error: %s", impl, error);
break;
default:
break;
}
}
static const struct pw_stream_events control_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = control_destroy,
.state_changed = control_state_changed,
};
static int get_control_stream_buffer_size(struct impl *impl, uint32_t *size)
{
struct spa_pod_dynamic_builder b;
struct spa_pod_frame f;
struct spa_pod *pod;
int res;
spa_pod_dynamic_builder_init(&b, NULL, 0, CONTROL_STREAM_MIN_BUFFER_SIZE);
res = spa_pod_builder_push_sequence(&b.b, &f, 0);
if (res >= 0)
res = spa_pod_builder_control(&b.b, 0, SPA_CONTROL_Properties);
if (res >= 0)
res = spa_filter_graph_get_notify(impl->graph, &b.b, NULL);
pod = spa_pod_builder_pop(&b.b, &f);
if (res >= 0 && pod == NULL)
res = -ENOSPC;
if (res >= 0 && b.b.state.offset > INT_MAX)
res = -EOVERFLOW;
if (res >= 0)
*size = SPA_MAX((uint32_t)b.b.state.offset,
CONTROL_STREAM_MIN_BUFFER_SIZE);
spa_pod_dynamic_builder_clean(&b);
return res;
}
static int setup_control_stream(struct impl *impl)
{
struct spa_pod_dynamic_builder b;
struct pw_array offsets;
const struct spa_pod **params = NULL;
uint32_t i, n_params, *offs;
int res = 0;
spa_pod_dynamic_builder_init(&b, NULL, 0, CONTROL_STREAM_MIN_BUFFER_SIZE);
pw_array_init(&offsets, 512);
for (i = 0;; i++) {
uint32_t save = b.b.state.offset;
res = spa_filter_graph_enum_notify_info(impl->graph, i, &b.b, NULL);
if (res != 1) {
if (res < 0)
goto done;
break;
}
if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) {
res = -errno;
goto done;
}
*offs = save;
}
if (i == 0)
goto done;
if ((res = get_control_stream_buffer_size(impl, &impl->control_buffer_size)) < 0)
goto done;
if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) {
res = -errno;
goto done;
}
*offs = b.b.state.offset;
spa_pod_builder_add_object(&b.b,
SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application),
SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control));
if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) {
res = -errno;
goto done;
}
*offs = b.b.state.offset;
spa_pod_builder_add_object(&b.b,
SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, 32),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
impl->control_buffer_size, impl->control_buffer_size, INT_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(1));
n_params = pw_array_get_len(&offsets, uint32_t);
params = calloc(n_params, sizeof(struct spa_pod *));
if (params == NULL) {
res = -errno;
goto done;
}
offs = offsets.data;
for (i = 0; i < n_params; i++)
params[i] = spa_pod_builder_deref(&b.b, offs[i]);
impl->control_props = pw_properties_new(
PW_KEY_MEDIA_TYPE, "Control",
PW_KEY_MEDIA_CATEGORY, "Monitor",
PW_KEY_MEDIA_ROLE, "DSP",
PW_KEY_MEDIA_CLASS, "Stream/Output/Data",
PW_KEY_NODE_AUTOCONNECT, "false",
PW_KEY_FORMAT_DSP, "8 bit raw control",
NULL);
if (impl->control_props == NULL) {
res = -errno;
goto done;
}
pw_properties_setf(impl->control_props, PW_KEY_NODE_NAME, "notify.%s",
pw_properties_get(impl->props, PW_KEY_NODE_NAME));
pw_properties_setf(impl->control_props, PW_KEY_MEDIA_NAME, "%s notify",
pw_properties_get(impl->props, PW_KEY_NODE_DESCRIPTION));
impl->control = pw_stream_new(impl->core,
"filter notify", impl->control_props);
impl->control_props = NULL;
if (impl->control == NULL) {
res = -errno;
goto done;
}
pw_stream_add_listener(impl->control,
&impl->control_listener,
&control_stream_events, impl);
res = pw_stream_connect(impl->control,
PW_DIRECTION_OUTPUT,
PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS,
params, n_params);
done:
free(params);
pw_array_clear(&offsets);
spa_pod_dynamic_builder_clean(&b);
return res < 0 ? res : 0;
}
static int setup_streams(struct impl *impl)
{
int res = 0;
@ -1805,6 +2079,8 @@ static int setup_streams(struct impl *impl)
flags,
params, n_params);
}
if (res >= 0)
res = setup_control_stream(impl);
spa_pod_dynamic_builder_clean(&b);
done:
@ -1871,6 +2147,10 @@ static void graph_info(void *object, const struct spa_filter_graph_info *info)
pw_log_info("using default outputs %d", val);
impl->playback_info.channels = val;
}
else if (spa_streq(k, "export.interval.ms") &&
spa_atou32(s, &val, 0)) {
impl->control_interval = (uint64_t)val * SPA_NSEC_PER_MSEC;
}
}
if (impl->capture_info.channels == impl->playback_info.channels) {
copy_position(&impl->capture_info, &impl->playback_info);
@ -1952,11 +2232,15 @@ static void impl_destroy(struct impl *impl)
pw_stream_disconnect(impl->capture);
if (impl->playback)
pw_stream_disconnect(impl->playback);
if (impl->control)
pw_stream_disconnect(impl->control);
if (impl->capture)
pw_stream_destroy(impl->capture);
if (impl->playback)
pw_stream_destroy(impl->playback);
if (impl->control)
pw_stream_destroy(impl->control);
if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core);
@ -1966,6 +2250,7 @@ static void impl_destroy(struct impl *impl)
pw_properties_free(impl->capture_props);
pw_properties_free(impl->playback_props);
pw_properties_free(impl->control_props);
pw_properties_free(impl->props);
free(impl);