From 5fe0a7e57537428450f7cdfcc91209aa6e91a554 Mon Sep 17 00:00:00 2001 From: hackerman-kl Date: Sun, 31 May 2026 21:21:16 +0200 Subject: [PATCH] milan-avb: pace the talker flush timer on the RT data loop --- src/modules/module-avb/avb.c | 7 ++++ src/modules/module-avb/internal.h | 1 + src/modules/module-avb/stream.c | 59 +++++++++++++++++++++---------- 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/src/modules/module-avb/avb.c b/src/modules/module-avb/avb.c index eeacb7b7c..207155d18 100644 --- a/src/modules/module-avb/avb.c +++ b/src/modules/module-avb/avb.c @@ -41,6 +41,10 @@ struct pw_avb *pw_avb_new(struct pw_context *context, impl->context = context; impl->loop = pw_context_get_main_loop(context); + /* Acquire an RT loop (data.rt class) so the talker flush timer is paced + * under SCHED_FIFO; main-loop scheduling jitter exceeds the 2ms class-A + * presentation margin and produces late timestamps at the listener. */ + impl->data_loop = pw_context_acquire_loop(context, NULL); impl->timer_queue = pw_context_get_timer_queue(context); impl->props = props; impl->core = pw_context_get_object(context, PW_TYPE_INTERFACE_Core); @@ -80,6 +84,9 @@ static void impl_free(struct impl *impl) spa_list_consume(s, &impl->servers, link) avdecc_server_free(s); + if (impl->data_loop != NULL) { + pw_context_release_loop(impl->context, impl->data_loop); + } free(impl); } diff --git a/src/modules/module-avb/internal.h b/src/modules/module-avb/internal.h index 3298ffbaf..f41d50832 100644 --- a/src/modules/module-avb/internal.h +++ b/src/modules/module-avb/internal.h @@ -40,6 +40,7 @@ struct avb_transport_ops { struct impl { struct pw_loop *loop; + struct pw_loop *data_loop; /* RT (SCHED_FIFO) loop for talker egress pacing */ struct pw_timer_queue *timer_queue; struct pw_context *context; struct spa_hook context_listener; diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index 872dcc0f4..d784a9a61 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -267,6 +267,43 @@ static void on_flush_tick(void *data, uint64_t expirations) } } +/* Talker egress pacing runs on the RT data loop (impl->data_loop). A source + * cannot be added to or removed from a running loop off-thread, so the flush + * timer is created and destroyed ON the RT thread via pw_loop_invoke. */ +static int do_add_flush_timer(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct stream *stream = user_data; + struct pw_loop *dl = stream->server->impl->data_loop; + struct timespec value = { + .tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC), + .tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC), + }; + struct timespec interval = value; + + stream->flush_last_ns = 0; + stream->flush_timer = pw_loop_add_timer(dl, on_flush_tick, stream); + if (stream->flush_timer != NULL) { + pw_loop_update_timer(dl, stream->flush_timer, &value, &interval, false); + } else { + pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)", stream); + } + return 0; +} + +static int do_remove_flush_timer(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct stream *stream = user_data; + + if (stream->flush_timer != NULL) { + pw_loop_destroy_source(stream->server->impl->data_loop, stream->flush_timer); + stream->flush_timer = NULL; + stream->flush_last_ns = 0; + } + return 0; +} + static void on_source_stream_process(void *data) { struct stream *stream = data; @@ -1436,21 +1473,8 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) stream_out_mark_counters_dirty(stream); if (stream->flush_timer == NULL) { - struct timespec value = { - .tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC), - .tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC), - }; - struct timespec interval = value; - stream->flush_last_ns = 0; - stream->flush_timer = pw_loop_add_timer(server->impl->loop, - on_flush_tick, stream); - if (stream->flush_timer) - pw_loop_update_timer(server->impl->loop, - stream->flush_timer, - &value, &interval, false); - else - pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)", - stream); + pw_loop_invoke(server->impl->data_loop, do_add_flush_timer, + 0, NULL, 0, true, stream); } } @@ -1470,9 +1494,8 @@ int stream_deactivate(struct stream *stream, uint64_t now) stream->source = NULL; } if (stream->flush_timer != NULL) { - pw_loop_destroy_source(stream->server->impl->loop, stream->flush_timer); - stream->flush_timer = NULL; - stream->flush_last_ns = 0; + pw_loop_invoke(stream->server->impl->data_loop, do_remove_flush_timer, + 0, NULL, 0, true, stream); } /* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the * reservation immediately (Leave) instead of holding stale state until its