diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index d3a62db14..deed3b177 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,12 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.loop"); #define DATAS_SIZE (4096*8) #define MAX_EP 32 +/* the number of concurrent queues for invoke. This is also the number + * of threads that can concurrently invoke. When there are more, the + * retry timeout will be used to retry. */ +#define QUEUES_MAX 128 +#define DEFAULT_RETRY (1 * SPA_USEC_PER_SEC) + /** \cond */ struct invoke_item { @@ -52,6 +59,17 @@ struct invoke_item { static int loop_signal_event(void *object, struct spa_source *source); +struct queue; + +#define IDX_INVALID ((uint16_t)0xffff) +union tag { + struct { + uint16_t idx; + uint16_t count; + } t; + uint32_t v; +}; + struct impl { struct spa_handle handle; struct spa_loop loop; @@ -63,18 +81,21 @@ struct impl { struct spa_list source_list; struct spa_list destroy_list; - struct spa_list queue_list; struct spa_hook_list hooks_list; + int retry_timeout; + + union tag head; + + uint32_t n_queues; + struct queue *queues[QUEUES_MAX]; + int poll_fd; pthread_t thread; int enter_count; struct spa_source *wakeup; - int tss_ref; - tss_t queue_tss_id; - pthread_mutex_t queue_lock; uint32_t count; uint32_t flush_count; @@ -83,25 +104,19 @@ struct impl { struct queue { struct impl *impl; - struct spa_list link; - int ref; - -#define QUEUE_FLAG_NONE (0) -#define QUEUE_FLAG_ACK_FD (1<<0) -#define QUEUE_FLAG_IN_TSS (1<<1) - uint32_t flags; - struct queue *overflow; + uint16_t idx; + uint16_t next; int ack_fd; struct spa_ratelimit rate_limit; - bool destroyed; struct spa_ringbuffer buffer; uint8_t *buffer_data; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; }; + struct source_impl { struct spa_source source; @@ -180,7 +195,20 @@ static int loop_remove_source(void *object, struct spa_source *source) return res; } -static struct queue *loop_create_queue(void *object, uint32_t flags) +static void loop_queue_destroy(void *data) +{ + struct queue *queue = data; + struct impl *impl = queue->impl; + + if (queue->ack_fd != -1) + spa_system_close(impl->system, queue->ack_fd); + + spa_log_info(impl->log, "%p destroyed queue %p idx:%d", impl, queue, queue->idx); + + free(queue); +} + +static struct queue *loop_create_queue(void *object) { struct impl *impl = object; struct queue *queue; @@ -190,9 +218,10 @@ static struct queue *loop_create_queue(void *object, uint32_t flags) if (queue == NULL) return NULL; + queue->idx = IDX_INVALID; + queue->next = IDX_INVALID; queue->impl = impl; - queue->flags = flags; - queue->ref = 1; + queue->ack_fd = -1; queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; queue->rate_limit.burst = 1; @@ -200,52 +229,88 @@ static struct queue *loop_create_queue(void *object, uint32_t flags) queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); spa_ringbuffer_init(&queue->buffer); - if (flags & QUEUE_FLAG_ACK_FD) { - if ((res = spa_system_eventfd_create(impl->system, - SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { - spa_log_error(impl->log, "%p: can't create ack event: %s", - impl, spa_strerror(res)); + if ((res = spa_system_eventfd_create(impl->system, + SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { + spa_log_error(impl->log, "%p: can't create ack event: %s", + impl, spa_strerror(res)); + goto error; + } + queue->ack_fd = res; + + while (true) { + uint16_t idx = SPA_ATOMIC_LOAD(impl->n_queues); + if (idx >= QUEUES_MAX) { + /* this is pretty bad, there are QUEUES_MAX concurrent threads + * that are doing an invoke */ + spa_log_error(impl->log, "max queues %d exceeded!", idx); + res = -ENOSPC; goto error; } - queue->ack_fd = res; - } else { - queue->ack_fd = -1; + queue->idx = idx; + if (SPA_ATOMIC_CAS(impl->queues[queue->idx], NULL, queue)) { + SPA_ATOMIC_INC(impl->n_queues); + break; + } } - if (flags & QUEUE_FLAG_IN_TSS) - queue->ref++; - pthread_mutex_lock(&impl->queue_lock); - spa_list_append(&impl->queue_list, &queue->link); - pthread_mutex_unlock(&impl->queue_lock); - - spa_log_info(impl->log, "%p created queue %p", impl, queue); + spa_log_info(impl->log, "%p created queue %p idx:%d %p", impl, queue, queue->idx, + (void*)pthread_self()); return queue; error: - free(queue); + loop_queue_destroy(queue); errno = -res; return NULL; } -static inline void loop_queue_ref(struct queue *queue) -{ - SPA_ATOMIC_INC(queue->ref); -} -static bool loop_queue_unref(struct queue *queue) +static inline struct queue *get_queue(struct impl *impl) { - bool do_free; - struct impl *impl = queue->impl; + union tag head, next; - do_free = SPA_ATOMIC_DEC(queue->ref) == 0; - if (do_free) { - spa_log_debug(impl->log, "%p: free queue %p", impl, queue); - free(queue); + head.v = SPA_ATOMIC_LOAD(impl->head.v); + + while (true) { + struct queue *queue; + + if (SPA_UNLIKELY(head.t.idx == IDX_INVALID)) + return NULL; + + queue = impl->queues[head.t.idx]; + next.t.idx = queue->next; + next.t.count = head.t.count+1; + + if (SPA_LIKELY(__atomic_compare_exchange_n(&impl->head.v, &head.v, next.v, + 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))) { + spa_log_trace(impl->log, "%p idx:%d %p", queue, queue->idx, (void*)pthread_self()); + return queue; + } } - return do_free; + return NULL; } +static inline void put_queue(struct impl *impl, struct queue *queue) +{ + union tag head, next; + + spa_log_trace(impl->log, "%p idx:%d %p", queue, queue->idx, (void*)pthread_self()); + + head.v = SPA_ATOMIC_LOAD(impl->head.v); + + while (true) { + queue->next = head.t.idx; + + next.t.idx = queue->idx; + next.t.count = head.t.count+1; + + if (SPA_LIKELY(__atomic_compare_exchange_n(&impl->head.v, &head.v, next.v, + 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))) + break; + } +} + + static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) { return (int32_t)(a->count - b->count); @@ -256,27 +321,36 @@ static void flush_all_queues(struct impl *impl) uint32_t flush_count; int res; - pthread_mutex_lock(&impl->queue_lock); - flush_count = ++impl->flush_count; + flush_count = SPA_ATOMIC_INC(impl->flush_count); while (true) { struct queue *cqueue, *queue = NULL; struct invoke_item *citem, *item = NULL; uint32_t cindex, index; spa_invoke_func_t func; bool block; + uint32_t i, n_queues, old_queues; - spa_list_for_each(cqueue, &impl->queue_list, link) { - if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < - (int32_t)sizeof(struct invoke_item)) - continue; - citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item); + n_queues = SPA_ATOMIC_LOAD(impl->n_queues); + do { + old_queues = n_queues; + for (i = 0; i < n_queues; i++) { + cqueue = impl->queues[i]; + if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < + (int32_t)sizeof(struct invoke_item)) + continue; - if (item == NULL || item_compare(citem, item) < 0) { - item = citem; - queue = cqueue; - index = cindex; + citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item); + + if (item == NULL || item_compare(citem, item) < 0) { + item = citem; + queue = cqueue; + index = cindex; + } } + n_queues = SPA_ATOMIC_LOAD(impl->n_queues); } + while (n_queues != old_queues); + if (item == NULL) break; @@ -287,19 +361,13 @@ static void flush_all_queues(struct impl *impl) * might get overwritten. */ func = spa_steal_ptr(item->func); if (func) { - loop_queue_ref(queue); - pthread_mutex_unlock(&impl->queue_lock); - res = func(&impl->loop, true, item->seq, item->data, + item->res = func(&impl->loop, true, item->seq, item->data, item->size, item->user_data); - pthread_mutex_lock(&impl->queue_lock); - if (loop_queue_unref(queue)) - continue; - item->res = res; } /* if this function did a recursive invoke, it now flushed the * ringbuffer and we can exit */ - if (flush_count != impl->flush_count) + if (flush_count != SPA_ATOMIC_LOAD(impl->flush_count)) break; index += item->item_size; @@ -312,7 +380,6 @@ static void flush_all_queues(struct impl *impl) queue, queue->ack_fd, spa_strerror(res)); } } - pthread_mutex_unlock(&impl->queue_lock); } static int @@ -327,23 +394,18 @@ loop_queue_invoke(void *object, struct queue *queue = object; struct impl *impl = queue->impl; struct invoke_item *item; - int res, suppressed; + int res; int32_t filled; uint32_t avail, idx, offset, l0; - size_t need; - uint64_t nsec; bool in_thread; in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); -retry: filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); avail = (uint32_t)(DATAS_SIZE - filled); - if (avail < sizeof(struct invoke_item)) { - need = sizeof(struct invoke_item); + if (avail < sizeof(struct invoke_item)) goto xrun; - } offset = idx & (DATAS_SIZE - 1); /* l0 is remaining size in ringbuffer, this should always be larger than @@ -360,7 +422,7 @@ retry: item->res = 0; item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN); - spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", queue, item, filled); + spa_log_trace(impl->log, "%p: add item %p filled:%d block:%d", queue, item, filled, block); if (l0 >= item->item_size) { /* item + size fit in current ringbuffer idx */ @@ -376,16 +438,16 @@ retry: item->data = queue->buffer_data; item->item_size = SPA_ROUND_UP_N(l0 + size, ITEM_ALIGN); } - if (avail < item->item_size) { - need = item->item_size; + if (avail < item->item_size) goto xrun; - } + if (data && size > 0) memcpy(item->data, data, size); spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size); if (in_thread) { + put_queue(impl, queue); flush_all_queues(impl); res = item->res; } else { @@ -410,23 +472,12 @@ retry: else res = 0; } + put_queue(impl, queue); } return res; - xrun: - if (queue->overflow == NULL) { - nsec = get_time_ns(impl->system); - if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { - spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", - queue, avail, need, suppressed); - } - queue->overflow = loop_create_queue(impl, QUEUE_FLAG_NONE); - if (queue->overflow == NULL) - return -errno; - queue->overflow->ack_fd = queue->ack_fd; - } - queue = queue->overflow; - goto retry; + put_queue(impl, queue); + return -ENOSPC; } static void wakeup_func(void *data, uint64_t count) @@ -435,51 +486,41 @@ static void wakeup_func(void *data, uint64_t count) flush_all_queues(impl); } -static void loop_queue_destroy(void *data) -{ - struct queue *queue = data; - struct impl *impl = queue->impl; - - if (SPA_ATOMIC_CAS(queue->destroyed, false, true)) { - pthread_mutex_lock(&impl->queue_lock); - spa_list_remove(&queue->link); - pthread_mutex_unlock(&impl->queue_lock); - - if (queue->overflow) - loop_queue_destroy(queue->overflow); - - if (queue->flags & QUEUE_FLAG_ACK_FD) - spa_system_close(impl->system, queue->ack_fd); - - loop_queue_unref(queue); - } -} - -static void loop_queue_destroy_tss(void *data) -{ - struct queue *queue = data; - if (queue) { - SPA_ATOMIC_DEC(queue->impl->tss_ref); - loop_queue_destroy(queue); - loop_queue_unref(queue); - } -} - static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, const void *data, size_t size, bool block, void *user_data) { struct impl *impl = object; - struct queue *local_queue; + struct queue *queue; + int res = 0, suppressed; + uint64_t nsec; - local_queue = tss_get(impl->queue_tss_id); - if (local_queue == NULL) { - local_queue = loop_create_queue(impl, QUEUE_FLAG_ACK_FD | QUEUE_FLAG_IN_TSS); - if (local_queue == NULL) - return -errno; - SPA_ATOMIC_INC(impl->tss_ref); - tss_set(impl->queue_tss_id, local_queue); + while (true) { + queue = get_queue(impl); + if (SPA_UNLIKELY(queue == NULL)) + queue = loop_create_queue(impl); + if (SPA_UNLIKELY(queue == NULL)) { + if (SPA_UNLIKELY(errno != ENOSPC)) + return -errno; + + /* there was no space for a new queue. This means QUEUE_MAX + * threads are concurrently doing an invoke. We can wait a little + * and retry to get a queue */ + if (impl->retry_timeout == 0) + return -EPIPE; + usleep(impl->retry_timeout); + } else { + res = loop_queue_invoke(queue, func, seq, data, size, block, user_data); + if (SPA_LIKELY(res != -ENOSPC)) + break; + } + /* the queue was full or no more queues, retry and use another queue */ + nsec = get_time_ns(impl->system); + if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { + spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)", + queue, suppressed); + } } - return loop_queue_invoke(local_queue, func, seq, data, size, block, user_data); + return res; } static int loop_get_fd(void *object) @@ -1110,31 +1151,24 @@ static int impl_clear(struct spa_handle *handle) { struct impl *impl; struct source_impl *source; - struct queue *queue; + uint32_t i; spa_return_val_if_fail(handle != NULL, -EINVAL); impl = (struct impl *) handle; + spa_log_debug(impl->log, "%p: clear", impl); + if (impl->enter_count != 0 || impl->polling) spa_log_warn(impl->log, "%p: loop is entered %d times polling:%d", impl, impl->enter_count, impl->polling); spa_list_consume(source, &impl->source_list, link) loop_destroy_source(impl, &source->source); - spa_list_consume(queue, &impl->queue_list, link) - loop_queue_destroy(queue); - - /* free the tss from this thread if any */ - loop_queue_destroy_tss(tss_get(impl->queue_tss_id)); + for (i = 0; i < impl->n_queues; i++) + loop_queue_destroy(impl->queues[i]); spa_system_close(impl->system, impl->poll_fd); - pthread_mutex_destroy(&impl->queue_lock); - - if (SPA_ATOMIC_DEC(impl->tss_ref) != 0) - spa_log_warn(impl->log, "%p: loop still has %d queues in TSS", - impl, impl->tss_ref); - tss_delete(impl->queue_tss_id); return 0; } @@ -1164,7 +1198,6 @@ impl_init(const struct spa_handle_factory *factory, { struct impl *impl; const char *str; - pthread_mutexattr_t attr; int res; spa_return_val_if_fail(factory != NULL, -EINVAL); @@ -1187,16 +1220,15 @@ impl_init(const struct spa_handle_factory *factory, SPA_VERSION_LOOP_UTILS, &impl_loop_utils, impl); + impl->retry_timeout = DEFAULT_RETRY; if (info) { if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL && spa_atob(str)) impl->control.iface.cb.funcs = &impl_loop_control_cancel; + if ((str = spa_dict_lookup(info, "loop.retry-timeout")) != NULL) + impl->retry_timeout = atoi(str); } - CHECK(pthread_mutexattr_init(&attr), error_exit); - CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit); - CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit); - impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); spa_log_topic_init(impl->log, &log_topic); impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); @@ -1204,17 +1236,16 @@ impl_init(const struct spa_handle_factory *factory, if (impl->system == NULL) { spa_log_error(impl->log, "%p: a System is needed", impl); res = -EINVAL; - goto error_exit_free_mutex; + goto error_exit; } if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) { spa_log_error(impl->log, "%p: can't create pollfd: %s", impl, spa_strerror(res)); - goto error_exit_free_mutex; + goto error_exit; } impl->poll_fd = res; spa_list_init(&impl->source_list); - spa_list_init(&impl->queue_list); spa_list_init(&impl->destroy_list); spa_hook_list_init(&impl->hooks_list); @@ -1225,23 +1256,14 @@ impl_init(const struct spa_handle_factory *factory, goto error_exit_free_poll; } - if (tss_create(&impl->queue_tss_id, (tss_dtor_t)loop_queue_destroy_tss) != thrd_success) { - res = -errno; - spa_log_error(impl->log, "%p: can't create tss: %m", impl); - goto error_exit_free_wakeup; - } - impl->tss_ref = 1; + impl->head.t.idx = IDX_INVALID; spa_log_debug(impl->log, "%p: initialized", impl); return 0; -error_exit_free_wakeup: - loop_destroy_source(impl, impl->wakeup); error_exit_free_poll: spa_system_close(impl->system, impl->poll_fd); -error_exit_free_mutex: - pthread_mutex_destroy(&impl->queue_lock); error_exit: return res; }