From 22c45af7e05bcd958817c1afcb305db968a9f307 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 21 Oct 2024 17:47:31 +0200 Subject: [PATCH] loop: refcount the queues The loop in the TSS gets an extra refcount and is unreffed when the TSS destroy is called. We can then also ref the queue during the function callback. When the queue (thread) was destroyed during the callback, ignore the result and continue with the next queues. See #4356 --- Makefile.in | 2 +- spa/plugins/support/loop.c | 34 ++++++++++++++++++++++++++++++---- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/Makefile.in b/Makefile.in index 67b199bbc..104619316 100644 --- a/Makefile.in +++ b/Makefile.in @@ -38,7 +38,7 @@ gdb: $(MAKE) run DBG=gdb valgrind: - $(MAKE) run DBG="DISABLE_RTKIT=1 PIPEWIRE_DLCLOSE=false valgrind --trace-children=yes" + $(MAKE) run DBG="DISABLE_RTKIT=1 PIPEWIRE_DLCLOSE=false valgrind --trace-children=yes --leak-check=full" test: all ninja -C $(BUILD_ROOT) test diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index b1abe3d17..d3a62db14 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -85,6 +85,8 @@ struct queue { struct impl *impl; struct spa_list link; + int ref; + #define QUEUE_FLAG_NONE (0) #define QUEUE_FLAG_ACK_FD (1<<0) #define QUEUE_FLAG_IN_TSS (1<<1) @@ -190,6 +192,7 @@ static struct queue *loop_create_queue(void *object, uint32_t flags) queue->impl = impl; queue->flags = flags; + queue->ref = 1; queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; queue->rate_limit.burst = 1; @@ -208,6 +211,8 @@ static struct queue *loop_create_queue(void *object, uint32_t flags) } else { queue->ack_fd = -1; } + if (flags & QUEUE_FLAG_IN_TSS) + queue->ref++; pthread_mutex_lock(&impl->queue_lock); spa_list_append(&impl->queue_list, &queue->link); @@ -223,6 +228,23 @@ error: return NULL; } +static inline void loop_queue_ref(struct queue *queue) +{ + SPA_ATOMIC_INC(queue->ref); +} + +static bool loop_queue_unref(struct queue *queue) +{ + bool do_free; + struct impl *impl = queue->impl; + + do_free = SPA_ATOMIC_DEC(queue->ref) == 0; + if (do_free) { + spa_log_debug(impl->log, "%p: free queue %p", impl, queue); + free(queue); + } + return do_free; +} static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) { @@ -265,10 +287,14 @@ static void flush_all_queues(struct impl *impl) * might get overwritten. */ func = spa_steal_ptr(item->func); if (func) { + loop_queue_ref(queue); pthread_mutex_unlock(&impl->queue_lock); - item->res = func(&impl->loop, true, item->seq, item->data, + res = func(&impl->loop, true, item->seq, item->data, item->size, item->user_data); pthread_mutex_lock(&impl->queue_lock); + if (loop_queue_unref(queue)) + continue; + item->res = res; } /* if this function did a recursive invoke, it now flushed the @@ -424,9 +450,9 @@ static void loop_queue_destroy(void *data) if (queue->flags & QUEUE_FLAG_ACK_FD) spa_system_close(impl->system, queue->ack_fd); + + loop_queue_unref(queue); } - if (!SPA_FLAG_IS_SET(queue->flags, QUEUE_FLAG_IN_TSS)) - free(queue); } static void loop_queue_destroy_tss(void *data) @@ -435,7 +461,7 @@ static void loop_queue_destroy_tss(void *data) if (queue) { SPA_ATOMIC_DEC(queue->impl->tss_ref); loop_queue_destroy(queue); - free(queue); + loop_queue_unref(queue); } }