context: add support for multiple data loops

Add config options to create and start multiple data loops, each with
their own priority and thread affinity if requested.

Make it possible to assign loop.classes to the data-loops. Use the
node.loop.class to find a data-loop for the node of the same class. Try
to evenly spread the nodes over the available matching loops.

With this, it is possible to separate the processing of the nodes
depending on the classes, like audio/video and improve concurency on
the server.

No attempt is done yet to move nodes between loops or to move
independent nodes to separate data loops.

Fixes #3969
This commit is contained in:
Wim Taymans 2024-04-19 14:57:53 +02:00
parent e85bb7194b
commit a4bfdd7f82
10 changed files with 331 additions and 63 deletions

View file

@ -22,6 +22,20 @@ context.properties = {
#log.level = 2
#cpu.zero.denormals = false
#loop.rt-prio = -1 # -1 = use module-rt prio, 0 disable rt
#loop.class = data.rt
#thread.affinity = [ 0 1 ] # optional array of CPUs
#context.num-data-loops = 1 # -1 = num-cpus, 0 = no data loops
#
#context.data-loops = [
# { loop.rt-prio = -1
# loop.class = [ data.rt audio.rt ]
# #library.name.system = support/libspa-support
# thread.name = data-loop.0
# #thread.affinity = [ 0 1 ] # optional array of CPUs
# }
#]
core.daemon = true # listening for socket connections
core.name = pipewire-0 # core name and socket name

View file

@ -9,6 +9,7 @@
#include <regex.h>
#include <limits.h>
#include <sys/mman.h>
#include <fnmatch.h>
#include <pipewire/log.h>
@ -20,6 +21,8 @@
#include <spa/utils/atomic.h>
#include <spa/utils/names.h>
#include <spa/utils/string.h>
#include <spa/utils/json.h>
#include <spa/utils/cleanup.h>
#include <spa/debug/types.h>
#include <pipewire/impl.h>
@ -34,6 +37,14 @@ PW_LOG_TOPIC_EXTERN(log_context);
#define MAX_HOPS 64
#define MAX_SYNC 4u
#define MAX_LOOPS 64u
#define DEFAULT_DATA_LOOPS 1
struct data_loop {
struct pw_data_loop *impl;
int ref;
};
/** \cond */
struct impl {
@ -43,7 +54,10 @@ struct impl {
unsigned int recalc:1;
unsigned int recalc_pending:1;
struct pw_data_loop *data_loop_impl;
uint32_t cpu_count;
uint32_t n_data_loops;
struct data_loop data_loops[MAX_LOOPS];
};
@ -91,24 +105,26 @@ static int context_set_freewheel(struct pw_context *context, bool freewheel)
{
struct impl *impl = SPA_CONTAINER_OF(context, struct impl, this);
struct spa_thread *thr;
uint32_t i;
int res = 0;
if ((thr = pw_data_loop_get_thread(impl->data_loop_impl)) == NULL)
return -EIO;
for (i = 0; i < impl->n_data_loops; i++) {
if ((thr = pw_data_loop_get_thread(impl->data_loops[i].impl)) == NULL)
return -EIO;
if (freewheel) {
pw_log_info("%p: enter freewheel", context);
if (context->thread_utils)
res = spa_thread_utils_drop_rt(context->thread_utils, thr);
} else {
pw_log_info("%p: exit freewheel", context);
/* Use the priority as configured within the realtime module */
if (context->thread_utils)
res = spa_thread_utils_acquire_rt(context->thread_utils, thr, -1);
if (freewheel) {
pw_log_info("%p: enter freewheel", context);
if (context->thread_utils)
res = spa_thread_utils_drop_rt(context->thread_utils, thr);
} else {
pw_log_info("%p: exit freewheel", context);
/* Use the priority as configured within the realtime module */
if (context->thread_utils)
res = spa_thread_utils_acquire_rt(context->thread_utils, thr, -1);
}
if (res < 0)
pw_log_info("%p: freewheel error:%s", context, spa_strerror(res));
}
if (res < 0)
pw_log_info("%p: freewheel error:%s", context, spa_strerror(res));
context->freewheeling = freewheel;
return res;
@ -164,6 +180,96 @@ static int do_data_loop_setup(struct spa_loop *loop, bool async, uint32_t seq,
return 0;
}
static int setup_data_loops(struct impl *impl)
{
struct pw_properties *pr;
struct pw_context *this = &impl->this;
const char *str, *lib_name;
uint32_t i;
int res = 0;
pr = pw_properties_copy(this->properties);
lib_name = pw_properties_get(this->properties, "context.data-loop." PW_KEY_LIBRARY_NAME_SYSTEM);
if ((str = pw_properties_get(this->properties, "context.data-loops")) != NULL) {
struct spa_json it[4];
char key[512];
int r, len = strlen(str);
spa_autofree char *s = strndup(str, len);
i = 0;
spa_json_init(&it[0], s, len);
if (spa_json_enter_array(&it[0], &it[1]) < 0) {
pw_log_error("context.data-loops is not an array in '%s'", str);
res = -EINVAL;
goto exit;
}
while ((r = spa_json_enter_object(&it[1], &it[2])) > 0) {
char *props = NULL;
if (i >= MAX_LOOPS) {
pw_log_warn("too many context.data-loops, using first %d",
MAX_LOOPS);
break;
}
pw_properties_clear(pr);
pw_properties_update(pr, &this->properties->dict);
pw_properties_set(pr, PW_KEY_LIBRARY_NAME_SYSTEM, lib_name);
while (spa_json_get_string(&it[2], key, sizeof(key)) > 0) {
const char *val;
int l;
if ((l = spa_json_next(&it[2], &val)) <= 0) {
pw_log_warn("malformed data-loop: key '%s' has no "
"value in '%.*s'", key, (int)len, str);
break;
}
if (spa_json_is_container(val, l))
l = spa_json_container_len(&it[2], val, l);
props = (char*)val;
spa_json_parse_stringn(val, l, props, l+1);
pw_properties_set(pr, key, props);
pw_log_info("loop %d: \"%s\" = %s", i, key, props);
}
impl->data_loops[i].impl = pw_data_loop_new(&pr->dict);
if (impl->data_loops[i].impl == NULL) {
res = -errno;
goto exit;
}
i++;
}
impl->n_data_loops = i;
} else {
int32_t count = pw_properties_get_int32(pr, "context.num-data-loops",
DEFAULT_DATA_LOOPS);
if (count < 0)
count = impl->cpu_count;
impl->n_data_loops = count;
if (impl->n_data_loops > MAX_LOOPS) {
pw_log_warn("too many context.num-data-loops: %d, using %d",
impl->n_data_loops, MAX_LOOPS);
impl->n_data_loops = MAX_LOOPS;
}
for (i = 0; i < impl->n_data_loops; i++) {
pw_properties_setf(pr, SPA_KEY_THREAD_NAME, "data-loop.%d", i);
impl->data_loops[i].impl = pw_data_loop_new(&pr->dict);
if (impl->data_loops[i].impl == NULL) {
res = -errno;
goto exit;
}
}
}
pw_log_info("created %d data-loops", impl->n_data_loops);
exit:
pw_properties_free(pr);
return res;
}
/** Create a new context object
*
* \param main_loop the main loop to use
@ -180,8 +286,8 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop,
struct pw_context *this;
const char *lib, *str;
void *dbus_iface = NULL;
uint32_t n_support, vm_type;
struct pw_properties *pr, *conf;
uint32_t i, n_support, vm_type;
struct pw_properties *conf;
struct spa_cpu *cpu;
int res = 0;
@ -267,6 +373,7 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop,
if (pw_properties_get(properties, PW_KEY_CPU_MAX_ALIGN) == NULL)
pw_properties_setf(properties, PW_KEY_CPU_MAX_ALIGN,
"%u", spa_cpu_get_max_align(cpu));
impl->cpu_count = spa_cpu_get_count(cpu);
}
if (getenv("PIPEWIRE_DEBUG") == NULL &&
@ -285,16 +392,8 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop,
pw_settings_init(this);
this->settings = this->defaults;
pr = pw_properties_copy(properties);
if ((str = pw_properties_get(pr, "context.data-loop." PW_KEY_LIBRARY_NAME_SYSTEM)))
pw_properties_set(pr, PW_KEY_LIBRARY_NAME_SYSTEM, str);
impl->data_loop_impl = pw_data_loop_new(&pr->dict);
pw_properties_free(pr);
if (impl->data_loop_impl == NULL) {
res = -errno;
if ((res = setup_data_loops(impl)) < 0)
goto error_free;
}
this->pool = pw_mempool_new(NULL);
if (this->pool == NULL) {
@ -302,9 +401,7 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop,
goto error_free;
}
this->data_loop = pw_data_loop_get_loop(impl->data_loop_impl);
this->main_loop = main_loop;
this->work_queue = pw_work_queue_new(this->main_loop);
if (this->work_queue == NULL) {
res = -errno;
@ -316,8 +413,13 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop,
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_System, this->main_loop->system);
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_Loop, this->main_loop->loop);
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_LoopUtils, this->main_loop->utils);
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataSystem, this->data_loop->system);
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataLoop, this->data_loop->loop);
if (impl->n_data_loops > 0) {
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataSystem, impl->data_loops[0].impl->loop->system);
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataLoop, impl->data_loops[0].impl->loop->loop);
} else {
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataSystem, this->main_loop->system);
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataLoop, this->main_loop->loop);
}
this->support[n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_PluginLoader, &impl->plugin_loader);
if ((str = pw_properties_get(properties, "support.dbus")) == NULL ||
@ -367,11 +469,13 @@ struct pw_context *pw_context_new(struct pw_loop *main_loop,
goto error_free;
pw_log_info("%p: parsed %d context.exec items", this, res);
if ((res = pw_data_loop_start(impl->data_loop_impl)) < 0)
goto error_free;
for (i = 0; i < impl->n_data_loops; i++) {
if ((res = pw_data_loop_start(impl->data_loops[i].impl)) < 0)
goto error_free;
pw_data_loop_invoke(impl->data_loop_impl,
do_data_loop_setup, 0, NULL, 0, false, this);
pw_data_loop_invoke(impl->data_loops[i].impl,
do_data_loop_setup, 0, NULL, 0, false, this);
}
pw_settings_expose(this);
@ -404,6 +508,7 @@ void pw_context_destroy(struct pw_context *context)
struct factory_entry *entry;
struct pw_impl_metadata *metadata;
struct pw_impl_core *core_impl;
uint32_t i;
pw_log_debug("%p: destroy", context);
pw_context_emit_destroy(context);
@ -423,8 +528,10 @@ void pw_context_destroy(struct pw_context *context)
spa_list_consume(resource, &context->registry_resource_list, link)
pw_resource_destroy(resource);
if (impl->data_loop_impl)
pw_data_loop_stop(impl->data_loop_impl);
for (i = 0; i < impl->n_data_loops; i++) {
if (impl->data_loops[i].impl)
pw_data_loop_stop(impl->data_loops[i].impl);
}
spa_list_consume(module, &context->module_list, link)
pw_impl_module_destroy(module);
@ -441,8 +548,11 @@ void pw_context_destroy(struct pw_context *context)
pw_log_debug("%p: free", context);
pw_context_emit_free(context);
if (impl->data_loop_impl)
pw_data_loop_destroy(impl->data_loop_impl);
for (i = 0; i < impl->n_data_loops; i++) {
if (impl->data_loops[i].impl)
pw_data_loop_destroy(impl->data_loops[i].impl);
}
if (context->pool)
pw_mempool_destroy(context->pool);
@ -506,18 +616,86 @@ SPA_EXPORT
struct pw_data_loop *pw_context_get_data_loop(struct pw_context *context)
{
struct impl *impl = SPA_CONTAINER_OF(context, struct impl, this);
return impl->data_loop_impl;
if (impl->n_data_loops > 0)
return impl->data_loops[0].impl;
else
return NULL;
}
SPA_EXPORT
struct pw_loop *pw_context_find_loop(struct pw_context *context, const char *name)
struct pw_loop *pw_context_acquire_loop(struct pw_context *context, const struct spa_dict *props)
{
if (spa_strstartswith(name, "main-loop."))
struct impl *impl = SPA_CONTAINER_OF(context, struct impl, this);
const char *name, *klass;
uint32_t i, j;
struct data_loop *best_loop = NULL;
int best_score = 0;
name = spa_dict_lookup(props, PW_KEY_NODE_LOOP_NAME);
klass = spa_dict_lookup(props, PW_KEY_NODE_LOOP_CLASS);
pw_log_info("looking for name:'%s' class:'%s'", name, klass);
if ((impl->n_data_loops == 0) ||
(name && fnmatch(name, "main-loop.0", FNM_EXTMATCH) == 0) ||
(klass && fnmatch(klass, "main", FNM_EXTMATCH) == 0)) {
pw_log_info("using main loop num-data-loops:%d", impl->n_data_loops);
return context->main_loop;
else if (spa_strstartswith(name, "data-loop."))
return context->data_loop;
else
}
if (klass == NULL)
klass = "data.rt";
for (i = 0; i < impl->n_data_loops; i++) {
struct data_loop *l = &impl->data_loops[i];
int score = 0;
if (name && l->impl->name && fnmatch(name, l->impl->name, FNM_EXTMATCH) == 0)
score += 2;
if (klass && l->impl->classes) {
for (j = 0; l->impl->classes[j]; j++) {
if (fnmatch(klass, l->impl->classes[j], FNM_EXTMATCH) == 0) {
score += 1;
break;
}
}
}
pw_log_debug("%d: name:'%s' class:'%s' score:%d ref:%d", i,
l->impl->name, l->impl->class, score, l->ref);
if ((best_loop == NULL) ||
(score > best_score) ||
(score == best_score && l->ref < best_loop->ref)) {
best_loop = l;
best_score = score;
}
}
if (best_loop == NULL)
return NULL;
best_loop->ref++;
pw_log_info("using name:'%s' class:'%s' ref:%d", best_loop->impl->name,
best_loop->impl->class, best_loop->ref);
return best_loop->impl->loop;
}
SPA_EXPORT
void pw_context_release_loop(struct pw_context *context, struct pw_loop *loop)
{
struct impl *impl = SPA_CONTAINER_OF(context, struct impl, this);
uint32_t i;
for (i = 0; i < impl->n_data_loops; i++) {
struct data_loop *l = &impl->data_loops[i];
if (l->impl->loop == loop) {
l->ref--;
pw_log_info("release name:'%s' class:'%s' ref:%d", l->impl->name,
l->impl->class, l->ref);
return;
}
}
}
SPA_EXPORT
@ -1787,10 +1965,15 @@ int pw_context_set_object(struct pw_context *context, const char *type, void *va
entry->value = value;
}
if (spa_streq(type, SPA_TYPE_INTERFACE_ThreadUtils)) {
uint32_t i;
context->thread_utils = value;
if (impl->data_loop_impl)
pw_data_loop_set_thread_utils(impl->data_loop_impl,
context->thread_utils);
for (i = 0; i < impl->n_data_loops; i++) {
if (impl->data_loops[i].impl)
pw_data_loop_set_thread_utils(impl->data_loops[i].impl,
context->thread_utils);
}
}
return 0;
}

View file

@ -4,12 +4,14 @@
#include <pthread.h>
#include <errno.h>
#include <limits.h>
#include <sys/resource.h>
#include "pipewire/log.h"
#include "pipewire/data-loop.h"
#include "pipewire/private.h"
#include "pipewire/thread.h"
#include "pipewire/utils.h"
PW_LOG_TOPIC_EXTERN(log_data_loop);
#define PW_LOG_TOPIC_DEFAULT log_data_loop
@ -86,7 +88,7 @@ static int do_stop(struct spa_loop *loop, bool async, uint32_t seq,
static struct pw_data_loop *loop_new(struct pw_loop *loop, const struct spa_dict *props)
{
struct pw_data_loop *this;
const char *str;
const char *str, *name = NULL, *class = NULL;
int res;
this = calloc(1, sizeof(struct pw_data_loop));
@ -107,15 +109,28 @@ static struct pw_data_loop *loop_new(struct pw_loop *loop, const struct spa_dict
goto error_free;
}
this->loop = loop;
this->rt_prio = -1;
if (props != NULL) {
if ((str = spa_dict_lookup(props, "loop.cancel")) != NULL)
this->cancel = pw_properties_parse_bool(str);
if ((str = spa_dict_lookup(props, "loop.class")) != NULL)
class = str;
if ((str = spa_dict_lookup(props, "loop.rt-prio")) != NULL)
this->rt_prio = atoi(str);
if ((str = spa_dict_lookup(props, SPA_KEY_THREAD_NAME)) != NULL)
this->name = strdup(str);
name = str;
if ((str = spa_dict_lookup(props, SPA_KEY_THREAD_AFFINITY)) != NULL)
this->affinity = strdup(str);
}
if (class == NULL)
class = this->rt_prio != 0 ? "data.rt" : "data";
if (name == NULL)
name = "pw-data-loop";
this->class = strdup(class);
this->classes = pw_strv_parse(class, strlen(class), INT_MAX, NULL);
this->name = strdup(name);
spa_hook_list_init(&this->listener_list);
return this;
@ -157,6 +172,8 @@ void pw_data_loop_destroy(struct pw_data_loop *loop)
free(loop->name);
free(loop->affinity);
free(loop->class);
pw_free_strv(loop->classes);
free(loop);
}
@ -197,8 +214,9 @@ int pw_data_loop_start(struct pw_data_loop *loop)
if ((utils = loop->thread_utils) == NULL)
utils = pw_thread_utils_get();
items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_THREAD_NAME,
loop->name ? loop->name : "pw-data-loop");
if (loop->name)
items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_THREAD_NAME,
loop->name);
if (loop->affinity)
items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_THREAD_AFFINITY,
loop->affinity);
@ -210,7 +228,8 @@ int pw_data_loop_start(struct pw_data_loop *loop)
loop->running = false;
return -errno;
}
spa_thread_utils_acquire_rt(utils, thr, -1);
if (loop->rt_prio != 0)
spa_thread_utils_acquire_rt(utils, thr, loop->rt_prio);
}
return 0;
}

View file

@ -1597,7 +1597,7 @@ pw_filter_connect(struct pw_filter *filter,
filter_set_state(filter, PW_FILTER_STATE_CONNECTING, 0, NULL);
if (!SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_RT_PROCESS)) {
pw_properties_set(filter->properties, PW_KEY_NODE_DATA_LOOP, "main-loop.*");
pw_properties_set(filter->properties, PW_KEY_NODE_LOOP_CLASS, "main");
pw_properties_set(filter->properties, PW_KEY_NODE_ASYNC, "true");
}
if (flags & PW_FILTER_FLAG_DRIVER)

View file

@ -1452,7 +1452,6 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
{
struct impl *impl;
struct pw_impl_node *this;
const char *str;
size_t size;
int res;
@ -1477,12 +1476,9 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
goto error_clean;
}
if ((str = pw_properties_get(properties, PW_KEY_NODE_DATA_LOOP)) == NULL)
str = "data-loop.*";
this->data_loop = pw_context_find_loop(context, str);
this->data_loop = pw_context_acquire_loop(context, &properties->dict);
if (this->data_loop == NULL) {
pw_log_error("unknown data-loop name '%s'", str);
pw_log_error("can't find data-loop");
res = -ENOENT;
goto error_clean;
}
@ -1565,6 +1561,8 @@ error_clean:
pw_memblock_unref(this->activation);
if (this->source.fd != -1)
spa_system_close(this->data_system, this->source.fd);
if (this->data_loop)
pw_context_release_loop(context, this->data_loop);
free(this->name);
free(impl);
error_exit:
@ -2262,6 +2260,10 @@ void pw_impl_node_destroy(struct pw_impl_node *node)
clear_info(node);
spa_system_close(node->data_system, node->source.fd);
if (node->data_loop)
pw_context_release_loop(context, node->data_loop);
free(impl->group);
free(impl->link_group);
free(impl->sync_group);

View file

@ -183,7 +183,8 @@ extern "C" {
#define PW_KEY_NODE_TRANSPORT_SYNC "node.transport.sync" /**< the node handles transport sync */
#define PW_KEY_NODE_DRIVER "node.driver" /**< node can drive the graph */
#define PW_KEY_NODE_ASYNC "node.async" /**< the node wants async scheduling */
#define PW_KEY_NODE_DATA_LOOP "node.data-loop" /**< the data loops to run in */
#define PW_KEY_NODE_LOOP_NAME "node.loop.name" /**< the loop name to run in */
#define PW_KEY_NODE_LOOP_CLASS "node.loop.class" /**< the loop class to run in */
#define PW_KEY_NODE_STREAM "node.stream" /**< node is a stream, the server side should
* add a converter */
#define PW_KEY_NODE_VIRTUAL "node.virtual" /**< the node is some sort of virtual

View file

@ -419,7 +419,6 @@ struct pw_context {
struct spa_thread_utils *thread_utils;
struct pw_loop *main_loop; /**< main loop for control */
struct pw_loop *data_loop; /**< data loop for data passing */
struct pw_work_queue *work_queue; /**< work queue */
struct spa_support support[16]; /**< support for spa plugins */
@ -444,6 +443,9 @@ struct pw_data_loop {
char *name;
char *affinity;
char *class;
char **classes;
int rt_prio;
struct spa_hook_list listener_list;
struct spa_thread_utils *thread_utils;
@ -1181,7 +1183,9 @@ int pw_proxy_init(struct pw_proxy *proxy, struct pw_core *core, const char *type
void pw_proxy_remove(struct pw_proxy *proxy);
int pw_context_recalc_graph(struct pw_context *context, const char *reason);
struct pw_loop *pw_context_find_loop(struct pw_context *context, const char *name);
struct pw_loop *pw_context_acquire_loop(struct pw_context *context, const struct spa_dict *props);
void pw_context_release_loop(struct pw_context *context, struct pw_loop *loop);
void pw_impl_port_update_info(struct pw_impl_port *port, const struct spa_port_info *info);

View file

@ -1957,7 +1957,7 @@ pw_stream_connect(struct pw_stream *stream,
pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true");
if (!SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS)) {
pw_properties_set(stream->properties, PW_KEY_NODE_DATA_LOOP, "main-loop.*");
pw_properties_set(stream->properties, PW_KEY_NODE_LOOP_CLASS, "main");
pw_properties_set(stream->properties, PW_KEY_NODE_ASYNC, "true");
}
if (flags & PW_STREAM_FLAG_DRIVER)