mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-31 22:25:38 -04:00
loop: avoid corruption of ringbuffer
The ringbuffer can't be written to from multiple threads. When both the main loop and data thread do _invoke, they both write to the ringbuffer and cause it to be corrupted because the ringbuffer is not multi-writer safe. Doing invoke from the thread itself is usually done to flush things out so we really only need to flush the ringbuffer and call the callback. See #1451
This commit is contained in:
parent
d1905716e4
commit
78f52a7073
1 changed files with 20 additions and 9 deletions
|
|
@ -154,6 +154,20 @@ static void flush_items(struct impl *impl)
|
||||||
impl->flushing = false;
|
impl->flushing = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
loop_invoke_inthread(struct impl *impl,
|
||||||
|
spa_invoke_func_t func,
|
||||||
|
uint32_t seq,
|
||||||
|
const void *data,
|
||||||
|
size_t size,
|
||||||
|
bool block,
|
||||||
|
void *user_data)
|
||||||
|
{
|
||||||
|
if (!impl->flushing)
|
||||||
|
flush_items(impl);
|
||||||
|
return func ? func(&impl->loop, true, seq, data, size, user_data) : 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
loop_invoke(void *object,
|
loop_invoke(void *object,
|
||||||
spa_invoke_func_t func,
|
spa_invoke_func_t func,
|
||||||
|
|
@ -164,12 +178,14 @@ loop_invoke(void *object,
|
||||||
void *user_data)
|
void *user_data)
|
||||||
{
|
{
|
||||||
struct impl *impl = object;
|
struct impl *impl = object;
|
||||||
bool in_thread = pthread_equal(impl->thread, pthread_self());
|
|
||||||
struct invoke_item *item;
|
struct invoke_item *item;
|
||||||
int res;
|
int res;
|
||||||
int32_t filled;
|
int32_t filled;
|
||||||
uint32_t avail, idx, offset, l0;
|
uint32_t avail, idx, offset, l0;
|
||||||
|
|
||||||
|
if (pthread_equal(impl->thread, pthread_self()))
|
||||||
|
return loop_invoke_inthread(impl, func, seq, data, size, block, user_data);
|
||||||
|
|
||||||
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
|
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
|
||||||
if (filled < 0 || filled > DATAS_SIZE) {
|
if (filled < 0 || filled > DATAS_SIZE) {
|
||||||
spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled);
|
spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled);
|
||||||
|
|
@ -190,7 +206,7 @@ loop_invoke(void *object,
|
||||||
item->func = func;
|
item->func = func;
|
||||||
item->seq = seq;
|
item->seq = seq;
|
||||||
item->size = size;
|
item->size = size;
|
||||||
item->block = block && !in_thread;
|
item->block = block;
|
||||||
item->user_data = user_data;
|
item->user_data = user_data;
|
||||||
item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, 8);
|
item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, 8);
|
||||||
|
|
||||||
|
|
@ -220,14 +236,9 @@ loop_invoke(void *object,
|
||||||
|
|
||||||
spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
|
spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
|
||||||
|
|
||||||
if (in_thread) {
|
loop_signal_event(impl, impl->wakeup);
|
||||||
if (!impl->flushing)
|
|
||||||
flush_items(impl);
|
|
||||||
} else {
|
|
||||||
loop_signal_event(impl, impl->wakeup);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (block && !in_thread) {
|
if (block) {
|
||||||
uint64_t count = 1;
|
uint64_t count = 1;
|
||||||
|
|
||||||
spa_loop_control_hook_before(&impl->hooks_list);
|
spa_loop_control_hook_before(&impl->hooks_list);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue