Merge branch 'sklug/upstream/fix-stream-synchronization-v1' into 'master'

RFC: Improve/fix timestamping issues with gstpipewiresrc

See merge request pipewire/pipewire!2786
This commit is contained in:
Stefan Klug 2026-04-30 14:53:23 +00:00
commit 7e505e93d6
3 changed files with 46 additions and 70 deletions

View file

@ -34,16 +34,17 @@ gst_pipewire_clock_get_internal_time (GstClock * clock)
GstClockTime result;
uint64_t now;
if (G_UNLIKELY (!s))
return pclock->last_time;
now = pw_stream_get_nsec(s->pwstream);
if (G_UNLIKELY (!s))
return pclock->last_time ? pclock->last_time : now;
#if 1
struct pw_time t;
if (s->pwstream == NULL ||
pw_stream_get_time_n (s->pwstream, &t, sizeof(t)) < 0 ||
t.rate.denom == 0)
return pclock->last_time;
return pclock->last_time ? pclock->last_time : now;
result = gst_util_uint64_scale (t.ticks, GST_SECOND * t.rate.num, t.rate.denom);
result += now - t.now;

View file

@ -116,8 +116,6 @@ static gboolean gst_pipewire_src_start (GstBaseSrc * basesrc);
static gboolean gst_pipewire_src_stop (GstBaseSrc * basesrc);
static gboolean gst_pipewire_src_event (GstBaseSrc * src, GstEvent * event);
static gboolean gst_pipewire_src_query (GstBaseSrc * src, GstQuery * query);
static void gst_pipewire_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
GstClockTime * start, GstClockTime * end);
static void
gst_pipewire_src_set_property (GObject * object, guint prop_id,
@ -501,7 +499,6 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
gstbasesrc_class->stop = gst_pipewire_src_stop;
gstbasesrc_class->event = gst_pipewire_src_event;
gstbasesrc_class->query = gst_pipewire_src_query;
gstbasesrc_class->get_times = gst_pipewire_src_get_times;
gstpushsrc_class->create = gst_pipewire_src_create;
GST_DEBUG_CATEGORY_INIT (pipewire_src_debug, "pipewiresrc", 0,
@ -531,7 +528,6 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->autoconnect = DEFAULT_AUTOCONNECT;
src->min_latency = 0;
src->max_latency = GST_CLOCK_TIME_NONE;
src->last_buffer_clock_time = GST_CLOCK_TIME_NONE;
src->n_buffers = 0;
src->flushing_on_remove_buffer = FALSE;
src->on_disconnect = DEFAULT_ON_DISCONNECT;
@ -1523,40 +1519,13 @@ gst_pipewire_src_query (GstBaseSrc * src, GstQuery * query)
return res;
}
static void
gst_pipewire_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
GstClockTime * start, GstClockTime * end)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
/* for live sources, sync on the timestamp of the buffer */
if (gst_base_src_is_live (basesrc)) {
GstClockTime timestamp = GST_BUFFER_PTS (buffer);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
/* get duration to calculate end time */
GstClockTime duration = GST_BUFFER_DURATION (buffer);
if (GST_CLOCK_TIME_IS_VALID (duration)) {
*end = timestamp + duration;
}
*start = timestamp;
}
} else {
*start = GST_CLOCK_TIME_NONE;
*end = GST_CLOCK_TIME_NONE;
}
GST_LOG_OBJECT (pwsrc, "start %" GST_TIME_FORMAT " (%" G_GUINT64_FORMAT
"), end %" GST_TIME_FORMAT " (%" G_GUINT64_FORMAT ")",
GST_TIME_ARGS (*start), *start, GST_TIME_ARGS (*end), *end);
}
static GstFlowReturn
gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
{
GstPipeWireSrc *pwsrc;
GstClockTime pts, dts, base_time;
const char *error = NULL;
GstClock *clock;
GstBuffer *buf;
gboolean update_time = FALSE, timeout = FALSE;
GstCaps *caps = NULL;
@ -1615,21 +1584,12 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf);
if (buf != NULL) {
if (pwsrc->resend_last || pwsrc->keepalive_time > 0) {
GstClock *clock;
GstBuffer *old;
old = pwsrc->last_buffer;
pwsrc->last_buffer = gst_buffer_copy (buf);
gst_buffer_unref (old);
gst_buffer_add_parent_buffer_meta (pwsrc->last_buffer, buf);
clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc));
if (clock != NULL) {
pwsrc->last_buffer_clock_time = gst_clock_get_time (clock);
gst_object_unref (clock);
} else {
pwsrc->last_buffer_clock_time = GST_CLOCK_TIME_NONE;
}
}
break;
}
@ -1652,38 +1612,52 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
pw_thread_loop_unlock (pwsrc->stream->core->loop);
*buffer = buf;
clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc));
if (update_time) {
GstClock *clock;
GstClockTime current_clock_time;
clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc));
if (clock != NULL) {
current_clock_time = gst_clock_get_time (clock);
pts = dts = gst_clock_get_time (clock);
gst_object_unref (clock);
} else {
current_clock_time = GST_CLOCK_TIME_NONE;
pts = dts = GST_CLOCK_TIME_NONE;
}
if (GST_CLOCK_TIME_IS_VALID (current_clock_time) &&
GST_CLOCK_TIME_IS_VALID (pwsrc->last_buffer_clock_time) &&
GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (*buffer)) &&
GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (*buffer))) {
GstClockTime diff;
diff = current_clock_time - pwsrc->last_buffer_clock_time;
GST_BUFFER_PTS (*buffer) += diff;
GST_BUFFER_DTS (*buffer) += diff;
} else {
GST_BUFFER_PTS (*buffer) = GST_BUFFER_DTS (*buffer) = current_clock_time;
}
GST_LOG_OBJECT (pwsrc, "Sending keepalive buffer pts/dts: %" GST_TIME_FORMAT
" (%" G_GUINT64_FORMAT ")", GST_TIME_ARGS (current_clock_time),
current_clock_time);
GST_LOG_OBJECT (pwsrc, "Sending keepalive buffer");
} else {
pts = GST_BUFFER_PTS (*buffer);
dts = GST_BUFFER_DTS (*buffer);
}
/*
* We need to map the pipwire time to gstreamer time. If the gstreamer clock
* is provided by us, we can safely use the base_time of the element.
* Otherwise we can not assume that the gstreamer clock is CLOCK_MONOTONIC and
* must therefore fall back to our own base_time. This might introduce a bit
* of jitter.
*/
base_time = 0;
if (pwsrc->is_live) {
if (clock == pwsrc->stream->clock) {
base_time = gst_element_get_base_time (GST_ELEMENT_CAST (pwsrc));
} else {
base_time = pwsrc->pw_base_time;
}
}
if (GST_CLOCK_TIME_IS_VALID (pts))
pts = (pts >= base_time ? pts - base_time : 0);
if (GST_CLOCK_TIME_IS_VALID (dts))
dts = (dts >= base_time ? dts - base_time : 0);
GST_LOG_OBJECT (pwsrc,
"pts %" G_GUINT64_FORMAT ", dts %" G_GUINT64_FORMAT
", base-time %" GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
GST_BUFFER_PTS (*buffer), GST_BUFFER_DTS (*buffer), GST_TIME_ARGS (base_time),
GST_TIME_ARGS (pts), GST_TIME_ARGS (dts));
GST_BUFFER_PTS (*buffer) = pts;
GST_BUFFER_DTS (*buffer) = dts;
return GST_FLOW_OK;
not_negotiated:
@ -1781,6 +1755,7 @@ gst_pipewire_src_change_state (GstElement * element, GstStateChange transition)
GST_DEBUG_OBJECT (this, "activating stream");
pw_thread_loop_lock (this->stream->core->loop);
this->pw_base_time = pw_stream_get_nsec (this->stream->pwstream);
pw_stream_set_active (this->stream->pwstream, true);
/* if state have been paused for longer time, the underlying node might
* be moved from idle to suspended, which would mean format cleared via

View file

@ -79,11 +79,11 @@ struct _GstPipeWireSrc {
gboolean is_live;
int64_t delay;
uint64_t pw_base_time;
GstClockTime min_latency;
GstClockTime max_latency;
GstBuffer *last_buffer;
GstClockTime last_buffer_clock_time;
enum spa_meta_videotransform_value transform_value;