From c0a6a7ea32f6737ce7db25b3ba419b0eaf461a93 Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Fri, 27 Dec 2024 10:39:36 +0530 Subject: [PATCH] gst: handle flush event in pipewiresink flush the pw buffers to the stream's queue during a FLUSH_START event and return the unqueued pw buffers, if they are dropped/released without being rendered, so they can be available to be dequeued for the subsequent `acquire` calls --- src/gst/gstpipewirepool.c | 34 +++++++++++++++++++++++++++++++--- src/gst/gstpipewiresink.c | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index 64982306e..87a3a2022 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -16,6 +16,8 @@ #include "gstpipewirepool.h" #include +#include + GST_DEBUG_CATEGORY_STATIC (gst_pipewire_pool_debug_category); #define GST_CAT_DEFAULT gst_pipewire_pool_debug_category @@ -161,21 +163,25 @@ acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool))) goto flushing; - if ((b = pw_stream_dequeue_buffer(s->pwstream))) + if ((b = pw_stream_dequeue_buffer(s->pwstream))) { + GST_LOG_OBJECT (pool, "dequeued buffer %p", b); break; + } if (params && (params->flags & GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT)) goto no_more_buffers; - GST_WARNING ("queue empty"); + GST_WARNING_OBJECT (pool, "failed to dequeue buffer: %s", strerror(errno)); g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool)); } data = b->user_data; + data->queued = FALSE; + *buffer = data->buf; GST_OBJECT_UNLOCK (pool); - GST_LOG_OBJECT (pool, "acquire buffer %p", *buffer); + GST_LOG_OBJECT (pool, "acquired gstbuffer %p", *buffer); return GST_FLOW_OK; @@ -253,6 +259,28 @@ static void release_buffer (GstBufferPool * pool, GstBuffer *buffer) { GST_LOG_OBJECT (pool, "release buffer %p", buffer); + + GstPipeWirePoolData *data = gst_pipewire_pool_get_data(buffer); + + if (!data->queued && data->b != NULL) + { + GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool); + GST_OBJECT_LOCK (pool); + + g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&p->stream); + int res; + + pw_thread_loop_lock (s->core->loop); + if ((res = pw_stream_return_buffer (s->pwstream, data->b)) < 0) { + GST_ERROR_OBJECT (pool,"can't return buffer %p; gstbuffer : %p, %s",data->b, buffer, spa_strerror(res)); + } else { + data->queued = TRUE; + GST_DEBUG_OBJECT (pool, "returned buffer %p; gstbuffer:%p", data->b, buffer); + } + pw_thread_loop_unlock (s->core->loop); + GST_OBJECT_UNLOCK (pool); + + } } static gboolean diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index b0564d539..631e74da8 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -120,6 +120,8 @@ static GstCaps *gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, static GstFlowReturn gst_pipewire_sink_render (GstBaseSink * psink, GstBuffer * buffer); +static gboolean gst_pipewire_sink_event (GstBaseSink *sink, GstEvent *event); + static GstClock * gst_pipewire_sink_provide_clock (GstElement * elem) { @@ -272,6 +274,7 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass) gstbasesink_class->fixate = gst_pipewire_sink_sink_fixate; gstbasesink_class->propose_allocation = gst_pipewire_sink_propose_allocation; gstbasesink_class->render = gst_pipewire_sink_render; + gstbasesink_class->event = gst_pipewire_sink_event; GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0, "PipeWire Sink"); @@ -627,7 +630,10 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } if ((res = pw_stream_queue_buffer (stream->pwstream, data->b)) < 0) { - g_warning ("can't send buffer %s", spa_strerror(res)); + GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res)); + } else { + data->queued = TRUE; + GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer); } switch (pwsink->slave_method) { @@ -986,3 +992,31 @@ open_failed: return GST_STATE_CHANGE_FAILURE; } } + +static gboolean gst_pipewire_sink_event (GstBaseSink *sink, GstEvent *event) { + GstPipeWireSink *pw_sink = GST_PIPEWIRE_SINK(sink); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + { + GST_DEBUG_OBJECT (pw_sink, "flush-start"); + pw_thread_loop_lock (pw_sink->stream->core->loop); + pw_stream_set_active(pw_sink->stream->pwstream, false); + pw_stream_flush(pw_sink->stream->pwstream, false); + pw_thread_loop_unlock (pw_sink->stream->core->loop); + break; + } + case GST_EVENT_FLUSH_STOP: + { + GST_DEBUG_OBJECT (pw_sink, "flush-stop"); + pw_thread_loop_lock (pw_sink->stream->core->loop); + pw_stream_set_active(pw_sink->stream->pwstream, true); + pw_thread_loop_unlock (pw_sink->stream->core->loop); + break; + } + default: + break; + } + + return GST_BASE_SINK_CLASS (parent_class)->event (sink, event); +} \ No newline at end of file