diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index ca2ecba5e..457895a3d 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -35,7 +35,6 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.loop"); #define ITEM_ALIGN 8 #define DATAS_SIZE (4096*8) #define MAX_EP 32 -#define DEFAULT_RETRY (1 * SPA_USEC_PER_SEC) /** \cond */ @@ -66,7 +65,6 @@ struct impl { struct spa_list destroy_list; struct spa_list queue_list; struct spa_hook_list hooks_list; - int retry_timeout; int poll_fd; pthread_t thread; @@ -86,6 +84,11 @@ struct queue { struct impl *impl; struct spa_list link; +#define QUEUE_FLAG_NONE (0) +#define QUEUE_FLAG_ACK_FD (1<<0) + uint32_t flags; + struct queue *overflow; + int ack_fd; struct spa_ratelimit rate_limit; @@ -172,6 +175,52 @@ static int loop_remove_source(void *object, struct spa_source *source) return res; } +static struct queue *loop_create_queue(void *object, uint32_t flags) +{ + struct impl *impl = object; + struct queue *queue; + int res; + + queue = calloc(1, sizeof(struct queue)); + if (queue == NULL) + return NULL; + + queue->impl = impl; + queue->flags = flags; + + queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; + queue->rate_limit.burst = 1; + + queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); + spa_ringbuffer_init(&queue->buffer); + + if (flags & QUEUE_FLAG_ACK_FD) { + if ((res = spa_system_eventfd_create(impl->system, + SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { + spa_log_error(impl->log, "%p: can't create ack event: %s", + impl, spa_strerror(res)); + goto error; + } + queue->ack_fd = res; + } else { + queue->ack_fd = -1; + } + + pthread_mutex_lock(&impl->queue_lock); + spa_list_append(&impl->queue_list, &queue->link); + pthread_mutex_unlock(&impl->queue_lock); + + spa_log_info(impl->log, "%p created queue %p", impl, queue); + + return queue; + +error: + free(queue); + errno = -res; + return NULL; +} + + static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) { return (int32_t)(a->count - b->count); @@ -185,7 +234,7 @@ static void flush_all_queues(struct impl *impl) pthread_mutex_lock(&impl->queue_lock); flush_count = ++impl->flush_count; while (true) { - struct queue *cqueue, *queue; + struct queue *cqueue, *queue = NULL; struct invoke_item *citem, *item = NULL; uint32_t cindex, index; spa_invoke_func_t func; @@ -228,7 +277,7 @@ static void flush_all_queues(struct impl *impl) block = item->block; spa_ringbuffer_read_update(&queue->buffer, index); - if (block) { + if (block && queue->ack_fd != -1) { if ((res = spa_system_eventfd_write(impl->system, queue->ack_fd, 1)) < 0) spa_log_warn(impl->log, "%p: failed to write event fd:%d: %s", queue, queue->ack_fd, spa_strerror(res)); @@ -313,7 +362,7 @@ retry: } else { loop_signal_event(impl, impl->wakeup); - if (block) { + if (block && queue->ack_fd != -1) { uint64_t count = 1; if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0) @@ -332,15 +381,18 @@ retry: return res; xrun: - nsec = get_time_ns(impl->system); - if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { - spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", - queue, avail, need, suppressed); + if (queue->overflow == NULL) { + nsec = get_time_ns(impl->system); + if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { + spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", + queue, avail, need, suppressed); + } + queue->overflow = loop_create_queue(impl, QUEUE_FLAG_NONE); + if (queue->overflow == NULL) + return -errno; + queue->overflow->ack_fd = queue->ack_fd; } - loop_signal_event(impl, impl->wakeup); - if (impl->retry_timeout == 0) - return -EPIPE; - usleep(impl->retry_timeout); + queue = queue->overflow; goto retry; } @@ -359,50 +411,11 @@ static void loop_queue_destroy(void *data) spa_list_remove(&queue->link); pthread_mutex_unlock(&impl->queue_lock); - spa_system_close(impl->system, queue->ack_fd); + if (queue->flags & QUEUE_FLAG_ACK_FD) + spa_system_close(impl->system, queue->ack_fd); free(queue); } -static struct queue *loop_create_queue(void *object, uint32_t flags) -{ - struct impl *impl = object; - struct queue *queue; - int res; - - queue = calloc(1, sizeof(struct queue)); - if (queue == NULL) - return NULL; - - queue->impl = impl; - - queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; - queue->rate_limit.burst = 1; - - queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); - spa_ringbuffer_init(&queue->buffer); - - if ((res = spa_system_eventfd_create(impl->system, - SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { - spa_log_error(impl->log, "%p: can't create ack event: %s", - impl, spa_strerror(res)); - goto error; - } - queue->ack_fd = res; - - pthread_mutex_lock(&impl->queue_lock); - spa_list_append(&impl->queue_list, &queue->link); - pthread_mutex_unlock(&impl->queue_lock); - - spa_log_info(impl->log, "%p created queue %p", impl, queue); - - return queue; - -error: - free(queue); - errno = -res; - return NULL; -} - static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, const void *data, size_t size, bool block, void *user_data) { @@ -410,7 +423,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, struct queue *local_queue = tss_get(impl->queue_tss_id); if (local_queue == NULL) { - local_queue = loop_create_queue(impl, 0); + local_queue = loop_create_queue(impl, QUEUE_FLAG_ACK_FD); if (local_queue == NULL) return -errno; tss_set(impl->queue_tss_id, local_queue); @@ -1116,13 +1129,10 @@ impl_init(const struct spa_handle_factory *factory, SPA_VERSION_LOOP_UTILS, &impl_loop_utils, impl); - impl->retry_timeout = DEFAULT_RETRY; if (info) { if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL && spa_atob(str)) impl->control.iface.cb.funcs = &impl_loop_control_cancel; - if ((str = spa_dict_lookup(info, "loop.retry-timeout")) != NULL) - impl->retry_timeout = atoi(str); } CHECK(pthread_mutexattr_init(&attr), error_exit); diff --git a/src/pipewire/keys.h b/src/pipewire/keys.h index 9ee5de433..2a2ed6035 100644 --- a/src/pipewire/keys.h +++ b/src/pipewire/keys.h @@ -74,9 +74,6 @@ extern "C" { #define PW_KEY_LOOP_CLASS "loop.class" /**< the classes this loop handles, array of strings */ #define PW_KEY_LOOP_RT_PRIO "loop.rt-prio" /**< realtime priority of the loop */ #define PW_KEY_LOOP_CANCEL "loop.cancel" /**< if the loop can be canceled */ -#define PW_KEY_LOOP_RETRY_TIMEOUT "loop.retry-timeout" /**< when the loop invoke queue is full, the timeout - * in microseconds before retrying. - * default = 1 second, 0 = disable */ /* context */ #define PW_KEY_CONTEXT_PROFILE_MODULES "context.profile.modules" /**< a context profile for modules, deprecated */ @@ -368,9 +365,11 @@ extern "C" { # ifdef PW_ENABLE_DEPRECATED # define PW_KEY_PRIORITY_MASTER "priority.master" /**< deprecated, use priority.driver */ # define PW_KEY_NODE_TARGET "node.target" /**< deprecated since 0.3.64, use target.object. */ +# define PW_KEY_LOOP_RETRY_TIMEOUT "loop.retry-timeout" /**< deprecated since 1.3.0 */ # else # define PW_KEY_PRIORITY_MASTER PW_DEPRECATED("priority.master") # define PW_KEY_NODE_TARGET PW_DEPRECATED("node.target") +# define PW_KEY_LOOP_RETRY_TIMEOUT PW_DEPRECATED("loop.retry-timeout") # endif /* PW_ENABLE_DEPRECATED */ #endif /* PW_REMOVE_DEPRECATED */