From 784f00306887a70919da6663336f522d09b9fe0e Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 13 Jun 2023 09:37:28 +0200 Subject: [PATCH] stream: remove and check callbacks safely Clear the callbacks from the processing thread to avoid races. Check the callbacks in the processing thread before calling them, we just need to check if there are functions, we checked the method when installing the function. Fixes #3251 --- src/pipewire/filter.c | 19 ++++++++++++++----- src/pipewire/stream.c | 19 +++++++++++++++---- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index 5ac7b1d83..934368080 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -969,10 +969,10 @@ static void call_process(struct filter *impl) { pw_log_trace_fp("%p: call process", impl); if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_RT_PROCESS)) { - spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events, - process, 0, impl->rt.position); - } - else { + if (impl->rt_callbacks.funcs) + spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events, + process, 0, impl->rt.position); + } else { pw_loop_invoke(impl->main_loop, do_call_process, 1, NULL, 0, false, impl); } @@ -1436,10 +1436,19 @@ void pw_filter_destroy(struct pw_filter *filter) free(impl); } +static int +do_remove_callbacks(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct filter *impl = user_data; + spa_zero(impl->rt_callbacks); + return 0; +} + static void hook_removed(struct spa_hook *hook) { struct filter *impl = hook->priv; - spa_zero(impl->rt_callbacks); + pw_loop_invoke(impl->data_loop, do_remove_callbacks, 1, NULL, 0, true, impl); hook->priv = NULL; hook->removed = NULL; } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 5572bff29..c19a2c523 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -426,11 +426,13 @@ static inline void call_process(struct stream *impl) pw_log_trace_fp("%p: call process rt:%u", impl, impl->process_rt); if (impl->direction == SPA_DIRECTION_OUTPUT && update_requested(impl) <= 0) return; - if (impl->process_rt) - spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_stream_events, process, 0); - else + if (impl->process_rt) { + if (impl->rt_callbacks.funcs) + spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_stream_events, process, 0); + } else { pw_loop_invoke(impl->main_loop, do_call_process, 1, NULL, 0, false, impl); + } } static int @@ -1693,10 +1695,19 @@ void pw_stream_destroy(struct pw_stream *stream) free(impl); } +static int +do_remove_callbacks(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + spa_zero(impl->rt_callbacks); + return 0; +} + static void hook_removed(struct spa_hook *hook) { struct stream *impl = hook->priv; - spa_zero(impl->rt_callbacks); + pw_loop_invoke(impl->data_loop, do_remove_callbacks, 1, NULL, 0, true, impl); hook->priv = NULL; hook->removed = NULL; }