loop: handle queue overflow better

When a queue overflows, we place the queue back on the stack and try
again. Because it is now at the top of the stack, we pop exactly the
same (still full) queue and keep looping forever if the other thread
is blocked for some reason.

Instead, mark the queue as overflowed and only place it back on the
stack once we have flushed it.

This avoids a deadlock where the main thread invokes on the data loop
and blocks, while the data loop invokes on the main thread and
overflows the queue.
Author: Wim Taymans
Date:   2024-11-14 15:58:09 +01:00
Commit: 6cf320e387
Parent: bb2d848bf6
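
For illustration, the overflow handoff described above can be sketched
with C11 atomics. The names here (on_overflow, requeue_if_overflowed)
and the <stdatomic.h> calls are simplified stand-ins for the patch's
SPA_ATOMIC_* macros, not the actual loop.c API:

#include <stdatomic.h>
#include <stdbool.h>
#include <errno.h>

struct impl;	/* owns the stack of free queues */

struct queue {
	atomic_bool overflowed;
	/* ... ring buffer, ack_fd, ... */
};

/* provided elsewhere: pushes a queue onto the free stack */
extern void put_queue(struct impl *impl, struct queue *queue);

/* writer side (invoke): the queue just overflowed. Don't push it back
 * on the free stack; the next retry would pop the same, still-full
 * queue and spin. Mark it and let the caller retry with another queue. */
static int on_overflow(struct queue *queue)
{
	atomic_store(&queue->overflowed, true);
	return -ENOSPC;
}

/* reader side (flush): the queue has been drained. Clear the flag with
 * a compare-and-swap so the queue goes back on the free stack exactly
 * once, even if two flushers race on the same queue. */
static void requeue_if_overflowed(struct impl *impl, struct queue *queue)
{
	bool expected = true;
	if (atomic_load(&queue->overflowed) &&
	    atomic_compare_exchange_strong(&queue->overflowed, &expected, false))
		put_queue(impl, queue);
}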

@@ -83,6 +83,7 @@ struct impl {
 	struct spa_list destroy_list;
 	struct spa_hook_list hooks_list;
 
+	struct spa_ratelimit rate_limit;
 	int retry_timeout;
 
 	union tag head;
@@ -109,13 +110,13 @@ struct queue {
 	uint16_t next;
 
 	int ack_fd;
 
-	struct spa_ratelimit rate_limit;
-
 	struct spa_ringbuffer buffer;
 	uint8_t *buffer_data;
 	uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN];
+
+	bool overflowed;
 };
 
 struct source_impl {
 	struct spa_source source;
@@ -223,9 +224,6 @@ static struct queue *loop_create_queue(void *object)
 	queue->impl = impl;
 	queue->ack_fd = -1;
 
-	queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
-	queue->rate_limit.burst = 1;
-
 	queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t);
 	spa_ringbuffer_init(&queue->buffer);
 
@@ -336,10 +334,15 @@ static void flush_all_queues(struct impl *impl)
 		for (i = 0; i < n_queues; i++) {
 			cqueue = impl->queues[i];
 			if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) <
-			    (int32_t)sizeof(struct invoke_item))
+			    (int32_t)sizeof(struct invoke_item)) {
+				if (SPA_ATOMIC_LOAD(cqueue->overflowed) &&
+				    SPA_ATOMIC_CAS(cqueue->overflowed, true, false))
+					put_queue(impl, cqueue);
 				continue;
+			}
 
-			citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item);
+			citem = SPA_PTROFF(cqueue->buffer_data,
+					cindex & (DATAS_SIZE - 1), struct invoke_item);
 
 			if (item == NULL || item_compare(citem, item) < 0) {
 				item = citem;
@@ -389,7 +392,8 @@ loop_queue_invoke(void *object,
 		const void *data,
 		size_t size,
 		bool block,
-		void *user_data)
+		void *user_data,
+		bool in_thread)
 {
 	struct queue *queue = object;
 	struct impl *impl = queue->impl;
@@ -397,9 +401,6 @@ loop_queue_invoke(void *object,
 	int res;
 	int32_t filled;
 	uint32_t avail, idx, offset, l0;
-	bool in_thread;
-
-	in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()));
 
 	filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx);
 	spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun");
@@ -476,7 +477,10 @@
 	}
 	return res;
 
 xrun:
-	put_queue(impl, queue);
+	/* we overflowed the queue, we can't push back the queue here
+	 * because then we will just pop it off again, so set a flag. We will
+	 * push the queue back for reuse after we flush. */
+	SPA_ATOMIC_STORE(queue->overflowed, true);
 	return -ENOSPC;
 }
@@ -493,6 +497,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
 	struct queue *queue;
 	int res = 0, suppressed;
 	uint64_t nsec;
+	bool in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()));
 
 	while (true) {
 		queue = get_queue(impl);
@@ -507,17 +512,18 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
 			 * and retry to get a queue */
 			if (impl->retry_timeout == 0)
 				return -EPIPE;
+
 			usleep(impl->retry_timeout);
 		} else {
-			res = loop_queue_invoke(queue, func, seq, data, size, block, user_data);
+			res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread);
 			if (SPA_LIKELY(res != -ENOSPC))
 				break;
 		}
 		/* the queue was full or no more queues, retry and use another queue */
 		nsec = get_time_ns(impl->system);
-		if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) {
+		if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) {
 			spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)",
-					queue, suppressed);
+					impl, suppressed);
 		}
 	}
 	return res;
@@ -1220,6 +1226,8 @@ impl_init(const struct spa_handle_factory *factory,
 			SPA_VERSION_LOOP_UTILS,
 			&impl_loop_utils, impl);
 
+	impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
+	impl->rate_limit.burst = 1;
 	impl->retry_timeout = DEFAULT_RETRY;
 	if (info) {
 		if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL &&
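
A side note on the rate-limit move: the warning is now limited per impl
rather than per queue, which also avoids touching queue->rate_limit on
the retry path where get_queue() returned NULL. The SPA limiter is an
interval/burst counter; the sketch below is an assumption-based
illustration of the calling convention the patch relies on (test()
returns the number of previously suppressed events when logging is
allowed, -1 when the event should be suppressed), not the actual
spa_ratelimit header:

#include <stdint.h>

struct ratelimit {
	uint64_t interval;	/* window length, nanoseconds */
	uint64_t begin;		/* start of the current window */
	unsigned burst;		/* events allowed per window */
	unsigned printed;	/* events emitted in this window */
	unsigned missed;	/* events suppressed in this window */
};

static int ratelimit_test(struct ratelimit *r, uint64_t now)
{
	unsigned suppressed = 0;
	if (r->begin + r->interval < now) {
		/* window expired: start a new one and report the drops */
		suppressed = r->missed;
		r->begin = now;
		r->printed = 0;
		r->missed = 0;
	}
	if (r->printed < r->burst) {
		r->printed++;
		return (int)suppressed;
	}
	r->missed++;
	return -1;
}

With interval = 2 * SPA_NSEC_PER_SEC and burst = 1, as set in
impl_init() above, at most one "out of queues" warning is logged every
two seconds.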