gst: sink: update clock before every trigger process

Get the clock pointer using the io_changed stream event.
and update the clock before triggering the process

The clock needs to be updated in the data loop thread
and before triggering the process so move the calls to
`pw_stream_trigger_process` from gstreamer thread context
into the data loop thread context by invoking a callback
and update the clock inside the data loop callback
before the trigger
This commit is contained in:
Taruntej Kanakamalla 2025-04-18 23:01:56 +05:30
parent bca83c8eee
commit d5e2cc94cd
3 changed files with 99 additions and 4 deletions

View file

@ -40,6 +40,9 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug);
#define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE
#define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
#define MAX_ERROR_MS 1
#define RESYNC_TIMEOUT_MS 10
enum
{
PROP_0,
@ -377,6 +380,7 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
sink->mode = DEFAULT_PROP_MODE;
sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL;
sink->is_rawvideo = false;
sink->first_buffer = true;
GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
@ -690,7 +694,15 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res));
} else {
data->queued = TRUE;
GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer);
GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p ",data->b, data->b->size, buffer);
if (pwsink->first_buffer) {
pwsink->first_buffer = false;
pwsink->first_buffer_pts = GST_BUFFER_PTS(buffer);
}
stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts, pwsink->rate, 1 * GST_SECOND);
// have the buffer duration value minimum as 1, in case of video where rate is 0 (not applicable)
stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer), pwsink->rate, 1 * GST_SECOND));
}
switch (pwsink->slave_method) {
@ -702,6 +714,52 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
}
}
static void update_time (GstPipeWireSink *pwsink)
{
struct spa_io_position *p = pwsink->stream->io_position;
double err = 0.0, corr = 1.0;
guint64 now;
double_t max_err = pwsink->rate * MAX_ERROR_MS/1000;
double_t resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000;
if (pwsink->first_buffer) {
// use the target duration before the first buffer
pwsink->stream->buf_duration = p->clock.target_duration;
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate);
}
now = pw_stream_get_nsec(pwsink->stream->pwstream);
err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 *GST_SECOND) -
(double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 *GST_SECOND);
GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %lu next is %lu", err, max_err, now, p->clock.next_nsec);
if (fabs(err) > max_err) {
if (fabs(err) > resync_timeout) {
GST_WARNING_OBJECT(pwsink, "err %f exceeds resync timeout, resetting", err);
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate);
err = 0.0;
} else {
err = SPA_CLAMPD(err, -max_err, max_err);
}
}
corr = spa_dll_update(&pwsink->stream->dll, err);
p->clock.nsec = now;
p->clock.position = pwsink->stream->position;
p->clock.duration = pwsink->stream->buf_duration;
/* we don't have a way to estimate the target (next cycle) buffer duration
* so use the current buffer duration
*/
p->clock.target_duration = pwsink->stream->buf_duration;
p->clock.rate = SPA_FRACTION(1, pwsink->rate);
// current time plus duration scaled with correlation
p->clock.next_nsec = now + (uint64_t)(p->clock.duration / corr * GST_SECOND / pwsink->rate);
p->clock.rate_diff = corr;
GST_DEBUG_OBJECT(pwsink, "now %lu, position %lu, duration %lu, rate :%d, next : %lu, delay is %ld, rate_diff is %f",
p->clock.nsec, p->clock.position, p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff);
}
static void
on_process (void *data)
@ -711,6 +769,24 @@ on_process (void *data)
g_cond_signal (&pwsink->stream->pool->cond);
}
static int invoke_trigger_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
GstPipeWireSink *pwsink = user_data;
/* Note: We cannot use the rate for computation of other clock params
* in case of video because the rate for video is set as 0 in the _setcaps.
* So skip update time for video (i.e. when rate is 0). The video buffers
* get timestamp from the SPA_META_Header anyway
*/
if (pwsink->rate)
update_time(pwsink);
return pw_stream_trigger_process(pwsink->stream->pwstream);
}
static void
on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error)
{
@ -726,7 +802,7 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
break;
case PW_STREAM_STATE_STREAMING:
if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream);
pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink);
break;
case PW_STREAM_STATE_ERROR:
/* make the error permanent, if it is not already;
@ -1023,7 +1099,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gst_buffer_unref (b);
if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream);
pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink);
}
} else {
GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool");
@ -1031,7 +1107,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
do_send_buffer (pwsink, buffer);
if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream);
pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink);
}
done_unlock:
@ -1045,6 +1121,19 @@ not_negotiated:
}
}
static void
on_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
GstPipeWireSink *pwsink = data;
switch (id) {
case SPA_IO_Position:
GST_DEBUG_OBJECT(pwsink, "got io position %p", area);
pwsink->stream->io_position = area;
break;
}
}
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_state_changed,
@ -1052,6 +1141,7 @@ static const struct pw_stream_events stream_events = {
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.process = on_process,
.io_changed = on_io_changed,
};
static GstStateChangeReturn

View file

@ -70,6 +70,8 @@ struct _GstPipeWireSink {
gboolean rate_match;
gint rate;
gboolean is_rawvideo;
gboolean first_buffer;
GstClockTime first_buffer_pts;
GstPipeWireSinkMode mode;
GstPipeWireSinkSlaveMethod slave_method;

View file

@ -31,6 +31,7 @@ struct _GstPipeWireStream {
GstClock *clock;
guint64 position;
guint64 buf_duration;
struct spa_dll dll;
double err_avg, err_var, err_wdw;
guint64 last_ts;
@ -41,6 +42,8 @@ struct _GstPipeWireStream {
struct pw_stream *pwstream;
struct spa_hook pwstream_listener;
struct spa_io_position *io_position;
/* common properties */
int fd;
gchar *path;