module-rtp: make audio buffer size configurable

Add new sess.buffer-size property that can override default 1<<22b buffer
size.
This commit is contained in:
Martin Geier 2025-12-10 15:46:17 +01:00 committed by Carlos Rafael Giani
parent e7563b19b6
commit f935eb255c
4 changed files with 48 additions and 35 deletions

View file

@ -252,7 +252,7 @@ static void rtp_audio_process_playback(void *data)
avail = target_buffer;
}
impl->first = false;
} else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) {
} else if (avail > (int32_t)SPA_MIN(target_buffer * 8, impl->buffer_size / stride)) {
pw_log_warn("receiver read overrun %u > %u", avail, target_buffer * 8);
timestamp += avail - target_buffer;
avail = target_buffer;
@ -392,7 +392,7 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
spa_dll_init(&impl->dll);
spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate);
memset(impl->buffer, 0, BUFFER_SIZE);
memset(impl->buffer, 0, impl->buffer_size);
impl->have_sync = true;
} else if (expected_write != write) {
pw_log_debug("unexpected write (%u != %u)",
@ -402,9 +402,9 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
/* Write overrun only makes sense in constant delay mode. See the
* RTP source module documentation and the rtp_audio_process_playback()
* code for an explanation why. */
if (!impl->direct_timestamp && (filled + samples > BUFFER_SIZE / stride)) {
if (!impl->direct_timestamp && (filled + samples > impl->buffer_size / stride)) {
pw_log_debug("receiver write overrun %u + %u > %u", filled, samples,
BUFFER_SIZE / stride);
impl->buffer_size / stride);
impl->have_sync = false;
} else {
pw_log_trace("got samples:%u", samples);
@ -736,9 +736,9 @@ static void rtp_audio_process_capture(void *data)
* "Driver architecture and workflow" for an explanation why not. */
pw_log_warn("timestamp: expected %u != actual %u", expected_timestamp, actual_timestamp);
impl->have_sync = false;
} else if (filled + wanted > (int32_t)SPA_MIN(impl->target_buffer * 8, BUFFER_SIZE / stride)) {
} else if (filled + wanted > (int32_t)SPA_MIN(impl->target_buffer * 8, impl->buffer_size / stride)) {
pw_log_warn("sender write overrun %u + %u > %u/%u", filled, wanted,
impl->target_buffer * 8, BUFFER_SIZE / stride);
impl->target_buffer * 8, impl->buffer_size / stride);
impl->have_sync = false;
filled = 0;
}
@ -754,7 +754,7 @@ static void rtp_audio_process_capture(void *data)
actual_timestamp, impl->seq, impl->ts_offset, impl->ts_align, impl->ssrc);
spa_ringbuffer_read_update(&impl->ring, actual_timestamp);
spa_ringbuffer_write_update(&impl->ring, actual_timestamp);
memset(impl->buffer, 0, BUFFER_SIZE);
memset(impl->buffer, 0, impl->buffer_size);
impl->have_sync = true;
expected_timestamp = actual_timestamp;
filled = 0;

View file

@ -49,7 +49,7 @@ static void rtp_midi_process_playback(void *data)
if (avail <= 0)
break;
ptr = SPA_PTROFF(impl->buffer, read & BUFFER_MASK2, void);
ptr = SPA_PTROFF(impl->buffer, read & impl->buffer_mask2, void);
if ((pod = spa_pod_from_data(ptr, avail, 0, avail)) == NULL)
goto done;
@ -235,7 +235,7 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti
}
filled = spa_ringbuffer_get_write_index(&impl->ring, &write);
if (filled > (int32_t)BUFFER_SIZE2) {
if (filled > (int32_t)impl->buffer_size2) {
pw_log_warn("overflow");
return -ENOSPC;
}
@ -261,11 +261,11 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti
if (hdr.j)
parse_journal(impl, &packet[end], seq, plen - end);
ptr = SPA_PTROFF(impl->buffer, write & BUFFER_MASK2, void);
ptr = SPA_PTROFF(impl->buffer, write & impl->buffer_mask2, void);
/* each packet is written as a sequence of events. The offset is
* the RTP timestamp */
spa_pod_builder_init(&b, ptr, BUFFER_SIZE2 - filled);
spa_pod_builder_init(&b, ptr, impl->buffer_size2 - filled);
spa_pod_builder_push_sequence(&b, &f[0], 0);
while (offs < end) {
@ -472,7 +472,7 @@ static void rtp_midi_flush_packets(struct impl *impl,
impl->seq++;
len = 0;
}
if ((unsigned int)size > BUFFER_SIZE || len > BUFFER_SIZE - size) {
if ((unsigned int)size > impl->buffer_size || len > impl->buffer_size - size) {
pw_log_error("Buffer overflow prevented!");
return; // FIXME: what to do instead?
}
@ -488,7 +488,7 @@ static void rtp_midi_flush_packets(struct impl *impl,
int res;
delta = offset - prev_offset;
prev_offset = offset;
res = write_event(&impl->buffer[len], BUFFER_SIZE - len, delta, data, size);
res = write_event(&impl->buffer[len], impl->buffer_size - len, delta, data, size);
if (res < 0) {
pw_log_warn("write_event error: %d", res);
return;

View file

@ -62,7 +62,7 @@ static void rtp_opus_process_playback(void *data)
avail = target_buffer;
}
impl->first = false;
} else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE2 / stride)) {
} else if (avail > (int32_t)SPA_MIN(target_buffer * 8, impl->buffer_size2 / stride)) {
pw_log_warn("overrun %u > %u", avail, target_buffer * 8);
timestamp += avail - target_buffer;
avail = target_buffer;
@ -83,8 +83,8 @@ static void rtp_opus_process_playback(void *data)
}
spa_ringbuffer_read_data(&impl->ring,
impl->buffer,
BUFFER_SIZE2,
(timestamp * stride) & BUFFER_MASK2,
impl->buffer_size2,
(timestamp * stride) & impl->buffer_mask2,
d[0].data, wanted * stride);
timestamp += wanted;
@ -165,19 +165,19 @@ static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
spa_dll_init(&impl->dll);
spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate);
memset(impl->buffer, 0, BUFFER_SIZE);
memset(impl->buffer, 0, impl->buffer_size);
impl->have_sync = true;
} else if (expected_write != write) {
pw_log_debug("unexpected write (%u != %u)",
write, expected_write);
}
if (filled + 2880 > (int32_t)(BUFFER_SIZE2 / stride)) {
if (filled + 2880 > (int32_t)(impl->buffer_size2 / stride)) {
pw_log_debug("capture overrun %u + %d > %u", filled, 2880,
BUFFER_SIZE2 / stride);
impl->buffer_size2 / stride);
impl->have_sync = false;
} else {
uint32_t index = (write * stride) & BUFFER_MASK2, end;
uint32_t index = (write * stride) & impl->buffer_mask2, end;
res = opus_multistream_decode_float(dec,
&buffer[hlen], plen,
@ -186,8 +186,8 @@ static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
end = index + (res * stride);
/* fold to the lower part of the ringbuffer when overflow */
if (end > BUFFER_SIZE2)
memmove(impl->buffer, &impl->buffer[BUFFER_SIZE2], end - BUFFER_SIZE2);
if (end > impl->buffer_size2)
memmove(impl->buffer, &impl->buffer[impl->buffer_size2], end - impl->buffer_size2);
pw_log_info("receiving %zd len:%d timestamp:%d %u", plen, res, timestamp, index);
samples = res;
@ -302,22 +302,22 @@ static void rtp_opus_process_capture(void *data)
pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u",
timestamp, impl->seq, impl->ts_offset, impl->ssrc);
impl->ring.readindex = impl->ring.writeindex = expected_timestamp = timestamp;
memset(impl->buffer, 0, BUFFER_SIZE);
memset(impl->buffer, 0, impl->buffer_size);
impl->have_sync = true;
} else {
if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) {
pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp);
impl->have_sync = false;
} else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) {
pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride);
} else if (filled + wanted > (int32_t)(impl->buffer_size / stride)) {
pw_log_warn("overrun %u + %u > %u", filled, wanted, impl->buffer_size / stride);
impl->have_sync = false;
}
}
spa_ringbuffer_write_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
(filled * stride) & BUFFER_MASK,
impl->buffer_size,
(filled * stride) & impl->buffer_mask,
SPA_PTROFF(d[0].data, offs, void), wanted * stride);
expected_timestamp += wanted;
spa_ringbuffer_write_update(&impl->ring, expected_timestamp);

View file

@ -31,10 +31,7 @@
PW_LOG_TOPIC_EXTERN(mod_topic);
#define PW_LOG_TOPIC_DEFAULT mod_topic
#define BUFFER_SIZE (1u<<22)
#define BUFFER_MASK (BUFFER_SIZE-1)
#define BUFFER_SIZE2 (BUFFER_SIZE>>1)
#define BUFFER_MASK2 (BUFFER_SIZE2-1)
#define BUFFER_SIZE1 (1u<<22)
/* IMPORTANT: When using calls that have return values, like
* rtp_stream_emit_open_connection, callers must set the variables
@ -117,7 +114,11 @@ struct impl {
uint32_t ts_align;
struct spa_ringbuffer ring;
uint8_t buffer[BUFFER_SIZE];
uint8_t *buffer;
uint32_t buffer_size;
uint32_t buffer_mask;
uint32_t buffer_size2;
uint32_t buffer_mask2;
uint64_t last_recv_timestamp;
struct spa_io_rate_match *io_rate_match;
@ -615,7 +616,7 @@ static void on_flush_timeout(void *d, uint64_t expirations)
static void default_reset_ringbuffer(struct impl *impl)
{
spa_memzero(impl->buffer, sizeof(impl->buffer));
spa_memzero(impl->buffer, impl->buffer_size);
}
struct rtp_stream *rtp_stream_new(struct pw_core *core,
@ -782,9 +783,18 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
* then, all grid cells are guaranteed to have the size impl->stride, so the
* aforementioned division rest will always be zero.
*/
impl->actual_max_buffer_size = SPA_ROUND_DOWN(BUFFER_SIZE, impl->stride);
impl->buffer_size = pw_properties_get_uint32(props, "sess.buffer-size", BUFFER_SIZE1);
// find closest larger number rounded to power of two
impl->buffer_size = SPA_ROUND_UP_POW2_32(impl->buffer_size);
impl->buffer_mask = impl->buffer_size - 1;
impl->buffer_size2 = impl->buffer_size / 2;
impl->buffer_mask2 = impl->buffer_size2 - 1;
impl->buffer = calloc(1, impl->buffer_size);
impl->actual_max_buffer_size = SPA_ROUND_DOWN(impl->buffer_size, impl->stride);
pw_log_debug("possible / actual max buffer size: %" PRIu32 " / %" PRIu32,
(uint32_t)BUFFER_SIZE, impl->actual_max_buffer_size);
(uint32_t)impl->buffer_size, impl->actual_max_buffer_size);
pw_properties_setf(props, "rtp.mime", "%s", impl->format_info->mime);
@ -1046,6 +1056,9 @@ void rtp_stream_destroy(struct rtp_stream *s)
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
if (impl->buffer)
free(impl->buffer);
spa_hook_list_clean(&impl->listener_list);
free(impl);
}