module-rtp: Synchronize access to timer_running flag

This commit is contained in:
Carlos Rafael Giani 2025-08-22 17:39:34 +02:00 committed by Wim Taymans
parent 37e597ff0a
commit caf72fd9bc
2 changed files with 31 additions and 5 deletions

View file

@ -376,7 +376,7 @@ static void set_timer(struct impl *impl, uint64_t time, uint64_t itime)
ts.it_interval.tv_nsec = itime % SPA_NSEC_PER_SEC;
spa_system_timerfd_settime(impl->data_loop->system,
impl->timer->fd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
impl->timer_running = time != 0 && itime != 0;
set_timer_running(impl, time != 0 && itime != 0);
}
static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uint64_t set_timestamp)
@ -448,7 +448,7 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin
spa_ringbuffer_read_update(&impl->ring, timestamp);
done:
if (impl->timer_running) {
if (is_timer_running(impl)) {
if (get_internal_stream_state(impl) != RTP_STREAM_INTERNAL_STATE_STOPPING) {
/* If the stream isn't being stopped, and instead is running,
* keep the timer running if there was sufficient data to

View file

@ -141,7 +141,14 @@ struct impl {
struct pw_loop *main_loop;
struct pw_loop *data_loop;
struct spa_source *timer;
bool timer_running;
/* IMPORTANT: Do NOT access this value directly. Use the atomic
* set_timer_running() / is_timer_running() accessors, since the
* flag is accessed by both the dataloop and mainloop. To prevent
* memory visibility issues, atomic accessors need to be used.
*
* Also, its type here is uint8_t. See the explanation about atomic
* access below for the reason why. */
uint8_t timer_running;
int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len);
/* Used for resetting the ring buffer before the stream starts, to prevent
@ -208,6 +215,21 @@ static inline void set_internal_stream_state(struct impl *impl, enum rtp_stream_
SPA_ATOMIC_STORE(impl->internal_state, (uint32_t)state);
}
/* Similar to the atomic internal_state accessors, these safeguard
* the timer_running flag, which can be accessed both by stream_stop()
* and the flush_timeout, which are called in separate threads.
* Since timer_running and internal_state are accessed independently,
* they are treated as two independent atomic variables instead of two
* resources under a common mutex. */
static inline bool is_timer_running(struct impl *impl) {
return (bool)SPA_ATOMIC_LOAD(impl->timer_running);
}
static inline void set_timer_running(struct impl *impl, bool running) {
SPA_ATOMIC_STORE(impl->timer_running, (uint8_t)(running ? 1 : 0));
}
static int do_finish_stopping_state(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
int res = 0;
@ -381,6 +403,8 @@ static int stream_start(struct impl *impl)
static int stream_stop(struct impl *impl)
{
bool timer_running;
switch (get_internal_stream_state(impl)) {
case RTP_STREAM_INTERNAL_STATE_STOPPING:
case RTP_STREAM_INTERNAL_STATE_STOPPED:
@ -391,10 +415,12 @@ static int stream_stop(struct impl *impl)
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPING);
timer_running = is_timer_running(impl);
/* Proper stop is only possible if the timer is currently not running,
* because a stop involves closing the connection. If the timer is still
* running, it needs an open connection for sending out remaining packets. */
if (!impl->timer_running) {
if (!timer_running) {
int res;
pw_log_info("closing connection as part of stopping the stream");
rtp_stream_emit_close_connection(impl, &res);
@ -422,7 +448,7 @@ static int stream_stop(struct impl *impl)
/* Only switch to STOPPED if the stream could _actually_ be stopped,
* meaning that the timer was no longer running, and the connection
* could be closed. */
if (!impl->timer_running) {
if (!timer_running) {
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPED);
pw_log_info("stream stopped");
}