filter-chain: support no input or output streams

When the graph has no inputs and the channels is set to 0, don't create
a capture stream. Likewise, don't create a playback stream when there
are no graph outputs and the output channels is 0.

You can use this to make a sine source or a null sink.
This commit is contained in:
Wim Taymans 2026-01-21 16:15:08 +01:00
parent 7f2cce1021
commit 56a4ab5234

View file

@ -193,6 +193,10 @@ extern struct spa_handle_factory spa_filter_graph_factory;
* graph will then be duplicated as many times to match the number of input/output
* channels of the streams.
*
* If the graph has no inputs and the capture channels is set as 0, only the
* playback stream will be created. Likewise, if there are no outputs and the
* playback channels is 0, there will be no capture stream created.
*
* ### Volumes
*
* Normally the volume of the sink/source is handled by the stream software volume.
@ -1249,92 +1253,105 @@ static void capture_destroy(void *d)
impl->capture = NULL;
}
static void capture_process(void *d)
static void do_process(struct impl *impl)
{
struct impl *impl = d;
int res;
if ((res = pw_stream_trigger_process(impl->playback)) < 0) {
pw_log_debug("playback trigger error: %s", spa_strerror(res));
struct pw_buffer *in, *out;
uint32_t i, n_in = 0, n_out = 0, data_size = 0;
struct spa_data *bd;
const void *cin[128];
void *cout[128];
in = out = NULL;
if (impl->capture) {
while (true) {
struct pw_buffer *t;
if ((t = pw_stream_dequeue_buffer(impl->capture)) == NULL)
break;
/* playback part is not ready, consume, discard and recycle
* the capture buffers */
pw_stream_queue_buffer(impl->capture, t);
if (in)
pw_stream_queue_buffer(impl->capture, in);
in = t;
}
if (in == NULL) {
pw_log_debug("%p: out of capture buffers: %m", impl);
} else {
for (i = 0; i < in->buffer->n_datas; i++) {
uint32_t offs, size;
bd = &in->buffer->datas[i];
offs = SPA_MIN(bd->chunk->offset, bd->maxsize);
size = SPA_MIN(bd->chunk->size, bd->maxsize - offs);
cin[n_in++] = SPA_PTROFF(bd->data, offs, void);
data_size = i == 0 ? size : SPA_MIN(data_size, size);
}
}
}
if (impl->playback) {
out = pw_stream_dequeue_buffer(impl->playback);
if (out == NULL) {
pw_log_debug("%p: out of playback buffers: %m", impl);
} else {
if (data_size == 0)
data_size = out->requested * sizeof(float);
for (i = 0; i < out->buffer->n_datas; i++) {
bd = &out->buffer->datas[i];
data_size = SPA_MIN(data_size, bd->maxsize);
cout[n_out++] = bd->data;
bd->chunk->offset = 0;
bd->chunk->size = data_size;
bd->chunk->stride = sizeof(float);
}
}
pw_log_trace_fp("%p: size:%d requested:%"PRIu64, impl,
data_size, out->requested);
}
for (; n_in < impl->n_inputs; i++)
cin[n_in++] = NULL;
for (; n_out < impl->n_outputs; i++)
cout[n_out++] = NULL;
if (impl->graph_active)
spa_filter_graph_process(impl->graph, cin, cout, data_size / sizeof(float));
if (in != NULL)
pw_stream_queue_buffer(impl->capture, in);
if (out != NULL)
pw_stream_queue_buffer(impl->playback, out);
}
static void capture_process(void *d)
{
struct impl *impl = d;
int res;
if (impl->playback) {
if ((res = pw_stream_trigger_process(impl->playback)) < 0) {
pw_log_debug("playback trigger error: %s", spa_strerror(res));
while (impl->capture) {
struct pw_buffer *t;
if ((t = pw_stream_dequeue_buffer(impl->capture)) == NULL)
break;
/* playback part is not ready, consume, discard and recycle
* the capture buffers */
pw_stream_queue_buffer(impl->capture, t);
}
}
} else {
do_process(impl);
}
}
static void playback_process(void *d)
{
struct impl *impl = d;
struct pw_buffer *in, *out;
uint32_t i, data_size = 0;
int32_t stride = 0;
struct spa_data *bd;
const void *cin[128];
void *cout[128];
in = NULL;
while (true) {
struct pw_buffer *t;
if ((t = pw_stream_dequeue_buffer(impl->capture)) == NULL)
break;
if (in)
pw_stream_queue_buffer(impl->capture, in);
in = t;
}
if (in == NULL)
pw_log_debug("%p: out of capture buffers: %m", impl);
if ((out = pw_stream_dequeue_buffer(impl->playback)) == NULL)
pw_log_debug("%p: out of playback buffers: %m", impl);
if (in == NULL || out == NULL)
goto done;
for (i = 0; i < in->buffer->n_datas; i++) {
uint32_t offs, size;
bd = &in->buffer->datas[i];
offs = SPA_MIN(bd->chunk->offset, bd->maxsize);
size = SPA_MIN(bd->chunk->size, bd->maxsize - offs);
cin[i] = SPA_PTROFF(bd->data, offs, void);
data_size = i == 0 ? size : SPA_MIN(data_size, size);
stride = SPA_MAX(stride, bd->chunk->stride);
}
for (; i < impl->n_inputs; i++)
cin[i] = NULL;
for (i = 0; i < out->buffer->n_datas; i++) {
bd = &out->buffer->datas[i];
data_size = SPA_MIN(data_size, bd->maxsize);
cout[i] = bd->data;
bd->chunk->offset = 0;
bd->chunk->size = data_size;
bd->chunk->stride = stride;
}
for (; i < impl->n_outputs; i++)
cout[i] = NULL;
pw_log_trace_fp("%p: stride:%d size:%d requested:%"PRIu64" (%"PRIu64")", impl,
stride, data_size, out->requested, out->requested * stride);
if (impl->graph_active)
spa_filter_graph_process(impl->graph, cin, cout, data_size / sizeof(float));
done:
if (in != NULL)
pw_stream_queue_buffer(impl->capture, in);
if (out != NULL)
pw_stream_queue_buffer(impl->playback, out);
do_process(impl);
}
static int activate_graph(struct impl *impl)
@ -1403,6 +1420,9 @@ static void update_latency(struct impl *impl, enum spa_direction direction, bool
struct pw_stream *s = direction == SPA_DIRECTION_OUTPUT ?
impl->playback : impl->capture;
if (s == NULL)
return;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
latency = impl->latency[direction];
spa_process_latency_info_add(&impl->process_latency, &latency);
@ -1459,10 +1479,13 @@ static void param_tag_changed(struct impl *impl, const struct spa_pod *param,
if (param == 0 || spa_tag_parse(param, &tag, &state) < 0)
return;
if (tag.direction == SPA_DIRECTION_INPUT)
pw_stream_update_params(impl->capture, params, 1);
else
pw_stream_update_params(impl->playback, params, 1);
if (tag.direction == SPA_DIRECTION_INPUT) {
if (impl->capture)
pw_stream_update_params(impl->capture, params, 1);
} else {
if (impl->playback)
pw_stream_update_params(impl->playback, params, 1);
}
}
static void capture_state_changed(void *data, enum pw_stream_state old,
@ -1539,8 +1562,7 @@ static void param_changed(struct impl *impl, uint32_t id, const struct spa_pod *
return;
error:
pw_stream_set_error(direction == SPA_DIRECTION_INPUT ? impl->capture : impl->playback,
res, "can't start graph: %s", spa_strerror(res));
pw_stream_set_error(stream, res, "can't start graph: %s", spa_strerror(res));
}
static void capture_param_changed(void *data, uint32_t id, const struct spa_pod *param)
@ -1599,7 +1621,7 @@ static void playback_state_changed(void *data, enum pw_stream_state old,
}
return;
error:
pw_stream_set_error(impl->capture, res, "can't start graph: %s",
pw_stream_set_error(impl->playback, res, "can't start graph: %s",
spa_strerror(res));
}
@ -1628,43 +1650,39 @@ static const struct pw_stream_events out_stream_events = {
static int setup_streams(struct impl *impl)
{
int res;
uint32_t i, n_params, *offs;
uint32_t i, n_params, *offs, flags;
struct pw_array offsets;
const struct spa_pod **params = NULL;
struct spa_pod_dynamic_builder b;
struct spa_filter_graph *graph = impl->graph;
impl->capture = pw_stream_new(impl->core,
"filter capture", impl->capture_props);
impl->capture_props = NULL;
if (impl->capture == NULL)
return -errno;
if (impl->capture_info.channels > 0) {
impl->capture = pw_stream_new(impl->core,
"filter capture", impl->capture_props);
impl->capture_props = NULL;
if (impl->capture == NULL)
return -errno;
pw_stream_add_listener(impl->capture,
&impl->capture_listener,
&in_stream_events, impl);
pw_stream_add_listener(impl->capture,
&impl->capture_listener,
&in_stream_events, impl);
}
impl->playback = pw_stream_new(impl->core,
"filter playback", impl->playback_props);
impl->playback_props = NULL;
if (impl->playback == NULL)
return -errno;
if (impl->playback_info.channels > 0) {
impl->playback = pw_stream_new(impl->core,
"filter playback", impl->playback_props);
impl->playback_props = NULL;
if (impl->playback == NULL)
return -errno;
pw_stream_add_listener(impl->playback,
&impl->playback_listener,
&out_stream_events, impl);
pw_stream_add_listener(impl->playback,
&impl->playback_listener,
&out_stream_events, impl);
}
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
pw_array_init(&offsets, 512);
if ((offs = pw_array_add(&offsets, sizeof(uint32_t))) == NULL) {
res = -errno;
goto done;
}
*offs = b.b.state.offset;
spa_format_audio_raw_build(&b.b,
SPA_PARAM_EnumFormat, &impl->capture_info);
for (i = 0;; i++) {
uint32_t save = b.b.state.offset;
if (spa_filter_graph_enum_prop_info(graph, i, &b.b, NULL) != 1)
@ -1687,7 +1705,7 @@ static int setup_streams(struct impl *impl)
res = -ENOMEM;
goto done;
}
if ((params = calloc(n_params, sizeof(struct spa_pod*))) == NULL) {
if ((params = calloc(n_params+1, sizeof(struct spa_pod*))) == NULL) {
res = -errno;
goto done;
}
@ -1696,32 +1714,44 @@ static int setup_streams(struct impl *impl)
for (i = 0; i < n_params; i++)
params[i] = spa_pod_builder_deref(&b.b, offs[i]);
res = pw_stream_connect(impl->capture,
PW_DIRECTION_INPUT,
PW_ID_ANY,
PW_STREAM_FLAG_AUTOCONNECT |
if (impl->capture) {
params[n_params++] = spa_format_audio_raw_build(&b.b,
SPA_PARAM_EnumFormat, &impl->capture_info);
flags = PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS |
PW_STREAM_FLAG_ASYNC,
params, n_params);
PW_STREAM_FLAG_RT_PROCESS;
if (impl->playback)
flags |= PW_STREAM_FLAG_ASYNC;
spa_pod_dynamic_builder_clean(&b);
if (res < 0)
goto done;
res = pw_stream_connect(impl->capture,
PW_DIRECTION_INPUT,
PW_ID_ANY,
flags,
params, n_params);
n_params = 0;
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
params[n_params++] = spa_format_audio_raw_build(&b.b,
SPA_PARAM_EnumFormat, &impl->playback_info);
spa_pod_dynamic_builder_clean(&b);
if (res < 0)
goto done;
res = pw_stream_connect(impl->playback,
PW_DIRECTION_OUTPUT,
PW_ID_ANY,
PW_STREAM_FLAG_AUTOCONNECT |
n_params = 0;
spa_pod_dynamic_builder_init(&b, NULL, 0, 4096);
}
if (impl->playback) {
params[n_params++] = spa_format_audio_raw_build(&b.b,
SPA_PARAM_EnumFormat, &impl->playback_info);
flags = PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS |
PW_STREAM_FLAG_TRIGGER,
params, n_params);
PW_STREAM_FLAG_RT_PROCESS;
if (impl->capture)
flags |= PW_STREAM_FLAG_TRIGGER;
res = pw_stream_connect(impl->playback,
PW_DIRECTION_OUTPUT,
PW_ID_ANY,
flags,
params, n_params);
}
spa_pod_dynamic_builder_clean(&b);
done:
@ -1801,7 +1831,11 @@ static void graph_props_changed(void *object, enum spa_direction direction)
spa_pod_dynamic_builder_init(&b, buffer, sizeof(buffer), 4096);
spa_filter_graph_get_props(graph, &b.b, (struct spa_pod **)&params[0]);
pw_stream_update_params(impl->capture, params, 1);
if (impl->capture)
pw_stream_update_params(impl->capture, params, 1);
else if (impl->playback)
pw_stream_update_params(impl->playback, params, 1);
spa_pod_dynamic_builder_clean(&b);
}