spa: move thread to SPA support

It can be more generally useful eventually.
This commit is contained in:
Wim Taymans 2021-07-06 15:48:20 +02:00
parent 3a879e8b1a
commit 1ff535e6eb
9 changed files with 184 additions and 124 deletions

View file

@ -5485,14 +5485,14 @@ SPA_EXPORT
int jack_acquire_real_time_scheduling (jack_native_thread_t thread, int priority) int jack_acquire_real_time_scheduling (jack_native_thread_t thread, int priority)
{ {
pw_log_info("acquire"); pw_log_info("acquire");
return pw_thread_utils_acquire_rt((struct pw_thread*)thread, priority); return pw_thread_utils_acquire_rt((struct spa_thread*)thread, priority);
} }
SPA_EXPORT SPA_EXPORT
int jack_drop_real_time_scheduling (jack_native_thread_t thread) int jack_drop_real_time_scheduling (jack_native_thread_t thread)
{ {
pw_log_info("drop"); pw_log_info("drop");
return pw_thread_utils_drop_rt((struct pw_thread*)thread); return pw_thread_utils_drop_rt((struct spa_thread*)thread);
} }
/** /**
@ -5519,7 +5519,6 @@ int jack_client_create_thread (jack_client_t* client,
void *(*start_routine)(void*), void *(*start_routine)(void*),
void *arg) void *arg)
{ {
struct pw_thread *thr;
int res = 0; int res = 0;
spa_return_val_if_fail(client != NULL, -EINVAL); spa_return_val_if_fail(client != NULL, -EINVAL);
@ -5538,6 +5537,8 @@ int jack_client_create_thread (jack_client_t* client,
pthread_attr_destroy(&attributes); pthread_attr_destroy(&attributes);
} else { } else {
struct spa_thread *thr;
thr = pw_thread_utils_create(NULL, start_routine, arg); thr = pw_thread_utils_create(NULL, start_routine, arg);
if (thr == NULL) if (thr == NULL)
res = -errno; res = -errno;
@ -5566,7 +5567,7 @@ int jack_client_stop_thread(jack_client_t* client, jack_native_thread_t thread)
return -EINVAL; return -EINVAL;
pw_log_debug("join thread %lu", thread); pw_log_debug("join thread %lu", thread);
pw_thread_utils_join((struct pw_thread*)thread, &status); pw_thread_utils_join((struct spa_thread*)thread, &status);
pw_log_debug("stopped thread %lu", thread); pw_log_debug("stopped thread %lu", thread);
return 0; return 0;
} }
@ -5582,7 +5583,7 @@ int jack_client_kill_thread(jack_client_t* client, jack_native_thread_t thread)
pw_log_debug("cancel thread %lu", thread); pw_log_debug("cancel thread %lu", thread);
pthread_cancel(thread); pthread_cancel(thread);
pw_log_debug("join thread %lu", thread); pw_log_debug("join thread %lu", thread);
pw_thread_utils_join((struct pw_thread*)thread, &status); pw_thread_utils_join((struct spa_thread*)thread, &status);
pw_log_debug("stopped thread %lu", thread); pw_log_debug("stopped thread %lu", thread);
return 0; return 0;
} }

View file

@ -0,0 +1,129 @@
/* Simple Plugin API
*
* Copyright © 2021 Wim Taymans
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice (including the next
* paragraph) shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
#ifndef SPA_THREAD_H
#define SPA_THREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include <string.h>
#include <errno.h>
#include <spa/utils/defs.h>
#include <spa/utils/dict.h>
/**
* \addtogroup spa_support
* \{
*/
/** a thread object.
* This can be cast to a platform native thread, like pthread on posix systems
*/
#define SPA_TYPE_INFO_Thread SPA_TYPE_INFO_POINTER_BASE "Thread"
struct spa_thread;
#define SPA_TYPE_INTERFACE_ThreadUtils SPA_TYPE_INFO_INTERFACE_BASE "ThreadUtils"
#define SPA_VERSION_THREAD_UTILS 0
struct spa_thread_utils { struct spa_interface iface; };
/** thread utils */
struct spa_thread_utils_methods {
#define SPA_VERSION_THREAD_UTILS_METHODS 0
uint32_t version;
/** create a new thread that runs \a start with \a arg */
struct spa_thread * (*create) (void *data, const struct spa_dict *props,
void *(*start)(void*), void *arg);
/** stop and join a thread */
int (*join)(void *data, struct spa_thread *thread, void **retval);
/** get realtime priority range for threads created with \a props */
int (*get_rt_range) (void *data, const struct spa_dict *props, int *min, int *max);
/** acquire realtime priority */
int (*acquire_rt) (void *data, struct spa_thread *thread, int priority);
/** drop realtime priority */
int (*drop_rt) (void *data, struct spa_thread *thread);
};
static inline struct spa_thread *spa_thread_utils_create(struct spa_thread_utils *o,
const struct spa_dict *props, void *(*start_routine)(void*), void *arg)
{
struct spa_thread *res = NULL;
spa_interface_call_res(&o->iface,
struct spa_thread_utils_methods, res, create, 0,
props, start_routine, arg);
return res;
}
static inline int spa_thread_utils_join(struct spa_thread_utils *o,
struct spa_thread *thread, void **retval)
{
int res = -ENOTSUP;
spa_interface_call_res(&o->iface,
struct spa_thread_utils_methods, res, join, 0,
thread, retval);
return res;
}
static inline int spa_thread_utils_get_rt_range(struct spa_thread_utils *o,
const struct spa_dict *props, int *min, int *max)
{
int res = -ENOTSUP;
spa_interface_call_res(&o->iface,
struct spa_thread_utils_methods, res, get_rt_range, 0,
props, min, max);
return res;
}
static inline int spa_thread_utils_acquire_rt(struct spa_thread_utils *o,
struct spa_thread *thread, int priority)
{
int res = -ENOTSUP;
spa_interface_call_res(&o->iface,
struct spa_thread_utils_methods, res, acquire_rt, 0,
thread, priority);
return res;
}
static inline int spa_thread_utils_drop_rt(struct spa_thread_utils *o,
struct spa_thread *thread)
{
int res = -ENOTSUP;
spa_interface_call_res(&o->iface,
struct spa_thread_utils_methods, res, drop_rt, 0, thread);
return res;
}
/**
* \}
*/
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* SPA_THREAD_H */

View file

@ -72,7 +72,7 @@ static const struct spa_dict_item module_props[] = {
struct impl { struct impl {
struct pw_context *context; struct pw_context *context;
struct pw_thread_utils thread_utils; struct spa_thread_utils thread_utils;
int rt_prio; int rt_prio;
rlim_t rt_time_soft; rlim_t rt_time_soft;
@ -84,7 +84,7 @@ struct impl {
static void module_destroy(void *data) static void module_destroy(void *data)
{ {
struct impl *impl = data; struct impl *impl = data;
pw_thread_utils_set_impl(NULL); pw_thread_utils_set(NULL);
spa_hook_remove(&impl->module_listener); spa_hook_remove(&impl->module_listener);
free(impl); free(impl);
} }
@ -161,7 +161,7 @@ static int get_default_int(struct pw_properties *properties, const char *name, i
return val; return val;
} }
static struct pw_thread *impl_create(void *data, static struct spa_thread *impl_create(void *data,
const struct spa_dict *props, const struct spa_dict *props,
void *(*start)(void*), void *arg) void *(*start)(void*), void *arg)
{ {
@ -171,10 +171,10 @@ static struct pw_thread *impl_create(void *data,
errno = err; errno = err;
return NULL; return NULL;
} }
return (struct pw_thread*)pt; return (struct spa_thread*)pt;
} }
static int impl_join(void *data, struct pw_thread *thread, void **retval) static int impl_join(void *data, struct spa_thread *thread, void **retval)
{ {
pthread_t pt = (pthread_t)thread; pthread_t pt = (pthread_t)thread;
return pthread_join(pt, retval); return pthread_join(pt, retval);
@ -191,7 +191,7 @@ static int impl_get_rt_range(void *data, const struct spa_dict *props,
return 0; return 0;
} }
static int impl_acquire_rt(void *data, struct pw_thread *thread, int priority) static int impl_acquire_rt(void *data, struct spa_thread *thread, int priority)
{ {
int err, policy = DEFAULT_POLICY; int err, policy = DEFAULT_POLICY;
int rtprio = priority; int rtprio = priority;
@ -215,7 +215,7 @@ static int impl_acquire_rt(void *data, struct pw_thread *thread, int priority)
return 0; return 0;
} }
static int impl_drop_rt(void *data, struct pw_thread *thread) static int impl_drop_rt(void *data, struct spa_thread *thread)
{ {
struct sched_param sp; struct sched_param sp;
pthread_t pt = (pthread_t)thread; pthread_t pt = (pthread_t)thread;
@ -231,8 +231,8 @@ static int impl_drop_rt(void *data, struct pw_thread *thread)
return 0; return 0;
} }
static const struct pw_thread_utils_methods impl_thread_utils = { static const struct spa_thread_utils_methods impl_thread_utils = {
PW_VERSION_THREAD_UTILS_METHODS, SPA_VERSION_THREAD_UTILS_METHODS,
.create = impl_create, .create = impl_create,
.join = impl_join, .join = impl_join,
.get_rt_range = impl_get_rt_range, .get_rt_range = impl_get_rt_range,
@ -272,11 +272,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
set_rlimit(impl); set_rlimit(impl);
impl->thread_utils.iface = SPA_INTERFACE_INIT( impl->thread_utils.iface = SPA_INTERFACE_INIT(
PW_TYPE_INTERFACE_ThreadUtils, SPA_TYPE_INTERFACE_ThreadUtils,
PW_VERSION_THREAD_UTILS, SPA_VERSION_THREAD_UTILS,
&impl_thread_utils, impl); &impl_thread_utils, impl);
pw_thread_utils_set_impl(&impl->thread_utils); pw_thread_utils_set(&impl->thread_utils);
pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);

View file

@ -87,7 +87,7 @@ struct impl {
pthread_cond_t cond; pthread_cond_t cond;
struct spa_list threads_list; struct spa_list threads_list;
struct pw_thread_utils thread_utils; struct spa_thread_utils thread_utils;
int nice_level; int nice_level;
int rt_prio; int rt_prio;
@ -435,7 +435,7 @@ static void module_destroy(void *data)
{ {
struct impl *impl = data; struct impl *impl = data;
pw_thread_utils_set_impl(NULL); pw_thread_utils_set(NULL);
spa_hook_remove(&impl->module_listener); spa_hook_remove(&impl->module_listener);
pw_properties_free(impl->props); pw_properties_free(impl->props);
@ -525,7 +525,7 @@ static void *custom_start(void *data)
return this->start(this->arg); return this->start(this->arg);
} }
static struct pw_thread *impl_create(void *data, const struct spa_dict *props, static struct spa_thread *impl_create(void *data, const struct spa_dict *props,
void *(*start_routine)(void*), void *arg) void *(*start_routine)(void*), void *arg)
{ {
struct impl *impl = data; struct impl *impl = data;
@ -553,10 +553,10 @@ exit:
free(this); free(this);
return NULL; return NULL;
} }
return (struct pw_thread*)this->thread; return (struct spa_thread*)this->thread;
} }
static int impl_join(void *data, struct pw_thread *thread, void **retval) static int impl_join(void *data, struct spa_thread *thread, void **retval)
{ {
struct impl *impl = data; struct impl *impl = data;
pthread_t pt = (pthread_t)thread; pthread_t pt = (pthread_t)thread;
@ -598,7 +598,7 @@ static pid_t impl_gettid(struct impl *impl, pthread_t pt)
return pid; return pid;
} }
static int impl_acquire_rt(void *data, struct pw_thread *thread, int priority) static int impl_acquire_rt(void *data, struct spa_thread *thread, int priority)
{ {
struct impl *impl = data; struct impl *impl = data;
struct sched_param sp; struct sched_param sp;
@ -633,7 +633,7 @@ static int impl_acquire_rt(void *data, struct pw_thread *thread, int priority)
return 0; return 0;
} }
static int impl_drop_rt(void *data, struct pw_thread *thread) static int impl_drop_rt(void *data, struct spa_thread *thread)
{ {
struct sched_param sp; struct sched_param sp;
pthread_t pt = (pthread_t)thread; pthread_t pt = (pthread_t)thread;
@ -652,8 +652,8 @@ static int impl_drop_rt(void *data, struct pw_thread *thread)
return 0; return 0;
} }
static const struct pw_thread_utils_methods impl_thread_utils = { static const struct spa_thread_utils_methods impl_thread_utils = {
PW_VERSION_THREAD_UTILS_METHODS, SPA_VERSION_THREAD_UTILS_METHODS,
.create = impl_create, .create = impl_create,
.join = impl_join, .join = impl_join,
.get_rt_range = impl_get_rt_range, .get_rt_range = impl_get_rt_range,
@ -710,11 +710,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
set_rlimit(impl); set_rlimit(impl);
impl->thread_utils.iface = SPA_INTERFACE_INIT( impl->thread_utils.iface = SPA_INTERFACE_INIT(
PW_TYPE_INTERFACE_ThreadUtils, SPA_TYPE_INTERFACE_ThreadUtils,
PW_VERSION_THREAD_UTILS, SPA_VERSION_THREAD_UTILS,
&impl_thread_utils, impl); &impl_thread_utils, impl);
pw_thread_utils_set_impl(&impl->thread_utils); pw_thread_utils_set(&impl->thread_utils);
pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);

View file

@ -187,7 +187,7 @@ static int try_load_conf(struct pw_context *this, const char *conf_prefix,
static int context_set_freewheel(struct pw_context *context, bool freewheel) static int context_set_freewheel(struct pw_context *context, bool freewheel)
{ {
struct pw_thread *thr; struct spa_thread *thr;
int res; int res;
if ((thr = pw_data_loop_get_thread(context->data_loop_impl)) == NULL) if ((thr = pw_data_loop_get_thread(context->data_loop_impl)) == NULL)

View file

@ -260,12 +260,12 @@ bool pw_data_loop_in_thread(struct pw_data_loop * loop)
* \param loop the data loop to get the thread of * \param loop the data loop to get the thread of
* \return the thread object or NULL when the thread is not running * \return the thread object or NULL when the thread is not running
* *
* On posix based systems this returns a pthread_t * * On posix based systems this returns a pthread_t
*/ */
SPA_EXPORT SPA_EXPORT
struct pw_thread *pw_data_loop_get_thread(struct pw_data_loop * loop) struct spa_thread *pw_data_loop_get_thread(struct pw_data_loop * loop)
{ {
return loop->running ? (struct pw_thread*)loop->thread : NULL; return loop->running ? (struct spa_thread*)loop->thread : NULL;
} }
SPA_EXPORT SPA_EXPORT

View file

@ -87,7 +87,7 @@ int pw_data_loop_stop(struct pw_data_loop *loop);
/** Check if the current thread is the processing thread */ /** Check if the current thread is the processing thread */
bool pw_data_loop_in_thread(struct pw_data_loop *loop); bool pw_data_loop_in_thread(struct pw_data_loop *loop);
/** Get the thread object */ /** Get the thread object */
struct pw_thread *pw_data_loop_get_thread(struct pw_data_loop *loop); struct spa_thread *pw_data_loop_get_thread(struct pw_data_loop *loop);
/** invoke func in the context of the thread or in the caller thread when /** invoke func in the context of the thread or in the caller thread when
* the loop is not running. Since 0.3.3 */ * the loop is not running. Since 0.3.3 */

View file

@ -34,8 +34,7 @@
#include "thread.h" #include "thread.h"
static struct spa_thread *impl_create(void *data,
static struct pw_thread *impl_create(void *data,
const struct spa_dict *props, const struct spa_dict *props,
void *(*start)(void*), void *arg) void *(*start)(void*), void *arg)
{ {
@ -45,10 +44,10 @@ static struct pw_thread *impl_create(void *data,
errno = err; errno = err;
return NULL; return NULL;
} }
return (struct pw_thread*)pt; return (struct spa_thread*)pt;
} }
static int impl_join(void *data, struct pw_thread *thread, void **retval) static int impl_join(void *data, struct spa_thread *thread, void **retval)
{ {
pthread_t pt = (pthread_t)thread; pthread_t pt = (pthread_t)thread;
return pthread_join(pt, retval); return pthread_join(pt, retval);
@ -65,24 +64,24 @@ static int impl_get_rt_range(void *data, const struct spa_dict *props,
} }
static struct { static struct {
struct pw_thread_utils utils; struct spa_thread_utils utils;
struct pw_thread_utils_methods methods; struct spa_thread_utils_methods methods;
} default_impl = { } default_impl = {
{ { PW_TYPE_INTERFACE_ThreadUtils, { { SPA_TYPE_INTERFACE_ThreadUtils,
PW_VERSION_THREAD_UTILS, SPA_VERSION_THREAD_UTILS,
SPA_CALLBACKS_INIT(&default_impl.methods, SPA_CALLBACKS_INIT(&default_impl.methods,
&default_impl) } }, &default_impl) } },
{ PW_VERSION_THREAD_UTILS_METHODS, { SPA_VERSION_THREAD_UTILS_METHODS,
.create = impl_create, .create = impl_create,
.join = impl_join, .join = impl_join,
.get_rt_range = impl_get_rt_range .get_rt_range = impl_get_rt_range
} }
}; };
static struct pw_thread_utils *global_impl = &default_impl.utils; static struct spa_thread_utils *global_impl = &default_impl.utils;
SPA_EXPORT SPA_EXPORT
void pw_thread_utils_set_impl(struct pw_thread_utils *impl) void pw_thread_utils_set(struct spa_thread_utils *impl)
{ {
if (impl == NULL) if (impl == NULL)
impl = &default_impl.utils; impl = &default_impl.utils;
@ -90,7 +89,7 @@ void pw_thread_utils_set_impl(struct pw_thread_utils *impl)
} }
SPA_EXPORT SPA_EXPORT
struct pw_thread_utils *pw_thread_utils_get_impl(void) struct spa_thread_utils *pw_thread_utils_get(void)
{ {
return global_impl; return global_impl;
} }

View file

@ -32,90 +32,21 @@ extern "C" {
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <spa/utils/dict.h> #include <spa/support/thread.h>
#include <pipewire/type.h>
/** \defgroup pw_thread Thread related functions /** \defgroup spa_thread Thread related functions
* *
* \brief functions to manipulate threads * \brief functions to manipulate threads
*/ */
#define PW_TYPE_INTERFACE_ThreadUtils PW_TYPE_INFO_INTERFACE_BASE "ThreadUtils" void pw_thread_utils_set(struct spa_thread_utils *impl);
struct spa_thread_utils *pw_thread_utils_get(void);
/** a thread object. #define pw_thread_utils_create(...) spa_thread_utils_create(pw_thread_utils_get(), ##__VA_ARGS__)
* This can be cast to a platform native thread, like pthread on posix systems #define pw_thread_utils_join(...) spa_thread_utils_join(pw_thread_utils_get(), ##__VA_ARGS__)
*/ #define pw_thread_utils_get_rt_range(...) spa_thread_utils_get_rt_range(pw_thread_utils_get(), ##__VA_ARGS__)
struct pw_thread; #define pw_thread_utils_acquire_rt(...) spa_thread_utils_acquire_rt(pw_thread_utils_get(), ##__VA_ARGS__)
#define pw_thread_utils_drop_rt(...) spa_thread_utils_drop_rt(pw_thread_utils_get(), ##__VA_ARGS__)
#define PW_VERSION_THREAD_UTILS 0
struct pw_thread_utils { struct spa_interface iface; };
/** thread utils */
struct pw_thread_utils_methods {
#define PW_VERSION_THREAD_UTILS_METHODS 0
uint32_t version;
/** create a new thread that runs \a start with \a arg */
struct pw_thread * (*create) (void *data, const struct spa_dict *props,
void *(*start)(void*), void *arg);
/** stop and join a thread */
int (*join)(void *data, struct pw_thread *thread, void **retval);
/** get realtime priority range for threads created with \a props */
int (*get_rt_range) (void *data, const struct spa_dict *props, int *min, int *max);
/** acquire realtime priority */
int (*acquire_rt) (void *data, struct pw_thread *thread, int priority);
/** drop realtime priority */
int (*drop_rt) (void *data, struct pw_thread *thread);
};
void pw_thread_utils_set_impl(struct pw_thread_utils *impl);
struct pw_thread_utils *pw_thread_utils_get_impl(void);
static inline struct pw_thread *pw_thread_utils_create(const struct spa_dict *props,
void *(*start_routine)(void*), void *arg)
{
struct pw_thread *res = NULL;
spa_interface_call_res(&pw_thread_utils_get_impl()->iface,
struct pw_thread_utils_methods, res, create, 0,
props, start_routine, arg);
return res;
}
static inline int pw_thread_utils_join(struct pw_thread *thread, void **retval)
{
int res = -ENOTSUP;
spa_interface_call_res(&pw_thread_utils_get_impl()->iface,
struct pw_thread_utils_methods, res, join, 0,
thread, retval);
return res;
}
static inline int pw_thread_utils_get_rt_range(const struct spa_dict *props, int *min, int *max)
{
int res = -ENOTSUP;
spa_interface_call_res(&pw_thread_utils_get_impl()->iface,
struct pw_thread_utils_methods, res, get_rt_range, 0,
props, min, max);
return res;
}
static inline int pw_thread_utils_acquire_rt(struct pw_thread *thread, int priority)
{
int res = -ENOTSUP;
spa_interface_call_res(&pw_thread_utils_get_impl()->iface,
struct pw_thread_utils_methods, res, acquire_rt, 0,
thread, priority);
return res;
}
static inline int pw_thread_utils_drop_rt(struct pw_thread *thread)
{
int res = -ENOTSUP;
spa_interface_call_res(&pw_thread_utils_get_impl()->iface,
struct pw_thread_utils_methods, res, drop_rt, 0, thread);
return res;
}
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */