diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index d17411f91..25637ea82 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -120,6 +120,32 @@ static int loop_remove_source(void *object, struct spa_source *source) return spa_system_pollfd_del(impl->system, impl->poll_fd, source->fd); } +static void flush_items(struct impl *impl, bool async) +{ + uint32_t index; + int res; + + while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { + struct invoke_item *item; + bool block; + + item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); + block = item->block; + + item->res = item->func ? item->func(&impl->loop, + true, item->seq, item->data, item->size, + item->user_data) : 0; + + spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); + + if (block && async) { + if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) + spa_log_warn(impl->log, NAME " %p: failed to write event fd: %s", + impl, spa_strerror(res)); + } + } +} + static int loop_invoke(void *object, spa_invoke_func_t func, @@ -135,7 +161,8 @@ loop_invoke(void *object, int res; if (in_thread) { - res = func(&impl->loop, false, seq, data, size, user_data); + flush_items(impl, false); + res = func ? func(&impl->loop, false, seq, data, size, user_data) : 0; } else { int32_t filled; uint32_t avail, idx, offset, l0; @@ -161,6 +188,8 @@ loop_invoke(void *object, item->block = block; item->user_data = user_data; + spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled); + if (l0 > sizeof(struct invoke_item) + size) { item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); item->item_size = sizeof(struct invoke_item) + size; @@ -202,28 +231,7 @@ loop_invoke(void *object, static void wakeup_func(void *data, uint64_t count) { struct impl *impl = data; - uint32_t index; - int res; - - while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { - struct invoke_item *item; - bool block; - - item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); - block = item->block; - - item->res = item->func(&impl->loop, - true, item->seq, item->data, item->size, - item->user_data); - - spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); - - if (block) { - if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) - spa_log_warn(impl->log, NAME " %p: failed to write event fd: %s", - impl, spa_strerror(res)); - } - } + flush_items(impl, true); } static int loop_get_fd(void *object) @@ -246,11 +254,13 @@ static void loop_enter(void *object) { struct impl *impl = object; impl->thread = pthread_self(); + spa_log_trace(impl->log, NAME" %p: enter %lu", impl, impl->thread); } static void loop_leave(void *object) { struct impl *impl = object; + spa_log_trace(impl->log, NAME" %p: leave %lu", impl, impl->thread); impl->thread = 0; }