loop: move thread-loop to support loop

Add more synchronization primitives to spa loop so that we can replace
the thread-loop with it.
This commit is contained in:
Wim Taymans 2025-03-10 13:31:41 +01:00
parent cd1d9ceff1
commit fb49e0795c
6 changed files with 133 additions and 194 deletions

View file

@ -306,15 +306,27 @@ struct spa_loop_control_methods {
* \param[in] abstime the maximum time to wait for the signal or NULL
* \return 0 on success or a negative return value on error.
*/
int (*wait) (void *object, struct timespec *abstime);
int (*wait) (void *object, const struct timespec *abstime);
/** Signal waiters
* Wake up all thread blocked in wait. Since version 2:2
* Wake up all threads blocked in wait. Since version 2:2
* When wait_for_accept is set, this functions blocks until all
* threads performed accept.
*
* \param[in] object the control
* \param[in] wait_for_accept block for accept
* \return 0 on success or a negative return value on error.
*/
int (*signal) (void *object, bool wait_for_accept);
/** Accept signalers
* Resume the thread that signaled with wait_for accept.
*
* \param[in] object the control
* \return 0 on success or a negative return value on error.
*/
int (*signal) (void *object);
int (*accept) (void *object);
};
SPA_API_LOOP int spa_loop_control_get_fd(struct spa_loop_control *object)
@ -371,15 +383,20 @@ SPA_API_LOOP int spa_loop_control_get_time(struct spa_loop_control *object,
spa_loop_control, &object->iface, get_time, 2, abstime, timeout);
}
SPA_API_LOOP int spa_loop_control_wait(struct spa_loop_control *object,
struct timespec *abstime)
const struct timespec *abstime)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, wait, 2, abstime);
}
SPA_API_LOOP int spa_loop_control_signal(struct spa_loop_control *object)
SPA_API_LOOP int spa_loop_control_signal(struct spa_loop_control *object, bool wait_for_accept)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, signal, 2);
spa_loop_control, &object->iface, signal, 2, wait_for_accept);
}
SPA_API_LOOP int spa_loop_control_accept(struct spa_loop_control *object)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, accept, 2);
}
typedef void (*spa_source_io_func_t) (void *data, int fd, uint32_t mask);

View file

@ -92,10 +92,14 @@ struct impl {
struct queue *queues[QUEUES_MAX];
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_cond_t accept_cond;
int n_waiting;
int n_waiting_for_accept;
int poll_fd;
pthread_t thread;
int enter_count;
int recurse;
struct spa_source *wakeup;
@ -573,6 +577,7 @@ static void loop_enter(void *object)
struct impl *impl = object;
pthread_t thread_id = pthread_self();
pthread_mutex_lock(&impl->lock);
if (impl->enter_count == 0) {
spa_return_if_fail(impl->thread == 0);
impl->thread = thread_id;
@ -597,28 +602,49 @@ static void loop_leave(void *object)
if (--impl->enter_count == 0) {
impl->thread = 0;
pthread_mutex_lock(&impl->lock);
flush_all_queues(impl);
pthread_mutex_unlock(&impl->lock);
impl->polling = false;
}
pthread_mutex_unlock(&impl->lock);
}
static int loop_check(void *object)
{
struct impl *impl = object;
pthread_t thread_id = pthread_self();
return (impl->thread == 0 || pthread_equal(impl->thread, thread_id)) ? 1 : 0;
int res;
/* we are in the thread running the loop */
if (impl->thread == 0 || pthread_equal(impl->thread, thread_id))
return 1;
/* if lock taken by something else, error */
if ((res = pthread_mutex_trylock(&impl->lock)) != 0)
return -res;
/* we could take the lock, check if we actually locked it somewhere */
res = impl->recurse > 0 ? 1 : -EPERM;
pthread_mutex_unlock(&impl->lock);
return res;
}
static int loop_lock(void *object)
{
struct impl *impl = object;
return -pthread_mutex_lock(&impl->lock);
int res;
if ((res = pthread_mutex_lock(&impl->lock)) == 0)
impl->recurse++;
return -res;
}
static int loop_unlock(void *object)
{
struct impl *impl = object;
return -pthread_mutex_unlock(&impl->lock);
int res;
spa_return_val_if_fail(impl->recurse > 0, -EIO);
impl->recurse--;
if ((res = pthread_mutex_unlock(&impl->lock)) != 0)
impl->recurse++;
return -res;
}
static int loop_get_time(void *object, struct timespec *abstime, int64_t timeout)
{
@ -633,18 +659,46 @@ static int loop_get_time(void *object, struct timespec *abstime, int64_t timeout
}
return 0;
}
static int loop_wait(void *object, struct timespec *abstime)
static int loop_wait(void *object, const struct timespec *abstime)
{
struct impl *impl = object;
int res;
impl->n_waiting++;
impl->recurse--;
if (abstime)
return pthread_cond_timedwait(&impl->cond, &impl->lock, abstime);
res = pthread_cond_timedwait(&impl->cond, &impl->lock, abstime);
else
return pthread_cond_wait(&impl->cond, &impl->lock);
res = pthread_cond_wait(&impl->cond, &impl->lock);
impl->recurse++;
impl->n_waiting--;
return -res;
}
static int loop_signal(void *object)
static int loop_signal(void *object, bool wait_for_accept)
{
struct impl *impl = object;
return pthread_cond_signal(&impl->cond);
int res;
if (impl->n_waiting > 0)
if ((res = pthread_cond_broadcast(&impl->cond)) != 0)
return -res;
if (wait_for_accept) {
impl->n_waiting_for_accept++;
while (impl->n_waiting_for_accept > 0) {
if ((res = pthread_cond_wait(&impl->accept_cond, &impl->lock)) != 0)
return -res;
}
}
return res;
}
static int loop_accept(void *object)
{
struct impl *impl = object;
impl->n_waiting_for_accept--;
return -pthread_cond_signal(&impl->accept_cond);
}
static inline void free_source(struct source_impl *s)
@ -689,9 +743,11 @@ static int loop_iterate_cancel(void *object, int timeout)
impl->polling = true;
spa_loop_control_hook_before(&impl->hooks_list);
pthread_mutex_unlock(&impl->lock);
nfds = spa_system_pollfd_wait(impl->system, impl->poll_fd, ep, SPA_N_ELEMENTS(ep), timeout);
pthread_mutex_lock(&impl->lock);
spa_loop_control_hook_after(&impl->hooks_list);
impl->polling = false;
@ -717,13 +773,11 @@ static int loop_iterate_cancel(void *object, int timeout)
if (SPA_UNLIKELY(!spa_list_is_empty(&impl->destroy_list)))
process_destroy(impl);
pthread_mutex_lock(&impl->lock);
for (i = 0; i < nfds; i++) {
struct spa_source *s = ep[i].data;
if (SPA_LIKELY(s && s->rmask))
s->func(s);
}
pthread_mutex_unlock(&impl->lock);
pthread_cleanup_pop(true);
@ -738,9 +792,11 @@ static int loop_iterate(void *object, int timeout)
impl->polling = true;
spa_loop_control_hook_before(&impl->hooks_list);
pthread_mutex_unlock(&impl->lock);
nfds = spa_system_pollfd_wait(impl->system, impl->poll_fd, ep, SPA_N_ELEMENTS(ep), timeout);
pthread_mutex_lock(&impl->lock);
spa_loop_control_hook_after(&impl->hooks_list);
impl->polling = false;
@ -761,13 +817,11 @@ static int loop_iterate(void *object, int timeout)
if (SPA_UNLIKELY(!spa_list_is_empty(&impl->destroy_list)))
process_destroy(impl);
pthread_mutex_lock(&impl->lock);
for (i = 0; i < nfds; i++) {
struct spa_source *s = ep[i].data;
if (SPA_LIKELY(s && s->rmask))
s->func(s);
}
pthread_mutex_unlock(&impl->lock);
for (i = 0; i < nfds; i++) {
struct spa_source *s = ep[i].data;
@ -1175,6 +1229,7 @@ static const struct spa_loop_control_methods impl_loop_control_cancel = {
.get_time = loop_get_time,
.wait = loop_wait,
.signal = loop_signal,
.accept = loop_accept,
};
static const struct spa_loop_control_methods impl_loop_control = {
@ -1190,6 +1245,7 @@ static const struct spa_loop_control_methods impl_loop_control = {
.get_time = loop_get_time,
.wait = loop_wait,
.signal = loop_signal,
.accept = loop_accept,
};
static const struct spa_loop_utils_methods impl_loop_utils = {
@ -1331,6 +1387,7 @@ impl_init(const struct spa_handle_factory *factory,
CHECK(pthread_condattr_setclock(&cattr, CLOCK_REALTIME), error_exit_free_mutex);
CHECK(pthread_cond_init(&impl->cond, &cattr), error_exit_free_mutex);
CHECK(pthread_cond_init(&impl->accept_cond, &cattr), error_exit_free_mutex);
pthread_condattr_destroy(&cattr);
impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);