gst: add rate control to the sink

Track the elapsed time between buffers and try to keep the buffer fill
level constant by changing the rate of the stream.

See #4374
This commit is contained in:
Wim Taymans 2024-11-26 17:45:41 +01:00
parent 22f0b858b1
commit 9419a12e74
5 changed files with 66 additions and 7 deletions

View file

@ -26,6 +26,7 @@
#include <spa/pod/builder.h> #include <spa/pod/builder.h>
#include <spa/utils/result.h> #include <spa/utils/result.h>
#include <spa/utils/dll.h>
#include <gst/video/video.h> #include <gst/video/video.h>
@ -481,14 +482,13 @@ static void
do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
{ {
GstPipeWirePoolData *data; GstPipeWirePoolData *data;
GstPipeWireStream *stream = pwsink->stream;
gboolean res; gboolean res;
guint i; guint i;
struct spa_buffer *b; struct spa_buffer *b;
data = gst_pipewire_pool_get_data(buffer); data = gst_pipewire_pool_get_data(buffer);
GST_LOG_OBJECT (pwsink, "queue buffer %p, pw_buffer %p", buffer, data->b);
b = data->b->buffer; b = data->b->buffer;
if (data->header) { if (data->header) {
@ -508,12 +508,15 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
data->crop->region.size.height = meta->width; data->crop->region.size.height = meta->width;
} }
} }
data->b->size = 0;
for (i = 0; i < b->n_datas; i++) { for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i]; struct spa_data *d = &b->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buffer, i); GstMemory *mem = gst_buffer_peek_memory (buffer, i);
d->chunk->offset = mem->offset; d->chunk->offset = mem->offset;
d->chunk->size = mem->size; d->chunk->size = mem->size;
d->chunk->stride = pwsink->stream->pool->video_info.stride[i]; d->chunk->stride = stream->pool->video_info.stride[i];
data->b->size += mem->size / 4;
} }
GstVideoMeta *meta = gst_buffer_get_video_meta (buffer); GstVideoMeta *meta = gst_buffer_get_video_meta (buffer);
@ -532,9 +535,47 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
} }
} }
if ((res = pw_stream_queue_buffer (pwsink->stream->pwstream, data->b)) < 0) { if ((res = pw_stream_queue_buffer (stream->pwstream, data->b)) < 0) {
g_warning ("can't send buffer %s", spa_strerror(res)); g_warning ("can't send buffer %s", spa_strerror(res));
} }
if (pwsink->rate_match) {
double err, corr;
struct pw_time ts;
guint64 queued, now, elapsed, target;
pw_stream_get_time_n(stream->pwstream, &ts, sizeof(ts));
now = pw_stream_get_nsec(stream->pwstream);
if (ts.now != 0)
elapsed = gst_util_uint64_scale_int (now - ts.now, ts.rate.denom, GST_SECOND * ts.rate.num);
else
elapsed = 0;
queued = ts.queued - ts.size;
target = elapsed;
err = ((gint64)queued - ((gint64)target));
corr = spa_dll_update(&stream->dll, SPA_CLAMPD(err, -128.0, 128.0));
stream->err_wdw = (double)ts.rate.denom/ts.size;
double avg = (stream->err_avg * stream->err_wdw + (err - stream->err_avg)) / (stream->err_wdw + 1.0);
stream->err_var = (stream->err_var * stream->err_wdw +
(err - stream->err_avg) * (err - avg)) / (stream->err_wdw + 1.0);
stream->err_avg = avg;
if (stream->last_ts == 0 || stream->last_ts + SPA_NSEC_PER_SEC < now) {
stream->last_ts = now;
spa_dll_set_bw(&stream->dll, SPA_CLAMPD(fabs(stream->err_avg) / sqrt(fabs(stream->err_var)), 0.001, SPA_DLL_BW_MAX),
ts.size, ts.rate.denom);
GST_INFO_OBJECT (pwsink, "queue buffer %p, pw_buffer %p q:%"PRIi64"/%"PRIi64" e:%"PRIu64
" err:%+03f corr:%f %f %f %f",
buffer, data->b, ts.queued, ts.size, elapsed, err, corr,
stream->err_avg, stream->err_var, stream->dll.bw);
}
pw_stream_set_rate (stream->pwstream, corr);
}
} }
@ -613,9 +654,16 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
pwsink = GST_PIPEWIRE_SINK (bsink); pwsink = GST_PIPEWIRE_SINK (bsink);
s = gst_caps_get_structure (caps, 0); s = gst_caps_get_structure (caps, 0);
rate = 0; if (gst_structure_has_name (s, "audio/x-raw")) {
if (gst_structure_has_name (s, "audio/x-raw"))
gst_structure_get_int (s, "rate", &rate); gst_structure_get_int (s, "rate", &rate);
pwsink->rate = rate;
pwsink->rate_match = true;
} else {
pwsink->rate = rate = 0;
pwsink->rate_match = false;
}
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate);
possible = gst_caps_to_format_all (caps); possible = gst_caps_to_format_all (caps);

View file

@ -50,6 +50,8 @@ struct _GstPipeWireSink {
/* video state */ /* video state */
gboolean negotiated; gboolean negotiated;
gboolean rate_match;
gint rate;
GstPipeWireSinkMode mode; GstPipeWireSinkMode mode;
}; };

View file

@ -19,6 +19,7 @@ gst_pipewire_stream_init (GstPipeWireStream * self)
self->fd = -1; self->fd = -1;
self->client_name = g_strdup (pw_get_client_name()); self->client_name = g_strdup (pw_get_client_name());
self->pool = gst_pipewire_pool_new (self); self->pool = gst_pipewire_pool_new (self);
spa_dll_init(&self->dll);
} }
static void static void

View file

@ -11,6 +11,7 @@
#include "gstpipewirecore.h" #include "gstpipewirecore.h"
#include <gst/gst.h> #include <gst/gst.h>
#include <spa/utils/dll.h>
#include <pipewire/pipewire.h> #include <pipewire/pipewire.h>
G_BEGIN_DECLS G_BEGIN_DECLS
@ -29,6 +30,13 @@ struct _GstPipeWireStream {
GstPipeWirePool *pool; GstPipeWirePool *pool;
GstClock *clock; GstClock *clock;
guint64 position;
struct spa_dll dll;
double err_avg, err_var, err_wdw;
guint64 last_ts;
guint64 base_buffer_ts;
guint64 base_ts;
/* the actual pw stream */ /* the actual pw stream */
struct pw_stream *pwstream; struct pw_stream *pwstream;
struct spa_hook pwstream_listener; struct spa_hook pwstream_listener;

View file

@ -27,7 +27,7 @@ pipewire_gst_headers = [
pipewire_gst = shared_library('gstpipewire', pipewire_gst = shared_library('gstpipewire',
pipewire_gst_sources, pipewire_gst_sources,
include_directories : [ configinc ], include_directories : [ configinc ],
dependencies : [ spa_dep, gst_dep, pipewire_dep ], dependencies : [ spa_dep, gst_dep, pipewire_dep, mathlib ],
install : true, install : true,
install_dir : '@0@/gstreamer-1.0'.format(get_option('libdir')), install_dir : '@0@/gstreamer-1.0'.format(get_option('libdir')),
) )