diff --git a/src/modules/meson.build b/src/modules/meson.build index 4d0608d8e..99a9b5362 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -140,6 +140,7 @@ pipewire_module_protocol_pulse = shared_library('pipewire-module-protocol-pulse' [ 'module-protocol-pulse.c', 'module-protocol-pulse/manager.c', 'module-protocol-pulse/pulse-server.c', + 'module-protocol-pulse/modules/module-combine-sink.c', 'module-protocol-pulse/modules/module-ladspa-sink.c', 'module-protocol-pulse/modules/module-ladspa-source.c', 'module-protocol-pulse/modules/module-loopback.c', diff --git a/src/modules/module-protocol-pulse/module.c b/src/modules/module-protocol-pulse/module.c index c5dcc7b39..952a12774 100644 --- a/src/modules/module-protocol-pulse/module.c +++ b/src/modules/module-protocol-pulse/module.c @@ -214,6 +214,7 @@ int module_args_to_audioinfo(struct impl *impl, struct pw_properties *props, str #include "modules/registry.h" static const struct module_info module_list[] = { + { "module-combine-sink", create_module_combine_sink, }, { "module-ladspa-sink", create_module_ladspa_sink, }, { "module-ladspa-source", create_module_ladspa_source, }, { "module-loopback", create_module_loopback, }, diff --git a/src/modules/module-protocol-pulse/modules/module-combine-sink.c b/src/modules/module-protocol-pulse/modules/module-combine-sink.c new file mode 100644 index 000000000..9d66852f2 --- /dev/null +++ b/src/modules/module-protocol-pulse/modules/module-combine-sink.c @@ -0,0 +1,471 @@ +/* PipeWire + * + * Copyright © 2021 Wim Taymans + * Copyright © 2021 Arun Raghavan + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include + +#include "../manager.h" +#include "../module.h" +#include "registry.h" + +#define MAX_SINKS 64 /* ... good enough for anyone */ + +static const struct spa_dict_item module_combine_sink_info[] = { + { PW_KEY_MODULE_AUTHOR, "Arun Raghavan " }, + { PW_KEY_MODULE_DESCRIPTION, "Combine multiple sinks into a single sink" }, + { PW_KEY_MODULE_USAGE, "sink_name= " + "sink_properties= " + /* not a great name, but for backwards compatibility... */ + "slaves= " + "rate= " + "channels= " + "channel_map= " }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct module_combine_sink_data; + +/* This goes to the stream event listener to be able to identify the stream on + * which the event occured and to have a link to the module data */ +struct combine_stream { + struct pw_stream *stream; + struct spa_hook stream_listener; + struct module_combine_sink_data *data; + bool cleanup; +}; + +struct module_combine_sink_data { + struct module *module; + + struct pw_core *core; + struct pw_manager *manager; + + struct pw_stream *sink; + + struct spa_hook core_listener; + struct spa_hook manager_listener; + struct spa_hook sink_listener; + + char *sink_name; + char **sink_names; + struct combine_stream streams[MAX_SINKS]; + + struct spa_source *cleanup; + + struct spa_audio_info_raw info; +}; + +/* Core connection: mainly to unload the module if the connection errors out */ +static void on_core_error(void *data, uint32_t id, int seq, int res, const char *message) +{ + struct module_combine_sink_data *d = data; + struct module *module = d->module; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + module_schedule_unload(module); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +/* Input stream: the "combine sink" */ +static void capture_process(void *d) +{ + struct module_combine_sink_data *data = d; + struct pw_buffer *in; + int i; + + if ((in = pw_stream_dequeue_buffer(data->sink)) == NULL) { + pw_log_warn("out of capture buffers: %m"); + return; + } + + for (i = 0; i < MAX_SINKS; i++) { + struct pw_buffer *out; + uint32_t j, size = 0; + int32_t stride = 0; + + if (data->streams[i].stream == NULL || data->streams[i].cleanup) + continue; + + if ((out = pw_stream_dequeue_buffer(data->streams[i].stream)) == NULL) { + pw_log_warn("out of playback buffers: %m"); + continue; + } + + if (in->buffer->n_datas != out->buffer->n_datas) { + pw_log_error("incompatible buffer planes"); + continue; + } + + for (j = 0; j < out->buffer->n_datas; j++) { + struct spa_data *ds, *dd; + + dd = &out->buffer->datas[j]; + + if (j < in->buffer->n_datas) { + ds = &in->buffer->datas[j]; + + memcpy(dd->data, + SPA_PTROFF(ds->data, ds->chunk->offset, void), + ds->chunk->size); + + size = SPA_MAX(size, ds->chunk->size); + stride = SPA_MAX(stride, ds->chunk->stride); + } else { + memset(dd->data, 0, size); + } + dd->chunk->offset = 0; + dd->chunk->size = size; + dd->chunk->stride = stride; + } + + pw_stream_queue_buffer(data->streams[i].stream, out); + } + + if (in != NULL) + pw_stream_queue_buffer(data->sink, in); +} + +static void on_in_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct module_combine_sink_data *data = d; + struct module *module = data->module; + + if (state == PW_STREAM_STATE_UNCONNECTED) { + pw_log_info("stream disconnected, unloading"); + module_schedule_unload(module); + } +} + +static const struct pw_stream_events in_stream_events = { + PW_VERSION_STREAM_EVENTS, + .state_changed = on_in_stream_state_changed, + .process = capture_process +}; + +/* Output streams: one per sink we have combined output to */ +static void on_out_stream_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct combine_stream *s = data; + + if (state == PW_STREAM_STATE_UNCONNECTED) { + s->cleanup = true; + pw_loop_signal_event(s->data->module->impl->loop, s->data->cleanup); + } +} + +static const struct pw_stream_events out_stream_events = { + PW_VERSION_STREAM_EVENTS, + .state_changed = on_out_stream_state_changed, +}; + +static void manager_added(void *d, struct pw_manager_object *o) +{ + struct module_combine_sink_data *data = d; + struct combine_stream *cstream; + struct pw_properties *props; + const struct spa_pod *params[1]; + const char *sink_name; + struct spa_pod_builder b; + uint32_t n_params; + char buffer[1024]; + int i, res; + + if (!pw_manager_object_is_sink(o)) + return; + + sink_name = pw_properties_get(o->props, PW_KEY_NODE_NAME); + + if (strcmp(sink_name, data->sink_name) == 0) { + /* That's the sink we created */ + return; + } + + if (data->sink_names) { + int i; + + for (i = 0; data->sink_names[i] != NULL; i++) { + if (strcmp(data->sink_names[i], sink_name) == 0) { + i = -1; + break; + } + } + + /* This sink isn't in our list */ + if (i > -1) + return; + } + + pw_log_info("Adding %s to combine outputs", sink_name); + + for (i = 0; i < MAX_SINKS; i++) + if (data->streams[i].stream == NULL) + break; + + if (i == MAX_SINKS) { + pw_log_error("Cannot combine more than %u sinks", MAX_SINKS); + return; + } else { + cstream = &data->streams[i]; + } + + snprintf(buffer, sizeof(buffer), "Simultaneous output on %s", + pw_properties_get(o->props, PW_KEY_NODE_DESCRIPTION)); + + props = pw_properties_new(NULL, NULL); + pw_properties_set(props, PW_KEY_NODE_TARGET, sink_name); + pw_properties_setf(props, PW_KEY_NODE_GROUP, "combine_sink-%u", data->module->idx); + pw_properties_set(props, PW_KEY_NODE_DONT_RECONNECT, "true"); + pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); + pw_properties_set(props, PW_KEY_NODE_PASSIVE, "true"); + + cstream->data = data; + cstream->stream = pw_stream_new(data->core, buffer, props); + if (cstream->stream == NULL) { + pw_log_error("Could not create stream"); + return; + } + + pw_stream_add_listener(cstream->stream, + &cstream->stream_listener, + &out_stream_events, cstream); + + n_params = 0; + b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, + &data->info); + + if ((res = pw_stream_connect(cstream->stream, + PW_DIRECTION_OUTPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) { + pw_log_error("Could not connect to sink '%s'", sink_name); + return; + } +} + +static const struct pw_manager_events manager_events = { + PW_VERSION_MANAGER_EVENTS, + .added = manager_added, +}; + +static void cleanup_stream(struct combine_stream *s) { + pw_stream_destroy(s->stream); + s->stream = NULL; + s->data = NULL; + s->cleanup = false; +} + +static void on_cleanup(void *d, uint64_t count) +{ + struct module_combine_sink_data *data = d; + int i; + + for (i = 0; i < MAX_SINKS; i++) { + if (data->streams[i].cleanup) + cleanup_stream(&data->streams[i]); + } +} + +static int module_combine_sink_load(struct client *client, struct module *module) +{ + struct module_combine_sink_data *data = module->user_data; + struct pw_properties *props; + int res; + uint32_t n_params; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + + data->core = pw_context_connect(module->impl->context, + pw_properties_copy(client->props), + 0); + if (data->core == NULL) + return -errno; + + pw_core_add_listener(data->core, + &data->core_listener, + &core_events, data); + + props = pw_properties_new(NULL, NULL); + pw_properties_set(props, PW_KEY_NODE_NAME, data->sink_name); + pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + pw_properties_setf(props, PW_KEY_NODE_GROUP, "combine_sink-%u", data->module->idx); + pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); + + data->sink = pw_stream_new(data->core, data->sink_name, props); + if (data->sink == NULL) + return -errno; + + pw_stream_add_listener(data->sink, + &data->sink_listener, + &in_stream_events, data); + + n_params = 0; + params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, + &data->info); + + if ((res = pw_stream_connect(data->sink, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + return res; + + data->manager = pw_manager_new(data->core); + if (client->manager == NULL) + return -errno; + + pw_manager_add_listener(data->manager, &data->manager_listener, + &manager_events, data); + + data->cleanup = pw_loop_add_event(module->impl->loop, on_cleanup, data); + + pw_log_info("loaded module %p id:%u name:%s", module, module->idx, module->name); + module_emit_loaded(module, 0); + + return 0; +} + +static int module_combine_sink_unload(struct client *client, struct module *module) +{ + struct module_combine_sink_data *d = module->user_data; + int i; + + pw_log_info("unload module %p id:%u name:%s", module, module->idx, module->name); + + /* Note that we explicitly disconnect the hooks to avoid having the + * cleanup triggered again in those callbacks */ + if (d->sink != NULL) { + spa_hook_remove(&d->sink_listener); + pw_stream_destroy(d->sink); + } + for (i = 0; i < MAX_SINKS; i++) { + if (d->streams[i].stream) + cleanup_stream(&d->streams[i]); + } + if (d->manager != NULL) + pw_manager_destroy(d->manager); + if (d->core != NULL) + pw_core_disconnect(d->core); + if (d->sink_names) + pw_free_strv(d->sink_names); + free(d->sink_name); + + return 0; +} + +static const struct module_methods module_combine_sink_methods = { + VERSION_MODULE_METHODS, + .load = module_combine_sink_load, + .unload = module_combine_sink_unload, +}; + +struct module *create_module_combine_sink(struct impl *impl, const char *argument) +{ + struct module *module; + struct module_combine_sink_data *d; + struct pw_properties *props = NULL; + const char *str; + char *sink_name, **sink_names = NULL; + struct spa_audio_info_raw info = { 0 }; + int i, n, res; + + props = pw_properties_new_dict(&SPA_DICT_INIT_ARRAY(module_combine_sink_info)); + if (!props) { + res = -EINVAL; + goto out; + } + if (argument) + module_args_add_props(props, argument); + + if ((str = pw_properties_get(props, "sink_name")) != NULL) { + sink_name = strdup(str); + pw_properties_set(props, "sink_name", NULL); + } else { + sink_name = strdup("combined"); + } + + if ((str = pw_properties_get(props, "slaves")) != NULL) { + sink_names = pw_split_strv(str, ",", MAX_SINKS, &n); + pw_properties_set(props, "slaves", NULL); + } + + if ((str = pw_properties_get(props, "adjust_time")) != NULL) { + pw_log_info("The `adjust_time` modarg is ignored"); + pw_properties_set(props, "slaves", NULL); + } + + if ((str = pw_properties_get(props, "resample_method")) != NULL) { + pw_log_info("The `resample_method` modarg is ignored"); + pw_properties_set(props, "slaves", NULL); + } + + if (module_args_to_audioinfo(impl, props, &info) < 0) { + res = -EINVAL; + goto out; + } + + module = module_new(impl, &module_combine_sink_methods, sizeof(*d)); + if (module == NULL) { + res = -errno; + goto out; + } + + module->props = props; + d = module->user_data; + d->module = module; + d->info = info; + d->sink_name = sink_name; + d->sink_names = sink_names; + for (i = 0; i < MAX_SINKS; i++) { + d->streams[i].stream = NULL; + d->streams[i].cleanup = false; + } + + return module; +out: + if (props) + pw_properties_free(props); + if (sink_names) + pw_free_strv(sink_names); + errno = -res; + + return NULL; +} diff --git a/src/modules/module-protocol-pulse/modules/registry.h b/src/modules/module-protocol-pulse/modules/registry.h index e86504c8d..066000065 100644 --- a/src/modules/module-protocol-pulse/modules/registry.h +++ b/src/modules/module-protocol-pulse/modules/registry.h @@ -28,6 +28,7 @@ #include "../internal.h" +struct module *create_module_combine_sink(struct impl *impl, const char *argument); struct module *create_module_ladspa_sink(struct impl *impl, const char *argument); struct module *create_module_ladspa_source(struct impl *impl, const char *argument); struct module *create_module_loopback(struct impl *impl, const char *argument);