diff --git a/Makefile.in b/Makefile.in index 67b199bbc..104619316 100644 --- a/Makefile.in +++ b/Makefile.in @@ -38,7 +38,7 @@ gdb: $(MAKE) run DBG=gdb valgrind: - $(MAKE) run DBG="DISABLE_RTKIT=1 PIPEWIRE_DLCLOSE=false valgrind --trace-children=yes" + $(MAKE) run DBG="DISABLE_RTKIT=1 PIPEWIRE_DLCLOSE=false valgrind --trace-children=yes --leak-check=full" test: all ninja -C $(BUILD_ROOT) test diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index b1abe3d17..d3a62db14 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -85,6 +85,8 @@ struct queue { struct impl *impl; struct spa_list link; + int ref; + #define QUEUE_FLAG_NONE (0) #define QUEUE_FLAG_ACK_FD (1<<0) #define QUEUE_FLAG_IN_TSS (1<<1) @@ -190,6 +192,7 @@ static struct queue *loop_create_queue(void *object, uint32_t flags) queue->impl = impl; queue->flags = flags; + queue->ref = 1; queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; queue->rate_limit.burst = 1; @@ -208,6 +211,8 @@ static struct queue *loop_create_queue(void *object, uint32_t flags) } else { queue->ack_fd = -1; } + if (flags & QUEUE_FLAG_IN_TSS) + queue->ref++; pthread_mutex_lock(&impl->queue_lock); spa_list_append(&impl->queue_list, &queue->link); @@ -223,6 +228,23 @@ error: return NULL; } +static inline void loop_queue_ref(struct queue *queue) +{ + SPA_ATOMIC_INC(queue->ref); +} + +static bool loop_queue_unref(struct queue *queue) +{ + bool do_free; + struct impl *impl = queue->impl; + + do_free = SPA_ATOMIC_DEC(queue->ref) == 0; + if (do_free) { + spa_log_debug(impl->log, "%p: free queue %p", impl, queue); + free(queue); + } + return do_free; +} static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) { @@ -265,10 +287,14 @@ static void flush_all_queues(struct impl *impl) * might get overwritten. */ func = spa_steal_ptr(item->func); if (func) { + loop_queue_ref(queue); pthread_mutex_unlock(&impl->queue_lock); - item->res = func(&impl->loop, true, item->seq, item->data, + res = func(&impl->loop, true, item->seq, item->data, item->size, item->user_data); pthread_mutex_lock(&impl->queue_lock); + if (loop_queue_unref(queue)) + continue; + item->res = res; } /* if this function did a recursive invoke, it now flushed the @@ -424,9 +450,9 @@ static void loop_queue_destroy(void *data) if (queue->flags & QUEUE_FLAG_ACK_FD) spa_system_close(impl->system, queue->ack_fd); + + loop_queue_unref(queue); } - if (!SPA_FLAG_IS_SET(queue->flags, QUEUE_FLAG_IN_TSS)) - free(queue); } static void loop_queue_destroy_tss(void *data) @@ -435,7 +461,7 @@ static void loop_queue_destroy_tss(void *data) if (queue) { SPA_ATOMIC_DEC(queue->impl->tss_ref); loop_queue_destroy(queue); - free(queue); + loop_queue_unref(queue); } }