diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 3ddbb1cc4..a098b9349 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -110,12 +110,13 @@ struct queue { uint16_t next; int ack_fd; + bool close_fd; + + struct queue *overflow; struct spa_ringbuffer buffer; uint8_t *buffer_data; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; - - bool overflowed; }; struct source_impl { @@ -201,15 +202,18 @@ static void loop_queue_destroy(void *data) struct queue *queue = data; struct impl *impl = queue->impl; - if (queue->ack_fd != -1) + if (queue->close_fd) spa_system_close(impl->system, queue->ack_fd); + if (queue->overflow) + loop_queue_destroy(queue->overflow); + spa_log_info(impl->log, "%p destroyed queue %p idx:%d", impl, queue, queue->idx); free(queue); } -static struct queue *loop_create_queue(void *object) +static struct queue *loop_create_queue(void *object, bool with_fd) { struct impl *impl = object; struct queue *queue; @@ -222,35 +226,36 @@ static struct queue *loop_create_queue(void *object) queue->idx = IDX_INVALID; queue->next = IDX_INVALID; queue->impl = impl; - queue->ack_fd = -1; queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); spa_ringbuffer_init(&queue->buffer); - if ((res = spa_system_eventfd_create(impl->system, - SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { - spa_log_error(impl->log, "%p: can't create ack event: %s", - impl, spa_strerror(res)); - goto error; - } - queue->ack_fd = res; - - while (true) { - uint16_t idx = SPA_ATOMIC_LOAD(impl->n_queues); - if (idx >= QUEUES_MAX) { - /* this is pretty bad, there are QUEUES_MAX concurrent threads - * that are doing an invoke */ - spa_log_error(impl->log, "max queues %d exceeded!", idx); - res = -ENOSPC; + if (with_fd) { + if ((res = spa_system_eventfd_create(impl->system, + SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { + spa_log_error(impl->log, "%p: can't create ack event: %s", + impl, spa_strerror(res)); goto error; } - queue->idx = idx; - if (SPA_ATOMIC_CAS(impl->queues[queue->idx], NULL, queue)) { - SPA_ATOMIC_INC(impl->n_queues); - break; + queue->ack_fd = res; + queue->close_fd = true; + + while (true) { + uint16_t idx = SPA_ATOMIC_LOAD(impl->n_queues); + if (idx >= QUEUES_MAX) { + /* this is pretty bad, there are QUEUES_MAX concurrent threads + * that are doing an invoke */ + spa_log_error(impl->log, "max queues %d exceeded!", idx); + res = -ENOSPC; + goto error; + } + queue->idx = idx; + if (SPA_ATOMIC_CAS(impl->queues[queue->idx], NULL, queue)) { + SPA_ATOMIC_INC(impl->n_queues); + break; + } } } - spa_log_info(impl->log, "%p created queue %p idx:%d %p", impl, queue, queue->idx, (void*)pthread_self()); @@ -326,20 +331,16 @@ static void flush_all_queues(struct impl *impl) uint32_t cindex, index; spa_invoke_func_t func; bool block; - uint32_t i, n_queues, old_queues; + uint32_t i, n_queues; n_queues = SPA_ATOMIC_LOAD(impl->n_queues); - do { - old_queues = n_queues; - for (i = 0; i < n_queues; i++) { - cqueue = impl->queues[i]; + for (i = 0; i < n_queues; i++) { + /* loop over all queues and overflow queues */ + for (cqueue = impl->queues[i]; cqueue != NULL; + cqueue = SPA_ATOMIC_LOAD(cqueue->overflow)) { if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < - (int32_t)sizeof(struct invoke_item)) { - if (SPA_ATOMIC_LOAD(cqueue->overflowed) && - SPA_ATOMIC_CAS(cqueue->overflowed, true, false)) - put_queue(impl, cqueue); + (int32_t)sizeof(struct invoke_item)) continue; - } citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item); @@ -350,10 +351,7 @@ static void flush_all_queues(struct impl *impl) index = cindex; } } - n_queues = SPA_ATOMIC_LOAD(impl->n_queues); } - while (n_queues != old_queues); - if (item == NULL) break; @@ -395,13 +393,14 @@ loop_queue_invoke(void *object, void *user_data, bool in_thread) { - struct queue *queue = object; + struct queue *queue = object, *orig = queue, *overflow; struct impl *impl = queue->impl; struct invoke_item *item; int res; int32_t filled; uint32_t avail, idx, offset, l0; +again: filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); avail = (uint32_t)(DATAS_SIZE - filled); @@ -448,7 +447,7 @@ loop_queue_invoke(void *object, spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size); if (in_thread) { - put_queue(impl, queue); + put_queue(impl, orig); flush_all_queues(impl); res = item->res; } else { @@ -473,15 +472,23 @@ loop_queue_invoke(void *object, else res = 0; } - put_queue(impl, queue); + put_queue(impl, orig); } return res; xrun: - /* we overflowed the queue, we can't push back the queue here - * because then we will just pop it off again, so set a flag. We will - * push the queue back for reuse after we flush. */ - SPA_ATOMIC_STORE(queue->overflowed, true); - return -ENOSPC; + /* we overflow, make a new queue that shares the same fd + * and place it in the overflow array. We hold the queue so there + * is only ever one writer to the overflow field. */ + overflow = queue->overflow; + if (overflow == NULL) { + overflow = loop_create_queue(impl, false); + if (overflow == NULL) + return -errno; + overflow->ack_fd = queue->ack_fd; + SPA_ATOMIC_STORE(queue->overflow, overflow); + } + queue = overflow; + goto again; } static void wakeup_func(void *data, uint64_t count) @@ -502,7 +509,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, while (true) { queue = get_queue(impl); if (SPA_UNLIKELY(queue == NULL)) - queue = loop_create_queue(impl); + queue = loop_create_queue(impl, true); if (SPA_UNLIKELY(queue == NULL)) { if (SPA_UNLIKELY(errno != ENOSPC)) return -errno; @@ -513,17 +520,15 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, if (impl->retry_timeout == 0) return -EPIPE; + nsec = get_time_ns(impl->system); + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) { + spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)", + impl, suppressed); + } usleep(impl->retry_timeout); } else { res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread); - if (SPA_LIKELY(res != -ENOSPC)) - break; - } - /* the queue was full or no more queues, retry and use another queue */ - nsec = get_time_ns(impl->system); - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) { - spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)", - impl, suppressed); + break; } } return res;