diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 6fb3a19c9..adb57b129 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -32,6 +32,7 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.loop"); #define ITEM_ALIGN 8 #define DATAS_SIZE (4096*8) #define MAX_EP 32 +#define DEFAULT_RETRY (1 * SPA_USEC_PER_SEC) /** \cond */ @@ -60,6 +61,7 @@ struct impl { struct spa_list source_list; struct spa_list destroy_list; struct spa_hook_list hooks_list; + int retry_timeout; int poll_fd; pthread_t thread; @@ -229,6 +231,7 @@ loop_invoke(void *object, int res; int32_t filled; uint32_t avail, idx, offset, l0; + size_t need; /* the ringbuffer can only be written to from one thread, if we are * in the same thread as the loop, don't write into the ringbuffer @@ -236,15 +239,13 @@ loop_invoke(void *object, if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) return loop_invoke_inthread(impl, func, seq, data, size, block, user_data); +retry: filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); - if (filled < 0 || filled > DATAS_SIZE) { - spa_log_warn(impl->log, "%p: queue xrun %d", impl, filled); - return -EPIPE; - } - avail = DATAS_SIZE - filled; + spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); + avail = (uint32_t)(DATAS_SIZE - filled); if (avail < sizeof(struct invoke_item)) { - spa_log_warn(impl->log, "%p: queue full %d", impl, avail); - return -EPIPE; + need = sizeof(struct invoke_item); + goto xrun; } offset = idx & (DATAS_SIZE - 1); @@ -278,13 +279,8 @@ loop_invoke(void *object, item->item_size = SPA_ROUND_UP_N(l0 + size, ITEM_ALIGN); } if (avail < item->item_size) { - int suppressed; - uint64_t nsec = get_time_ns(impl->system); - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) { - spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", - impl, avail, item->item_size, suppressed); - } - return -EPIPE; + need = item->item_size; + goto xrun; } if (data && size > 0) memcpy(item->data, data, size); @@ -313,6 +309,18 @@ loop_invoke(void *object, res = 0; } return res; + +xrun: + int suppressed; + uint64_t nsec = get_time_ns(impl->system); + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) { + spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", + impl, avail, need, suppressed); + } + if (impl->retry_timeout == 0) + return -EPIPE; + usleep(impl->retry_timeout); + goto retry; } static void wakeup_func(void *data, uint64_t count) @@ -1004,10 +1012,13 @@ impl_init(const struct spa_handle_factory *factory, SPA_VERSION_LOOP_UTILS, &impl_loop_utils, impl); + impl->retry_timeout = DEFAULT_RETRY; if (info) { if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL && spa_atob(str)) impl->control.iface.cb.funcs = &impl_loop_control_cancel; + if ((str = spa_dict_lookup(info, "loop.retry-timeout")) != NULL) + impl->retry_timeout = atoi(str); } impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);