diff --git a/src/modules/module-protocol-pulse/modules/module-tunnel-sink.c b/src/modules/module-protocol-pulse/modules/module-tunnel-sink.c index b779a3e56..59de3c6c2 100644 --- a/src/modules/module-protocol-pulse/modules/module-tunnel-sink.c +++ b/src/modules/module-protocol-pulse/modules/module-tunnel-sink.c @@ -32,6 +32,7 @@ static const char *const pulse_module_options = "sink= " "sink_name= " "sink_properties= " + "reconnect_interval_ms= " "format= " "channels= " "rate= " @@ -181,6 +182,10 @@ static int module_tunnel_sink_prepare(struct module * const module) } audioinfo_to_properties(&info, stream_props); + if ((str = pw_properties_get(props, "reconnect_interval_ms")) != NULL) { + pw_properties_set(props, "reconnect.interval.ms", str); + pw_properties_set(props, "reconnect_interval_ms", NULL); + } if ((str = pw_properties_get(props, "latency_msec")) != NULL) { pw_properties_set(props, "pulse.latency", str); pw_properties_set(props, "latency_msec", NULL); diff --git a/src/modules/module-protocol-pulse/modules/module-tunnel-source.c b/src/modules/module-protocol-pulse/modules/module-tunnel-source.c index b67efd5b7..415f2ddd1 100644 --- a/src/modules/module-protocol-pulse/modules/module-tunnel-source.c +++ b/src/modules/module-protocol-pulse/modules/module-tunnel-source.c @@ -32,6 +32,7 @@ static const char *const pulse_module_options = "source= " "source_name= " "source_properties= " + "reconnect_interval_ms= " "format= " "channels= " "rate= " @@ -178,6 +179,10 @@ static int module_tunnel_source_prepare(struct module * const module) } audioinfo_to_properties(&info, stream_props); + if ((str = pw_properties_get(props, "reconnect_interval_ms")) != NULL) { + pw_properties_set(props, "reconnect.interval.ms", str); + pw_properties_set(props, "reconnect_interval_ms", NULL); + } if ((str = pw_properties_get(props, "latency_msec")) != NULL) { pw_properties_set(props, "pulse.latency", str); pw_properties_set(props, "latency_msec", NULL); diff --git a/src/modules/module-pulse-tunnel.c b/src/modules/module-pulse-tunnel.c index 310c7b51f..9ace594b2 100644 --- a/src/modules/module-pulse-tunnel.c +++ b/src/modules/module-pulse-tunnel.c @@ -58,6 +58,9 @@ * - `pulse.server.address`: the address of the PulseAudio server to tunnel to. * - `pulse.latency`: the latency to end-to-end latency in milliseconds to * maintain (Default 200). + * - `reconnect.interval.ms`: when the remote connection is broken, retry to connect + * with this interval in millisconds. A value of 0 disables recovery + * and will result in a module unload. (Default 0) (Since 1.1.0) * - `stream.props`: Extra properties for the local stream. * * ## General options @@ -87,6 +90,7 @@ * # Set the remote address to tunnel to * pulse.server.address = "tcp:192.168.1.126" * #pulse.latency = 200 + * #reconnect.interval.ms = 0 * #audio.rate= * #audio.channels= * #audio.position= @@ -121,6 +125,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); "( audio.position= ] " \ "pulse.server.address=
" \ "( pulse.latency= ) " \ + "( reconnect.interval.ms= ) " \ "( tunnel.mode=source|sink, default sink ) " \ "( stream.props= ) " @@ -184,9 +189,16 @@ struct impl { float max_error; unsigned resync:1; - unsigned int do_disconnect:1; + bool do_disconnect:1; + bool stopping; + + struct spa_source *timer; + uint32_t reconnect_interval_ms; + bool recovering; }; +static int start_pulse_connection(struct impl *impl); + static void cork_stream(struct impl *impl, bool cork) { pa_operation *operation; @@ -463,8 +475,8 @@ static int create_stream(struct impl *impl) struct spa_pod_builder b; struct spa_latency_info latency; - impl->stream = pw_stream_new(impl->core, "pulse", impl->stream_props); - impl->stream_props = NULL; + impl->stream = pw_stream_new(impl->core, "pulse", + pw_properties_copy(impl->stream_props)); if (impl->stream == NULL) return -errno; @@ -503,19 +515,61 @@ static int create_stream(struct impl *impl) return 0; } +static void cleanup_streams(struct impl *impl) +{ + if (impl->pa_mainloop) { + pa_threaded_mainloop_stop(impl->pa_mainloop); + pa_threaded_mainloop_lock(impl->pa_mainloop); + } + if (impl->pa_stream) { + pa_stream_unref(impl->pa_stream); + impl->pa_stream = NULL; + } + if (impl->pa_context) { + pa_context_disconnect(impl->pa_context); + pa_context_unref(impl->pa_context); + impl->pa_context = NULL; + } + if (impl->pa_mainloop) { + pa_threaded_mainloop_unlock(impl->pa_mainloop); + pa_threaded_mainloop_free(impl->pa_mainloop); + impl->pa_mainloop = NULL; + } + if (impl->stream) + pw_stream_destroy(impl->stream); +} + +static void on_timer_event(void *data, uint64_t expirations) +{ + struct impl *impl = data; + cleanup_streams(impl); + start_pulse_connection(impl); +} + static int -do_schedule_destroy(struct spa_loop *loop, +do_schedule_recovery(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct impl *impl = user_data; - if (impl->module) - pw_impl_module_schedule_destroy(impl->module); + if (impl->reconnect_interval_ms > 0) { + struct timespec value; + uint64_t timestamp; + + timestamp = impl->reconnect_interval_ms * SPA_NSEC_PER_MSEC; + value.tv_sec = timestamp / SPA_NSEC_PER_SEC; + value.tv_nsec = timestamp % SPA_NSEC_PER_SEC; + pw_loop_update_timer(impl->main_loop, impl->timer, &value, NULL, false); + } else { + if (impl->module) + pw_impl_module_schedule_destroy(impl->module); + } return 0; } -static void module_schedule_destroy(struct impl *impl) +static void schedule_recovery(struct impl *impl) { - pw_loop_invoke(impl->main_loop, do_schedule_destroy, 1, NULL, 0, false, impl); + if (!impl->stopping) + pw_loop_invoke(impl->main_loop, do_schedule_recovery, 1, NULL, 0, false, impl); } static int @@ -527,7 +581,8 @@ do_create_stream(struct spa_loop *loop, if (impl->stream == NULL) { if ((res = create_stream(impl)) < 0) { pw_log_error("failed to create stream: %s", spa_strerror(res)); - do_schedule_destroy(loop, async, seq, NULL, 0, user_data); + if (impl->module) + pw_impl_module_schedule_destroy(impl->module); } } return 0; @@ -542,22 +597,22 @@ static void stream_state_cb(pa_stream *s, void * userdata) pw_log_debug("stream state %d", state); switch (state) { - case PA_STREAM_FAILED: - case PA_STREAM_TERMINATED: - do_destroy = true; - SPA_FALLTHROUGH; + case PA_STREAM_CREATING: + break; case PA_STREAM_READY: impl->pa_index = pa_stream_get_index(impl->pa_stream); pw_loop_invoke(impl->main_loop, do_create_stream, 1, NULL, 0, false, impl); break; + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: case PA_STREAM_UNCONNECTED: do_destroy = true; break; - case PA_STREAM_CREATING: - break; } - if (do_destroy) - module_schedule_destroy(impl); + if (do_destroy) { + pw_log_warn("stream failure: %d", state); + schedule_recovery(impl); + } } static void stream_read_request_cb(pa_stream *s, size_t length, void *userdata) @@ -865,25 +920,25 @@ static void context_state_cb(pa_context *c, void *userdata) pw_log_debug("state %d", state); switch (state) { - case PA_CONTEXT_TERMINATED: - case PA_CONTEXT_FAILED: - do_destroy = true; - SPA_FALLTHROUGH; + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; case PA_CONTEXT_READY: if (impl->pa_stream == NULL) if (create_pulse_stream(impl) < 0) do_destroy = true; break; + case PA_CONTEXT_TERMINATED: case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_FAILED: do_destroy = true; break; - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; } - if (do_destroy) - module_schedule_destroy(impl); + if (do_destroy) { + pw_log_warn("connection failure: %s", pa_strerror(pa_context_errno(c))); + schedule_recovery(impl); + } } static pa_proplist* tunnel_new_proplist(struct impl *impl) @@ -975,20 +1030,10 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { + impl->stopping = true; - if (impl->pa_mainloop) - pa_threaded_mainloop_stop(impl->pa_mainloop); - if (impl->pa_stream) - pa_stream_unref(impl->pa_stream); - if (impl->pa_context) { - pa_context_disconnect(impl->pa_context); - pa_context_unref(impl->pa_context); - } - if (impl->pa_mainloop) - pa_threaded_mainloop_free(impl->pa_mainloop); + cleanup_streams(impl); - if (impl->stream) - pw_stream_destroy(impl->stream); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); @@ -997,6 +1042,9 @@ static void impl_destroy(struct impl *impl) pw_properties_free(impl->stream_props); pw_properties_free(impl->props); + if (impl->timer) + pw_loop_destroy_source(impl->main_loop, impl->timer); + free(impl->buffer); free(impl); } @@ -1170,10 +1218,18 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) goto error; } } + impl->reconnect_interval_ms = pw_properties_get_uint32(props, + "reconnect.interval.ms", 0); + + impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl); + if (impl->timer == NULL) { + res = -errno; + pw_log_error("can't create timer source: %m"); + goto error; + } impl->latency_msec = pw_properties_get_uint32(props, "pulse.latency", DEFAULT_LATENCY_MSEC); - if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); if (pw_properties_get(props, PW_KEY_NODE_NETWORK) == NULL)