mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-31 22:25:38 -04:00
spa: serialize in_thread flushes with a mutex
When we have no thread running the loop, we need to flush the queues from the invoking thread. Make sure that when multiple threads attempt this that we serialize the flushing because the flushing code is not thread safe.
This commit is contained in:
parent
1274bc2c42
commit
a4b553f3d4
1 changed files with 30 additions and 6 deletions
|
|
@ -90,6 +90,7 @@ struct impl {
|
|||
|
||||
uint32_t n_queues;
|
||||
struct queue *queues[QUEUES_MAX];
|
||||
pthread_mutex_t queue_lock;
|
||||
|
||||
int poll_fd;
|
||||
pthread_t thread;
|
||||
|
|
@ -390,8 +391,7 @@ loop_queue_invoke(void *object,
|
|||
const void *data,
|
||||
size_t size,
|
||||
bool block,
|
||||
void *user_data,
|
||||
bool in_thread)
|
||||
void *user_data)
|
||||
{
|
||||
struct queue *queue = object, *orig = queue, *overflow;
|
||||
struct impl *impl = queue->impl;
|
||||
|
|
@ -399,8 +399,13 @@ loop_queue_invoke(void *object,
|
|||
int res;
|
||||
int32_t filled;
|
||||
uint32_t avail, idx, offset, l0;
|
||||
bool in_thread;
|
||||
pthread_t loop_thread, current_thread = pthread_self();
|
||||
|
||||
again:
|
||||
loop_thread = impl->thread;
|
||||
in_thread = (loop_thread == 0 || pthread_equal(loop_thread, current_thread));
|
||||
|
||||
filled = spa_ringbuffer_get_write_index(&queue->buffer, &idx);
|
||||
spa_assert_se(filled >= 0 && filled <= DATAS_SIZE && "queue xrun");
|
||||
avail = (uint32_t)(DATAS_SIZE - filled);
|
||||
|
|
@ -448,7 +453,18 @@ again:
|
|||
|
||||
if (in_thread) {
|
||||
put_queue(impl, orig);
|
||||
|
||||
/* when there is no thread running the loop we flush the queues from
|
||||
* this invoking thread but we need to serialize the flushing here with
|
||||
* a mutex */
|
||||
if (loop_thread == 0)
|
||||
pthread_mutex_lock(&impl->queue_lock);
|
||||
|
||||
flush_all_queues(impl);
|
||||
|
||||
if (loop_thread == 0)
|
||||
pthread_mutex_unlock(&impl->queue_lock);
|
||||
|
||||
res = item->res;
|
||||
} else {
|
||||
loop_signal_event(impl, impl->wakeup);
|
||||
|
|
@ -504,7 +520,6 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
|
|||
struct queue *queue;
|
||||
int res = 0, suppressed;
|
||||
uint64_t nsec;
|
||||
bool in_thread = (impl->thread == 0 || pthread_equal(impl->thread, pthread_self()));
|
||||
|
||||
while (true) {
|
||||
queue = get_queue(impl);
|
||||
|
|
@ -527,7 +542,7 @@ static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
|
|||
}
|
||||
usleep(impl->retry_timeout);
|
||||
} else {
|
||||
res = loop_queue_invoke(queue, func, seq, data, size, block, user_data, in_thread);
|
||||
res = loop_queue_invoke(queue, func, seq, data, size, block, user_data);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -1181,6 +1196,8 @@ static int impl_clear(struct spa_handle *handle)
|
|||
|
||||
spa_system_close(impl->system, impl->poll_fd);
|
||||
|
||||
pthread_mutex_destroy(&impl->queue_lock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -1209,6 +1226,7 @@ impl_init(const struct spa_handle_factory *factory,
|
|||
{
|
||||
struct impl *impl;
|
||||
const char *str;
|
||||
pthread_mutexattr_t attr;
|
||||
int res;
|
||||
|
||||
spa_return_val_if_fail(factory != NULL, -EINVAL);
|
||||
|
|
@ -1242,6 +1260,10 @@ impl_init(const struct spa_handle_factory *factory,
|
|||
impl->retry_timeout = atoi(str);
|
||||
}
|
||||
|
||||
CHECK(pthread_mutexattr_init(&attr), error_exit);
|
||||
CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit);
|
||||
CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit);
|
||||
|
||||
impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
|
||||
spa_log_topic_init(impl->log, &log_topic);
|
||||
impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System);
|
||||
|
|
@ -1249,12 +1271,12 @@ impl_init(const struct spa_handle_factory *factory,
|
|||
if (impl->system == NULL) {
|
||||
spa_log_error(impl->log, "%p: a System is needed", impl);
|
||||
res = -EINVAL;
|
||||
goto error_exit;
|
||||
goto error_exit_free_mutex;
|
||||
}
|
||||
if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) {
|
||||
spa_log_error(impl->log, "%p: can't create pollfd: %s",
|
||||
impl, spa_strerror(res));
|
||||
goto error_exit;
|
||||
goto error_exit_free_mutex;
|
||||
}
|
||||
impl->poll_fd = res;
|
||||
|
||||
|
|
@ -1277,6 +1299,8 @@ impl_init(const struct spa_handle_factory *factory,
|
|||
|
||||
error_exit_free_poll:
|
||||
spa_system_close(impl->system, impl->poll_fd);
|
||||
error_exit_free_mutex:
|
||||
pthread_mutex_destroy(&impl->queue_lock);
|
||||
error_exit:
|
||||
return res;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue