loop: move invoke queue to separate object

Make an internal queue object that implements the invoke queue.

Because we cannot do invokes concurrently from different threads, this
is required to make per-thread invoke queues later.
Wim Taymans 2024-04-29 12:01:48 +02:00
parent ac35ecf329
commit c76424da36


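The commit message introduces an internal queue object; as a reading aid for the diff below, here is a deliberately simplified, self-contained sketch (plain C, not the real SPA types) of the structure the patch moves toward: the loop owns a list of queue objects instead of a single built-in invoke buffer, and the loop thread flushes every queue in turn. Only the names loop_create_queue, queue_flush_items and flush_all_queues mirror the patch; the fields and the buffer handling are made up for illustration.

/* Simplified, self-contained sketch of the idea in this commit (not the real
 * SPA types): the loop owns a list of queue objects instead of a single
 * built-in invoke buffer, and the loop thread flushes every queue in turn. */
#include <stdio.h>
#include <stdlib.h>

struct queue {
    struct queue *next;          /* stands in for impl->queue_list / queue->link */
    char pending[256];           /* stands in for the per-queue ring buffer */
    size_t used;
};

struct loop_impl {
    struct queue *queues;        /* all queues created for this loop */
    struct queue *default_queue; /* like impl->queue, used by loop_invoke() */
};

/* like loop_create_queue() in the patch: allocate a queue and link it in */
static struct queue *loop_create_queue(struct loop_impl *impl)
{
    struct queue *q = calloc(1, sizeof(*q));
    if (q == NULL)
        return NULL;
    q->next = impl->queues;
    impl->queues = q;
    if (impl->default_queue == NULL)
        impl->default_queue = q;
    return q;
}

/* like queue_flush_items(): drain one queue's pending work */
static void queue_flush_items(struct queue *q)
{
    if (q->used > 0)
        printf("flushed: %.*s\n", (int)q->used, q->pending);
    q->used = 0;
}

/* like flush_all_queues(): what wakeup_func() and loop_leave() now call */
static void flush_all_queues(struct loop_impl *impl)
{
    for (struct queue *q = impl->queues; q != NULL; q = q->next)
        queue_flush_items(q);
}

int main(void)
{
    struct loop_impl impl = { 0 };
    struct queue *a = loop_create_queue(&impl);
    struct queue *b = loop_create_queue(&impl); /* e.g. a per-thread queue later */
    if (a == NULL || b == NULL)
        return 1;

    a->used = (size_t)snprintf(a->pending, sizeof(a->pending), "item from queue a");
    b->used = (size_t)snprintf(b->pending, sizeof(b->pending), "item from queue b");
    flush_all_queues(&impl);

    while (impl.queues != NULL) {
        struct queue *q = impl.queues;
        impl.queues = q->next;
        free(q);
    }
    return 0;
}

In the real patch each queue additionally carries its own ring buffer, ack eventfd and rate limiter, which is what makes later per-thread queues possible. The patch itself follows.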
@@ -60,6 +60,7 @@ struct impl {
     struct spa_list source_list;
     struct spa_list destroy_list;
+    struct spa_list queue_list;
     struct spa_hook_list hooks_list;
     int retry_timeout;
@@ -68,6 +69,16 @@ struct impl {
     int enter_count;
     struct spa_source *wakeup;
+    struct queue *queue;
+    unsigned int polling:1;
+};
+
+struct queue {
+    struct impl *impl;
+    struct spa_list link;
     int ack_fd;
     struct spa_ratelimit rate_limit;
@@ -76,7 +87,6 @@ struct impl {
     uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN];
     uint32_t flush_count;
-    unsigned int polling:1;
 };

 struct source_impl {
@@ -157,24 +167,25 @@ static int loop_remove_source(void *object, struct spa_source *source)
     return res;
 }

-static void flush_items(struct impl *impl)
+static void queue_flush_items(struct queue *queue)
 {
+    struct impl *impl = queue->impl;
     uint32_t index, flush_count;
     int32_t avail;
     int res;
-    flush_count = ++impl->flush_count;
-    avail = spa_ringbuffer_get_read_index(&impl->buffer, &index);
+    flush_count = ++queue->flush_count;
+    avail = spa_ringbuffer_get_read_index(&queue->buffer, &index);
     while (avail > 0) {
         struct invoke_item *item;
         bool block;
         spa_invoke_func_t func;
-        item = SPA_PTROFF(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item);
+        item = SPA_PTROFF(queue->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item);
         block = item->block;
         func = item->func;
-        spa_log_trace_fp(impl->log, "%p: flush item %p", impl, item);
+        spa_log_trace_fp(impl->log, "%p: flush item %p", queue, item);
         /* first we remove the function from the item so that recursive
          * calls don't call the callback again. We can't update the
          * read index before we call the function because then the item
@@ -186,23 +197,30 @@ static void flush_items(struct impl *impl)
         /* if this function did a recursive invoke, it now flushed the
          * ringbuffer and we can exit */
-        if (flush_count != impl->flush_count)
+        if (flush_count != queue->flush_count)
             break;
         index += item->item_size;
         avail -= item->item_size;
-        spa_ringbuffer_read_update(&impl->buffer, index);
+        spa_ringbuffer_read_update(&queue->buffer, index);
         if (block) {
-            if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0)
+            if ((res = spa_system_eventfd_write(impl->system, queue->ack_fd, 1)) < 0)
                 spa_log_warn(impl->log, "%p: failed to write event fd:%d: %s",
-                        impl, impl->ack_fd, spa_strerror(res));
+                        queue, queue->ack_fd, spa_strerror(res));
         }
     }
 }

+static void flush_all_queues(struct impl *impl)
+{
+    struct queue *queue;
+    spa_list_for_each(queue, &impl->queue_list, link)
+        queue_flush_items(queue);
+}
+
 static int
-loop_invoke_inthread(struct impl *impl,
+loop_queue_invoke_inthread(struct queue *queue,
         spa_invoke_func_t func,
         uint32_t seq,
         const void *data,
@@ -213,12 +231,12 @@ loop_invoke_inthread(struct impl *impl,
     /* we should probably have a second ringbuffer for the in-thread pending
      * callbacks. A recursive callback when flushing will insert itself
      * before this one. */
-    flush_items(impl);
-    return func ? func(&impl->loop, true, seq, data, size, user_data) : 0;
+    flush_all_queues(queue->impl);
+    return func ? func(&queue->impl->loop, true, seq, data, size, user_data) : 0;
 }

 static int
-loop_invoke(void *object,
+loop_queue_invoke(void *object,
         spa_invoke_func_t func,
         uint32_t seq,
         const void *data,
@@ -226,7 +244,8 @@ loop_invoke(void *object,
         bool block,
         void *user_data)
 {
-    struct impl *impl = object;
+    struct queue *queue = object;
+    struct impl *impl = queue->impl;
     struct invoke_item *item;
     int res, suppressed;
     int32_t filled;
@@ -238,10 +257,10 @@ loop_invoke(void *object,
      * in the same thread as the loop, don't write into the ringbuffer
      * but try to emit the calback right away after flushing what we have */
     if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()))
-        return loop_invoke_inthread(impl, func, seq, data, size, block, user_data);
+        return loop_queue_invoke_inthread(queue, func, seq, data, size, block, user_data);

 retry:
-    filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
+    filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx);
     spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun");
     avail = (uint32_t)(DATAS_SIZE - filled);
     if (avail < sizeof(struct invoke_item)) {
@@ -254,7 +273,7 @@ retry:
      * invoke_item, see below */
     l0 = DATAS_SIZE - offset;
-    item = SPA_PTROFF(impl->buffer_data, offset, struct invoke_item);
+    item = SPA_PTROFF(queue->buffer_data, offset, struct invoke_item);
     item->func = func;
     item->seq = seq;
     item->size = size;
@@ -263,7 +282,7 @@ retry:
     item->res = 0;
     item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN);
-    spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", impl, item, filled);
+    spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", queue, item, filled);
     if (l0 >= item->item_size) {
         /* item + size fit in current ringbuffer idx */
@@ -276,7 +295,7 @@ retry:
     } else {
         /* item does not fit, place the invoke_item at idx and start the
          * data at the start of the ringbuffer */
-        item->data = impl->buffer_data;
+        item->data = queue->buffer_data;
         item->item_size = SPA_ROUND_UP_N(l0 + size, ITEM_ALIGN);
     }
     if (avail < item->item_size) {
@@ -286,7 +305,7 @@ retry:
     if (data && size > 0)
         memcpy(item->data, data, size);
-    spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
+    spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size);
     loop_signal_event(impl, impl->wakeup);
@@ -295,9 +314,9 @@ retry:
         spa_loop_control_hook_before(&impl->hooks_list);
-        if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0)
+        if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0)
             spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s",
-                    impl, impl->ack_fd, spa_strerror(res));
+                    queue, queue->ack_fd, spa_strerror(res));
         spa_loop_control_hook_after(&impl->hooks_list);
@@ -313,9 +332,9 @@ retry:
 xrun:
     nsec = get_time_ns(impl->system);
-    if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) {
+    if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) {
         spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)",
-                impl, avail, need, suppressed);
+                queue, avail, need, suppressed);
     }
     if (impl->retry_timeout == 0)
         return -EPIPE;
@@ -326,7 +345,59 @@ xrun:
 static void wakeup_func(void *data, uint64_t count)
 {
     struct impl *impl = data;
-    flush_items(impl);
+    flush_all_queues(impl);
+}
+
+static void loop_queue_destroy(void *data)
+{
+    struct queue *queue = data;
+    struct impl *impl = queue->impl;
+    spa_list_remove(&queue->link);
+    spa_system_close(impl->system, queue->ack_fd);
+    free(queue);
+}
+
+static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
+        const void *data, size_t size, bool block, void *user_data)
+{
+    struct impl *impl = object;
+    return loop_queue_invoke(impl->queue, func, seq, data, size, block, user_data);
+}
+
+static struct queue *loop_create_queue(void *object, uint32_t flags)
+{
+    struct impl *impl = object;
+    struct queue *queue;
+    int res;
+    queue = calloc(1, sizeof(struct queue));
+    if (queue == NULL)
+        return NULL;
+    queue->impl = impl;
+    queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
+    queue->rate_limit.burst = 1;
+    queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t);
+    spa_ringbuffer_init(&queue->buffer);
+    if ((res = spa_system_eventfd_create(impl->system,
+                    SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) {
+        spa_log_error(impl->log, "%p: can't create ack event: %s",
+                impl, spa_strerror(res));
+        goto error;
+    }
+    queue->ack_fd = res;
+    spa_list_append(&impl->queue_list, &queue->link);
+    return queue;
+error:
+    free(queue);
+    errno = -res;
+    return NULL;
 }

 static int loop_get_fd(void *object)
@@ -376,7 +447,7 @@ static void loop_leave(void *object)
     if (--impl->enter_count == 0) {
         impl->thread = 0;
-        flush_items(impl);
+        flush_all_queues(impl);
         impl->polling = false;
     }
 }
@@ -956,6 +1027,7 @@ static int impl_clear(struct spa_handle *handle)
 {
     struct impl *impl;
     struct source_impl *source;
+    struct queue *queue;
     spa_return_val_if_fail(handle != NULL, -EINVAL);
@@ -967,8 +1039,9 @@ static int impl_clear(struct spa_handle *handle)
     spa_list_consume(source, &impl->source_list, link)
         loop_destroy_source(impl, &source->source);
-    spa_system_close(impl->system, impl->ack_fd);
+    spa_list_consume(queue, &impl->queue_list, link)
+        loop_queue_destroy(queue);
     spa_system_close(impl->system, impl->poll_fd);
     return 0;
@@ -1030,8 +1103,6 @@ impl_init(const struct spa_handle_factory *factory,
         res = -EINVAL;
         goto error_exit;
     }
-    impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
-    impl->rate_limit.burst = 1;
     if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) {
         spa_log_error(impl->log, "%p: can't create pollfd: %s",
@@ -1041,26 +1112,23 @@ impl_init(const struct spa_handle_factory *factory,
     impl->poll_fd = res;
     spa_list_init(&impl->source_list);
+    spa_list_init(&impl->queue_list);
     spa_list_init(&impl->destroy_list);
     spa_hook_list_init(&impl->hooks_list);
-    impl->buffer_data = SPA_PTR_ALIGN(impl->buffer_mem, MAX_ALIGN, uint8_t);
-    spa_ringbuffer_init(&impl->buffer);
     impl->wakeup = loop_add_event(impl, wakeup_func, impl);
     if (impl->wakeup == NULL) {
         res = -errno;
         spa_log_error(impl->log, "%p: can't create wakeup event: %m", impl);
         goto error_exit_free_poll;
     }
-    if ((res = spa_system_eventfd_create(impl->system,
-                    SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) {
-        spa_log_error(impl->log, "%p: can't create ack event: %s",
-                impl, spa_strerror(res));
+    impl->queue = loop_create_queue(impl, 0);
+    if (impl->queue == NULL) {
+        res = -errno;
+        spa_log_error(impl->log, "%p: can't create queue: %m", impl);
         goto error_exit_free_wakeup;
     }
-    impl->ack_fd = res;
     spa_log_debug(impl->log, "%p: initialized", impl);
     return 0;
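
The commit message frames this split as groundwork for per-thread invoke queues. Purely as a hypothetical sketch of that direction (none of this is in the patch, and all names besides loop_create_queue are invented stand-ins), each invoking thread could lazily create and cache its own queue object, for example through thread-local storage:

/* Hypothetical follow-up only, NOT part of this commit: give every invoking
 * thread its own queue, looked up via thread-local storage. The structs and
 * loop_create_queue() here are trivial stand-ins, not the SPA implementation. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct impl { int unused; };

struct queue {
    pthread_t owner;             /* which thread this queue belongs to */
};

static struct queue *loop_create_queue(struct impl *impl)
{
    struct queue *q = calloc(1, sizeof(*q));
    if (q != NULL)
        q->owner = pthread_self();
    return q;
}

static pthread_key_t queue_key;
static pthread_once_t queue_key_once = PTHREAD_ONCE_INIT;

static void make_queue_key(void)
{
    pthread_key_create(&queue_key, free);   /* free the queue when the thread exits */
}

/* return the calling thread's private queue, creating it on first use */
static struct queue *get_thread_queue(struct impl *impl)
{
    struct queue *q;

    pthread_once(&queue_key_once, make_queue_key);
    q = pthread_getspecific(queue_key);
    if (q == NULL) {
        q = loop_create_queue(impl);
        pthread_setspecific(queue_key, q);
    }
    return q;
}

int main(void)
{
    struct impl impl = { 0 };
    struct queue *q = get_thread_queue(&impl);

    if (q == NULL)
        return 1;
    printf("queue %p belongs to the calling thread: %s\n",
            (void *)q, pthread_equal(q->owner, pthread_self()) ? "yes" : "no");
    return 0;
}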