diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 619084bb7..cd6994304 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -60,6 +60,7 @@ struct impl { struct spa_list source_list; struct spa_list destroy_list; + struct spa_list queue_list; struct spa_hook_list hooks_list; int retry_timeout; @@ -68,6 +69,16 @@ struct impl { int enter_count; struct spa_source *wakeup; + + struct queue *queue; + + unsigned int polling:1; +}; + +struct queue { + struct impl *impl; + struct spa_list link; + int ack_fd; struct spa_ratelimit rate_limit; @@ -76,7 +87,6 @@ struct impl { uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; uint32_t flush_count; - unsigned int polling:1; }; struct source_impl { @@ -157,24 +167,25 @@ static int loop_remove_source(void *object, struct spa_source *source) return res; } -static void flush_items(struct impl *impl) +static void queue_flush_items(struct queue *queue) { + struct impl *impl = queue->impl; uint32_t index, flush_count; int32_t avail; int res; - flush_count = ++impl->flush_count; - avail = spa_ringbuffer_get_read_index(&impl->buffer, &index); + flush_count = ++queue->flush_count; + avail = spa_ringbuffer_get_read_index(&queue->buffer, &index); while (avail > 0) { struct invoke_item *item; bool block; spa_invoke_func_t func; - item = SPA_PTROFF(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); + item = SPA_PTROFF(queue->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item); block = item->block; func = item->func; - spa_log_trace_fp(impl->log, "%p: flush item %p", impl, item); + spa_log_trace_fp(impl->log, "%p: flush item %p", queue, item); /* first we remove the function from the item so that recursive * calls don't call the callback again. We can't update the * read index before we call the function because then the item @@ -186,23 +197,30 @@ static void flush_items(struct impl *impl) /* if this function did a recursive invoke, it now flushed the * ringbuffer and we can exit */ - if (flush_count != impl->flush_count) + if (flush_count != queue->flush_count) break; index += item->item_size; avail -= item->item_size; - spa_ringbuffer_read_update(&impl->buffer, index); + spa_ringbuffer_read_update(&queue->buffer, index); if (block) { - if ((res = spa_system_eventfd_write(impl->system, impl->ack_fd, 1)) < 0) + if ((res = spa_system_eventfd_write(impl->system, queue->ack_fd, 1)) < 0) spa_log_warn(impl->log, "%p: failed to write event fd:%d: %s", - impl, impl->ack_fd, spa_strerror(res)); + queue, queue->ack_fd, spa_strerror(res)); } } } +static void flush_all_queues(struct impl *impl) +{ + struct queue *queue; + spa_list_for_each(queue, &impl->queue_list, link) + queue_flush_items(queue); +} + static int -loop_invoke_inthread(struct impl *impl, +loop_queue_invoke_inthread(struct queue *queue, spa_invoke_func_t func, uint32_t seq, const void *data, @@ -213,12 +231,12 @@ loop_invoke_inthread(struct impl *impl, /* we should probably have a second ringbuffer for the in-thread pending * callbacks. A recursive callback when flushing will insert itself * before this one. */ - flush_items(impl); - return func ? func(&impl->loop, true, seq, data, size, user_data) : 0; + flush_all_queues(queue->impl); + return func ? func(&queue->impl->loop, true, seq, data, size, user_data) : 0; } static int -loop_invoke(void *object, +loop_queue_invoke(void *object, spa_invoke_func_t func, uint32_t seq, const void *data, @@ -226,7 +244,8 @@ loop_invoke(void *object, bool block, void *user_data) { - struct impl *impl = object; + struct queue *queue = object; + struct impl *impl = queue->impl; struct invoke_item *item; int res, suppressed; int32_t filled; @@ -238,10 +257,10 @@ loop_invoke(void *object, * in the same thread as the loop, don't write into the ringbuffer * but try to emit the calback right away after flushing what we have */ if (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())) - return loop_invoke_inthread(impl, func, seq, data, size, block, user_data); + return loop_queue_invoke_inthread(queue, func, seq, data, size, block, user_data); retry: - filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); + filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); avail = (uint32_t)(DATAS_SIZE - filled); if (avail < sizeof(struct invoke_item)) { @@ -254,7 +273,7 @@ retry: * invoke_item, see below */ l0 = DATAS_SIZE - offset; - item = SPA_PTROFF(impl->buffer_data, offset, struct invoke_item); + item = SPA_PTROFF(queue->buffer_data, offset, struct invoke_item); item->func = func; item->seq = seq; item->size = size; @@ -263,7 +282,7 @@ retry: item->res = 0; item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN); - spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", impl, item, filled); + spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", queue, item, filled); if (l0 >= item->item_size) { /* item + size fit in current ringbuffer idx */ @@ -276,7 +295,7 @@ retry: } else { /* item does not fit, place the invoke_item at idx and start the * data at the start of the ringbuffer */ - item->data = impl->buffer_data; + item->data = queue->buffer_data; item->item_size = SPA_ROUND_UP_N(l0 + size, ITEM_ALIGN); } if (avail < item->item_size) { @@ -286,7 +305,7 @@ retry: if (data && size > 0) memcpy(item->data, data, size); - spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size); loop_signal_event(impl, impl->wakeup); @@ -295,9 +314,9 @@ retry: spa_loop_control_hook_before(&impl->hooks_list); - if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0) + if ((res = spa_system_eventfd_read(impl->system, queue->ack_fd, &count)) < 0) spa_log_warn(impl->log, "%p: failed to read event fd:%d: %s", - impl, impl->ack_fd, spa_strerror(res)); + queue, queue->ack_fd, spa_strerror(res)); spa_loop_control_hook_after(&impl->hooks_list); @@ -313,9 +332,9 @@ retry: xrun: nsec = get_time_ns(impl->system); - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, nsec)) >= 0) { + if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) { spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)", - impl, avail, need, suppressed); + queue, avail, need, suppressed); } if (impl->retry_timeout == 0) return -EPIPE; @@ -326,7 +345,59 @@ xrun: static void wakeup_func(void *data, uint64_t count) { struct impl *impl = data; - flush_items(impl); + flush_all_queues(impl); +} + +static void loop_queue_destroy(void *data) +{ + struct queue *queue = data; + struct impl *impl = queue->impl; + spa_list_remove(&queue->link); + spa_system_close(impl->system, queue->ack_fd); + free(queue); +} + +static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, + const void *data, size_t size, bool block, void *user_data) +{ + struct impl *impl = object; + return loop_queue_invoke(impl->queue, func, seq, data, size, block, user_data); +} + +static struct queue *loop_create_queue(void *object, uint32_t flags) +{ + struct impl *impl = object; + struct queue *queue; + int res; + + queue = calloc(1, sizeof(struct queue)); + if (queue == NULL) + return NULL; + + queue->impl = impl; + + queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; + queue->rate_limit.burst = 1; + + queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); + spa_ringbuffer_init(&queue->buffer); + + if ((res = spa_system_eventfd_create(impl->system, + SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { + spa_log_error(impl->log, "%p: can't create ack event: %s", + impl, spa_strerror(res)); + goto error; + } + queue->ack_fd = res; + + spa_list_append(&impl->queue_list, &queue->link); + + return queue; + +error: + free(queue); + errno = -res; + return NULL; } static int loop_get_fd(void *object) @@ -376,7 +447,7 @@ static void loop_leave(void *object) if (--impl->enter_count == 0) { impl->thread = 0; - flush_items(impl); + flush_all_queues(impl); impl->polling = false; } } @@ -956,6 +1027,7 @@ static int impl_clear(struct spa_handle *handle) { struct impl *impl; struct source_impl *source; + struct queue *queue; spa_return_val_if_fail(handle != NULL, -EINVAL); @@ -967,8 +1039,9 @@ static int impl_clear(struct spa_handle *handle) spa_list_consume(source, &impl->source_list, link) loop_destroy_source(impl, &source->source); + spa_list_consume(queue, &impl->queue_list, link) + loop_queue_destroy(queue); - spa_system_close(impl->system, impl->ack_fd); spa_system_close(impl->system, impl->poll_fd); return 0; @@ -1030,8 +1103,6 @@ impl_init(const struct spa_handle_factory *factory, res = -EINVAL; goto error_exit; } - impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; - impl->rate_limit.burst = 1; if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) { spa_log_error(impl->log, "%p: can't create pollfd: %s", @@ -1041,26 +1112,23 @@ impl_init(const struct spa_handle_factory *factory, impl->poll_fd = res; spa_list_init(&impl->source_list); + spa_list_init(&impl->queue_list); spa_list_init(&impl->destroy_list); spa_hook_list_init(&impl->hooks_list); - impl->buffer_data = SPA_PTR_ALIGN(impl->buffer_mem, MAX_ALIGN, uint8_t); - spa_ringbuffer_init(&impl->buffer); - impl->wakeup = loop_add_event(impl, wakeup_func, impl); if (impl->wakeup == NULL) { res = -errno; spa_log_error(impl->log, "%p: can't create wakeup event: %m", impl); goto error_exit_free_poll; } - if ((res = spa_system_eventfd_create(impl->system, - SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { - spa_log_error(impl->log, "%p: can't create ack event: %s", - impl, spa_strerror(res)); + + impl->queue = loop_create_queue(impl, 0); + if (impl->queue == NULL) { + res = -errno; + spa_log_error(impl->log, "%p: can't create queue: %m", impl); goto error_exit_free_wakeup; } - impl->ack_fd = res; - spa_log_debug(impl->log, "%p: initialized", impl); return 0;