diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index deed3b177..3ddbb1cc4 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -83,6 +83,7 @@ struct impl { struct spa_list destroy_list; struct spa_hook_list hooks_list; + struct spa_ratelimit rate_limit; int retry_timeout; union tag head; @@ -109,13 +110,13 @@ struct queue { uint16_t next; int ack_fd; - struct spa_ratelimit rate_limit; struct spa_ringbuffer buffer; uint8_t *buffer_data; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; -}; + bool overflowed; +}; struct source_impl { struct spa_source source; @@ -223,9 +224,6 @@ static struct queue *loop_create_queue(void *object) queue->impl = impl; queue->ack_fd = -1; - queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; - queue->rate_limit.burst = 1; - queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); spa_ringbuffer_init(&queue->buffer); @@ -336,10 +334,15 @@ static void flush_all_queues(struct impl *impl) for (i = 0; i < n_queues; i++) { cqueue = impl->queues[i]; if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < - (int32_t)sizeof(struct invoke_item)) + (int32_t)sizeof(struct invoke_item)) { + if (SPA_ATOMIC_LOAD(cqueue->overflowed) && + SPA_ATOMIC_CAS(cqueue->overflowed, true, false)) + put_queue(impl, cqueue); continue; + } - citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item); + citem = SPA_PTROFF(cqueue->buffer_data, + cindex & (DATAS_SIZE - 1), struct invoke_item); if (item == NULL || item_compare(citem, item) < 0) { item = citem; @@ -389,7 +392,8 @@ loop_queue_invoke(void *object, const void *data, size_t size, bool block, - void *user_data) + void *user_data, + bool in_thread) { struct queue *queue = object; struct impl *impl = queue->impl; @@ -397,9 +401,6 @@ loop_queue_invoke(void *object, int res; int32_t filled; uint32_t avail, idx, offset, l0; - bool in_thread; - - in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); @@ -476,7 +477,10 @@ loop_queue_invoke(void *object, } return res; xrun: - put_queue(impl, queue); + /* we overflowed the queue, we can't push back the queue here + * because then we will just pop it off again, so set a flag. We will + * push the queue back for reuse after we flush. */ + SPA_ATOMIC_STORE(queue->overflowed, true); return -ENOSPC; } @@ -493,6 +497,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, struct queue *queue; int res = 0, suppressed; uint64_t nsec; + bool in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); while (true) { queue = get_queue(impl); @@ -507,17 +512,18 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, * and retry to get a queue */ if (impl->retry_timeout == 0) return -EPIPE; + usleep(impl->retry_timeout); } else { - res = loop_queue_invoke(queue, func, seq, data, size, block, user_data); + res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread); if (SPA_LIKELY(res != -ENOSPC)) break; } /* the queue was full or no more queues, retry and use another queue */ nsec = get_time_ns(impl->system); - if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) { spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)", - queue, suppressed); + impl, suppressed); } } return res; @@ -1220,6 +1226,8 @@ impl_init(const struct spa_handle_factory *factory, SPA_VERSION_LOOP_UTILS, &impl_loop_utils, impl); + impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; + impl->rate_limit.burst = 1; impl->retry_timeout = DEFAULT_RETRY; if (info) { if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL &&