From ba37229183b5d246b2ade8affb8c6b58a98c265d Mon Sep 17 00:00:00 2001 From: Ashok Sidipotu Date: Thu, 7 Mar 2024 06:43:13 +0530 Subject: [PATCH] gst: copy buffers when they cross a threshold Track the number of pipewire buffers dequeued and when they cross a 'threshold' copy the buffers even if the always-copy is disabled. The 'threshold' is a new pipewiresrc property. --- src/gst/gstpipewiresrc.c | 40 ++++++++++++++++++++++++++++++++++++---- src/gst/gstpipewiresrc.h | 3 +++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 0514e4caa..ea2ac7ebe 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -41,7 +41,8 @@ static GQuark process_mem_data_quark; GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); #define GST_CAT_DEFAULT pipewire_src_debug -#define DEFAULT_ALWAYS_COPY false +#define DEFAULT_ALWAYS_COPY false +#define DEFAULT_DEQUEUED_BUFFERS_THRESHOLD INT32_MAX #define DEFAULT_MIN_BUFFERS 8 #define DEFAULT_MAX_BUFFERS INT32_MAX #define DEFAULT_RESEND_LAST false @@ -63,6 +64,7 @@ enum PROP_RESEND_LAST, PROP_KEEPALIVE_TIME, PROP_AUTOCONNECT, + PROP_DEQUEUED_BUFFERS_THRESHOLD, }; @@ -134,6 +136,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, pwsrc->always_copy = g_value_get_boolean (value); break; + case PROP_DEQUEUED_BUFFERS_THRESHOLD: + pwsrc->dequeued_buffers_threshold = g_value_get_int(value); + break; + case PROP_MIN_BUFFERS: pwsrc->min_buffers = g_value_get_int (value); break; @@ -195,6 +201,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, pwsrc->always_copy); break; + case PROP_DEQUEUED_BUFFERS_THRESHOLD: + g_value_set_int (value, pwsrc->dequeued_buffers_threshold); + break; + case PROP_MIN_BUFFERS: g_value_set_int (value, pwsrc->min_buffers); break; @@ -343,6 +353,15 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_DEQUEUED_BUFFERS_THRESHOLD, + g_param_spec_int("dequeued-buffers-threshold", + "Dequeued buffers threshold", + "buffers dequeued beyond this limit will always be copied", + 1, G_MAXINT, DEFAULT_DEQUEUED_BUFFERS_THRESHOLD, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MIN_BUFFERS, g_param_spec_int ("min-buffers", @@ -438,6 +457,8 @@ gst_pipewire_src_init (GstPipeWireSrc * src) src->always_copy = DEFAULT_ALWAYS_COPY; src->min_buffers = DEFAULT_MIN_BUFFERS; src->max_buffers = DEFAULT_MAX_BUFFERS; + src->dequeued_buffers_threshold = DEFAULT_DEQUEUED_BUFFERS_THRESHOLD; + src->dequeued_buffers = 0; src->fd = -1; src->resend_last = DEFAULT_RESEND_LAST; src->keepalive_time = DEFAULT_KEEPALIVE_TIME; @@ -479,8 +500,10 @@ buffer_recycle (GstMiniObject *obj) if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0) GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); - else + else { GST_LOG_OBJECT (src, "recycle buffer %p", obj); + src->dequeued_buffers--; + } pw_thread_loop_unlock (src->core->loop); @@ -551,12 +574,15 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) struct spa_meta_region *crop; struct spa_meta_videotransform *videotransform; guint i; + struct spa_meta_busy *busy; + gboolean is_dq_threshold_reached = FALSE; b = pw_stream_dequeue_buffer (pwsrc->stream); if (b == NULL) return NULL; data = b->user_data; + busy = spa_buffer_find_meta_data(b->buffer, SPA_META_Busy, sizeof(struct spa_meta_busy)); if (!GST_IS_BUFFER (data->buf)) { GST_ERROR_OBJECT (pwsrc, "stream buffer %p is missing", data->buf); @@ -572,6 +598,8 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) buf = gst_buffer_new (); + pwsrc->dequeued_buffers++; + data->queued = FALSE; GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE; GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE; @@ -638,12 +666,16 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) } } + if ((pwsrc->dequeued_buffers_threshold != DEFAULT_DEQUEUED_BUFFERS_THRESHOLD) && + (pwsrc->dequeued_buffers > pwsrc->dequeued_buffers_threshold)) + is_dq_threshold_reached = TRUE; + for (i = 0; i < b->buffer->n_datas; i++) { struct spa_data *d = &b->buffer->datas[i]; GstMemory *pmem = gst_buffer_peek_memory (data->buf, i); if (pmem) { GstMemory *mem; - if (!pwsrc->always_copy) + if (busy && !pwsrc->always_copy && !is_dq_threshold_reached) mem = gst_memory_share (pmem, d->chunk->offset, d->chunk->size); else mem = gst_memory_copy (pmem, d->chunk->offset, d->chunk->size); @@ -652,7 +684,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) if (d->chunk->flags & SPA_CHUNK_FLAG_CORRUPTED) GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_CORRUPTED); } - if (!pwsrc->always_copy) + if (busy && !pwsrc->always_copy && !is_dq_threshold_reached) gst_buffer_add_parent_buffer_meta (buf, data->buf); gst_buffer_unref (data->buf); return buf; diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index 97f636fb3..50471c51b 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -83,6 +83,9 @@ struct _GstPipeWireSrc { GstClockTime last_time; enum spa_meta_videotransform_value transform_value; + + gint dequeued_buffers; + gint dequeued_buffers_threshold; }; struct _GstPipeWireSrcClass {