From 8ecfcbf884dca9cbb940ade6e38c299c535aa5af Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 7 Dec 2022 21:18:16 +0100 Subject: [PATCH] loop: support recursive loop flush Always append the item to the ringbuffer, even if we are invoking from the thread itself. This ensures all items are always invoked in the right order. If we invoke from the thread, flush all items of the ringbuffer and return. Make sure to set the callback to NULL before invoking so that recursive invoke doesn't call it again. When, while flushing the items, we get a recursive invoke, detect this with a counter and return immediately. --- spa/plugins/support/loop.c | 56 ++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index c705a273b..99cd9d170 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -89,7 +89,7 @@ struct impl { uint8_t *buffer_data; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; - unsigned int flushing:1; + uint32_t flush_count; unsigned int polling:1; }; @@ -166,27 +166,35 @@ static int loop_remove_source(void *object, struct spa_source *source) static void flush_items(struct impl *impl) { - uint32_t index; - int32_t avail; + uint32_t index, flush_count; int res; - impl->flushing = true; - avail = spa_ringbuffer_get_read_index(&impl->buffer, &index); - while (avail > 0) { + flush_count = ++impl->flush_count; + while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { struct invoke_item *item; bool block; + spa_invoke_func_t func; item = SPA_PTROFF(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); block = item->block; + func = item->func; spa_log_trace_fp(impl->log, "%p: flush item %p", impl, item); - item->res = item->func ? item->func(&impl->loop, - true, item->seq, item->data, item->size, - item->user_data) : 0; + /* first we remove the function from the item so that recursive + * calls don't call the callback again. 
We can't update the + * read index before we call the function because then the item + * might get overwritten. */ + item->func = NULL; + if (func) + item->res = func(&impl->loop, true, item->seq, item->data, + item->size, item->user_data); - index += item->item_size; - avail -= item->item_size; - spa_ringbuffer_read_update(&impl->buffer, index); + /* if this function did a recursive invoke, it now flushed the + * ringbuffer and we can exit */ + if (flush_count != impl->flush_count) + break; + + spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); if (block) { if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) @@ -194,21 +202,6 @@ static void flush_items(struct impl *impl) impl, impl->ack_fd, spa_strerror(res)); } } - impl->flushing = false; -} - -static int -loop_invoke_inthread(struct impl *impl, - spa_invoke_func_t func, - uint32_t seq, - const void *data, - size_t size, - bool block, - void *user_data) -{ - if (!impl->flushing) - flush_items(impl); - return func ? 
func(&impl->loop, true, seq, data, size, user_data) : 0; } static int @@ -226,9 +219,6 @@ loop_invoke(void *object, int32_t filled; uint32_t avail, idx, offset, l0; - if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) - return loop_invoke_inthread(impl, func, seq, data, size, block, user_data); - filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); if (filled < 0 || filled > DATAS_SIZE) { spa_log_warn(impl->log, "%p: queue xrun %d", impl, filled); @@ -251,6 +241,7 @@ loop_invoke(void *object, item->size = size; item->block = block; item->user_data = user_data; + item->res = 0; item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN); spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", impl, item, filled); @@ -279,6 +270,11 @@ loop_invoke(void *object, spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) { + flush_items(impl); + return item->res; + } + loop_signal_event(impl, impl->wakeup); if (block) {