diff --git a/src/gst/gstpipewirecore.c b/src/gst/gstpipewirecore.c index 75bdbb84b..d9d78d5f2 100644 --- a/src/gst/gstpipewirecore.c +++ b/src/gst/gstpipewirecore.c @@ -160,11 +160,15 @@ GstPipeWireCore *gst_pipewire_core_get (int fd) static void do_sync(GstPipeWireCore * core) { + struct timespec abstime; core->pending_seq = pw_core_sync(core->core, 0, core->pending_seq); + pw_thread_loop_get_time (core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); while (true) { if (core->last_seq == core->pending_seq || core->last_error < 0) break; - pw_thread_loop_wait (core->loop); + if (pw_thread_loop_timed_wait_full (core->loop, &abstime) < 0) + break; } } diff --git a/src/gst/gstpipewirecore.h b/src/gst/gstpipewirecore.h index 90b6bd38b..c64d46621 100644 --- a/src/gst/gstpipewirecore.h +++ b/src/gst/gstpipewirecore.h @@ -33,6 +33,8 @@ G_BEGIN_DECLS typedef struct _GstPipeWireCore GstPipeWireCore; +#define GST_PIPEWIRE_DEFAULT_TIMEOUT 30 + /** * GstPipeWireCore: * diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 42d0e42a6..b971722a5 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -510,6 +510,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) guint size; guint min_buffers; guint max_buffers; + struct timespec abstime; pwsink = GST_PIPEWIRE_SINK (bsink); @@ -536,6 +537,9 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) (const struct spa_pod **) possible->pdata, possible->len); + pw_thread_loop_get_time (pwsink->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + while (TRUE) { state = pw_stream_get_state (pwsink->stream, &error); @@ -545,7 +549,10 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (state == PW_STREAM_STATE_ERROR) goto start_error; - pw_thread_loop_wait (pwsink->core->loop); + if (pw_thread_loop_timed_wait_full (pwsink->core->loop, &abstime) < 0) { + error = "timeout"; + goto start_error; + } } } res = TRUE; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 639faeec1..708ddbe86 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -559,9 +559,14 @@ static gboolean gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) { const char *error = NULL; + struct timespec abstime; pw_thread_loop_lock (pwsrc->core->loop); GST_DEBUG_OBJECT (pwsrc, "doing stream start"); + + pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + while (TRUE) { enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); @@ -577,7 +582,10 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) goto start_error; } - pw_thread_loop_wait (pwsrc->core->loop); + if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) { + error = "timeout"; + goto start_error; + } } parse_stream_properties (pwsrc, pw_stream_get_properties (pwsrc->stream)); @@ -602,8 +610,13 @@ wait_started (GstPipeWireSrc *this) { enum pw_stream_state state; const char *error = NULL; + struct timespec abstime; pw_thread_loop_lock (this->core->loop); + + pw_thread_loop_get_time (this->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + while (TRUE) { state = pw_stream_get_state (this->stream, &error); @@ -621,9 +634,13 @@ wait_started (GstPipeWireSrc *this) if (this->started) break; - pw_thread_loop_wait (this->core->loop); + if (pw_thread_loop_timed_wait_full (this->core->loop, &abstime) < 0) { + state = PW_STREAM_STATE_ERROR; + break; + } } - GST_DEBUG_OBJECT (this, "got started signal"); + GST_DEBUG_OBJECT (this, "got started signal: %s", + pw_stream_state_as_string (state)); pw_thread_loop_unlock (this->core->loop); return state; @@ -639,6 +656,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) gboolean result = FALSE; GPtrArray *possible; const char *error = NULL; + struct timespec abstime; /* first see what is possible on our source pad */ thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL); @@ -701,6 +719,9 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) possible->len); g_ptr_array_free (possible, TRUE); + pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + while (TRUE) { enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); @@ -711,7 +732,8 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) if (pwsrc->negotiated) break; - pw_thread_loop_wait (pwsrc->core->loop); + if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) + goto connect_error; } caps = pwsrc->caps; pwsrc->caps = NULL; @@ -759,6 +781,7 @@ no_common_caps: } connect_error: { + GST_DEBUG_OBJECT (basesrc, "connect error"); pw_thread_loop_unlock (pwsrc->core->loop); return FALSE; } @@ -1084,6 +1107,8 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc) { struct pw_properties *props; + GST_DEBUG_OBJECT (pwsrc, "open"); + pwsrc->core = gst_pipewire_core_get(pwsrc->fd); if (pwsrc->core == NULL) goto connect_error; @@ -1135,6 +1160,8 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) { pwsrc->last_time = gst_clock_get_time (pwsrc->clock); + GST_DEBUG_OBJECT (pwsrc, "close"); + gst_element_post_message (GST_ELEMENT (pwsrc), gst_message_new_clock_lost (GST_OBJECT_CAST (pwsrc), pwsrc->clock));