module-rtp-source: improve buffer handling

Try to keep half the packet size in the ringbuffer as well. This helps
us adapt to the packet size of the sender.

Drop samples from the ringbuffer for the first packet we read. This
makes us lock onto the stream with the exact requested latency.
This commit is contained in:
Wim Taymans 2023-01-25 16:23:00 +01:00
parent 80a6880f33
commit 9fb44c3a71

View file

@ -239,8 +239,10 @@ struct session {
struct spa_io_rate_match *rate_match;
struct spa_dll dll;
uint32_t target_buffer;
uint32_t last_packet_size;
float max_error;
unsigned buffering:1;
unsigned first:1;
};
static void stream_destroy(void *d)
@ -255,8 +257,8 @@ static void stream_process(void *data)
struct session *sess = data;
struct pw_buffer *buf;
struct spa_data *d;
uint32_t index;
int32_t avail, wanted;
uint32_t index, target_buffer;
int32_t avail, wanted;
if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) {
pw_log_debug("Out of stream buffers: %m");
@ -270,27 +272,39 @@ static void stream_process(void *data)
avail = spa_ringbuffer_get_read_index(&sess->ring, &index);
if (avail < wanted || sess->buffering) {
memset(d[0].data, 0, wanted);
target_buffer = sess->target_buffer + sess->last_packet_size / 2;
if (avail < wanted || sess->buffering) {
memset(d[0].data, 0, wanted);
if (!sess->buffering && sess->have_sync) {
pw_log_debug("underrun %u/%u < %u, buffering...",
avail, sess->target_buffer, wanted);
avail, target_buffer, wanted);
sess->buffering = true;
}
} else {
} else {
float error, corr;
if (avail > (int32_t)BUFFER_SIZE) {
index += avail - sess->target_buffer;
avail = sess->target_buffer;
pw_log_warn("overrun %u > %u", avail, BUFFER_SIZE);
if (avail > (int32_t)SPA_MIN(target_buffer * 3, BUFFER_SIZE)) {
index += avail - target_buffer;
avail = target_buffer;
pw_log_warn("overrun %u > %u", avail, target_buffer * 3);
} else {
error = (float)sess->target_buffer - (float)avail;
if (sess->first) {
if ((uint32_t)avail > target_buffer) {
uint32_t skip = avail - target_buffer;
pw_log_debug("first: avail:%d skip:%u target:%u",
avail, skip, target_buffer);
index += skip;
avail = target_buffer;
}
sess->first = false;
}
error = (float)target_buffer - (float)avail;
error = SPA_CLAMP(error, -sess->max_error, sess->max_error);
corr = spa_dll_update(&sess->dll, error);
pw_log_debug("avail:%u target:%u error:%f corr:%f", avail,
sess->target_buffer, error, corr);
target_buffer, error, corr);
if (sess->rate_match) {
SPA_FLAG_SET(sess->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE);
@ -300,16 +314,16 @@ static void stream_process(void *data)
spa_ringbuffer_read_data(&sess->ring,
sess->buffer,
BUFFER_SIZE,
index & BUFFER_MASK,
d[0].data, wanted);
index & BUFFER_MASK,
d[0].data, wanted);
index += wanted;
spa_ringbuffer_read_update(&sess->ring, index);
}
d[0].chunk->size = wanted;
d[0].chunk->stride = sess->info.stride;
d[0].chunk->offset = 0;
buf->size = wanted / sess->info.stride;
index += wanted;
spa_ringbuffer_read_update(&sess->ring, index);
}
d[0].chunk->size = wanted;
d[0].chunk->stride = sess->info.stride;
d[0].chunk->offset = 0;
buf->size = wanted / sess->info.stride;
pw_stream_queue_buffer(sess->stream, buf);
}
@ -423,7 +437,9 @@ on_rtp_io(void *data, int fd, uint32_t mask)
pw_log_debug("got rtp, capture overrun %u %zd", filled, len);
sess->have_sync = false;
} else {
pw_log_trace("got rtp, buffering");
uint32_t target_buffer;
pw_log_trace("got rtp packet len:%zd", len);
spa_ringbuffer_write_data(&sess->ring,
sess->buffer,
BUFFER_SIZE,
@ -433,10 +449,13 @@ on_rtp_io(void *data, int fd, uint32_t mask)
filled += len;
spa_ringbuffer_write_update(&sess->ring, index);
if (sess->buffering && (uint32_t)filled > sess->target_buffer) {
sess->last_packet_size = len;
target_buffer = sess->target_buffer + len/2;
if (sess->buffering && (uint32_t)filled > target_buffer) {
sess->buffering = false;
pw_log_debug("buffering done %u > %u",
filled, sess->target_buffer);
filled, target_buffer);
}
}
}
@ -603,6 +622,7 @@ static int session_new(struct impl *impl, struct sdp_info *info)
return -errno;
session->info = *info;
session->first = true;
props = pw_properties_copy(impl->stream_props);
if (props == NULL) {