loop: add overflow queues again

Add the overflow queues again. We can easily iterate atomically over the
overflow queues and flush them.

Overflowing a queue is quite common when heavy swapping is done and
should never cause a lockup, so allocate new queues as we need them. We
can share the eventfd with the main queue to avoid wastings fds.

The limit on the number of queues is then only for when concurrent
threads want to invoke things, so 128 is plenty enough.
This commit is contained in:
Wim Taymans 2024-11-14 17:31:23 +01:00
parent 687075f2bd
commit 8c59fae42d

View file

@ -110,12 +110,13 @@ struct queue {
uint16_t next; uint16_t next;
int ack_fd; int ack_fd;
bool close_fd;
struct queue *overflow;
struct spa_ringbuffer buffer; struct spa_ringbuffer buffer;
uint8_t *buffer_data; uint8_t *buffer_data;
uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN];
bool overflowed;
}; };
struct source_impl { struct source_impl {
@ -201,15 +202,18 @@ static void loop_queue_destroy(void *data)
struct queue *queue = data; struct queue *queue = data;
struct impl *impl = queue->impl; struct impl *impl = queue->impl;
if (queue->ack_fd != -1) if (queue->close_fd)
spa_system_close(impl->system, queue->ack_fd); spa_system_close(impl->system, queue->ack_fd);
if (queue->overflow)
loop_queue_destroy(queue->overflow);
spa_log_info(impl->log, "%p destroyed queue %p idx:%d", impl, queue, queue->idx); spa_log_info(impl->log, "%p destroyed queue %p idx:%d", impl, queue, queue->idx);
free(queue); free(queue);
} }
static struct queue *loop_create_queue(void *object) static struct queue *loop_create_queue(void *object, bool with_fd)
{ {
struct impl *impl = object; struct impl *impl = object;
struct queue *queue; struct queue *queue;
@ -222,11 +226,11 @@ static struct queue *loop_create_queue(void *object)
queue->idx = IDX_INVALID; queue->idx = IDX_INVALID;
queue->next = IDX_INVALID; queue->next = IDX_INVALID;
queue->impl = impl; queue->impl = impl;
queue->ack_fd = -1;
queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t);
spa_ringbuffer_init(&queue->buffer); spa_ringbuffer_init(&queue->buffer);
if (with_fd) {
if ((res = spa_system_eventfd_create(impl->system, if ((res = spa_system_eventfd_create(impl->system,
SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) {
spa_log_error(impl->log, "%p: can't create ack event: %s", spa_log_error(impl->log, "%p: can't create ack event: %s",
@ -234,6 +238,7 @@ static struct queue *loop_create_queue(void *object)
goto error; goto error;
} }
queue->ack_fd = res; queue->ack_fd = res;
queue->close_fd = true;
while (true) { while (true) {
uint16_t idx = SPA_ATOMIC_LOAD(impl->n_queues); uint16_t idx = SPA_ATOMIC_LOAD(impl->n_queues);
@ -250,7 +255,7 @@ static struct queue *loop_create_queue(void *object)
break; break;
} }
} }
}
spa_log_info(impl->log, "%p created queue %p idx:%d %p", impl, queue, queue->idx, spa_log_info(impl->log, "%p created queue %p idx:%d %p", impl, queue, queue->idx,
(void*)pthread_self()); (void*)pthread_self());
@ -326,20 +331,16 @@ static void flush_all_queues(struct impl *impl)
uint32_t cindex, index; uint32_t cindex, index;
spa_invoke_func_t func; spa_invoke_func_t func;
bool block; bool block;
uint32_t i, n_queues, old_queues; uint32_t i, n_queues;
n_queues = SPA_ATOMIC_LOAD(impl->n_queues); n_queues = SPA_ATOMIC_LOAD(impl->n_queues);
do {
old_queues = n_queues;
for (i = 0; i < n_queues; i++) { for (i = 0; i < n_queues; i++) {
cqueue = impl->queues[i]; /* loop over all queues and overflow queues */
for (cqueue = impl->queues[i]; cqueue != NULL;
cqueue = SPA_ATOMIC_LOAD(cqueue->overflow)) {
if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) <
(int32_t)sizeof(struct invoke_item)) { (int32_t)sizeof(struct invoke_item))
if (SPA_ATOMIC_LOAD(cqueue->overflowed) &&
SPA_ATOMIC_CAS(cqueue->overflowed, true, false))
put_queue(impl, cqueue);
continue; continue;
}
citem = SPA_PTROFF(cqueue->buffer_data, citem = SPA_PTROFF(cqueue->buffer_data,
cindex & (DATAS_SIZE - 1), struct invoke_item); cindex & (DATAS_SIZE - 1), struct invoke_item);
@ -350,10 +351,7 @@ static void flush_all_queues(struct impl *impl)
index = cindex; index = cindex;
} }
} }
n_queues = SPA_ATOMIC_LOAD(impl->n_queues);
} }
while (n_queues != old_queues);
if (item == NULL) if (item == NULL)
break; break;
@ -395,13 +393,14 @@ loop_queue_invoke(void *object,
void *user_data, void *user_data,
bool in_thread) bool in_thread)
{ {
struct queue *queue = object; struct queue *queue = object, *orig = queue, *overflow;
struct impl *impl = queue->impl; struct impl *impl = queue->impl;
struct invoke_item *item; struct invoke_item *item;
int res; int res;
int32_t filled; int32_t filled;
uint32_t avail, idx, offset, l0; uint32_t avail, idx, offset, l0;
again:
filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx);
spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun");
avail = (uint32_t)(DATAS_SIZE - filled); avail = (uint32_t)(DATAS_SIZE - filled);
@ -448,7 +447,7 @@ loop_queue_invoke(void *object,
spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size); spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size);
if (in_thread) { if (in_thread) {
put_queue(impl, queue); put_queue(impl, orig);
flush_all_queues(impl); flush_all_queues(impl);
res = item->res; res = item->res;
} else { } else {
@ -473,15 +472,23 @@ loop_queue_invoke(void *object,
else else
res = 0; res = 0;
} }
put_queue(impl, queue); put_queue(impl, orig);
} }
return res; return res;
xrun: xrun:
/* we overflowed the queue, we can't push back the queue here /* we overflow, make a new queue that shares the same fd
* because then we will just pop it off again, so set a flag. We will * and place it in the overflow array. We hold the queue so there
* push the queue back for reuse after we flush. */ * is only ever one writer to the overflow field. */
SPA_ATOMIC_STORE(queue->overflowed, true); overflow = queue->overflow;
return -ENOSPC; if (overflow == NULL) {
overflow = loop_create_queue(impl, false);
if (overflow == NULL)
return -errno;
overflow->ack_fd = queue->ack_fd;
SPA_ATOMIC_STORE(queue->overflow, overflow);
}
queue = overflow;
goto again;
} }
static void wakeup_func(void *data, uint64_t count) static void wakeup_func(void *data, uint64_t count)
@ -502,7 +509,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
while (true) { while (true) {
queue = get_queue(impl); queue = get_queue(impl);
if (SPA_UNLIKELY(queue == NULL)) if (SPA_UNLIKELY(queue == NULL))
queue = loop_create_queue(impl); queue = loop_create_queue(impl, true);
if (SPA_UNLIKELY(queue == NULL)) { if (SPA_UNLIKELY(queue == NULL)) {
if (SPA_UNLIKELY(errno != ENOSPC)) if (SPA_UNLIKELY(errno != ENOSPC))
return -errno; return -errno;
@ -513,18 +520,16 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
if (impl->retry_timeout == 0) if (impl->retry_timeout == 0)
return -EPIPE; return -EPIPE;
usleep(impl->retry_timeout);
} else {
res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread);
if (SPA_LIKELY(res != -ENOSPC))
break;
}
/* the queue was full or no more queues, retry and use another queue */
nsec = get_time_ns(impl->system); nsec = get_time_ns(impl->system);
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) { if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) {
spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)", spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)",
impl, suppressed); impl, suppressed);
} }
usleep(impl->retry_timeout);
} else {
res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread);
break;
}
} }
return res; return res;
} }