diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index b71a56782..aae27b732 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include #include @@ -41,6 +43,7 @@ struct invoke_item { size_t item_size; spa_invoke_func_t func; uint32_t seq; + uint32_t count; void *data; size_t size; bool block; @@ -73,6 +76,8 @@ struct impl { tss_t queue_tss_id; pthread_mutex_t queue_lock; + uint32_t count; + uint32_t flush_count; unsigned int polling:1; }; @@ -87,8 +92,6 @@ struct queue { struct spa_ringbuffer buffer; uint8_t *buffer_data; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; - - uint32_t flush_count; }; struct source_impl { @@ -169,41 +172,57 @@ static int loop_remove_source(void *object, struct spa_source *source) return res; } -static void queue_flush_items(struct queue *queue) +static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) { - struct impl *impl = queue->impl; - uint32_t index, flush_count; - int32_t avail; + return (int32_t)(a->count - b->count); +} + +static void flush_all_queues(struct impl *impl) +{ + uint32_t flush_count; int res; - flush_count = ++queue->flush_count; - avail = spa_ringbuffer_get_read_index(&queue->buffer, &index); - while (avail > 0) { - struct invoke_item *item; - bool block; + pthread_mutex_lock(&impl->queue_lock); + flush_count = ++impl->flush_count; + while (true) { + struct queue *cqueue, *queue; + struct invoke_item *citem, *item = NULL; + uint32_t cindex, index; spa_invoke_func_t func; + bool block; - item = SPA_PTROFF(queue->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); - block = item->block; - func = item->func; + spa_list_for_each(cqueue, &impl->queue_list, link) { + if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < + (int32_t)sizeof(struct invoke_item)) + continue; + citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item); + + if (item == NULL || item_compare(citem, item) < 0) { + item = citem; + queue = cqueue; + index = cindex; + } + } + if (item == NULL) + break; spa_log_trace_fp(impl->log, "%p: flush item %p", queue, item); /* first we remove the function from the item so that recursive * calls don't call the callback again. We can't update the * read index before we call the function because then the item * might get overwritten. */ - item->func = NULL; + func = spa_steal_ptr(item->func); if (func) item->res = func(&impl->loop, true, item->seq, item->data, item->size, item->user_data); /* if this function did a recursive invoke, it now flushed the * ringbuffer and we can exit */ - if (flush_count != queue->flush_count) + if (flush_count != impl->flush_count) break; index += item->item_size; - avail -= item->item_size; + block = item->block; spa_ringbuffer_read_update(&queue->buffer, index); if (block) { @@ -212,14 +231,6 @@ static void queue_flush_items(struct queue *queue) queue, queue->ack_fd, spa_strerror(res)); } } -} - -static void flush_all_queues(struct impl *impl) -{ - struct queue *queue; - pthread_mutex_lock(&impl->queue_lock); - spa_list_for_each(queue, &impl->queue_list, link) - queue_flush_items(queue); pthread_mutex_unlock(&impl->queue_lock); } @@ -261,6 +272,7 @@ retry: item = SPA_PTROFF(queue->buffer_data, offset, struct invoke_item); item->func = func; item->seq = seq; + item->count = SPA_ATOMIC_INC(impl->count); item->size = size; item->block = in_thread ? false : block; item->user_data = user_data;