module-rt: do rtkit from a thread-loop

First check if we can adjust priorities with rlimits, if we can't and we
need to use rtkit, spawn a thread-loop to handle the rtkit stuff.

First do a pw_loop_invoke() to do the rtkit setup, including getting
some properties and then setting the nice level and rlimits.

Use pw_loop_invoke to queue rtkit realtime requests, this ensures we
always perform the requests after the setup and when the properties
are fetched.

See #3357
This commit is contained in:
Wim Taymans 2023-07-20 12:03:29 +02:00
parent d74153a231
commit 09277cbfa4

View file

@ -173,6 +173,7 @@ struct impl {
struct spa_thread_utils thread_utils; struct spa_thread_utils thread_utils;
struct rlimit rl;
int nice_level; int nice_level;
int rt_prio; int rt_prio;
rlim_t rt_time_soft; rlim_t rt_time_soft;
@ -191,6 +192,10 @@ struct impl {
const char* object_path; const char* object_path;
const char* interface; const char* interface;
struct pw_rtkit_bus *rtkit_bus; struct pw_rtkit_bus *rtkit_bus;
struct pw_thread_loop *thread_loop;
int max_rtprio;
int min_nice_level;
rlim_t rttime_max;
/* These are only for the RTKit implementation to fill in the `thread` /* These are only for the RTKit implementation to fill in the `thread`
* struct. Since there's barely any overhead here we'll do this * struct. Since there's barely any overhead here we'll do this
@ -383,35 +388,6 @@ finish:
return ret; return ret;
} }
static int pw_rtkit_get_max_realtime_priority(struct impl *impl)
{
long long retval;
int err;
err = rtkit_get_int_property(impl, "MaxRealtimePriority", &retval);
return err < 0 ? err : retval;
}
static int pw_rtkit_get_min_nice_level(struct impl *impl, int *min_nice_level)
{
long long retval;
int err;
err = rtkit_get_int_property(impl, "MinNiceLevel", &retval);
if (err >= 0)
*min_nice_level = retval;
return err;
}
static long long pw_rtkit_get_rttime_usec_max(struct impl *impl)
{
long long retval;
int err;
err = rtkit_get_int_property(impl, "RTTimeUSecMax", &retval);
return err < 0 ? err : retval;
}
static int pw_rtkit_make_realtime(struct impl *impl, pid_t thread, int priority) static int pw_rtkit_make_realtime(struct impl *impl, pid_t thread, int priority)
{ {
DBusMessage *m = NULL; DBusMessage *m = NULL;
@ -516,6 +492,8 @@ static void module_destroy(void *data)
#ifdef HAVE_DBUS #ifdef HAVE_DBUS
if (impl->rtkit_bus) if (impl->rtkit_bus)
pw_rtkit_bus_free(impl->rtkit_bus); pw_rtkit_bus_free(impl->rtkit_bus);
if (impl->thread_loop)
pw_thread_loop_destroy(impl->thread_loop);
#endif #endif
free(impl); free(impl);
@ -623,25 +601,19 @@ static int set_nice(struct impl *impl, int nice_level, bool warn)
#ifdef HAVE_DBUS #ifdef HAVE_DBUS
if (impl->use_rtkit) { if (impl->use_rtkit) {
int min_nice = nice_level; if (nice_level < impl->min_nice_level) {
pw_rtkit_get_min_nice_level(impl, &min_nice);
if (nice_level < min_nice) {
pw_log_info("clamped nice level %d to %d", pw_log_info("clamped nice level %d to %d",
nice_level, min_nice); nice_level, impl->min_nice_level);
nice_level = min_nice; nice_level = impl->min_nice_level;
} }
res = pw_rtkit_make_high_priority(impl, 0, nice_level); res = pw_rtkit_make_high_priority(impl, 0, nice_level);
} }
else if (impl->rlimits_enabled)
res = sched_set_nice(nice_level);
else else
res = -ENOTSUP; #endif
#else
if (impl->rlimits_enabled) if (impl->rlimits_enabled)
res = sched_set_nice(nice_level); res = sched_set_nice(nice_level);
else else
res = -ENOTSUP; res = -ENOTSUP;
#endif
if (res < 0) { if (res < 0) {
if (warn) if (warn)
@ -659,37 +631,16 @@ static int set_nice(struct impl *impl, int nice_level, bool warn)
static int set_rlimit(struct impl *impl) static int set_rlimit(struct impl *impl)
{ {
struct rlimit rl;
int res = 0; int res = 0;
spa_zero(rl); if (setrlimit(RLIMIT_RTTIME, &impl->rl) < 0)
rl.rlim_cur = impl->rt_time_soft;
rl.rlim_max = impl->rt_time_hard;
#ifdef HAVE_DBUS
if (impl->use_rtkit) {
long long rttime;
rttime = pw_rtkit_get_rttime_usec_max(impl);
if (rttime >= 0) {
if ((rlim_t)rttime < rl.rlim_cur) {
pw_log_debug("clamping rt.time.soft from %llu to %lld because of RTKit",
(long long)rl.rlim_cur, rttime);
}
rl.rlim_cur = SPA_MIN(rl.rlim_cur, (rlim_t)rttime);
rl.rlim_max = SPA_MIN(rl.rlim_max, (rlim_t)rttime);
}
}
#endif
if (setrlimit(RLIMIT_RTTIME, &rl) < 0)
res = -errno; res = -errno;
if (res < 0) if (res < 0)
pw_log_debug("setrlimit() failed: %s", spa_strerror(res)); pw_log_debug("setrlimit() failed: %s", spa_strerror(res));
else else
pw_log_debug("rt.time.soft:%"PRIi64" rt.time.hard:%"PRIi64, pw_log_debug("rt.time.soft:%"PRIi64" rt.time.hard:%"PRIi64,
(int64_t)rl.rlim_cur, (int64_t)rl.rlim_max); (int64_t)impl->rl.rlim_cur, (int64_t)impl->rl.rlim_max);
return res; return res;
} }
@ -815,8 +766,7 @@ static int get_rtkit_priority_range(struct impl *impl, int *min, int *max)
if (min) if (min)
*min = 1; *min = 1;
if (max) { if (max) {
if ((*max = pw_rtkit_get_max_realtime_priority(impl)) < 0) *max = impl->max_rtprio;
return *max;
if (*max < 1) if (*max < 1)
*max = 1; *max = 1;
} }
@ -850,47 +800,61 @@ static pid_t impl_gettid(struct impl *impl, pthread_t pt)
return pid; return pid;
} }
struct rt_params {
pid_t pid;
int priority;
};
static int do_make_realtime(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct impl *impl = user_data;
const struct rt_params *params = data;
int err, min, max, priority = params->priority;
pid_t pid = params->pid;
pw_log_info(".");
if ((err = get_rtkit_priority_range(impl, &min, &max)) < 0)
return err;
if (priority < min || priority > max) {
pw_log_info("clamping requested priority %d for thread %d "
"between %d and %d", priority, pid, min, max);
priority = SPA_CLAMP(priority, min, max);
}
if ((err = pw_rtkit_make_realtime(impl, pid, priority)) < 0) {
pw_log_warn("could not make thread %d realtime using RTKit: %s", pid, spa_strerror(err));
return err;
}
pw_log_info("acquired realtime priority %d for thread %d using RTKit", priority, pid);
return 0;
}
static int impl_acquire_rt(void *object, struct spa_thread *thread, int priority) static int impl_acquire_rt(void *object, struct spa_thread *thread, int priority)
{ {
struct impl *impl = object; struct impl *impl = object;
struct sched_param sp; struct sched_param sp;
int err;
pthread_t pt = (pthread_t)thread; pthread_t pt = (pthread_t)thread;
pid_t pid;
/* See the docstring on `spa_thread_utils_methods::acquire_rt` */ /* See the docstring on `spa_thread_utils_methods::acquire_rt` */
if (priority == -1) { if (priority == -1) {
priority = impl->rt_prio; priority = impl->rt_prio;
} }
if (impl->use_rtkit) { if (impl->use_rtkit) {
int min, max; struct rt_params params;
if ((err = get_rtkit_priority_range(impl, &min, &max)) < 0)
return err;
pid = impl_gettid(impl, pt);
if (priority < min || priority > max) {
pw_log_info("clamping requested priority %d for thread %d "
"between %d and %d", priority, pid, min, max);
priority = SPA_CLAMP(priority, min, max);
}
spa_zero(sp); spa_zero(sp);
sp.sched_priority = priority;
if (pthread_setschedparam(pt, SCHED_OTHER | PW_SCHED_RESET_ON_FORK, &sp) == 0) { if (pthread_setschedparam(pt, SCHED_OTHER | PW_SCHED_RESET_ON_FORK, &sp) == 0) {
pw_log_debug("SCHED_OTHER|SCHED_RESET_ON_FORK worked."); pw_log_debug("SCHED_OTHER|SCHED_RESET_ON_FORK worked.");
} }
params.pid = impl_gettid(impl, pt);
params.priority = priority;
if ((err = pw_rtkit_make_realtime(impl, pid, priority)) < 0) { return pw_loop_invoke(pw_thread_loop_get_loop(impl->thread_loop),
pw_log_warn("could not make thread %d realtime using RTKit: %s", pid, spa_strerror(err)); do_make_realtime, 0, &params, sizeof(params), false, impl);
return err;
}
pw_log_info("acquired realtime priority %d for thread %d using RTKit", priority, pid);
return 0;
} else { } else {
return acquire_rt_sched(thread, priority); return acquire_rt_sched(thread, priority);
} }
@ -961,6 +925,75 @@ static int check_rtkit(struct impl *impl, struct pw_context *context, bool *can_
return 0; return 0;
} }
static int do_rtkit_setup(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct impl *impl = user_data;
int res;
long long retval;
pw_log_debug("enter dbus setup");
/* Checking xdg-desktop-portal. It works fine in all situations. */
if (impl->rtportal_enabled)
impl->rtkit_bus = pw_rtkit_bus_get_session();
else
pw_log_info("Portal Realtime disabled");
if (impl->rtkit_bus != NULL) {
if (pw_rtkit_check_xdg_portal(impl->rtkit_bus)) {
impl->service_name = XDG_PORTAL_SERVICE_NAME;
impl->object_path = XDG_PORTAL_OBJECT_PATH;
impl->interface = XDG_PORTAL_INTERFACE;
} else {
pw_log_info("found session bus but no portal, trying RTKit fallback");
pw_rtkit_bus_free(impl->rtkit_bus);
impl->rtkit_bus = NULL;
}
}
/* Failed to get xdg-desktop-portal, try to use rtkit. */
if (impl->rtkit_bus == NULL) {
if (impl->rtkit_enabled)
impl->rtkit_bus = pw_rtkit_bus_get_system();
else
pw_log_info("RTkit disabled");
if (impl->rtkit_bus != NULL) {
impl->service_name = RTKIT_SERVICE_NAME;
impl->object_path = RTKIT_OBJECT_PATH;
impl->interface = RTKIT_INTERFACE;
} else {
res = -errno;
pw_log_warn("Realtime scheduling disabled: unsufficient realtime privileges, "
"Portal not found on session bus, and no system bus for RTKit: %m");
return res;
}
}
/* get some properties */
if (rtkit_get_int_property(impl, "MaxRealtimePriority", &retval) >= 0)
impl->max_rtprio = retval;
if (rtkit_get_int_property(impl, "MinNiceLevel", &retval) >= 0)
impl->min_nice_level = retval;
if (rtkit_get_int_property(impl, "RTTimeUSecMax", &retval) >= 0)
impl->rttime_max = retval;
/* Retry set_nice with rtkit */
if (IS_VALID_NICE_LEVEL(impl->nice_level))
set_nice(impl, impl->nice_level, true);
/* Set rlimit with rtkit limits */
if (impl->rttime_max < impl->rl.rlim_cur) {
pw_log_debug("clamping rt.time.soft from %llu to %lld because of RTKit",
(long long)impl->rl.rlim_cur, (long long)impl->rttime_max);
}
impl->rl.rlim_cur = SPA_MIN(impl->rl.rlim_cur, impl->rttime_max);
impl->rl.rlim_max = SPA_MIN(impl->rl.rlim_max, impl->rttime_max);
set_rlimit(impl);
return 0;
}
#endif /* HAVE_DBUS */ #endif /* HAVE_DBUS */
SPA_EXPORT SPA_EXPORT
@ -994,6 +1027,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->rtportal_enabled = pw_properties_get_bool(props, "rtportal.enabled", true); impl->rtportal_enabled = pw_properties_get_bool(props, "rtportal.enabled", true);
impl->rtkit_enabled = pw_properties_get_bool(props, "rtkit.enabled", true); impl->rtkit_enabled = pw_properties_get_bool(props, "rtkit.enabled", true);
impl->rl.rlim_cur = impl->rt_time_soft;
impl->rl.rlim_max = impl->rt_time_hard;
bool can_use_rtkit = false, use_rtkit = false; bool can_use_rtkit = false, use_rtkit = false;
if (!IS_VALID_NICE_LEVEL(impl->nice_level)) { if (!IS_VALID_NICE_LEVEL(impl->nice_level)) {
@ -1009,6 +1045,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((res = check_rtkit(impl, context, &can_use_rtkit)) < 0) if ((res = check_rtkit(impl, context, &can_use_rtkit)) < 0)
goto error; goto error;
#endif #endif
/* If the user has permissions to use regular realtime scheduling, as well as /* If the user has permissions to use regular realtime scheduling, as well as
* the nice level we want, then we'll use that instead of RTKit */ * the nice level we want, then we'll use that instead of RTKit */
@ -1026,50 +1063,31 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (set_nice(impl, impl->nice_level, !can_use_rtkit) < 0) if (set_nice(impl, impl->nice_level, !can_use_rtkit) < 0)
use_rtkit = can_use_rtkit; use_rtkit = can_use_rtkit;
} }
if (!use_rtkit)
set_rlimit(impl);
#ifdef HAVE_DBUS #ifdef HAVE_DBUS
impl->use_rtkit = use_rtkit; impl->use_rtkit = use_rtkit;
if (impl->use_rtkit) { if (impl->use_rtkit) {
/* Checking xdg-desktop-portal. It works fine in all situations. */ impl->thread_loop = pw_thread_loop_new("module-rt", NULL);
if (impl->rtportal_enabled) if (impl->thread_loop == NULL) {
impl->rtkit_bus = pw_rtkit_bus_get_session(); res = -errno;
else goto error;
pw_log_info("Portal Realtime disabled");
if (impl->rtkit_bus != NULL) {
if (pw_rtkit_check_xdg_portal(impl->rtkit_bus)) {
impl->service_name = XDG_PORTAL_SERVICE_NAME;
impl->object_path = XDG_PORTAL_OBJECT_PATH;
impl->interface = XDG_PORTAL_INTERFACE;
} else {
pw_log_info("found session bus but no portal, trying RTKit fallback");
pw_rtkit_bus_free(impl->rtkit_bus);
impl->rtkit_bus = NULL;
}
} }
/* Failed to get xdg-desktop-portal, try to use rtkit. */ pw_thread_loop_lock(impl->thread_loop);
if (impl->rtkit_bus == NULL) { pw_thread_loop_start(impl->thread_loop);
if (impl->rtkit_enabled) pw_thread_loop_unlock(impl->thread_loop);
impl->rtkit_bus = pw_rtkit_bus_get_system();
else
pw_log_info("RTkit disabled");
if (impl->rtkit_bus != NULL) { pw_loop_invoke(pw_thread_loop_get_loop(impl->thread_loop),
impl->service_name = RTKIT_SERVICE_NAME; do_rtkit_setup, 0, NULL, 0, false, impl);
impl->object_path = RTKIT_OBJECT_PATH;
impl->interface = RTKIT_INTERFACE; pw_log_debug("initialized using RTKit");
} else { } else {
res = -errno; pw_log_debug("initialized using regular realtime scheduling");
pw_log_warn("Realtime scheduling disabled: unsufficient realtime privileges, "
"Portal not found on session bus, and no system bus for RTKit: %m");
goto error;
}
}
/* Retry set_nice with rtkit */
if (IS_VALID_NICE_LEVEL(impl->nice_level))
set_nice(impl, impl->nice_level, true);
} }
#else
pw_log_debug("initialized using regular realtime scheduling");
#endif #endif
set_rlimit(impl);
impl->thread_utils.iface = SPA_INTERFACE_INIT( impl->thread_utils.iface = SPA_INTERFACE_INIT(
SPA_TYPE_INTERFACE_ThreadUtils, SPA_TYPE_INTERFACE_ThreadUtils,
@ -1084,22 +1102,14 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props)); pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props));
pw_impl_module_update_properties(module, &props->dict); pw_impl_module_update_properties(module, &props->dict);
#ifdef HAVE_DBUS
if (impl->use_rtkit) {
pw_log_debug("initialized using RTKit");
} else {
pw_log_debug("initialized using regular realtime scheduling");
}
#else
pw_log_debug("initialized using regular realtime scheduling");
#endif
goto done; goto done;
error: error:
#ifdef HAVE_DBUS #ifdef HAVE_DBUS
if (impl->rtkit_bus) if (impl->rtkit_bus)
pw_rtkit_bus_free(impl->rtkit_bus); pw_rtkit_bus_free(impl->rtkit_bus);
if (impl->thread_loop)
pw_thread_loop_destroy(impl->thread_loop);
#endif #endif
free(impl); free(impl);
done: done: