gst: do a sync before disconnect

This makes sure we first nicely remove the stream from the server
and then close the socket.

If we don't do this, the disconnect might not have flushed out our
disconnect and the server is left with a non-responsive node,
especially if the disconnect on the core was done with a socket from
the portal that is still open.
This commit is contained in:
Wim Taymans 2020-05-21 12:22:48 +02:00
parent 48dea3d5ea
commit 6eba010d38
4 changed files with 81 additions and 12 deletions

View file

@ -689,6 +689,30 @@ gst_pipewire_sink_stop (GstBaseSink * basesink)
return TRUE; return TRUE;
} }
static void on_core_done (void *object, uint32_t id, int seq)
{
GstPipeWireSink * pwsink = object;
if (id == PW_ID_CORE) {
pwsink->last_seq = seq;
pw_thread_loop_signal (pwsink->loop, FALSE);
}
}
static const struct pw_core_events core_events = {
PW_VERSION_CORE_EVENTS,
.done = on_core_done,
};
static void do_sync(GstPipeWireSink * pwsink)
{
pwsink->pending_seq = pw_core_sync(pwsink->core, 0, pwsink->pending_seq);
while (true) {
if (pwsink->last_seq == pwsink->pending_seq || pwsink->last_error < 0)
break;
pw_thread_loop_wait (pwsink->loop);
}
}
static gboolean static gboolean
gst_pipewire_sink_open (GstPipeWireSink * pwsink) gst_pipewire_sink_open (GstPipeWireSink * pwsink)
{ {
@ -705,6 +729,11 @@ gst_pipewire_sink_open (GstPipeWireSink * pwsink)
if (pwsink->core == NULL) if (pwsink->core == NULL)
goto connect_error; goto connect_error;
pw_core_add_listener(pwsink->core,
&pwsink->core_listener,
&core_events,
pwsink);
pw_thread_loop_unlock (pwsink->loop); pw_thread_loop_unlock (pwsink->loop);
return TRUE; return TRUE;
@ -730,9 +759,11 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink)
{ {
pw_thread_loop_lock (pwsink->loop); pw_thread_loop_lock (pwsink->loop);
if (pwsink->stream) { if (pwsink->stream) {
pw_stream_disconnect (pwsink->stream); pw_stream_destroy (pwsink->stream);
pwsink->stream = NULL;
} }
if (pwsink->core) { if (pwsink->core) {
do_sync(pwsink);
pw_core_disconnect (pwsink->core); pw_core_disconnect (pwsink->core);
pwsink->core = NULL; pwsink->core = NULL;
} }
@ -740,10 +771,6 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink)
pw_thread_loop_stop (pwsink->loop); pw_thread_loop_stop (pwsink->loop);
if (pwsink->stream) {
pw_stream_destroy (pwsink->stream);
pwsink->stream = NULL;
}
return TRUE; return TRUE;
} }

View file

@ -87,6 +87,10 @@ struct _GstPipeWireSink {
struct pw_context *context; struct pw_context *context;
struct pw_core *core; struct pw_core *core;
struct spa_hook core_listener;
int pending_seq;
int last_seq;
int last_error;
struct pw_stream *stream; struct pw_stream *stream;
struct spa_hook stream_listener; struct spa_hook stream_listener;

View file

@ -954,6 +954,30 @@ static const struct pw_stream_events stream_events = {
.process = on_process, .process = on_process,
}; };
static void on_core_done (void *object, uint32_t id, int seq)
{
GstPipeWireSrc * pwsrc = object;
if (id == PW_ID_CORE) {
pwsrc->last_seq = seq;
pw_thread_loop_signal (pwsrc->loop, FALSE);
}
}
static const struct pw_core_events core_events = {
PW_VERSION_CORE_EVENTS,
.done = on_core_done,
};
static void do_sync(GstPipeWireSrc * pwsrc)
{
pwsrc->pending_seq = pw_core_sync(pwsrc->core, 0, pwsrc->pending_seq);
while (true) {
if (pwsrc->last_seq == pwsrc->pending_seq || pwsrc->last_error < 0)
break;
pw_thread_loop_wait (pwsrc->loop);
}
}
static gboolean static gboolean
gst_pipewire_src_open (GstPipeWireSrc * pwsrc) gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
{ {
@ -972,6 +996,11 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
if (pwsrc->core == NULL) if (pwsrc->core == NULL)
goto connect_error; goto connect_error;
pw_core_add_listener(pwsrc->core,
&pwsrc->core_listener,
&core_events,
pwsrc);
if (pwsrc->properties) { if (pwsrc->properties) {
props = pw_properties_new (NULL, NULL); props = pw_properties_new (NULL, NULL);
gst_structure_foreach (pwsrc->properties, copy_properties, props); gst_structure_foreach (pwsrc->properties, copy_properties, props);
@ -989,7 +1018,6 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
&stream_events, &stream_events,
pwsrc); pwsrc);
pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time); pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time);
pw_thread_loop_unlock (pwsrc->loop); pw_thread_loop_unlock (pwsrc->loop);
@ -1018,8 +1046,6 @@ no_stream:
static void static void
gst_pipewire_src_close (GstPipeWireSrc * pwsrc) gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
{ {
pw_thread_loop_stop (pwsrc->loop);
pwsrc->last_time = gst_clock_get_time (pwsrc->clock); pwsrc->last_time = gst_clock_get_time (pwsrc->clock);
gst_element_post_message (GST_ELEMENT (pwsrc), gst_element_post_message (GST_ELEMENT (pwsrc),
@ -1030,12 +1056,20 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
g_clear_object (&pwsrc->clock); g_clear_object (&pwsrc->clock);
GST_OBJECT_UNLOCK (pwsrc); GST_OBJECT_UNLOCK (pwsrc);
pw_thread_loop_lock (pwsrc->loop);
if (pwsrc->stream) {
pw_stream_destroy (pwsrc->stream); pw_stream_destroy (pwsrc->stream);
pwsrc->stream = NULL; pwsrc->stream = NULL;
}
if (pwsrc->core) {
do_sync(pwsrc);
pw_core_disconnect (pwsrc->core); pw_core_disconnect (pwsrc->core);
pwsrc->core = NULL; pwsrc->core = NULL;
} }
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_stop (pwsrc->loop);
}
static GstStateChangeReturn static GstStateChangeReturn
gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) gst_pipewire_src_change_state (GstElement * element, GstStateChange transition)

View file

@ -77,6 +77,10 @@ struct _GstPipeWireSrc {
struct pw_context *context; struct pw_context *context;
struct pw_core *core; struct pw_core *core;
struct spa_hook core_listener;
int last_error;
int last_seq;
int pending_seq;
struct pw_stream *stream; struct pw_stream *stream;
struct spa_hook stream_listener; struct spa_hook stream_listener;