diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 79b341e2c..ac8985c29 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -40,6 +40,9 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); #define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE #define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define MAX_ERROR_MS 1 +#define RESYNC_TIMEOUT_MS 10 + enum { PROP_0, @@ -377,6 +380,7 @@ gst_pipewire_sink_init (GstPipeWireSink * sink) sink->mode = DEFAULT_PROP_MODE; sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL; sink->is_rawvideo = false; + sink->first_buffer = true; GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK); @@ -690,7 +694,15 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res)); } else { data->queued = TRUE; - GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer); + GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p ",data->b, data->b->size, buffer); + if (pwsink->first_buffer) { + pwsink->first_buffer = false; + pwsink->first_buffer_pts = GST_BUFFER_PTS(buffer); + } + stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts, pwsink->rate, 1 * GST_SECOND); + + // have the buffer duration value minimum as 1, in case of video where rate is 0 (not applicable) + stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer), pwsink->rate, 1 * GST_SECOND)); } switch (pwsink->slave_method) { @@ -702,6 +714,52 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } } +static void update_time (GstPipeWireSink *pwsink) +{ + struct spa_io_position *p = pwsink->stream->io_position; + double err = 0.0, corr = 1.0; + guint64 now; + double_t max_err = pwsink->rate * MAX_ERROR_MS/1000; + double_t resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000; + + if (pwsink->first_buffer) { + // use the target duration before the first buffer + pwsink->stream->buf_duration = p->clock.target_duration; + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate); + } + + now = pw_stream_get_nsec(pwsink->stream->pwstream); + err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 *GST_SECOND) - + (double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 *GST_SECOND); + + GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %lu next is %lu", err, max_err, now, p->clock.next_nsec); + + if (fabs(err) > max_err) { + if (fabs(err) > resync_timeout) { + GST_WARNING_OBJECT(pwsink, "err %f exceeds resync timeout, resetting", err); + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate); + err = 0.0; + } else { + err = SPA_CLAMPD(err, -max_err, max_err); + } + } + corr = spa_dll_update(&pwsink->stream->dll, err); + + p->clock.nsec = now; + p->clock.position = pwsink->stream->position; + p->clock.duration = pwsink->stream->buf_duration; + /* we don't have a way to estimate the target (next cycle) buffer duration + * so use the current buffer duration + */ + p->clock.target_duration = pwsink->stream->buf_duration; + p->clock.rate = SPA_FRACTION(1, pwsink->rate); + // current time plus duration scaled with correlation + p->clock.next_nsec = now + (uint64_t)(p->clock.duration / corr * GST_SECOND / pwsink->rate); + p->clock.rate_diff = corr; + + GST_DEBUG_OBJECT(pwsink, "now %lu, position %lu, duration %lu, rate :%d, next : %lu, delay is %ld, rate_diff is %f", + p->clock.nsec, p->clock.position, p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff); +} static void on_process (void *data) @@ -711,6 +769,24 @@ on_process (void *data) g_cond_signal (&pwsink->stream->pool->cond); } +static int invoke_trigger_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + + GstPipeWireSink *pwsink = user_data; + + + /* Note: We cannot use the rate for computation of other clock params + * in case of video because the rate for video is set as 0 in the _setcaps. + * So skip update time for video (i.e. when rate is 0). The video buffers + * get timestamp from the SPA_META_Header anyway + */ + + if (pwsink->rate) + update_time(pwsink); + return pw_stream_trigger_process(pwsink->stream->pwstream); +} + static void on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error) { @@ -726,7 +802,7 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta break; case PW_STREAM_STATE_STREAMING: if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); break; case PW_STREAM_STATE_ERROR: /* make the error permanent, if it is not already; @@ -1023,7 +1099,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_buffer_unref (b); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); } } else { GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool"); @@ -1031,7 +1107,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) do_send_buffer (pwsink, buffer); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); } done_unlock: @@ -1045,6 +1121,19 @@ not_negotiated: } } +static void +on_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + GstPipeWireSink *pwsink = data; + + switch (id) { + case SPA_IO_Position: + GST_DEBUG_OBJECT(pwsink, "got io position %p", area); + pwsink->stream->io_position = area; + break; + } +} + static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, @@ -1052,6 +1141,7 @@ static const struct pw_stream_events stream_events = { .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, .process = on_process, + .io_changed = on_io_changed, }; static GstStateChangeReturn diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index f4a961f9a..5816f7a15 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -70,6 +70,8 @@ struct _GstPipeWireSink { gboolean rate_match; gint rate; gboolean is_rawvideo; + gboolean first_buffer; + GstClockTime first_buffer_pts; GstPipeWireSinkMode mode; GstPipeWireSinkSlaveMethod slave_method; diff --git a/src/gst/gstpipewirestream.h b/src/gst/gstpipewirestream.h index a301375c7..23dc996a9 100644 --- a/src/gst/gstpipewirestream.h +++ b/src/gst/gstpipewirestream.h @@ -31,6 +31,7 @@ struct _GstPipeWireStream { GstClock *clock; guint64 position; + guint64 buf_duration; struct spa_dll dll; double err_avg, err_var, err_wdw; guint64 last_ts; @@ -41,6 +42,8 @@ struct _GstPipeWireStream { struct pw_stream *pwstream; struct spa_hook pwstream_listener; + struct spa_io_position *io_position; + /* common properties */ int fd; gchar *path;