From d56d5fa87adcf01ee5b2be999dcc593cc460b7d0 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 19 May 2026 17:30:28 +0200 Subject: [PATCH] raop: implement retransmission Keep the last relation between the sequence number and the timestamp (ringbuffer position). When a retransmission is requested for a given sequence number use the relation to calculate the corresponding timestamp and retransmit the packet from the ringbuffer again. See #5276 --- src/modules/module-raop-sink.c | 1 + src/modules/module-rtp/audio.c | 125 +++++++++++++++++++++----------- src/modules/module-rtp/stream.c | 11 +++ src/modules/module-rtp/stream.h | 2 + 4 files changed, 96 insertions(+), 43 deletions(-) diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index a6ee69f76..af84ba3eb 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -689,6 +689,7 @@ on_control_source_io(void *data, int fd, uint32_t mask) case 0xd5: pw_log_debug("retransmit request seq:%u num:%u", seq, num); /* retransmit request */ + rtp_stream_resend_packets(impl->stream, seq, num); break; } } diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index e49363f75..5a7f4fd5f 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -501,12 +501,83 @@ static void set_timer(struct impl *impl, uint64_t time, uint64_t itime) set_timer_running(impl, time != 0 && itime != 0); } +static void rtp_audio_send_packets(struct impl *impl, uint32_t timestamp, uint32_t tosend, + uint16_t seq, uint16_t num, uint64_t set_timestamp, bool first) +{ + struct iovec iov[3]; + struct rtp_header header; + uint32_t stride; + + stride = impl->stride; + + spa_zero(header); + header.v = 2; + header.pt = impl->payload; + header.ssrc = htonl(impl->ssrc); + + iov[0].iov_base = &header; + iov[0].iov_len = sizeof(header); + + while (num > 0) { + uint32_t rtp_timestamp; + + if (impl->marker_on_first && first) + header.m = 1; + else + header.m = 0; + + rtp_timestamp = impl->ts_offset + impl->ts_align + (set_timestamp ? set_timestamp : timestamp); + + header.sequence_number = htons(seq); + header.timestamp = htonl(rtp_timestamp); + + set_iovec(&impl->ring, + impl->buffer, impl->actual_max_buffer_size, + ((uint64_t)timestamp * stride) % impl->actual_max_buffer_size, + &iov[1], tosend * stride); + + pw_log_trace_fp("sending %d packet:%d ts_offset:%d timestamp:%u (%f s)", + tosend, num_packets, impl->ts_offset, timestamp, + (double)timestamp * impl->io_position->clock.rate.num / + impl->io_position->clock.rate.denom); + + rtp_stream_call_send_packet(impl, iov, 3); + + seq++; + first = false; + timestamp += tosend; + num--; + } +} + +static int rtp_audio_resend_packets(struct impl *impl, uint16_t seq, uint16_t num) +{ + uint32_t tosend, last_seq, last_ts, timestamp; + int32_t diff; + uint64_t ts_seq; + + tosend = impl->psamples; + + ts_seq = SPA_ATOMIC_LOAD(impl->last_ts_seq); + last_seq = ts_seq >> 32; + last_ts = ts_seq & 0xffffffff; + + diff = (int32_t)(last_seq - seq); + if (diff < 0) + return -EIO; + + timestamp = last_ts - (diff * tosend); + + pw_log_info("resend %d -> %d %d %d", seq, timestamp, last_ts - timestamp, impl->actual_max_buffer_size); + + rtp_audio_send_packets(impl, timestamp, tosend, seq, num, 0, false); + return 0; +} + static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uint64_t set_timestamp) { int32_t avail, tosend; - uint32_t stride, timestamp; - struct iovec iov[3]; - struct rtp_header header; + uint32_t timestamp; bool insufficient_data; avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); @@ -533,48 +604,15 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin num_packets = SPA_MIN(num_packets, (uint32_t)(avail / tosend)); } - stride = impl->stride; + rtp_audio_send_packets(impl, timestamp, tosend, impl->seq, + num_packets, set_timestamp, impl->first); - spa_zero(header); - header.v = 2; - header.pt = impl->payload; - header.ssrc = htonl(impl->ssrc); - - iov[0].iov_base = &header; - iov[0].iov_len = sizeof(header); - - while (num_packets > 0) { - uint32_t rtp_timestamp; - - if (impl->marker_on_first && impl->first) - header.m = 1; - else - header.m = 0; - - rtp_timestamp = impl->ts_offset + impl->ts_align + (set_timestamp ? set_timestamp : timestamp); - - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(rtp_timestamp); - - set_iovec(&impl->ring, - impl->buffer, impl->actual_max_buffer_size, - ((uint64_t)timestamp * stride) % impl->actual_max_buffer_size, - &iov[1], tosend * stride); - - pw_log_trace_fp("sending %d packet:%d ts_offset:%d timestamp:%u (%f s)", - tosend, num_packets, impl->ts_offset, timestamp, - (double)timestamp * impl->io_position->clock.rate.num / - impl->io_position->clock.rate.denom); - - rtp_stream_call_send_packet(impl, iov, 3); - - impl->seq++; - impl->first = false; - timestamp += tosend; - avail -= tosend; - num_packets--; - } + impl->first = false; + impl->seq += num_packets; + avail -= tosend * num_packets; + timestamp += tosend * num_packets; spa_ringbuffer_read_update(&impl->ring, timestamp); + SPA_ATOMIC_STORE(impl->last_ts_seq, ((uint64_t)impl->seq << 32) | timestamp); done: if (is_timer_running(impl)) { @@ -966,6 +1004,7 @@ static int rtp_audio_init(struct impl *impl, struct pw_core *core, enum spa_dire impl->receive_rtp = rtp_audio_receive; impl->stop_timer = rtp_audio_stop_timer; impl->flush_timeout = rtp_audio_flush_timeout; + impl->resend_packets = rtp_audio_resend_packets; setup_ptp_sender(impl, core, direction, ptp_driver); diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 40d6dcf58..a1da20420 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -172,6 +172,7 @@ struct impl { void (*stop_timer)(struct impl *impl); void (*flush_timeout)(struct impl *impl, uint64_t expirations); void (*deinit)(struct impl *impl, enum spa_direction direction); + int (*resend_packets)(struct impl *impl, uint16_t seq, uint16_t num); /* * pw_filter where the filter would be driven at the PTP clock @@ -194,6 +195,8 @@ struct impl { uint64_t rtp_base_ts; uint32_t rtp_last_ts; + uint64_t last_ts_seq; + /* The process latency, set by on_stream_param_changed(). */ struct spa_process_latency_info process_latency; }; @@ -1059,6 +1062,14 @@ int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, struct impl *impl = (struct impl*)s; return impl->receive_rtp(impl, buffer, len, current_time); } +int rtp_stream_resend_packets(struct rtp_stream *s, uint16_t seq, uint16_t num) +{ + struct impl *impl = (struct impl*)s; + if (impl->resend_packets) + return impl->resend_packets(impl, seq, num); + else + return -ENOTSUP; +} uint64_t rtp_stream_get_nsec(struct rtp_stream *s) { diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 2c6e6dea5..f7f061fc9 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -66,6 +66,8 @@ int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *di int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, uint64_t current_time); +int rtp_stream_resend_packets(struct rtp_stream *s, uint16_t seq, uint16_t num); + uint64_t rtp_stream_get_nsec(struct rtp_stream *s); uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate);