From 6c772a18430584e2b8c548c56b4a4ce83c53071c Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 30 Nov 2023 10:40:37 +0100 Subject: [PATCH] module-pulse-tunnel: don't block the main thread Do the pulse context and stream connect async so that we don't have to block the main thread. Fixes #3221 --- src/modules/module-pulse-tunnel.c | 303 +++++++++++++++--------------- 1 file changed, 150 insertions(+), 153 deletions(-) diff --git a/src/modules/module-pulse-tunnel.c b/src/modules/module-pulse-tunnel.c index a11936f57..310c7b51f 100644 --- a/src/modules/module-pulse-tunnel.c +++ b/src/modules/module-pulse-tunnel.c @@ -518,42 +518,37 @@ static void module_schedule_destroy(struct impl *impl) pw_loop_invoke(impl->main_loop, do_schedule_destroy, 1, NULL, 0, false, impl); } -static void context_state_cb(pa_context *c, void *userdata) +static int +do_create_stream(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) { - struct impl *impl = userdata; - bool do_destroy = false; - switch (pa_context_get_state(c)) { - case PA_CONTEXT_TERMINATED: - case PA_CONTEXT_FAILED: - do_destroy = true; - SPA_FALLTHROUGH; - case PA_CONTEXT_READY: - pa_threaded_mainloop_signal(impl->pa_mainloop, 0); - break; - case PA_CONTEXT_UNCONNECTED: - do_destroy = true; - break; - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; + struct impl *impl = user_data; + int res; + if (impl->stream == NULL) { + if ((res = create_stream(impl)) < 0) { + pw_log_error("failed to create stream: %s", spa_strerror(res)); + do_schedule_destroy(loop, async, seq, NULL, 0, user_data); + } } - if (do_destroy) - module_schedule_destroy(impl); + return 0; } static void stream_state_cb(pa_stream *s, void * userdata) { struct impl *impl = userdata; bool do_destroy = false; - switch (pa_stream_get_state(s)) { + pa_stream_state_t state = pa_stream_get_state(s); + + pw_log_debug("stream state %d", state); + + switch (state) { case PA_STREAM_FAILED: case PA_STREAM_TERMINATED: do_destroy = true; SPA_FALLTHROUGH; case PA_STREAM_READY: impl->pa_index = pa_stream_get_index(impl->pa_stream); - pa_threaded_mainloop_signal(impl->pa_mainloop, 0); + pw_loop_invoke(impl->main_loop, do_create_stream, 1, NULL, 0, false, impl); break; case PA_STREAM_UNCONNECTED: do_destroy = true; @@ -697,14 +692,96 @@ static void stream_overflow_cb(pa_stream *s, void *userdata) static void stream_latency_update_cb(pa_stream *s, void *userdata) { - struct impl *impl = userdata; pa_usec_t usec; int negative; - pa_stream_get_latency(s, &usec, &negative); - pw_log_debug("latency %ld negative %d", usec, negative); - pa_threaded_mainloop_signal(impl->pa_mainloop, 0); +} + +static int create_pulse_stream(struct impl *impl) +{ + pa_sample_spec ss; + pa_channel_map map; + uint32_t latency_bytes, i, aux = 0; + const char *remote_node_target; + char stream_name[1024]; + pa_buffer_attr bufferattr; + int err = PA_ERR_IO; + + ss.format = (pa_sample_format_t) format_id2pa(impl->info.format); + ss.channels = impl->info.channels; + ss.rate = impl->info.rate; + + map.channels = impl->info.channels; + for (i = 0; i < map.channels; i++) + map.map[i] = (pa_channel_position_t)channel_id2pa(impl->info.position[i], &aux); + + snprintf(stream_name, sizeof(stream_name), _("Tunnel for %s@%s"), + pw_get_user_name(), pw_get_host_name()); + + pw_log_info("create stream %s", stream_name); + + if (!(impl->pa_stream = pa_stream_new(impl->pa_context, stream_name, &ss, &map))) { + err = pa_context_errno(impl->pa_context); + goto exit; + } + + pa_stream_set_state_callback(impl->pa_stream, stream_state_cb, impl); + pa_stream_set_read_callback(impl->pa_stream, stream_read_request_cb, impl); + pa_stream_set_write_callback(impl->pa_stream, stream_write_request_cb, impl); + pa_stream_set_underflow_callback(impl->pa_stream, stream_underflow_cb, impl); + pa_stream_set_overflow_callback(impl->pa_stream, stream_overflow_cb, impl); + pa_stream_set_latency_update_callback(impl->pa_stream, stream_latency_update_cb, impl); + + remote_node_target = pw_properties_get(impl->props, PW_KEY_TARGET_OBJECT); + + bufferattr.fragsize = (uint32_t) -1; + bufferattr.minreq = (uint32_t) -1; + bufferattr.maxlength = (uint32_t) -1; + bufferattr.prebuf = (uint32_t) -1; + + latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss); + + impl->target_latency = latency_bytes / impl->frame_size; + + /* half in our buffer, half in the network + remote */ + impl->target_buffer = latency_bytes / 2; + + if (impl->mode == MODE_SOURCE) { + bufferattr.fragsize = latency_bytes / 2; + + pa_context_subscribe(impl->pa_context, + PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT, NULL, impl); + + if ((err = pa_stream_connect_record(impl->pa_stream, + remote_node_target, &bufferattr, + PA_STREAM_DONT_MOVE | + PA_STREAM_INTERPOLATE_TIMING | + PA_STREAM_ADJUST_LATENCY | + PA_STREAM_AUTO_TIMING_UPDATE)) != 0) + err = pa_context_errno(impl->pa_context); + } else { + bufferattr.tlength = latency_bytes / 2; + bufferattr.minreq = bufferattr.tlength / 4; + bufferattr.prebuf = bufferattr.tlength; + + pa_context_subscribe(impl->pa_context, + PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, impl); + + if ((err = pa_stream_connect_playback(impl->pa_stream, + remote_node_target, &bufferattr, + PA_STREAM_DONT_MOVE | + PA_STREAM_INTERPOLATE_TIMING | + PA_STREAM_ADJUST_LATENCY | + PA_STREAM_AUTO_TIMING_UPDATE, + NULL, NULL)) != 0) + err = pa_context_errno(impl->pa_context); + } + +exit: + if (err != PA_OK) + pw_log_error("failed to create stream: %s", pa_strerror(err)); + return err_to_res(err); } static int @@ -779,6 +856,36 @@ static void context_subscribe_cb(pa_context *c, pa_subscription_event_type_t t, idx, sink_input_info_cb, impl); } +static void context_state_cb(pa_context *c, void *userdata) +{ + struct impl *impl = userdata; + bool do_destroy = false; + pa_context_state_t state = pa_context_get_state(c); + + pw_log_debug("state %d", state); + + switch (state) { + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + do_destroy = true; + SPA_FALLTHROUGH; + case PA_CONTEXT_READY: + if (impl->pa_stream == NULL) + if (create_pulse_stream(impl) < 0) + do_destroy = true; + break; + case PA_CONTEXT_UNCONNECTED: + do_destroy = true; + break; + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + } + if (do_destroy) + module_schedule_destroy(impl); +} + static pa_proplist* tunnel_new_proplist(struct impl *impl) { pa_proplist *proplist = pa_proplist_new(); @@ -788,20 +895,15 @@ static pa_proplist* tunnel_new_proplist(struct impl *impl) return proplist; } -static int create_pulse_stream(struct impl *impl) +static int start_pulse_connection(struct impl *impl) { - pa_sample_spec ss; - pa_channel_map map; - const char *server_address, *remote_node_target; + const char *server_address; pa_proplist *props = NULL; pa_mainloop_api *api; - char stream_name[1024]; - pa_buffer_attr bufferattr; - int res = -EIO; - uint32_t latency_bytes, i, aux = 0; + int err = PA_ERR_IO; if ((impl->pa_mainloop = pa_threaded_mainloop_new()) == NULL) - goto error; + goto exit; api = pa_threaded_mainloop_get_api(impl->pa_mainloop); @@ -810,15 +912,17 @@ static int create_pulse_stream(struct impl *impl) pa_proplist_free(props); if (impl->pa_context == NULL) - goto error; + goto exit; pa_context_set_state_callback(impl->pa_context, context_state_cb, impl); server_address = pw_properties_get(impl->props, "pulse.server.address"); + pw_log_info("connecting to %s...", server_address); + if (pa_context_connect(impl->pa_context, server_address, 0, NULL) < 0) { - res = pa_context_errno(impl->pa_context); - goto error; + err = pa_context_errno(impl->pa_context); + goto exit; } pa_threaded_mainloop_lock(impl->pa_mainloop); @@ -826,122 +930,18 @@ static int create_pulse_stream(struct impl *impl) pa_context_set_subscribe_callback(impl->pa_context, context_subscribe_cb, impl); if (pa_threaded_mainloop_start(impl->pa_mainloop) < 0) - goto error_unlock; + goto exit_unlock; - for (;;) { - pa_context_state_t state; - - state = pa_context_get_state(impl->pa_context); - if (state == PA_CONTEXT_READY) - break; - - if (!PA_CONTEXT_IS_GOOD(state)) { - res = pa_context_errno(impl->pa_context); - goto error_unlock; - } - /* Wait until the context is ready */ - pa_threaded_mainloop_wait(impl->pa_mainloop); - } - - ss.format = (pa_sample_format_t) format_id2pa(impl->info.format); - ss.channels = impl->info.channels; - ss.rate = impl->info.rate; - - map.channels = impl->info.channels; - for (i = 0; i < map.channels; i++) - map.map[i] = (pa_channel_position_t)channel_id2pa(impl->info.position[i], &aux); - - snprintf(stream_name, sizeof(stream_name), _("Tunnel for %s@%s"), - pw_get_user_name(), pw_get_host_name()); - - if (!(impl->pa_stream = pa_stream_new(impl->pa_context, stream_name, &ss, &map))) { - res = pa_context_errno(impl->pa_context); - goto error_unlock; - } - - pa_stream_set_state_callback(impl->pa_stream, stream_state_cb, impl); - pa_stream_set_read_callback(impl->pa_stream, stream_read_request_cb, impl); - pa_stream_set_write_callback(impl->pa_stream, stream_write_request_cb, impl); - pa_stream_set_underflow_callback(impl->pa_stream, stream_underflow_cb, impl); - pa_stream_set_overflow_callback(impl->pa_stream, stream_overflow_cb, impl); - pa_stream_set_latency_update_callback(impl->pa_stream, stream_latency_update_cb, impl); - - remote_node_target = pw_properties_get(impl->props, PW_KEY_TARGET_OBJECT); - - bufferattr.fragsize = (uint32_t) -1; - bufferattr.minreq = (uint32_t) -1; - bufferattr.maxlength = (uint32_t) -1; - bufferattr.prebuf = (uint32_t) -1; - - latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss); - - impl->target_latency = latency_bytes / impl->frame_size; - - /* half in our buffer, half in the network + remote */ - impl->target_buffer = latency_bytes / 2; - - if (impl->mode == MODE_SOURCE) { - bufferattr.fragsize = latency_bytes / 2; - - pa_context_subscribe(impl->pa_context, - PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT, NULL, impl); - - res = pa_stream_connect_record(impl->pa_stream, - remote_node_target, &bufferattr, - PA_STREAM_DONT_MOVE | - PA_STREAM_INTERPOLATE_TIMING | - PA_STREAM_ADJUST_LATENCY | - PA_STREAM_AUTO_TIMING_UPDATE); - } else { - bufferattr.tlength = latency_bytes / 2; - bufferattr.minreq = bufferattr.tlength / 4; - bufferattr.prebuf = bufferattr.tlength; - - pa_context_subscribe(impl->pa_context, - PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, impl); - - res = pa_stream_connect_playback(impl->pa_stream, - remote_node_target, &bufferattr, - PA_STREAM_DONT_MOVE | - PA_STREAM_INTERPOLATE_TIMING | - PA_STREAM_ADJUST_LATENCY | - PA_STREAM_AUTO_TIMING_UPDATE, - NULL, NULL); - } - - if (res < 0) { - res = pa_context_errno(impl->pa_context); - goto error_unlock; - } - - for (;;) { - pa_stream_state_t state; - - state = pa_stream_get_state(impl->pa_stream); - if (state == PA_STREAM_READY) - break; - - if (!PA_STREAM_IS_GOOD(state)) { - res = pa_context_errno(impl->pa_context); - goto error_unlock; - } - - /* Wait until the stream is ready */ - pa_threaded_mainloop_wait(impl->pa_mainloop); - } + err = PA_OK; +exit_unlock: pa_threaded_mainloop_unlock(impl->pa_mainloop); - - return 0; - -error_unlock: - pa_threaded_mainloop_unlock(impl->pa_mainloop); -error: - pw_log_error("failed to connect: %s", pa_strerror(res)); - return err_to_res(res); +exit: + if (err != PA_OK) + pw_log_error("failed to connect: %s", pa_strerror(err)); + return err_to_res(err); } - static void core_error(void *data, uint32_t id, int seq, int res, const char *message) { struct impl *impl = data; @@ -1234,10 +1234,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - if ((res = create_pulse_stream(impl)) < 0) - goto error; - - if ((res = create_stream(impl)) < 0) + if ((res = start_pulse_connection(impl)) < 0) goto error; pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);