loop: don't write from multiple threads

We can only write from one thread to the ringbuffer so bypass the
ringbuffer when doing in-thread invoke. Only flush the current
items so that out-of-thread items don't get inserted.
This commit is contained in:
Wim Taymans 2022-12-08 08:01:40 +01:00
parent 8ecfcbf884
commit ddf6e7ae91

View file

@ -167,10 +167,12 @@ static int loop_remove_source(void *object, struct spa_source *source)
static void flush_items(struct impl *impl)
{
uint32_t index, flush_count;
int32_t avail;
int res;
flush_count = ++impl->flush_count;
while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) {
avail = spa_ringbuffer_get_read_index(&impl->buffer, &index);
while (avail > 0) {
struct invoke_item *item;
bool block;
spa_invoke_func_t func;
@ -194,7 +196,9 @@ static void flush_items(struct impl *impl)
if (flush_count != impl->flush_count)
break;
spa_ringbuffer_read_update(&impl->buffer, index + item->item_size);
index += item->item_size;
avail -= item->item_size;
spa_ringbuffer_read_update(&impl->buffer, index);
if (block) {
if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0)
@ -204,6 +208,22 @@ static void flush_items(struct impl *impl)
}
}
static int
loop_invoke_inthread(struct impl *impl,
spa_invoke_func_t func,
uint32_t seq,
const void *data,
size_t size,
bool block,
void *user_data)
{
/* we should probably have a second ringbuffer for the in-thread pending
* callbacks. A recursive callback when flushing will insert itself
* before this one. */
flush_items(impl);
return func ? func(&impl->loop, true, seq, data, size, user_data) : 0;
}
static int
loop_invoke(void *object,
spa_invoke_func_t func,
@ -219,6 +239,12 @@ loop_invoke(void *object,
int32_t filled;
uint32_t avail, idx, offset, l0;
/* the ringbuffer can only be written to from one thread, if we are
* in the same thread as the loop, don't write into the ringbuffer
* but try to emit the calback right away after flushing what we have */
if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()))
return loop_invoke_inthread(impl, func, seq, data, size, block, user_data);
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
if (filled < 0 || filled > DATAS_SIZE) {
spa_log_warn(impl->log, "%p: queue xrun %d", impl, filled);
@ -270,11 +296,6 @@ loop_invoke(void *object,
spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) {
flush_items(impl);
return item->res;
}
loop_signal_event(impl, impl->wakeup);
if (block) {