diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index a098b9349..8af61e7ae 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -90,6 +90,7 @@ struct impl { uint32_t n_queues; struct queue *queues[QUEUES_MAX]; + pthread_mutex_t queue_lock; int poll_fd; pthread_t thread; @@ -390,8 +391,7 @@ loop_queue_invoke(void *object, const void *data, size_t size, bool block, - void *user_data, - bool in_thread) + void *user_data) { struct queue *queue = object, *orig = queue, *overflow; struct impl *impl = queue->impl; @@ -399,8 +399,13 @@ loop_queue_invoke(void *object, int res; int32_t filled; uint32_t avail, idx, offset, l0; + bool in_thread; + pthread_t loop_thread, current_thread = pthread_self(); again: + loop_thread = impl->thread; + in_thread = (loop_thread == 0 || pthread_equal(loop_thread, current_thread)); + filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); avail = (uint32_t)(DATAS_SIZE - filled); @@ -448,7 +453,18 @@ again: if (in_thread) { put_queue(impl, orig); + + /* when there is no thread running the loop we flush the queues from + * this invoking thread but we need to serialize the flushing here with + * a mutex */ + if (loop_thread == 0) + pthread_mutex_lock(&impl->queue_lock); + flush_all_queues(impl); + + if (loop_thread == 0) + pthread_mutex_unlock(&impl->queue_lock); + res = item->res; } else { loop_signal_event(impl, impl->wakeup); @@ -504,7 +520,6 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, struct queue *queue; int res = 0, suppressed; uint64_t nsec; - bool in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); while (true) { queue = get_queue(impl); @@ -527,7 +542,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, } usleep(impl->retry_timeout); } else { - res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread); + res = loop_queue_invoke(queue, func, seq, data, size, block, user_data); break; } } @@ -1181,6 +1196,8 @@ static int impl_clear(struct spa_handle *handle) spa_system_close(impl->system, impl->poll_fd); + pthread_mutex_destroy(&impl->queue_lock); + return 0; } @@ -1209,6 +1226,7 @@ impl_init(const struct spa_handle_factory *factory, { struct impl *impl; const char *str; + pthread_mutexattr_t attr; int res; spa_return_val_if_fail(factory != NULL, -EINVAL); @@ -1242,6 +1260,10 @@ impl_init(const struct spa_handle_factory *factory, impl->retry_timeout = atoi(str); } + CHECK(pthread_mutexattr_init(&attr), error_exit); + CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit); + CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit); + impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); spa_log_topic_init(impl->log, &log_topic); impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); @@ -1249,12 +1271,12 @@ impl_init(const struct spa_handle_factory *factory, if (impl->system == NULL) { spa_log_error(impl->log, "%p: a System is needed", impl); res = -EINVAL; - goto error_exit; + goto error_exit_free_mutex; } if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) { spa_log_error(impl->log, "%p: can't create pollfd: %s", impl, spa_strerror(res)); - goto error_exit; + goto error_exit_free_mutex; } impl->poll_fd = res; @@ -1277,6 +1299,8 @@ impl_init(const struct spa_handle_factory *factory, error_exit_free_poll: spa_system_close(impl->system, impl->poll_fd); +error_exit_free_mutex: + pthread_mutex_destroy(&impl->queue_lock); error_exit: return res; }