diff --git a/spa/include/spa/support/loop.h b/spa/include/spa/support/loop.h index f0e6df8d4..7dc55f3ae 100644 --- a/spa/include/spa/support/loop.h +++ b/spa/include/spa/support/loop.h @@ -94,8 +94,7 @@ struct spa_loop_methods { struct spa_source *source); /** Invoke a function in the context of this loop. - * May be called from the loop's thread, but otherwise - * can only be called by a single thread at a time. + * May be called from any thread and multiple threads at the same time. * If called from the loop's thread, all callbacks previously queued with * invoke() will be run synchronously, which might cause unexpected * reentrancy problems. diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index ead18328b..b71a56782 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -223,22 +223,6 @@ static void flush_all_queues(struct impl *impl) pthread_mutex_unlock(&impl->queue_lock); } -static int -loop_queue_invoke_inthread(struct queue *queue, - spa_invoke_func_t func, - uint32_t seq, - const void *data, - size_t size, - bool block, - void *user_data) -{ - /* we should probably have a second ringbuffer for the in-thread pending - * callbacks. A recursive callback when flushing will insert itself - * before this one. */ - flush_all_queues(queue->impl); - return func ? func(&queue->impl->loop, true, seq, data, size, user_data) : 0; -} - static int loop_queue_invoke(void *object, spa_invoke_func_t func, @@ -256,12 +240,9 @@ loop_queue_invoke(void *object, uint32_t avail, idx, offset, l0; size_t need; uint64_t nsec; + bool in_thread; - /* the ringbuffer can only be written to from one thread, if we are - * in the same thread as the loop, don't write into the ringbuffer - * but try to emit the calback right away after flushing what we have */ - if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) - return loop_queue_invoke_inthread(queue, func, seq, data, size, block, user_data); + in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); retry: filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); @@ -281,7 +262,7 @@ retry: item->func = func; item->seq = seq; item->size = size; - item->block = block; + item->block = in_thread ? false : block; item->user_data = user_data; item->res = 0; item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN); @@ -311,26 +292,31 @@ retry: spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size); - loop_signal_event(impl, impl->wakeup); - - if (block) { - uint64_t count = 1; - - spa_loop_control_hook_before(&impl->hooks_list); - - if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0) - spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s", - queue, queue->ack_fd, spa_strerror(res)); - - spa_loop_control_hook_after(&impl->hooks_list); - + if (in_thread) { + flush_all_queues(impl); res = item->res; - } - else { - if (seq != SPA_ID_INVALID) - res = SPA_RESULT_RETURN_ASYNC(seq); - else - res = 0; + } else { + loop_signal_event(impl, impl->wakeup); + + if (block) { + uint64_t count = 1; + + spa_loop_control_hook_before(&impl->hooks_list); + + if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0) + spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s", + queue, queue->ack_fd, spa_strerror(res)); + + spa_loop_control_hook_after(&impl->hooks_list); + + res = item->res; + } + else { + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC(seq); + else + res = 0; + } } return res;