From 0d9cc9e36e4570f45f2ea5d763ed7fd7c04b8cda Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 16 Nov 2020 15:16:20 +0100 Subject: [PATCH] loop: always place the invoke item in the queue Always place the invoke item in the queue and then either signal the other thread or flush the queue when not already flushing. --- spa/plugins/support/loop.c | 110 ++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index beb3756c2..2717ed6a6 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -136,6 +136,7 @@ static void flush_items(struct impl *impl) item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); block = item->block; + spa_log_trace(impl->log, NAME " %p: flush item %p", impl, item); item->res = item->func ? item->func(&impl->loop, true, item->seq, item->data, item->size, item->user_data) : 0; @@ -164,71 +165,70 @@ loop_invoke(void *object, bool in_thread = pthread_equal(impl->thread, pthread_self()); struct invoke_item *item; int res; + int32_t filled; + uint32_t avail, idx, offset, l0; - if (in_thread && !impl->flushing) { - flush_items(impl); - res = func ? func(&impl->loop, false, seq, data, size, user_data) : 0; + filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); + if (filled < 0 || filled > DATAS_SIZE) { + spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled); + return -EPIPE; + } + avail = DATAS_SIZE - filled; + if (avail < sizeof(struct invoke_item)) { + spa_log_warn(impl->log, NAME " %p: queue full %d", impl, avail); + return -EPIPE; + } + offset = idx & (DATAS_SIZE - 1); + + l0 = DATAS_SIZE - offset; + + item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item); + item->func = func; + item->seq = seq; + item->size = size; + item->block = block; + item->user_data = user_data; + + spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled); + + if (l0 > sizeof(struct invoke_item) + size) { + item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); + item->item_size = sizeof(struct invoke_item) + size; + if (l0 < sizeof(struct invoke_item) + item->item_size) + item->item_size = l0; } else { - int32_t filled; - uint32_t avail, idx, offset, l0; + item->data = impl->buffer_data; + item->item_size = l0 + size; + } + memcpy(item->data, data, size); - filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); - if (filled < 0 || filled > DATAS_SIZE) { - spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled); - return -EPIPE; - } - avail = DATAS_SIZE - filled; - if (avail < sizeof(struct invoke_item)) { - spa_log_warn(impl->log, NAME " %p: queue full %d", impl, avail); - return -EPIPE; - } - offset = idx & (DATAS_SIZE - 1); - - l0 = DATAS_SIZE - offset; - - item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item); - item->func = func; - item->seq = seq; - item->size = size; - item->block = block; - item->user_data = user_data; - - spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled); - - if (l0 > sizeof(struct invoke_item) + size) { - item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); - item->item_size = sizeof(struct invoke_item) + size; - if (l0 < sizeof(struct invoke_item) + item->item_size) - item->item_size = l0; - } else { - item->data = impl->buffer_data; - item->item_size = l0 + size; - } - memcpy(item->data, data, size); - - spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + if (in_thread) { + if (!impl->flushing) + flush_items(impl); + } else { loop_signal_event(impl, impl->wakeup); + } - if (block) { - uint64_t count = 1; + if (block) { + uint64_t count = 1; - spa_loop_control_hook_before(&impl->hooks_list); + spa_loop_control_hook_before(&impl->hooks_list); - if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0) - spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s", - impl, spa_strerror(res)); + if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0) + spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s", + impl, spa_strerror(res)); - spa_loop_control_hook_after(&impl->hooks_list); + spa_loop_control_hook_after(&impl->hooks_list); - res = item->res; - } - else { - if (seq != SPA_ID_INVALID) - res = SPA_RESULT_RETURN_ASYNC(seq); - else - res = 0; - } + res = item->res; + } + else { + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC(seq); + else + res = 0; } return res; }