diff --git a/src/modules/module-protocol-pulse/modules/module-combine-sink.c b/src/modules/module-protocol-pulse/modules/module-combine-sink.c index 1dcbc2a44..86536f775 100644 --- a/src/modules/module-protocol-pulse/modules/module-combine-sink.c +++ b/src/modules/module-protocol-pulse/modules/module-combine-sink.c @@ -24,6 +24,7 @@ */ #include +#include #include #include @@ -49,130 +50,38 @@ static const struct spa_dict_item module_combine_sink_info[] = { "slaves= " "rate= " "channels= " - "channel_map= " }, + "channel_map= " + "remix= " }, { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; struct module_combine_sink_data; -/* This goes to the stream event listener to be able to identify the stream on - * which the event occurred and to have a link to the module data */ -struct combine_stream { - struct pw_stream *stream; - struct spa_hook stream_listener; - struct module_combine_sink_data *data; - bool cleanup; - bool started; -}; - struct module_combine_sink_data { struct module *module; struct pw_core *core; - struct pw_manager *manager; - - struct pw_stream *sink; - struct spa_hook core_listener; + struct pw_manager *manager; struct spa_hook manager_listener; - struct spa_hook sink_listener; + + struct pw_impl_module *mod; + struct spa_hook mod_listener; char *sink_name; char **sink_names; - struct combine_stream streams[MAX_SINKS]; + struct pw_properties *combine_props; - struct spa_source *cleanup; struct spa_source *sinks_timeout; struct spa_audio_info_raw info; unsigned int sinks_pending; - unsigned int source_started:1; + unsigned int remix:1; unsigned int load_emitted:1; unsigned int start_error:1; }; -/* Core connection: mainly to unload the module if the connection errors out */ -static void on_core_error(void *data, uint32_t id, int seq, int res, const char *message) -{ - struct module_combine_sink_data *d = data; - struct module *module = d->module; - - pw_log_error("error id:%u seq:%d res:%d (%s): %s", - id, seq, res, spa_strerror(res), message); - - if (id == PW_ID_CORE && res == -EPIPE) - module_schedule_unload(module); -} - -static const struct pw_core_events core_events = { - PW_VERSION_CORE_EVENTS, - .error = on_core_error, -}; - -/* Input stream: the "combine sink" */ -static void capture_process(void *d) -{ - struct module_combine_sink_data *data = d; - struct pw_buffer *in; - int i; - - if ((in = pw_stream_dequeue_buffer(data->sink)) == NULL) { - pw_log_warn("out of capture buffers: %m"); - return; - } - - for (i = 0; i < MAX_SINKS; i++) { - struct pw_buffer *out; - uint32_t j; - - if (data->streams[i].stream == NULL || data->streams[i].cleanup) - continue; - - if ((out = pw_stream_dequeue_buffer(data->streams[i].stream)) == NULL) { - pw_log_warn("out of playback buffers: %m"); - continue; - } - - if (in->buffer->n_datas != out->buffer->n_datas) { - pw_log_error("incompatible buffer planes"); - continue; - } - - for (j = 0; j < out->buffer->n_datas; j++) { - struct spa_data *ds, *dd; - uint32_t outsize = 0; - int32_t stride = 0; - - dd = &out->buffer->datas[j]; - - if (j < in->buffer->n_datas) { - uint32_t offs, size; - - ds = &in->buffer->datas[j]; - - offs = SPA_MIN(ds->chunk->offset, ds->maxsize); - size = SPA_MIN(ds->chunk->size, ds->maxsize - offs); - - memcpy(dd->data, - SPA_PTROFF(ds->data, offs, void), size); - - outsize = SPA_MAX(outsize, size); - stride = SPA_MAX(stride, ds->chunk->stride); - } else { - memset(dd->data, 0, outsize); - } - dd->chunk->offset = 0; - dd->chunk->size = outsize; - dd->chunk->stride = stride; - } - - pw_stream_queue_buffer(data->streams[i].stream, out); - } - - pw_stream_queue_buffer(data->sink, in); -} - static void check_initialized(struct module_combine_sink_data *data) { struct module *module = data->module; @@ -184,174 +93,38 @@ static void check_initialized(struct module_combine_sink_data *data) pw_log_debug("module load error"); data->load_emitted = true; module_emit_loaded(module, -EIO); - } else if (data->sinks_pending == 0 && data->source_started) { + } else if (data->sinks_pending == 0) { pw_log_debug("module loaded"); data->load_emitted = true; module_emit_loaded(module, 0); } } -static void on_in_stream_state_changed(void *d, enum pw_stream_state old, - enum pw_stream_state state, const char *error) -{ - struct module_combine_sink_data *data = d; - struct module *module = data->module; - uint32_t i; - - if (!data->source_started && state != PW_STREAM_STATE_CONNECTING) { - /* Input stream appears on server */ - data->source_started = true; - if (state < PW_STREAM_STATE_PAUSED) - data->start_error = true; - check_initialized(data); - } - - switch (state) { - case PW_STREAM_STATE_PAUSED: - pw_stream_flush(data->sink, false); - for (i = 0; i < MAX_SINKS; i++) { - struct combine_stream *s = &data->streams[i]; - if (s->stream == NULL || s->cleanup) - continue; - pw_stream_flush(s->stream, false); - } - break; - case PW_STREAM_STATE_UNCONNECTED: - pw_log_info("stream disconnected, unloading"); - module_schedule_unload(module); - break; - default: - break; - } -} - -static const struct pw_stream_events in_stream_events = { - PW_VERSION_STREAM_EVENTS, - .state_changed = on_in_stream_state_changed, - .process = capture_process -}; - -/* Output streams: one per sink we have combined output to */ -static void on_out_stream_state_changed(void *data, enum pw_stream_state old, - enum pw_stream_state state, const char *error) -{ - struct combine_stream *s = data; - - if (!s->started && state != PW_STREAM_STATE_CONNECTING) { - /* Output stream appears on server */ - s->started = true; - if (s->data->sinks_pending > 0) - --s->data->sinks_pending; - if (state < PW_STREAM_STATE_PAUSED) - s->data->start_error = true; - check_initialized(s->data); - } - - if (state == PW_STREAM_STATE_UNCONNECTED) { - s->cleanup = true; - pw_loop_signal_event(s->data->module->impl->loop, s->data->cleanup); - } -} - -static const struct pw_stream_events out_stream_events = { - PW_VERSION_STREAM_EVENTS, - .state_changed = on_out_stream_state_changed, -}; - static void manager_added(void *d, struct pw_manager_object *o) { struct module_combine_sink_data *data = d; - struct combine_stream *cstream; - struct pw_properties *props; - const struct spa_pod *params[1]; - const char *sink_name; - struct spa_pod_builder b; - uint32_t n_params; - char buffer[1024]; - int i, res; + const char *str; + uint32_t val = 0; + struct pw_node_info *info; - if (!pw_manager_object_is_sink(o)) + if (!spa_streq(o->type, PW_TYPE_INTERFACE_Node) || + (info = o->info) == NULL || info->props == NULL) return; - sink_name = pw_properties_get(o->props, PW_KEY_NODE_NAME); - - if (strcmp(sink_name, data->sink_name) == 0) { - /* That's the sink we created */ + str = spa_dict_lookup(info->props, "pulse.module.id"); + if (str == NULL || !spa_atou32(str, &val, 0) || val != data->module->index) return; + + pw_log_info("found our %s, pending:%d", + pw_properties_get(o->props, PW_KEY_NODE_NAME), + data->sinks_pending); + + if (!pw_manager_object_is_sink(o)) { + if (data->sinks_pending > 0) + data->sinks_pending--; } - - if (data->sink_names) { - int i; - - for (i = 0; data->sink_names[i] != NULL; i++) { - if (strcmp(data->sink_names[i], sink_name) == 0) { - i = -1; - break; - } - } - - /* This sink isn't in our list */ - if (i > -1) - return; - } - - pw_log_info("Adding %s to combine outputs", sink_name); - - for (i = 0; i < MAX_SINKS; i++) - if (data->streams[i].stream == NULL) - break; - - if (i == MAX_SINKS) { - pw_log_error("Cannot combine more than %u sinks", MAX_SINKS); - return; - } else { - cstream = &data->streams[i]; - } - - props = pw_properties_new(NULL, NULL); - pw_properties_setf(props, PW_KEY_NODE_NAME, - "combine_output.sink-%u.%s", data->module->index, sink_name); - pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, data->sink_name); - pw_properties_set(props, PW_KEY_TARGET_OBJECT, sink_name); - pw_properties_setf(props, PW_KEY_NODE_GROUP, "combine_sink-%u", data->module->index); - pw_properties_setf(props, PW_KEY_NODE_LINK_GROUP, "combine_sink-%u", data->module->index); - pw_properties_set(props, PW_KEY_NODE_DONT_RECONNECT, "true"); - pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); - pw_properties_set(props, PW_KEY_NODE_PASSIVE, "true"); - pw_properties_setf(props, "pulse.module.id", "%u", data->module->index); - - cstream->data = data; - cstream->stream = pw_stream_new(data->core, NULL, props); - if (cstream->stream == NULL) { - pw_log_error("Could not create stream"); - goto error; - } - - pw_stream_add_listener(cstream->stream, - &cstream->stream_listener, - &out_stream_events, cstream); - - n_params = 0; - b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, - &data->info); - - if ((res = pw_stream_connect(cstream->stream, - PW_DIRECTION_OUTPUT, - PW_ID_ANY, - PW_STREAM_FLAG_AUTOCONNECT | - PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, - params, n_params)) < 0) { - pw_log_error("Could not connect to sink '%s'", sink_name); - goto error; - } - - return; - -error: - data->start_error = true; check_initialized(data); + return; } static const struct pw_manager_events manager_events = { @@ -359,27 +132,6 @@ static const struct pw_manager_events manager_events = { .added = manager_added, }; -static void cleanup_stream(struct combine_stream *s) -{ - spa_hook_remove(&s->stream_listener); - pw_stream_destroy(s->stream); - - s->stream = NULL; - s->data = NULL; - s->cleanup = false; -} - -static void on_cleanup(void *d, uint64_t count) -{ - struct module_combine_sink_data *data = d; - int i; - - for (i = 0; i < MAX_SINKS; i++) { - if (data->streams[i].cleanup) - cleanup_stream(&data->streams[i]); - } -} - static void on_sinks_timeout(void *d, uint64_t count) { struct module_combine_sink_data *data = d; @@ -391,57 +143,84 @@ static void on_sinks_timeout(void *d, uint64_t count) check_initialized(data); } +static void module_destroy(void *data) +{ + struct module_combine_sink_data *d = data; + spa_hook_remove(&d->mod_listener); + d->mod = NULL; + module_schedule_unload(d->module); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy +}; + static int module_combine_sink_load(struct module *module) { struct module_combine_sink_data *data = module->user_data; - struct pw_properties *props; - int res; - uint32_t n_params; - const struct spa_pod *params[1]; - uint8_t buffer[1024]; - struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - const char *str; + uint32_t i; + FILE *f; + char *args; + size_t size; data->core = pw_context_connect(module->impl->context, NULL, 0); if (data->core == NULL) return -errno; - pw_core_add_listener(data->core, - &data->core_listener, - &core_events, data); - - props = pw_properties_new(NULL, NULL); - - pw_properties_set(props, PW_KEY_NODE_NAME, data->sink_name); - pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, data->sink_name); - pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); - pw_properties_setf(props, PW_KEY_NODE_GROUP, "combine_sink-%u", data->module->index); - pw_properties_setf(props, PW_KEY_NODE_LINK_GROUP, "combine_sink-%u", data->module->index); - pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); - pw_properties_setf(props, "pulse.module.id", "%u", module->index); - - if ((str = pw_properties_get(module->props, "sink_properties")) != NULL) - module_args_add_props(props, str); - - data->sink = pw_stream_new(data->core, data->sink_name, props); - if (data->sink == NULL) + if ((f = open_memstream(&args, &size)) == NULL) return -errno; - pw_stream_add_listener(data->sink, - &data->sink_listener, - &in_stream_events, data); + fprintf(f, "{"); + fprintf(f, " node.name = %s", data->sink_name); + fprintf(f, " node.description = %s", data->sink_name); + if (data->info.rate != 0) + fprintf(f, " audio.rate = %u", data->info.rate); + if (data->info.channels != 0) { + fprintf(f, " audio.channels = %u", data->info.channels); + if (!(data->info.flags & SPA_AUDIO_FLAG_UNPOSITIONED)) { + fprintf(f, " audio.position = [ "); + for (i = 0; i < data->info.channels; i++) + fprintf(f, "%s%s", i == 0 ? "" : ",", + channel_id2name(data->info.position[i])); + fprintf(f, " ]"); + } + } + fprintf(f, " combine.props = {"); + fprintf(f, " pulse.module.id = %u", module->index); + pw_properties_serialize_dict(f, &data->combine_props->dict, 0); + fprintf(f, " } stream.props = {"); + if (!data->remix) + fprintf(f, " "PW_KEY_STREAM_DONT_REMIX" = true"); + fprintf(f, " pulse.module.id = %u", module->index); + fprintf(f, " } stream.rules = ["); + if (data->sink_names == NULL) { + fprintf(f, " { matches = [ { media.class = \"Audio/Sink\" } ]"); + fprintf(f, " actions = { create-stream = { } } }"); + } else { + for (i = 0; data->sink_names[i] != NULL; i++) { + char name[1024]; + spa_json_encode_string(name, sizeof(name)-1, data->sink_names[i]); + fprintf(f, " { matches = [ { media.class = \"Audio/Sink\" "); + fprintf(f, " node.name = %s } ]", name); + fprintf(f, " actions = { create-stream = { } } }"); + } + } + fprintf(f, " ]"); + fprintf(f, "}"); + fclose(f); - n_params = 0; - params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, - &data->info); + data->mod = pw_context_load_module(module->impl->context, + "libpipewire-module-combine-stream", + args, NULL); + free(args); - if ((res = pw_stream_connect(data->sink, - PW_DIRECTION_INPUT, - PW_ID_ANY, - PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, - params, n_params)) < 0) - return res; + if (data->mod == NULL) + return -errno; + + pw_impl_module_add_listener(data->mod, + &data->mod_listener, + &module_events, data); data->manager = pw_manager_new(data->core); if (data->manager == NULL) @@ -450,55 +229,39 @@ static int module_combine_sink_load(struct module *module) pw_manager_add_listener(data->manager, &data->manager_listener, &manager_events, data); - data->cleanup = pw_loop_add_event(module->impl->loop, on_cleanup, data); - data->sinks_timeout = pw_loop_add_timer(module->impl->loop, on_sinks_timeout, data); if (data->sinks_timeout) { - struct timespec timeout = {0}, interval = {0}; + struct timespec timeout = {0}; timeout.tv_sec = TIMEOUT_SINKS_MSEC / 1000; timeout.tv_nsec = (TIMEOUT_SINKS_MSEC % 1000) * SPA_NSEC_PER_MSEC; - pw_loop_update_timer(module->impl->loop, data->sinks_timeout, &timeout, &interval, false); + pw_loop_update_timer(module->impl->loop, data->sinks_timeout, &timeout, NULL, false); } - return data->load_emitted ? 0 : SPA_RESULT_RETURN_ASYNC(0); } static int module_combine_sink_unload(struct module *module) { struct module_combine_sink_data *d = module->user_data; - int i; - - if (d->cleanup != NULL) - pw_loop_destroy_source(module->impl->loop, d->cleanup); if (d->sinks_timeout != NULL) pw_loop_destroy_source(module->impl->loop, d->sinks_timeout); - /* Note that we explicitly disconnect the hooks to avoid having the - * cleanup triggered again in those callbacks */ - if (d->sink != NULL) { - spa_hook_remove(&d->sink_listener); - pw_stream_destroy(d->sink); + if (d->mod != NULL) { + spa_hook_remove(&d->mod_listener); + pw_impl_module_destroy(d->mod); + d->mod = NULL; } - - for (i = 0; i < MAX_SINKS; i++) { - if (d->streams[i].stream) - cleanup_stream(&d->streams[i]); - } - if (d->manager != NULL) { spa_hook_remove(&d->manager_listener); pw_manager_destroy(d->manager); } - if (d->core != NULL) { spa_hook_remove(&d->core_listener); pw_core_disconnect(d->core); } - pw_free_strv(d->sink_names); free(d->sink_name); - + pw_properties_free(d->combine_props); return 0; } @@ -506,14 +269,17 @@ static int module_combine_sink_prepare(struct module * const module) { struct module_combine_sink_data * const d = module->user_data; struct pw_properties * const props = module->props; + struct pw_properties *combine_props = NULL; const char *str; char *sink_name = NULL, **sink_names = NULL; struct spa_audio_info_raw info = { 0 }; - int i, res; + int res; int num_sinks = 0; PW_LOG_TOPIC_INIT(mod_topic); + combine_props = pw_properties_new(NULL, NULL); + if ((str = pw_properties_get(props, "sink_name")) != NULL) { sink_name = strdup(str); pw_properties_set(props, "sink_name", NULL); @@ -521,10 +287,18 @@ static int module_combine_sink_prepare(struct module * const module) sink_name = strdup("combined"); } + if ((str = pw_properties_get(module->props, "sink_properties")) != NULL) + module_args_add_props(combine_props, str); + if ((str = pw_properties_get(props, "slaves")) != NULL) { sink_names = pw_split_strv(str, ",", MAX_SINKS, &num_sinks); pw_properties_set(props, "slaves", NULL); } + d->remix = true; + if ((str = pw_properties_get(props, "remix")) != NULL) { + d->remix = pw_properties_parse_bool(str); + pw_properties_set(props, "remix", NULL); + } if ((str = pw_properties_get(props, "adjust_time")) != NULL) { pw_log_info("The `adjust_time` modarg is ignored"); @@ -546,15 +320,13 @@ static int module_combine_sink_prepare(struct module * const module) d->sink_name = sink_name; d->sink_names = sink_names; d->sinks_pending = (sink_names == NULL) ? 0 : num_sinks; - for (i = 0; i < MAX_SINKS; i++) { - d->streams[i].stream = NULL; - d->streams[i].cleanup = false; - } + d->combine_props = combine_props; return 0; out: free(sink_name); pw_free_strv(sink_names); + pw_properties_free(combine_props); return res; }