mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
pulse-server/combine-sink: mark loaded after streams appear
module-combine-sink should become loaded only after its input stream, and all explicitly specified output sink stream nodes appear. Since output sinks may appear on the manager after a delay, wait for them only up to a timeout, and fail load after that. Fail load if there are errors in input stream, or in an explicitly specified output. This is needed for Zoom sound sharing to work. Pulseaudio additionally fails the module load if streams fail to connect, but we do not do that here at the moment.
This commit is contained in:
parent
77e50d2dfe
commit
2e3c749a01
1 changed files with 80 additions and 5 deletions
|
|
@ -39,6 +39,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
|
|||
|
||||
#define MAX_SINKS 64 /* ... good enough for anyone */
|
||||
|
||||
#define TIMEOUT_SINKS_MSEC 2000
|
||||
|
||||
static const struct spa_dict_item module_combine_sink_info[] = {
|
||||
{ PW_KEY_MODULE_AUTHOR, "Arun Raghavan <arun@asymptotic.io>" },
|
||||
{ PW_KEY_MODULE_DESCRIPTION, "Combine multiple sinks into a single sink" },
|
||||
|
|
@ -61,6 +63,7 @@ struct combine_stream {
|
|||
struct spa_hook stream_listener;
|
||||
struct module_combine_sink_data *data;
|
||||
bool cleanup;
|
||||
bool started;
|
||||
};
|
||||
|
||||
struct module_combine_sink_data {
|
||||
|
|
@ -80,8 +83,14 @@ struct module_combine_sink_data {
|
|||
struct combine_stream streams[MAX_SINKS];
|
||||
|
||||
struct spa_source *cleanup;
|
||||
struct spa_source *sinks_timeout;
|
||||
|
||||
struct spa_audio_info_raw info;
|
||||
|
||||
unsigned int sinks_pending;
|
||||
unsigned int source_started:1;
|
||||
unsigned int load_emitted:1;
|
||||
unsigned int start_error:1;
|
||||
};
|
||||
|
||||
/* Core connection: mainly to unload the module if the connection errors out */
|
||||
|
|
@ -161,6 +170,24 @@ static void capture_process(void *d)
|
|||
pw_stream_queue_buffer(data->sink, in);
|
||||
}
|
||||
|
||||
static void check_initialized(struct module_combine_sink_data *data)
|
||||
{
|
||||
struct module *module = data->module;
|
||||
|
||||
if (data->load_emitted)
|
||||
return;
|
||||
|
||||
if (data->start_error) {
|
||||
pw_log_debug("module load error");
|
||||
data->load_emitted = true;
|
||||
module_emit_loaded(module, -EIO);
|
||||
} else if (data->sinks_pending == 0 && data->source_started) {
|
||||
pw_log_debug("module loaded");
|
||||
data->load_emitted = true;
|
||||
module_emit_loaded(module, 0);
|
||||
}
|
||||
}
|
||||
|
||||
static void on_in_stream_state_changed(void *d, enum pw_stream_state old,
|
||||
enum pw_stream_state state, const char *error)
|
||||
{
|
||||
|
|
@ -168,6 +195,14 @@ static void on_in_stream_state_changed(void *d, enum pw_stream_state old,
|
|||
struct module *module = data->module;
|
||||
uint32_t i;
|
||||
|
||||
if (!data->source_started && state != PW_STREAM_STATE_CONNECTING) {
|
||||
/* Input stream appears on server */
|
||||
data->source_started = true;
|
||||
if (state < PW_STREAM_STATE_PAUSED)
|
||||
data->start_error = true;
|
||||
check_initialized(data);
|
||||
}
|
||||
|
||||
switch (state) {
|
||||
case PW_STREAM_STATE_PAUSED:
|
||||
pw_stream_flush(data->sink, false);
|
||||
|
|
@ -199,6 +234,16 @@ static void on_out_stream_state_changed(void *data, enum pw_stream_state old,
|
|||
{
|
||||
struct combine_stream *s = data;
|
||||
|
||||
if (!s->started && state != PW_STREAM_STATE_CONNECTING) {
|
||||
/* Output stream appears on server */
|
||||
s->started = true;
|
||||
if (s->data->sinks_pending > 0)
|
||||
--s->data->sinks_pending;
|
||||
if (state < PW_STREAM_STATE_PAUSED)
|
||||
s->data->start_error = true;
|
||||
check_initialized(s->data);
|
||||
}
|
||||
|
||||
if (state == PW_STREAM_STATE_UNCONNECTED) {
|
||||
s->cleanup = true;
|
||||
pw_loop_signal_event(s->data->module->impl->loop, s->data->cleanup);
|
||||
|
|
@ -276,7 +321,7 @@ static void manager_added(void *d, struct pw_manager_object *o)
|
|||
cstream->stream = pw_stream_new(data->core, NULL, props);
|
||||
if (cstream->stream == NULL) {
|
||||
pw_log_error("Could not create stream");
|
||||
return;
|
||||
goto error;
|
||||
}
|
||||
|
||||
pw_stream_add_listener(cstream->stream,
|
||||
|
|
@ -296,8 +341,14 @@ static void manager_added(void *d, struct pw_manager_object *o)
|
|||
PW_STREAM_FLAG_RT_PROCESS,
|
||||
params, n_params)) < 0) {
|
||||
pw_log_error("Could not connect to sink '%s'", sink_name);
|
||||
return;
|
||||
goto error;
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
error:
|
||||
data->start_error = true;
|
||||
check_initialized(data);
|
||||
}
|
||||
|
||||
static const struct pw_manager_events manager_events = {
|
||||
|
|
@ -326,6 +377,17 @@ static void on_cleanup(void *d, uint64_t count)
|
|||
}
|
||||
}
|
||||
|
||||
static void on_sinks_timeout(void *d, uint64_t count)
|
||||
{
|
||||
struct module_combine_sink_data *data = d;
|
||||
|
||||
if (data->load_emitted)
|
||||
return;
|
||||
|
||||
data->start_error = true;
|
||||
check_initialized(data);
|
||||
}
|
||||
|
||||
static int module_combine_sink_load(struct client *client, struct module *module)
|
||||
{
|
||||
struct module_combine_sink_data *data = module->user_data;
|
||||
|
|
@ -389,7 +451,15 @@ static int module_combine_sink_load(struct client *client, struct module *module
|
|||
|
||||
data->cleanup = pw_loop_add_event(module->impl->loop, on_cleanup, data);
|
||||
|
||||
return 0;
|
||||
data->sinks_timeout = pw_loop_add_timer(module->impl->loop, on_sinks_timeout, data);
|
||||
if (data->sinks_timeout) {
|
||||
struct timespec timeout = {0}, interval = {0};
|
||||
timeout.tv_sec = TIMEOUT_SINKS_MSEC / 1000;
|
||||
timeout.tv_nsec = (TIMEOUT_SINKS_MSEC % 1000) * SPA_NSEC_PER_MSEC;
|
||||
pw_loop_update_timer(module->impl->loop, data->sinks_timeout, &timeout, &interval, false);
|
||||
}
|
||||
|
||||
return data->load_emitted ? 0 : SPA_RESULT_RETURN_ASYNC(0);
|
||||
}
|
||||
|
||||
static int module_combine_sink_unload(struct module *module)
|
||||
|
|
@ -400,6 +470,9 @@ static int module_combine_sink_unload(struct module *module)
|
|||
if (d->cleanup != NULL)
|
||||
pw_loop_destroy_source(module->impl->loop, d->cleanup);
|
||||
|
||||
if (d->sinks_timeout != NULL)
|
||||
pw_loop_destroy_source(module->impl->loop, d->sinks_timeout);
|
||||
|
||||
/* Note that we explicitly disconnect the hooks to avoid having the
|
||||
* cleanup triggered again in those callbacks */
|
||||
if (d->sink != NULL) {
|
||||
|
|
@ -442,7 +515,8 @@ struct module *create_module_combine_sink(struct impl *impl, const char *argumen
|
|||
const char *str;
|
||||
char *sink_name = NULL, **sink_names = NULL;
|
||||
struct spa_audio_info_raw info = { 0 };
|
||||
int i, n, res;
|
||||
int i, res;
|
||||
int num_sinks = 0;
|
||||
|
||||
PW_LOG_TOPIC_INIT(mod_topic);
|
||||
|
||||
|
|
@ -462,7 +536,7 @@ struct module *create_module_combine_sink(struct impl *impl, const char *argumen
|
|||
}
|
||||
|
||||
if ((str = pw_properties_get(props, "slaves")) != NULL) {
|
||||
sink_names = pw_split_strv(str, ",", MAX_SINKS, &n);
|
||||
sink_names = pw_split_strv(str, ",", MAX_SINKS, &num_sinks);
|
||||
pw_properties_set(props, "slaves", NULL);
|
||||
}
|
||||
|
||||
|
|
@ -493,6 +567,7 @@ struct module *create_module_combine_sink(struct impl *impl, const char *argumen
|
|||
d->info = info;
|
||||
d->sink_name = sink_name;
|
||||
d->sink_names = sink_names;
|
||||
d->sinks_pending = (sink_names == NULL) ? 0 : num_sinks;
|
||||
for (i = 0; i < MAX_SINKS; i++) {
|
||||
d->streams[i].stream = NULL;
|
||||
d->streams[i].cleanup = false;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue