From fb49e0795c28c892116b03369785ba86c27c968d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 10 Mar 2025 13:31:41 +0100 Subject: [PATCH] loop: move thread-loop to support loop Add more synchronization primitives to spa loop so that we can replace the thread-loop with it. --- spa/include/spa/support/loop.h | 29 ++++-- spa/plugins/support/loop.c | 85 +++++++++++++++--- src/pipewire/loop.c | 12 --- src/pipewire/loop.h | 30 +++++++ src/pipewire/private.h | 12 --- src/pipewire/thread-loop.c | 159 ++------------------------------- 6 files changed, 133 insertions(+), 194 deletions(-) diff --git a/spa/include/spa/support/loop.h b/spa/include/spa/support/loop.h index 223964994..1c842c7bd 100644 --- a/spa/include/spa/support/loop.h +++ b/spa/include/spa/support/loop.h @@ -306,15 +306,27 @@ struct spa_loop_control_methods { * \param[in] abstime the maximum time to wait for the signal or NULL * \return 0 on success or a negative return value on error. */ - int (*wait) (void *object, struct timespec *abstime); + int (*wait) (void *object, const struct timespec *abstime); /** Signal waiters - * Wake up all thread blocked in wait. Since version 2:2 + * Wake up all threads blocked in wait. Since version 2:2 + * When wait_for_accept is set, this functions blocks until all + * threads performed accept. + * + * \param[in] object the control + * \param[in] wait_for_accept block for accept + * \return 0 on success or a negative return value on error. + */ + int (*signal) (void *object, bool wait_for_accept); + + + /** Accept signalers + * Resume the thread that signaled with wait_for accept. * * \param[in] object the control * \return 0 on success or a negative return value on error. */ - int (*signal) (void *object); + int (*accept) (void *object); }; SPA_API_LOOP int spa_loop_control_get_fd(struct spa_loop_control *object) @@ -371,15 +383,20 @@ SPA_API_LOOP int spa_loop_control_get_time(struct spa_loop_control *object, spa_loop_control, &object->iface, get_time, 2, abstime, timeout); } SPA_API_LOOP int spa_loop_control_wait(struct spa_loop_control *object, - struct timespec *abstime) + const struct timespec *abstime) { return spa_api_method_r(int, -ENOTSUP, spa_loop_control, &object->iface, wait, 2, abstime); } -SPA_API_LOOP int spa_loop_control_signal(struct spa_loop_control *object) +SPA_API_LOOP int spa_loop_control_signal(struct spa_loop_control *object, bool wait_for_accept) { return spa_api_method_r(int, -ENOTSUP, - spa_loop_control, &object->iface, signal, 2); + spa_loop_control, &object->iface, signal, 2, wait_for_accept); +} +SPA_API_LOOP int spa_loop_control_accept(struct spa_loop_control *object) +{ + return spa_api_method_r(int, -ENOTSUP, + spa_loop_control, &object->iface, accept, 2); } typedef void (*spa_source_io_func_t) (void *data, int fd, uint32_t mask); diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 5543b9f11..290fc7e68 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -92,10 +92,14 @@ struct impl { struct queue *queues[QUEUES_MAX]; pthread_mutex_t lock; pthread_cond_t cond; + pthread_cond_t accept_cond; + int n_waiting; + int n_waiting_for_accept; int poll_fd; pthread_t thread; int enter_count; + int recurse; struct spa_source *wakeup; @@ -573,6 +577,7 @@ static void loop_enter(void *object) struct impl *impl = object; pthread_t thread_id = pthread_self(); + pthread_mutex_lock(&impl->lock); if (impl->enter_count == 0) { spa_return_if_fail(impl->thread == 0); impl->thread = thread_id; @@ -597,28 +602,49 @@ static void loop_leave(void *object) if (--impl->enter_count == 0) { impl->thread = 0; - pthread_mutex_lock(&impl->lock); flush_all_queues(impl); - pthread_mutex_unlock(&impl->lock); impl->polling = false; } + pthread_mutex_unlock(&impl->lock); } static int loop_check(void *object) { struct impl *impl = object; pthread_t thread_id = pthread_self(); - return (impl->thread == 0 || pthread_equal(impl->thread, thread_id)) ? 1 : 0; + int res; + + /* we are in the thread running the loop */ + if (impl->thread == 0 || pthread_equal(impl->thread, thread_id)) + return 1; + + /* if lock taken by something else, error */ + if ((res = pthread_mutex_trylock(&impl->lock)) != 0) + return -res; + + /* we could take the lock, check if we actually locked it somewhere */ + res = impl->recurse > 0 ? 1 : -EPERM; + pthread_mutex_unlock(&impl->lock); + return res; } static int loop_lock(void *object) { struct impl *impl = object; - return -pthread_mutex_lock(&impl->lock); + int res; + + if ((res = pthread_mutex_lock(&impl->lock)) == 0) + impl->recurse++; + return -res; } static int loop_unlock(void *object) { struct impl *impl = object; - return -pthread_mutex_unlock(&impl->lock); + int res; + spa_return_val_if_fail(impl->recurse > 0, -EIO); + impl->recurse--; + if ((res = pthread_mutex_unlock(&impl->lock)) != 0) + impl->recurse++; + return -res; } static int loop_get_time(void *object, struct timespec *abstime, int64_t timeout) { @@ -633,18 +659,46 @@ static int loop_get_time(void *object, struct timespec *abstime, int64_t timeout } return 0; } -static int loop_wait(void *object, struct timespec *abstime) +static int loop_wait(void *object, const struct timespec *abstime) { struct impl *impl = object; + int res; + + impl->n_waiting++; + impl->recurse--; if (abstime) - return pthread_cond_timedwait(&impl->cond, &impl->lock, abstime); + res = pthread_cond_timedwait(&impl->cond, &impl->lock, abstime); else - return pthread_cond_wait(&impl->cond, &impl->lock); + res = pthread_cond_wait(&impl->cond, &impl->lock); + impl->recurse++; + impl->n_waiting--; + return -res; } -static int loop_signal(void *object) + +static int loop_signal(void *object, bool wait_for_accept) { struct impl *impl = object; - return pthread_cond_signal(&impl->cond); + int res; + if (impl->n_waiting > 0) + if ((res = pthread_cond_broadcast(&impl->cond)) != 0) + return -res; + + if (wait_for_accept) { + impl->n_waiting_for_accept++; + + while (impl->n_waiting_for_accept > 0) { + if ((res = pthread_cond_wait(&impl->accept_cond, &impl->lock)) != 0) + return -res; + } + } + return res; +} + +static int loop_accept(void *object) +{ + struct impl *impl = object; + impl->n_waiting_for_accept--; + return -pthread_cond_signal(&impl->accept_cond); } static inline void free_source(struct source_impl *s) @@ -689,9 +743,11 @@ static int loop_iterate_cancel(void *object, int timeout) impl->polling = true; spa_loop_control_hook_before(&impl->hooks_list); + pthread_mutex_unlock(&impl->lock); nfds = spa_system_pollfd_wait(impl->system, impl->poll_fd, ep, SPA_N_ELEMENTS(ep), timeout); + pthread_mutex_lock(&impl->lock); spa_loop_control_hook_after(&impl->hooks_list); impl->polling = false; @@ -717,13 +773,11 @@ static int loop_iterate_cancel(void *object, int timeout) if (SPA_UNLIKELY(!spa_list_is_empty(&impl->destroy_list))) process_destroy(impl); - pthread_mutex_lock(&impl->lock); for (i = 0; i < nfds; i++) { struct spa_source *s = ep[i].data; if (SPA_LIKELY(s && s->rmask)) s->func(s); } - pthread_mutex_unlock(&impl->lock); pthread_cleanup_pop(true); @@ -738,9 +792,11 @@ static int loop_iterate(void *object, int timeout) impl->polling = true; spa_loop_control_hook_before(&impl->hooks_list); + pthread_mutex_unlock(&impl->lock); nfds = spa_system_pollfd_wait(impl->system, impl->poll_fd, ep, SPA_N_ELEMENTS(ep), timeout); + pthread_mutex_lock(&impl->lock); spa_loop_control_hook_after(&impl->hooks_list); impl->polling = false; @@ -761,13 +817,11 @@ static int loop_iterate(void *object, int timeout) if (SPA_UNLIKELY(!spa_list_is_empty(&impl->destroy_list))) process_destroy(impl); - pthread_mutex_lock(&impl->lock); for (i = 0; i < nfds; i++) { struct spa_source *s = ep[i].data; if (SPA_LIKELY(s && s->rmask)) s->func(s); } - pthread_mutex_unlock(&impl->lock); for (i = 0; i < nfds; i++) { struct spa_source *s = ep[i].data; @@ -1175,6 +1229,7 @@ static const struct spa_loop_control_methods impl_loop_control_cancel = { .get_time = loop_get_time, .wait = loop_wait, .signal = loop_signal, + .accept = loop_accept, }; static const struct spa_loop_control_methods impl_loop_control = { @@ -1190,6 +1245,7 @@ static const struct spa_loop_control_methods impl_loop_control = { .get_time = loop_get_time, .wait = loop_wait, .signal = loop_signal, + .accept = loop_accept, }; static const struct spa_loop_utils_methods impl_loop_utils = { @@ -1331,6 +1387,7 @@ impl_init(const struct spa_handle_factory *factory, CHECK(pthread_condattr_setclock(&cattr, CLOCK_REALTIME), error_exit_free_mutex); CHECK(pthread_cond_init(&impl->cond, &cattr), error_exit_free_mutex); + CHECK(pthread_cond_init(&impl->accept_cond, &cattr), error_exit_free_mutex); pthread_condattr_destroy(&cattr); impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); diff --git a/src/pipewire/loop.c b/src/pipewire/loop.c index 2e7fb47d5..2d38d4bac 100644 --- a/src/pipewire/loop.c +++ b/src/pipewire/loop.c @@ -178,15 +178,3 @@ int pw_loop_set_name(struct pw_loop *loop, const char *name) snprintf(impl->name, sizeof(impl->name), "%s", name); return 0; } - -SPA_EXPORT -int pw_loop_check(struct pw_loop *loop) -{ - struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, this); - int res; - if (impl->cb && impl->cb->check) - res = impl->cb->check(impl->user_data, loop); - else - res = spa_loop_control_check(loop->control); - return res; -} diff --git a/src/pipewire/loop.h b/src/pipewire/loop.h index 8bb762170..2190344c1 100644 --- a/src/pipewire/loop.h +++ b/src/pipewire/loop.h @@ -88,6 +88,36 @@ PW_API_LOOP_IMPL int pw_loop_iterate(struct pw_loop *object, { return spa_loop_control_iterate_fast(object->control, timeout); } +PW_API_LOOP_IMPL int pw_loop_check(struct pw_loop *object) +{ + return spa_loop_control_check(object->control); +} +PW_API_LOOP_IMPL int pw_loop_lock(struct pw_loop *object) +{ + return spa_loop_control_lock(object->control); +} +PW_API_LOOP_IMPL int pw_loop_unlock(struct pw_loop *object) +{ + return spa_loop_control_unlock(object->control); +} +PW_API_LOOP_IMPL int pw_loop_get_time(struct pw_loop *object, struct timespec *abstime, int64_t timeout) +{ + return spa_loop_control_get_time(object->control, abstime, timeout); +} +PW_API_LOOP_IMPL int pw_loop_wait(struct pw_loop *object, const struct timespec *abstime) +{ + return spa_loop_control_wait(object->control, abstime); +} +PW_API_LOOP_IMPL int pw_loop_signal(struct pw_loop *object, bool wait_for_accept) +{ + return spa_loop_control_signal(object->control, wait_for_accept); +} +PW_API_LOOP_IMPL int pw_loop_accept(struct pw_loop *object) +{ + return spa_loop_control_accept(object->control); +} + + PW_API_LOOP_IMPL struct spa_source * pw_loop_add_io(struct pw_loop *object, int fd, uint32_t mask, diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 75db3fe43..3cdb258df 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -343,18 +343,6 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq, va_end(args); } -struct pw_loop_callbacks { -#define PW_VERSION_LOOP_CALLBACKS 0 - uint32_t version; - - int (*check) (void *data, struct pw_loop *loop); -}; - -void -pw_loop_set_callbacks(struct pw_loop *loop, const struct pw_loop_callbacks *cb, void *data); - -int pw_loop_check(struct pw_loop *loop); - #define ensure_loop(loop,...) ({ \ int res = pw_loop_check(loop); \ if (res != 1) { \ diff --git a/src/pipewire/thread-loop.c b/src/pipewire/thread-loop.c index 5f74f9487..36fbd0183 100644 --- a/src/pipewire/thread-loop.c +++ b/src/pipewire/thread-loop.c @@ -27,92 +27,18 @@ struct pw_thread_loop { struct spa_hook_list listener_list; - pthread_mutex_t lock; - pthread_cond_t cond; - pthread_cond_t accept_cond; - pthread_t thread; - int recurse; struct spa_hook hook; struct spa_source *event; - int n_waiting; - int n_waiting_for_accept; unsigned int created:1; unsigned int running:1; unsigned int start_signal:1; }; /** \endcond */ -static int do_lock(struct pw_thread_loop *this) -{ - int res; - if ((res = pthread_mutex_lock(&this->lock)) != 0) - pw_log_error("%p: thread:%p: %s", this, (void *) pthread_self(), strerror(res)); - else - this->recurse++; - return -res; -} - -static int do_unlock(struct pw_thread_loop *this) -{ - int res; - spa_return_val_if_fail(this->recurse > 0, -EIO); - this->recurse--; - if ((res = pthread_mutex_unlock(&this->lock)) != 0) { - pw_log_error("%p: thread:%p: %s", this, (void *) pthread_self(), strerror(res)); - this->recurse++; - } - return -res; -} - -static void impl_before(void *data) -{ - struct pw_thread_loop *this = data; - do_unlock(this); -} - -static void impl_after(void *data) -{ - struct pw_thread_loop *this = data; - do_lock(this); -} - -static const struct spa_loop_control_hooks impl_hooks = { - SPA_VERSION_LOOP_CONTROL_HOOKS, - .before = impl_before, - .after = impl_after, -}; - -static int impl_check(void *data, struct pw_loop *loop) -{ - struct pw_thread_loop *this = data; - int res; - - /* we are in the thread running the loop */ - if (spa_loop_control_check(this->loop->control) == 1) - return 1; - - /* if lock taken by something else, error */ - if ((res = pthread_mutex_trylock(&this->lock)) != 0) { - pw_log_debug("%p: thread:%p: %s", this, (void *) pthread_self(), strerror(res)); - return -res; - } - /* we could take the lock, check if we actually locked it somewhere */ - res = this->recurse > 0 ? 1 : -EPERM; - if (res < 0) - pw_log_debug("%p: thread:%p: recurse:%d", this, (void *) pthread_self(), this->recurse); - pthread_mutex_unlock(&this->lock); - return res; -} - -static const struct pw_loop_callbacks impl_callbacks = { - PW_VERSION_LOOP_CALLBACKS, - .check = impl_check, -}; - static void do_stop(void *data, uint64_t count) { struct pw_thread_loop *this = data; @@ -134,8 +60,6 @@ static struct pw_thread_loop *loop_new(struct pw_loop *loop, const struct spa_dict *props) { struct pw_thread_loop *this; - pthread_mutexattr_t attr; - pthread_condattr_t cattr; int res; this = calloc(1, sizeof(struct pw_thread_loop)); @@ -162,32 +86,13 @@ static struct pw_thread_loop *loop_new(struct pw_loop *loop, spa_hook_list_init(&this->listener_list); - CHECK(pthread_mutexattr_init(&attr), clean_this); - CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), clean_this); - CHECK(pthread_mutex_init(&this->lock, &attr), clean_this); - - CHECK(pthread_condattr_init(&cattr), clean_lock); - CHECK(pthread_condattr_setclock(&cattr, CLOCK_REALTIME), clean_lock); - - CHECK(pthread_cond_init(&this->cond, &cattr), clean_lock); - CHECK(pthread_cond_init(&this->accept_cond, &cattr), clean_cond); - if ((this->event = pw_loop_add_event(this->loop, do_stop, this)) == NULL) { res = -errno; - goto clean_acceptcond; + goto clean_this; } - pw_loop_set_callbacks(loop, &impl_callbacks, this); - pw_loop_add_hook(loop, &this->hook, &impl_hooks, this); - return this; -clean_acceptcond: - pthread_cond_destroy(&this->accept_cond); -clean_cond: - pthread_cond_destroy(&this->cond); -clean_lock: - pthread_mutex_destroy(&this->lock); clean_this: if (this->created && this->loop) pw_loop_destroy(this->loop); @@ -245,7 +150,6 @@ void pw_thread_loop_destroy(struct pw_thread_loop *loop) pw_thread_loop_stop(loop); - pw_loop_set_callbacks(loop->loop, NULL, NULL); spa_hook_remove(&loop->hook); spa_hook_list_clean(&loop->listener_list); @@ -255,10 +159,6 @@ void pw_thread_loop_destroy(struct pw_thread_loop *loop) if (loop->created) pw_loop_destroy(loop->loop); - pthread_cond_destroy(&loop->accept_cond); - pthread_cond_destroy(&loop->cond); - pthread_mutex_destroy(&loop->lock); - free(loop); } @@ -283,7 +183,6 @@ static void *do_loop(void *user_data) struct pw_thread_loop *this = user_data; int res; - do_lock(this); pw_log_debug("%p: enter thread", this); pw_loop_enter(this->loop); @@ -300,7 +199,6 @@ static void *do_loop(void *user_data) } pw_log_debug("%p: leave thread", this); pw_loop_leave(this->loop); - do_unlock(this); return NULL; } @@ -367,7 +265,7 @@ void pw_thread_loop_stop(struct pw_thread_loop *loop) SPA_EXPORT void pw_thread_loop_lock(struct pw_thread_loop *loop) { - do_lock(loop); + pw_loop_lock(loop->loop); pw_log_trace("%p", loop); } @@ -380,7 +278,7 @@ SPA_EXPORT void pw_thread_loop_unlock(struct pw_thread_loop *loop) { pw_log_trace("%p", loop); - do_unlock(loop); + pw_loop_unlock(loop->loop); } /** Signal the thread @@ -395,20 +293,8 @@ void pw_thread_loop_unlock(struct pw_thread_loop *loop) SPA_EXPORT void pw_thread_loop_signal(struct pw_thread_loop *loop, bool wait_for_accept) { - pw_log_trace("%p, waiting:%d accept:%d", - loop, loop->n_waiting, wait_for_accept); - if (loop->n_waiting > 0) - pthread_cond_broadcast(&loop->cond); - - if (wait_for_accept) { - loop->n_waiting_for_accept++; - - while (loop->n_waiting_for_accept > 0) { - int res; - if ((res = pthread_cond_wait(&loop->accept_cond, &loop->lock)) != 0) - pw_log_error("%p: thread:%p: %s", loop, (void *) pthread_self(), strerror(res)); - } - } + pw_log_trace("%p,accept:%d", loop, wait_for_accept); + pw_loop_signal(loop->loop, wait_for_accept); } /** Wait for the loop thread to call \ref pw_thread_loop_signal() @@ -419,17 +305,7 @@ void pw_thread_loop_signal(struct pw_thread_loop *loop, bool wait_for_accept) SPA_EXPORT void pw_thread_loop_wait(struct pw_thread_loop *loop) { - int res; - - pw_log_trace("%p, waiting:%d recurse:%d", loop, loop->n_waiting, loop->recurse); - spa_return_if_fail(loop->recurse > 0); - loop->n_waiting++; - loop->recurse--; - if ((res = pthread_cond_wait(&loop->cond, &loop->lock)) != 0) - pw_log_error("%p: thread:%p: %s", loop, (void *) pthread_self(), strerror(res)); - loop->recurse++; - loop->n_waiting--; - pw_log_trace("%p, waiting done %d", loop, loop->n_waiting); + pw_loop_wait(loop->loop, NULL); } /** Wait for the loop thread to call \ref pw_thread_loop_signal() @@ -464,16 +340,7 @@ int pw_thread_loop_timed_wait(struct pw_thread_loop *loop, int wait_max_sec) SPA_EXPORT int pw_thread_loop_get_time(struct pw_thread_loop *loop, struct timespec *abstime, int64_t timeout) { - if (clock_gettime(CLOCK_REALTIME, abstime) < 0) - return -errno; - - abstime->tv_sec += timeout / SPA_NSEC_PER_SEC; - abstime->tv_nsec += timeout % SPA_NSEC_PER_SEC; - if (abstime->tv_nsec >= SPA_NSEC_PER_SEC) { - abstime->tv_sec++; - abstime->tv_nsec -= SPA_NSEC_PER_SEC; - } - return 0; + return pw_loop_get_time(loop->loop, abstime, timeout); } /** Wait for the loop thread to call \ref pw_thread_loop_signal() @@ -487,14 +354,7 @@ int pw_thread_loop_get_time(struct pw_thread_loop *loop, struct timespec *abstim SPA_EXPORT int pw_thread_loop_timed_wait_full(struct pw_thread_loop *loop, const struct timespec *abstime) { - int ret; - spa_return_val_if_fail(loop->recurse > 0, -EIO); - loop->n_waiting++; - loop->recurse--; - ret = pthread_cond_timedwait(&loop->cond, &loop->lock, abstime); - loop->recurse++; - loop->n_waiting--; - return -ret; + return pw_loop_wait(loop->loop, abstime); } /** Signal the loop thread waiting for accept with \ref pw_thread_loop_signal() @@ -505,8 +365,7 @@ int pw_thread_loop_timed_wait_full(struct pw_thread_loop *loop, const struct tim SPA_EXPORT void pw_thread_loop_accept(struct pw_thread_loop *loop) { - loop->n_waiting_for_accept--; - pthread_cond_signal(&loop->accept_cond); + pw_loop_accept(loop->loop); } /** Check if we are inside the thread of the loop