support: make the loop queue handling lockfree

Don't use TSS to store per-thread queues but keep a lockfree stack of
queues. We can then pick off a queue and write to that one and place it
back after use.

We need to keep the queues indexed by id in the stack because otherwise
we would need to compare-and-swap 128 bits (pointer + tag), which is
more problematic.

Because we keep the queues in an array and no queue is ever removed and
the array can only grow, we can quite easily just iterate the array
without a lock. Without the lock we also fix one of the potential
problems with ardour where the queue_flush thread is canceled while
flushing and the queue_mutex remains locked.

Because we end up with all queues in the array now, we can overflow the
fixed max amount of queues we can manage. When that happens, sleep for a
while and try again. This is a case where more than QUEUES_MAX (128) threads
are invoking at the same time and is rather unlikely.

There is also the queue overflow case which we now also must handle with
a retry. This potentially uses more eventfds but again this should be
unlikely and cause no further problems.

See #4356
This commit is contained in:
Wim Taymans 2024-11-04 17:26:52 +01:00
parent 9ed57c1dba
commit 9c19284f7f

View file

@ -10,6 +10,7 @@
#include <stdio.h> #include <stdio.h>
#include <pthread.h> #include <pthread.h>
#include <threads.h> #include <threads.h>
#include <stdatomic.h>
#include <spa/support/loop.h> #include <spa/support/loop.h>
#include <spa/support/system.h> #include <spa/support/system.h>
@ -36,6 +37,12 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.loop");
#define DATAS_SIZE (4096*8) #define DATAS_SIZE (4096*8)
#define MAX_EP 32 #define MAX_EP 32
/* the number of concurrent queues for invoke. This is also the number
* of threads that can concurrently invoke. When there are more, the
* retry timeout will be used to retry. */
#define QUEUES_MAX 128
#define DEFAULT_RETRY (1 * SPA_USEC_PER_SEC)
/** \cond */ /** \cond */
struct invoke_item { struct invoke_item {
@ -52,6 +59,17 @@ struct invoke_item {
static int loop_signal_event(void *object, struct spa_source *source); static int loop_signal_event(void *object, struct spa_source *source);
struct queue;
#define IDX_INVALID ((uint16_t)0xffff)
union tag {
struct {
uint16_t idx;
uint16_t count;
} t;
uint32_t v;
};
struct impl { struct impl {
struct spa_handle handle; struct spa_handle handle;
struct spa_loop loop; struct spa_loop loop;
@ -63,18 +81,21 @@ struct impl {
struct spa_list source_list; struct spa_list source_list;
struct spa_list destroy_list; struct spa_list destroy_list;
struct spa_list queue_list;
struct spa_hook_list hooks_list; struct spa_hook_list hooks_list;
int retry_timeout;
union tag head;
uint32_t n_queues;
struct queue *queues[QUEUES_MAX];
int poll_fd; int poll_fd;
pthread_t thread; pthread_t thread;
int enter_count; int enter_count;
struct spa_source *wakeup; struct spa_source *wakeup;
int tss_ref;
tss_t queue_tss_id;
pthread_mutex_t queue_lock;
uint32_t count; uint32_t count;
uint32_t flush_count; uint32_t flush_count;
@ -83,25 +104,19 @@ struct impl {
struct queue { struct queue {
struct impl *impl; struct impl *impl;
struct spa_list link;
int ref; uint16_t idx;
uint16_t next;
#define QUEUE_FLAG_NONE (0)
#define QUEUE_FLAG_ACK_FD (1<<0)
#define QUEUE_FLAG_IN_TSS (1<<1)
uint32_t flags;
struct queue *overflow;
int ack_fd; int ack_fd;
struct spa_ratelimit rate_limit; struct spa_ratelimit rate_limit;
bool destroyed;
struct spa_ringbuffer buffer; struct spa_ringbuffer buffer;
uint8_t *buffer_data; uint8_t *buffer_data;
uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN]; uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN];
}; };
struct source_impl { struct source_impl {
struct spa_source source; struct spa_source source;
@ -180,7 +195,20 @@ static int loop_remove_source(void *object, struct spa_source *source)
return res; return res;
} }
static struct queue *loop_create_queue(void *object, uint32_t flags) static void loop_queue_destroy(void *data)
{
struct queue *queue = data;
struct impl *impl = queue->impl;
if (queue->ack_fd != -1)
spa_system_close(impl->system, queue->ack_fd);
spa_log_info(impl->log, "%p destroyed queue %p idx:%d", impl, queue, queue->idx);
free(queue);
}
static struct queue *loop_create_queue(void *object)
{ {
struct impl *impl = object; struct impl *impl = object;
struct queue *queue; struct queue *queue;
@ -190,9 +218,10 @@ static struct queue *loop_create_queue(void *object, uint32_t flags)
if (queue == NULL) if (queue == NULL)
return NULL; return NULL;
queue->idx = IDX_INVALID;
queue->next = IDX_INVALID;
queue->impl = impl; queue->impl = impl;
queue->flags = flags; queue->ack_fd = -1;
queue->ref = 1;
queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; queue->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
queue->rate_limit.burst = 1; queue->rate_limit.burst = 1;
@ -200,52 +229,88 @@ static struct queue *loop_create_queue(void *object, uint32_t flags)
queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t); queue->buffer_data = SPA_PTR_ALIGN(queue->buffer_mem, MAX_ALIGN, uint8_t);
spa_ringbuffer_init(&queue->buffer); spa_ringbuffer_init(&queue->buffer);
if (flags & QUEUE_FLAG_ACK_FD) { if ((res = spa_system_eventfd_create(impl->system,
if ((res = spa_system_eventfd_create(impl->system, SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) {
SPA_FD_EVENT_SEMAPHORE | SPA_FD_CLOEXEC)) < 0) { spa_log_error(impl->log, "%p: can't create ack event: %s",
spa_log_error(impl->log, "%p: can't create ack event: %s", impl, spa_strerror(res));
impl, spa_strerror(res)); goto error;
}
queue->ack_fd = res;
while (true) {
uint16_t idx = SPA_ATOMIC_LOAD(impl->n_queues);
if (idx >= QUEUES_MAX) {
/* this is pretty bad, there are QUEUES_MAX concurrent threads
* that are doing an invoke */
spa_log_error(impl->log, "max queues %d exceeded!", idx);
res = -ENOSPC;
goto error; goto error;
} }
queue->ack_fd = res; queue->idx = idx;
} else { if (SPA_ATOMIC_CAS(impl->queues[queue->idx], NULL, queue)) {
queue->ack_fd = -1; SPA_ATOMIC_INC(impl->n_queues);
break;
}
} }
if (flags & QUEUE_FLAG_IN_TSS)
queue->ref++;
pthread_mutex_lock(&impl->queue_lock); spa_log_info(impl->log, "%p created queue %p idx:%d %p", impl, queue, queue->idx,
spa_list_append(&impl->queue_list, &queue->link); (void*)pthread_self());
pthread_mutex_unlock(&impl->queue_lock);
spa_log_info(impl->log, "%p created queue %p", impl, queue);
return queue; return queue;
error: error:
free(queue); loop_queue_destroy(queue);
errno = -res; errno = -res;
return NULL; return NULL;
} }
static inline void loop_queue_ref(struct queue *queue)
{
SPA_ATOMIC_INC(queue->ref);
}
static bool loop_queue_unref(struct queue *queue) static inline struct queue *get_queue(struct impl *impl)
{ {
bool do_free; union tag head, next;
struct impl *impl = queue->impl;
do_free = SPA_ATOMIC_DEC(queue->ref) == 0; head.v = SPA_ATOMIC_LOAD(impl->head.v);
if (do_free) {
spa_log_debug(impl->log, "%p: free queue %p", impl, queue); while (true) {
free(queue); struct queue *queue;
if (SPA_UNLIKELY(head.t.idx == IDX_INVALID))
return NULL;
queue = impl->queues[head.t.idx];
next.t.idx = queue->next;
next.t.count = head.t.count+1;
if (SPA_LIKELY(__atomic_compare_exchange_n(&impl->head.v, &head.v, next.v,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))) {
spa_log_trace(impl->log, "%p idx:%d %p", queue, queue->idx, (void*)pthread_self());
return queue;
}
} }
return do_free; return NULL;
} }
static inline void put_queue(struct impl *impl, struct queue *queue)
{
union tag head, next;
spa_log_trace(impl->log, "%p idx:%d %p", queue, queue->idx, (void*)pthread_self());
head.v = SPA_ATOMIC_LOAD(impl->head.v);
while (true) {
queue->next = head.t.idx;
next.t.idx = queue->idx;
next.t.count = head.t.count+1;
if (SPA_LIKELY(__atomic_compare_exchange_n(&impl->head.v, &head.v, next.v,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)))
break;
}
}
static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b) static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b)
{ {
return (int32_t)(a->count - b->count); return (int32_t)(a->count - b->count);
@ -256,27 +321,36 @@ static void flush_all_queues(struct impl *impl)
uint32_t flush_count; uint32_t flush_count;
int res; int res;
pthread_mutex_lock(&impl->queue_lock); flush_count = SPA_ATOMIC_INC(impl->flush_count);
flush_count = ++impl->flush_count;
while (true) { while (true) {
struct queue *cqueue, *queue = NULL; struct queue *cqueue, *queue = NULL;
struct invoke_item *citem, *item = NULL; struct invoke_item *citem, *item = NULL;
uint32_t cindex, index; uint32_t cindex, index;
spa_invoke_func_t func; spa_invoke_func_t func;
bool block; bool block;
uint32_t i, n_queues, old_queues;
spa_list_for_each(cqueue, &impl->queue_list, link) { n_queues = SPA_ATOMIC_LOAD(impl->n_queues);
if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) < do {
(int32_t)sizeof(struct invoke_item)) old_queues = n_queues;
continue; for (i = 0; i < n_queues; i++) {
citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item); cqueue = impl->queues[i];
if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) <
(int32_t)sizeof(struct invoke_item))
continue;
if (item == NULL || item_compare(citem, item) < 0) { citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item);
item = citem;
queue = cqueue; if (item == NULL || item_compare(citem, item) < 0) {
index = cindex; item = citem;
queue = cqueue;
index = cindex;
}
} }
n_queues = SPA_ATOMIC_LOAD(impl->n_queues);
} }
while (n_queues != old_queues);
if (item == NULL) if (item == NULL)
break; break;
@ -287,19 +361,13 @@ static void flush_all_queues(struct impl *impl)
* might get overwritten. */ * might get overwritten. */
func = spa_steal_ptr(item->func); func = spa_steal_ptr(item->func);
if (func) { if (func) {
loop_queue_ref(queue); item->res = func(&impl->loop, true, item->seq, item->data,
pthread_mutex_unlock(&impl->queue_lock);
res = func(&impl->loop, true, item->seq, item->data,
item->size, item->user_data); item->size, item->user_data);
pthread_mutex_lock(&impl->queue_lock);
if (loop_queue_unref(queue))
continue;
item->res = res;
} }
/* if this function did a recursive invoke, it now flushed the /* if this function did a recursive invoke, it now flushed the
* ringbuffer and we can exit */ * ringbuffer and we can exit */
if (flush_count != impl->flush_count) if (flush_count != SPA_ATOMIC_LOAD(impl->flush_count))
break; break;
index += item->item_size; index += item->item_size;
@ -312,7 +380,6 @@ static void flush_all_queues(struct impl *impl)
queue, queue->ack_fd, spa_strerror(res)); queue, queue->ack_fd, spa_strerror(res));
} }
} }
pthread_mutex_unlock(&impl->queue_lock);
} }
static int static int
@ -327,23 +394,18 @@ loop_queue_invoke(void *object,
struct queue *queue = object; struct queue *queue = object;
struct impl *impl = queue->impl; struct impl *impl = queue->impl;
struct invoke_item *item; struct invoke_item *item;
int res, suppressed; int res;
int32_t filled; int32_t filled;
uint32_t avail, idx, offset, l0; uint32_t avail, idx, offset, l0;
size_t need;
uint64_t nsec;
bool in_thread; bool in_thread;
in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self())); in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()));
retry:
filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx); filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx);
spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun"); spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun");
avail = (uint32_t)(DATAS_SIZE - filled); avail = (uint32_t)(DATAS_SIZE - filled);
if (avail < sizeof(struct invoke_item)) { if (avail < sizeof(struct invoke_item))
need = sizeof(struct invoke_item);
goto xrun; goto xrun;
}
offset = idx & (DATAS_SIZE - 1); offset = idx & (DATAS_SIZE - 1);
/* l0 is remaining size in ringbuffer, this should always be larger than /* l0 is remaining size in ringbuffer, this should always be larger than
@ -360,7 +422,7 @@ retry:
item->res = 0; item->res = 0;
item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN); item->item_size = SPA_ROUND_UP_N(sizeof(struct invoke_item) + size, ITEM_ALIGN);
spa_log_trace_fp(impl->log, "%p: add item %p filled:%d", queue, item, filled); spa_log_trace(impl->log, "%p: add item %p filled:%d block:%d", queue, item, filled, block);
if (l0 >= item->item_size) { if (l0 >= item->item_size) {
/* item + size fit in current ringbuffer idx */ /* item + size fit in current ringbuffer idx */
@ -376,16 +438,16 @@ retry:
item->data = queue->buffer_data; item->data = queue->buffer_data;
item->item_size = SPA_ROUND_UP_N(l0 + size, ITEM_ALIGN); item->item_size = SPA_ROUND_UP_N(l0 + size, ITEM_ALIGN);
} }
if (avail < item->item_size) { if (avail < item->item_size)
need = item->item_size;
goto xrun; goto xrun;
}
if (data && size > 0) if (data && size > 0)
memcpy(item->data, data, size); memcpy(item->data, data, size);
spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size); spa_ringbuffer_write_update(&queue->buffer, idx + item->item_size);
if (in_thread) { if (in_thread) {
put_queue(impl, queue);
flush_all_queues(impl); flush_all_queues(impl);
res = item->res; res = item->res;
} else { } else {
@ -410,23 +472,12 @@ retry:
else else
res = 0; res = 0;
} }
put_queue(impl, queue);
} }
return res; return res;
xrun: xrun:
if (queue->overflow == NULL) { put_queue(impl, queue);
nsec = get_time_ns(impl->system); return -ENOSPC;
if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) {
spa_log_warn(impl->log, "%p: queue full %d, need %zd (%d suppressed)",
queue, avail, need, suppressed);
}
queue->overflow = loop_create_queue(impl, QUEUE_FLAG_NONE);
if (queue->overflow == NULL)
return -errno;
queue->overflow->ack_fd = queue->ack_fd;
}
queue = queue->overflow;
goto retry;
} }
static void wakeup_func(void *data, uint64_t count) static void wakeup_func(void *data, uint64_t count)
@ -435,51 +486,41 @@ static void wakeup_func(void *data, uint64_t count)
flush_all_queues(impl); flush_all_queues(impl);
} }
static void loop_queue_destroy(void *data)
{
struct queue *queue = data;
struct impl *impl = queue->impl;
if (SPA_ATOMIC_CAS(queue->destroyed, false, true)) {
pthread_mutex_lock(&impl->queue_lock);
spa_list_remove(&queue->link);
pthread_mutex_unlock(&impl->queue_lock);
if (queue->overflow)
loop_queue_destroy(queue->overflow);
if (queue->flags & QUEUE_FLAG_ACK_FD)
spa_system_close(impl->system, queue->ack_fd);
loop_queue_unref(queue);
}
}
static void loop_queue_destroy_tss(void *data)
{
struct queue *queue = data;
if (queue) {
SPA_ATOMIC_DEC(queue->impl->tss_ref);
loop_queue_destroy(queue);
loop_queue_unref(queue);
}
}
static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq, static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
const void *data, size_t size, bool block, void *user_data) const void *data, size_t size, bool block, void *user_data)
{ {
struct impl *impl = object; struct impl *impl = object;
struct queue *local_queue; struct queue *queue;
int res = 0, suppressed;
uint64_t nsec;
local_queue = tss_get(impl->queue_tss_id); while (true) {
if (local_queue == NULL) { queue = get_queue(impl);
local_queue = loop_create_queue(impl, QUEUE_FLAG_ACK_FD | QUEUE_FLAG_IN_TSS); if (SPA_UNLIKELY(queue == NULL))
if (local_queue == NULL) queue = loop_create_queue(impl);
return -errno; if (SPA_UNLIKELY(queue == NULL)) {
SPA_ATOMIC_INC(impl->tss_ref); if (SPA_UNLIKELY(errno != ENOSPC))
tss_set(impl->queue_tss_id, local_queue); return -errno;
/* there was no space for a new queue. This means QUEUE_MAX
* threads are concurrently doing an invoke. We can wait a little
* and retry to get a queue */
if (impl->retry_timeout == 0)
return -EPIPE;
usleep(impl->retry_timeout);
} else {
res = loop_queue_invoke(queue, func, seq, data, size, block, user_data);
if (SPA_LIKELY(res != -ENOSPC))
break;
}
/* the queue was full or no more queues, retry and use another queue */
nsec = get_time_ns(impl->system);
if ((suppressed = spa_ratelimit_test(&queue->rate_limit, nsec)) >= 0) {
spa_log_warn(impl->log, "%p: out of queues, retrying (%d suppressed)",
queue, suppressed);
}
} }
return loop_queue_invoke(local_queue, func, seq, data, size, block, user_data); return res;
} }
static int loop_get_fd(void *object) static int loop_get_fd(void *object)
@ -1110,31 +1151,24 @@ static int impl_clear(struct spa_handle *handle)
{ {
struct impl *impl; struct impl *impl;
struct source_impl *source; struct source_impl *source;
struct queue *queue; uint32_t i;
spa_return_val_if_fail(handle != NULL, -EINVAL); spa_return_val_if_fail(handle != NULL, -EINVAL);
impl = (struct impl *) handle; impl = (struct impl *) handle;
spa_log_debug(impl->log, "%p: clear", impl);
if (impl->enter_count != 0 || impl->polling) if (impl->enter_count != 0 || impl->polling)
spa_log_warn(impl->log, "%p: loop is entered %d times polling:%d", spa_log_warn(impl->log, "%p: loop is entered %d times polling:%d",
impl, impl->enter_count, impl->polling); impl, impl->enter_count, impl->polling);
spa_list_consume(source, &impl->source_list, link) spa_list_consume(source, &impl->source_list, link)
loop_destroy_source(impl, &source->source); loop_destroy_source(impl, &source->source);
spa_list_consume(queue, &impl->queue_list, link) for (i = 0; i < impl->n_queues; i++)
loop_queue_destroy(queue); loop_queue_destroy(impl->queues[i]);
/* free the tss from this thread if any */
loop_queue_destroy_tss(tss_get(impl->queue_tss_id));
spa_system_close(impl->system, impl->poll_fd); spa_system_close(impl->system, impl->poll_fd);
pthread_mutex_destroy(&impl->queue_lock);
if (SPA_ATOMIC_DEC(impl->tss_ref) != 0)
spa_log_warn(impl->log, "%p: loop still has %d queues in TSS",
impl, impl->tss_ref);
tss_delete(impl->queue_tss_id);
return 0; return 0;
} }
@ -1164,7 +1198,6 @@ impl_init(const struct spa_handle_factory *factory,
{ {
struct impl *impl; struct impl *impl;
const char *str; const char *str;
pthread_mutexattr_t attr;
int res; int res;
spa_return_val_if_fail(factory != NULL, -EINVAL); spa_return_val_if_fail(factory != NULL, -EINVAL);
@ -1187,16 +1220,15 @@ impl_init(const struct spa_handle_factory *factory,
SPA_VERSION_LOOP_UTILS, SPA_VERSION_LOOP_UTILS,
&impl_loop_utils, impl); &impl_loop_utils, impl);
impl->retry_timeout = DEFAULT_RETRY;
if (info) { if (info) {
if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL && if ((str = spa_dict_lookup(info, "loop.cancel")) != NULL &&
spa_atob(str)) spa_atob(str))
impl->control.iface.cb.funcs = &impl_loop_control_cancel; impl->control.iface.cb.funcs = &impl_loop_control_cancel;
if ((str = spa_dict_lookup(info, "loop.retry-timeout")) != NULL)
impl->retry_timeout = atoi(str);
} }
CHECK(pthread_mutexattr_init(&attr), error_exit);
CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit);
CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit);
impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
spa_log_topic_init(impl->log, &log_topic); spa_log_topic_init(impl->log, &log_topic);
impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System);
@ -1204,17 +1236,16 @@ impl_init(const struct spa_handle_factory *factory,
if (impl->system == NULL) { if (impl->system == NULL) {
spa_log_error(impl->log, "%p: a System is needed", impl); spa_log_error(impl->log, "%p: a System is needed", impl);
res = -EINVAL; res = -EINVAL;
goto error_exit_free_mutex; goto error_exit;
} }
if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) { if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) {
spa_log_error(impl->log, "%p: can't create pollfd: %s", spa_log_error(impl->log, "%p: can't create pollfd: %s",
impl, spa_strerror(res)); impl, spa_strerror(res));
goto error_exit_free_mutex; goto error_exit;
} }
impl->poll_fd = res; impl->poll_fd = res;
spa_list_init(&impl->source_list); spa_list_init(&impl->source_list);
spa_list_init(&impl->queue_list);
spa_list_init(&impl->destroy_list); spa_list_init(&impl->destroy_list);
spa_hook_list_init(&impl->hooks_list); spa_hook_list_init(&impl->hooks_list);
@ -1225,23 +1256,14 @@ impl_init(const struct spa_handle_factory *factory,
goto error_exit_free_poll; goto error_exit_free_poll;
} }
if (tss_create(&impl->queue_tss_id, (tss_dtor_t)loop_queue_destroy_tss) != thrd_success) { impl->head.t.idx = IDX_INVALID;
res = -errno;
spa_log_error(impl->log, "%p: can't create tss: %m", impl);
goto error_exit_free_wakeup;
}
impl->tss_ref = 1;
spa_log_debug(impl->log, "%p: initialized", impl); spa_log_debug(impl->log, "%p: initialized", impl);
return 0; return 0;
error_exit_free_wakeup:
loop_destroy_source(impl, impl->wakeup);
error_exit_free_poll: error_exit_free_poll:
spa_system_close(impl->system, impl->poll_fd); spa_system_close(impl->system, impl->poll_fd);
error_exit_free_mutex:
pthread_mutex_destroy(&impl->queue_lock);
error_exit: error_exit:
return res; return res;
} }