From 8c1a69f1b57a378454b908fd34a23c672077dfd1 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 6 Aug 2024 12:05:11 +0200 Subject: [PATCH] loop: don't usleep when queue is full When the queue is full, before this patch we used to go into usleep in the hope that the other thread will run and empty the queue and that we can retry after the usleep. This however does not always work because the other thread might be waiting for the thread that does the invoke call and we lock forever. Therefore we should always try to make progress in some way. Instead of waiting, allocate an (or use the previously allocated) overflow queue and write to that one. We can chain multiple overflow queues together as many as we need (but we might want to bound that as well). The loop.retry-timeout property is now deprecated. See #4114 --- spa/plugins/support/loop.c | 126 ++++++++++++++++++++----------------- src/pipewire/keys.h | 5 +- 2 files changed, 70 insertions(+), 61 deletions(-) diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index ca2ecba5e..457895a3d 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -35,7 +35,6 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.loop"); #define ITEM_ALIGN 8 #define DATAS_SIZE (4096*8) #define MAX_EP 32 -#define DEFAULT_RETRY (1 * SPA_USEC_PER_SEC) /** \cond */ @@ -66,7 +65,6 @@ struct impl { struct spa_list destroy_list; struct spa_list queue_list; struct spa_hook_list hooks_list; - int retry_timeout; int poll_fd; pthread_t thread; @@ -86,6 +84,11 @@ struct queue { struct impl *impl; struct spa_list link; +#define QUEUE_FLAG_NONE (0) +#define QUEUE_FLAG_ACK_FD (1<<0) + uint32_t flags; + struct queue *overflow; + int ack_fd; struct spa_ratelimit rate_limit; @@ -172,6 +175,52 @@ static int loop_remove_source(void *object, struct spa_source *source) return res; } +static struct queue *loop_create_queue(void *object, uint32_t flags) +{ + struct impl *impl = object; + struct queue *queue; + int res; + + queue = calloc(1, sizeof(struct queue)); + if (queue == NULL) + return NULL; + + queue->impl = impl; + queue->flags = flags; + + queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; + queue->rate_limit.burst = 1; + + queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); + spa_ringbuffer_init(&queue->buffer); + + if (flags & QUEUE_FLAG_ACK_FD) { + if ((res = spa_system_eventfd_create(impl->system, + SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { + spa_log_error(impl->log, "%p: can't create ack event: %s", + impl, spa_strerror(res)); + goto error; + } + queue->ack_fd = res; + } else { + queue->ack_fd = -1; + } + + pthread_mutex_lock(&impl->queue_lock); + spa_list_append(&impl->queue_list, &queue->link); + pthread_mutex_unlock(&impl->queue_lock); + + spa_log_info(impl->log, "%p created queue %p", impl, queue); + + return queue; + +error: + free(queue); + errno = -res; + return NULL; +} + + static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) { return (int32_t)(a->count - b->count); @@ -185,7 +234,7 @@ static void flush_all_queues(struct impl *impl) pthread_mutex_lock(&impl->queue_lock); flush_count = ++impl->flush_count; while (true) { - struct queue *cqueue, *queue; + struct queue *cqueue, *queue = NULL; struct invoke_item *citem, *item = NULL; uint32_t cindex, index; spa_invoke_func_t func; @@ -228,7 +277,7 @@ static void flush_all_queues(struct impl *impl) block = item->block; spa_ringbuffer_read_update(&queue->buffer, index); - if (block) { + if (block && queue->ack_fd != -1) { if ((res = spa_system_eventfd_write(impl->system, queue->ack_fd, 1)) < 0) spa_log_warn(impl->log, "%p: failed to write event fd:%d: %s", queue, queue->ack_fd, spa_strerror(res)); @@ -313,7 +362,7 @@ retry: } else { loop_signal_event(impl, impl->wakeup); - if (block) { + if (block && queue->ack_fd != -1) { uint64_t count = 1; if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0) @@ -332,15 +381,18 @@ retry: return res; xrun: - nsec = get_time_ns(impl->system); - if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { - spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", - queue, avail, need, suppressed); + if (queue->overflow == NULL) { + nsec = get_time_ns(impl->system); + if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { + spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", + queue, avail, need, suppressed); + } + queue->overflow = loop_create_queue(impl, QUEUE_FLAG_NONE); + if (queue->overflow == NULL) + return -errno; + queue->overflow->ack_fd = queue->ack_fd; } - loop_signal_event(impl, impl->wakeup); - if (impl->retry_timeout == 0) - return -EPIPE; - usleep(impl->retry_timeout); + queue = queue->overflow; goto retry; } @@ -359,50 +411,11 @@ static void loop_queue_destroy(void *data) spa_list_remove(&queue->link); pthread_mutex_unlock(&impl->queue_lock); - spa_system_close(impl->system, queue->ack_fd); + if (queue->flags & QUEUE_FLAG_ACK_FD) + spa_system_close(impl->system, queue->ack_fd); free(queue); } -static struct queue *loop_create_queue(void *object, uint32_t flags) -{ - struct impl *impl = object; - struct queue *queue; - int res; - - queue = calloc(1, sizeof(struct queue)); - if (queue == NULL) - return NULL; - - queue->impl = impl; - - queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; - queue->rate_limit.burst = 1; - - queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); - spa_ringbuffer_init(&queue->buffer); - - if ((res = spa_system_eventfd_create(impl->system, - SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { - spa_log_error(impl->log, "%p: can't create ack event: %s", - impl, spa_strerror(res)); - goto error; - } - queue->ack_fd = res; - - pthread_mutex_lock(&impl->queue_lock); - spa_list_append(&impl->queue_list, &queue->link); - pthread_mutex_unlock(&impl->queue_lock); - - spa_log_info(impl->log, "%p created queue %p", impl, queue); - - return queue; - -error: - free(queue); - errno = -res; - return NULL; -} - static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, const void *data, size_t size, bool block, void *user_data) { @@ -410,7 +423,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, struct queue *local_queue = tss_get(impl->queue_tss_id); if (local_queue == NULL) { - local_queue = loop_create_queue(impl, 0); + local_queue = loop_create_queue(impl, QUEUE_FLAG_ACK_FD); if (local_queue == NULL) return -errno; tss_set(impl->queue_tss_id, local_queue); @@ -1116,13 +1129,10 @@ impl_init(const struct spa_handle_factory *factory, SPA_VERSION_LOOP_UTILS, &impl_loop_utils, impl); - impl->retry_timeout = DEFAULT_RETRY; if (info) { if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL && spa_atob(str)) impl->control.iface.cb.funcs = &impl_loop_control_cancel; - if ((str = spa_dict_lookup(info, "loop.retry-timeout")) != NULL) - impl->retry_timeout = atoi(str); } CHECK(pthread_mutexattr_init(&attr), error_exit); diff --git a/src/pipewire/keys.h b/src/pipewire/keys.h index 9ee5de433..2a2ed6035 100644 --- a/src/pipewire/keys.h +++ b/src/pipewire/keys.h @@ -74,9 +74,6 @@ extern "C" { #define PW_KEY_LOOP_CLASS "loop.class" /**< the classes this loop handles, array of strings */ #define PW_KEY_LOOP_RT_PRIO "loop.rt-prio" /**< realtime priority of the loop */ #define PW_KEY_LOOP_CANCEL "loop.cancel" /**< if the loop can be canceled */ -#define PW_KEY_LOOP_RETRY_TIMEOUT "loop.retry-timeout" /**< when the loop invoke queue is full, the timeout - * in microseconds before retrying. - * default = 1 second, 0 = disable */ /* context */ #define PW_KEY_CONTEXT_PROFILE_MODULES "context.profile.modules" /**< a context profile for modules, deprecated */ @@ -368,9 +365,11 @@ extern "C" { # ifdef PW_ENABLE_DEPRECATED # define PW_KEY_PRIORITY_MASTER "priority.master" /**< deprecated, use priority.driver */ # define PW_KEY_NODE_TARGET "node.target" /**< deprecated since 0.3.64, use target.object. */ +# define PW_KEY_LOOP_RETRY_TIMEOUT "loop.retry-timeout" /**< deprecated since 1.3.0 */ # else # define PW_KEY_PRIORITY_MASTER PW_DEPRECATED("priority.master") # define PW_KEY_NODE_TARGET PW_DEPRECATED("node.target") +# define PW_KEY_LOOP_RETRY_TIMEOUT PW_DEPRECATED("loop.retry-timeout") # endif /* PW_ENABLE_DEPRECATED */ #endif /* PW_REMOVE_DEPRECATED */