loop: sleep and retry when the invoke queue is full

When the invoke ringbuffer is full, sleep a little and try again.
Add an option to set the retty timeout, setting this to 0 restores
the old behaviour of returning -EPIPE.

Most callers don't check the return values and might assume the invoke
call is queued or executed, which could cause crashes or leaks.

When the queue overruns, it's better to log a warning and hope that the
problem is resolved soon. We might abort or return the error to the
caller later if we want to break the retry loop.

See !1887
This commit is contained in:
Wim Taymans 2024-02-05 19:40:05 +01:00
parent 83050e647b
commit e7e6742200

View file

@ -32,6 +32,7 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.loop");
#define ITEM_ALIGN 8
#define DATAS_SIZE (4096*8)
#define MAX_EP 32
#define DEFAULT_RETRY (1 * SPA_USEC_PER_SEC)
/** \cond */
@ -60,6 +61,7 @@ struct impl {
struct spa_list source_list;
struct spa_list destroy_list;
struct spa_hook_list hooks_list;
int retry_timeout;
int poll_fd;
pthread_t thread;
@ -229,6 +231,7 @@ loop_invoke(void *object,
int res;
int32_t filled;
uint32_t avail, idx, offset, l0;
size_t need;
/* the ringbuffer can only be written to from one thread, if we are
* in the same thread as the loop, don't write into the ringbuffer
@ -236,15 +239,13 @@ loop_invoke(void *object,
if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()))
return loop_invoke_inthread(impl, func, seq, data, size, block, user_data);
retry:
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
if (filled < 0 || filled > DATAS_SIZE) {
spa_log_warn(impl->log, "%p: queue xrun %d", impl, filled);
return -EPIPE;
}
avail = DATAS_SIZE - filled;
spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun");
avail = (uint32_t)(DATAS_SIZE - filled);
if (avail < sizeof(struct invoke_item)) {
spa_log_warn(impl->log, "%p: queue full %d", impl, avail);
return -EPIPE;
need = sizeof(struct invoke_item);
goto xrun;
}
offset = idx & (DATAS_SIZE - 1);
@ -278,13 +279,8 @@ loop_invoke(void *object,
item->item_size = SPA_ROUND_UP_N(l0 + size, ITEM_ALIGN);
}
if (avail < item->item_size) {
int suppressed;
uint64_t nsec = get_time_ns(impl->system);
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) {
spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)",
impl, avail, item->item_size, suppressed);
}
return -EPIPE;
need = item->item_size;
goto xrun;
}
if (data && size > 0)
memcpy(item->data, data, size);
@ -313,6 +309,18 @@ loop_invoke(void *object,
res = 0;
}
return res;
xrun:
int suppressed;
uint64_t nsec = get_time_ns(impl->system);
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) {
spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)",
impl, avail, need, suppressed);
}
if (impl->retry_timeout == 0)
return -EPIPE;
usleep(impl->retry_timeout);
goto retry;
}
static void wakeup_func(void *data, uint64_t count)
@ -1004,10 +1012,13 @@ impl_init(const struct spa_handle_factory *factory,
SPA_VERSION_LOOP_UTILS,
&impl_loop_utils, impl);
impl->retry_timeout = DEFAULT_RETRY;
if (info) {
if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL &&
spa_atob(str))
impl->control.iface.cb.funcs = &impl_loop_control_cancel;
if ((str = spa_dict_lookup(info, "loop.retry-timeout")) != NULL)
impl->retry_timeout = atoi(str);
}
impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);