gst: copy buffers when they cross a threshold

Track the number of pipewire buffers dequeued and when they cross a 'threshold'
copy the buffers even if the always-copy is disabled. The 'threshold' is a new
pipewiresrc property.
This commit is contained in:
Ashok Sidipotu 2024-03-07 06:43:13 +05:30
parent 46c4776dc2
commit ba37229183
2 changed files with 39 additions and 4 deletions

View file

@ -41,7 +41,8 @@ static GQuark process_mem_data_quark;
GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug);
#define GST_CAT_DEFAULT pipewire_src_debug #define GST_CAT_DEFAULT pipewire_src_debug
#define DEFAULT_ALWAYS_COPY false #define DEFAULT_ALWAYS_COPY false
#define DEFAULT_DEQUEUED_BUFFERS_THRESHOLD INT32_MAX
#define DEFAULT_MIN_BUFFERS 8 #define DEFAULT_MIN_BUFFERS 8
#define DEFAULT_MAX_BUFFERS INT32_MAX #define DEFAULT_MAX_BUFFERS INT32_MAX
#define DEFAULT_RESEND_LAST false #define DEFAULT_RESEND_LAST false
@ -63,6 +64,7 @@ enum
PROP_RESEND_LAST, PROP_RESEND_LAST,
PROP_KEEPALIVE_TIME, PROP_KEEPALIVE_TIME,
PROP_AUTOCONNECT, PROP_AUTOCONNECT,
PROP_DEQUEUED_BUFFERS_THRESHOLD,
}; };
@ -134,6 +136,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id,
pwsrc->always_copy = g_value_get_boolean (value); pwsrc->always_copy = g_value_get_boolean (value);
break; break;
case PROP_DEQUEUED_BUFFERS_THRESHOLD:
pwsrc->dequeued_buffers_threshold = g_value_get_int(value);
break;
case PROP_MIN_BUFFERS: case PROP_MIN_BUFFERS:
pwsrc->min_buffers = g_value_get_int (value); pwsrc->min_buffers = g_value_get_int (value);
break; break;
@ -195,6 +201,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id,
g_value_set_boolean (value, pwsrc->always_copy); g_value_set_boolean (value, pwsrc->always_copy);
break; break;
case PROP_DEQUEUED_BUFFERS_THRESHOLD:
g_value_set_int (value, pwsrc->dequeued_buffers_threshold);
break;
case PROP_MIN_BUFFERS: case PROP_MIN_BUFFERS:
g_value_set_int (value, pwsrc->min_buffers); g_value_set_int (value, pwsrc->min_buffers);
break; break;
@ -343,6 +353,15 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
G_PARAM_READWRITE | G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS)); G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_DEQUEUED_BUFFERS_THRESHOLD,
g_param_spec_int("dequeued-buffers-threshold",
"Dequeued buffers threshold",
"buffers dequeued beyond this limit will always be copied",
1, G_MAXINT, DEFAULT_DEQUEUED_BUFFERS_THRESHOLD,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, g_object_class_install_property (gobject_class,
PROP_MIN_BUFFERS, PROP_MIN_BUFFERS,
g_param_spec_int ("min-buffers", g_param_spec_int ("min-buffers",
@ -438,6 +457,8 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->always_copy = DEFAULT_ALWAYS_COPY; src->always_copy = DEFAULT_ALWAYS_COPY;
src->min_buffers = DEFAULT_MIN_BUFFERS; src->min_buffers = DEFAULT_MIN_BUFFERS;
src->max_buffers = DEFAULT_MAX_BUFFERS; src->max_buffers = DEFAULT_MAX_BUFFERS;
src->dequeued_buffers_threshold = DEFAULT_DEQUEUED_BUFFERS_THRESHOLD;
src->dequeued_buffers = 0;
src->fd = -1; src->fd = -1;
src->resend_last = DEFAULT_RESEND_LAST; src->resend_last = DEFAULT_RESEND_LAST;
src->keepalive_time = DEFAULT_KEEPALIVE_TIME; src->keepalive_time = DEFAULT_KEEPALIVE_TIME;
@ -479,8 +500,10 @@ buffer_recycle (GstMiniObject *obj)
if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0) if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0)
GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res));
else else {
GST_LOG_OBJECT (src, "recycle buffer %p", obj); GST_LOG_OBJECT (src, "recycle buffer %p", obj);
src->dequeued_buffers--;
}
pw_thread_loop_unlock (src->core->loop); pw_thread_loop_unlock (src->core->loop);
@ -551,12 +574,15 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
struct spa_meta_region *crop; struct spa_meta_region *crop;
struct spa_meta_videotransform *videotransform; struct spa_meta_videotransform *videotransform;
guint i; guint i;
struct spa_meta_busy *busy;
gboolean is_dq_threshold_reached = FALSE;
b = pw_stream_dequeue_buffer (pwsrc->stream); b = pw_stream_dequeue_buffer (pwsrc->stream);
if (b == NULL) if (b == NULL)
return NULL; return NULL;
data = b->user_data; data = b->user_data;
busy = spa_buffer_find_meta_data(b->buffer, SPA_META_Busy, sizeof(struct spa_meta_busy));
if (!GST_IS_BUFFER (data->buf)) { if (!GST_IS_BUFFER (data->buf)) {
GST_ERROR_OBJECT (pwsrc, "stream buffer %p is missing", data->buf); GST_ERROR_OBJECT (pwsrc, "stream buffer %p is missing", data->buf);
@ -572,6 +598,8 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
buf = gst_buffer_new (); buf = gst_buffer_new ();
pwsrc->dequeued_buffers++;
data->queued = FALSE; data->queued = FALSE;
GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE; GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE; GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
@ -638,12 +666,16 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
} }
} }
if ((pwsrc->dequeued_buffers_threshold != DEFAULT_DEQUEUED_BUFFERS_THRESHOLD) &&
(pwsrc->dequeued_buffers > pwsrc->dequeued_buffers_threshold))
is_dq_threshold_reached = TRUE;
for (i = 0; i < b->buffer->n_datas; i++) { for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i]; struct spa_data *d = &b->buffer->datas[i];
GstMemory *pmem = gst_buffer_peek_memory (data->buf, i); GstMemory *pmem = gst_buffer_peek_memory (data->buf, i);
if (pmem) { if (pmem) {
GstMemory *mem; GstMemory *mem;
if (!pwsrc->always_copy) if (busy && !pwsrc->always_copy && !is_dq_threshold_reached)
mem = gst_memory_share (pmem, d->chunk->offset, d->chunk->size); mem = gst_memory_share (pmem, d->chunk->offset, d->chunk->size);
else else
mem = gst_memory_copy (pmem, d->chunk->offset, d->chunk->size); mem = gst_memory_copy (pmem, d->chunk->offset, d->chunk->size);
@ -652,7 +684,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
if (d->chunk->flags & SPA_CHUNK_FLAG_CORRUPTED) if (d->chunk->flags & SPA_CHUNK_FLAG_CORRUPTED)
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_CORRUPTED); GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_CORRUPTED);
} }
if (!pwsrc->always_copy) if (busy && !pwsrc->always_copy && !is_dq_threshold_reached)
gst_buffer_add_parent_buffer_meta (buf, data->buf); gst_buffer_add_parent_buffer_meta (buf, data->buf);
gst_buffer_unref (data->buf); gst_buffer_unref (data->buf);
return buf; return buf;

View file

@ -83,6 +83,9 @@ struct _GstPipeWireSrc {
GstClockTime last_time; GstClockTime last_time;
enum spa_meta_videotransform_value transform_value; enum spa_meta_videotransform_value transform_value;
gint dequeued_buffers;
gint dequeued_buffers_threshold;
}; };
struct _GstPipeWireSrcClass { struct _GstPipeWireSrcClass {