From 8b23a8a89eacf700f2e5d1a8fb3d912ec81240d6 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 8 May 2024 12:21:54 +0200 Subject: [PATCH] loop: flush items in the order they were added Add a count to each invoke item that is updated with an increasing loop atomic counter. Flush items from the queues based on their count so that items are flushed in the order they were added even if they were added to different queues. --- spa/plugins/support/loop.c | 62 +++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index b71a56782..aae27b732 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include #include @@ -41,6 +43,7 @@ struct invoke_item { size_t item_size; spa_invoke_func_t func; uint32_t seq; + uint32_t count; void *data; size_t size; bool block; @@ -73,6 +76,8 @@ struct impl { tss_t queue_tss_id; pthread_mutex_t queue_lock; + uint32_t count; + uint32_t flush_count; unsigned int polling:1; }; @@ -87,8 +92,6 @@ struct queue { struct spa_ringbuffer buffer; uint8_t *buffer_data; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; - - uint32_t flush_count; }; struct source_impl { @@ -169,41 +172,57 @@ static int loop_remove_source(void *object, struct spa_source *source) return res; } -static void queue_flush_items(struct queue *queue) +static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) { - struct impl *impl = queue->impl; - uint32_t index, flush_count; - int32_t avail; + return (int32_t)(a->count - b->count); +} + +static void flush_all_queues(struct impl *impl) +{ + uint32_t flush_count; int res; - flush_count = ++queue->flush_count; - avail = spa_ringbuffer_get_read_index(&queue->buffer, &index); - while (avail > 0) { - struct invoke_item *item; - bool block; + pthread_mutex_lock(&impl->queue_lock); + flush_count = ++impl->flush_count; + while (true) { + struct queue *cqueue, *queue; + struct invoke_item *citem, *item = NULL; + uint32_t cindex, index; spa_invoke_func_t func; + bool block; - item = SPA_PTROFF(queue->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); - block = item->block; - func = item->func; + spa_list_for_each(cqueue, &impl->queue_list, link) { + if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < + (int32_t)sizeof(struct invoke_item)) + continue; + citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item); + + if (item == NULL || item_compare(citem, item) < 0) { + item = citem; + queue = cqueue; + index = cindex; + } + } + if (item == NULL) + break; spa_log_trace_fp(impl->log, "%p: flush item %p", queue, item); /* first we remove the function from the item so that recursive * calls don't call the callback again. We can't update the * read index before we call the function because then the item * might get overwritten. */ - item->func = NULL; + func = spa_steal_ptr(item->func); if (func) item->res = func(&impl->loop, true, item->seq, item->data, item->size, item->user_data); /* if this function did a recursive invoke, it now flushed the * ringbuffer and we can exit */ - if (flush_count != queue->flush_count) + if (flush_count != impl->flush_count) break; index += item->item_size; - avail -= item->item_size; + block = item->block; spa_ringbuffer_read_update(&queue->buffer, index); if (block) { @@ -212,14 +231,6 @@ static void queue_flush_items(struct queue *queue) queue, queue->ack_fd, spa_strerror(res)); } } -} - -static void flush_all_queues(struct impl *impl) -{ - struct queue *queue; - pthread_mutex_lock(&impl->queue_lock); - spa_list_for_each(queue, &impl->queue_list, link) - queue_flush_items(queue); pthread_mutex_unlock(&impl->queue_lock); } @@ -261,6 +272,7 @@ retry: item = SPA_PTROFF(queue->buffer_data, offset, struct invoke_item); item->func = func; item->seq = seq; + item->count = SPA_ATOMIC_INC(impl->count); item->size = size; item->block = in_thread ? false : block; item->user_data = user_data;