Merge branch 'zerocopy' into 'master'

Slow consumer hangs the graph.

See merge request pipewire/pipewire!1917
This commit is contained in:
Ashok Sidipotu 2024-06-11 07:57:36 +00:00
commit d4b2217ce5
3 changed files with 50 additions and 4 deletions

View file

@ -150,6 +150,7 @@ acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool);
GstPipeWirePoolData *data;
struct pw_buffer *b;
guint retry = 0;
GST_OBJECT_LOCK (pool);
while (TRUE) {
@ -158,6 +159,16 @@ acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
if ((b = pw_stream_dequeue_buffer(p->stream)))
break;
else if (errno == EBUSY) {
if (!retry) {
struct pw_time time;
pw_stream_get_time_n (p->stream, &time, sizeof(time));
retry = time.avail_buffers + time.queued_buffers + 1;
}
if(--retry)
continue;
}
if (params && (params->flags & GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT))
goto no_more_buffers;

View file

@ -40,7 +40,8 @@ static GQuark process_mem_data_quark;
GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug);
#define GST_CAT_DEFAULT pipewire_src_debug
#define DEFAULT_ALWAYS_COPY false
#define DEFAULT_ALWAYS_COPY false
#define DEFAULT_DEQUEUED_BUFFERS_THRESHOLD INT32_MAX
#define DEFAULT_MIN_BUFFERS 8
#define DEFAULT_MAX_BUFFERS INT32_MAX
#define DEFAULT_RESEND_LAST false
@ -62,6 +63,7 @@ enum
PROP_RESEND_LAST,
PROP_KEEPALIVE_TIME,
PROP_AUTOCONNECT,
PROP_DEQUEUED_BUFFERS_THRESHOLD,
};
@ -133,6 +135,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id,
pwsrc->always_copy = g_value_get_boolean (value);
break;
case PROP_DEQUEUED_BUFFERS_THRESHOLD:
pwsrc->dequeued_buffers_threshold = g_value_get_int(value);
break;
case PROP_MIN_BUFFERS:
pwsrc->min_buffers = g_value_get_int (value);
break;
@ -194,6 +200,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id,
g_value_set_boolean (value, pwsrc->always_copy);
break;
case PROP_DEQUEUED_BUFFERS_THRESHOLD:
g_value_set_int (value, pwsrc->dequeued_buffers_threshold);
break;
case PROP_MIN_BUFFERS:
g_value_set_int (value, pwsrc->min_buffers);
break;
@ -342,6 +352,15 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_DEQUEUED_BUFFERS_THRESHOLD,
g_param_spec_int("dequeued-buffers-threshold",
"Dequeued buffers threshold",
"buffers dequeued beyond this limit will always be copied",
1, G_MAXINT, DEFAULT_DEQUEUED_BUFFERS_THRESHOLD,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_MIN_BUFFERS,
g_param_spec_int ("min-buffers",
@ -437,6 +456,8 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->always_copy = DEFAULT_ALWAYS_COPY;
src->min_buffers = DEFAULT_MIN_BUFFERS;
src->max_buffers = DEFAULT_MAX_BUFFERS;
src->dequeued_buffers_threshold = DEFAULT_DEQUEUED_BUFFERS_THRESHOLD;
src->dequeued_buffers = 0;
src->fd = -1;
src->resend_last = DEFAULT_RESEND_LAST;
src->keepalive_time = DEFAULT_KEEPALIVE_TIME;
@ -480,8 +501,10 @@ buffer_recycle (GstMiniObject *obj)
if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0)
GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res));
else
else {
GST_LOG_OBJECT (src, "recycle buffer %p", obj);
src->dequeued_buffers--;
}
pw_thread_loop_unlock (src->core->loop);
@ -553,12 +576,15 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
struct spa_meta_videotransform *videotransform;
struct pw_time time;
guint i;
struct spa_meta_busy *busy;
gboolean is_dq_threshold_reached = FALSE;
b = pw_stream_dequeue_buffer (pwsrc->stream);
if (b == NULL)
return NULL;
data = b->user_data;
busy = spa_buffer_find_meta_data(b->buffer, SPA_META_Busy, sizeof(struct spa_meta_busy));
if (!GST_IS_BUFFER (data->buf)) {
GST_ERROR_OBJECT (pwsrc, "stream buffer %p is missing", data->buf);
@ -585,6 +611,8 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
buf = gst_buffer_new ();
pwsrc->dequeued_buffers++;
data->queued = FALSE;
GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
@ -665,12 +693,16 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
}
}
if ((pwsrc->dequeued_buffers_threshold != DEFAULT_DEQUEUED_BUFFERS_THRESHOLD) &&
(pwsrc->dequeued_buffers > pwsrc->dequeued_buffers_threshold))
is_dq_threshold_reached = TRUE;
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
GstMemory *pmem = gst_buffer_peek_memory (data->buf, i);
if (pmem) {
GstMemory *mem;
if (!pwsrc->always_copy)
if (busy && !pwsrc->always_copy && !is_dq_threshold_reached)
mem = gst_memory_share (pmem, d->chunk->offset, d->chunk->size);
else
mem = gst_memory_copy (pmem, d->chunk->offset, d->chunk->size);
@ -679,7 +711,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
if (d->chunk->flags & SPA_CHUNK_FLAG_CORRUPTED)
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_CORRUPTED);
}
if (!pwsrc->always_copy)
if (busy && !pwsrc->always_copy && !is_dq_threshold_reached)
gst_buffer_add_parent_buffer_meta (buf, data->buf);
gst_buffer_unref (data->buf);
return buf;

View file

@ -90,6 +90,9 @@ struct _GstPipeWireSrc {
GstClockTime last_time;
enum spa_meta_videotransform_value transform_value;
gint dequeued_buffers;
gint dequeued_buffers_threshold;
};
struct _GstPipeWireSrcClass {