From 47f0f1f9a3af00be541c23ee08fd8458c495014c Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 16 Aug 2019 15:04:27 +0200 Subject: [PATCH] loop: flush items in invoke from thread When we do invoke from the loop thread, first process the pending invoke items before handling ours. This can be used to sync the invoke queue before cleanup. --- spa/plugins/support/loop.c | 56 ++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index d17411f91..25637ea82 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -120,6 +120,32 @@ static int loop_remove_source(void *object, struct spa_source *source) return spa_system_pollfd_del(impl->system, impl->poll_fd, source->fd); } +static void flush_items(struct impl *impl, bool async) +{ + uint32_t index; + int res; + + while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { + struct invoke_item *item; + bool block; + + item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); + block = item->block; + + item->res = item->func ? item->func(&impl->loop, + true, item->seq, item->data, item->size, + item->user_data) : 0; + + spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); + + if (block && async) { + if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) + spa_log_warn(impl->log, NAME " %p: failed to write event fd: %s", + impl, spa_strerror(res)); + } + } +} + static int loop_invoke(void *object, spa_invoke_func_t func, @@ -135,7 +161,8 @@ loop_invoke(void *object, int res; if (in_thread) { - res = func(&impl->loop, false, seq, data, size, user_data); + flush_items(impl, false); + res = func ? func(&impl->loop, false, seq, data, size, user_data) : 0; } else { int32_t filled; uint32_t avail, idx, offset, l0; @@ -161,6 +188,8 @@ loop_invoke(void *object, item->block = block; item->user_data = user_data; + spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled); + if (l0 > sizeof(struct invoke_item) + size) { item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); item->item_size = sizeof(struct invoke_item) + size; @@ -202,28 +231,7 @@ loop_invoke(void *object, static void wakeup_func(void *data, uint64_t count) { struct impl *impl = data; - uint32_t index; - int res; - - while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { - struct invoke_item *item; - bool block; - - item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); - block = item->block; - - item->res = item->func(&impl->loop, - true, item->seq, item->data, item->size, - item->user_data); - - spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); - - if (block) { - if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) - spa_log_warn(impl->log, NAME " %p: failed to write event fd: %s", - impl, spa_strerror(res)); - } - } + flush_items(impl, true); } static int loop_get_fd(void *object) @@ -246,11 +254,13 @@ static void loop_enter(void *object) { struct impl *impl = object; impl->thread = pthread_self(); + spa_log_trace(impl->log, NAME" %p: enter %lu", impl, impl->thread); } static void loop_leave(void *object) { struct impl *impl = object; + spa_log_trace(impl->log, NAME" %p: leave %lu", impl, impl->thread); impl->thread = 0; }