filter-graph: add a pipe filter

This commit is contained in:
Wim Taymans 2025-06-06 13:00:29 +02:00
parent 420c510d47
commit 4f3a2d723b
2 changed files with 253 additions and 0 deletions

View file

@ -11,9 +11,13 @@
#endif
#include <unistd.h>
#include <limits.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <spa/utils/json.h>
#include <spa/utils/result.h>
#include <spa/utils/cleanup.h>
#include <spa/support/cpu.h>
#include <spa/support/log.h>
#include <spa/plugins/audioconvert/resample.h>
@ -2557,6 +2561,221 @@ static const struct spa_fga_descriptor debug_desc = {
.cleanup = builtin_cleanup,
};
/* pipe */
struct pipe_impl {
struct plugin *plugin;
struct spa_log *log;
struct spa_fga_dsp *dsp;
unsigned long rate;
float *port[3];
float latency;
int write_fd;
int read_fd;
size_t written;
size_t read;
};
static int do_exec(struct pipe_impl *impl, const char *command)
{
int pid, res, len, argc = 0;
char *argv[512];
struct spa_json it[2];
const char *value;
int stdin_pipe[2];
int stdout_pipe[2];
if (spa_json_begin_array_relax(&it[0], command, strlen(command)) <= 0)
return -EINVAL;
while ((len = spa_json_next(&it[0], &value)) > 0) {
char *s;
if ((s = malloc(len+1)) == NULL)
return -errno;
spa_json_parse_stringn(value, len, s, len+1);
argv[argc++] = s;
}
argv[argc++] = NULL;
pipe2(stdin_pipe, 0);
pipe2(stdout_pipe, 0);
impl->write_fd = stdin_pipe[1];
impl->read_fd = stdout_pipe[0];
pid = fork();
if (pid == 0) {
char buf[1024];
char *const *p;
struct spa_strbuf s;
/* Double fork to avoid zombies; we don't want to set SIGCHLD handler */
pid = fork();
if (pid < 0) {
spa_log_error(impl->log, "fork error: %m");
goto done;
} else if (pid != 0) {
exit(0);
}
dup2(stdin_pipe[0], 0);
dup2(stdout_pipe[1], 1);
spa_strbuf_init(&s, buf, sizeof(buf));
for (p = argv; *p; ++p)
spa_strbuf_append(&s, " '%s'", *p);
spa_log_info(impl->log, "exec%s", s.buffer);
res = execvp(argv[0], argv);
if (res == -1) {
res = -errno;
spa_log_error(impl->log, "execvp error '%s': %m", argv[0]);
}
done:
exit(1);
} else if (pid < 0) {
spa_log_error(impl->log, "fork error: %m");
} else {
int status = 0;
do {
errno = 0;
res = waitpid(pid, &status, 0);
} while (res < 0 && errno == EINTR);
spa_log_debug(impl->log, "exec got pid %d res:%d status:%d", (int)pid, res, status);
}
return 0;
}
static void pipe_transfer(struct pipe_impl *impl, float *in, float *out, int count)
{
ssize_t sz;
sz = read(impl->read_fd, out, count * sizeof(float));
if (sz > 0) {
impl->read += sz;
if (impl->read == (size_t)sz) {
while ((sz = read(impl->read_fd, out, count * sizeof(float))) != -1)
impl->read += sz;
}
} else {
memset(out, 0, count * sizeof(float));
}
if ((sz = write(impl->write_fd, in, count * sizeof(float))) != -1)
impl->written += sz;
}
static void *pipe_instantiate(const struct spa_fga_plugin *plugin, const struct spa_fga_descriptor * Descriptor,
unsigned long SampleRate, int index, const char *config)
{
struct plugin *pl = SPA_CONTAINER_OF(plugin, struct plugin, plugin);
struct pipe_impl *impl;
struct spa_json it[2];
const char *val;
char key[256];
spa_autofree char*command = NULL;
int len;
errno = EINVAL;
if (config == NULL) {
spa_log_error(pl->log, "pipe: requires a config section");
return NULL;
}
if (spa_json_begin_object(&it[0], config, strlen(config)) <= 0) {
spa_log_error(pl->log, "pipe: config must be an object");
return NULL;
}
while ((len = spa_json_object_next(&it[0], key, sizeof(key), &val)) > 0) {
if (spa_streq(key, "command")) {
if ((command = malloc(len+1)) == NULL)
return NULL;
if (spa_json_parse_stringn(val, len, command, len+1) <= 0) {
spa_log_error(pl->log, "pipe: command requires a string");
return NULL;
}
}
else {
spa_log_warn(pl->log, "pipe: ignoring config key: '%s'", key);
}
}
if (command == NULL || command[0] == '\0') {
spa_log_error(pl->log, "pipe: command must be given and can not be empty");
return NULL;
}
impl = calloc(1, sizeof(*impl));
if (impl == NULL)
return NULL;
impl->plugin = pl;
impl->log = pl->log;
impl->dsp = pl->dsp;
impl->rate = SampleRate;
do_exec(impl, command);
fcntl(impl->write_fd, F_SETFL, fcntl(impl->write_fd, F_GETFL) | O_NONBLOCK);
fcntl(impl->read_fd, F_SETFL, fcntl(impl->read_fd, F_GETFL) | O_NONBLOCK);
return impl;
}
static void pipe_connect_port(void *Instance, unsigned long Port, float * DataLocation)
{
struct pipe_impl *impl = Instance;
impl->port[Port] = DataLocation;
}
static void pipe_run(void * Instance, unsigned long SampleCount)
{
struct pipe_impl *impl = Instance;
float *in = impl->port[0], *out = impl->port[1];
if (in != NULL && out != NULL)
pipe_transfer(impl, in, out, SampleCount);
}
static void pipe_cleanup(void * Instance)
{
struct pipe_impl *impl = Instance;
close(impl->write_fd);
close(impl->read_fd);
free(impl);
}
static struct spa_fga_port pipe_ports[] = {
{ .index = 0,
.name = "In",
.flags = SPA_FGA_PORT_INPUT | SPA_FGA_PORT_AUDIO,
},
{ .index = 1,
.name = "Out",
.flags = SPA_FGA_PORT_OUTPUT | SPA_FGA_PORT_AUDIO,
},
};
static const struct spa_fga_descriptor pipe_desc = {
.name = "pipe",
.flags = SPA_FGA_DESCRIPTOR_SUPPORTS_NULL_DATA,
.n_ports = SPA_N_ELEMENTS(pipe_ports),
.ports = pipe_ports,
.instantiate = pipe_instantiate,
.connect_port = pipe_connect_port,
.run = pipe_run,
.cleanup = pipe_cleanup,
};
static const struct spa_fga_descriptor * builtin_descriptor(unsigned long Index)
{
switch(Index) {
@ -2616,6 +2835,8 @@ static const struct spa_fga_descriptor * builtin_descriptor(unsigned long Index)
return &sqrt_desc;
case 27:
return &debug_desc;
case 28:
return &pipe_desc;
}
return NULL;
}