From 8ff40e6252a81280fefbbb45bde3e7b9a9617815 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 29 Apr 2024 15:56:00 +0200 Subject: [PATCH] loop: improve in_thread handling of invoke queue Because we now have a dedicated queue per thread, we can simply add our invoke item to the queue and then flush all the queues when we are running in the thread of the loop. This simplifies some things and removes potential out-of-order messages that got queued while flushing. --- spa/include/spa/support/loop.h | 3 +- spa/plugins/support/loop.c | 68 ++++++++++++++-------------------- 2 files changed, 28 insertions(+), 43 deletions(-) diff --git a/spa/include/spa/support/loop.h b/spa/include/spa/support/loop.h index f0e6df8d4..7dc55f3ae 100644 --- a/spa/include/spa/support/loop.h +++ b/spa/include/spa/support/loop.h @@ -94,8 +94,7 @@ struct spa_loop_methods { struct spa_source *source); /** Invoke a function in the context of this loop. - * May be called from the loop's thread, but otherwise - * can only be called by a single thread at a time. + * May be called from any thread and multiple threads at the same time. * If called from the loop's thread, all callbacks previously queued with * invoke() will be run synchronously, which might cause unexpected * reentrancy problems. diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index ead18328b..b71a56782 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -223,22 +223,6 @@ static void flush_all_queues(struct impl *impl) pthread_mutex_unlock(&impl->queue_lock); } -static int -loop_queue_invoke_inthread(struct queue *queue, - spa_invoke_func_t func, - uint32_t seq, - const void *data, - size_t size, - bool block, - void *user_data) -{ - /* we should probably have a second ringbuffer for the in-thread pending - * callbacks. A recursive callback when flushing will insert itself - * before this one. */ - flush_all_queues(queue->impl); - return func ? func(&queue->impl->loop, true, seq, data, size, user_data) : 0; -} - static int loop_queue_invoke(void *object, spa_invoke_func_t func, @@ -256,12 +240,9 @@ loop_queue_invoke(void *object, uint32_t avail, idx, offset, l0; size_t need; uint64_t nsec; + bool in_thread; - /* the ringbuffer can only be written to from one thread, if we are - * in the same thread as the loop, don't write into the ringbuffer - * but try to emit the calback right away after flushing what we have */ - if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) - return loop_queue_invoke_inthread(queue, func, seq, data, size, block, user_data); + in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); retry: filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); @@ -281,7 +262,7 @@ retry: item->func = func; item->seq = seq; item->size = size; - item->block = block; + item->block = in_thread ? false : block; item->user_data = user_data; item->res = 0; item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN); @@ -311,26 +292,31 @@ retry: spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size); - loop_signal_event(impl, impl->wakeup); - - if (block) { - uint64_t count = 1; - - spa_loop_control_hook_before(&impl->hooks_list); - - if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0) - spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s", - queue, queue->ack_fd, spa_strerror(res)); - - spa_loop_control_hook_after(&impl->hooks_list); - + if (in_thread) { + flush_all_queues(impl); res = item->res; - } - else { - if (seq != SPA_ID_INVALID) - res = SPA_RESULT_RETURN_ASYNC(seq); - else - res = 0; + } else { + loop_signal_event(impl, impl->wakeup); + + if (block) { + uint64_t count = 1; + + spa_loop_control_hook_before(&impl->hooks_list); + + if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0) + spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s", + queue, queue->ack_fd, spa_strerror(res)); + + spa_loop_control_hook_after(&impl->hooks_list); + + res = item->res; + } + else { + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC(seq); + else + res = 0; + } } return res;