diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 18dd9d0ee..1563e1917 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -376,7 +376,7 @@ static void set_timer(struct impl *impl, uint64_t time, uint64_t itime) ts.it_interval.tv_nsec = itime % SPA_NSEC_PER_SEC; spa_system_timerfd_settime(impl->data_loop->system, impl->timer->fd, SPA_FD_TIMER_ABSTIME, &ts, NULL); - impl->timer_running = time != 0 && itime != 0; + set_timer_running(impl, time != 0 && itime != 0); } static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uint64_t set_timestamp) @@ -448,7 +448,7 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin spa_ringbuffer_read_update(&impl->ring, timestamp); done: - if (impl->timer_running) { + if (is_timer_running(impl)) { if (get_internal_stream_state(impl) != RTP_STREAM_INTERNAL_STATE_STOPPING) { /* If the stream isn't being stopped, and instead is running, * keep the timer running if there was sufficient data to diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index f83019a3e..bb9da4995 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -141,7 +141,14 @@ struct impl { struct pw_loop *main_loop; struct pw_loop *data_loop; struct spa_source *timer; - bool timer_running; + /* IMPORTANT: Do NOT access this value directly. Use the atomic + * set_timer_running() / is_timer_running() accessors, since the + * flag is accessed by both the dataloop and mainloop. To prevent + * memory visibility issues, atomic accessors need to be used. + * + * Also, its type here is uint8_t. See the explanation about atomic + * access below for the reason why. */ + uint8_t timer_running; int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len); /* Used for resetting the ring buffer before the stream starts, to prevent @@ -208,6 +215,21 @@ static inline void set_internal_stream_state(struct impl *impl, enum rtp_stream_ SPA_ATOMIC_STORE(impl->internal_state, (uint32_t)state); } +/* Similar to the atomic internal_state accessors, these safeguard + * the timer_running flag, which can be accessed both by stream_stop() + * and the flush_timeout, which are called in separate threads. + * Since timer_running and internal_state are accessed independently, + * they are treated as two independent atomic variables instead of two + * resources under a common mutex. */ + +static inline bool is_timer_running(struct impl *impl) { + return (bool)SPA_ATOMIC_LOAD(impl->timer_running); +} + +static inline void set_timer_running(struct impl *impl, bool running) { + SPA_ATOMIC_STORE(impl->timer_running, (uint8_t)(running ? 1 : 0)); +} + static int do_finish_stopping_state(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { int res = 0; @@ -381,6 +403,8 @@ static int stream_start(struct impl *impl) static int stream_stop(struct impl *impl) { + bool timer_running; + switch (get_internal_stream_state(impl)) { case RTP_STREAM_INTERNAL_STATE_STOPPING: case RTP_STREAM_INTERNAL_STATE_STOPPED: @@ -391,10 +415,12 @@ static int stream_stop(struct impl *impl) set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPING); + timer_running = is_timer_running(impl); + /* Proper stop is only possible if the timer is currently not running, * because a stop involves closing the connection. If the timer is still * running, it needs an open connection for sending out remaining packets. */ - if (!impl->timer_running) { + if (!timer_running) { int res; pw_log_info("closing connection as part of stopping the stream"); rtp_stream_emit_close_connection(impl, &res); @@ -422,7 +448,7 @@ static int stream_stop(struct impl *impl) /* Only switch to STOPPED if the stream could _actually_ be stopped, * meaning that the timer was no longer running, and the connection * could be closed. */ - if (!impl->timer_running) { + if (!timer_running) { set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPED); pw_log_info("stream stopped"); }