module-rtp: Limit actual max buffer size to an integer multiple of stride

Opus and MIDI code get TODOs added, since it is currently unclear how to
implement that fix for them.
This commit is contained in:
Carlos Rafael Giani 2025-08-03 20:10:25 +02:00 committed by Wim Taymans
parent e5be9cce4f
commit c9a8b8629f
4 changed files with 58 additions and 15 deletions

View file

@ -55,7 +55,7 @@ static void rtp_audio_process_playback(void *data)
* ring buffer at a position X" is mentioned. To be exact, that buffer is actually
* impl->buffer. And since X can be a timestamp whose value is far higher than the
* buffer size (and the fact that impl->buffer is a _ring_ buffer), reads and writes
* actually first apply BUFFER_MASK to the position to implement a ring buffer
* actually first do a modulo operation to the position to implement a ring buffer
* index wrap-around. (Wrap-around when reading / writing the data bytes is
* handled by the spa_ringbuffer code; this is about the wrap around of the
* read or write index itself.) */
@ -114,8 +114,8 @@ static void rtp_audio_process_playback(void *data)
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(timestamp * stride) & BUFFER_MASK,
impl->actual_max_buffer_size,
(timestamp * stride) % impl->actual_max_buffer_size,
d[0].data, wanted * stride);
/* Clear the bytes that were just retrieved. Since the fill level
@ -126,8 +126,8 @@ static void rtp_audio_process_playback(void *data)
* region of the retrieved data with null bytes. */
ringbuffer_clear(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(timestamp * stride) & BUFFER_MASK,
impl->actual_max_buffer_size,
(timestamp * stride) % impl->actual_max_buffer_size,
wanted * stride);
if (!impl->io_position) {
@ -215,8 +215,8 @@ static void rtp_audio_process_playback(void *data)
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(timestamp * stride) & BUFFER_MASK,
impl->actual_max_buffer_size,
(timestamp * stride) % impl->actual_max_buffer_size,
d[0].data, wanted * stride);
timestamp += wanted;
@ -314,8 +314,8 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
pw_log_trace("got samples:%u", samples);
spa_ringbuffer_write_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(write * stride) & BUFFER_MASK,
impl->actual_max_buffer_size,
(write * stride) % impl->actual_max_buffer_size,
&buffer[hlen], (samples * stride));
/* Only update the write index if data was actually _appended_.
@ -418,8 +418,8 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin
header.timestamp = htonl(impl->ts_offset + (set_timestamp ? set_timestamp : timestamp));
set_iovec(&impl->ring,
impl->buffer, BUFFER_SIZE,
(timestamp * stride) & BUFFER_MASK,
impl->buffer, impl->actual_max_buffer_size,
(timestamp * stride) % impl->actual_max_buffer_size,
&iov[1], tosend * stride);
pw_log_trace("sending %d packet:%d ts_offset:%d timestamp:%d",
@ -553,8 +553,8 @@ static void rtp_audio_process_capture(void *data)
spa_ringbuffer_write_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(expected_timestamp * stride) & BUFFER_MASK,
impl->actual_max_buffer_size,
(expected_timestamp * stride) % impl->actual_max_buffer_size,
SPA_PTROFF(d[0].data, offs, void), wanted * stride);
expected_timestamp += wanted;
spa_ringbuffer_write_update(&impl->ring, expected_timestamp);

View file

@ -5,7 +5,8 @@
#include <inttypes.h>
#include <limits.h>
/* TODO: Direct timestamp mode here may require a rework. See audio.c for a reference. */
/* TODO: Direct timestamp mode here may require a rework. See audio.c for a reference.
* Also check out the usage of actual_max_buffer_size in audio.c. */
static void rtp_midi_process_playback(void *data)
{

View file

@ -7,7 +7,8 @@
#include <opus/opus.h>
#include <opus/opus_multistream.h>
/* TODO: Direct timestamp mode here may require a rework. See audio.c for a reference. */
/* TODO: Direct timestamp mode here may require a rework. See audio.c for a reference.
* Also check out the usage of actual_max_buffer_size in audio.c. */
static void rtp_opus_process_playback(void *data)
{

View file

@ -62,6 +62,7 @@ struct impl {
uint32_t rate;
uint32_t stride;
uint32_t actual_max_buffer_size;
uint8_t payload;
uint32_t ssrc;
uint16_t seq;
@ -423,6 +424,46 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
break;
}
/* Limit the actual maximum buffer size to the maximum integer multiple
* amount of impl->stride that fits within BUFFER_SIZE. This is important
* to prevent corner cases where the read pointer wrapped around at the
* same time when the IO clock experiences a discontinuity.
*
* If the BUFFER_SIZE constant is not an integer multiple of impl->stride,
* pointer wrap-arounds will result in positions that exhibit a nonzero
* impl->stride division rest. Also, the write and read pointers are normally
* increased monotonically and contiguously. But, if a discontinuity is
* detected, these pointers may be resynchronized. Importantly, sometimes
* only one of them may be resynchronized, while the other retains its existing
* synchronization. (For example, the read and write side may use different
* discontinuity thresholds.)
*
* What then can happen is that the resynchronized pointer exhibits a _different_
* impl->stride division than the other pointer. Once the resynchronization takes
* place, that pointer is again monotonically increased from then on, so those
* division rests will stay different. This then means that the read and write
* operations will not be aligned properly. For example, a write operation might
* write to position 20 in the ring buffer, but the read operation might read
* from position 22, and doing so with a stride value of 6. The end result is
* invalid data.
*
* One way to visualize this is to think of the ring buffer as a grid. The grid
* cell size equals impl->stride. If BUFFER_SIZE is not an integer multiple of
* impl->stride, it means that the very last grid cell will have a size that is
* smaller than impl->stride. The unaligned read/write operations mean that the
* operations will not be done at the same grid cell boundaries, so for example
* the read operation might think that a cell starts at byte 2, while the write
* operation might think that the same cell starts at byte 4.
*
* By limiting the actual maximum buffer size to the maximum integer multiple
* amount of impl->stride that fits within BUFFER_SIZE, this is avoided, since
* then, all grid cells are guaranteed to have the size impl->stride, so the
* aforementioned division rest will always be zero.
*/
impl->actual_max_buffer_size = SPA_ROUND_DOWN(BUFFER_SIZE, impl->stride);
pw_log_debug("possible / actual max buffer size: %" PRIu32 " / %" PRIu32,
(uint32_t)BUFFER_SIZE, impl->actual_max_buffer_size);
pw_properties_setf(props, "rtp.mime", "%s", impl->format_info->mime);
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)