diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index 2b69a0be7..cd7ca7f4e 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -1043,8 +1043,11 @@ on_data_io(void *data, int fd, uint32_t mask) if (sess == NULL) goto unknown_ssrc; - if (sess->data_ready && sess->receiving) - rtp_stream_receive_packet(sess->recv, buffer, len); + if (sess->data_ready && sess->receiving) { + uint64_t current_time = rtp_stream_get_nsec(sess->recv); + rtp_stream_receive_packet(sess->recv, buffer, len, + current_time); + } } } return; diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 3954d3137..377cc4109 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -227,13 +227,6 @@ struct impl { bool waiting; }; -static inline uint64_t get_time_ns(void) -{ - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return SPA_TIMESPEC_TO_NSEC(&ts); -} - static int do_start(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { @@ -261,6 +254,9 @@ on_rtp_io(void *data, int fd, uint32_t mask) struct impl *impl = data; ssize_t len; int suppressed; + uint64_t current_time; + + current_time = rtp_stream_get_nsec(impl->stream); if (mask & SPA_IO_IN) { if ((len = recv(fd, impl->buffer, impl->buffer_size, 0)) < 0) @@ -270,10 +266,17 @@ on_rtp_io(void *data, int fd, uint32_t mask) goto short_packet; if (SPA_LIKELY(impl->stream)) { - if (rtp_stream_receive_packet(impl->stream, impl->buffer, len) < 0) + if (rtp_stream_receive_packet(impl->stream, impl->buffer, len, + current_time) < 0) goto receive_error; } + /* Update last packet timestamp for IGMP recovery. + * The recovery timer will check this to see if recovery + * is necessary. Do this _before_ invoking do_start() + * in case the stream is waking up from standby. */ + SPA_ATOMIC_STORE(impl->last_packet_time, current_time); + if (SPA_ATOMIC_LOAD(impl->state) != STATE_RECEIVING) { if (!SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_RECEIVING)) { if (SPA_ATOMIC_CAS(impl->state, STATE_IDLE, STATE_RECEIVING)) @@ -284,11 +287,11 @@ on_rtp_io(void *data, int fd, uint32_t mask) return; receive_error: - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0) pw_log_warn("(%d suppressed) recv() error: %m", suppressed); return; short_packet: - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0) pw_log_warn("(%d suppressed) short packet of len %zd received", suppressed, len); return; diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 1563e1917..0af38d649 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -233,7 +233,8 @@ static void rtp_audio_process_playback(void *data) pw_stream_queue_buffer(impl->stream, buf); } -static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen, plen; @@ -273,7 +274,7 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len) timestamp = ntohl(hdr->timestamp) - impl->ts_offset; impl->receiving = true; - impl->last_recv_timestamp = pw_stream_get_nsec(impl->stream); + impl->last_recv_timestamp = current_time; plen = len - hlen; samples = plen / stride; diff --git a/src/modules/module-rtp/midi.c b/src/modules/module-rtp/midi.c index 498c6e6a9..5fbdf3b63 100644 --- a/src/modules/module-rtp/midi.c +++ b/src/modules/module-rtp/midi.c @@ -318,7 +318,8 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti return 0; } -static int rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen; diff --git a/src/modules/module-rtp/opus.c b/src/modules/module-rtp/opus.c index 7eeda7f43..d13a4efaf 100644 --- a/src/modules/module-rtp/opus.c +++ b/src/modules/module-rtp/opus.c @@ -99,7 +99,8 @@ static void rtp_opus_process_playback(void *data) pw_stream_queue_buffer(impl->stream, buf); } -static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen, plen; diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 3834206ec..fa055ce0c 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -151,7 +151,8 @@ struct impl { * access below for the reason why. */ uint8_t timer_running; - int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len); + int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time); /* Used for resetting the ring buffer before the stream starts, to prevent * reading from uninitialized memory. This can otherwise happen in direct * timestamp mode when the read index is set to an uninitialized location. @@ -1036,10 +1037,17 @@ int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *di return pw_stream_update_properties(impl->stream, dict); } -int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, + uint64_t current_time) { struct impl *impl = (struct impl*)s; - return impl->receive_rtp(impl, buffer, len); + return impl->receive_rtp(impl, buffer, len, current_time); +} + +uint64_t rtp_stream_get_nsec(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + return pw_stream_get_nsec(impl->stream); } uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate) diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index ea358f350..095b8395c 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -62,7 +62,10 @@ void rtp_stream_destroy(struct rtp_stream *s); int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *dict); -int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len); +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, + uint64_t current_time); + +uint64_t rtp_stream_get_nsec(struct rtp_stream *s); uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate);