loop: support recursive loop flush

Always append the item to the ringbuffer, even if we are invoking from
the thread itself. This ensure all items are always invoked in the
right order.

If we invoke from the thread, flush all items of the ringbuffer and
return.

Make sure to set the callback to NULL before invoking so that recursive
invoke doesn't call it again.

When while flushing the items we get a recursive invoke, detect this
with a counter and return immediately.
This commit is contained in:
Wim Taymans 2022-12-07 21:18:16 +01:00
parent 40f58f43fb
commit 8ecfcbf884

View file

@ -89,7 +89,7 @@ struct impl {
uint8_t *buffer_data;
uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN];
unsigned int flushing:1;
uint32_t flush_count;
unsigned int polling:1;
};
@ -166,27 +166,35 @@ static int loop_remove_source(void *object, struct spa_source *source)
static void flush_items(struct impl *impl)
{
uint32_t index;
int32_t avail;
uint32_t index, flush_count;
int res;
impl->flushing = true;
avail = spa_ringbuffer_get_read_index(&impl->buffer, &index);
while (avail > 0) {
flush_count = ++impl->flush_count;
while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) {
struct invoke_item *item;
bool block;
spa_invoke_func_t func;
item = SPA_PTROFF(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item);
block = item->block;
func = item->func;
spa_log_trace_fp(impl->log, "%p: flush item %p", impl, item);
item->res = item->func ? item->func(&impl->loop,
true, item->seq, item->data, item->size,
item->user_data) : 0;
/* first we remove the function from the item so that recursive
* calls don't call the callback again. We can't update the
* read index before we call the function because then the item
* might get overwritten. */
item->func = NULL;
if (func)
item->res = func(&impl->loop, true, item->seq, item->data,
item->size, item->user_data);
index += item->item_size;
avail -= item->item_size;
spa_ringbuffer_read_update(&impl->buffer, index);
/* if this function did a recursive invoke, it now flushed the
* ringbuffer and we can exit */
if (flush_count != impl->flush_count)
break;
spa_ringbuffer_read_update(&impl->buffer, index + item->item_size);
if (block) {
if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0)
@ -194,21 +202,6 @@ static void flush_items(struct impl *impl)
impl, impl->ack_fd, spa_strerror(res));
}
}
impl->flushing = false;
}
static int
loop_invoke_inthread(struct impl *impl,
spa_invoke_func_t func,
uint32_t seq,
const void *data,
size_t size,
bool block,
void *user_data)
{
if (!impl->flushing)
flush_items(impl);
return func ? func(&impl->loop, true, seq, data, size, user_data) : 0;
}
static int
@ -226,9 +219,6 @@ loop_invoke(void *object,
int32_t filled;
uint32_t avail, idx, offset, l0;
if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()))
return loop_invoke_inthread(impl, func, seq, data, size, block, user_data);
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
if (filled < 0 || filled > DATAS_SIZE) {
spa_log_warn(impl->log, "%p: queue xrun %d", impl, filled);
@ -251,6 +241,7 @@ loop_invoke(void *object,
item->size = size;
item->block = block;
item->user_data = user_data;
item->res = 0;
item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN);
spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", impl, item, filled);
@ -279,6 +270,11 @@ loop_invoke(void *object,
spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) {
flush_items(impl);
return item->res;
}
loop_signal_event(impl, impl->wakeup);
if (block) {