diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index cdc709023..a8c62ad3b 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -62,6 +62,7 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); #define DEFAULT_ALWAYS_COPY false #define DEFAULT_MIN_BUFFERS 1 #define DEFAULT_MAX_BUFFERS INT32_MAX +#define DEFAULT_RESEND_LAST false enum { @@ -73,6 +74,7 @@ enum PROP_MIN_BUFFERS, PROP_MAX_BUFFERS, PROP_FD, + PROP_RESEND_LAST, }; @@ -89,6 +91,8 @@ G_DEFINE_TYPE (GstPipeWireSrc, gst_pipewire_src, GST_TYPE_PUSH_SRC); static GstStateChangeReturn gst_pipewire_src_change_state (GstElement * element, GstStateChange transition); +static gboolean gst_pipewire_src_send_event (GstElement * elem, GstEvent * event); + static gboolean gst_pipewire_src_negotiate (GstBaseSrc * basesrc); static GstFlowReturn gst_pipewire_src_create (GstPushSrc * psrc, @@ -140,6 +144,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, pwsrc->fd = g_value_get_int (value); break; + case PROP_RESEND_LAST: + pwsrc->resend_last = g_value_get_boolean (value); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -181,6 +189,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, g_value_set_int (value, pwsrc->fd); break; + case PROP_RESEND_LAST: + g_value_set_boolean (value, pwsrc->resend_last); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -315,8 +327,19 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_RESEND_LAST, + g_param_spec_boolean ("resend-last", + "Resend last", + "Resend last buffer on EOS", + DEFAULT_RESEND_LAST, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->change_state = gst_pipewire_src_change_state; + gstelement_class->send_event = gst_pipewire_src_send_event; gst_element_class_set_static_metadata (gstelement_class, "PipeWire source", "Source/Video", @@ -352,6 +375,7 @@ gst_pipewire_src_init (GstPipeWireSrc * src) src->min_buffers = DEFAULT_MIN_BUFFERS; src->max_buffers = DEFAULT_MAX_BUFFERS; src->fd = -1; + src->resend_last = DEFAULT_RESEND_LAST; src->client_name = g_strdup(pw_get_client_name ()); @@ -856,12 +880,21 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) if (state != PW_STREAM_STATE_STREAMING) goto streaming_stopped; - - buf = dequeue_buffer (pwsrc); - GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); - if (buf != NULL) + if (pwsrc->eos) { + if (pwsrc->last_buffer == NULL) + goto streaming_eos; + buf = pwsrc->last_buffer; + pwsrc->last_buffer = NULL; break; - + } else { + buf = dequeue_buffer (pwsrc); + GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); + if (buf != NULL) { + if (pwsrc->resend_last) + gst_buffer_replace (&pwsrc->last_buffer, buf); + break; + } + } pw_thread_loop_wait (pwsrc->loop); } pw_thread_loop_unlock (pwsrc->loop); @@ -878,8 +911,18 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) else base_time = 0; - pts = GST_BUFFER_PTS (*buffer); - dts = GST_BUFFER_DTS (*buffer); + if (pwsrc->last_buffer == NULL && pwsrc->resend_last) { + GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc)); + if (clock != NULL) { + pts = dts = gst_clock_get_time (clock); + gst_object_unref (clock); + } else { + pts = dts = GST_CLOCK_TIME_NONE; + } + } else { + pts = GST_BUFFER_PTS (*buffer); + dts = GST_BUFFER_DTS (*buffer); + } if (GST_CLOCK_TIME_IS_VALID (pts)) pts = (pts >= base_time ? pts - base_time : 0); @@ -901,6 +944,11 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } +streaming_eos: + { + pw_thread_loop_unlock (pwsrc->loop); + return GST_FLOW_EOS; + } streaming_error: { pw_thread_loop_unlock (pwsrc->loop); @@ -927,6 +975,8 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc) pwsrc = GST_PIPEWIRE_SRC (basesrc); pw_thread_loop_lock (pwsrc->loop); + pwsrc->eos = false; + gst_buffer_replace (&pwsrc->last_buffer, NULL); pw_thread_loop_unlock (pwsrc->loop); return TRUE; @@ -1072,6 +1122,28 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) pw_thread_loop_stop (pwsrc->loop); } +static gboolean +gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) +{ + GstPipeWireSrc *this = GST_PIPEWIRE_SRC_CAST (elem); + gboolean ret; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_DEBUG_OBJECT (this, "got EOS"); + pw_thread_loop_lock (this->loop); + this->eos = true; + pw_thread_loop_signal (this->loop, FALSE); + pw_thread_loop_unlock (this->loop); + ret = TRUE; + break; + default: + ret = GST_ELEMENT_CLASS (parent_class)->send_event (elem, event); + break; + } + return ret; +} + static GstStateChangeReturn gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) { diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index 97975e736..e7ea864e1 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -64,10 +64,12 @@ struct _GstPipeWireSrc { gint min_buffers; gint max_buffers; int fd; + gboolean resend_last; gboolean negotiated; gboolean flushing; gboolean started; + gboolean eos; gboolean is_live; GstClockTime min_latency; @@ -85,6 +87,7 @@ struct _GstPipeWireSrc { struct pw_stream *stream; struct spa_hook stream_listener; + GstBuffer *last_buffer; GstStructure *properties; GstPipeWirePool *pool;