From 864438e8e9725c94f71b88420be9f5fc5530ce74 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 28 Jan 2025 16:51:10 +0100 Subject: [PATCH] stream: don't emit drain when in progress We keep on calling the drain event for as long as we are drained. The application is supposed to inactivate the stream or provide more data at some point. Because we do this from the data thread, we use a non-blocking invoke. If for some reason the event callback takes a long time we might place a lot of these invoke calls into the invoke queue, which will then be dispatched one after another (and cause more blocking or a burst of useless invoke calls). Avoid this by only placing one drain invoke call into the queue at a time. Fixes #4529 --- src/pipewire/filter.c | 11 +++++++++-- src/pipewire/stream.c | 10 ++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index eb15eddb6..18ffb4e2a 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -146,6 +146,7 @@ struct filter { unsigned int warn_mlock:1; unsigned int trigger:1; int in_emit_param_changed; + int pending_drain; }; static int get_param_index(uint32_t id) @@ -989,13 +990,19 @@ do_call_drained(struct spa_loop *loop, struct pw_filter *filter = &impl->this; pw_log_trace("%p: drained", filter); pw_filter_emit_drained(filter); + SPA_ATOMIC_DEC(impl->pending_drain); return 0; } static void call_drained(struct filter *impl) { - pw_loop_invoke(impl->main_loop, - do_call_drained, 1, NULL, 0, false, impl); + pw_log_info("%p: drained", impl); + if (SPA_ATOMIC_INC(impl->pending_drain) == 1) { + pw_loop_invoke(impl->main_loop, + do_call_drained, 1, NULL, 0, false, impl); + } else { + SPA_ATOMIC_DEC(impl->pending_drain); + } } static int impl_node_process(void *object) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 04c4a1cff..c2186996f 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -154,6 +154,7 @@ struct stream { unsigned int trigger_done_rt:1; int in_set_param; int in_emit_param_changed; + int pending_drain; }; static int get_param_index(uint32_t id) @@ -455,14 +456,19 @@ do_call_drained(struct spa_loop *loop, struct pw_stream *stream = &impl->this; pw_log_trace_fp("%p: drained", stream); pw_stream_emit_drained(stream); + SPA_ATOMIC_DEC(impl->pending_drain); return 0; } static void call_drained(struct stream *impl) { pw_log_info("%p: drained", impl); - pw_loop_invoke(impl->main_loop, - do_call_drained, 1, NULL, 0, false, impl); + if (SPA_ATOMIC_INC(impl->pending_drain) == 1) { + pw_loop_invoke(impl->main_loop, + do_call_drained, 1, NULL, 0, false, impl); + } else { + SPA_ATOMIC_DEC(impl->pending_drain); + } } static int