loop: improve in_thread handling of invoke queue

Because we now have a dedicated queue per thread, we can simply add our
invoke item to the queue and then flush all the queues when we are
running in the thread of the loop.

This simplifies some things and removes potential out-of-order messages
that got queued while flushing.
This commit is contained in:
Wim Taymans 2024-04-29 15:56:00 +02:00
parent de0db48f17
commit 8ff40e6252
2 changed files with 28 additions and 43 deletions

View file

@ -223,22 +223,6 @@ static void flush_all_queues(struct impl *impl)
pthread_mutex_unlock(&impl->queue_lock);
}
static int
loop_queue_invoke_inthread(struct queue *queue,
spa_invoke_func_t func,
uint32_t seq,
const void *data,
size_t size,
bool block,
void *user_data)
{
/* we should probably have a second ringbuffer for the in-thread pending
* callbacks. A recursive callback when flushing will insert itself
* before this one. */
flush_all_queues(queue->impl);
return func ? func(&queue->impl->loop, true, seq, data, size, user_data) : 0;
}
static int
loop_queue_invoke(void *object,
spa_invoke_func_t func,
@ -256,12 +240,9 @@ loop_queue_invoke(void *object,
uint32_t avail, idx, offset, l0;
size_t need;
uint64_t nsec;
bool in_thread;
/* the ringbuffer can only be written to from one thread, if we are
* in the same thread as the loop, don't write into the ringbuffer
* but try to emit the calback right away after flushing what we have */
if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()))
return loop_queue_invoke_inthread(queue, func, seq, data, size, block, user_data);
in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()));
retry:
filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx);
@ -281,7 +262,7 @@ retry:
item->func = func;
item->seq = seq;
item->size = size;
item->block = block;
item->block = in_thread ? false : block;
item->user_data = user_data;
item->res = 0;
item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN);
@ -311,26 +292,31 @@ retry:
spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size);
loop_signal_event(impl, impl->wakeup);
if (block) {
uint64_t count = 1;
spa_loop_control_hook_before(&impl->hooks_list);
if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0)
spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s",
queue, queue->ack_fd, spa_strerror(res));
spa_loop_control_hook_after(&impl->hooks_list);
if (in_thread) {
flush_all_queues(impl);
res = item->res;
}
else {
if (seq != SPA_ID_INVALID)
res = SPA_RESULT_RETURN_ASYNC(seq);
else
res = 0;
} else {
loop_signal_event(impl, impl->wakeup);
if (block) {
uint64_t count = 1;
spa_loop_control_hook_before(&impl->hooks_list);
if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0)
spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s",
queue, queue->ack_fd, spa_strerror(res));
spa_loop_control_hook_after(&impl->hooks_list);
res = item->res;
}
else {
if (seq != SPA_ID_INVALID)
res = SPA_RESULT_RETURN_ASYNC(seq);
else
res = 0;
}
}
return res;