Merge branch 'sklug/upstream/fix-stream-synchronization-v1' into 'master'

RFC: Improve/fix timestamping issues with gstpipewiresrc

See merge request pipewire/pipewire!2786
This commit is contained in:
Stefan Klug 2026-05-01 18:48:23 +00:00
commit 10837784b7
3 changed files with 46 additions and 70 deletions

View file

@ -34,16 +34,17 @@ gst_pipewire_clock_get_internal_time (GstClock * clock)
GstClockTime result; GstClockTime result;
uint64_t now; uint64_t now;
if (G_UNLIKELY (!s))
return pclock->last_time;
now = pw_stream_get_nsec(s->pwstream); now = pw_stream_get_nsec(s->pwstream);
if (G_UNLIKELY (!s))
return pclock->last_time ? pclock->last_time : now;
#if 1 #if 1
struct pw_time t; struct pw_time t;
if (s->pwstream == NULL || if (s->pwstream == NULL ||
pw_stream_get_time_n (s->pwstream, &t, sizeof(t)) < 0 || pw_stream_get_time_n (s->pwstream, &t, sizeof(t)) < 0 ||
t.rate.denom == 0) t.rate.denom == 0)
return pclock->last_time; return pclock->last_time ? pclock->last_time : now;
result = gst_util_uint64_scale (t.ticks, GST_SECOND * t.rate.num, t.rate.denom); result = gst_util_uint64_scale (t.ticks, GST_SECOND * t.rate.num, t.rate.denom);
result += now - t.now; result += now - t.now;

View file

@ -116,8 +116,6 @@ static gboolean gst_pipewire_src_start (GstBaseSrc * basesrc);
static gboolean gst_pipewire_src_stop (GstBaseSrc * basesrc); static gboolean gst_pipewire_src_stop (GstBaseSrc * basesrc);
static gboolean gst_pipewire_src_event (GstBaseSrc * src, GstEvent * event); static gboolean gst_pipewire_src_event (GstBaseSrc * src, GstEvent * event);
static gboolean gst_pipewire_src_query (GstBaseSrc * src, GstQuery * query); static gboolean gst_pipewire_src_query (GstBaseSrc * src, GstQuery * query);
static void gst_pipewire_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
GstClockTime * start, GstClockTime * end);
static void static void
gst_pipewire_src_set_property (GObject * object, guint prop_id, gst_pipewire_src_set_property (GObject * object, guint prop_id,
@ -501,7 +499,6 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
gstbasesrc_class->stop = gst_pipewire_src_stop; gstbasesrc_class->stop = gst_pipewire_src_stop;
gstbasesrc_class->event = gst_pipewire_src_event; gstbasesrc_class->event = gst_pipewire_src_event;
gstbasesrc_class->query = gst_pipewire_src_query; gstbasesrc_class->query = gst_pipewire_src_query;
gstbasesrc_class->get_times = gst_pipewire_src_get_times;
gstpushsrc_class->create = gst_pipewire_src_create; gstpushsrc_class->create = gst_pipewire_src_create;
GST_DEBUG_CATEGORY_INIT (pipewire_src_debug, "pipewiresrc", 0, GST_DEBUG_CATEGORY_INIT (pipewire_src_debug, "pipewiresrc", 0,
@ -531,7 +528,6 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->autoconnect = DEFAULT_AUTOCONNECT; src->autoconnect = DEFAULT_AUTOCONNECT;
src->min_latency = 0; src->min_latency = 0;
src->max_latency = GST_CLOCK_TIME_NONE; src->max_latency = GST_CLOCK_TIME_NONE;
src->last_buffer_clock_time = GST_CLOCK_TIME_NONE;
src->n_buffers = 0; src->n_buffers = 0;
src->flushing_on_remove_buffer = FALSE; src->flushing_on_remove_buffer = FALSE;
src->on_disconnect = DEFAULT_ON_DISCONNECT; src->on_disconnect = DEFAULT_ON_DISCONNECT;
@ -1523,40 +1519,13 @@ gst_pipewire_src_query (GstBaseSrc * src, GstQuery * query)
return res; return res;
} }
static void
gst_pipewire_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
GstClockTime * start, GstClockTime * end)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
/* for live sources, sync on the timestamp of the buffer */
if (gst_base_src_is_live (basesrc)) {
GstClockTime timestamp = GST_BUFFER_PTS (buffer);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
/* get duration to calculate end time */
GstClockTime duration = GST_BUFFER_DURATION (buffer);
if (GST_CLOCK_TIME_IS_VALID (duration)) {
*end = timestamp + duration;
}
*start = timestamp;
}
} else {
*start = GST_CLOCK_TIME_NONE;
*end = GST_CLOCK_TIME_NONE;
}
GST_LOG_OBJECT (pwsrc, "start %" GST_TIME_FORMAT " (%" G_GUINT64_FORMAT
"), end %" GST_TIME_FORMAT " (%" G_GUINT64_FORMAT ")",
GST_TIME_ARGS (*start), *start, GST_TIME_ARGS (*end), *end);
}
static GstFlowReturn static GstFlowReturn
gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
{ {
GstPipeWireSrc *pwsrc; GstPipeWireSrc *pwsrc;
GstClockTime pts, dts, base_time;
const char *error = NULL; const char *error = NULL;
GstClock *clock;
GstBuffer *buf; GstBuffer *buf;
gboolean update_time = FALSE, timeout = FALSE; gboolean update_time = FALSE, timeout = FALSE;
GstCaps *caps = NULL; GstCaps *caps = NULL;
@ -1615,21 +1584,12 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf);
if (buf != NULL) { if (buf != NULL) {
if (pwsrc->resend_last || pwsrc->keepalive_time > 0) { if (pwsrc->resend_last || pwsrc->keepalive_time > 0) {
GstClock *clock;
GstBuffer *old; GstBuffer *old;
old = pwsrc->last_buffer; old = pwsrc->last_buffer;
pwsrc->last_buffer = gst_buffer_copy (buf); pwsrc->last_buffer = gst_buffer_copy (buf);
gst_buffer_unref (old); gst_buffer_unref (old);
gst_buffer_add_parent_buffer_meta (pwsrc->last_buffer, buf); gst_buffer_add_parent_buffer_meta (pwsrc->last_buffer, buf);
clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc));
if (clock != NULL) {
pwsrc->last_buffer_clock_time = gst_clock_get_time (clock);
gst_object_unref (clock);
} else {
pwsrc->last_buffer_clock_time = GST_CLOCK_TIME_NONE;
}
} }
break; break;
} }
@ -1652,38 +1612,52 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
pw_thread_loop_unlock (pwsrc->stream->core->loop); pw_thread_loop_unlock (pwsrc->stream->core->loop);
*buffer = buf; *buffer = buf;
clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc));
if (update_time) { if (update_time) {
GstClock *clock;
GstClockTime current_clock_time;
clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc));
if (clock != NULL) { if (clock != NULL) {
current_clock_time = gst_clock_get_time (clock); pts = dts = gst_clock_get_time (clock);
gst_object_unref (clock); gst_object_unref (clock);
} else { } else {
current_clock_time = GST_CLOCK_TIME_NONE; pts = dts = GST_CLOCK_TIME_NONE;
} }
if (GST_CLOCK_TIME_IS_VALID (current_clock_time) && GST_LOG_OBJECT (pwsrc, "Sending keepalive buffer");
GST_CLOCK_TIME_IS_VALID (pwsrc->last_buffer_clock_time) && } else {
GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (*buffer)) && pts = GST_BUFFER_PTS (*buffer);
GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (*buffer))) { dts = GST_BUFFER_DTS (*buffer);
GstClockTime diff;
diff = current_clock_time - pwsrc->last_buffer_clock_time;
GST_BUFFER_PTS (*buffer) += diff;
GST_BUFFER_DTS (*buffer) += diff;
} else {
GST_BUFFER_PTS (*buffer) = GST_BUFFER_DTS (*buffer) = current_clock_time;
}
GST_LOG_OBJECT (pwsrc, "Sending keepalive buffer pts/dts: %" GST_TIME_FORMAT
" (%" G_GUINT64_FORMAT ")", GST_TIME_ARGS (current_clock_time),
current_clock_time);
} }
/*
* We need to map the pipwire time to gstreamer time. If the gstreamer clock
* is provided by us, we can safely use the base_time of the element.
* Otherwise we can not assume that the gstreamer clock is CLOCK_MONOTONIC and
* must therefore fall back to our own base_time. This might introduce a bit
* of jitter.
*/
base_time = 0;
if (pwsrc->is_live) {
if (clock == pwsrc->stream->clock) {
base_time = gst_element_get_base_time (GST_ELEMENT_CAST (pwsrc));
} else {
base_time = pwsrc->pw_base_time;
}
}
if (GST_CLOCK_TIME_IS_VALID (pts))
pts = (pts >= base_time ? pts - base_time : 0);
if (GST_CLOCK_TIME_IS_VALID (dts))
dts = (dts >= base_time ? dts - base_time : 0);
GST_LOG_OBJECT (pwsrc,
"pts %" G_GUINT64_FORMAT ", dts %" G_GUINT64_FORMAT
", base-time %" GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
GST_BUFFER_PTS (*buffer), GST_BUFFER_DTS (*buffer), GST_TIME_ARGS (base_time),
GST_TIME_ARGS (pts), GST_TIME_ARGS (dts));
GST_BUFFER_PTS (*buffer) = pts;
GST_BUFFER_DTS (*buffer) = dts;
return GST_FLOW_OK; return GST_FLOW_OK;
not_negotiated: not_negotiated:
@ -1781,6 +1755,7 @@ gst_pipewire_src_change_state (GstElement * element, GstStateChange transition)
GST_DEBUG_OBJECT (this, "activating stream"); GST_DEBUG_OBJECT (this, "activating stream");
pw_thread_loop_lock (this->stream->core->loop); pw_thread_loop_lock (this->stream->core->loop);
this->pw_base_time = pw_stream_get_nsec (this->stream->pwstream);
pw_stream_set_active (this->stream->pwstream, true); pw_stream_set_active (this->stream->pwstream, true);
/* if state have been paused for longer time, the underlying node might /* if state have been paused for longer time, the underlying node might
* be moved from idle to suspended, which would mean format cleared via * be moved from idle to suspended, which would mean format cleared via

View file

@ -79,11 +79,11 @@ struct _GstPipeWireSrc {
gboolean is_live; gboolean is_live;
int64_t delay; int64_t delay;
uint64_t pw_base_time;
GstClockTime min_latency; GstClockTime min_latency;
GstClockTime max_latency; GstClockTime max_latency;
GstBuffer *last_buffer; GstBuffer *last_buffer;
GstClockTime last_buffer_clock_time;
enum spa_meta_videotransform_value transform_value; enum spa_meta_videotransform_value transform_value;