diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index beb3756c2..2717ed6a6 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -136,6 +136,7 @@ static void flush_items(struct impl *impl) item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); block = item->block; + spa_log_trace(impl->log, NAME " %p: flush item %p", impl, item); item->res = item->func ? item->func(&impl->loop, true, item->seq, item->data, item->size, item->user_data) : 0; @@ -164,71 +165,70 @@ loop_invoke(void *object, bool in_thread = pthread_equal(impl->thread, pthread_self()); struct invoke_item *item; int res; + int32_t filled; + uint32_t avail, idx, offset, l0; - if (in_thread && !impl->flushing) { - flush_items(impl); - res = func ? func(&impl->loop, false, seq, data, size, user_data) : 0; + filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); + if (filled < 0 || filled > DATAS_SIZE) { + spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled); + return -EPIPE; + } + avail = DATAS_SIZE - filled; + if (avail < sizeof(struct invoke_item)) { + spa_log_warn(impl->log, NAME " %p: queue full %d", impl, avail); + return -EPIPE; + } + offset = idx & (DATAS_SIZE - 1); + + l0 = DATAS_SIZE - offset; + + item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item); + item->func = func; + item->seq = seq; + item->size = size; + item->block = block; + item->user_data = user_data; + + spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled); + + if (l0 > sizeof(struct invoke_item) + size) { + item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); + item->item_size = sizeof(struct invoke_item) + size; + if (l0 < sizeof(struct invoke_item) + item->item_size) + item->item_size = l0; } else { - int32_t filled; - uint32_t avail, idx, offset, l0; + item->data = impl->buffer_data; + item->item_size = l0 + size; + } + memcpy(item->data, data, size); - filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); - if (filled < 0 || filled > DATAS_SIZE) { - spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled); - return -EPIPE; - } - avail = DATAS_SIZE - filled; - if (avail < sizeof(struct invoke_item)) { - spa_log_warn(impl->log, NAME " %p: queue full %d", impl, avail); - return -EPIPE; - } - offset = idx & (DATAS_SIZE - 1); - - l0 = DATAS_SIZE - offset; - - item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item); - item->func = func; - item->seq = seq; - item->size = size; - item->block = block; - item->user_data = user_data; - - spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled); - - if (l0 > sizeof(struct invoke_item) + size) { - item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); - item->item_size = sizeof(struct invoke_item) + size; - if (l0 < sizeof(struct invoke_item) + item->item_size) - item->item_size = l0; - } else { - item->data = impl->buffer_data; - item->item_size = l0 + size; - } - memcpy(item->data, data, size); - - spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + if (in_thread) { + if (!impl->flushing) + flush_items(impl); + } else { loop_signal_event(impl, impl->wakeup); + } - if (block) { - uint64_t count = 1; + if (block) { + uint64_t count = 1; - spa_loop_control_hook_before(&impl->hooks_list); + spa_loop_control_hook_before(&impl->hooks_list); - if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0) - spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s", - impl, spa_strerror(res)); + if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0) + spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s", + impl, spa_strerror(res)); - spa_loop_control_hook_after(&impl->hooks_list); + spa_loop_control_hook_after(&impl->hooks_list); - res = item->res; - } - else { - if (seq != SPA_ID_INVALID) - res = SPA_RESULT_RETURN_ASYNC(seq); - else - res = 0; - } + res = item->res; + } + else { + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC(seq); + else + res = 0; } return res; }