From 6eba010d3801ab8914d8248f3dbed6cb718bf95f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 21 May 2020 12:22:48 +0200 Subject: [PATCH] gst: do a sync before disconnect This makes sure we first nicely remove the stream from the server and then close the socket. If we don't do this, the disconnect might not have flushed out our disconnect and the server is left with a non-responsive node, especially if the disconnect on the core was done with a socket from the portal that is still open. --- src/gst/gstpipewiresink.c | 37 ++++++++++++++++++++++++++---- src/gst/gstpipewiresink.h | 4 ++++ src/gst/gstpipewiresrc.c | 48 +++++++++++++++++++++++++++++++++------ src/gst/gstpipewiresrc.h | 4 ++++ 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index c15c6c1a4..a27d6b244 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -689,6 +689,30 @@ gst_pipewire_sink_stop (GstBaseSink * basesink) return TRUE; } +static void on_core_done (void *object, uint32_t id, int seq) +{ + GstPipeWireSink * pwsink = object; + if (id == PW_ID_CORE) { + pwsink->last_seq = seq; + pw_thread_loop_signal (pwsink->loop, FALSE); + } +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, +}; + +static void do_sync(GstPipeWireSink * pwsink) +{ + pwsink->pending_seq = pw_core_sync(pwsink->core, 0, pwsink->pending_seq); + while (true) { + if (pwsink->last_seq == pwsink->pending_seq || pwsink->last_error < 0) + break; + pw_thread_loop_wait (pwsink->loop); + } +} + static gboolean gst_pipewire_sink_open (GstPipeWireSink * pwsink) { @@ -705,6 +729,11 @@ gst_pipewire_sink_open (GstPipeWireSink * pwsink) if (pwsink->core == NULL) goto connect_error; + pw_core_add_listener(pwsink->core, + &pwsink->core_listener, + &core_events, + pwsink); + pw_thread_loop_unlock (pwsink->loop); return TRUE; @@ -730,9 +759,11 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink) { pw_thread_loop_lock (pwsink->loop); if (pwsink->stream) { - pw_stream_disconnect (pwsink->stream); + pw_stream_destroy (pwsink->stream); + pwsink->stream = NULL; } if (pwsink->core) { + do_sync(pwsink); pw_core_disconnect (pwsink->core); pwsink->core = NULL; } @@ -740,10 +771,6 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink) pw_thread_loop_stop (pwsink->loop); - if (pwsink->stream) { - pw_stream_destroy (pwsink->stream); - pwsink->stream = NULL; - } return TRUE; } diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index 898770b74..88b210d5d 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -87,6 +87,10 @@ struct _GstPipeWireSink { struct pw_context *context; struct pw_core *core; + struct spa_hook core_listener; + int pending_seq; + int last_seq; + int last_error; struct pw_stream *stream; struct spa_hook stream_listener; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index e2b472457..9f385faf8 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -954,6 +954,30 @@ static const struct pw_stream_events stream_events = { .process = on_process, }; +static void on_core_done (void *object, uint32_t id, int seq) +{ + GstPipeWireSrc * pwsrc = object; + if (id == PW_ID_CORE) { + pwsrc->last_seq = seq; + pw_thread_loop_signal (pwsrc->loop, FALSE); + } +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, +}; + +static void do_sync(GstPipeWireSrc * pwsrc) +{ + pwsrc->pending_seq = pw_core_sync(pwsrc->core, 0, pwsrc->pending_seq); + while (true) { + if (pwsrc->last_seq == pwsrc->pending_seq || pwsrc->last_error < 0) + break; + pw_thread_loop_wait (pwsrc->loop); + } +} + static gboolean gst_pipewire_src_open (GstPipeWireSrc * pwsrc) { @@ -972,6 +996,11 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc) if (pwsrc->core == NULL) goto connect_error; + pw_core_add_listener(pwsrc->core, + &pwsrc->core_listener, + &core_events, + pwsrc); + if (pwsrc->properties) { props = pw_properties_new (NULL, NULL); gst_structure_foreach (pwsrc->properties, copy_properties, props); @@ -989,7 +1018,6 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc) &stream_events, pwsrc); - pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time); pw_thread_loop_unlock (pwsrc->loop); @@ -1018,8 +1046,6 @@ no_stream: static void gst_pipewire_src_close (GstPipeWireSrc * pwsrc) { - pw_thread_loop_stop (pwsrc->loop); - pwsrc->last_time = gst_clock_get_time (pwsrc->clock); gst_element_post_message (GST_ELEMENT (pwsrc), @@ -1030,11 +1056,19 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) g_clear_object (&pwsrc->clock); GST_OBJECT_UNLOCK (pwsrc); - pw_stream_destroy (pwsrc->stream); - pwsrc->stream = NULL; + pw_thread_loop_lock (pwsrc->loop); + if (pwsrc->stream) { + pw_stream_destroy (pwsrc->stream); + pwsrc->stream = NULL; + } + if (pwsrc->core) { + do_sync(pwsrc); + pw_core_disconnect (pwsrc->core); + pwsrc->core = NULL; + } + pw_thread_loop_unlock (pwsrc->loop); - pw_core_disconnect (pwsrc->core); - pwsrc->core = NULL; + pw_thread_loop_stop (pwsrc->loop); } static GstStateChangeReturn diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index acefd0781..97975e736 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -77,6 +77,10 @@ struct _GstPipeWireSrc { struct pw_context *context; struct pw_core *core; + struct spa_hook core_listener; + int last_error; + int last_seq; + int pending_seq; struct pw_stream *stream; struct spa_hook stream_listener;