From 6cf320e3871333cbea5fcd49ea8eb4058c76cfc1 Mon Sep 17 00:00:00 2001
From: Wim Taymans
Date: Thu, 14 Nov 2024 15:58:09 +0100
Subject: [PATCH] loop: handle queue overflow better

When a queue overflows we place the queue back in the stack and try
again. Because it's at the top of the stack we take exactly the same
queue and keep on looping forever if the other thread is blocked for
some reason.

Instead, mark the queue as overflowed and only place it back in the
stack when we have flushed it.

This avoids a deadlock when the main-thread invokes on the data loop
and blocks and when the data loop invokes on the main-thread and
overflows the queue.
---
 spa/plugins/support/loop.c | 38 +++++++++++++++++++++++---------------
 1 file changed, 23 insertions(+), 15 deletions(-)

diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c
index deed3b177..3ddbb1cc4 100644
--- a/spa/plugins/support/loop.c
+++ b/spa/plugins/support/loop.c
@@ -83,6 +83,7 @@ struct impl {
 	struct spa_list destroy_list;
 	struct spa_hook_list hooks_list;
 
+	struct spa_ratelimit rate_limit;
 	int retry_timeout;
 
 	union tag head;
@@ -109,13 +110,13 @@ struct queue {
 	uint16_t next;
 
 	int ack_fd;
-	struct spa_ratelimit rate_limit;
 
 	struct spa_ringbuffer buffer;
 	uint8_t *buffer_data;
 	uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN];
-};
 
+	bool overflowed;
+};
 
 struct source_impl {
 	struct spa_source source;
@@ -223,9 +224,6 @@ static struct queue *loop_create_queue(void *object)
 	queue->impl = impl;
 	queue->ack_fd = -1;
 
-	queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
-	queue->rate_limit.burst = 1;
-
 	queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t);
 	spa_ringbuffer_init(&queue->buffer);
 
@@ -336,10 +334,15 @@ static void flush_all_queues(struct impl *impl)
 		for (i = 0; i < n_queues; i++) {
 			cqueue = impl->queues[i];
 			if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) <
-			    (int32_t)sizeof(struct invoke_item))
+			    (int32_t)sizeof(struct invoke_item)) {
+				if (SPA_ATOMIC_LOAD(cqueue->overflowed) &&
+				    SPA_ATOMIC_CAS(cqueue->overflowed, true, false))
+					put_queue(impl, cqueue);
 				continue;
+			}
 
-			citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item);
+			citem = SPA_PTROFF(cqueue->buffer_data,
+					cindex & (DATAS_SIZE - 1), struct invoke_item);
 
 			if (item == NULL || item_compare(citem, item) < 0) {
 				item = citem;
@@ -389,7 +392,8 @@ loop_queue_invoke(void *object,
 		  const void *data,
 		  size_t size,
 		  bool block,
-		  void *user_data)
+		  void *user_data,
+		  bool in_thread)
 {
 	struct queue *queue = object;
 	struct impl *impl = queue->impl;
@@ -397,9 +401,6 @@ loop_queue_invoke(void *object,
 	int res;
 	int32_t filled;
 	uint32_t avail, idx, offset, l0;
-	bool in_thread;
-
-	in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()));
 
 	filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx);
 	spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun");
@@ -476,7 +477,10 @@ loop_queue_invoke(void *object,
 	}
 	return res;
 xrun:
-	put_queue(impl, queue);
+	/* we overflowed the queue, we can't push back the queue here
+	 * because then we will just pop it off again, so set a flag. We will
+	 * push the queue back for reuse after we flush. */
+	SPA_ATOMIC_STORE(queue->overflowed, true);
 	return -ENOSPC;
 }
 
@@ -493,6 +497,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
 	struct queue *queue;
 	int res = 0, suppressed;
 	uint64_t nsec;
+	bool in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()));
 
 	while (true) {
 		queue = get_queue(impl);
@@ -507,17 +512,18 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
 			 * and retry to get a queue */
 			if (impl->retry_timeout == 0)
 				return -EPIPE;
+
 			usleep(impl->retry_timeout);
 		} else {
-			res = loop_queue_invoke(queue, func, seq, data, size, block, user_data);
+			res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread);
 			if (SPA_LIKELY(res != -ENOSPC))
 				break;
 		}
 		/* the queue was full or no more queues, retry and use another queue */
 		nsec = get_time_ns(impl->system);
-		if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) {
+		if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) {
 			spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)",
-					queue, suppressed);
+					impl, suppressed);
 		}
 	}
 	return res;
@@ -1220,6 +1226,8 @@ impl_init(const struct spa_handle_factory *factory,
 			SPA_VERSION_LOOP_UTILS,
 			&impl_loop_utils, impl);
 
+	impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
+	impl->rate_limit.burst = 1;
 	impl->retry_timeout = DEFAULT_RETRY;
 	if (info) {
 		if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL &&
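
Not part of the patch itself, but to summarize the technique it introduces: the
writer that overflows a queue no longer pushes the queue back on the free stack
(where it would immediately be popped and overflowed again); it only sets a
flag, and the flusher clears that flag with a compare-and-swap after draining
the queue and only then recycles it. Below is a minimal sketch of that pattern
under stated assumptions: it uses standard C11 atomics rather than the
SPA_ATOMIC_* macros from loop.c, and the names (toy_queue, enqueue_or_flag,
maybe_recycle, push_free) are hypothetical, not part of the PipeWire code.

	/* Minimal sketch of the overflow-flag pattern (C11 atomics). */
	#include <stdatomic.h>
	#include <stdbool.h>

	struct toy_queue {
		atomic_bool overflowed;   /* set by the writer on overflow */
		/* ... ringbuffer state would live here ... */
	};

	/* Writer side: on overflow, do NOT push the queue back on the free
	 * stack (the same full queue would just be popped again); mark it
	 * instead and let the caller retry with another queue. */
	static int enqueue_or_flag(struct toy_queue *q, bool queue_full)
	{
		if (queue_full) {
			atomic_store(&q->overflowed, true);
			return -1;  /* analogous to -ENOSPC in the patch */
		}
		/* ... write the item into the ringbuffer ... */
		return 0;
	}

	/* Flusher side: after draining a queue, atomically clear the flag
	 * and only then hand the queue back for reuse. */
	static void maybe_recycle(struct toy_queue *q,
				  void (*push_free)(struct toy_queue *))
	{
		bool expected = true;
		if (atomic_load(&q->overflowed) &&
		    atomic_compare_exchange_strong(&q->overflowed, &expected, false))
			push_free(q);  /* corresponds to put_queue() above */
	}

The compare-and-swap on the flusher side mirrors the SPA_ATOMIC_CAS call in
flush_all_queues(): it ensures the queue is returned to the free stack exactly
once even if the writer overflows it again concurrently.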