module-rtp: Fix and improve direct timestamp mode and documentation

Direct timestamp mode was incorrectly using over/underrun detection logic
and fill level tracking logic that is actually meant for the other mode
(referred to from now on as "constant latency mode"). Over/underruns are
tracked implicitly in the direct timestamp mode, and the absolute fill
level is not relevant in that mode, since the latency is not needed to
be constant then.

Also improve log lines and the RTP module documentation to define these
buffer modes clearly and explain their differences and use cases.

Opus and MIDI code get TODOs added, since their direct timestamp mode
implementations still may be incorrect. Fixing those will be done in
a separate commit.
This commit is contained in:
Carlos Rafael Giani 2025-07-23 21:05:10 +02:00 committed by Wim Taymans
parent f8b0d0a43c
commit 2bcc8589fa
4 changed files with 218 additions and 76 deletions

View file

@ -31,62 +31,133 @@ static void rtp_audio_process_playback(void *data)
* delay values to 0 (see docs), so do that here. */
device_delay = SPA_MAX(pwt.delay, 0LL);
if (impl->io_position && impl->direct_timestamp) {
/* in direct mode, read directly from the timestamp index,
* because sender and receiver are in sync, this would keep
* target_buffer of samples available. */
/* IMPORTANT: In the explanations below, sometimes, "reading/writing from/to the
* ring buffer at a position X" is mentioned. To be exact, that buffer is actually
* impl->buffer. And since X can be a timestamp whose value is far higher than the
* buffer size (and the fact that impl->buffer is a _ring_ buffer), reads and writes
* actually first apply BUFFER_MASK to the position to implement a ring buffer
* index wrap-around. (Wrap-around when reading / writing the data bytes is
* handled by the spa_ringbuffer code; this is about the wrap around of the
* read or write index itself.) */
/* Shift clock position by stream delay to compensate
* for processing and output delay. */
spa_ringbuffer_read_update(&impl->ring,
impl->io_position->clock.position + device_delay);
}
avail = spa_ringbuffer_get_read_index(&impl->ring, &timestamp);
if (impl->direct_timestamp) {
/* In direct timestamp mode, the focus lies on synchronized playback, not
* on a constant latency. The ring buffer fill level is not of interest
* here. The code in rtp_audio_receive() writes to the ring buffer at
* position (RTP timestamp + target_buffer), just like in the constant
* latency mode. Crucially however, in direct timestamp mode, it is assumed
* that the RTP timestamps are based on the same synchronized clock that
* runs the graph driver here, so the clock position is using the same
* time base as these timestamps.
*
* If the transport delay from the sender to this receiver were zero, then
* the data with the given RTP timestamp could in theory be played right
* away, since that timestamp would equal the clock position (or, in other
* words, it would be the present time). Since the transport takes some
* time, writing the data at the position (RTP timestamp + target_buffer)
* shifts the timestamp into the future sufficiently enough that no data
* is lost. (target_buffer corresponds to the `sess.latency.msec` RTP
* source module option, and that option has to be chosen by the user
* to be of a sensible size - high enough to at least match the maximum
* transport delay, but not too high to not risk too much latency
* Also, `sess.latency.msec` must be the same value across all RTP
* source nodes that shall play in sync.)
*
* When the code here reads from the position defined by the current
* clock position, it is then guaranteed that the data is accessed in
* sync with other RTP source nodes which also run in the direct
* timestamp mode, since all of them shift the timestamp by the same
* `sess.latency.msec` into the future.
*
* "Fill level" makes no sense in this mode, since a constant latency
* is not important in this mode, so no DLL is needed. Also, matching
* the pace of the synchronized clock is done by having the graph
* driver be synchronized to that clock, which will in turn cause
* any output sinks to adjust their DLLs (or similar control loop
* mechanisms) to match the pace of their data consumption with the
* pace of the driver. */
/* Reduce target buffer by the delay amount to start playback sooner.
* This compensates for the delay to the device. */
if (SPA_UNLIKELY(impl->target_buffer < device_delay)) {
pw_log_error("Delay to device (%" PRIu32 ") is higher than "
"the target buffer size (%" PRIu32 ")", device_delay,
impl->target_buffer);
target_buffer = 0;
} else {
target_buffer = impl->target_buffer - device_delay;
}
if (avail < (int32_t)wanted) {
enum spa_log_level level;
memset(d[0].data, 0, wanted * stride);
flags |= SPA_CHUNK_FLAG_EMPTY;
if (impl->have_sync) {
impl->have_sync = false;
level = SPA_LOG_LEVEL_INFO;
if (impl->io_position) {
/* Shift clock position by stream delay to compensate
* for processing and output delay. */
timestamp = impl->io_position->clock.position + device_delay;
spa_ringbuffer_read_update(&impl->ring, timestamp);
} else {
level = SPA_LOG_LEVEL_DEBUG;
/* In the unlikely case that no spa_io_position pointer
* was passed yet by PipeWire to this node, resort to a
* default behavior: just use the current read index.
* This most likely is not in sync with other nodes,
* but _something_ is needed as read index until the
* spa_io_position is available. */
spa_ringbuffer_get_read_index(&impl->ring, &timestamp);
}
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(timestamp * stride) & BUFFER_MASK,
d[0].data, wanted * stride);
if (!impl->io_position) {
/* In the unlikely case that no spa_io_position pointer
* was passed yet by PipeWire to this node, monotonically
* increment the read index like this to not consume from
* the same position in the ring buffer over and over again. */
timestamp += wanted;
spa_ringbuffer_read_update(&impl->ring, timestamp);
}
pw_log(level, "underrun %d/%u < %u",
avail, target_buffer, wanted);
} else {
double error, corr;
if (impl->first) {
if ((uint32_t)avail > target_buffer) {
uint32_t skip = avail - target_buffer;
pw_log_debug("first: avail:%d skip:%u target:%u",
avail, skip, target_buffer);
timestamp += skip;
/* In the constant delay mode, it is assumed that the ring buffer fill
* level matches impl->target_buffer. If not, check for over- and
* underruns. Adjust the DLL as needed. If the over/underruns are too
* severe, resynchronize. */
avail = spa_ringbuffer_get_read_index(&impl->ring, &timestamp);
/* Reduce target buffer by the delay amount to start playback sooner.
* This compensates for the delay to the device. */
if (SPA_UNLIKELY(impl->target_buffer < device_delay)) {
pw_log_error("Delay to device (%" PRIu32 ") is higher than "
"the target buffer size (%" PRIu32 ")", device_delay,
impl->target_buffer);
target_buffer = 0;
} else {
target_buffer = impl->target_buffer - device_delay;
}
if (avail < (int32_t)wanted) {
enum spa_log_level level;
memset(d[0].data, 0, wanted * stride);
flags |= SPA_CHUNK_FLAG_EMPTY;
if (impl->have_sync) {
impl->have_sync = false;
level = SPA_LOG_LEVEL_INFO;
} else {
level = SPA_LOG_LEVEL_DEBUG;
}
pw_log(level, "receiver read underrun %d/%u < %u",
avail, target_buffer, wanted);
} else {
double error, corr;
if (impl->first) {
if ((uint32_t)avail > target_buffer) {
uint32_t skip = avail - target_buffer;
pw_log_debug("first: avail:%d skip:%u target:%u",
avail, skip, target_buffer);
timestamp += skip;
avail = target_buffer;
}
impl->first = false;
} else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) {
pw_log_warn("receiver read overrun %u > %u", avail, target_buffer * 8);
timestamp += avail - target_buffer;
avail = target_buffer;
}
impl->first = false;
} else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) {
pw_log_warn("overrun %u > %u", avail, target_buffer * 8);
timestamp += avail - target_buffer;
avail = target_buffer;
}
if (!impl->direct_timestamp) {
/* when not using direct timestamp and clocks are not
* in sync, try to adjust our playback rate to keep the
* requested target_buffer bytes in the ringbuffer */
/* when the speed of the sender clock and our clock are
* not in sync, try to adjust our playback rate to keep
* the requested target_buffer bytes in the ringbuffer */
double in_flight = 0;
struct spa_io_position *pos = impl->io_position;
@ -109,16 +180,18 @@ static void rtp_audio_process_playback(void *data)
target_buffer, error, corr);
pw_stream_set_rate(impl->stream, 1.0 / corr);
}
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(timestamp * stride) & BUFFER_MASK,
d[0].data, wanted * stride);
timestamp += wanted;
spa_ringbuffer_read_update(&impl->ring, timestamp);
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(timestamp * stride) & BUFFER_MASK,
d[0].data, wanted * stride);
timestamp += wanted;
spa_ringbuffer_read_update(&impl->ring, timestamp);
}
}
d[0].chunk->offset = 0;
d[0].chunk->size = wanted * stride;
d[0].chunk->stride = stride;
@ -157,7 +230,10 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
if (impl->have_seq && impl->seq != seq) {
pw_log_info("unexpected seq (%d != %d) SSRC:%u",
seq, impl->seq, hdr->ssrc);
impl->have_sync = false;
/* No need to resynchronize here. If packets arrive out of
* order, then they are still written in order into the ring
* buffer, since they are written according to where the
* RTP timestamp points to. */
}
impl->seq = seq + 1;
impl->have_seq = true;
@ -195,8 +271,11 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
write, expected_write);
}
if (filled + samples > BUFFER_SIZE / stride) {
pw_log_debug("capture overrun %u + %u > %u", filled, samples,
/* Write overrun only makes sense in constant delay mode. See the
* RTP source module documentation and the rtp_audio_process_playback()
* code for an explanation why. */
if (!impl->direct_timestamp && (filled + samples > BUFFER_SIZE / stride)) {
pw_log_debug("receiver write overrun %u + %u > %u", filled, samples,
BUFFER_SIZE / stride);
impl->have_sync = false;
} else {
@ -206,9 +285,34 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
BUFFER_SIZE,
(write * stride) & BUFFER_MASK,
&buffer[hlen], (samples * stride));
write += samples;
spa_ringbuffer_write_update(&impl->ring, write);
/* Only update the write index if data was actually _appended_.
* If packets arrived out of order, then it may be that parts
* of the ring buffer further ahead were written to first, and
* now, unwritten parts preceding those other parts were now
* written to. For example, if previously, 10 samples were
* written to index 100, even though 10 samples were expected
* to be written at index 90, then there is a "hole" at index
* 90. If now, the packet that contains data for index 90
* arrived, then this data will be _inserted_ at index 90,
* and not _appended_. In this example, `expected_write` would
* be 100 (since `expected_write` is the current write index),
* `write` would be 90, `samples` would be 10. In this case,
* the inequality below does not hold, so data is being
* _inserted_. By contrast, during normal operation, `write`
* and `expected_write` are equal, so the inequality below
* _does_ hold, meaning that data is being appended.
*
* (Note that this write index update is only important if
* the constant delay mode is active, or if no spa_io_position
* was provided yet. See the rtp_audio_process_playback()
* code for more about this.) */
if (expected_write < (write + samples)) {
write += samples;
spa_ringbuffer_write_update(&impl->ring, write);
}
}
return 0;
short_packet:
@ -399,7 +503,7 @@ static void rtp_audio_process_capture(void *data)
pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp);
impl->have_sync = false;
} else if (filled + wanted > (int32_t)SPA_MIN(impl->target_buffer * 8, BUFFER_SIZE / stride)) {
pw_log_warn("overrun %u + %u > %u/%u", filled, wanted,
pw_log_warn("sender write overrun %u + %u > %u/%u", filled, wanted,
impl->target_buffer * 8, BUFFER_SIZE / stride);
impl->have_sync = false;
filled = 0;