From 8e9bbaf3dc53a299c3773ce3af4e0fe5abe223cc Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 13 Jul 2020 12:11:34 +0200 Subject: [PATCH] pipewiresrc: add option to resend last buffer on EOS Add an option to resend the last buffer on EOS with an updated timestamp. This can be used to make sure encoders fill up the gap between last buffer and EOS, like with sparse streams from screen capture. --- src/gst/gstpipewiresrc.c | 86 ++++++++++++++++++++++++++++++++++++---- src/gst/gstpipewiresrc.h | 3 ++ 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index cdc709023..a8c62ad3b 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -62,6 +62,7 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); #define DEFAULT_ALWAYS_COPY false #define DEFAULT_MIN_BUFFERS 1 #define DEFAULT_MAX_BUFFERS INT32_MAX +#define DEFAULT_RESEND_LAST false enum { @@ -73,6 +74,7 @@ enum PROP_MIN_BUFFERS, PROP_MAX_BUFFERS, PROP_FD, + PROP_RESEND_LAST, }; @@ -89,6 +91,8 @@ G_DEFINE_TYPE (GstPipeWireSrc, gst_pipewire_src, GST_TYPE_PUSH_SRC); static GstStateChangeReturn gst_pipewire_src_change_state (GstElement * element, GstStateChange transition); +static gboolean gst_pipewire_src_send_event (GstElement * elem, GstEvent * event); + static gboolean gst_pipewire_src_negotiate (GstBaseSrc * basesrc); static GstFlowReturn gst_pipewire_src_create (GstPushSrc * psrc, @@ -140,6 +144,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, pwsrc->fd = g_value_get_int (value); break; + case PROP_RESEND_LAST: + pwsrc->resend_last = g_value_get_boolean (value); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -181,6 +189,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, g_value_set_int (value, pwsrc->fd); break; + case PROP_RESEND_LAST: + g_value_set_boolean (value, pwsrc->resend_last); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -315,8 +327,19 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_RESEND_LAST, + g_param_spec_boolean ("resend-last", + "Resend last", + "Resend last buffer on EOS", + DEFAULT_RESEND_LAST, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->change_state = gst_pipewire_src_change_state; + gstelement_class->send_event = gst_pipewire_src_send_event; gst_element_class_set_static_metadata (gstelement_class, "PipeWire source", "Source/Video", @@ -352,6 +375,7 @@ gst_pipewire_src_init (GstPipeWireSrc * src) src->min_buffers = DEFAULT_MIN_BUFFERS; src->max_buffers = DEFAULT_MAX_BUFFERS; src->fd = -1; + src->resend_last = DEFAULT_RESEND_LAST; src->client_name = g_strdup(pw_get_client_name ()); @@ -856,12 +880,21 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) if (state != PW_STREAM_STATE_STREAMING) goto streaming_stopped; - - buf = dequeue_buffer (pwsrc); - GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); - if (buf != NULL) + if (pwsrc->eos) { + if (pwsrc->last_buffer == NULL) + goto streaming_eos; + buf = pwsrc->last_buffer; + pwsrc->last_buffer = NULL; break; - + } else { + buf = dequeue_buffer (pwsrc); + GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); + if (buf != NULL) { + if (pwsrc->resend_last) + gst_buffer_replace (&pwsrc->last_buffer, buf); + break; + } + } pw_thread_loop_wait (pwsrc->loop); } pw_thread_loop_unlock (pwsrc->loop); @@ -878,8 +911,18 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) else base_time = 0; - pts = GST_BUFFER_PTS (*buffer); - dts = GST_BUFFER_DTS (*buffer); + if (pwsrc->last_buffer == NULL && pwsrc->resend_last) { + GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc)); + if (clock != NULL) { + pts = dts = gst_clock_get_time (clock); + gst_object_unref (clock); + } else { + pts = dts = GST_CLOCK_TIME_NONE; + } + } else { + pts = GST_BUFFER_PTS (*buffer); + dts = GST_BUFFER_DTS (*buffer); + } if (GST_CLOCK_TIME_IS_VALID (pts)) pts = (pts >= base_time ? pts - base_time : 0); @@ -901,6 +944,11 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } +streaming_eos: + { + pw_thread_loop_unlock (pwsrc->loop); + return GST_FLOW_EOS; + } streaming_error: { pw_thread_loop_unlock (pwsrc->loop); @@ -927,6 +975,8 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc) pwsrc = GST_PIPEWIRE_SRC (basesrc); pw_thread_loop_lock (pwsrc->loop); + pwsrc->eos = false; + gst_buffer_replace (&pwsrc->last_buffer, NULL); pw_thread_loop_unlock (pwsrc->loop); return TRUE; @@ -1072,6 +1122,28 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) pw_thread_loop_stop (pwsrc->loop); } +static gboolean +gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) +{ + GstPipeWireSrc *this = GST_PIPEWIRE_SRC_CAST (elem); + gboolean ret; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_DEBUG_OBJECT (this, "got EOS"); + pw_thread_loop_lock (this->loop); + this->eos = true; + pw_thread_loop_signal (this->loop, FALSE); + pw_thread_loop_unlock (this->loop); + ret = TRUE; + break; + default: + ret = GST_ELEMENT_CLASS (parent_class)->send_event (elem, event); + break; + } + return ret; +} + static GstStateChangeReturn gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) { diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index 97975e736..e7ea864e1 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -64,10 +64,12 @@ struct _GstPipeWireSrc { gint min_buffers; gint max_buffers; int fd; + gboolean resend_last; gboolean negotiated; gboolean flushing; gboolean started; + gboolean eos; gboolean is_live; GstClockTime min_latency; @@ -85,6 +87,7 @@ struct _GstPipeWireSrc { struct pw_stream *stream; struct spa_hook stream_listener; + GstBuffer *last_buffer; GstStructure *properties; GstPipeWirePool *pool;