diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index c705a273b..99cd9d170 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -89,7 +89,7 @@ struct impl { uint8_t *buffer_data; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; - unsigned int flushing:1; + uint32_t flush_count; unsigned int polling:1; }; @@ -166,27 +166,35 @@ static int loop_remove_source(void *object, struct spa_source *source) static void flush_items(struct impl *impl) { - uint32_t index; - int32_t avail; + uint32_t index, flush_count; int res; - impl->flushing = true; - avail = spa_ringbuffer_get_read_index(&impl->buffer, &index); - while (avail > 0) { + flush_count = ++impl->flush_count; + while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { struct invoke_item *item; bool block; + spa_invoke_func_t func; item = SPA_PTROFF(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); block = item->block; + func = item->func; spa_log_trace_fp(impl->log, "%p: flush item %p", impl, item); - item->res = item->func ? item->func(&impl->loop, - true, item->seq, item->data, item->size, - item->user_data) : 0; + /* first we remove the function from the item so that recursive + * calls don't call the callback again. We can't update the + * read index before we call the function because then the item + * might get overwritten. */ + item->func = NULL; + if (func) + item->res = func(&impl->loop, true, item->seq, item->data, + item->size, item->user_data); - index += item->item_size; - avail -= item->item_size; - spa_ringbuffer_read_update(&impl->buffer, index); + /* if this function did a recursive invoke, it now flushed the + * ringbuffer and we can exit */ + if (flush_count != impl->flush_count) + break; + + spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); if (block) { if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) @@ -194,21 +202,6 @@ static void flush_items(struct impl *impl) impl, impl->ack_fd, spa_strerror(res)); } } - impl->flushing = false; -} - -static int -loop_invoke_inthread(struct impl *impl, - spa_invoke_func_t func, - uint32_t seq, - const void *data, - size_t size, - bool block, - void *user_data) -{ - if (!impl->flushing) - flush_items(impl); - return func ? func(&impl->loop, true, seq, data, size, user_data) : 0; } static int @@ -226,9 +219,6 @@ loop_invoke(void *object, int32_t filled; uint32_t avail, idx, offset, l0; - if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) - return loop_invoke_inthread(impl, func, seq, data, size, block, user_data); - filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); if (filled < 0 || filled > DATAS_SIZE) { spa_log_warn(impl->log, "%p: queue xrun %d", impl, filled); @@ -251,6 +241,7 @@ loop_invoke(void *object, item->size = size; item->block = block; item->user_data = user_data; + item->res = 0; item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN); spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", impl, item, filled); @@ -279,6 +270,11 @@ loop_invoke(void *object, spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) { + flush_items(impl); + return item->res; + } + loop_signal_event(impl, impl->wakeup); if (block) {