module-pulse-tunnel: don't block the main thread

Do the pulse context and stream connect async so that we don't have to
block the main thread.

Fixes #3221
This commit is contained in:
Wim Taymans 2023-11-30 10:40:37 +01:00
parent 6ae9698ebc
commit 6c772a1843

View file

@ -518,42 +518,37 @@ static void module_schedule_destroy(struct impl *impl)
pw_loop_invoke(impl->main_loop, do_schedule_destroy, 1, NULL, 0, false, impl);
}
static void context_state_cb(pa_context *c, void *userdata)
static int
do_create_stream(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct impl *impl = userdata;
bool do_destroy = false;
switch (pa_context_get_state(c)) {
case PA_CONTEXT_TERMINATED:
case PA_CONTEXT_FAILED:
do_destroy = true;
SPA_FALLTHROUGH;
case PA_CONTEXT_READY:
pa_threaded_mainloop_signal(impl->pa_mainloop, 0);
break;
case PA_CONTEXT_UNCONNECTED:
do_destroy = true;
break;
case PA_CONTEXT_CONNECTING:
case PA_CONTEXT_AUTHORIZING:
case PA_CONTEXT_SETTING_NAME:
break;
struct impl *impl = user_data;
int res;
if (impl->stream == NULL) {
if ((res = create_stream(impl)) < 0) {
pw_log_error("failed to create stream: %s", spa_strerror(res));
do_schedule_destroy(loop, async, seq, NULL, 0, user_data);
}
}
if (do_destroy)
module_schedule_destroy(impl);
return 0;
}
static void stream_state_cb(pa_stream *s, void * userdata)
{
struct impl *impl = userdata;
bool do_destroy = false;
switch (pa_stream_get_state(s)) {
pa_stream_state_t state = pa_stream_get_state(s);
pw_log_debug("stream state %d", state);
switch (state) {
case PA_STREAM_FAILED:
case PA_STREAM_TERMINATED:
do_destroy = true;
SPA_FALLTHROUGH;
case PA_STREAM_READY:
impl->pa_index = pa_stream_get_index(impl->pa_stream);
pa_threaded_mainloop_signal(impl->pa_mainloop, 0);
pw_loop_invoke(impl->main_loop, do_create_stream, 1, NULL, 0, false, impl);
break;
case PA_STREAM_UNCONNECTED:
do_destroy = true;
@ -697,14 +692,96 @@ static void stream_overflow_cb(pa_stream *s, void *userdata)
static void stream_latency_update_cb(pa_stream *s, void *userdata)
{
struct impl *impl = userdata;
pa_usec_t usec;
int negative;
pa_stream_get_latency(s, &usec, &negative);
pw_log_debug("latency %ld negative %d", usec, negative);
pa_threaded_mainloop_signal(impl->pa_mainloop, 0);
}
static int create_pulse_stream(struct impl *impl)
{
pa_sample_spec ss;
pa_channel_map map;
uint32_t latency_bytes, i, aux = 0;
const char *remote_node_target;
char stream_name[1024];
pa_buffer_attr bufferattr;
int err = PA_ERR_IO;
ss.format = (pa_sample_format_t) format_id2pa(impl->info.format);
ss.channels = impl->info.channels;
ss.rate = impl->info.rate;
map.channels = impl->info.channels;
for (i = 0; i < map.channels; i++)
map.map[i] = (pa_channel_position_t)channel_id2pa(impl->info.position[i], &aux);
snprintf(stream_name, sizeof(stream_name), _("Tunnel for %s@%s"),
pw_get_user_name(), pw_get_host_name());
pw_log_info("create stream %s", stream_name);
if (!(impl->pa_stream = pa_stream_new(impl->pa_context, stream_name, &ss, &map))) {
err = pa_context_errno(impl->pa_context);
goto exit;
}
pa_stream_set_state_callback(impl->pa_stream, stream_state_cb, impl);
pa_stream_set_read_callback(impl->pa_stream, stream_read_request_cb, impl);
pa_stream_set_write_callback(impl->pa_stream, stream_write_request_cb, impl);
pa_stream_set_underflow_callback(impl->pa_stream, stream_underflow_cb, impl);
pa_stream_set_overflow_callback(impl->pa_stream, stream_overflow_cb, impl);
pa_stream_set_latency_update_callback(impl->pa_stream, stream_latency_update_cb, impl);
remote_node_target = pw_properties_get(impl->props, PW_KEY_TARGET_OBJECT);
bufferattr.fragsize = (uint32_t) -1;
bufferattr.minreq = (uint32_t) -1;
bufferattr.maxlength = (uint32_t) -1;
bufferattr.prebuf = (uint32_t) -1;
latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss);
impl->target_latency = latency_bytes / impl->frame_size;
/* half in our buffer, half in the network + remote */
impl->target_buffer = latency_bytes / 2;
if (impl->mode == MODE_SOURCE) {
bufferattr.fragsize = latency_bytes / 2;
pa_context_subscribe(impl->pa_context,
PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT, NULL, impl);
if ((err = pa_stream_connect_record(impl->pa_stream,
remote_node_target, &bufferattr,
PA_STREAM_DONT_MOVE |
PA_STREAM_INTERPOLATE_TIMING |
PA_STREAM_ADJUST_LATENCY |
PA_STREAM_AUTO_TIMING_UPDATE)) != 0)
err = pa_context_errno(impl->pa_context);
} else {
bufferattr.tlength = latency_bytes / 2;
bufferattr.minreq = bufferattr.tlength / 4;
bufferattr.prebuf = bufferattr.tlength;
pa_context_subscribe(impl->pa_context,
PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, impl);
if ((err = pa_stream_connect_playback(impl->pa_stream,
remote_node_target, &bufferattr,
PA_STREAM_DONT_MOVE |
PA_STREAM_INTERPOLATE_TIMING |
PA_STREAM_ADJUST_LATENCY |
PA_STREAM_AUTO_TIMING_UPDATE,
NULL, NULL)) != 0)
err = pa_context_errno(impl->pa_context);
}
exit:
if (err != PA_OK)
pw_log_error("failed to create stream: %s", pa_strerror(err));
return err_to_res(err);
}
static int
@ -779,6 +856,36 @@ static void context_subscribe_cb(pa_context *c, pa_subscription_event_type_t t,
idx, sink_input_info_cb, impl);
}
static void context_state_cb(pa_context *c, void *userdata)
{
struct impl *impl = userdata;
bool do_destroy = false;
pa_context_state_t state = pa_context_get_state(c);
pw_log_debug("state %d", state);
switch (state) {
case PA_CONTEXT_TERMINATED:
case PA_CONTEXT_FAILED:
do_destroy = true;
SPA_FALLTHROUGH;
case PA_CONTEXT_READY:
if (impl->pa_stream == NULL)
if (create_pulse_stream(impl) < 0)
do_destroy = true;
break;
case PA_CONTEXT_UNCONNECTED:
do_destroy = true;
break;
case PA_CONTEXT_CONNECTING:
case PA_CONTEXT_AUTHORIZING:
case PA_CONTEXT_SETTING_NAME:
break;
}
if (do_destroy)
module_schedule_destroy(impl);
}
static pa_proplist* tunnel_new_proplist(struct impl *impl)
{
pa_proplist *proplist = pa_proplist_new();
@ -788,20 +895,15 @@ static pa_proplist* tunnel_new_proplist(struct impl *impl)
return proplist;
}
static int create_pulse_stream(struct impl *impl)
static int start_pulse_connection(struct impl *impl)
{
pa_sample_spec ss;
pa_channel_map map;
const char *server_address, *remote_node_target;
const char *server_address;
pa_proplist *props = NULL;
pa_mainloop_api *api;
char stream_name[1024];
pa_buffer_attr bufferattr;
int res = -EIO;
uint32_t latency_bytes, i, aux = 0;
int err = PA_ERR_IO;
if ((impl->pa_mainloop = pa_threaded_mainloop_new()) == NULL)
goto error;
goto exit;
api = pa_threaded_mainloop_get_api(impl->pa_mainloop);
@ -810,15 +912,17 @@ static int create_pulse_stream(struct impl *impl)
pa_proplist_free(props);
if (impl->pa_context == NULL)
goto error;
goto exit;
pa_context_set_state_callback(impl->pa_context, context_state_cb, impl);
server_address = pw_properties_get(impl->props, "pulse.server.address");
pw_log_info("connecting to %s...", server_address);
if (pa_context_connect(impl->pa_context, server_address, 0, NULL) < 0) {
res = pa_context_errno(impl->pa_context);
goto error;
err = pa_context_errno(impl->pa_context);
goto exit;
}
pa_threaded_mainloop_lock(impl->pa_mainloop);
@ -826,122 +930,18 @@ static int create_pulse_stream(struct impl *impl)
pa_context_set_subscribe_callback(impl->pa_context, context_subscribe_cb, impl);
if (pa_threaded_mainloop_start(impl->pa_mainloop) < 0)
goto error_unlock;
goto exit_unlock;
for (;;) {
pa_context_state_t state;
state = pa_context_get_state(impl->pa_context);
if (state == PA_CONTEXT_READY)
break;
if (!PA_CONTEXT_IS_GOOD(state)) {
res = pa_context_errno(impl->pa_context);
goto error_unlock;
}
/* Wait until the context is ready */
pa_threaded_mainloop_wait(impl->pa_mainloop);
}
ss.format = (pa_sample_format_t) format_id2pa(impl->info.format);
ss.channels = impl->info.channels;
ss.rate = impl->info.rate;
map.channels = impl->info.channels;
for (i = 0; i < map.channels; i++)
map.map[i] = (pa_channel_position_t)channel_id2pa(impl->info.position[i], &aux);
snprintf(stream_name, sizeof(stream_name), _("Tunnel for %s@%s"),
pw_get_user_name(), pw_get_host_name());
if (!(impl->pa_stream = pa_stream_new(impl->pa_context, stream_name, &ss, &map))) {
res = pa_context_errno(impl->pa_context);
goto error_unlock;
}
pa_stream_set_state_callback(impl->pa_stream, stream_state_cb, impl);
pa_stream_set_read_callback(impl->pa_stream, stream_read_request_cb, impl);
pa_stream_set_write_callback(impl->pa_stream, stream_write_request_cb, impl);
pa_stream_set_underflow_callback(impl->pa_stream, stream_underflow_cb, impl);
pa_stream_set_overflow_callback(impl->pa_stream, stream_overflow_cb, impl);
pa_stream_set_latency_update_callback(impl->pa_stream, stream_latency_update_cb, impl);
remote_node_target = pw_properties_get(impl->props, PW_KEY_TARGET_OBJECT);
bufferattr.fragsize = (uint32_t) -1;
bufferattr.minreq = (uint32_t) -1;
bufferattr.maxlength = (uint32_t) -1;
bufferattr.prebuf = (uint32_t) -1;
latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss);
impl->target_latency = latency_bytes / impl->frame_size;
/* half in our buffer, half in the network + remote */
impl->target_buffer = latency_bytes / 2;
if (impl->mode == MODE_SOURCE) {
bufferattr.fragsize = latency_bytes / 2;
pa_context_subscribe(impl->pa_context,
PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT, NULL, impl);
res = pa_stream_connect_record(impl->pa_stream,
remote_node_target, &bufferattr,
PA_STREAM_DONT_MOVE |
PA_STREAM_INTERPOLATE_TIMING |
PA_STREAM_ADJUST_LATENCY |
PA_STREAM_AUTO_TIMING_UPDATE);
} else {
bufferattr.tlength = latency_bytes / 2;
bufferattr.minreq = bufferattr.tlength / 4;
bufferattr.prebuf = bufferattr.tlength;
pa_context_subscribe(impl->pa_context,
PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, impl);
res = pa_stream_connect_playback(impl->pa_stream,
remote_node_target, &bufferattr,
PA_STREAM_DONT_MOVE |
PA_STREAM_INTERPOLATE_TIMING |
PA_STREAM_ADJUST_LATENCY |
PA_STREAM_AUTO_TIMING_UPDATE,
NULL, NULL);
}
if (res < 0) {
res = pa_context_errno(impl->pa_context);
goto error_unlock;
}
for (;;) {
pa_stream_state_t state;
state = pa_stream_get_state(impl->pa_stream);
if (state == PA_STREAM_READY)
break;
if (!PA_STREAM_IS_GOOD(state)) {
res = pa_context_errno(impl->pa_context);
goto error_unlock;
}
/* Wait until the stream is ready */
pa_threaded_mainloop_wait(impl->pa_mainloop);
}
err = PA_OK;
exit_unlock:
pa_threaded_mainloop_unlock(impl->pa_mainloop);
return 0;
error_unlock:
pa_threaded_mainloop_unlock(impl->pa_mainloop);
error:
pw_log_error("failed to connect: %s", pa_strerror(res));
return err_to_res(res);
exit:
if (err != PA_OK)
pw_log_error("failed to connect: %s", pa_strerror(err));
return err_to_res(err);
}
static void core_error(void *data, uint32_t id, int seq, int res, const char *message)
{
struct impl *impl = data;
@ -1234,10 +1234,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
&impl->core_listener,
&core_events, impl);
if ((res = create_pulse_stream(impl)) < 0)
goto error;
if ((res = create_stream(impl)) < 0)
if ((res = start_pulse_connection(impl)) < 0)
goto error;
pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);