diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index c15c6c1a4..a27d6b244 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -689,6 +689,30 @@ gst_pipewire_sink_stop (GstBaseSink * basesink) return TRUE; } +static void on_core_done (void *object, uint32_t id, int seq) +{ + GstPipeWireSink * pwsink = object; + if (id == PW_ID_CORE) { + pwsink->last_seq = seq; + pw_thread_loop_signal (pwsink->loop, FALSE); + } +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, +}; + +static void do_sync(GstPipeWireSink * pwsink) +{ + pwsink->pending_seq = pw_core_sync(pwsink->core, 0, pwsink->pending_seq); + while (true) { + if (pwsink->last_seq == pwsink->pending_seq || pwsink->last_error < 0) + break; + pw_thread_loop_wait (pwsink->loop); + } +} + static gboolean gst_pipewire_sink_open (GstPipeWireSink * pwsink) { @@ -705,6 +729,11 @@ gst_pipewire_sink_open (GstPipeWireSink * pwsink) if (pwsink->core == NULL) goto connect_error; + pw_core_add_listener(pwsink->core, + &pwsink->core_listener, + &core_events, + pwsink); + pw_thread_loop_unlock (pwsink->loop); return TRUE; @@ -730,9 +759,11 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink) { pw_thread_loop_lock (pwsink->loop); if (pwsink->stream) { - pw_stream_disconnect (pwsink->stream); + pw_stream_destroy (pwsink->stream); + pwsink->stream = NULL; } if (pwsink->core) { + do_sync(pwsink); pw_core_disconnect (pwsink->core); pwsink->core = NULL; } @@ -740,10 +771,6 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink) pw_thread_loop_stop (pwsink->loop); - if (pwsink->stream) { - pw_stream_destroy (pwsink->stream); - pwsink->stream = NULL; - } return TRUE; } diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index 898770b74..88b210d5d 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -87,6 +87,10 @@ struct _GstPipeWireSink { struct pw_context *context; struct pw_core *core; + struct spa_hook core_listener; + int pending_seq; + int last_seq; + int last_error; struct pw_stream *stream; struct spa_hook stream_listener; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index e2b472457..9f385faf8 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -954,6 +954,30 @@ static const struct pw_stream_events stream_events = { .process = on_process, }; +static void on_core_done (void *object, uint32_t id, int seq) +{ + GstPipeWireSrc * pwsrc = object; + if (id == PW_ID_CORE) { + pwsrc->last_seq = seq; + pw_thread_loop_signal (pwsrc->loop, FALSE); + } +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, +}; + +static void do_sync(GstPipeWireSrc * pwsrc) +{ + pwsrc->pending_seq = pw_core_sync(pwsrc->core, 0, pwsrc->pending_seq); + while (true) { + if (pwsrc->last_seq == pwsrc->pending_seq || pwsrc->last_error < 0) + break; + pw_thread_loop_wait (pwsrc->loop); + } +} + static gboolean gst_pipewire_src_open (GstPipeWireSrc * pwsrc) { @@ -972,6 +996,11 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc) if (pwsrc->core == NULL) goto connect_error; + pw_core_add_listener(pwsrc->core, + &pwsrc->core_listener, + &core_events, + pwsrc); + if (pwsrc->properties) { props = pw_properties_new (NULL, NULL); gst_structure_foreach (pwsrc->properties, copy_properties, props); @@ -989,7 +1018,6 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc) &stream_events, pwsrc); - pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time); pw_thread_loop_unlock (pwsrc->loop); @@ -1018,8 +1046,6 @@ no_stream: static void gst_pipewire_src_close (GstPipeWireSrc * pwsrc) { - pw_thread_loop_stop (pwsrc->loop); - pwsrc->last_time = gst_clock_get_time (pwsrc->clock); gst_element_post_message (GST_ELEMENT (pwsrc), @@ -1030,11 +1056,19 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) g_clear_object (&pwsrc->clock); GST_OBJECT_UNLOCK (pwsrc); - pw_stream_destroy (pwsrc->stream); - pwsrc->stream = NULL; + pw_thread_loop_lock (pwsrc->loop); + if (pwsrc->stream) { + pw_stream_destroy (pwsrc->stream); + pwsrc->stream = NULL; + } + if (pwsrc->core) { + do_sync(pwsrc); + pw_core_disconnect (pwsrc->core); + pwsrc->core = NULL; + } + pw_thread_loop_unlock (pwsrc->loop); - pw_core_disconnect (pwsrc->core); - pwsrc->core = NULL; + pw_thread_loop_stop (pwsrc->loop); } static GstStateChangeReturn diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index acefd0781..97975e736 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -77,6 +77,10 @@ struct _GstPipeWireSrc { struct pw_context *context; struct pw_core *core; + struct spa_hook core_listener; + int last_error; + int last_seq; + int pending_seq; struct pw_stream *stream; struct spa_hook stream_listener;