From a4b553f3d45f4b7d421e251372bd1e661b8d586a Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 3 Dec 2024 16:38:28 +0100 Subject: [PATCH] spa: serialize in_thread flushes with a mutex When we have no thread running the loop, we need to flush the queues from the invoking thread. Make sure that when multiple threads attempt this that we serialize the flushing because the flushing code is not thread safe. --- spa/plugins/support/loop.c | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index a098b9349..8af61e7ae 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -90,6 +90,7 @@ struct impl { uint32_t n_queues; struct queue *queues[QUEUES_MAX]; + pthread_mutex_t queue_lock; int poll_fd; pthread_t thread; @@ -390,8 +391,7 @@ loop_queue_invoke(void *object, const void *data, size_t size, bool block, - void *user_data, - bool in_thread) + void *user_data) { struct queue *queue = object, *orig = queue, *overflow; struct impl *impl = queue->impl; @@ -399,8 +399,13 @@ loop_queue_invoke(void *object, int res; int32_t filled; uint32_t avail, idx, offset, l0; + bool in_thread; + pthread_t loop_thread, current_thread = pthread_self(); again: + loop_thread = impl->thread; + in_thread = (loop_thread == 0 || pthread_equal(loop_thread, current_thread)); + filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); avail = (uint32_t)(DATAS_SIZE - filled); @@ -448,7 +453,18 @@ again: if (in_thread) { put_queue(impl, orig); + + /* when there is no thread running the loop we flush the queues from + * this invoking thread but we need to serialize the flushing here with + * a mutex */ + if (loop_thread == 0) + pthread_mutex_lock(&impl->queue_lock); + flush_all_queues(impl); + + if (loop_thread == 0) + pthread_mutex_unlock(&impl->queue_lock); + res = item->res; } else { loop_signal_event(impl, impl->wakeup); @@ -504,7 +520,6 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, struct queue *queue; int res = 0, suppressed; uint64_t nsec; - bool in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); while (true) { queue = get_queue(impl); @@ -527,7 +542,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, } usleep(impl->retry_timeout); } else { - res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread); + res = loop_queue_invoke(queue, func, seq, data, size, block, user_data); break; } } @@ -1181,6 +1196,8 @@ static int impl_clear(struct spa_handle *handle) spa_system_close(impl->system, impl->poll_fd); + pthread_mutex_destroy(&impl->queue_lock); + return 0; } @@ -1209,6 +1226,7 @@ impl_init(const struct spa_handle_factory *factory, { struct impl *impl; const char *str; + pthread_mutexattr_t attr; int res; spa_return_val_if_fail(factory != NULL, -EINVAL); @@ -1242,6 +1260,10 @@ impl_init(const struct spa_handle_factory *factory, impl->retry_timeout = atoi(str); } + CHECK(pthread_mutexattr_init(&attr), error_exit); + CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit); + CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit); + impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); spa_log_topic_init(impl->log, &log_topic); impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); @@ -1249,12 +1271,12 @@ impl_init(const struct spa_handle_factory *factory, if (impl->system == NULL) { spa_log_error(impl->log, "%p: a System is needed", impl); res = -EINVAL; - goto error_exit; + goto error_exit_free_mutex; } if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) { spa_log_error(impl->log, "%p: can't create pollfd: %s", impl, spa_strerror(res)); - goto error_exit; + goto error_exit_free_mutex; } impl->poll_fd = res; @@ -1277,6 +1299,8 @@ impl_init(const struct spa_handle_factory *factory, error_exit_free_poll: spa_system_close(impl->system, impl->poll_fd); +error_exit_free_mutex: + pthread_mutex_destroy(&impl->queue_lock); error_exit: return res; }