From 9fb44c3a713061ab2f5f1e2608de9dba1ff74287 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 25 Jan 2023 16:23:00 +0100 Subject: [PATCH] module-rtp-source: improve buffer handling Try to keep half the packet size in the ringbuffer as well. This helps us adapt to the packet size of the sender. Drop samples from the ringbuffer for the first packet we read. This makes us lock onto the stream with the exact requested latency. --- src/modules/module-rtp-source.c | 68 +++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 35ee410f1..8a93697e4 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -239,8 +239,10 @@ struct session { struct spa_io_rate_match *rate_match; struct spa_dll dll; uint32_t target_buffer; + uint32_t last_packet_size; float max_error; unsigned buffering:1; + unsigned first:1; }; static void stream_destroy(void *d) @@ -255,8 +257,8 @@ static void stream_process(void *data) struct session *sess = data; struct pw_buffer *buf; struct spa_data *d; - uint32_t index; - int32_t avail, wanted; + uint32_t index, target_buffer; + int32_t avail, wanted; if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { pw_log_debug("Out of stream buffers: %m"); @@ -270,27 +272,39 @@ static void stream_process(void *data) avail = spa_ringbuffer_get_read_index(&sess->ring, &index); - if (avail < wanted || sess->buffering) { - memset(d[0].data, 0, wanted); + target_buffer = sess->target_buffer + sess->last_packet_size / 2; + + if (avail < wanted || sess->buffering) { + memset(d[0].data, 0, wanted); if (!sess->buffering && sess->have_sync) { pw_log_debug("underrun %u/%u < %u, buffering...", - avail, sess->target_buffer, wanted); + avail, target_buffer, wanted); sess->buffering = true; } - } else { + } else { float error, corr; - if (avail > (int32_t)BUFFER_SIZE) { - index += avail - sess->target_buffer; - avail = sess->target_buffer; - pw_log_warn("overrun %u > %u", avail, BUFFER_SIZE); + if (avail > (int32_t)SPA_MIN(target_buffer * 3, BUFFER_SIZE)) { + index += avail - target_buffer; + avail = target_buffer; + pw_log_warn("overrun %u > %u", avail, target_buffer * 3); } else { - error = (float)sess->target_buffer - (float)avail; + if (sess->first) { + if ((uint32_t)avail > target_buffer) { + uint32_t skip = avail - target_buffer; + pw_log_debug("first: avail:%d skip:%u target:%u", + avail, skip, target_buffer); + index += skip; + avail = target_buffer; + } + sess->first = false; + } + error = (float)target_buffer - (float)avail; error = SPA_CLAMP(error, -sess->max_error, sess->max_error); corr = spa_dll_update(&sess->dll, error); pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, - sess->target_buffer, error, corr); + target_buffer, error, corr); if (sess->rate_match) { SPA_FLAG_SET(sess->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); @@ -300,16 +314,16 @@ static void stream_process(void *data) spa_ringbuffer_read_data(&sess->ring, sess->buffer, BUFFER_SIZE, - index & BUFFER_MASK, - d[0].data, wanted); + index & BUFFER_MASK, + d[0].data, wanted); - index += wanted; - spa_ringbuffer_read_update(&sess->ring, index); - } - d[0].chunk->size = wanted; - d[0].chunk->stride = sess->info.stride; - d[0].chunk->offset = 0; - buf->size = wanted / sess->info.stride; + index += wanted; + spa_ringbuffer_read_update(&sess->ring, index); + } + d[0].chunk->size = wanted; + d[0].chunk->stride = sess->info.stride; + d[0].chunk->offset = 0; + buf->size = wanted / sess->info.stride; pw_stream_queue_buffer(sess->stream, buf); } @@ -423,7 +437,9 @@ on_rtp_io(void *data, int fd, uint32_t mask) pw_log_debug("got rtp, capture overrun %u %zd", filled, len); sess->have_sync = false; } else { - pw_log_trace("got rtp, buffering"); + uint32_t target_buffer; + + pw_log_trace("got rtp packet len:%zd", len); spa_ringbuffer_write_data(&sess->ring, sess->buffer, BUFFER_SIZE, @@ -433,10 +449,13 @@ on_rtp_io(void *data, int fd, uint32_t mask) filled += len; spa_ringbuffer_write_update(&sess->ring, index); - if (sess->buffering && (uint32_t)filled > sess->target_buffer) { + sess->last_packet_size = len; + target_buffer = sess->target_buffer + len/2; + + if (sess->buffering && (uint32_t)filled > target_buffer) { sess->buffering = false; pw_log_debug("buffering done %u > %u", - filled, sess->target_buffer); + filled, target_buffer); } } } @@ -603,6 +622,7 @@ static int session_new(struct impl *impl, struct sdp_info *info) return -errno; session->info = *info; + session->first = true; props = pw_properties_copy(impl->stream_props); if (props == NULL) {