gst: handle flush event in pipewiresink

flush the pw buffers to the stream's queue during a FLUSH_START event
and return the unqueued pw buffers, if they are dropped/released without
being rendered, so they can be available to be dequeued for the subsequent
`acquire` calls
This commit is contained in:
Taruntej Kanakamalla 2024-12-27 10:39:36 +05:30 committed by Wim Taymans
parent b70e99c41a
commit c0a6a7ea32
2 changed files with 66 additions and 4 deletions

View file

@ -16,6 +16,8 @@
#include "gstpipewirepool.h"
#include <spa/debug/types.h>
#include <spa/utils/result.h>
GST_DEBUG_CATEGORY_STATIC (gst_pipewire_pool_debug_category);
#define GST_CAT_DEFAULT gst_pipewire_pool_debug_category
@ -161,21 +163,25 @@ acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool)))
goto flushing;
if ((b = pw_stream_dequeue_buffer(s->pwstream)))
if ((b = pw_stream_dequeue_buffer(s->pwstream))) {
GST_LOG_OBJECT (pool, "dequeued buffer %p", b);
break;
}
if (params && (params->flags & GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT))
goto no_more_buffers;
GST_WARNING ("queue empty");
GST_WARNING_OBJECT (pool, "failed to dequeue buffer: %s", strerror(errno));
g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool));
}
data = b->user_data;
data->queued = FALSE;
*buffer = data->buf;
GST_OBJECT_UNLOCK (pool);
GST_LOG_OBJECT (pool, "acquire buffer %p", *buffer);
GST_LOG_OBJECT (pool, "acquired gstbuffer %p", *buffer);
return GST_FLOW_OK;
@ -253,6 +259,28 @@ static void
release_buffer (GstBufferPool * pool, GstBuffer *buffer)
{
GST_LOG_OBJECT (pool, "release buffer %p", buffer);
GstPipeWirePoolData *data = gst_pipewire_pool_get_data(buffer);
if (!data->queued && data->b != NULL)
{
GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool);
GST_OBJECT_LOCK (pool);
g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&p->stream);
int res;
pw_thread_loop_lock (s->core->loop);
if ((res = pw_stream_return_buffer (s->pwstream, data->b)) < 0) {
GST_ERROR_OBJECT (pool,"can't return buffer %p; gstbuffer : %p, %s",data->b, buffer, spa_strerror(res));
} else {
data->queued = TRUE;
GST_DEBUG_OBJECT (pool, "returned buffer %p; gstbuffer:%p", data->b, buffer);
}
pw_thread_loop_unlock (s->core->loop);
GST_OBJECT_UNLOCK (pool);
}
}
static gboolean

View file

@ -120,6 +120,8 @@ static GstCaps *gst_pipewire_sink_sink_fixate (GstBaseSink * bsink,
static GstFlowReturn gst_pipewire_sink_render (GstBaseSink * psink,
GstBuffer * buffer);
static gboolean gst_pipewire_sink_event (GstBaseSink *sink, GstEvent *event);
static GstClock *
gst_pipewire_sink_provide_clock (GstElement * elem)
{
@ -272,6 +274,7 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass)
gstbasesink_class->fixate = gst_pipewire_sink_sink_fixate;
gstbasesink_class->propose_allocation = gst_pipewire_sink_propose_allocation;
gstbasesink_class->render = gst_pipewire_sink_render;
gstbasesink_class->event = gst_pipewire_sink_event;
GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0,
"PipeWire Sink");
@ -627,7 +630,10 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
}
if ((res = pw_stream_queue_buffer (stream->pwstream, data->b)) < 0) {
g_warning ("can't send buffer %s", spa_strerror(res));
GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res));
} else {
data->queued = TRUE;
GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer);
}
switch (pwsink->slave_method) {
@ -986,3 +992,31 @@ open_failed:
return GST_STATE_CHANGE_FAILURE;
}
}
static gboolean gst_pipewire_sink_event (GstBaseSink *sink, GstEvent *event) {
GstPipeWireSink *pw_sink = GST_PIPEWIRE_SINK(sink);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
GST_DEBUG_OBJECT (pw_sink, "flush-start");
pw_thread_loop_lock (pw_sink->stream->core->loop);
pw_stream_set_active(pw_sink->stream->pwstream, false);
pw_stream_flush(pw_sink->stream->pwstream, false);
pw_thread_loop_unlock (pw_sink->stream->core->loop);
break;
}
case GST_EVENT_FLUSH_STOP:
{
GST_DEBUG_OBJECT (pw_sink, "flush-stop");
pw_thread_loop_lock (pw_sink->stream->core->loop);
pw_stream_set_active(pw_sink->stream->pwstream, true);
pw_thread_loop_unlock (pw_sink->stream->core->loop);
break;
}
default:
break;
}
return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
}