From b50fa833259e3c2aa172a292f0741d06ce4656e9 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 1 Feb 2023 20:06:19 +0100 Subject: [PATCH] module-rtp-sink: improve sync Always write to index of the clock, not just for the first packet. This ensure the timestamp on the packet always matches the time it was processed in the graph. --- src/modules/module-rtp-sink.c | 46 +++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index a4398ffb3..523f821eb 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -257,6 +257,8 @@ struct impl { int rtp_fd; int sap_fd; + + unsigned sync:1; }; @@ -328,7 +330,8 @@ static void flush_packets(struct impl *impl) pw_log_debug("remote end not listening"); break; default: - pw_log_warn("sendmsg() failed: %m"); + pw_log_warn("sendmsg() failed, seq:%u dropped: %m", + impl->seq); break; } } @@ -346,7 +349,7 @@ static void stream_process(void *data) struct impl *impl = data; struct pw_buffer *buf; struct spa_data *d; - uint32_t index; + uint32_t index, expected_index; int32_t filled, wanted; if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { @@ -357,25 +360,32 @@ static void stream_process(void *data) wanted = d[0].chunk->size; - filled = spa_ringbuffer_get_write_index(&impl->ring, &index); - if (filled == 0 && impl->io_position) { - index = impl->ring.readindex = impl->ring.writeindex = - impl->io_position->clock.position * impl->frame_size; + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_index); + if (impl->io_position) + index = impl->io_position->clock.position * impl->frame_size; + else + index = expected_index; + + if (!impl->sync) { + pw_log_info("sync %u", index); + impl->ring.readindex = impl->ring.writeindex = index; + impl->sync = true; + } else if (expected_index != index) { + pw_log_warn("expected %u != index %u", expected_index, index); + impl->ring.readindex = impl->ring.writeindex = index; + } else if (filled + wanted > (int32_t)BUFFER_SIZE) { + pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE); + impl->ring.readindex = impl->ring.writeindex = index; } + spa_ringbuffer_write_data(&impl->ring, + impl->buffer, + BUFFER_SIZE, + index & BUFFER_MASK, + d[0].data, wanted); + index += wanted; + spa_ringbuffer_write_update(&impl->ring, index); - if (filled + wanted > (int32_t)BUFFER_SIZE) { - pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE); - } else { - spa_ringbuffer_write_data(&impl->ring, - impl->buffer, - BUFFER_SIZE, - index & BUFFER_MASK, - d[0].data, wanted); - - index += wanted; - spa_ringbuffer_write_update(&impl->ring, index); - } pw_stream_queue_buffer(impl->stream, buf); flush_packets(impl);