spa: add locking to the loop

We can add a PTHREAD_PRIO_INHERIT lock to the loop to protect the
callbacks and then use this to update shared data in an RT-safe way.

This can avoid some invoke calls that require a context switch but
also due to the nature of epoll cause locking in the kernel with non-RT
guarantees.

Because we use PRIO_INHERIT, the code executed in the lock must not use
any RT-unsafe functions.
This commit is contained in:
Wim Taymans 2025-03-10 09:13:52 +01:00
parent d258892392
commit 65cbbf1a02
2 changed files with 152 additions and 12 deletions

View file

@ -38,7 +38,7 @@ extern "C" {
struct spa_loop { struct spa_interface iface; };
#define SPA_TYPE_INTERFACE_LoopControl SPA_TYPE_INFO_INTERFACE_BASE "LoopControl"
#define SPA_VERSION_LOOP_CONTROL 1
#define SPA_VERSION_LOOP_CONTROL 2
struct spa_loop_control { struct spa_interface iface; };
#define SPA_TYPE_INTERFACE_LoopUtils SPA_TYPE_INFO_INTERFACE_BASE "LoopUtils"
@ -213,7 +213,7 @@ SPA_API_LOOP void spa_loop_control_hook_after(struct spa_hook_list *l)
struct spa_loop_control_methods {
/* the version of this structure. This can be used to expand this
* structure in the future */
#define SPA_VERSION_LOOP_CONTROL_METHODS 1
#define SPA_VERSION_LOOP_CONTROL_METHODS 2
uint32_t version;
/** get the loop fd
@ -275,6 +275,46 @@ struct spa_loop_control_methods {
* returns 1 on success, 0 or negative errno value on error.
*/
int (*check) (void *object);
/** Lock the loop.
* This will ensure the loop is not in the process of dispatching
* callbacks. Since version 2:2
*
* \param[in] object the control
* \return 0 on success or a negative return value on error.
*/
int (*lock) (void *object);
/** Unlock the loop.
* Unlocks the loop again so that callbacks can be dispatched
* again. Since version 2:2
*
* \param[in] object the control
* \return 0 on success or a negative return value on error.
*/
int (*unlock) (void *object);
/** get the absolute time
* Get the current time with \ref timeout that can be used in wait.
* Since version 2:2
*/
int (*get_time) (void *object, struct timespec *abstime, int64_t timeout);
/** Wait for a signal
* Wait until a thread performs signal. Since version 2:2
*
* \param[in] object the control
* \param[in] abstime the maximum time to wait for the signal or NULL
* \return 0 on success or a negative return value on error.
*/
int (*wait) (void *object, struct timespec *abstime);
/** Signal waiters
* Wake up all thread blocked in wait. Since version 2:2
*
* \param[in] object the control
* \return 0 on success or a negative return value on error.
*/
int (*signal) (void *object);
};
SPA_API_LOOP int spa_loop_control_get_fd(struct spa_loop_control *object)
@ -314,6 +354,33 @@ SPA_API_LOOP int spa_loop_control_check(struct spa_loop_control *object)
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, check, 1);
}
SPA_API_LOOP int spa_loop_control_lock(struct spa_loop_control *object)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, lock, 2);
}
SPA_API_LOOP int spa_loop_control_unlock(struct spa_loop_control *object)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, unlock, 2);
}
SPA_API_LOOP int spa_loop_control_get_time(struct spa_loop_control *object,
struct timespec *abstime, int64_t timeout)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, get_time, 2, abstime, timeout);
}
SPA_API_LOOP int spa_loop_control_wait(struct spa_loop_control *object,
struct timespec *abstime)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, wait, 2, abstime);
}
SPA_API_LOOP int spa_loop_control_signal(struct spa_loop_control *object)
{
return spa_api_method_r(int, -ENOTSUP,
spa_loop_control, &object->iface, signal, 2);
}
typedef void (*spa_source_io_func_t) (void *data, int fd, uint32_t mask);
typedef void (*spa_source_idle_func_t) (void *data);

View file

@ -90,7 +90,8 @@ struct impl {
uint32_t n_queues;
struct queue *queues[QUEUES_MAX];
pthread_mutex_t queue_lock;
pthread_mutex_t lock;
pthread_cond_t cond;
int poll_fd;
pthread_t thread;
@ -458,12 +459,12 @@ again:
* this invoking thread but we need to serialize the flushing here with
* a mutex */
if (loop_thread == 0)
pthread_mutex_lock(&impl->queue_lock);
pthread_mutex_lock(&impl->lock);
flush_all_queues(impl);
if (loop_thread == 0)
pthread_mutex_unlock(&impl->queue_lock);
pthread_mutex_unlock(&impl->lock);
res = item->res;
} else {
@ -596,7 +597,9 @@ static void loop_leave(void *object)
if (--impl->enter_count == 0) {
impl->thread = 0;
pthread_mutex_lock(&impl->lock);
flush_all_queues(impl);
pthread_mutex_unlock(&impl->lock);
impl->polling = false;
}
}
@ -607,6 +610,42 @@ static int loop_check(void *object)
pthread_t thread_id = pthread_self();
return (impl->thread == 0 || pthread_equal(impl->thread, thread_id)) ? 1 : 0;
}
static int loop_lock(void *object)
{
struct impl *impl = object;
return -pthread_mutex_lock(&impl->lock);
}
static int loop_unlock(void *object)
{
struct impl *impl = object;
return -pthread_mutex_unlock(&impl->lock);
}
static int loop_get_time(void *object, struct timespec *abstime, int64_t timeout)
{
if (clock_gettime(CLOCK_REALTIME, abstime) < 0)
return -errno;
abstime->tv_sec += timeout / SPA_NSEC_PER_SEC;
abstime->tv_nsec += timeout % SPA_NSEC_PER_SEC;
if (abstime->tv_nsec >= SPA_NSEC_PER_SEC) {
abstime->tv_sec++;
abstime->tv_nsec -= SPA_NSEC_PER_SEC;
}
return 0;
}
static int loop_wait(void *object, struct timespec *abstime)
{
struct impl *impl = object;
if (abstime)
return pthread_cond_timedwait(&impl->cond, &impl->lock, abstime);
else
return pthread_cond_wait(&impl->cond, &impl->lock);
}
static int loop_signal(void *object)
{
struct impl *impl = object;
return pthread_cond_signal(&impl->cond);
}
static inline void free_source(struct source_impl *s)
{
@ -678,11 +717,13 @@ static int loop_iterate_cancel(void *object, int timeout)
if (SPA_UNLIKELY(!spa_list_is_empty(&impl->destroy_list)))
process_destroy(impl);
pthread_mutex_lock(&impl->lock);
for (i = 0; i < nfds; i++) {
struct spa_source *s = ep[i].data;
if (SPA_LIKELY(s && s->rmask))
s->func(s);
}
pthread_mutex_unlock(&impl->lock);
pthread_cleanup_pop(true);
@ -720,11 +761,14 @@ static int loop_iterate(void *object, int timeout)
if (SPA_UNLIKELY(!spa_list_is_empty(&impl->destroy_list)))
process_destroy(impl);
pthread_mutex_lock(&impl->lock);
for (i = 0; i < nfds; i++) {
struct spa_source *s = ep[i].data;
if (SPA_LIKELY(s && s->rmask))
s->func(s);
}
pthread_mutex_unlock(&impl->lock);
for (i = 0; i < nfds; i++) {
struct spa_source *s = ep[i].data;
if (SPA_LIKELY(s)) {
@ -1126,6 +1170,11 @@ static const struct spa_loop_control_methods impl_loop_control_cancel = {
.leave = loop_leave,
.iterate = loop_iterate_cancel,
.check = loop_check,
.lock = loop_lock,
.unlock = loop_unlock,
.get_time = loop_get_time,
.wait = loop_wait,
.signal = loop_signal,
};
static const struct spa_loop_control_methods impl_loop_control = {
@ -1136,6 +1185,11 @@ static const struct spa_loop_control_methods impl_loop_control = {
.leave = loop_leave,
.iterate = loop_iterate,
.check = loop_check,
.lock = loop_lock,
.unlock = loop_unlock,
.get_time = loop_get_time,
.wait = loop_wait,
.signal = loop_signal,
};
static const struct spa_loop_utils_methods impl_loop_utils = {
@ -1196,7 +1250,8 @@ static int impl_clear(struct spa_handle *handle)
spa_system_close(impl->system, impl->poll_fd);
pthread_mutex_destroy(&impl->queue_lock);
pthread_cond_destroy(&impl->cond);
pthread_mutex_destroy(&impl->lock);
return 0;
}
@ -1227,6 +1282,7 @@ impl_init(const struct spa_handle_factory *factory,
struct impl *impl;
const char *str;
pthread_mutexattr_t attr;
pthread_condattr_t cattr;
int res;
spa_return_val_if_fail(factory != NULL, -EINVAL);
@ -1249,6 +1305,9 @@ impl_init(const struct spa_handle_factory *factory,
SPA_VERSION_LOOP_UTILS,
&impl_loop_utils, impl);
CHECK(pthread_mutexattr_init(&attr), error_exit);
CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit_free_attr);
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1;
impl->retry_timeout = DEFAULT_RETRY;
@ -1258,11 +1317,21 @@ impl_init(const struct spa_handle_factory *factory,
impl->control.iface.cb.funcs = &impl_loop_control_cancel;
if ((str = spa_dict_lookup(info, "loop.retry-timeout")) != NULL)
impl->retry_timeout = atoi(str);
if ((str = spa_dict_lookup(info, "loop.prio-inherit")) != NULL &&
spa_atob(str)) {
CHECK(pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT),
error_exit_free_attr)
}
}
CHECK(pthread_mutexattr_init(&attr), error_exit);
CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit);
CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit);
CHECK(pthread_mutex_init(&impl->lock, &attr), error_exit_free_attr);
pthread_mutexattr_destroy(&attr);
CHECK(pthread_condattr_init(&cattr), error_exit_free_mutex);
CHECK(pthread_condattr_setclock(&cattr, CLOCK_REALTIME), error_exit_free_mutex);
CHECK(pthread_cond_init(&impl->cond, &cattr), error_exit_free_mutex);
pthread_condattr_destroy(&cattr);
impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
spa_log_topic_init(impl->log, &log_topic);
@ -1271,12 +1340,12 @@ impl_init(const struct spa_handle_factory *factory,
if (impl->system == NULL) {
spa_log_error(impl->log, "%p: a System is needed", impl);
res = -EINVAL;
goto error_exit_free_mutex;
goto error_exit_free_cond;
}
if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) {
spa_log_error(impl->log, "%p: can't create pollfd: %s",
impl, spa_strerror(res));
goto error_exit_free_mutex;
goto error_exit_free_cond;
}
impl->poll_fd = res;
@ -1299,8 +1368,12 @@ impl_init(const struct spa_handle_factory *factory,
error_exit_free_poll:
spa_system_close(impl->system, impl->poll_fd);
error_exit_free_cond:
pthread_cond_destroy(&impl->cond);
error_exit_free_mutex:
pthread_mutex_destroy(&impl->queue_lock);
pthread_mutex_destroy(&impl->lock);
error_exit_free_attr:
pthread_mutexattr_destroy(&attr);
error_exit:
return res;
}