milan-avb: pace the talker flush timer on the RT data loop

This commit is contained in:
hackerman-kl 2026-05-31 21:21:16 +02:00 committed by Wim Taymans
parent 895e3a4fa1
commit 5fe0a7e575
3 changed files with 49 additions and 18 deletions

View file

@ -41,6 +41,10 @@ struct pw_avb *pw_avb_new(struct pw_context *context,
impl->context = context;
impl->loop = pw_context_get_main_loop(context);
/* Acquire an RT loop (data.rt class) so the talker flush timer is paced
* under SCHED_FIFO; main-loop scheduling jitter exceeds the 2ms class-A
* presentation margin and produces late timestamps at the listener. */
impl->data_loop = pw_context_acquire_loop(context, NULL);
impl->timer_queue = pw_context_get_timer_queue(context);
impl->props = props;
impl->core = pw_context_get_object(context, PW_TYPE_INTERFACE_Core);
@ -80,6 +84,9 @@ static void impl_free(struct impl *impl)
spa_list_consume(s, &impl->servers, link)
avdecc_server_free(s);
if (impl->data_loop != NULL) {
pw_context_release_loop(impl->context, impl->data_loop);
}
free(impl);
}

View file

@ -40,6 +40,7 @@ struct avb_transport_ops {
struct impl {
struct pw_loop *loop;
struct pw_loop *data_loop; /* RT (SCHED_FIFO) loop for talker egress pacing */
struct pw_timer_queue *timer_queue;
struct pw_context *context;
struct spa_hook context_listener;

View file

@ -267,6 +267,43 @@ static void on_flush_tick(void *data, uint64_t expirations)
}
}
/* Talker egress pacing runs on the RT data loop (impl->data_loop). A source
* cannot be added to or removed from a running loop off-thread, so the flush
* timer is created and destroyed ON the RT thread via pw_loop_invoke. */
static int do_add_flush_timer(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct stream *stream = user_data;
struct pw_loop *dl = stream->server->impl->data_loop;
struct timespec value = {
.tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC),
.tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC),
};
struct timespec interval = value;
stream->flush_last_ns = 0;
stream->flush_timer = pw_loop_add_timer(dl, on_flush_tick, stream);
if (stream->flush_timer != NULL) {
pw_loop_update_timer(dl, stream->flush_timer, &value, &interval, false);
} else {
pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)", stream);
}
return 0;
}
static int do_remove_flush_timer(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct stream *stream = user_data;
if (stream->flush_timer != NULL) {
pw_loop_destroy_source(stream->server->impl->data_loop, stream->flush_timer);
stream->flush_timer = NULL;
stream->flush_last_ns = 0;
}
return 0;
}
static void on_source_stream_process(void *data)
{
struct stream *stream = data;
@ -1436,21 +1473,8 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
stream_out_mark_counters_dirty(stream);
if (stream->flush_timer == NULL) {
struct timespec value = {
.tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC),
.tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC),
};
struct timespec interval = value;
stream->flush_last_ns = 0;
stream->flush_timer = pw_loop_add_timer(server->impl->loop,
on_flush_tick, stream);
if (stream->flush_timer)
pw_loop_update_timer(server->impl->loop,
stream->flush_timer,
&value, &interval, false);
else
pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)",
stream);
pw_loop_invoke(server->impl->data_loop, do_add_flush_timer,
0, NULL, 0, true, stream);
}
}
@ -1470,9 +1494,8 @@ int stream_deactivate(struct stream *stream, uint64_t now)
stream->source = NULL;
}
if (stream->flush_timer != NULL) {
pw_loop_destroy_source(stream->server->impl->loop, stream->flush_timer);
stream->flush_timer = NULL;
stream->flush_last_ns = 0;
pw_loop_invoke(stream->server->impl->data_loop, do_remove_flush_timer,
0, NULL, 0, true, stream);
}
/* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the
* reservation immediately (Leave) instead of holding stale state until its