loop: refcount the queues

The loop in the TSS gets an extra refcount and is unreffed when the TSS
destroy is called.

We can then also ref the queue during the function callback. When the
queue (thread) was destroyed during the callback, ignore the result and
continue with the next queues.

See #4356
This commit is contained in:
Wim Taymans 2024-10-21 17:47:31 +02:00
parent bb53aa08ad
commit 22c45af7e0
2 changed files with 31 additions and 5 deletions

View file

@ -38,7 +38,7 @@ gdb:
$(MAKE) run DBG=gdb
valgrind:
$(MAKE) run DBG="DISABLE_RTKIT=1 PIPEWIRE_DLCLOSE=false valgrind --trace-children=yes"
$(MAKE) run DBG="DISABLE_RTKIT=1 PIPEWIRE_DLCLOSE=false valgrind --trace-children=yes --leak-check=full"
test: all
ninja -C $(BUILD_ROOT) test

View file

@ -85,6 +85,8 @@ struct queue {
struct impl *impl;
struct spa_list link;
int ref;
#define QUEUE_FLAG_NONE (0)
#define QUEUE_FLAG_ACK_FD (1<<0)
#define QUEUE_FLAG_IN_TSS (1<<1)
@ -190,6 +192,7 @@ static struct queue *loop_create_queue(void *object, uint32_t flags)
queue->impl = impl;
queue->flags = flags;
queue->ref = 1;
queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
queue->rate_limit.burst = 1;
@ -208,6 +211,8 @@ static struct queue *loop_create_queue(void *object, uint32_t flags)
} else {
queue->ack_fd = -1;
}
if (flags & QUEUE_FLAG_IN_TSS)
queue->ref++;
pthread_mutex_lock(&impl->queue_lock);
spa_list_append(&impl->queue_list, &queue->link);
@ -223,6 +228,23 @@ error:
return NULL;
}
static inline void loop_queue_ref(struct queue *queue)
{
SPA_ATOMIC_INC(queue->ref);
}
static bool loop_queue_unref(struct queue *queue)
{
bool do_free;
struct impl *impl = queue->impl;
do_free = SPA_ATOMIC_DEC(queue->ref) == 0;
if (do_free) {
spa_log_debug(impl->log, "%p: free queue %p", impl, queue);
free(queue);
}
return do_free;
}
static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b)
{
@ -265,10 +287,14 @@ static void flush_all_queues(struct impl *impl)
* might get overwritten. */
func = spa_steal_ptr(item->func);
if (func) {
loop_queue_ref(queue);
pthread_mutex_unlock(&impl->queue_lock);
item->res = func(&impl->loop, true, item->seq, item->data,
res = func(&impl->loop, true, item->seq, item->data,
item->size, item->user_data);
pthread_mutex_lock(&impl->queue_lock);
if (loop_queue_unref(queue))
continue;
item->res = res;
}
/* if this function did a recursive invoke, it now flushed the
@ -424,9 +450,9 @@ static void loop_queue_destroy(void *data)
if (queue->flags & QUEUE_FLAG_ACK_FD)
spa_system_close(impl->system, queue->ack_fd);
loop_queue_unref(queue);
}
if (!SPA_FLAG_IS_SET(queue->flags, QUEUE_FLAG_IN_TSS))
free(queue);
}
static void loop_queue_destroy_tss(void *data)
@ -435,7 +461,7 @@ static void loop_queue_destroy_tss(void *data)
if (queue) {
SPA_ATOMIC_DEC(queue->impl->tss_ref);
loop_queue_destroy(queue);
free(queue);
loop_queue_unref(queue);
}
}