Merge branch 'rtp-module-fixes' into 'master'

Fixes for the RTP module

See merge request pipewire/pipewire!2682
This commit is contained in:
Carlos Rafael Giani 2026-02-03 08:18:20 +00:00
commit 73d0e06aba
3 changed files with 208 additions and 53 deletions

View file

@ -573,6 +573,22 @@ if build_module_zeroconf_discover
endif
summary({'zeroconf-discover': build_module_zeroconf_discover}, bool_yn: true, section: 'Optional Modules')
# Several modules (rtp-sink, rtp-source, raop-sink) use the same code
# for actual RTP transport. To not have to recompile the same code
# multiple times, and to make the build script a little more robust
# (by avoiding build script code duplication), create a static library
# that contains that common code.
# Static helper library that holds the shared RTP transport code
# (module-rtp/stream.c). It is never installed (install : false); it only
# exists so that the code is compiled once and then linked into the module
# targets that consume pipewire_module_rtp_common_dep.
# NOTE(review): this archive ends up inside shared modules, so it has to be
# built as position-independent code - presumably covered by Meson's
# b_staticpic default; confirm for the supported toolchains.
pipewire_module_rtp_common_lib = static_library('pipewire-module-rtp-common-lib',
[ 'module-rtp/stream.c' ],
include_directories : [configinc],
install : false,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep],
)
# Dependency object for modules that use the common RTP code: it links in
# the static library and forwards the same external dependencies the library
# was built with, so consuming targets do not have to list them again.
pipewire_module_rtp_common_dep = declare_dependency(
link_with: pipewire_module_rtp_common_lib,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep],
)
build_module_raop_discover = avahi_dep.found()
if build_module_raop_discover
pipewire_module_raop_discover = shared_library('pipewire-module-raop-discover',
@ -605,13 +621,12 @@ build_module_raop = openssl_lib.found()
if build_module_raop
pipewire_module_raop_sink = shared_library('pipewire-module-raop-sink',
[ 'module-raop-sink.c',
'module-raop/rtsp-client.c',
'module-rtp/stream.c' ],
'module-raop/rtsp-client.c' ],
include_directories : [configinc],
install : true,
install_dir : modules_install_dir,
install_rpath: modules_install_dir,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep, openssl_lib],
dependencies : [pipewire_module_rtp_common_dep, openssl_lib],
)
endif
summary({'raop-sink (requires OpenSSL)': build_module_raop}, bool_yn: true, section: 'Optional Modules')
@ -620,36 +635,33 @@ roc_dep = dependency('roc', version: '>= 0.4.0', required: get_option('roc'))
summary({'ROC': roc_dep.found()}, bool_yn: true, section: 'Streaming between daemons')
pipewire_module_rtp_source = shared_library('pipewire-module-rtp-source',
[ 'module-rtp-source.c',
'module-rtp/stream.c' ],
[ 'module-rtp-source.c' ],
include_directories : [configinc],
install : true,
install_dir : modules_install_dir,
install_rpath: modules_install_dir,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep],
dependencies : [pipewire_module_rtp_common_dep],
)
pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-sink',
[ 'module-rtp-sink.c',
'module-rtp/stream.c' ],
[ 'module-rtp-sink.c' ],
include_directories : [configinc],
install : true,
install_dir : modules_install_dir,
install_rpath: modules_install_dir,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep],
dependencies : [pipewire_module_rtp_common_dep],
)
build_module_rtp_session = avahi_dep.found()
if build_module_rtp_session
pipewire_module_rtp_session = shared_library('pipewire-module-rtp-session',
[ 'module-rtp/stream.c',
'module-zeroconf-discover/avahi-poll.c',
'module-rtp-session.c' ],
[ 'module-zeroconf-discover/avahi-poll.c',
'module-rtp-session.c' ],
include_directories : [configinc],
install : true,
install_dir : modules_install_dir,
install_rpath: modules_install_dir,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, avahi_dep, opus_dep],
dependencies : [pipewire_module_rtp_common_dep, avahi_dep],
)
endif

View file

@ -22,6 +22,15 @@ static void ringbuffer_clear(struct spa_ringbuffer *rbuf SPA_UNUSED,
memset(iov[1].iov_base, 0, iov[1].iov_len);
}
/**
 * Scale a 64-bit value by the rational factor num / denom.
 *
 * The computation is done purely in 64-bit integer arithmetic by splitting
 * val into quotient and remainder with respect to denom:
 *
 *   val * num / denom == (val / denom) * num + ((val % denom) * num) / denom
 *
 * Since (val % denom) < 2^32 and num < 2^32, the remainder product always
 * fits into 64 bits, so the result is exact for every input whose scaled
 * value fits into an uint64_t. No floating point rounding is involved and
 * no 128-bit integer type is required.
 *
 * \param val   value to scale
 * \param num   numerator of the scale factor
 * \param denom denominator of the scale factor
 * \return val * num / denom, rounded down. Returns 0 if denom is 0.
 *         If the mathematical result does not fit into 64 bits, it is
 *         reduced modulo 2^64.
 */
static inline uint64_t scale_u64(uint64_t val, uint32_t num, uint32_t denom)
{
	uint64_t quot, rem;

	/* A zero denominator would otherwise be a division by zero. */
	if (denom == 0)
		return 0;

	quot = val / denom;
	rem = val % denom;

	return quot * num + (rem * num) / denom;
}
static void rtp_audio_process_playback(void *data)
{
struct impl *impl = data;
@ -61,6 +70,9 @@ static void rtp_audio_process_playback(void *data)
* read or write index itself.) */
if (impl->direct_timestamp) {
uint32_t num_samples_to_read;
uint32_t read_index;
/* In direct timestamp mode, the focus lies on synchronized playback, not
* on a constant latency. The ring buffer fill level is not of interest
* here. The code in rtp_audio_receive() writes to the ring buffer at
@ -89,22 +101,32 @@ static void rtp_audio_process_playback(void *data)
* timestamp mode, since all of them shift the timestamp by the same
* `sess.latency.msec` into the future.
*
* "Fill level" makes no sense in this mode, since a constant latency
* is not important in this mode, so no DLL is needed. Also, matching
* the pace of the synchronized clock is done by having the graph
* driver be synchronized to that clock, which will in turn cause
* any output sinks to adjust their DLLs (or similar control loop
* mechanisms) to match the pace of their data consumption with the
* pace of the driver. */
* Since in this mode, a constant latency is not important, tracking
* the fill level to keep it steady makes no sense. Consequently,
* no DLL is needed. Also, matching the pace of the synchronized clock
* is done by having the graph driver be synchronized to that clock,
* which will in turn cause any output sinks to adjust their DLLs
* (or similar control loop mechanisms) to match the pace of their
* data consumption with the pace of the driver.
*
* The fill level is still important though to correctly handle corner
* cases where the ring buffer is (almost) empty. If fewer samples
* are available than what the read operation wants, the deficit
* has to be compensated with nullbytes. To that end, the "avail"
* quantity tracks how many samples are actually available. */
if (impl->io_position) {
/* Use the clock position directly as the read index.
* Do NOT add device_delay here - the sink's DLL handles
* matching its hardware clock to the driver pace. Adding
* device_delay would create a feedback loop since rate
* adjustments affect both ringbuffer and device buffer. */
timestamp = impl->io_position->clock.position;
uint32_t clock_rate = impl->io_position->clock.rate.denom;
/* Translate the clock position to an RTP timestamp and
* shift it to compensate for device delay and ASRC delay.
* The device delay is scaled along with the clock position,
* since both are expressed in clock sample units, while
* pwt.buffered is expressed in stream time. */
timestamp = scale_u64(impl->io_position->clock.position + device_delay,
impl->rate, clock_rate) + pwt.buffered;
spa_ringbuffer_read_update(&impl->ring, timestamp);
avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index);
} else {
/* In the unlikely case that no spa_io_position pointer
* was passed yet by PipeWire to this node, resort to a
@ -112,26 +134,72 @@ static void rtp_audio_process_playback(void *data)
* This most likely is not in sync with other nodes,
* but _something_ is needed as read index until the
* spa_io_position is available. */
spa_ringbuffer_get_read_index(&impl->ring, &timestamp);
avail = spa_ringbuffer_get_read_index(&impl->ring, &timestamp);
read_index = timestamp;
}
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
impl->actual_max_buffer_size,
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
d[0].data, wanted * stride);
/* If avail is 0, it means that the ring buffer is empty. <0 means
* that there is an underrun, typically because the PTP time now
* is ahead of the RTP data (this can happen when the PTP master
* changes for example). And in cases where only a little bit of
* data is left, it is important to not try to use more than what
* is actually available.
* Overruns would happen if the write pointer is further ahead than
* what the ringbuffer size actually allows. This too can happen
* if the PTP time jumps. No actual buffer overflow would happen
* then, since the write operations always apply modulo to the
* timestamps to wrap around the ringbuffer borders.
*/
bool has_underrun = (avail < 0);
bool has_overrun = !has_underrun && ((uint32_t)avail) > impl->actual_max_buffer_size;
num_samples_to_read = has_underrun ? 0 : SPA_MIN((uint32_t)avail, wanted);
/* Clear the bytes that were just retrieved. Since the fill level
* is not tracked in this buffer mode, it is possible that as soon
* as actual playback ends, the RTP source node re-reads old data.
* Make sure it reads silence when no actual new data is present
* and the RTP source node still runs. Do this by filling the
* region of the retrieved data with null bytes. */
ringbuffer_clear(&impl->ring,
impl->buffer,
impl->actual_max_buffer_size,
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
wanted * stride);
/* Do some additional logging in the under/overrun cases. */
if (SPA_UNLIKELY(pw_log_topic_enabled(SPA_LOG_LEVEL_TRACE, PW_LOG_TOPIC_DEFAULT)))
{
uint32_t write_index;
int32_t filled = spa_ringbuffer_get_write_index(&impl->ring, &write_index);
if (has_underrun) {
pw_log_trace("Direct timestamp mode: Read index underrun: write_index: %"
PRIu32 ", read_index: %" PRIu32 ", wanted: %u - filled: %" PRIi32,
write_index, read_index, wanted, filled);
} else if (has_overrun) {
pw_log_trace("Direct timestamp mode: Read index overrun: write_index: %"
PRIu32 ", read_index: %" PRIu32 ", wanted: %u - filled: %" PRIi32
", buffer size: %u", write_index, read_index, wanted, filled,
impl->actual_max_buffer_size);
}
}
if (num_samples_to_read > 0) {
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
impl->actual_max_buffer_size,
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
d[0].data, num_samples_to_read * stride);
/* Clear the bytes that were just retrieved. Since the fill level
* is not tracked in this buffer mode, it is possible that as soon
* as actual playback ends, the RTP source node re-reads old data.
* Make sure it reads silence when no actual new data is present
* and the RTP source node still runs. Do this by filling the
* region of the retrieved data with null bytes. */
ringbuffer_clear(&impl->ring,
impl->buffer,
impl->actual_max_buffer_size,
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
num_samples_to_read * stride);
}
if (num_samples_to_read < wanted) {
/* If fewer samples were available than what was wanted,
* fill the remaining space in the destination memory
* with nullsamples. */
void *bytes_to_clear = SPA_PTROFF(d[0].data, num_samples_to_read * stride, void);
size_t num_bytes_to_clear = (wanted - num_samples_to_read) * stride;
spa_memzero(bytes_to_clear, num_bytes_to_clear);
}
if (!impl->io_position) {
/* In the unlikely case that no spa_io_position pointer
@ -222,6 +290,25 @@ static void rtp_audio_process_playback(void *data)
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
d[0].data, wanted * stride);
/* Clear the bytes that were just retrieved. Unlike in the
* direct timestamp mode, here, bytes are always read out
* of the ring buffer in sequence - the read pointer does
* not "jump around" (which can happen in direct timestamp
* mode if the last iteration has been a while ago and the
* driver clock time advanced significantly, or if the driver
* time experienced a discontinuity). However, should there
* be packet loss, it could lead to segments in the ring
* buffer that should have been written to but weren't written
* to. These segments would then contain old stale data. By
* clearing data out of the ring buffer after reading it, it
* is ensured that no stale data can exist - in the packet loss
* case, the outcome would be a gap made of nullsamples instead. */
ringbuffer_clear(&impl->ring,
impl->buffer,
impl->actual_max_buffer_size,
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
wanted * stride);
timestamp += wanted;
spa_ringbuffer_read_update(&impl->ring, timestamp);
}
@ -334,17 +421,43 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
* and not _appended_. In this example, `expected_write` would
* be 100 (since `expected_write` is the current write index),
* `write` would be 90, `samples` would be 10. In this case,
* the inequality below does not hold, so data is being
* _inserted_. By contrast, during normal operation, `write`
* and `expected_write` are equal, so the inequality below
* _does_ hold, meaning that data is being appended.
* the (expected_write < (write + samples)) inequality does
* not hold, so data is being _inserted_. By contrast, during
* normal operation, `write` and `expected_write` are equal,
* so the aforementioned inequality _does_ hold, meaning that
* data is being appended.
*
* The code below handles this, and also handles a 32-bit
* integer overflow corner case where the comparison has
* to be done differently to account for the wrap-around.
*
* (Note that this write index update is only important if
* the constant delay mode is active, or if no spa_io_position
* was provided yet. See the rtp_audio_process_playback()
* code for more about this.) */
if (expected_write < (write + samples)) {
write += samples;
/* Compute new_write, handling potential 32-bit overflow.
* In unsigned arithmetic, if write + samples exceeds UINT32_MAX,
* it wraps around to a smaller value. We detect this by checking
* if new_write < write (which can only happen on overflow). */
const uint32_t new_write = write + samples;
const bool wrapped_around = new_write < write;
/* Determine if new_write is ahead of expected_write.
* We're appending (ahead) if:
*
* 1. Normal case: new_write > expected_write (forward progress)
* 2. Wrap-around case: new_write wrapped around (wrapped_around == true),
* meaning we've cycled through the 32-bit index space and are
* continuing from the beginning. In this case, we're always ahead.
*
* We're NOT appending (inserting/behind) if:
* - new_write <= expected_write AND no wrap-around occurred
* (we're filling a gap or writing behind the current position) */
const bool is_appending = wrapped_around || (new_write > expected_write);
if (is_appending) {
write = new_write;
spa_ringbuffer_write_update(&impl->ring, write);
}
}
@ -426,20 +539,27 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin
iov[0].iov_len = sizeof(header);
while (num_packets > 0) {
uint32_t rtp_timestamp;
if (impl->marker_on_first && impl->first)
header.m = 1;
else
header.m = 0;
rtp_timestamp = impl->ts_offset + (set_timestamp ? set_timestamp : timestamp);
header.sequence_number = htons(impl->seq);
header.timestamp = htonl(impl->ts_offset + (set_timestamp ? set_timestamp : timestamp));
header.timestamp = htonl(rtp_timestamp);
set_iovec(&impl->ring,
impl->buffer, impl->actual_max_buffer_size,
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
&iov[1], tosend * stride);
pw_log_trace("sending %d packet:%d ts_offset:%d timestamp:%d",
tosend, num_packets, impl->ts_offset, timestamp);
pw_log_trace("sending %d packet:%d ts_offset:%d timestamp:%u (%f s)",
tosend, num_packets, impl->ts_offset, timestamp,
(double)timestamp * impl->io_position->clock.rate.num /
impl->io_position->clock.rate.denom);
rtp_stream_emit_send_packet(impl, iov, 3);
@ -500,6 +620,7 @@ static void rtp_audio_process_capture(void *data)
uint32_t pending, num_queued;
struct spa_io_position *pos;
uint64_t next_nsec, quantum;
struct pw_time pwt;
if (impl->separate_sender) {
/* apply the DLL rate */
@ -517,6 +638,8 @@ static void rtp_audio_process_capture(void *data)
stride = impl->stride;
wanted = size / stride;
pw_stream_get_time_n(impl->stream, &pwt, sizeof(pwt));
filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp);
pos = impl->io_position;
@ -533,6 +656,21 @@ static void rtp_audio_process_capture(void *data)
impl->sink_resamp_delay = impl->io_rate_match->delay;
impl->sink_quantum = (uint64_t)(pos->clock.duration * SPA_NSEC_PER_SEC / rate);
}
/* Compensate for the stream resampler's delay. */
actual_timestamp -= pwt.buffered;
/* If we got a request for less than quantum worth of samples, it indicates that there
* is a gap created by the resampler. We have to skip it to avoid timestamp discontinuity. */
if (pwt.buffered > 0) {
int32_t ideal_quantum = (int32_t)scale_u64(pos->clock.duration, impl->rate, rate);
if (wanted < ideal_quantum) {
int32_t num_samples_to_skip = ideal_quantum - wanted;
pw_log_info("wanted: %" PRId32 " < ideal quantum: %" PRId32 " - skipping %"
PRId32" samples", wanted, ideal_quantum, num_samples_to_skip);
actual_timestamp += num_samples_to_skip;
}
}
} else {
actual_timestamp = expected_timestamp;
next_nsec = 0;
@ -569,7 +707,8 @@ static void rtp_audio_process_capture(void *data)
if (!impl->have_sync) {
pw_log_info("(re)sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u",
actual_timestamp, impl->seq, impl->ts_offset, impl->ssrc);
impl->ring.readindex = impl->ring.writeindex = actual_timestamp;
spa_ringbuffer_read_update(&impl->ring, actual_timestamp);
spa_ringbuffer_write_update(&impl->ring, actual_timestamp);
memset(impl->buffer, 0, BUFFER_SIZE);
impl->have_sync = true;
expected_timestamp = actual_timestamp;

View file

@ -454,6 +454,10 @@ static int stream_stop(struct impl *impl)
* meaning that the timer was no longer running, and the connection
* could be closed. */
if (!timer_running) {
/* Clear the ringbuffer to prevent old invalid packets from being
* sent when processing resumes via rtp_audio_flush_packets() */
if (impl->reset_ringbuffer)
impl->reset_ringbuffer(impl);
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPED);
pw_log_info("stream stopped");
}