diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 99cd9d170..3c3e020ee 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -167,10 +167,12 @@ static int loop_remove_source(void *object, struct spa_source *source) static void flush_items(struct impl *impl) { uint32_t index, flush_count; + int32_t avail; int res; flush_count = ++impl->flush_count; - while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { + avail = spa_ringbuffer_get_read_index(&impl->buffer, &index); + while (avail > 0) { struct invoke_item *item; bool block; spa_invoke_func_t func; @@ -194,7 +196,9 @@ static void flush_items(struct impl *impl) if (flush_count != impl->flush_count) break; - spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); + index += item->item_size; + avail -= item->item_size; + spa_ringbuffer_read_update(&impl->buffer, index); if (block) { if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) @@ -204,6 +208,22 @@ static void flush_items(struct impl *impl) } } +static int +loop_invoke_inthread(struct impl *impl, + spa_invoke_func_t func, + uint32_t seq, + const void *data, + size_t size, + bool block, + void *user_data) +{ + /* we should probably have a second ringbuffer for the in-thread pending + * callbacks. A recursive callback when flushing will insert itself + * before this one. */ + flush_items(impl); + return func ? func(&impl->loop, true, seq, data, size, user_data) : 0; +} + static int loop_invoke(void *object, spa_invoke_func_t func, @@ -219,6 +239,12 @@ loop_invoke(void *object, int32_t filled; uint32_t avail, idx, offset, l0; + /* the ringbuffer can only be written to from one thread, if we are + * in the same thread as the loop, don't write into the ringbuffer + * but try to emit the calback right away after flushing what we have */ + if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) + return loop_invoke_inthread(impl, func, seq, data, size, block, user_data); + filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); if (filled < 0 || filled > DATAS_SIZE) { spa_log_warn(impl->log, "%p: queue xrun %d", impl, filled); @@ -270,11 +296,6 @@ loop_invoke(void *object, spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); - if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) { - flush_items(impl); - return item->res; - } - loop_signal_event(impl, impl->wakeup); if (block) {