diff --git a/src/modules/module-filter-chain.c b/src/modules/module-filter-chain.c index 84b70d651..673c3ea9a 100644 --- a/src/modules/module-filter-chain.c +++ b/src/modules/module-filter-chain.c @@ -13,14 +13,20 @@ #include #include +#include #include +#include #include #include #include +#include #include #include +#include "pipewire/extensions/session-manager/interfaces.h" + + #define NAME "filter-chain" PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); @@ -1012,6 +1018,35 @@ extern struct spa_handle_factory spa_filter_graph_factory; * prefixed with 'input.' and 'output.' to generate a capture and playback * stream node.name respectively. * + * ## Control Stream + * + * The module can provide an input stream for control data. It can only be + * configured with the property `control.props`, properties on the filter + * definition will not be taken into account. + * + * The control stream has to be enabled by setting the attribute `enabled` of + * the `control.props` to true. + * + *\code{.unparsed} + * # ~/.config/pipewire/filter-chain.conf.d/my-filter-chain-1.conf + * + * context.modules = [ + * { name = libpipewire-module-filter-chain + * args = { + * ... + * filter.graph = {...} + * capture.props = {...} + * control.props = { + * node.name = "control_in", + * node.autoconnect = true, + * enabled = true, + * media.class = "Stream/Input/Control" + * } + * } + * } + * ] + *\endcode + * * ## Example configuration of a virtual source * * This example uses the rnnoise LADSPA plugin to create a new @@ -1156,6 +1191,7 @@ static const struct spa_dict_item module_props[] = { #include #define DEFAULT_RATE 48000 +#define DEFAULT_FEEDBACK_BUFFER_SIZE 49152 struct impl { struct pw_context *context; @@ -1179,6 +1215,20 @@ struct impl { struct spa_hook playback_listener; struct spa_audio_info_raw playback_info; + struct pw_properties *control_props; + struct pw_stream *control; + struct spa_hook control_listener; + bool control_stream_active; + + struct pw_properties *feedback_props; + struct pw_stream *feedback; + struct spa_hook feedback_listener; + bool feedback_stream_active; + + struct spa_ringbuffer feedback_ringbuffer; + uint32_t feedback_ringbuffer_size; + void *feedback_ringbuffer_data; + struct spa_audio_info_raw info; struct spa_io_position *position; @@ -1222,6 +1272,131 @@ static void capture_process(void *d) } } +static int apply_props(struct spa_loop *loop, bool async, uint32_t seq, const void *data, + size_t size, void *user_data) +{ + const struct impl *impl = user_data; + const struct spa_pod *pod = data; + + struct spa_pod_control *pod_control; + SPA_POD_SEQUENCE_FOREACH((struct spa_pod_sequence *)pod, pod_control) { + if (pod_control->type == SPA_CONTROL_Properties) { + spa_filter_graph_set_props(impl->graph, SPA_DIRECTION_OUTPUT, + &pod_control->value); + } + } + + return 0; +} + +static void control_process(void *d) +{ + struct impl *impl = d; + + struct pw_buffer *control = pw_stream_dequeue_buffer(impl->control); + if (control == NULL) + return; + if (control->buffer->n_datas > 0) { + struct spa_pod *pod = spa_pod_from_data( + control->buffer->datas[0].data, + control->buffer->datas[0].maxsize, + control->buffer->datas[0].chunk->offset, + control->buffer->datas[0].chunk->size); + + if (spa_pod_is_sequence(pod)) { + struct pw_loop *loop = pw_context_get_main_loop(impl->context); + pw_loop_invoke(loop, apply_props, 0, pod, SPA_POD_SIZE(pod), + false, impl); + } + } + + pw_stream_queue_buffer(impl->control, control); +} + +static void feedback_process(void *d) { + struct impl *impl = d; + + struct pw_buffer *feedback = pw_stream_dequeue_buffer(impl->feedback); + if (feedback == NULL) { + pw_log_debug("%p: out of control buffers: %m", &impl); + return; + } + + struct spa_buffer *spa_buffer = feedback->buffer; + if (!spa_buffer || spa_buffer->n_datas < 1) { + pw_log_warn("control buffer has no data planes"); + return; + } + + struct spa_data *data = &spa_buffer->datas[0]; + if (!data->data || !data->chunk || data->maxsize == 0) { + pw_log_warn("invalid control buffer data"); + return; + } + + struct spa_pod_builder b; + spa_pod_builder_init(&b, data->data, data->maxsize); + + struct spa_pod_frame seq_f; + spa_pod_builder_push_sequence(&b, &seq_f, 0); + + uint8_t message_buffer[4096]; + uint32_t index; + while (spa_ringbuffer_get_read_index(&impl->feedback_ringbuffer, &index) > 0) { + fprintf(stdout, "reading from ringbuffer index: %d\n", index); + + uint64_t pod_data_size; + spa_ringbuffer_read_data( + &impl->feedback_ringbuffer, + impl->feedback_ringbuffer_data, + impl->feedback_ringbuffer_size, + index % impl->feedback_ringbuffer_size, + &pod_data_size, + sizeof(uint64_t)); + + struct spa_pod_builder_state state; + spa_pod_builder_get_state(&b, &state); + fprintf(stdout, "pod_data_size: %lu\n", pod_data_size); + fprintf(stdout, "builder size:%u maxsize:%d offset:%d\n", + b.size, data->maxsize, state.offset); + + // next control size: round_up_8(sizeof(struct spa_pod_control) + data_size) + /* + if (pod_data_size + sizeof(struct spa_pod_control) + sizeof( + struct spa_pod_sequence) > b.size - state.offset) { + fprintf(stdout, "buffer too small\n"); + pw_log_warn("not enough space in builder for pod data"); + break; + } + */ + + spa_ringbuffer_read_data( + &impl->feedback_ringbuffer, + impl->feedback_ringbuffer_data, + impl->feedback_ringbuffer_size, + (index + sizeof(uint64_t)) % impl->feedback_ringbuffer_size, + &message_buffer, + pod_data_size); + + spa_ringbuffer_read_update(&impl->feedback_ringbuffer, + index + sizeof(uint64_t) + pod_data_size); + + fprintf(stdout, "adding control data\n"); + spa_pod_builder_control(&b, 0, SPA_CONTROL_Properties); + spa_pod_builder_raw(&b, message_buffer, pod_data_size); + + fprintf(stdout, "builder size:%u maxsize:%d offset:%d\n", + b.size, data->maxsize, state.offset); + } + + const struct spa_sequence *spa_sequence = spa_pod_builder_pop(&b, &seq_f); + const uint32_t seq_size = SPA_POD_SIZE(spa_sequence); + data->chunk->offset = 0; + data->chunk->size = seq_size; + data->chunk->stride = 0; + pw_stream_queue_buffer(impl->feedback, feedback); +} + static void playback_process(void *d) { struct impl *impl = d; @@ -1581,6 +1756,76 @@ static const struct pw_stream_events out_stream_events = { .param_changed = playback_param_changed, }; +static void control_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->control_listener); + impl->control = NULL; +} + +static void control_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct impl *impl = data; + + switch (state) { + case PW_STREAM_STATE_PAUSED: + pw_stream_flush(impl->control, false); + break; + case PW_STREAM_STATE_ERROR: + pw_log_info("module %p: error: %s", impl, error); + break; + case PW_STREAM_STATE_STREAMING: + default: + break; + } + return; +} + +static const struct pw_stream_events control_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = control_destroy, + .process = control_process, + .io_changed = NULL, + .state_changed = control_state_changed, + .param_changed = NULL, +}; + +static void feedback_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->feedback_listener); + impl->feedback = NULL; +} + +static void feedback_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct impl *impl = data; + + switch (state) { + case PW_STREAM_STATE_PAUSED: + pw_stream_flush(impl->feedback, false); + break; + case PW_STREAM_STATE_ERROR: + pw_log_info("module %p: error: %s", impl, error); + break; + case PW_STREAM_STATE_STREAMING: + default: + break; + } + return; +} + +static const struct pw_stream_events feedback_stream_events = { + .version = PW_VERSION_STREAM_EVENTS, + .destroy = feedback_destroy, + .process = feedback_process, + .io_changed = NULL, + .state_changed = feedback_state_changed, + .param_changed = NULL, +}; + static int setup_streams(struct impl *impl) { int res; @@ -1590,6 +1835,72 @@ static int setup_streams(struct impl *impl) struct spa_pod_dynamic_builder b; struct spa_filter_graph *graph = impl->graph; + if (impl->control_stream_active) { + impl->control = pw_stream_new(impl->core, + "filter control", impl->control_props); + impl->control_props = NULL; + if (impl->control == NULL) + return -errno; + + pw_stream_add_listener(impl->control, + &impl->control_listener, + &control_stream_events, impl); + + uint8_t buffer[256]; + struct spa_pod_builder bt = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + const struct spa_pod *param[1]; + + param[0] = spa_pod_builder_add_object(&bt, + SPA_TYPE_OBJECT_Format, + SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, + SPA_POD_Id(SPA_MEDIA_TYPE_application), + SPA_FORMAT_mediaSubtype, + SPA_POD_Id(SPA_MEDIA_SUBTYPE_control) + ); + + pw_stream_connect(impl->control, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + param, 1); + } + + if (impl->feedback_stream_active) { + impl->feedback = pw_stream_new(impl->core, + "filter feedback", impl->feedback_props); + impl->feedback_props = NULL; + if (impl->feedback == NULL) + return -errno; + + pw_stream_add_listener(impl->feedback, + &impl->feedback_listener, + &feedback_stream_events, impl); + + uint8_t buffer[256]; + struct spa_pod_builder bt = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + const struct spa_pod *param[1]; + + param[0] = spa_pod_builder_add_object(&bt, + SPA_TYPE_OBJECT_Format, + SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, + SPA_POD_Id(SPA_MEDIA_TYPE_application), + SPA_FORMAT_mediaSubtype, + SPA_POD_Id(SPA_MEDIA_SUBTYPE_control) + ); + + pw_stream_connect(impl->feedback, + PW_DIRECTION_OUTPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + param, 1); + } + impl->capture = pw_stream_new(impl->core, "filter capture", impl->capture_props); impl->capture_props = NULL; @@ -1751,6 +2062,36 @@ static void graph_props_changed(void *object, enum spa_direction direction) spa_filter_graph_get_props(graph, &b.b, (struct spa_pod **)¶ms[0]); pw_stream_update_params(impl->capture, params, 1); + + if (impl->feedback_stream_active) { + fprintf(stdout, "graph props changed\n"); + uint32_t index; + int32_t write_index_return = spa_ringbuffer_get_write_index(&impl->feedback_ringbuffer, &index); + const uint32_t available = impl->feedback_ringbuffer_size - write_index_return; + const uint64_t pod_length = SPA_POD_SIZE(params[0]); + const uint64_t full_length = pod_length + sizeof(uint64_t); + if (available >= full_length) { + fprintf(stdout, "writing to buffer\n"); + fprintf(stdout, "writing data, pod length: %lu\nindex: %u\n", + pod_length, index); + + spa_ringbuffer_write_data(&impl->feedback_ringbuffer, + impl->feedback_ringbuffer_data, impl->feedback_ringbuffer_size, + index % impl->feedback_ringbuffer_size, &pod_length, + sizeof(uint64_t)); + + spa_ringbuffer_write_data(&impl->feedback_ringbuffer, + impl->feedback_ringbuffer_data, impl->feedback_ringbuffer_size, + (index + sizeof(uint64_t)) % impl->feedback_ringbuffer_size, params[0], + SPA_POD_SIZE(params[0])); + + spa_ringbuffer_write_update(&impl->feedback_ringbuffer, + index + sizeof(uint64_t) + SPA_POD_SIZE(params[0])); + } else { + fprintf(stdout, "not enough space in ringbuffer, available: %u, required: %lu, write index return: %i, buffer size: %u\n", available, full_length, write_index_return, impl->feedback_ringbuffer_size); + } + } + spa_pod_dynamic_builder_clean(&b); } @@ -1796,16 +2137,24 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { - /* disconnect both streams before destroying any of them */ + /* disconnect all streams before destroying any of them */ if (impl->capture) pw_stream_disconnect(impl->capture); if (impl->playback) pw_stream_disconnect(impl->playback); + if (impl->control) + pw_stream_disconnect(impl->control); + if (impl->feedback) + pw_stream_disconnect(impl->feedback); if (impl->capture) pw_stream_destroy(impl->capture); if (impl->playback) pw_stream_destroy(impl->playback); + if (impl->control) + pw_stream_destroy(impl->control); + if (impl->feedback) + pw_stream_destroy(impl->feedback); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); @@ -1815,6 +2164,8 @@ static void impl_destroy(struct impl *impl) pw_properties_free(impl->capture_props); pw_properties_free(impl->playback_props); + pw_properties_free(impl->control_props); + pw_properties_free(impl->feedback_props); pw_properties_free(impl->props); free(impl); @@ -1854,6 +2205,22 @@ static void copy_props(struct impl *impl, struct pw_properties *props, const cha } } +static void copy_props_control(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (impl->control_stream_active) { + if (pw_properties_get(impl->control_props, key) == NULL) + pw_properties_set(impl->control_props, key, str); + } + + if (impl->feedback_stream_active) { + if (pw_properties_get(impl->feedback_props, key) == NULL) + pw_properties_set(impl->feedback_props, key, str); + } + } +} + SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { @@ -1889,7 +2256,10 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->capture_props = pw_properties_new(NULL, NULL); impl->playback_props = pw_properties_new(NULL, NULL); - if (impl->capture_props == NULL || impl->playback_props == NULL) { + impl->control_props = pw_properties_new(NULL, NULL); + impl->feedback_props = pw_properties_new(NULL, NULL); + if (impl->capture_props == NULL || impl->playback_props == NULL || + impl->control_props == NULL || impl->feedback_props == NULL) { res = -errno; pw_log_error( "can't create properties: %m"); goto error; @@ -1915,6 +2285,10 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_properties_update_string(impl->capture_props, str, strlen(str)); if ((str = pw_properties_get(props, "playback.props")) != NULL) pw_properties_update_string(impl->playback_props, str, strlen(str)); + if ((str = pw_properties_get(props, "control.props")) != NULL) + pw_properties_update_string(impl->control_props, str, strlen(str)); + if ((str = pw_properties_get(props, "feedback.props")) != NULL) + pw_properties_update_string(impl->feedback_props, str, strlen(str)); copy_props(impl, props, PW_KEY_AUDIO_RATE); copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); @@ -1927,6 +2301,21 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, PW_KEY_MEDIA_NAME); copy_props(impl, props, "resample.prefill"); + impl->control_stream_active = pw_properties_get_bool(impl->control_props, "enabled", false); + impl->feedback_stream_active = pw_properties_get_bool(impl->feedback_props, "enabled", false); + + if (impl->feedback_stream_active) { + spa_ringbuffer_init(&impl->feedback_ringbuffer); + impl->feedback_ringbuffer_data = malloc(DEFAULT_FEEDBACK_BUFFER_SIZE); + impl->feedback_ringbuffer_size = DEFAULT_FEEDBACK_BUFFER_SIZE; + } + + copy_props_control(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props_control(impl, props, PW_KEY_NODE_GROUP); + copy_props_control(impl, props, PW_KEY_NODE_LINK_GROUP); + copy_props_control(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props_control(impl, props, PW_KEY_MEDIA_NAME); + parse_audio_info(impl->capture_props, &impl->capture_info); parse_audio_info(impl->playback_props, &impl->playback_info); @@ -1965,6 +2354,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (pw_properties_get(impl->playback_props, PW_KEY_MEDIA_NAME) == NULL) pw_properties_setf(impl->playback_props, PW_KEY_MEDIA_NAME, "%s output", pw_properties_get(impl->playback_props, PW_KEY_NODE_DESCRIPTION)); + if (pw_properties_get(impl->control_props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(impl->control_props, PW_KEY_MEDIA_NAME, "%s control", + pw_properties_get(impl->control_props, PW_KEY_NODE_DESCRIPTION)); + if (pw_properties_get(impl->feedback_props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(impl->feedback_props, PW_KEY_MEDIA_NAME, "%s feedback", + pw_properties_get(impl->feedback_props, PW_KEY_NODE_DESCRIPTION)); p = pw_context_get_properties(impl->context); pw_properties_set(props, "clock.quantum-limit", diff --git a/src/tools/meson.build b/src/tools/meson.build index 8147906fb..c65fe2bd8 100644 --- a/src/tools/meson.build +++ b/src/tools/meson.build @@ -9,6 +9,7 @@ tools_sources = [ [ 'pw-metadata', [ 'pw-metadata.c' ] ], [ 'pw-loopback', [ 'pw-loopback.c' ] ], [ 'pw-link', [ 'pw-link.c' ] ], + [ 'pw-ctrldump', [ 'pw-ctrldump.c' ] ], ] foreach t : tools_sources diff --git a/src/tools/pw-ctrldump.c b/src/tools/pw-ctrldump.c new file mode 100644 index 000000000..af3cad92d --- /dev/null +++ b/src/tools/pw-ctrldump.c @@ -0,0 +1,190 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2020 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "midifile.h" + +struct data; + +struct port { + struct data *data; +}; + +struct data { + struct pw_main_loop *loop; + const char *opt_remote; + struct pw_filter *filter; + struct port *in_port; + int64_t clock_time; +}; + +static void on_process(void *_data, struct spa_io_position *position) +{ + struct data *data = _data; + struct pw_buffer *b; + struct spa_buffer *buf; + struct spa_data *d; + struct spa_pod_parser parser; + struct spa_pod_frame frame; + struct spa_pod_sequence seq; + const void *seq_body, *c_body; + struct spa_pod_control c; + uint64_t offset; + + offset = data->clock_time; + data->clock_time += position->clock.duration; + + b = pw_filter_dequeue_buffer(data->in_port); + if (b == NULL) + return; + + buf = b->buffer; + d = &buf->datas[0]; + + if (d->data == NULL) + goto done; + + spa_pod_parser_init_from_data(&parser, d->data, d->maxsize, d->chunk->offset, d->chunk->size); + + if (spa_pod_parser_push_sequence_body(&parser, &frame, &seq, &seq_body) < 0) + goto done; + + while (spa_pod_parser_get_control_body(&parser, &c, &c_body) >= 0) { + struct midi_event ev; + + if (c.type != SPA_CONTROL_Properties) + continue; + + fprintf(stdout, "received message"); + } + +done: + pw_filter_queue_buffer(data->in_port, b); +} + +static const struct pw_filter_events filter_events = { + PW_VERSION_FILTER_EVENTS, + .process = on_process, +}; + +static void do_quit(void *userdata, int signal_number) +{ + struct data *data = userdata; + pw_main_loop_quit(data->loop); +} + +static int dump_filter(struct data *data) +{ + data->loop = pw_main_loop_new(NULL); + if (data->loop == NULL) + return -errno; + + pw_loop_add_signal(pw_main_loop_get_loop(data->loop), SIGINT, do_quit, data); + pw_loop_add_signal(pw_main_loop_get_loop(data->loop), SIGTERM, do_quit, data); + + data->filter = pw_filter_new_simple( + pw_main_loop_get_loop(data->loop), + "ctrl-dump", + pw_properties_new( + PW_KEY_REMOTE_NAME, data->opt_remote, + PW_KEY_MEDIA_TYPE, "Control", + PW_KEY_MEDIA_CATEGORY, "Filter", + PW_KEY_MEDIA_ROLE, "DSP", + NULL), + &filter_events, + data); + + data->in_port = pw_filter_add_port(data->filter, + PW_DIRECTION_INPUT, + PW_FILTER_PORT_FLAG_MAP_BUFFERS, + sizeof(struct port), + pw_properties_new( + PW_KEY_FORMAT_DSP, "8 bit raw", + PW_KEY_PORT_NAME, "input", + NULL), + NULL, 0); + + if (pw_filter_connect(data->filter, PW_FILTER_FLAG_RT_PROCESS, NULL, 0) < 0) { + fprintf(stderr, "can't connect\n"); + return -1; + } + + pw_main_loop_run(data->loop); + + pw_filter_destroy(data->filter); + pw_main_loop_destroy(data->loop); + + return 0; +} + +static void show_help(const char *name, bool error) +{ + fprintf(error ? stderr : stdout, "%s [options] [FILE]\n" + " -h, --help Show this help\n" + " --version Show version\n" + " -r, --remote Remote daemon name\n", + name); +} + +int main(int argc, char *argv[]) +{ + struct data data = { 0, }; + int res = 0, c; + static const struct option long_options[] = { + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, 'V' }, + { "remote", required_argument, NULL, 'r' }, + { NULL, 0, NULL, 0} + }; + + setlocale(LC_ALL, ""); + pw_init(&argc, &argv); + + setlinebuf(stdout); + + while ((c = getopt_long(argc, argv, "hVr:", long_options, NULL)) != -1) { + switch (c) { + case 'h': + show_help(argv[0], false); + return 0; + case 'V': + printf("%s\n" + "Compiled with libpipewire %s\n" + "Linked with libpipewire %s\n", + argv[0], + pw_get_headers_version(), + pw_get_library_version()); + return 0; + case 'r': + data.opt_remote = optarg; + break; + default: + show_help(argv[0], true); + return -1; + } + } + + if (optind < argc) { + fprintf(stderr, "write to file not possible\n"); + } else { + res = dump_filter(&data); + } + pw_deinit(); + return res; +}