loop: flush items in invoke from thread

When we do invoke from the loop thread, first process the pending
invoke items before handling ours. This can be used to sync the
invoke queue before cleanup.
This commit is contained in:
Wim Taymans 2019-08-16 15:04:27 +02:00
parent 954c96632c
commit 47f0f1f9a3

View file

@ -120,6 +120,32 @@ static int loop_remove_source(void *object, struct spa_source *source)
return spa_system_pollfd_del(impl->system, impl->poll_fd, source->fd); return spa_system_pollfd_del(impl->system, impl->poll_fd, source->fd);
} }
static void flush_items(struct impl *impl, bool async)
{
uint32_t index;
int res;
while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) {
struct invoke_item *item;
bool block;
item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item);
block = item->block;
item->res = item->func ? item->func(&impl->loop,
true, item->seq, item->data, item->size,
item->user_data) : 0;
spa_ringbuffer_read_update(&impl->buffer, index + item->item_size);
if (block && async) {
if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0)
spa_log_warn(impl->log, NAME " %p: failed to write event fd: %s",
impl, spa_strerror(res));
}
}
}
static int static int
loop_invoke(void *object, loop_invoke(void *object,
spa_invoke_func_t func, spa_invoke_func_t func,
@ -135,7 +161,8 @@ loop_invoke(void *object,
int res; int res;
if (in_thread) { if (in_thread) {
res = func(&impl->loop, false, seq, data, size, user_data); flush_items(impl, false);
res = func ? func(&impl->loop, false, seq, data, size, user_data) : 0;
} else { } else {
int32_t filled; int32_t filled;
uint32_t avail, idx, offset, l0; uint32_t avail, idx, offset, l0;
@ -161,6 +188,8 @@ loop_invoke(void *object,
item->block = block; item->block = block;
item->user_data = user_data; item->user_data = user_data;
spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled);
if (l0 > sizeof(struct invoke_item) + size) { if (l0 > sizeof(struct invoke_item) + size) {
item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void);
item->item_size = sizeof(struct invoke_item) + size; item->item_size = sizeof(struct invoke_item) + size;
@ -202,28 +231,7 @@ loop_invoke(void *object,
static void wakeup_func(void *data, uint64_t count) static void wakeup_func(void *data, uint64_t count)
{ {
struct impl *impl = data; struct impl *impl = data;
uint32_t index; flush_items(impl, true);
int res;
while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) {
struct invoke_item *item;
bool block;
item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item);
block = item->block;
item->res = item->func(&impl->loop,
true, item->seq, item->data, item->size,
item->user_data);
spa_ringbuffer_read_update(&impl->buffer, index + item->item_size);
if (block) {
if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0)
spa_log_warn(impl->log, NAME " %p: failed to write event fd: %s",
impl, spa_strerror(res));
}
}
} }
static int loop_get_fd(void *object) static int loop_get_fd(void *object)
@ -246,11 +254,13 @@ static void loop_enter(void *object)
{ {
struct impl *impl = object; struct impl *impl = object;
impl->thread = pthread_self(); impl->thread = pthread_self();
spa_log_trace(impl->log, NAME" %p: enter %lu", impl, impl->thread);
} }
static void loop_leave(void *object) static void loop_leave(void *object)
{ {
struct impl *impl = object; struct impl *impl = object;
spa_log_trace(impl->log, NAME" %p: leave %lu", impl, impl->thread);
impl->thread = 0; impl->thread = 0;
} }