From 1de16afc40b16f198a94e46ac6ad75949f2de81e Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 5 Jul 2021 10:26:56 +0200 Subject: [PATCH] modules: use pw_thread_utils interface for module-rt* Let the module-rt and module-rtkit provide a new pw_thread_utils implementation. The pw_thread_utils interface has many advantages over the old way of boosting the thread: 1. Does not require to add a source to the loop and do priority boost in-thread. 2. Works on simple threads without data-loop interface. 3. Allows dropping RT priority as well. The rtkit implementation requires a bit more work because there is currently no pthread API to map an pthread to a tid. We make a small wrapper thread to capture the tid with getpid() to do the mapping ourselves. This should go when the new API arrives. See also: (https://sourceware.org/bugzilla/show_bug.cgi?id=27880) --- src/modules/module-rt.c | 201 +++++++++++++----------- src/modules/module-rtkit.c | 312 ++++++++++++++++++++++++------------- src/pipewire/context.c | 12 +- src/pipewire/impl-module.c | 2 +- 4 files changed, 324 insertions(+), 203 deletions(-) diff --git a/src/modules/module-rt.c b/src/modules/module-rt.c index ffbc1a8a7..ab8b8cf4a 100644 --- a/src/modules/module-rt.c +++ b/src/modules/module-rt.c @@ -32,9 +32,13 @@ #include #include #include +#include + +#include +#include #include -#include +#include #include "config.h" @@ -66,9 +70,7 @@ static const struct spa_dict_item module_props[] = { struct impl { struct pw_context *context; - struct spa_loop *loop; - struct spa_system *system; - struct spa_source source; + struct pw_thread_utils thread_utils; int rt_prio; rlim_t rt_time_soft; @@ -77,26 +79,11 @@ struct impl { struct spa_hook module_listener; }; -static int do_remove_source(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct spa_source *source = user_data; - - spa_loop_remove_source(loop, source); - - return 0; -} - static void module_destroy(void *data) { struct impl *impl = data; - + pw_thread_utils_set_impl(NULL); spa_hook_remove(&impl->module_listener); - - if (impl->source.fd != -1) { - spa_loop_invoke(impl->loop, do_remove_source, SPA_ID_INVALID, NULL, 0, true, &impl->source); - spa_system_close(impl->system, impl->source.fd); - impl->source.fd = -1; - } free(impl); } @@ -105,57 +92,47 @@ static const struct pw_impl_module_events module_events = { .destroy = module_destroy, }; -static void idle_func(struct spa_source *source) -{ - struct impl *impl = source->data; - uint64_t count; - int policy = SCHED_FIFO; - int rtprio = impl->rt_prio; - struct rlimit rl; - struct sched_param sp; - - if (SPA_UNLIKELY(spa_system_eventfd_read(impl->system, impl->source.fd, &count) < 0)) - pw_log_warn("read failed: %m"); - - if (rtprio < sched_get_priority_min(policy) || - rtprio > sched_get_priority_max(policy)) { - pw_log_warn("invalid priority %d for policy %d", rtprio, policy); - return; - } - - rl.rlim_cur = impl->rt_time_soft; - rl.rlim_max = impl->rt_time_hard; - if (setrlimit(RLIMIT_RTTIME, &rl) < 0) - pw_log_warn("could not set rlimit: %m"); - else - pw_log_debug("rt.prio %d, rt.time.soft %"PRIi64", rt.time.hard %"PRIi64, - rtprio, (int64_t)rl.rlim_cur, (int64_t)rl.rlim_max); - - spa_zero(sp); - sp.sched_priority = rtprio; - if (sched_setscheduler(0, policy | SCHED_RESET_ON_FORK, &sp) < 0) { - pw_log_warn("could not make thread realtime: %m"); - return; - } - - pw_log_info("processing thread has realtime priority %d", rtprio); -} - -static void set_nice(struct impl *impl, int nice_level) +static int set_nice(struct impl *impl, int nice_level) { long tid; - int res; + int res = 0; tid = syscall(SYS_gettid); if (tid < 0) { pw_log_warn("could not get main thread id: %m"); tid = 0; /* means current thread in setpriority() on linux */ } - res = setpriority(PRIO_PROCESS, (id_t)tid, nice_level); + if (setpriority(PRIO_PROCESS, (id_t)tid, nice_level) < 0) + res = -errno; + if (res < 0) - pw_log_warn("could not set nice-level to %d: %m", nice_level); + pw_log_warn("could not set nice-level to %d: %s", + nice_level, spa_strerror(res)); else - pw_log_info("main thread nice level set to %d", nice_level); + pw_log_info("main thread nice level set to %d", + nice_level); + + return res; +} + +static int set_rlimit(struct impl *impl) +{ + struct rlimit rl; + int res = 0; + + rl.rlim_cur = impl->rt_time_soft; + rl.rlim_max = impl->rt_time_hard; + + if (setrlimit(RLIMIT_RTTIME, &rl) < 0) + res = -errno; + + if (res < 0) + pw_log_warn("could not set rlimit: %s", spa_strerror(res)); + else + pw_log_debug("rt.time.soft %"PRIi64", rt.time.hard %"PRIi64, + (int64_t)rl.rlim_cur, (int64_t)rl.rlim_max); + + return res; } static int get_default_int(struct pw_properties *properties, const char *name, int def) @@ -182,29 +159,82 @@ static int get_default_int(struct pw_properties *properties, const char *name, i return val; } +static struct pw_thread *impl_create(void *data, + const struct spa_dict *props, + void *(*start)(void*), void *arg) +{ + pthread_t pt; + int err; + if ((err = pthread_create(&pt, NULL, start, arg)) != 0) { + errno = err; + return NULL; + } + return (struct pw_thread*)pt; +} + +static int impl_join(void *data, struct pw_thread *thread, void **retval) +{ + pthread_t pt = (pthread_t)thread; + return pthread_join(pt, retval); +} + +static int impl_acquire_rt(void *data, struct pw_thread *thread, int priority) +{ + int err, policy = SCHED_FIFO; + int rtprio = priority; + struct sched_param sp; + pthread_t pt = (pthread_t)thread; + + if (rtprio < sched_get_priority_min(policy) || + rtprio > sched_get_priority_max(policy)) { + pw_log_warn("invalid priority %d for policy %d", rtprio, policy); + return -EINVAL; + } + + spa_zero(sp); + sp.sched_priority = rtprio; + if ((err = pthread_setschedparam(pt, policy | SCHED_RESET_ON_FORK, + &sp)) != 0) { + pw_log_warn("%p: could not make thread realtime: %s", thread, strerror(err)); + return -err; + } + pw_log_info("thread %p has realtime priority %d", thread, rtprio); + return 0; +} + +static int impl_drop_rt(void *data, struct pw_thread *thread) +{ + struct sched_param sp; + pthread_t pt = (pthread_t)thread; + int err; + + spa_zero(sp); + if ((err = pthread_setschedparam(pt, + SCHED_OTHER | SCHED_RESET_ON_FORK, &sp)) != 0) { + pw_log_warn("%p: could not drop realtime: %s", thread, strerror(err)); + return -err; + } + pw_log_info("thread %p dropped realtime priority", thread); + return 0; +} + +static const struct pw_thread_utils_methods impl_thread_utils = { + PW_VERSION_THREAD_UTILS_METHODS, + .create = impl_create, + .join = impl_join, + .acquire_rt = impl_acquire_rt, + .drop_rt = impl_drop_rt, +}; + SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { struct pw_context *context = pw_impl_module_get_context(module); struct impl *impl; - struct spa_loop *loop; - struct spa_system *system; - const struct spa_support *support; - uint32_t n_support; struct pw_properties *props; int nice_level; int res; - support = pw_context_get_support(context, &n_support); - - loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop); - if (loop == NULL) - return -ENOTSUP; - - system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem); - if (system == NULL) - return -ENOTSUP; - impl = calloc(1, sizeof(struct impl)); if (impl == NULL) return -ENOMEM; @@ -212,8 +242,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_log_debug("module %p: new %s", impl, args); impl->context = context; - impl->loop = loop; - impl->system = system; props = args ? pw_properties_new_string(args) : pw_properties_new(NULL, NULL); if (props == NULL) { res = -errno; @@ -227,19 +255,14 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->rt_time_soft = get_default_int(props, "rt.time.soft", DEFAULT_RT_TIME_SOFT); impl->rt_time_hard = get_default_int(props, "rt.time.hard", DEFAULT_RT_TIME_HARD); - impl->source.loop = loop; - impl->source.func = idle_func; - impl->source.data = impl; - impl->source.fd = spa_system_eventfd_create(system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); - impl->source.mask = SPA_IO_IN; - if (impl->source.fd == -1) { - res = -errno; - goto error; - } + set_rlimit(impl); - spa_loop_add_source(impl->loop, &impl->source); - if (SPA_UNLIKELY(spa_system_eventfd_write(system, impl->source.fd, 1) < 0)) - pw_log_warn("write failed: %m"); + impl->thread_utils.iface = SPA_INTERFACE_INIT( + PW_TYPE_INTERFACE_ThreadUtils, + PW_VERSION_THREAD_UTILS, + &impl_thread_utils, impl); + + pw_thread_utils_set_impl(&impl->thread_utils); pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); diff --git a/src/modules/module-rtkit.c b/src/modules/module-rtkit.c index dc0259b80..7961adcae 100644 --- a/src/modules/module-rtkit.c +++ b/src/modules/module-rtkit.c @@ -27,7 +27,6 @@ #include #include #include -#include #include #ifdef __FreeBSD__ #include @@ -44,6 +43,7 @@ #include #include +#include /** \page page_module_rtkit PipeWire Module: RTKit */ @@ -67,16 +67,28 @@ static const struct spa_dict_item module_props[] = { struct pw_rtkit_bus; +struct thread { + struct impl *impl; + struct spa_list link; + pthread_t thread; + pid_t pid; + void *(*start)(void*); + void *arg; +}; + struct impl { struct pw_context *context; - struct spa_loop *loop; - struct spa_system *system; - struct spa_source source; struct pw_properties *props; struct pw_rtkit_bus *system_bus; + pthread_mutex_t lock; + pthread_cond_t cond; + + struct spa_list threads_list; + struct pw_thread_utils thread_utils; + int nice_level; int rt_prio; rlim_t rt_time_soft; @@ -171,7 +183,7 @@ void pw_rtkit_bus_free(struct pw_rtkit_bus *system_bus) static pid_t _gettid(void) { #ifndef __FreeBSD__ - return (pid_t) syscall(SYS_gettid); + return (pid_t) gettid(); #else long pid; thr_self(&pid); @@ -419,35 +431,13 @@ finish: return ret; } -static int do_remove_source(struct spa_loop *loop, - bool async, - uint32_t seq, - const void *data, - size_t size, - void *user_data) -{ - struct spa_source *source = user_data; - spa_loop_remove_source(loop, source); - return 0; -} - static void module_destroy(void *data) { struct impl *impl = data; + pw_thread_utils_set_impl(NULL); spa_hook_remove(&impl->module_listener); - if (impl->source.fd != -1) { - spa_loop_invoke(impl->loop, - do_remove_source, - SPA_ID_INVALID, - NULL, - 0, - true, - &impl->source); - spa_system_close(impl->system, impl->source.fd); - impl->source.fd = -1; - } pw_properties_free(impl->props); if (impl->system_bus) pw_rtkit_bus_free(impl->system_bus); @@ -459,58 +449,6 @@ static const struct pw_impl_module_events module_events = { .destroy = module_destroy, }; -static void idle_func(struct spa_source *source) -{ - struct impl *impl = source->data; - struct sched_param sp; - struct rlimit rl; - int r, rtprio; - long long rttime; - uint64_t count; - - spa_system_eventfd_read(impl->system, impl->source.fd, &count); - - rtprio = pw_rtkit_get_max_realtime_priority(impl->system_bus); - if (rtprio >= 0) - rtprio = SPA_MIN(rtprio, impl->rt_prio); - else - rtprio = impl->rt_prio; - - spa_zero(sp); - sp.sched_priority = rtprio; - -#ifndef __FreeBSD__ - if (pthread_setschedparam(pthread_self(), SCHED_OTHER | SCHED_RESET_ON_FORK, &sp) == 0) { - pw_log_debug("SCHED_OTHER|SCHED_RESET_ON_FORK worked."); - goto exit; - } -#endif - - rl.rlim_cur = impl->rt_time_soft; - rl.rlim_max = impl->rt_time_hard; - - rttime = pw_rtkit_get_rttime_usec_max(impl->system_bus); - if (rttime >= 0) { - rl.rlim_cur = SPA_MIN(rl.rlim_cur, (rlim_t)rttime); - rl.rlim_max = SPA_MIN(rl.rlim_max, (rlim_t)rttime); - } - - pw_log_debug("rt.prio:%d rt.time.soft:%"PRIi64" rt.time.hard:%"PRIi64, - rtprio, (int64_t)rl.rlim_cur, (int64_t)rl.rlim_max); - - if (setrlimit(RLIMIT_RTTIME, &rl) < 0) - pw_log_debug("setrlimit() failed: %s", strerror(errno)); - - if ((r = pw_rtkit_make_realtime(impl->system_bus, 0, rtprio)) < 0) { - pw_log_warn("could not make thread realtime: %s", spa_strerror(r)); - } else { - pw_log_info("processing thread made realtime prio:%d", rtprio); - } -exit: - pw_rtkit_bus_free(impl->system_bus); - impl->system_bus = NULL; -} - static int set_nice(struct impl *impl, int nice_level) { int res; @@ -523,6 +461,33 @@ static int set_nice(struct impl *impl, int nice_level) return 0; } +static int set_rlimit(struct impl *impl) +{ + struct rlimit rl; + long long rttime; + int res = 0; + + rl.rlim_cur = impl->rt_time_soft; + rl.rlim_max = impl->rt_time_hard; + + rttime = pw_rtkit_get_rttime_usec_max(impl->system_bus); + if (rttime >= 0) { + rl.rlim_cur = SPA_MIN(rl.rlim_cur, (rlim_t)rttime); + rl.rlim_max = SPA_MIN(rl.rlim_max, (rlim_t)rttime); + } + + if (setrlimit(RLIMIT_RTTIME, &rl) < 0) + res = -errno; + + if (res < 0) + pw_log_debug("setrlimit() failed: %s", spa_strerror(res)); + else + pw_log_debug("rt.time.soft:%"PRIi64" rt.time.hard:%"PRIi64, + (int64_t)rl.rlim_cur, (int64_t)rl.rlim_max); + + return res; +} + static int get_default_int(struct pw_properties *properties, const char *name, int def) { int val; @@ -536,29 +501,164 @@ static int get_default_int(struct pw_properties *properties, const char *name, i return val; } +static struct thread *find_thread_by_pt(struct impl *impl, pthread_t pt) +{ + struct thread *t; + + spa_list_for_each(t, &impl->threads_list, link) { + if (pthread_equal(t->thread, pt)) + return t; + } + return NULL; +} + +static void *custom_start(void *data) +{ + struct thread *this = data; + struct impl *impl = this->impl; + + pthread_mutex_lock(&impl->lock); + this->pid = gettid(); + pthread_cond_broadcast(&impl->cond); + pthread_mutex_unlock(&impl->lock); + + return this->start(this->arg); +} + +static struct pw_thread *impl_create(void *data, const struct spa_dict *props, + void *(*start_routine)(void*), void *arg) +{ + struct impl *impl = data; + struct thread *this; + int err; + + this = calloc(1, sizeof(*this)); + this->impl = impl; + this->start = start_routine; + this->arg = arg; + + pthread_mutex_lock(&impl->lock); + err = pthread_create(&this->thread, NULL, custom_start, this); + if (err != 0) + goto exit; + + pthread_cond_wait(&impl->cond, &impl->lock); + + spa_list_append(&impl->threads_list, &this->link); +exit: + pthread_mutex_unlock(&impl->lock); + + if (err != 0) { + errno = err; + free(this); + return NULL; + } + return (struct pw_thread*)this->thread; +} + +static int impl_join(void *data, struct pw_thread *thread, void **retval) +{ + struct impl *impl = data; + pthread_t pt = (pthread_t)thread; + struct thread *thr; + + pthread_mutex_lock(&impl->lock); + if ((thr = find_thread_by_pt(impl, pt)) != NULL) { + spa_list_remove(&thr->link); + free(thr); + } + pthread_mutex_unlock(&impl->lock); + + return pthread_join(pt, retval); +} + +static pid_t impl_gettid(struct impl *impl, pthread_t pt) +{ + struct thread *thr; + pid_t pid; + + pthread_mutex_lock(&impl->lock); + if ((thr = find_thread_by_pt(impl, pt)) != NULL) + pid = thr->pid; + else + pid = getpid(); + pthread_mutex_unlock(&impl->lock); + + return pid; +} + +static int impl_acquire_rt(void *data, struct pw_thread *thread, int priority) +{ + struct impl *impl = data; + struct sched_param sp; + int r, rtprio; + pthread_t pt = (pthread_t)thread; + pid_t pid; + + priority = impl->rt_prio; + + rtprio = pw_rtkit_get_max_realtime_priority(impl->system_bus); + if (rtprio >= 0) + rtprio = SPA_MIN(rtprio, priority); + else + rtprio = priority; + + spa_zero(sp); + sp.sched_priority = rtprio; + +#ifndef __FreeBSD__ + if (pthread_setschedparam(pt, SCHED_OTHER | SCHED_RESET_ON_FORK, &sp) == 0) { + pw_log_debug("SCHED_OTHER|SCHED_RESET_ON_FORK worked."); + } +#endif + + pid = impl_gettid(impl, pt); + + if ((r = pw_rtkit_make_realtime(impl->system_bus, pid, rtprio)) < 0) { + pw_log_warn("could not make thread realtime: %s", spa_strerror(r)); + } else { + pw_log_info("acquired realtime prio:%d", rtprio); + } + return 0; +} + +static int impl_drop_rt(void *data, struct pw_thread *thread) +{ + struct sched_param sp; + pthread_t pt = (pthread_t)thread; + int err; + + spa_zero(sp); +#ifndef __FreeBSD__ + if ((err = pthread_setschedparam(pt, + SCHED_OTHER | SCHED_RESET_ON_FORK, &sp)) != 0) { + pw_log_debug("thread %p: SCHED_OTHER|SCHED_RESET_ON_FORK failed: %s", + thread, strerror(err)); + return -err; + } +#endif + pw_log_info("thread %p dropped realtime priority", thread); + return 0; +} + +static const struct pw_thread_utils_methods impl_thread_utils = { + PW_VERSION_THREAD_UTILS_METHODS, + .create = impl_create, + .join = impl_join, + .acquire_rt = impl_acquire_rt, + .drop_rt = impl_drop_rt, +}; + + SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { struct pw_context *context = pw_impl_module_get_context(module); struct impl *impl; - struct spa_loop *loop; - struct spa_system *system; - const struct spa_support *support; - uint32_t n_support; const struct pw_properties *props; const char *str; int res; - support = pw_context_get_support(context, &n_support); - - loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop); - if (loop == NULL) - return -ENOTSUP; - - system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem); - if (system == NULL) - return -ENOTSUP; - if ((props = pw_context_get_properties(context)) != NULL && (str = pw_properties_get(props, "support.dbus")) != NULL && !pw_properties_parse_bool(str)) @@ -568,11 +668,13 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (impl == NULL) return -ENOMEM; + spa_list_init(&impl->threads_list); + pthread_mutex_init(&impl->lock, NULL); + pthread_cond_init(&impl->cond, NULL); + pw_log_debug("module %p: new", impl); impl->context = context; - impl->loop = loop; - impl->system = system; impl->props = args ? pw_properties_new_string(args) : pw_properties_new(NULL, NULL); if (impl->props == NULL) { res = -errno; @@ -593,18 +695,14 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->rt_time_soft = get_default_int(impl->props, "rt.time.soft", DEFAULT_RT_TIME_SOFT); impl->rt_time_hard = get_default_int(impl->props, "rt.time.hard", DEFAULT_RT_TIME_HARD); - impl->source.loop = loop; - impl->source.func = idle_func; - impl->source.data = impl; - impl->source.fd = spa_system_eventfd_create(system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); - impl->source.mask = SPA_IO_IN; - if (impl->source.fd == -1) { - res = -errno; - goto error; - } + set_rlimit(impl); - spa_loop_add_source(impl->loop, &impl->source); - spa_system_eventfd_write(system, impl->source.fd, 1); + impl->thread_utils.iface = SPA_INTERFACE_INIT( + PW_TYPE_INTERFACE_ThreadUtils, + PW_VERSION_THREAD_UTILS, + &impl_thread_utils, impl); + + pw_thread_utils_set_impl(&impl->thread_utils); pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); diff --git a/src/pipewire/context.c b/src/pipewire/context.c index 91f99d308..1aa6a5cf5 100644 --- a/src/pipewire/context.c +++ b/src/pipewire/context.c @@ -366,9 +366,6 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop, fill_properties(this); - if ((res = pw_data_loop_start(this->data_loop_impl)) < 0) - goto error_free; - if ((res = pw_context_parse_conf_section(this, conf, "context.spa-libs")) < 0) goto error_free; pw_log_info(NAME" %p: parsed %d context.spa-libs items", this, res); @@ -385,6 +382,9 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop, goto error_free; pw_log_info(NAME" %p: parsed %d context.exec items", this, res); + if ((res = pw_data_loop_start(this->data_loop_impl)) < 0) + goto error_free; + pw_settings_init(this); pw_log_debug(NAME" %p: created", this); @@ -435,6 +435,9 @@ void pw_context_destroy(struct pw_context *context) spa_list_consume(resource, &context->registry_resource_list, link) pw_resource_destroy(resource); + if (context->data_loop_impl) + pw_data_loop_destroy(context->data_loop_impl); + spa_list_consume(module, &context->module_list, link) pw_impl_module_destroy(module); @@ -453,9 +456,6 @@ void pw_context_destroy(struct pw_context *context) if (context->pool) pw_mempool_destroy(context->pool); - if (context->data_loop_impl) - pw_data_loop_destroy(context->data_loop_impl); - if (context->work_queue) pw_work_queue_destroy(context->work_queue); diff --git a/src/pipewire/impl-module.c b/src/pipewire/impl-module.c index a2b769275..649a1982f 100644 --- a/src/pipewire/impl-module.c +++ b/src/pipewire/impl-module.c @@ -227,7 +227,7 @@ pw_context_load_module(struct pw_context *context, if (this->global == NULL) goto error_no_global; - spa_list_append(&context->module_list, &this->link); + spa_list_prepend(&context->module_list, &this->link); this->info.id = this->global->id; pw_properties_setf(this->properties, PW_KEY_OBJECT_ID, "%d", this->info.id);