From 2fb38af3e0464fcb2656ec40c16935f541570afc Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 19 Feb 2026 14:25:03 +0100 Subject: [PATCH] modules: move the scheduler to a module Mostly because we can but also because there are more ways of doing the scheduling and this opens the door for some experimentation. --- src/daemon/minimal.conf.in | 4 + src/daemon/pipewire.conf.in | 4 + src/modules/meson.build | 10 + src/modules/module-scheduler-v1.c | 1009 +++++++++++++++++++++++++++++ src/pipewire/context.c | 847 +----------------------- src/pipewire/context.h | 5 +- src/pipewire/impl-link.c | 1 + src/pipewire/private.h | 3 + 8 files changed, 1041 insertions(+), 842 deletions(-) create mode 100644 src/modules/module-scheduler-v1.c diff --git a/src/daemon/minimal.conf.in b/src/daemon/minimal.conf.in index 82647e9ca..6def01bcf 100644 --- a/src/daemon/minimal.conf.in +++ b/src/daemon/minimal.conf.in @@ -100,6 +100,10 @@ context.modules = [ } flags = [ ifexists nofail ] } + # the graph scheduler + { name = libpipewire-module-scheduler-v1 + condition = [ { module.scheduler-v1 = !false } ] + } # The native communication protocol. { name = libpipewire-module-protocol-native } diff --git a/src/daemon/pipewire.conf.in b/src/daemon/pipewire.conf.in index c3eb7120f..a9142cede 100644 --- a/src/daemon/pipewire.conf.in +++ b/src/daemon/pipewire.conf.in @@ -121,6 +121,10 @@ context.modules = [ flags = [ ifexists nofail ] condition = [ { module.rt = !false } ] } + # the graph scheduler + { name = libpipewire-module-scheduler-v1 + condition = [ { module.scheduler-v1 = !false } ] + } # The native communication protocol. { name = libpipewire-module-protocol-native diff --git a/src/modules/meson.build b/src/modules/meson.build index 59f46ae13..8636286e6 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -46,6 +46,7 @@ module_sources = [ 'module-vban-recv.c', 'module-vban-send.c', 'module-session-manager.c', + 'module-scheduler-v1.c', 'module-zeroconf-discover.c', 'module-roc-source.c', 'module-roc-sink.c', @@ -532,6 +533,15 @@ pipewire_module_adapter = shared_library('pipewire-module-adapter', dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], ) +pipewire_module_scheduler_v1 = shared_library('pipewire-module-scheduler-v1', + [ 'module-scheduler-v1.c' ], + include_directories : [configinc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], +) + pipewire_module_session_manager = shared_library('pipewire-module-session-manager', [ 'module-session-manager.c', 'module-session-manager/client-endpoint/client-endpoint.c', diff --git a/src/modules/module-scheduler-v1.c b/src/modules/module-scheduler-v1.c new file mode 100644 index 000000000..22965b257 --- /dev/null +++ b/src/modules/module-scheduler-v1.c @@ -0,0 +1,1009 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2018 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef HAVE_SYS_VFS_H +#include +#endif +#ifdef HAVE_SYS_MOUNT_H +#include +#endif + +#include +#include +#include +#include + +#include +#include + +/** \page page_module_scheduler_v1 SchedulerV1 + * + * + * ## Module Name + * + * `libpipewire-module-scheduler-v1` + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * ## General options + * + * Options with well-known behavior: + * + * ## Config override + * + * A `module.scheduler-v1.args` config section can be added + * to override the module arguments. + * + *\code{.unparsed} + * # ~/.config/pipewire/pipewire.conf.d/my-scheduler-v1-args.conf + * + * module.scheduler-v1.args = { + * } + *\endcode + * + * ## Example configuration + * + *\code{.unparsed} + * context.modules = [ + * { name = libpipewire-module-scheduler-v1 + * args = { + * } + * } + *] + *\endcode + * + * Since: 1.7.0 + */ + +#define NAME "scheduler-v1" + +PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +#define MODULE_USAGE "" + +static const struct spa_dict_item module_props[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "Implement the Scheduler V1" }, + { PW_KEY_MODULE_USAGE, MODULE_USAGE }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +#define MAX_HOPS 64 +#define MAX_SYNC 4u + +struct impl { + struct pw_context *context; + + struct pw_properties *props; + + struct spa_hook context_listener; + struct spa_hook module_listener; +}; + +static int ensure_state(struct pw_impl_node *node, bool running) +{ + enum pw_node_state state = node->info.state; + if (node->active && node->runnable && + !SPA_FLAG_IS_SET(node->spa_flags, SPA_NODE_FLAG_NEED_CONFIGURE) && running) + state = PW_NODE_STATE_RUNNING; + else if (state > PW_NODE_STATE_IDLE) + state = PW_NODE_STATE_IDLE; + return pw_impl_node_set_state(node, state); +} + +/* From a node (that is runnable) follow all prepared links in the given direction + * and groups to active nodes and make them recursively runnable as well. + */ +static inline int run_nodes(struct pw_context *context, struct pw_impl_node *node, + struct spa_list *nodes, enum pw_direction direction, int hop) +{ + struct pw_impl_node *t; + struct pw_impl_port *p; + struct pw_impl_link *l; + + if (hop == MAX_HOPS) { + pw_log_warn("exceeded hops (%d)", hop); + return -EIO; + } + + pw_log_debug("node %p: '%s' direction:%s", node, node->name, + pw_direction_as_string(direction)); + + SPA_FLAG_SET(node->checked, 1u<input_ports, link) { + spa_list_for_each(l, &p->links, input_link) { + t = l->output->node; + + if (!t->active || !l->prepared || + (!t->driving && SPA_FLAG_IS_SET(t->checked, 1u<driving && p->node == t) + continue; + + pw_log_debug(" peer %p: '%s'", t, t->name); + t->runnable = true; + run_nodes(context, t, nodes, direction, hop + 1); + } + } + } else { + spa_list_for_each(p, &node->output_ports, link) { + spa_list_for_each(l, &p->links, output_link) { + t = l->input->node; + + if (!t->active || !l->prepared || + (!t->driving && SPA_FLAG_IS_SET(t->checked, 1u<driving && p->node == t) + continue; + + pw_log_debug(" peer %p: '%s'", t, t->name); + t->runnable = true; + run_nodes(context, t, nodes, direction, hop + 1); + } + } + } + /* now go through all the nodes that have the same link group and + * that are not yet visited. Note how nodes with the same group + * don't get included here. They were added to the same driver but + * need to otherwise stay idle unless some non-passive link activates + * them. */ + if (node->link_groups != NULL) { + spa_list_for_each(t, nodes, sort_link) { + if (t->exported || !t->active || + SPA_FLAG_IS_SET(t->checked, 1u<link_groups, node->link_groups) < 0) + continue; + + pw_log_debug(" group %p: '%s'", t, t->name); + t->runnable = true; + if (!t->driving) + run_nodes(context, t, nodes, direction, hop + 1); + } + } + return 0; +} + +/* Follow all prepared links and groups from node, activate the links. + * If a non-passive link is found, we set the peer runnable flag. + * + * After this is done, we end up with a list of nodes in collect that are all + * linked to node. + * Some of the nodes have the runnable flag set. We then start from those nodes + * and make all linked nodes and groups runnable as well. (see run_nodes). + * + * This ensures that we only activate the paths from the runnable nodes to the + * driver nodes and leave the other nodes idle. + */ +static int collect_nodes(struct pw_context *context, struct pw_impl_node *node, struct spa_list *collect) +{ + struct spa_list queue; + struct pw_impl_node *n, *t; + struct pw_impl_port *p; + struct pw_impl_link *l; + uint32_t n_sync; + char *sync[MAX_SYNC+1]; + + pw_log_debug("node %p: '%s'", node, node->name); + + /* start with node in the queue */ + spa_list_init(&queue); + spa_list_append(&queue, &node->sort_link); + node->visited = true; + + n_sync = 0; + sync[0] = NULL; + + /* now follow all the links from the nodes in the queue + * and add the peers to the queue. */ + spa_list_consume(n, &queue, sort_link) { + spa_list_remove(&n->sort_link); + spa_list_append(collect, &n->sort_link); + + pw_log_debug(" next node %p: '%s' runnable:%u active:%d", + n, n->name, n->runnable, n->active); + + if (!n->active) + continue; + + if (n->sync) { + for (uint32_t i = 0; n->sync_groups[i]; i++) { + if (n_sync >= MAX_SYNC) + break; + if (pw_strv_find(sync, n->sync_groups[i]) >= 0) + continue; + sync[n_sync++] = n->sync_groups[i]; + sync[n_sync] = NULL; + } + } + + spa_list_for_each(p, &n->input_ports, link) { + spa_list_for_each(l, &p->links, input_link) { + t = l->output->node; + + if (!t->active) + continue; + + pw_impl_link_prepare(l); + + if (!l->prepared) + continue; + + if (!l->passive) + t->runnable = true; + + if (!t->visited) { + t->visited = true; + spa_list_append(&queue, &t->sort_link); + } + } + } + spa_list_for_each(p, &n->output_ports, link) { + spa_list_for_each(l, &p->links, output_link) { + t = l->input->node; + + if (!t->active) + continue; + + pw_impl_link_prepare(l); + + if (!l->prepared) + continue; + + if (!l->passive) + t->runnable = true; + + if (!t->visited) { + t->visited = true; + spa_list_append(&queue, &t->sort_link); + } + } + } + /* now go through all the nodes that have the same group and + * that are not yet visited */ + if (n->groups != NULL || n->link_groups != NULL || sync[0] != NULL) { + spa_list_for_each(t, &context->node_list, link) { + if (t->exported || !t->active || t->visited) + continue; + /* the other node will be scheduled with this one if it's in + * the same group or link group */ + if (pw_strv_find_common(t->groups, n->groups) < 0 && + pw_strv_find_common(t->link_groups, n->link_groups) < 0 && + pw_strv_find_common(t->sync_groups, sync) < 0) + continue; + + pw_log_debug("%p: %s join group of %s", + t, t->name, n->name); + t->visited = true; + spa_list_append(&queue, &t->sort_link); + } + } + pw_log_debug(" next node %p: '%s' runnable:%u %p %p %p", n, n->name, n->runnable, + n->groups, n->link_groups, sync); + } + /* All non-driver runnable nodes (ie. reachable with a non-passive link) now make + * all linked nodes up and downstream runnable as well */ + spa_list_for_each(n, collect, sort_link) { + if (!n->driver && n->runnable) { + run_nodes(context, n, collect, PW_DIRECTION_OUTPUT, 0); + run_nodes(context, n, collect, PW_DIRECTION_INPUT, 0); + } + } + /* now we might have made a driver runnable, if the node is not runnable at this point + * it means it was linked to the driver with passives links and some other node + * made the driver active. If the node is a leaf it can not be activated in any other + * way and we will also make it, and all its peers, runnable */ + spa_list_for_each(n, collect, sort_link) { + if (!n->driver && n->driver_node->runnable && !n->runnable && n->leaf && n->active) { + n->runnable = true; + run_nodes(context, n, collect, PW_DIRECTION_OUTPUT, 0); + run_nodes(context, n, collect, PW_DIRECTION_INPUT, 0); + } + } + + return 0; +} + +static void move_to_driver(struct pw_context *context, struct spa_list *nodes, + struct pw_impl_node *driver) +{ + struct pw_impl_node *n; + pw_log_debug("driver: %p %s runnable:%u", driver, driver->name, driver->runnable); + spa_list_consume(n, nodes, sort_link) { + spa_list_remove(&n->sort_link); + + driver->runnable |= n->runnable; + + pw_log_debug(" follower: %p %s runnable:%u driver-runnable:%u", n, n->name, + n->runnable, driver->runnable); + pw_impl_node_set_driver(n, driver); + } +} +static void remove_from_driver(struct pw_context *context, struct spa_list *nodes) +{ + struct pw_impl_node *n; + spa_list_consume(n, nodes, sort_link) { + spa_list_remove(&n->sort_link); + pw_impl_node_set_driver(n, NULL); + ensure_state(n, false); + } +} + +static inline void get_quantums(struct pw_context *context, uint32_t *def, + uint32_t *min, uint32_t *max, uint32_t *rate, uint32_t *floor, uint32_t *ceil) +{ + struct settings *s = &context->settings; + if (s->clock_force_quantum != 0) { + *def = *min = *max = s->clock_force_quantum; + *rate = 0; + } else { + *def = s->clock_quantum; + *min = s->clock_min_quantum; + *max = s->clock_max_quantum; + *rate = s->clock_rate; + } + *floor = s->clock_quantum_floor; + *ceil = s->clock_quantum_limit; +} + +static inline const uint32_t *get_rates(struct pw_context *context, uint32_t *def, uint32_t *n_rates, + bool *force) +{ + struct settings *s = &context->settings; + if (s->clock_force_rate != 0) { + *force = true; + *n_rates = 1; + *def = s->clock_force_rate; + return &s->clock_force_rate; + } else { + *force = false; + *n_rates = s->n_clock_rates; + *def = s->clock_rate; + return s->clock_rates; + } +} +static void reconfigure_driver(struct pw_context *context, struct pw_impl_node *n) +{ + struct pw_impl_node *s; + + spa_list_for_each(s, &n->follower_list, follower_link) { + if (s == n) + continue; + pw_log_debug("%p: follower %p: '%s' suspend", + context, s, s->name); + pw_impl_node_set_state(s, PW_NODE_STATE_SUSPENDED); + } + pw_log_debug("%p: driver %p: '%s' suspend", + context, n, n->name); + + if (n->info.state >= PW_NODE_STATE_IDLE) + n->need_resume = !n->pause_on_idle; + pw_impl_node_set_state(n, PW_NODE_STATE_SUSPENDED); +} + +/* find smaller power of 2 */ +static uint32_t flp2(uint32_t x) +{ + x = x | (x >> 1); + x = x | (x >> 2); + x = x | (x >> 4); + x = x | (x >> 8); + x = x | (x >> 16); + return x - (x >> 1); +} + +/* cmp fractions, avoiding overflows */ +static int fraction_compare(const struct spa_fraction *a, const struct spa_fraction *b) +{ + uint64_t fa = (uint64_t)a->num * (uint64_t)b->denom; + uint64_t fb = (uint64_t)b->num * (uint64_t)a->denom; + return fa < fb ? -1 : (fa > fb ? 1 : 0); +} + +static inline uint32_t calc_gcd(uint32_t a, uint32_t b) +{ + while (b != 0) { + uint32_t temp = a; + a = b; + b = temp % b; + } + return a; +} + +struct rate_info { + uint32_t rate; + uint32_t gcd; + uint32_t diff; +}; + +static inline void update_highest_rate(struct rate_info *best, struct rate_info *current) +{ + /* find highest rate */ + if (best->rate == 0 || best->rate < current->rate) + *best = *current; +} + +static inline void update_nearest_gcd(struct rate_info *best, struct rate_info *current) +{ + /* find nearest GCD */ + if (best->rate == 0 || + (best->gcd < current->gcd) || + (best->gcd == current->gcd && best->diff > current->diff)) + *best = *current; +} +static inline void update_nearest_rate(struct rate_info *best, struct rate_info *current) +{ + /* find nearest rate */ + if (best->rate == 0 || best->diff > current->diff) + *best = *current; +} + +static uint32_t find_best_rate(const uint32_t *rates, uint32_t n_rates, uint32_t rate, uint32_t def) +{ + uint32_t i, limit; + struct rate_info best; + struct rate_info info[n_rates]; + + for (i = 0; i < n_rates; i++) { + info[i].rate = rates[i]; + info[i].gcd = calc_gcd(rate, rates[i]); + info[i].diff = SPA_ABS((int32_t)rate - (int32_t)rates[i]); + } + + /* first find higher nearest GCD. This tries to find next bigest rate that + * requires the least amount of resample filter banks. Usually these are + * rates that are multiples of each other or multiples of a common rate. + * + * 44100 and [ 32000 56000 88200 96000 ] -> 88200 + * 48000 and [ 32000 56000 88200 96000 ] -> 96000 + * 88200 and [ 44100 48000 96000 192000 ] -> 96000 + * 32000 and [ 44100 192000 ] -> 44100 + * 8000 and [ 44100 48000 ] -> 48000 + * 8000 and [ 44100 192000 ] -> 44100 + * 11025 and [ 44100 48000 ] -> 44100 + * 44100 and [ 48000 176400 ] -> 48000 + * 144 and [ 44100 48000 88200 96000] -> 48000 + */ + spa_zero(best); + /* Don't try to do excessive upsampling by limiting the max rate + * for desired < default to default*2. For other rates allow + * a x3 upsample rate max. For values lower than half of the default, + * limit to the default. */ + limit = rate < def/2 ? def : rate < def ? def*2 : rate*3; + for (i = 0; i < n_rates; i++) { + if (info[i].rate >= rate && info[i].rate <= limit) + update_nearest_gcd(&best, &info[i]); + } + if (best.rate != 0) + return best.rate; + + /* we would need excessive upsampling, pick a nearest higher rate */ + spa_zero(best); + for (i = 0; i < n_rates; i++) { + if (info[i].rate >= rate) + update_nearest_rate(&best, &info[i]); + } + if (best.rate != 0) + return best.rate; + + /* There is nothing above the rate, we need to downsample. Try to downsample + * but only to something that is from a common rate family. Also don't + * try to downsample to something that will sound worse (< 44100). + * + * 88200 and [ 22050 44100 48000 ] -> 44100 + * 88200 and [ 22050 48000 ] -> 48000 + */ + spa_zero(best); + for (i = 0; i < n_rates; i++) { + if (info[i].rate >= 44100) + update_nearest_gcd(&best, &info[i]); + } + if (best.rate != 0) + return best.rate; + + /* There is nothing to downsample above our threshold. Downsample to whatever + * is the highest rate then. */ + spa_zero(best); + for (i = 0; i < n_rates; i++) + update_highest_rate(&best, &info[i]); + if (best.rate != 0) + return best.rate; + + return def; +} + +/* here we evaluate the complete state of the graph. + * + * It roughly operates in 3 stages: + * + * 1. go over all drivers and collect the nodes that need to be scheduled with the + * driver. This include all nodes that have an active link with the driver or + * with a node already scheduled with the driver. + * + * 2. go over all nodes that are not assigned to a driver. The ones that require + * a driver are moved to some random active driver found in step 1. + * + * 3. go over all drivers again, collect the quantum/rate of all followers, select + * the desired final value and activate the followers and then the driver. + * + * A complete graph evaluation is performed for each change that is made to the + * graph, such as making/destroying links, adding/removing nodes, property changes such + * as quantum/rate changes or metadata changes. + */ +static void context_recalc_graph(void *data) +{ + struct impl *impl = data; + struct pw_context *context = impl->context; + struct settings *settings = &context->settings; + struct pw_impl_node *n, *s, *target, *fallback; + const uint32_t *rates; + uint32_t max_quantum, min_quantum, def_quantum, rate_quantum, floor_quantum, ceil_quantum; + uint32_t n_rates, def_rate, transport; + bool freewheel, global_force_rate, global_force_quantum; + struct spa_list collect; + +again: + freewheel = false; + + /* clean up the flags first */ + spa_list_for_each(n, &context->node_list, link) { + n->visited = false; + n->checked = 0; + n->runnable = n->always_process && n->active; + } + + get_quantums(context, &def_quantum, &min_quantum, &max_quantum, &rate_quantum, + &floor_quantum, &ceil_quantum); + rates = get_rates(context, &def_rate, &n_rates, &global_force_rate); + + global_force_quantum = rate_quantum == 0; + + /* start from all drivers and group all nodes that are linked + * to it. Some nodes are not (yet) linked to anything and they + * will end up 'unassigned' to a driver. Other nodes are drivers + * and if they have active followers, we can use them to schedule + * the unassigned nodes. */ + target = fallback = NULL; + spa_list_for_each(n, &context->driver_list, driver_link) { + if (n->exported) + continue; + + if (!n->visited) { + spa_list_init(&collect); + collect_nodes(context, n, &collect); + move_to_driver(context, &collect, n); + } + /* from now on we are only interested in active driving nodes + * with a driver_priority. We're going to see if there are + * active followers. */ + if (!n->driving || !n->active || n->priority_driver <= 0) + continue; + + /* first active driving node is fallback */ + if (fallback == NULL) + fallback = n; + + if (!n->runnable) + continue; + + spa_list_for_each(s, &n->follower_list, follower_link) { + pw_log_debug("%p: driver %p: follower %p %s: active:%d", + context, n, s, s->name, s->active); + if (s != n && s->active) { + /* if the driving node has active followers, it + * is a target for our unassigned nodes */ + if (target == NULL) + target = n; + if (n->freewheel) + freewheel = true; + break; + } + } + } + /* no active node, use fallback driving node */ + if (target == NULL) + target = fallback; + + /* update the freewheel status */ + pw_context_set_freewheel(context, freewheel); + + /* now go through all available nodes. The ones we didn't visit + * in collect_nodes() are not linked to any driver. We assign them + * to either an active driver or the first driver if they are in a + * group that needs a driver. Else we remove them from a driver + * and stop them. */ + spa_list_for_each(n, &context->node_list, link) { + struct pw_impl_node *t, *driver; + + if (n->exported || n->visited) + continue; + + pw_log_debug("%p: unassigned node %p: '%s' active:%d want_driver:%d target:%p", + context, n, n->name, n->active, n->want_driver, target); + + /* collect all nodes in this group */ + spa_list_init(&collect); + collect_nodes(context, n, &collect); + + driver = NULL; + spa_list_for_each(t, &collect, sort_link) { + /* is any active and want a driver */ + if ((t->want_driver && t->active && t->runnable) || + t->always_process) { + driver = target; + break; + } + } + if (driver != NULL) { + driver->runnable = true; + /* driver needed for this group */ + move_to_driver(context, &collect, driver); + } else { + /* no driver, make sure the nodes stop */ + remove_from_driver(context, &collect); + } + } + + /* assign final quantum and set state for followers and drivers */ + spa_list_for_each(n, &context->driver_list, driver_link) { + bool running = false, lock_quantum = false, lock_rate = false; + struct spa_fraction latency = SPA_FRACTION(0, 0); + struct spa_fraction max_latency = SPA_FRACTION(0, 0); + struct spa_fraction rate = SPA_FRACTION(0, 0); + uint32_t target_quantum, target_rate, current_rate, current_quantum; + uint64_t quantum_stamp = 0, rate_stamp = 0; + bool force_rate, force_quantum, restore_rate = false, restore_quantum = false; + bool do_reconfigure = false, need_resume, was_target_pending; + bool have_request = false; + const uint32_t *node_rates; + uint32_t node_n_rates, node_def_rate; + uint32_t node_max_quantum, node_min_quantum, node_def_quantum, node_rate_quantum; + + if (!n->driving || n->exported) + continue; + + node_def_quantum = def_quantum; + node_min_quantum = min_quantum; + node_max_quantum = max_quantum; + node_rate_quantum = rate_quantum; + force_quantum = global_force_quantum; + + node_def_rate = def_rate; + node_n_rates = n_rates; + node_rates = rates; + force_rate = global_force_rate; + + /* collect quantum and rate */ + spa_list_for_each(s, &n->follower_list, follower_link) { + + if (!s->moved) { + /* We only try to enforce the lock flags for nodes that + * are not recently moved between drivers. The nodes that + * are moved should try to enforce their quantum on the + * new driver. */ + lock_quantum |= s->lock_quantum; + lock_rate |= s->lock_rate; + } + if (!global_force_quantum && s->force_quantum > 0 && + s->stamp > quantum_stamp) { + node_def_quantum = node_min_quantum = node_max_quantum = s->force_quantum; + node_rate_quantum = 0; + quantum_stamp = s->stamp; + force_quantum = true; + } + if (!global_force_rate && s->force_rate > 0 && + s->stamp > rate_stamp) { + node_def_rate = s->force_rate; + node_n_rates = 1; + node_rates = &s->force_rate; + force_rate = true; + rate_stamp = s->stamp; + } + + /* smallest latencies */ + if (latency.denom == 0 || + (s->latency.denom > 0 && + fraction_compare(&s->latency, &latency) < 0)) + latency = s->latency; + if (max_latency.denom == 0 || + (s->max_latency.denom > 0 && + fraction_compare(&s->max_latency, &max_latency) < 0)) + max_latency = s->max_latency; + + /* largest rate, which is in fact the smallest fraction */ + if (rate.denom == 0 || + (s->rate.denom > 0 && + fraction_compare(&s->rate, &rate) < 0)) + rate = s->rate; + + if (s->active) + running = n->runnable; + + pw_log_debug("%p: follower %p running:%d runnable:%d rate:%u/%u latency %u/%u '%s'", + context, s, running, s->runnable, rate.num, rate.denom, + latency.num, latency.denom, s->name); + + if (running && s != n && s->supports_request > 0) + have_request = true; + + s->moved = false; + } + + if (n->forced_rate && !force_rate && n->runnable) { + /* A node that was forced to a rate but is no longer being + * forced can restore its rate */ + pw_log_info("(%s-%u) restore rate", n->name, n->info.id); + restore_rate = true; + } + if (n->forced_quantum && !force_quantum && n->runnable) { + /* A node that was forced to a quantum but is no longer being + * forced can restore its quantum */ + pw_log_info("(%s-%u) restore quantum", n->name, n->info.id); + restore_quantum = true; + } + + if (force_quantum) + lock_quantum = false; + if (force_rate) + lock_rate = false; + + need_resume = n->need_resume; + if (need_resume) { + running = true; + n->need_resume = false; + } + + current_rate = n->target_rate.denom; + if (!restore_rate && + (lock_rate || need_resume || !running || + (!force_rate && (n->info.state > PW_NODE_STATE_IDLE)))) { + pw_log_debug("%p: keep rate:1/%u restore:%u lock:%u resume:%u " + "running:%u force:%u state:%s", context, + current_rate, restore_rate, lock_rate, need_resume, + running, force_rate, + pw_node_state_as_string(n->info.state)); + + /* when we don't need to restore or rate and + * when someone wants us to lock the rate of this driver or + * when we are in the process of reconfiguring the driver or + * when we are not running any followers or + * when the driver is busy and we don't need to force a rate, + * keep the current rate */ + target_rate = current_rate; + } + else { + /* Here we are allowed to change the rate of the driver. + * Start with the default rate. If the desired rate is + * allowed, switch to it */ + if (rate.denom != 0 && rate.num == 1) + target_rate = rate.denom; + else + target_rate = node_def_rate; + + target_rate = find_best_rate(node_rates, node_n_rates, + target_rate, node_def_rate); + + pw_log_debug("%p: def_rate:%d target_rate:%d rate:%d/%d", context, + node_def_rate, target_rate, rate.num, rate.denom); + } + + was_target_pending = n->target_pending; + + if (target_rate != current_rate) { + /* we doing a rate switch */ + pw_log_info("(%s-%u) state:%s new rate:%u/(%u)->%u", + n->name, n->info.id, + pw_node_state_as_string(n->info.state), + n->target_rate.denom, current_rate, + target_rate); + + if (force_rate) { + if (settings->clock_rate_update_mode == CLOCK_RATE_UPDATE_MODE_HARD) + do_reconfigure |= !was_target_pending; + } else { + if (n->info.state >= PW_NODE_STATE_SUSPENDED) + do_reconfigure |= !was_target_pending; + } + /* we're setting the pending rate. This will become the new + * current rate in the next iteration of the graph. */ + n->target_rate = SPA_FRACTION(1, target_rate); + n->forced_rate = force_rate; + n->target_pending = true; + current_rate = target_rate; + } + + if (node_rate_quantum != 0 && current_rate != node_rate_quantum) { + /* the quantum values are scaled with the current rate */ + node_def_quantum = SPA_SCALE32(node_def_quantum, current_rate, node_rate_quantum); + node_min_quantum = SPA_SCALE32(node_min_quantum, current_rate, node_rate_quantum); + node_max_quantum = SPA_SCALE32(node_max_quantum, current_rate, node_rate_quantum); + } + + /* calculate desired quantum. Don't limit to the max_latency when we are + * going to force a quantum or rate and reconfigure the nodes. */ + if (max_latency.denom != 0 && !force_quantum && !force_rate) { + uint32_t tmp = SPA_SCALE32(max_latency.num, current_rate, max_latency.denom); + if (tmp < node_max_quantum) + node_max_quantum = tmp; + } + + current_quantum = n->target_quantum; + if (!restore_quantum && (lock_quantum || need_resume || !running)) { + pw_log_debug("%p: keep quantum:%u restore:%u lock:%u resume:%u " + "running:%u force:%u state:%s", context, + current_quantum, restore_quantum, lock_quantum, need_resume, + running, force_quantum, + pw_node_state_as_string(n->info.state)); + target_quantum = current_quantum; + } + else { + target_quantum = node_def_quantum; + if (latency.denom != 0) + target_quantum = SPA_SCALE32(latency.num, current_rate, latency.denom); + target_quantum = SPA_CLAMP(target_quantum, node_min_quantum, node_max_quantum); + target_quantum = SPA_CLAMP(target_quantum, floor_quantum, ceil_quantum); + + if (settings->clock_power_of_two_quantum && !force_quantum) + target_quantum = flp2(target_quantum); + } + + if (target_quantum != current_quantum) { + pw_log_info("(%s-%u) new quantum:%"PRIu64"->%u", + n->name, n->info.id, + n->target_quantum, + target_quantum); + /* this is the new pending quantum */ + n->target_quantum = target_quantum; + n->forced_quantum = force_quantum; + n->target_pending = true; + + if (force_quantum) + do_reconfigure |= !was_target_pending; + } + + if (n->target_pending) { + if (do_reconfigure) { + reconfigure_driver(context, n); + /* we might be suspended now and the links need to be prepared again */ + goto again; + } + /* we have a pending change. We place the new values in the + * pending fields so that they are picked up by the driver in + * the next cycle */ + pw_log_debug("%p: apply duration:%"PRIu64" rate:%u/%u", context, + n->target_quantum, n->target_rate.num, + n->target_rate.denom); + SPA_SEQ_WRITE(n->rt.position->clock.target_seq); + n->rt.position->clock.target_duration = n->target_quantum; + n->rt.position->clock.target_rate = n->target_rate; + SPA_SEQ_WRITE(n->rt.position->clock.target_seq); + + if (n->info.state < PW_NODE_STATE_RUNNING) { + n->rt.position->clock.duration = n->target_quantum; + n->rt.position->clock.rate = n->target_rate; + } + n->target_pending = false; + } else { + n->target_quantum = n->rt.position->clock.target_duration; + n->target_rate = n->rt.position->clock.target_rate; + } + + if (n->info.state < PW_NODE_STATE_RUNNING) + n->rt.position->clock.nsec = get_time_ns(n->rt.target.system); + + SPA_FLAG_UPDATE(n->rt.position->clock.flags, + SPA_IO_CLOCK_FLAG_LAZY, have_request && n->supports_lazy > 0); + + pw_log_debug("%p: driver %p running:%d runnable:%d quantum:%u rate:%u (%"PRIu64"/%u)'%s'", + context, n, running, n->runnable, target_quantum, target_rate, + n->rt.position->clock.target_duration, + n->rt.position->clock.target_rate.denom, n->name); + + transport = PW_NODE_ACTIVATION_COMMAND_NONE; + + /* first change the node states of the followers to the new target */ + spa_list_for_each(s, &n->follower_list, follower_link) { + if (s->transport != PW_NODE_ACTIVATION_COMMAND_NONE) { + transport = s->transport; + s->transport = PW_NODE_ACTIVATION_COMMAND_NONE; + } + if (s == n) + continue; + pw_log_debug("%p: follower %p: active:%d '%s'", + context, s, s->active, s->name); + ensure_state(s, running); + } + + if (transport != PW_NODE_ACTIVATION_COMMAND_NONE) { + pw_log_info("%s: transport %d", n->name, transport); + SPA_ATOMIC_STORE(n->rt.target.activation->command, transport); + } + + /* now that all the followers are ready, start the driver */ + ensure_state(n, running); + } +} + +static const struct pw_context_events context_events = { + PW_VERSION_CONTEXT_EVENTS, + .recalc_graph = context_recalc_graph, +}; + +static void module_destroy(void *data) +{ + struct impl *impl = data; + + if (impl->context) { + spa_hook_remove(&impl->context_listener); + spa_hook_remove(&impl->module_listener); + } + + pw_properties_free(impl->props); + + free(impl); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args_str) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct pw_properties *args; + struct impl *impl; + int res; + + PW_LOG_TOPIC_INIT(mod_topic); + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return -errno; + + pw_log_debug("module %p: new %s", impl, args_str); + + if (args_str) + args = pw_properties_new_string(args_str); + else + args = pw_properties_new(NULL, NULL); + + if (!args) { + res = -errno; + goto error; + } + + pw_context_conf_update_props(context, "module."NAME".args", args); + + impl->props = args; + impl->context = context; + + pw_context_add_listener(context, &impl->context_listener, &context_events, impl); + pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props)); + + return 0; + +error: + module_destroy(impl); + return res; +} diff --git a/src/pipewire/context.c b/src/pipewire/context.c index 1c30be70a..40ed031bb 100644 --- a/src/pipewire/context.c +++ b/src/pipewire/context.c @@ -36,8 +36,6 @@ PW_LOG_TOPIC_EXTERN(log_context); #define PW_LOG_TOPIC_DEFAULT log_context -#define MAX_HOPS 64 -#define MAX_SYNC 4u #define MAX_LOOPS 64u #define DEFAULT_DATA_LOOPS 1 @@ -112,13 +110,17 @@ static void fill_core_properties(struct pw_context *context) pw_properties_set(properties, PW_KEY_CORE_NAME, context->core->info.name); } -static int context_set_freewheel(struct pw_context *context, bool freewheel) +SPA_EXPORT +int pw_context_set_freewheel(struct pw_context *context, bool freewheel) { struct impl *impl = SPA_CONTAINER_OF(context, struct impl, this); struct spa_thread *thr; uint32_t i; int res = 0; + if (context->freewheeling == freewheel) + return 0; + for (i = 0; i < impl->n_data_loops; i++) { if (impl->data_loops[i].impl == NULL || (thr = pw_data_loop_get_thread(impl->data_loops[i].impl)) == NULL) @@ -982,468 +984,9 @@ SPA_PRINTF_FUNC(7, 8) int pw_context_debug_port_params(struct pw_context *this, return 0; } -static int ensure_state(struct pw_impl_node *node, bool running) -{ - enum pw_node_state state = node->info.state; - if (node->active && node->runnable && - !SPA_FLAG_IS_SET(node->spa_flags, SPA_NODE_FLAG_NEED_CONFIGURE) && running) - state = PW_NODE_STATE_RUNNING; - else if (state > PW_NODE_STATE_IDLE) - state = PW_NODE_STATE_IDLE; - return pw_impl_node_set_state(node, state); -} - -/* From a node (that is runnable) follow all prepared links in the given direction - * and groups to active nodes and make them recursively runnable as well. - */ -static inline int run_nodes(struct pw_context *context, struct pw_impl_node *node, - struct spa_list *nodes, enum pw_direction direction, int hop) -{ - struct pw_impl_node *t; - struct pw_impl_port *p; - struct pw_impl_link *l; - - if (hop == MAX_HOPS) { - pw_log_warn("exceeded hops (%d)", hop); - return -EIO; - } - - pw_log_debug("node %p: '%s' direction:%s", node, node->name, - pw_direction_as_string(direction)); - - SPA_FLAG_SET(node->checked, 1u<input_ports, link) { - spa_list_for_each(l, &p->links, input_link) { - t = l->output->node; - - if (!t->active || !l->prepared || - (!t->driving && SPA_FLAG_IS_SET(t->checked, 1u<driving && p->node == t) - continue; - - pw_log_debug(" peer %p: '%s'", t, t->name); - t->runnable = true; - run_nodes(context, t, nodes, direction, hop + 1); - } - } - } else { - spa_list_for_each(p, &node->output_ports, link) { - spa_list_for_each(l, &p->links, output_link) { - t = l->input->node; - - if (!t->active || !l->prepared || - (!t->driving && SPA_FLAG_IS_SET(t->checked, 1u<driving && p->node == t) - continue; - - pw_log_debug(" peer %p: '%s'", t, t->name); - t->runnable = true; - run_nodes(context, t, nodes, direction, hop + 1); - } - } - } - /* now go through all the nodes that have the same link group and - * that are not yet visited. Note how nodes with the same group - * don't get included here. They were added to the same driver but - * need to otherwise stay idle unless some non-passive link activates - * them. */ - if (node->link_groups != NULL) { - spa_list_for_each(t, nodes, sort_link) { - if (t->exported || !t->active || - SPA_FLAG_IS_SET(t->checked, 1u<link_groups, node->link_groups) < 0) - continue; - - pw_log_debug(" group %p: '%s'", t, t->name); - t->runnable = true; - if (!t->driving) - run_nodes(context, t, nodes, direction, hop + 1); - } - } - return 0; -} - -/* Follow all prepared links and groups from node, activate the links. - * If a non-passive link is found, we set the peer runnable flag. - * - * After this is done, we end up with a list of nodes in collect that are all - * linked to node. - * Some of the nodes have the runnable flag set. We then start from those nodes - * and make all linked nodes and groups runnable as well. (see run_nodes). - * - * This ensures that we only activate the paths from the runnable nodes to the - * driver nodes and leave the other nodes idle. - */ -static int collect_nodes(struct pw_context *context, struct pw_impl_node *node, struct spa_list *collect) -{ - struct spa_list queue; - struct pw_impl_node *n, *t; - struct pw_impl_port *p; - struct pw_impl_link *l; - uint32_t n_sync; - char *sync[MAX_SYNC+1]; - - pw_log_debug("node %p: '%s'", node, node->name); - - /* start with node in the queue */ - spa_list_init(&queue); - spa_list_append(&queue, &node->sort_link); - node->visited = true; - - n_sync = 0; - sync[0] = NULL; - - /* now follow all the links from the nodes in the queue - * and add the peers to the queue. */ - spa_list_consume(n, &queue, sort_link) { - spa_list_remove(&n->sort_link); - spa_list_append(collect, &n->sort_link); - - pw_log_debug(" next node %p: '%s' runnable:%u active:%d", - n, n->name, n->runnable, n->active); - - if (!n->active) - continue; - - if (n->sync) { - for (uint32_t i = 0; n->sync_groups[i]; i++) { - if (n_sync >= MAX_SYNC) - break; - if (pw_strv_find(sync, n->sync_groups[i]) >= 0) - continue; - sync[n_sync++] = n->sync_groups[i]; - sync[n_sync] = NULL; - } - } - - spa_list_for_each(p, &n->input_ports, link) { - spa_list_for_each(l, &p->links, input_link) { - t = l->output->node; - - if (!t->active) - continue; - - pw_impl_link_prepare(l); - - if (!l->prepared) - continue; - - if (!l->passive) - t->runnable = true; - - if (!t->visited) { - t->visited = true; - spa_list_append(&queue, &t->sort_link); - } - } - } - spa_list_for_each(p, &n->output_ports, link) { - spa_list_for_each(l, &p->links, output_link) { - t = l->input->node; - - if (!t->active) - continue; - - pw_impl_link_prepare(l); - - if (!l->prepared) - continue; - - if (!l->passive) - t->runnable = true; - - if (!t->visited) { - t->visited = true; - spa_list_append(&queue, &t->sort_link); - } - } - } - /* now go through all the nodes that have the same group and - * that are not yet visited */ - if (n->groups != NULL || n->link_groups != NULL || sync[0] != NULL) { - spa_list_for_each(t, &context->node_list, link) { - if (t->exported || !t->active || t->visited) - continue; - /* the other node will be scheduled with this one if it's in - * the same group or link group */ - if (pw_strv_find_common(t->groups, n->groups) < 0 && - pw_strv_find_common(t->link_groups, n->link_groups) < 0 && - pw_strv_find_common(t->sync_groups, sync) < 0) - continue; - - pw_log_debug("%p: %s join group of %s", - t, t->name, n->name); - t->visited = true; - spa_list_append(&queue, &t->sort_link); - } - } - pw_log_debug(" next node %p: '%s' runnable:%u %p %p %p", n, n->name, n->runnable, - n->groups, n->link_groups, sync); - } - /* All non-driver runnable nodes (ie. reachable with a non-passive link) now make - * all linked nodes up and downstream runnable as well */ - spa_list_for_each(n, collect, sort_link) { - if (!n->driver && n->runnable) { - run_nodes(context, n, collect, PW_DIRECTION_OUTPUT, 0); - run_nodes(context, n, collect, PW_DIRECTION_INPUT, 0); - } - } - /* now we might have made a driver runnable, if the node is not runnable at this point - * it means it was linked to the driver with passives links and some other node - * made the driver active. If the node is a leaf it can not be activated in any other - * way and we will also make it, and all its peers, runnable */ - spa_list_for_each(n, collect, sort_link) { - if (!n->driver && n->driver_node->runnable && !n->runnable && n->leaf && n->active) { - n->runnable = true; - run_nodes(context, n, collect, PW_DIRECTION_OUTPUT, 0); - run_nodes(context, n, collect, PW_DIRECTION_INPUT, 0); - } - } - - return 0; -} - -static void move_to_driver(struct pw_context *context, struct spa_list *nodes, - struct pw_impl_node *driver) -{ - struct pw_impl_node *n; - pw_log_debug("driver: %p %s runnable:%u", driver, driver->name, driver->runnable); - spa_list_consume(n, nodes, sort_link) { - spa_list_remove(&n->sort_link); - - driver->runnable |= n->runnable; - - pw_log_debug(" follower: %p %s runnable:%u driver-runnable:%u", n, n->name, - n->runnable, driver->runnable); - pw_impl_node_set_driver(n, driver); - } -} -static void remove_from_driver(struct pw_context *context, struct spa_list *nodes) -{ - struct pw_impl_node *n; - spa_list_consume(n, nodes, sort_link) { - spa_list_remove(&n->sort_link); - pw_impl_node_set_driver(n, NULL); - ensure_state(n, false); - } -} - -static inline void get_quantums(struct pw_context *context, uint32_t *def, - uint32_t *min, uint32_t *max, uint32_t *rate, uint32_t *floor, uint32_t *ceil) -{ - struct settings *s = &context->settings; - if (s->clock_force_quantum != 0) { - *def = *min = *max = s->clock_force_quantum; - *rate = 0; - } else { - *def = s->clock_quantum; - *min = s->clock_min_quantum; - *max = s->clock_max_quantum; - *rate = s->clock_rate; - } - *floor = s->clock_quantum_floor; - *ceil = s->clock_quantum_limit; -} - -static inline const uint32_t *get_rates(struct pw_context *context, uint32_t *def, uint32_t *n_rates, - bool *force) -{ - struct settings *s = &context->settings; - if (s->clock_force_rate != 0) { - *force = true; - *n_rates = 1; - *def = s->clock_force_rate; - return &s->clock_force_rate; - } else { - *force = false; - *n_rates = s->n_clock_rates; - *def = s->clock_rate; - return s->clock_rates; - } -} -static void reconfigure_driver(struct pw_context *context, struct pw_impl_node *n) -{ - struct pw_impl_node *s; - - spa_list_for_each(s, &n->follower_list, follower_link) { - if (s == n) - continue; - pw_log_debug("%p: follower %p: '%s' suspend", - context, s, s->name); - pw_impl_node_set_state(s, PW_NODE_STATE_SUSPENDED); - } - pw_log_debug("%p: driver %p: '%s' suspend", - context, n, n->name); - - if (n->info.state >= PW_NODE_STATE_IDLE) - n->need_resume = !n->pause_on_idle; - pw_impl_node_set_state(n, PW_NODE_STATE_SUSPENDED); -} - -/* find smaller power of 2 */ -static uint32_t flp2(uint32_t x) -{ - x = x | (x >> 1); - x = x | (x >> 2); - x = x | (x >> 4); - x = x | (x >> 8); - x = x | (x >> 16); - return x - (x >> 1); -} - -/* cmp fractions, avoiding overflows */ -static int fraction_compare(const struct spa_fraction *a, const struct spa_fraction *b) -{ - uint64_t fa = (uint64_t)a->num * (uint64_t)b->denom; - uint64_t fb = (uint64_t)b->num * (uint64_t)a->denom; - return fa < fb ? -1 : (fa > fb ? 1 : 0); -} - -static inline uint32_t calc_gcd(uint32_t a, uint32_t b) -{ - while (b != 0) { - uint32_t temp = a; - a = b; - b = temp % b; - } - return a; -} - -struct rate_info { - uint32_t rate; - uint32_t gcd; - uint32_t diff; -}; - -static inline void update_highest_rate(struct rate_info *best, struct rate_info *current) -{ - /* find highest rate */ - if (best->rate == 0 || best->rate < current->rate) - *best = *current; -} - -static inline void update_nearest_gcd(struct rate_info *best, struct rate_info *current) -{ - /* find nearest GCD */ - if (best->rate == 0 || - (best->gcd < current->gcd) || - (best->gcd == current->gcd && best->diff > current->diff)) - *best = *current; -} -static inline void update_nearest_rate(struct rate_info *best, struct rate_info *current) -{ - /* find nearest rate */ - if (best->rate == 0 || best->diff > current->diff) - *best = *current; -} - -static uint32_t find_best_rate(const uint32_t *rates, uint32_t n_rates, uint32_t rate, uint32_t def) -{ - uint32_t i, limit; - struct rate_info best; - struct rate_info info[n_rates]; - - for (i = 0; i < n_rates; i++) { - info[i].rate = rates[i]; - info[i].gcd = calc_gcd(rate, rates[i]); - info[i].diff = SPA_ABS((int32_t)rate - (int32_t)rates[i]); - } - - /* first find higher nearest GCD. This tries to find next bigest rate that - * requires the least amount of resample filter banks. Usually these are - * rates that are multiples of each other or multiples of a common rate. - * - * 44100 and [ 32000 56000 88200 96000 ] -> 88200 - * 48000 and [ 32000 56000 88200 96000 ] -> 96000 - * 88200 and [ 44100 48000 96000 192000 ] -> 96000 - * 32000 and [ 44100 192000 ] -> 44100 - * 8000 and [ 44100 48000 ] -> 48000 - * 8000 and [ 44100 192000 ] -> 44100 - * 11025 and [ 44100 48000 ] -> 44100 - * 44100 and [ 48000 176400 ] -> 48000 - * 144 and [ 44100 48000 88200 96000] -> 48000 - */ - spa_zero(best); - /* Don't try to do excessive upsampling by limiting the max rate - * for desired < default to default*2. For other rates allow - * a x3 upsample rate max. For values lower than half of the default, - * limit to the default. */ - limit = rate < def/2 ? def : rate < def ? def*2 : rate*3; - for (i = 0; i < n_rates; i++) { - if (info[i].rate >= rate && info[i].rate <= limit) - update_nearest_gcd(&best, &info[i]); - } - if (best.rate != 0) - return best.rate; - - /* we would need excessive upsampling, pick a nearest higher rate */ - spa_zero(best); - for (i = 0; i < n_rates; i++) { - if (info[i].rate >= rate) - update_nearest_rate(&best, &info[i]); - } - if (best.rate != 0) - return best.rate; - - /* There is nothing above the rate, we need to downsample. Try to downsample - * but only to something that is from a common rate family. Also don't - * try to downsample to something that will sound worse (< 44100). - * - * 88200 and [ 22050 44100 48000 ] -> 44100 - * 88200 and [ 22050 48000 ] -> 48000 - */ - spa_zero(best); - for (i = 0; i < n_rates; i++) { - if (info[i].rate >= 44100) - update_nearest_gcd(&best, &info[i]); - } - if (best.rate != 0) - return best.rate; - - /* There is nothing to downsample above our threshold. Downsample to whatever - * is the highest rate then. */ - spa_zero(best); - for (i = 0; i < n_rates; i++) - update_highest_rate(&best, &info[i]); - if (best.rate != 0) - return best.rate; - - return def; -} - -/* here we evaluate the complete state of the graph. - * - * It roughly operates in 3 stages: - * - * 1. go over all drivers and collect the nodes that need to be scheduled with the - * driver. This include all nodes that have an active link with the driver or - * with a node already scheduled with the driver. - * - * 2. go over all nodes that are not assigned to a driver. The ones that require - * a driver are moved to some random active driver found in step 1. - * - * 3. go over all drivers again, collect the quantum/rate of all followers, select - * the desired final value and activate the followers and then the driver. - * - * A complete graph evaluation is performed for each change that is made to the - * graph, such as making/destroying links, adding/removing nodes, property changes such - * as quantum/rate changes or metadata changes. - */ int pw_context_recalc_graph(struct pw_context *context, const char *reason) { struct impl *impl = SPA_CONTAINER_OF(context, struct impl, this); - struct settings *settings = &context->settings; - struct pw_impl_node *n, *s, *target, *fallback; - const uint32_t *rates; - uint32_t max_quantum, min_quantum, def_quantum, rate_quantum, floor_quantum, ceil_quantum; - uint32_t n_rates, def_rate, transport; - bool freewheel, global_force_rate, global_force_quantum; - struct spa_list collect; pw_log_info("%p: busy:%d reason:%s", context, impl->recalc, reason); @@ -1454,392 +997,14 @@ int pw_context_recalc_graph(struct pw_context *context, const char *reason) again: impl->recalc = true; - freewheel = false; - /* clean up the flags first */ - spa_list_for_each(n, &context->node_list, link) { - n->visited = false; - n->checked = 0; - n->runnable = n->always_process && n->active; - } + pw_context_emit_recalc_graph(context); - get_quantums(context, &def_quantum, &min_quantum, &max_quantum, &rate_quantum, - &floor_quantum, &ceil_quantum); - rates = get_rates(context, &def_rate, &n_rates, &global_force_rate); - - global_force_quantum = rate_quantum == 0; - - /* start from all drivers and group all nodes that are linked - * to it. Some nodes are not (yet) linked to anything and they - * will end up 'unassigned' to a driver. Other nodes are drivers - * and if they have active followers, we can use them to schedule - * the unassigned nodes. */ - target = fallback = NULL; - spa_list_for_each(n, &context->driver_list, driver_link) { - if (n->exported) - continue; - - if (!n->visited) { - spa_list_init(&collect); - collect_nodes(context, n, &collect); - move_to_driver(context, &collect, n); - } - /* from now on we are only interested in active driving nodes - * with a driver_priority. We're going to see if there are - * active followers. */ - if (!n->driving || !n->active || n->priority_driver <= 0) - continue; - - /* first active driving node is fallback */ - if (fallback == NULL) - fallback = n; - - if (!n->runnable) - continue; - - spa_list_for_each(s, &n->follower_list, follower_link) { - pw_log_debug("%p: driver %p: follower %p %s: active:%d", - context, n, s, s->name, s->active); - if (s != n && s->active) { - /* if the driving node has active followers, it - * is a target for our unassigned nodes */ - if (target == NULL) - target = n; - if (n->freewheel) - freewheel = true; - break; - } - } - } - /* no active node, use fallback driving node */ - if (target == NULL) - target = fallback; - - /* update the freewheel status */ - if (context->freewheeling != freewheel) - context_set_freewheel(context, freewheel); - - /* now go through all available nodes. The ones we didn't visit - * in collect_nodes() are not linked to any driver. We assign them - * to either an active driver or the first driver if they are in a - * group that needs a driver. Else we remove them from a driver - * and stop them. */ - spa_list_for_each(n, &context->node_list, link) { - struct pw_impl_node *t, *driver; - - if (n->exported || n->visited) - continue; - - pw_log_debug("%p: unassigned node %p: '%s' active:%d want_driver:%d target:%p", - context, n, n->name, n->active, n->want_driver, target); - - /* collect all nodes in this group */ - spa_list_init(&collect); - collect_nodes(context, n, &collect); - - driver = NULL; - spa_list_for_each(t, &collect, sort_link) { - /* is any active and want a driver */ - if ((t->want_driver && t->active && t->runnable) || - t->always_process) { - driver = target; - break; - } - } - if (driver != NULL) { - driver->runnable = true; - /* driver needed for this group */ - move_to_driver(context, &collect, driver); - } else { - /* no driver, make sure the nodes stop */ - remove_from_driver(context, &collect); - } - } - - /* assign final quantum and set state for followers and drivers */ - spa_list_for_each(n, &context->driver_list, driver_link) { - bool running = false, lock_quantum = false, lock_rate = false; - struct spa_fraction latency = SPA_FRACTION(0, 0); - struct spa_fraction max_latency = SPA_FRACTION(0, 0); - struct spa_fraction rate = SPA_FRACTION(0, 0); - uint32_t target_quantum, target_rate, current_rate, current_quantum; - uint64_t quantum_stamp = 0, rate_stamp = 0; - bool force_rate, force_quantum, restore_rate = false, restore_quantum = false; - bool do_reconfigure = false, need_resume, was_target_pending; - bool have_request = false; - const uint32_t *node_rates; - uint32_t node_n_rates, node_def_rate; - uint32_t node_max_quantum, node_min_quantum, node_def_quantum, node_rate_quantum; - - if (!n->driving || n->exported) - continue; - - node_def_quantum = def_quantum; - node_min_quantum = min_quantum; - node_max_quantum = max_quantum; - node_rate_quantum = rate_quantum; - force_quantum = global_force_quantum; - - node_def_rate = def_rate; - node_n_rates = n_rates; - node_rates = rates; - force_rate = global_force_rate; - - /* collect quantum and rate */ - spa_list_for_each(s, &n->follower_list, follower_link) { - - if (!s->moved) { - /* We only try to enforce the lock flags for nodes that - * are not recently moved between drivers. The nodes that - * are moved should try to enforce their quantum on the - * new driver. */ - lock_quantum |= s->lock_quantum; - lock_rate |= s->lock_rate; - } - if (!global_force_quantum && s->force_quantum > 0 && - s->stamp > quantum_stamp) { - node_def_quantum = node_min_quantum = node_max_quantum = s->force_quantum; - node_rate_quantum = 0; - quantum_stamp = s->stamp; - force_quantum = true; - } - if (!global_force_rate && s->force_rate > 0 && - s->stamp > rate_stamp) { - node_def_rate = s->force_rate; - node_n_rates = 1; - node_rates = &s->force_rate; - force_rate = true; - rate_stamp = s->stamp; - } - - /* smallest latencies */ - if (latency.denom == 0 || - (s->latency.denom > 0 && - fraction_compare(&s->latency, &latency) < 0)) - latency = s->latency; - if (max_latency.denom == 0 || - (s->max_latency.denom > 0 && - fraction_compare(&s->max_latency, &max_latency) < 0)) - max_latency = s->max_latency; - - /* largest rate, which is in fact the smallest fraction */ - if (rate.denom == 0 || - (s->rate.denom > 0 && - fraction_compare(&s->rate, &rate) < 0)) - rate = s->rate; - - if (s->active) - running = n->runnable; - - pw_log_debug("%p: follower %p running:%d runnable:%d rate:%u/%u latency %u/%u '%s'", - context, s, running, s->runnable, rate.num, rate.denom, - latency.num, latency.denom, s->name); - - if (running && s != n && s->supports_request > 0) - have_request = true; - - s->moved = false; - } - - if (n->forced_rate && !force_rate && n->runnable) { - /* A node that was forced to a rate but is no longer being - * forced can restore its rate */ - pw_log_info("(%s-%u) restore rate", n->name, n->info.id); - restore_rate = true; - } - if (n->forced_quantum && !force_quantum && n->runnable) { - /* A node that was forced to a quantum but is no longer being - * forced can restore its quantum */ - pw_log_info("(%s-%u) restore quantum", n->name, n->info.id); - restore_quantum = true; - } - - if (force_quantum) - lock_quantum = false; - if (force_rate) - lock_rate = false; - - need_resume = n->need_resume; - if (need_resume) { - running = true; - n->need_resume = false; - } - - current_rate = n->target_rate.denom; - if (!restore_rate && - (lock_rate || need_resume || !running || - (!force_rate && (n->info.state > PW_NODE_STATE_IDLE)))) { - pw_log_debug("%p: keep rate:1/%u restore:%u lock:%u resume:%u " - "running:%u force:%u state:%s", context, - current_rate, restore_rate, lock_rate, need_resume, - running, force_rate, - pw_node_state_as_string(n->info.state)); - - /* when we don't need to restore or rate and - * when someone wants us to lock the rate of this driver or - * when we are in the process of reconfiguring the driver or - * when we are not running any followers or - * when the driver is busy and we don't need to force a rate, - * keep the current rate */ - target_rate = current_rate; - } - else { - /* Here we are allowed to change the rate of the driver. - * Start with the default rate. If the desired rate is - * allowed, switch to it */ - if (rate.denom != 0 && rate.num == 1) - target_rate = rate.denom; - else - target_rate = node_def_rate; - - target_rate = find_best_rate(node_rates, node_n_rates, - target_rate, node_def_rate); - - pw_log_debug("%p: def_rate:%d target_rate:%d rate:%d/%d", context, - node_def_rate, target_rate, rate.num, rate.denom); - } - - was_target_pending = n->target_pending; - - if (target_rate != current_rate) { - /* we doing a rate switch */ - pw_log_info("(%s-%u) state:%s new rate:%u/(%u)->%u", - n->name, n->info.id, - pw_node_state_as_string(n->info.state), - n->target_rate.denom, current_rate, - target_rate); - - if (force_rate) { - if (settings->clock_rate_update_mode == CLOCK_RATE_UPDATE_MODE_HARD) - do_reconfigure |= !was_target_pending; - } else { - if (n->info.state >= PW_NODE_STATE_SUSPENDED) - do_reconfigure |= !was_target_pending; - } - /* we're setting the pending rate. This will become the new - * current rate in the next iteration of the graph. */ - n->target_rate = SPA_FRACTION(1, target_rate); - n->forced_rate = force_rate; - n->target_pending = true; - current_rate = target_rate; - } - - if (node_rate_quantum != 0 && current_rate != node_rate_quantum) { - /* the quantum values are scaled with the current rate */ - node_def_quantum = SPA_SCALE32(node_def_quantum, current_rate, node_rate_quantum); - node_min_quantum = SPA_SCALE32(node_min_quantum, current_rate, node_rate_quantum); - node_max_quantum = SPA_SCALE32(node_max_quantum, current_rate, node_rate_quantum); - } - - /* calculate desired quantum. Don't limit to the max_latency when we are - * going to force a quantum or rate and reconfigure the nodes. */ - if (max_latency.denom != 0 && !force_quantum && !force_rate) { - uint32_t tmp = SPA_SCALE32(max_latency.num, current_rate, max_latency.denom); - if (tmp < node_max_quantum) - node_max_quantum = tmp; - } - - current_quantum = n->target_quantum; - if (!restore_quantum && (lock_quantum || need_resume || !running)) { - pw_log_debug("%p: keep quantum:%u restore:%u lock:%u resume:%u " - "running:%u force:%u state:%s", context, - current_quantum, restore_quantum, lock_quantum, need_resume, - running, force_quantum, - pw_node_state_as_string(n->info.state)); - target_quantum = current_quantum; - } - else { - target_quantum = node_def_quantum; - if (latency.denom != 0) - target_quantum = SPA_SCALE32(latency.num, current_rate, latency.denom); - target_quantum = SPA_CLAMP(target_quantum, node_min_quantum, node_max_quantum); - target_quantum = SPA_CLAMP(target_quantum, floor_quantum, ceil_quantum); - - if (settings->clock_power_of_two_quantum && !force_quantum) - target_quantum = flp2(target_quantum); - } - - if (target_quantum != current_quantum) { - pw_log_info("(%s-%u) new quantum:%"PRIu64"->%u", - n->name, n->info.id, - n->target_quantum, - target_quantum); - /* this is the new pending quantum */ - n->target_quantum = target_quantum; - n->forced_quantum = force_quantum; - n->target_pending = true; - - if (force_quantum) - do_reconfigure |= !was_target_pending; - } - - if (n->target_pending) { - if (do_reconfigure) { - reconfigure_driver(context, n); - /* we might be suspended now and the links need to be prepared again */ - goto again; - } - /* we have a pending change. We place the new values in the - * pending fields so that they are picked up by the driver in - * the next cycle */ - pw_log_debug("%p: apply duration:%"PRIu64" rate:%u/%u", context, - n->target_quantum, n->target_rate.num, - n->target_rate.denom); - SPA_SEQ_WRITE(n->rt.position->clock.target_seq); - n->rt.position->clock.target_duration = n->target_quantum; - n->rt.position->clock.target_rate = n->target_rate; - SPA_SEQ_WRITE(n->rt.position->clock.target_seq); - - if (n->info.state < PW_NODE_STATE_RUNNING) { - n->rt.position->clock.duration = n->target_quantum; - n->rt.position->clock.rate = n->target_rate; - } - n->target_pending = false; - } else { - n->target_quantum = n->rt.position->clock.target_duration; - n->target_rate = n->rt.position->clock.target_rate; - } - - if (n->info.state < PW_NODE_STATE_RUNNING) - n->rt.position->clock.nsec = get_time_ns(n->rt.target.system); - - SPA_FLAG_UPDATE(n->rt.position->clock.flags, - SPA_IO_CLOCK_FLAG_LAZY, have_request && n->supports_lazy > 0); - - pw_log_debug("%p: driver %p running:%d runnable:%d quantum:%u rate:%u (%"PRIu64"/%u)'%s'", - context, n, running, n->runnable, target_quantum, target_rate, - n->rt.position->clock.target_duration, - n->rt.position->clock.target_rate.denom, n->name); - - transport = PW_NODE_ACTIVATION_COMMAND_NONE; - - /* first change the node states of the followers to the new target */ - spa_list_for_each(s, &n->follower_list, follower_link) { - if (s->transport != PW_NODE_ACTIVATION_COMMAND_NONE) { - transport = s->transport; - s->transport = PW_NODE_ACTIVATION_COMMAND_NONE; - } - if (s == n) - continue; - pw_log_debug("%p: follower %p: active:%d '%s'", - context, s, s->active, s->name); - ensure_state(s, running); - } - - if (transport != PW_NODE_ACTIVATION_COMMAND_NONE) { - pw_log_info("%s: transport %d", n->name, transport); - SPA_ATOMIC_STORE(n->rt.target.activation->command, transport); - } - - /* now that all the followers are ready, start the driver */ - ensure_state(n, running); - } impl->recalc = false; if (impl->recalc_pending) { impl->recalc_pending = false; goto again; } - return 0; } diff --git a/src/pipewire/context.h b/src/pipewire/context.h index 61c6662c4..5eaa8de30 100644 --- a/src/pipewire/context.h +++ b/src/pipewire/context.h @@ -51,7 +51,7 @@ struct pw_impl_node; /** context events emitted by the context object added with \ref pw_context_add_listener */ struct pw_context_events { -#define PW_VERSION_CONTEXT_EVENTS 1 +#define PW_VERSION_CONTEXT_EVENTS 2 uint32_t version; /** The context is being destroyed */ @@ -69,6 +69,9 @@ struct pw_context_events { void (*driver_added) (void *data, struct pw_impl_node *node); /** a driver was removed, since 0.3.75 version:1 */ void (*driver_removed) (void *data, struct pw_impl_node *node); + + /** recalculate the graph state, since 1.7.0 version:2 */ + void (*recalc_graph) (void *data); }; /** Make a new context object for a given main_loop. Ownership of the properties is taken, even diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index a77dcf35f..610a6d620 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -969,6 +969,7 @@ static void output_remove(struct pw_impl_link *this) this->output = NULL; } +SPA_EXPORT int pw_impl_link_prepare(struct pw_impl_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 5582709ad..95a1a4646 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -364,6 +364,7 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq, #define pw_context_emit_global_removed(c,g) pw_context_emit(c, global_removed, 0, g) #define pw_context_emit_driver_added(c,n) pw_context_emit(c, driver_added, 1, n) #define pw_context_emit_driver_removed(c,n) pw_context_emit(c, driver_removed, 1, n) +#define pw_context_emit_recalc_graph(c) pw_context_emit(c, recalc_graph, 2) struct pw_context { struct pw_impl_core *core; /**< core object */ @@ -1269,6 +1270,8 @@ int pw_context_debug_port_params(struct pw_context *context, struct spa_node *node, enum spa_direction direction, uint32_t port_id, uint32_t id, int err, const char *debug, ...); +int pw_context_set_freewheel(struct pw_context *context, bool freewheel); + int pw_proxy_init(struct pw_proxy *proxy, struct pw_core *core, const char *type, uint32_t version); void pw_proxy_remove(struct pw_proxy *proxy);