module-rtp: use timestamps as ringbuffer index

Use the timestamps directly as the ringbuffer index. We can save some
conversions to bytes and there is a direct mapping to RTP timestamp,
clock position and ringbuffer index.

Simplify the source a little. Remove the buffering state, we always
start with read and write pointers separted by the target buffering.
This commit is contained in:
Wim Taymans 2023-02-02 15:49:58 +01:00
parent 16e995be26
commit aed394cf89
2 changed files with 119 additions and 120 deletions

View file

@ -250,9 +250,7 @@ struct session {
struct spa_io_position *position;
struct spa_dll dll;
uint32_t target_buffer;
uint32_t last_packet_size;
float max_error;
unsigned buffering:1;
unsigned first:1;
unsigned receiving:1;
unsigned direct_timestamp:1;
@ -270,8 +268,8 @@ static void stream_process(void *data)
struct session *sess = data;
struct pw_buffer *buf;
struct spa_data *d;
uint32_t index, target_buffer;
int32_t avail, wanted;
uint32_t wanted, timestamp, target_buffer, stride, maxsize;
int32_t avail;
if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) {
pw_log_debug("Out of stream buffers: %m");
@ -279,42 +277,53 @@ static void stream_process(void *data)
}
d = buf->buffer->datas;
wanted = buf->requested ?
SPA_MIN(buf->requested * sess->info.stride, d[0].maxsize)
: d[0].maxsize;
stride = sess->info.stride;
maxsize = d[0].maxsize / stride;
wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize;
if (sess->position && sess->direct_timestamp) {
/* in direct mode, read directly from the timestamp index,
* because sender and receiver are in sync, this would keep
* target_buffer of bytes available. */
spa_ringbuffer_read_update(&sess->ring,
sess->position->clock.position * sess->info.stride);
sess->position->clock.position);
}
avail = spa_ringbuffer_get_read_index(&sess->ring, &index);
avail = spa_ringbuffer_get_read_index(&sess->ring, &timestamp);
target_buffer = sess->target_buffer + sess->last_packet_size / 2;
target_buffer = sess->target_buffer;
if (avail < wanted || sess->buffering) {
memset(d[0].data, 0, wanted);
if (!sess->buffering && sess->have_sync) {
pw_log_debug("underrun %u/%u < %u, buffering...",
avail, target_buffer, wanted);
sess->buffering = true;
if (avail < (int32_t)wanted) {
enum spa_log_level level;
memset(d[0].data, 0, wanted * stride);
if (sess->have_sync) {
sess->have_sync = false;
level = SPA_LOG_LEVEL_WARN;
} else {
level = SPA_LOG_LEVEL_DEBUG;
}
pw_log(level, "underrun %d/%u < %u",
avail, target_buffer, wanted);
} else {
float error, corr;
if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE)) {
if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) {
pw_log_warn("overrun %u > %u", avail, target_buffer * 8);
index += avail - target_buffer;
timestamp += avail - target_buffer;
avail = target_buffer;
} else {
if (sess->first) {
if ((uint32_t)avail > target_buffer) {
uint32_t skip = avail - target_buffer;
pw_log_debug("first: avail:%d skip:%u target:%u",
} else if (sess->first) {
if ((uint32_t)avail > target_buffer) {
uint32_t skip = avail - target_buffer;
pw_log_debug("first: avail:%d skip:%u target:%u",
avail, skip, target_buffer);
index += skip;
avail = target_buffer;
}
sess->first = false;
timestamp += skip;
avail = target_buffer;
}
sess->first = false;
}
if (!sess->direct_timestamp) {
/* when not using direct timestamp and clocks are not
* in sync, try to adjust our playback rate to keep the
* requested target_buffer bytes in the ringbuffer */
error = (float)target_buffer - (float)avail;
error = SPA_CLAMP(error, -sess->max_error, sess->max_error);
@ -323,24 +332,25 @@ static void stream_process(void *data)
pw_log_debug("avail:%u target:%u error:%f corr:%f", avail,
target_buffer, error, corr);
if (sess->rate_match && !sess->direct_timestamp) {
SPA_FLAG_SET(sess->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE);
if (sess->rate_match) {
SPA_FLAG_SET(sess->rate_match->flags,
SPA_IO_RATE_MATCH_FLAG_ACTIVE);
sess->rate_match->rate = 1.0f / corr;
}
}
spa_ringbuffer_read_data(&sess->ring,
sess->buffer,
BUFFER_SIZE,
index & BUFFER_MASK,
d[0].data, wanted);
(timestamp * stride) & BUFFER_MASK,
d[0].data, wanted * stride);
index += wanted;
spa_ringbuffer_read_update(&sess->ring, index);
timestamp += wanted;
spa_ringbuffer_read_update(&sess->ring, timestamp);
}
d[0].chunk->size = wanted;
d[0].chunk->stride = sess->info.stride;
d[0].chunk->size = wanted * stride;
d[0].chunk->stride = stride;
d[0].chunk->offset = 0;
buf->size = wanted / sess->info.stride;
buf->size = wanted;
pw_stream_queue_buffer(sess->stream, buf);
}
@ -374,7 +384,7 @@ on_rtp_io(void *data, int fd, uint32_t mask)
uint8_t buffer[2048], *payload;
if (mask & SPA_IO_IN) {
uint32_t index, expected_index, timestamp;
uint32_t stride, read, timestamp, expected_timestamp, samples;
uint16_t seq;
int32_t filled;
@ -405,60 +415,45 @@ on_rtp_io(void *data, int fd, uint32_t mask)
sess->expected_seq = seq + 1;
sess->have_seq = true;
len = SPA_ROUND_DOWN(len - hlen, sess->info.stride);
stride = sess->info.stride;
samples = (len - hlen) / stride;
payload = &buffer[hlen];
filled = spa_ringbuffer_get_write_index(&sess->ring, &index);
filled = spa_ringbuffer_get_write_index(&sess->ring, &expected_timestamp);
timestamp = ntohl(hdr->timestamp) - sess->info.ts_offset;
expected_index = timestamp * sess->info.stride;
if (sess->direct_timestamp)
expected_index += sess->target_buffer;
read = ntohl(hdr->timestamp) - sess->info.ts_offset;
/* we always write to timestamp + delay */
timestamp = read + sess->target_buffer;
if (!sess->have_sync) {
sess->ring.readindex = sess->ring.writeindex =
index = expected_index;
filled = 0;
sess->have_sync = true;
sess->buffering = true;
pw_log_debug("sync to timestamp %u", index);
pw_log_info("sync to timestamp %u", read);
/* we read from timestamp, keeping target_buffer of data
* in the ringbuffer. */
sess->ring.readindex = read;
sess->ring.writeindex = timestamp;
filled = sess->target_buffer;
spa_dll_init(&sess->dll);
spa_dll_set_bw(&sess->dll, SPA_DLL_BW_MIN, 128, sess->info.info.rate);
} else if (expected_index != index) {
sess->have_sync = true;
} else if (expected_timestamp != timestamp) {
pw_log_debug("unexpected timestamp (%u != %u)",
index / sess->info.stride,
expected_index / sess->info.stride);
index = expected_index;
filled = 0;
timestamp, expected_timestamp);
}
if (filled + len > BUFFER_SIZE) {
pw_log_debug("got rtp, capture overrun %u %zd", filled, len);
if (filled + samples > BUFFER_SIZE / stride) {
pw_log_debug("capture overrun %u + %u > %u", filled, samples,
BUFFER_SIZE / stride);
sess->have_sync = false;
} else {
uint32_t target_buffer;
pw_log_trace("got rtp packet len:%zd", len);
pw_log_trace("got samples:%u", samples);
spa_ringbuffer_write_data(&sess->ring,
sess->buffer,
BUFFER_SIZE,
index & BUFFER_MASK,
payload, len);
index += len;
filled += len;
spa_ringbuffer_write_update(&sess->ring, index);
sess->last_packet_size = len;
target_buffer = sess->target_buffer + len/2;
if (sess->buffering && (uint32_t)filled > target_buffer) {
sess->buffering = false;
pw_log_debug("buffering done %u > %u",
filled, target_buffer);
}
(timestamp * stride) & BUFFER_MASK,
payload, (samples * stride));
timestamp += samples;
spa_ringbuffer_write_update(&sess->ring, timestamp);
}
sess->receiving = true;
}
@ -559,9 +554,9 @@ error:
return res;
}
static uint32_t msec_to_bytes(struct sdp_info *info, uint32_t msec)
static uint32_t msec_to_samples(struct sdp_info *info, uint32_t msec)
{
return msec * info->stride * info->info.rate / 1000;
return msec * info->info.rate / 1000;
}
static void session_free(struct session *sess)
@ -736,12 +731,12 @@ static int session_new(struct impl *impl, struct sdp_info *info)
sess_latency_msec = pw_properties_get_uint32(props,
"sess.latency.msec", impl->sess_latency_msec);
session->target_buffer = msec_to_bytes(info, sess_latency_msec);
session->max_error = msec_to_bytes(info, ERROR_MSEC);
session->target_buffer = msec_to_samples(info, sess_latency_msec);
session->max_error = msec_to_samples(info, ERROR_MSEC);
pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", info->info.rate);
pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d",
session->target_buffer / (2 * info->stride), info->info.rate);
session->target_buffer / 2, info->info.rate);
spa_dll_init(&session->dll);
spa_dll_set_bw(&session->dll, SPA_DLL_BW_MIN, 128, session->info.info.rate);