diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index cd6994304..ead18328b 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -70,7 +71,8 @@ struct impl { struct spa_source *wakeup; - struct queue *queue; + tss_t queue_tss_id; + pthread_mutex_t queue_lock; unsigned int polling:1; }; @@ -215,8 +217,10 @@ static void queue_flush_items(struct queue *queue) static void flush_all_queues(struct impl *impl) { struct queue *queue; + pthread_mutex_lock(&impl->queue_lock); spa_list_for_each(queue, &impl->queue_list, link) queue_flush_items(queue); + pthread_mutex_unlock(&impl->queue_lock); } static int @@ -352,18 +356,15 @@ static void loop_queue_destroy(void *data) { struct queue *queue = data; struct impl *impl = queue->impl; + + pthread_mutex_lock(&impl->queue_lock); spa_list_remove(&queue->link); + pthread_mutex_unlock(&impl->queue_lock); + spa_system_close(impl->system, queue->ack_fd); free(queue); } -static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, - const void *data, size_t size, bool block, void *user_data) -{ - struct impl *impl = object; - return loop_queue_invoke(impl->queue, func, seq, data, size, block, user_data); -} - static struct queue *loop_create_queue(void *object, uint32_t flags) { struct impl *impl = object; @@ -390,7 +391,11 @@ static struct queue *loop_create_queue(void *object, uint32_t flags) } queue->ack_fd = res; + pthread_mutex_lock(&impl->queue_lock); spa_list_append(&impl->queue_list, &queue->link); + pthread_mutex_unlock(&impl->queue_lock); + + spa_log_info(impl->log, "%p created queue %p", impl, queue); return queue; @@ -400,6 +405,21 @@ error: return NULL; } +static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, + const void *data, size_t size, bool block, void *user_data) +{ + struct impl *impl = object; + struct queue *local_queue = tss_get(impl->queue_tss_id); + + if (local_queue == NULL) { + local_queue = loop_create_queue(impl, 0); + if (local_queue == NULL) + return -errno; + tss_set(impl->queue_tss_id, local_queue); + } + return loop_queue_invoke(local_queue, func, seq, data, size, block, user_data); +} + static int loop_get_fd(void *object) { struct impl *impl = object; @@ -1043,6 +1063,8 @@ static int impl_clear(struct spa_handle *handle) loop_queue_destroy(queue); spa_system_close(impl->system, impl->poll_fd); + pthread_mutex_destroy(&impl->queue_lock); + tss_delete(impl->queue_tss_id); return 0; } @@ -1054,6 +1076,15 @@ impl_get_size(const struct spa_handle_factory *factory, return sizeof(struct impl); } +#define CHECK(expression,label) \ +do { \ + if ((errno = (expression)) != 0) { \ + res = -errno; \ + spa_log_error(impl->log, #expression ": %s", strerror(errno)); \ + goto label; \ + } \ +} while(false); + static int impl_init(const struct spa_handle_factory *factory, struct spa_handle *handle, @@ -1063,6 +1094,7 @@ impl_init(const struct spa_handle_factory *factory, { struct impl *impl; const char *str; + pthread_mutexattr_t attr; int res; spa_return_val_if_fail(factory != NULL, -EINVAL); @@ -1094,6 +1126,10 @@ impl_init(const struct spa_handle_factory *factory, impl->retry_timeout = atoi(str); } + CHECK(pthread_mutexattr_init(&attr), error_exit); + CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit); + CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit); + impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); spa_log_topic_init(impl->log, &log_topic); impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); @@ -1101,13 +1137,12 @@ impl_init(const struct spa_handle_factory *factory, if (impl->system == NULL) { spa_log_error(impl->log, "%p: a System is needed", impl); res = -EINVAL; - goto error_exit; + goto error_exit_free_mutex; } - if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) { spa_log_error(impl->log, "%p: can't create pollfd: %s", impl, spa_strerror(res)); - goto error_exit; + goto error_exit_free_mutex; } impl->poll_fd = res; @@ -1123,12 +1158,12 @@ impl_init(const struct spa_handle_factory *factory, goto error_exit_free_poll; } - impl->queue = loop_create_queue(impl, 0); - if (impl->queue == NULL) { + if (tss_create(&impl->queue_tss_id, (tss_dtor_t)loop_queue_destroy) != 0) { res = -errno; - spa_log_error(impl->log, "%p: can't create queue: %m", impl); + spa_log_error(impl->log, "%p: can't create tss: %m", impl); goto error_exit_free_wakeup; } + spa_log_debug(impl->log, "%p: initialized", impl); return 0; @@ -1137,6 +1172,8 @@ error_exit_free_wakeup: loop_destroy_source(impl, impl->wakeup); error_exit_free_poll: spa_system_close(impl->system, impl->poll_fd); +error_exit_free_mutex: + pthread_mutex_destroy(&impl->queue_lock); error_exit: return res; }