From 78055736a25c162f6cef310442def405094cf973 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 30 Jan 2024 10:06:27 +0100 Subject: [PATCH] module-rtp: add packet timer When multiple packets need to be flushed (because sess.latency is set and larger than ptime) use a timer to space the packets uniformly in the current quantum to avoid bursts. See !1873 --- src/modules/module-rtp/audio.c | 71 ++++++++++++++++++++++++++++----- src/modules/module-rtp/stream.c | 25 +++++++++++- 2 files changed, 85 insertions(+), 11 deletions(-) diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 28ee51a39..ae03d005a 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -192,6 +192,17 @@ unexpected_ssrc: return -EINVAL; } +static void set_timer(struct impl *impl, uint64_t time, uint64_t itime) +{ + struct itimerspec ts; + ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC; + ts.it_interval.tv_sec = itime / SPA_NSEC_PER_SEC; + ts.it_interval.tv_nsec = itime % SPA_NSEC_PER_SEC; + spa_system_timerfd_settime(impl->data_loop->system, + impl->timer->fd, SPA_FD_TIMER_ABSTIME, &ts, NULL); +} + static inline void set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, uint32_t offset, struct iovec *iov, uint32_t len) @@ -202,7 +213,7 @@ set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, iov[1].iov_base = buffer; } -static void rtp_audio_flush_packets(struct impl *impl) +static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets) { int32_t avail, tosend; uint32_t stride, timestamp; @@ -211,9 +222,10 @@ static void rtp_audio_flush_packets(struct impl *impl) avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); tosend = impl->psamples; - if (avail < tosend) - return; + goto done; + + num_packets = SPA_MIN(num_packets, (uint32_t)(avail / tosend)); stride = impl->stride; @@ -225,7 +237,7 @@ static void rtp_audio_flush_packets(struct impl *impl) iov[0].iov_base = &header; iov[0].iov_len = sizeof(header); - while (avail >= tosend) { + while (num_packets > 0) { if (impl->marker_on_first && impl->first) header.m = 1; else @@ -238,7 +250,8 @@ static void rtp_audio_flush_packets(struct impl *impl) (timestamp * stride) & BUFFER_MASK, &iov[1], tosend * stride); - pw_log_trace("sending %d avail:%d ts_offset:%d timestamp:%d", tosend, avail, impl->ts_offset, timestamp); + pw_log_trace("sending %d packet:%d ts_offset:%d timestamp:%d", + tosend, num_packets, impl->ts_offset, timestamp); rtp_stream_emit_send_packet(impl, iov, 3); @@ -246,8 +259,19 @@ static void rtp_audio_flush_packets(struct impl *impl) impl->first = false; timestamp += tosend; avail -= tosend; + num_packets--; } spa_ringbuffer_read_update(&impl->ring, timestamp); +done: + if (avail < tosend) + set_timer(impl, 0, 0); +} + +static void rtp_audio_flush_timeout(struct impl *impl, uint64_t expirations) +{ + if (expirations > 1) + pw_log_warn("missing timeout %"PRIu64, expirations); + rtp_audio_flush_packets(impl, expirations); } static void rtp_audio_process_capture(void *data) @@ -257,6 +281,9 @@ static void rtp_audio_process_capture(void *data) struct spa_data *d; uint32_t offs, size, timestamp, expected_timestamp, stride; int32_t filled, wanted; + uint32_t pending, num_queued; + struct spa_io_position *pos; + uint64_t next_nsec, quantum; if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_info("Out of stream buffers: %m"); @@ -271,11 +298,17 @@ static void rtp_audio_process_capture(void *data) filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp); - if (SPA_LIKELY(impl->io_position)) { - uint32_t rate = impl->io_position->clock.rate.denom; - timestamp = impl->io_position->clock.position * impl->rate / rate; - } else + pos = impl->io_position; + if (SPA_LIKELY(pos)) { + uint32_t rate = pos->clock.rate.denom; + timestamp = pos->clock.position * impl->rate / rate; + next_nsec = pos->clock.next_nsec; + quantum = pos->clock.duration * SPA_NSEC_PER_SEC / (rate * pos->clock.rate_diff); + } else { timestamp = expected_timestamp; + next_nsec = 0; + quantum = 0; + } if (!impl->have_sync) { pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", @@ -284,6 +317,7 @@ static void rtp_audio_process_capture(void *data) memset(impl->buffer, 0, BUFFER_SIZE); impl->have_sync = true; expected_timestamp = timestamp; + filled = 0; } else { if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) { pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); @@ -291,6 +325,7 @@ static void rtp_audio_process_capture(void *data) } else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) { pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride); impl->have_sync = false; + filled = 0; } } @@ -304,7 +339,22 @@ static void rtp_audio_process_capture(void *data) pw_stream_queue_buffer(impl->stream, buf); - rtp_audio_flush_packets(impl); + pending = filled / impl->psamples; + num_queued = (filled + wanted) / impl->psamples; + + if (num_queued > 0) { + /* flush all previous packets plus new one right away */ + rtp_audio_flush_packets(impl, pending + 1); + num_queued -= SPA_MIN(num_queued, pending + 1); + + if (num_queued > 0) { + /* schedule timer for remaining */ + int64_t interval = quantum / (num_queued + 1); + uint64_t time = next_nsec - num_queued * interval; + pw_log_trace("%u %u %"PRIu64" %"PRIu64, pending, num_queued, time, interval); + set_timer(impl, time, interval); + } + } } static int rtp_audio_init(struct impl *impl, enum spa_direction direction) @@ -314,5 +364,6 @@ static int rtp_audio_init(struct impl *impl, enum spa_direction direction) else impl->stream_events.process = rtp_audio_process_playback; impl->receive_rtp = rtp_audio_receive; + impl->flush_timeout = rtp_audio_flush_timeout; return 0; } diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 7f5303506..386952ec0 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -85,7 +85,11 @@ struct impl { unsigned receiving:1; unsigned first:1; + struct pw_loop *data_loop; + struct spa_source *timer; + int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len); + void (*flush_timeout)(struct impl *impl, uint64_t expirations); }; #include "module-rtp/audio.c" @@ -271,6 +275,12 @@ static float samples_to_msec(struct impl *impl, uint32_t samples) return samples * 1000.0f / impl->rate; } +static void on_flush_timeout(void *d, uint64_t expirations) +{ + struct impl *impl = d; + impl->flush_timeout(d, expirations); +} + struct rtp_stream *rtp_stream_new(struct pw_core *core, enum pw_direction direction, struct pw_properties *props, const struct rtp_stream_events *events, void *data) @@ -286,16 +296,26 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, enum pw_stream_flags flags; float latency_msec; int res; + struct pw_data_loop *data_loop; + struct pw_context *context; impl = calloc(1, sizeof(*impl)); if (impl == NULL) { res = -errno; goto out; - return NULL; } impl->first = true; spa_hook_list_init(&impl->listener_list); impl->stream_events = stream_events; + context = pw_core_get_context(core); + data_loop = pw_context_get_data_loop(context); + impl->data_loop = pw_data_loop_get_loop(data_loop); + impl->timer = pw_loop_add_timer(impl->data_loop, on_flush_timeout, impl); + if (impl->timer == NULL) { + res = -errno; + pw_log_error("can't create timer"); + goto out; + } if ((str = pw_properties_get(props, "sess.media")) == NULL) str = "audio"; @@ -561,6 +581,9 @@ void rtp_stream_destroy(struct rtp_stream *s) if (impl->stream) pw_stream_destroy(impl->stream); + if (impl->timer) + pw_loop_destroy_source(impl->data_loop, impl->timer); + spa_hook_list_clean(&impl->listener_list); free(impl); }