diff --git a/spa/plugins/filter-graph/builtin_plugin.c b/spa/plugins/filter-graph/builtin_plugin.c index 6f2f0e8fd..6c60113cf 100644 --- a/spa/plugins/filter-graph/builtin_plugin.c +++ b/spa/plugins/filter-graph/builtin_plugin.c @@ -11,9 +11,13 @@ #endif #include #include +#include +#include +#include #include #include +#include #include #include #include @@ -2557,6 +2561,221 @@ static const struct spa_fga_descriptor debug_desc = { .cleanup = builtin_cleanup, }; +/* pipe */ +struct pipe_impl { + struct plugin *plugin; + + struct spa_log *log; + struct spa_fga_dsp *dsp; + unsigned long rate; + float *port[3]; + float latency; + + int write_fd; + int read_fd; + size_t written; + size_t read; +}; + +static int do_exec(struct pipe_impl *impl, const char *command) +{ + int pid, res, len, argc = 0; + char *argv[512]; + struct spa_json it[2]; + const char *value; + int stdin_pipe[2]; + int stdout_pipe[2]; + + if (spa_json_begin_array_relax(&it[0], command, strlen(command)) <= 0) + return -EINVAL; + + while ((len = spa_json_next(&it[0], &value)) > 0) { + char *s; + + if ((s = malloc(len+1)) == NULL) + return -errno; + + spa_json_parse_stringn(value, len, s, len+1); + + argv[argc++] = s; + } + argv[argc++] = NULL; + + pipe2(stdin_pipe, 0); + pipe2(stdout_pipe, 0); + + impl->write_fd = stdin_pipe[1]; + impl->read_fd = stdout_pipe[0]; + + pid = fork(); + + if (pid == 0) { + char buf[1024]; + char *const *p; + struct spa_strbuf s; + + /* Double fork to avoid zombies; we don't want to set SIGCHLD handler */ + pid = fork(); + + if (pid < 0) { + spa_log_error(impl->log, "fork error: %m"); + goto done; + } else if (pid != 0) { + exit(0); + } + + dup2(stdin_pipe[0], 0); + dup2(stdout_pipe[1], 1); + + spa_strbuf_init(&s, buf, sizeof(buf)); + for (p = argv; *p; ++p) + spa_strbuf_append(&s, " '%s'", *p); + + spa_log_info(impl->log, "exec%s", s.buffer); + res = execvp(argv[0], argv); + + if (res == -1) { + res = -errno; + spa_log_error(impl->log, "execvp error '%s': %m", argv[0]); + } +done: + exit(1); + } else if (pid < 0) { + spa_log_error(impl->log, "fork error: %m"); + } else { + int status = 0; + do { + errno = 0; + res = waitpid(pid, &status, 0); + } while (res < 0 && errno == EINTR); + spa_log_debug(impl->log, "exec got pid %d res:%d status:%d", (int)pid, res, status); + } + return 0; +} + +static void pipe_transfer(struct pipe_impl *impl, float *in, float *out, int count) +{ + ssize_t sz; + + sz = read(impl->read_fd, out, count * sizeof(float)); + if (sz > 0) { + impl->read += sz; + if (impl->read == (size_t)sz) { + while ((sz = read(impl->read_fd, out, count * sizeof(float))) != -1) + impl->read += sz; + } + } else { + memset(out, 0, count * sizeof(float)); + } + if ((sz = write(impl->write_fd, in, count * sizeof(float))) != -1) + impl->written += sz; +} + +static void *pipe_instantiate(const struct spa_fga_plugin *plugin, const struct spa_fga_descriptor * Descriptor, + unsigned long SampleRate, int index, const char *config) +{ + struct plugin *pl = SPA_CONTAINER_OF(plugin, struct plugin, plugin); + struct pipe_impl *impl; + struct spa_json it[2]; + const char *val; + char key[256]; + spa_autofree char*command = NULL; + int len; + + errno = EINVAL; + if (config == NULL) { + spa_log_error(pl->log, "pipe: requires a config section"); + return NULL; + } + + if (spa_json_begin_object(&it[0], config, strlen(config)) <= 0) { + spa_log_error(pl->log, "pipe: config must be an object"); + return NULL; + } + + while ((len = spa_json_object_next(&it[0], key, sizeof(key), &val)) > 0) { + if (spa_streq(key, "command")) { + if ((command = malloc(len+1)) == NULL) + return NULL; + + if (spa_json_parse_stringn(val, len, command, len+1) <= 0) { + spa_log_error(pl->log, "pipe: command requires a string"); + return NULL; + } + } + else { + spa_log_warn(pl->log, "pipe: ignoring config key: '%s'", key); + } + } + if (command == NULL || command[0] == '\0') { + spa_log_error(pl->log, "pipe: command must be given and can not be empty"); + return NULL; + } + + impl = calloc(1, sizeof(*impl)); + if (impl == NULL) + return NULL; + + impl->plugin = pl; + impl->log = pl->log; + impl->dsp = pl->dsp; + impl->rate = SampleRate; + + do_exec(impl, command); + + fcntl(impl->write_fd, F_SETFL, fcntl(impl->write_fd, F_GETFL) | O_NONBLOCK); + fcntl(impl->read_fd, F_SETFL, fcntl(impl->read_fd, F_GETFL) | O_NONBLOCK); + + return impl; +} + +static void pipe_connect_port(void *Instance, unsigned long Port, float * DataLocation) +{ + struct pipe_impl *impl = Instance; + impl->port[Port] = DataLocation; +} + +static void pipe_run(void * Instance, unsigned long SampleCount) +{ + struct pipe_impl *impl = Instance; + float *in = impl->port[0], *out = impl->port[1]; + + if (in != NULL && out != NULL) + pipe_transfer(impl, in, out, SampleCount); +} + +static void pipe_cleanup(void * Instance) +{ + struct pipe_impl *impl = Instance; + close(impl->write_fd); + close(impl->read_fd); + free(impl); +} + +static struct spa_fga_port pipe_ports[] = { + { .index = 0, + .name = "In", + .flags = SPA_FGA_PORT_INPUT | SPA_FGA_PORT_AUDIO, + }, + { .index = 1, + .name = "Out", + .flags = SPA_FGA_PORT_OUTPUT | SPA_FGA_PORT_AUDIO, + }, +}; + +static const struct spa_fga_descriptor pipe_desc = { + .name = "pipe", + .flags = SPA_FGA_DESCRIPTOR_SUPPORTS_NULL_DATA, + + .n_ports = SPA_N_ELEMENTS(pipe_ports), + .ports = pipe_ports, + + .instantiate = pipe_instantiate, + .connect_port = pipe_connect_port, + .run = pipe_run, + .cleanup = pipe_cleanup, +}; + static const struct spa_fga_descriptor * builtin_descriptor(unsigned long Index) { switch(Index) { @@ -2616,6 +2835,8 @@ static const struct spa_fga_descriptor * builtin_descriptor(unsigned long Index) return &sqrt_desc; case 27: return &debug_desc; + case 28: + return &pipe_desc; } return NULL; } diff --git a/src/modules/module-filter-chain.c b/src/modules/module-filter-chain.c index 8f1b11d16..19488d914 100644 --- a/src/modules/module-filter-chain.c +++ b/src/modules/module-filter-chain.c @@ -704,6 +704,38 @@ extern struct spa_handle_factory spa_filter_graph_factory; * control from "Control" will be copied to "Notify" and the control value will be * dumped into the INFO log. * + * ### pipe + * + * The pipe plugin can be used to filter the audio with another application using pipes + * for sending and receiving the raw audio. + * + * The application needs to consume raw float32 samples from stdin and produce filtered + * float32 samples on stdout. + * + * It has an "In" input port and an "Out" output data ports. + * + * The node requires a `config` section with extra configuration: + * + *\code{.unparsed} + * filter.graph = { + * nodes = [ + * { + * type = builtin + * name = ... + * label = pipe + * config = { + * command = "ffmpeg -f f32le -ac 1 -ar 48000 -blocksize 1024 -fflags nobuffer -i \"pipe:\" \"-filter:a\" \"loudnorm=I=-18:TP=-3:LRA=4\" -f f32le -ac 1 -ar 48000 \"pipe:\"" + * } + * ... + * } + * } + * ... + * } + *\endcode + * + * - `command` the command to execute. It should consume samples from stdin and produce + * samples on stdout. + * * ## General options * * Options with well-known behavior. Most options can be added to the global