pipewiresrc: add option to resend last buffer on EOS

Add an option to resend the last buffer on EOS with an updated
timestamp. This can be used to make sure encoders fill up the
gap between last buffer and EOS, like with sparse streams from
screen capture.
This commit is contained in:
Wim Taymans 2020-07-13 12:11:34 +02:00
parent a596cdbf2e
commit 8e9bbaf3dc
2 changed files with 82 additions and 7 deletions

View file

@ -62,6 +62,7 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug);
#define DEFAULT_ALWAYS_COPY false #define DEFAULT_ALWAYS_COPY false
#define DEFAULT_MIN_BUFFERS 1 #define DEFAULT_MIN_BUFFERS 1
#define DEFAULT_MAX_BUFFERS INT32_MAX #define DEFAULT_MAX_BUFFERS INT32_MAX
#define DEFAULT_RESEND_LAST false
enum enum
{ {
@ -73,6 +74,7 @@ enum
PROP_MIN_BUFFERS, PROP_MIN_BUFFERS,
PROP_MAX_BUFFERS, PROP_MAX_BUFFERS,
PROP_FD, PROP_FD,
PROP_RESEND_LAST,
}; };
@ -89,6 +91,8 @@ G_DEFINE_TYPE (GstPipeWireSrc, gst_pipewire_src, GST_TYPE_PUSH_SRC);
static GstStateChangeReturn static GstStateChangeReturn
gst_pipewire_src_change_state (GstElement * element, GstStateChange transition); gst_pipewire_src_change_state (GstElement * element, GstStateChange transition);
static gboolean gst_pipewire_src_send_event (GstElement * elem, GstEvent * event);
static gboolean gst_pipewire_src_negotiate (GstBaseSrc * basesrc); static gboolean gst_pipewire_src_negotiate (GstBaseSrc * basesrc);
static GstFlowReturn gst_pipewire_src_create (GstPushSrc * psrc, static GstFlowReturn gst_pipewire_src_create (GstPushSrc * psrc,
@ -140,6 +144,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id,
pwsrc->fd = g_value_get_int (value); pwsrc->fd = g_value_get_int (value);
break; break;
case PROP_RESEND_LAST:
pwsrc->resend_last = g_value_get_boolean (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -181,6 +189,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id,
g_value_set_int (value, pwsrc->fd); g_value_set_int (value, pwsrc->fd);
break; break;
case PROP_RESEND_LAST:
g_value_set_boolean (value, pwsrc->resend_last);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -315,8 +327,19 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
G_PARAM_READWRITE | G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS)); G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_RESEND_LAST,
g_param_spec_boolean ("resend-last",
"Resend last",
"Resend last buffer on EOS",
DEFAULT_RESEND_LAST,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->provide_clock = gst_pipewire_src_provide_clock;
gstelement_class->change_state = gst_pipewire_src_change_state; gstelement_class->change_state = gst_pipewire_src_change_state;
gstelement_class->send_event = gst_pipewire_src_send_event;
gst_element_class_set_static_metadata (gstelement_class, gst_element_class_set_static_metadata (gstelement_class,
"PipeWire source", "Source/Video", "PipeWire source", "Source/Video",
@ -352,6 +375,7 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->min_buffers = DEFAULT_MIN_BUFFERS; src->min_buffers = DEFAULT_MIN_BUFFERS;
src->max_buffers = DEFAULT_MAX_BUFFERS; src->max_buffers = DEFAULT_MAX_BUFFERS;
src->fd = -1; src->fd = -1;
src->resend_last = DEFAULT_RESEND_LAST;
src->client_name = g_strdup(pw_get_client_name ()); src->client_name = g_strdup(pw_get_client_name ());
@ -856,12 +880,21 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (state != PW_STREAM_STATE_STREAMING) if (state != PW_STREAM_STATE_STREAMING)
goto streaming_stopped; goto streaming_stopped;
if (pwsrc->eos) {
if (pwsrc->last_buffer == NULL)
goto streaming_eos;
buf = pwsrc->last_buffer;
pwsrc->last_buffer = NULL;
break;
} else {
buf = dequeue_buffer (pwsrc); buf = dequeue_buffer (pwsrc);
GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf);
if (buf != NULL) if (buf != NULL) {
if (pwsrc->resend_last)
gst_buffer_replace (&pwsrc->last_buffer, buf);
break; break;
}
}
pw_thread_loop_wait (pwsrc->loop); pw_thread_loop_wait (pwsrc->loop);
} }
pw_thread_loop_unlock (pwsrc->loop); pw_thread_loop_unlock (pwsrc->loop);
@ -878,8 +911,18 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
else else
base_time = 0; base_time = 0;
if (pwsrc->last_buffer == NULL && pwsrc->resend_last) {
GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc));
if (clock != NULL) {
pts = dts = gst_clock_get_time (clock);
gst_object_unref (clock);
} else {
pts = dts = GST_CLOCK_TIME_NONE;
}
} else {
pts = GST_BUFFER_PTS (*buffer); pts = GST_BUFFER_PTS (*buffer);
dts = GST_BUFFER_DTS (*buffer); dts = GST_BUFFER_DTS (*buffer);
}
if (GST_CLOCK_TIME_IS_VALID (pts)) if (GST_CLOCK_TIME_IS_VALID (pts))
pts = (pts >= base_time ? pts - base_time : 0); pts = (pts >= base_time ? pts - base_time : 0);
@ -901,6 +944,11 @@ not_negotiated:
{ {
return GST_FLOW_NOT_NEGOTIATED; return GST_FLOW_NOT_NEGOTIATED;
} }
streaming_eos:
{
pw_thread_loop_unlock (pwsrc->loop);
return GST_FLOW_EOS;
}
streaming_error: streaming_error:
{ {
pw_thread_loop_unlock (pwsrc->loop); pw_thread_loop_unlock (pwsrc->loop);
@ -927,6 +975,8 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc)
pwsrc = GST_PIPEWIRE_SRC (basesrc); pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_loop_lock (pwsrc->loop); pw_thread_loop_lock (pwsrc->loop);
pwsrc->eos = false;
gst_buffer_replace (&pwsrc->last_buffer, NULL);
pw_thread_loop_unlock (pwsrc->loop); pw_thread_loop_unlock (pwsrc->loop);
return TRUE; return TRUE;
@ -1072,6 +1122,28 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
pw_thread_loop_stop (pwsrc->loop); pw_thread_loop_stop (pwsrc->loop);
} }
static gboolean
gst_pipewire_src_send_event (GstElement * elem, GstEvent * event)
{
GstPipeWireSrc *this = GST_PIPEWIRE_SRC_CAST (elem);
gboolean ret;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
GST_DEBUG_OBJECT (this, "got EOS");
pw_thread_loop_lock (this->loop);
this->eos = true;
pw_thread_loop_signal (this->loop, FALSE);
pw_thread_loop_unlock (this->loop);
ret = TRUE;
break;
default:
ret = GST_ELEMENT_CLASS (parent_class)->send_event (elem, event);
break;
}
return ret;
}
static GstStateChangeReturn static GstStateChangeReturn
gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) gst_pipewire_src_change_state (GstElement * element, GstStateChange transition)
{ {

View file

@ -64,10 +64,12 @@ struct _GstPipeWireSrc {
gint min_buffers; gint min_buffers;
gint max_buffers; gint max_buffers;
int fd; int fd;
gboolean resend_last;
gboolean negotiated; gboolean negotiated;
gboolean flushing; gboolean flushing;
gboolean started; gboolean started;
gboolean eos;
gboolean is_live; gboolean is_live;
GstClockTime min_latency; GstClockTime min_latency;
@ -85,6 +87,7 @@ struct _GstPipeWireSrc {
struct pw_stream *stream; struct pw_stream *stream;
struct spa_hook stream_listener; struct spa_hook stream_listener;
GstBuffer *last_buffer;
GstStructure *properties; GstStructure *properties;
GstPipeWirePool *pool; GstPipeWirePool *pool;