module-rtp-sink: improve sync

Always write to index of the clock, not just for the first packet.
This ensure the timestamp on the packet always matches the time it was
processed in the graph.
This commit is contained in:
Wim Taymans 2023-02-01 20:06:19 +01:00
parent 1a5de467db
commit b50fa83325

View file

@ -257,6 +257,8 @@ struct impl {
int rtp_fd;
int sap_fd;
unsigned sync:1;
};
@ -328,7 +330,8 @@ static void flush_packets(struct impl *impl)
pw_log_debug("remote end not listening");
break;
default:
pw_log_warn("sendmsg() failed: %m");
pw_log_warn("sendmsg() failed, seq:%u dropped: %m",
impl->seq);
break;
}
}
@ -346,7 +349,7 @@ static void stream_process(void *data)
struct impl *impl = data;
struct pw_buffer *buf;
struct spa_data *d;
uint32_t index;
uint32_t index, expected_index;
int32_t filled, wanted;
if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) {
@ -357,25 +360,32 @@ static void stream_process(void *data)
wanted = d[0].chunk->size;
filled = spa_ringbuffer_get_write_index(&impl->ring, &index);
if (filled == 0 && impl->io_position) {
index = impl->ring.readindex = impl->ring.writeindex =
impl->io_position->clock.position * impl->frame_size;
filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_index);
if (impl->io_position)
index = impl->io_position->clock.position * impl->frame_size;
else
index = expected_index;
if (!impl->sync) {
pw_log_info("sync %u", index);
impl->ring.readindex = impl->ring.writeindex = index;
impl->sync = true;
} else if (expected_index != index) {
pw_log_warn("expected %u != index %u", expected_index, index);
impl->ring.readindex = impl->ring.writeindex = index;
} else if (filled + wanted > (int32_t)BUFFER_SIZE) {
pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE);
impl->ring.readindex = impl->ring.writeindex = index;
}
spa_ringbuffer_write_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
index & BUFFER_MASK,
d[0].data, wanted);
index += wanted;
spa_ringbuffer_write_update(&impl->ring, index);
if (filled + wanted > (int32_t)BUFFER_SIZE) {
pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE);
} else {
spa_ringbuffer_write_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
index & BUFFER_MASK,
d[0].data, wanted);
index += wanted;
spa_ringbuffer_write_update(&impl->ring, index);
}
pw_stream_queue_buffer(impl->stream, buf);
flush_packets(impl);