node: add node.sync-group and node.sync

node.sync-group can contain a list of strings. When a node in the graph
sets node.sync = true, it will be scheduled with all of the other nodes with
common node.sync-group strings. By default all nodes are placed in
group.sync.0 except the freewheel and dummy driver.

Use this to ensure that all nodes are grouped under the same driver
(that is not the freewheel and dummy driver) as soon as the transport is
started so that the transport is visible to all nodes from the same
sync-groups. We also don't deactive the sync-group anymore for the node,
even if the transport is stopped, to avoid driver changes and transport
jumps.

When the node that activated the sync/transport is destroyed, things are
restored to their original state. Note that this is different from JACK
where starting the transport outlives the application and always needs
to be explicitly stopped again. We can't really do this (by default) because
it leaves the graph in an unnatural state with all devices in sync.

The reason for the node.sync-group is that it is possible to still have N
different subgraphs with a separate transports by manually specifying
the node.sync-group.

It's also slightly different from the node.group, that is always active.
The sync-group is something you only want to enable in specific cases
because it groups drivers together and enables adaptive resampling etc.

It's also possible to place this option in the jack.conf file to
automatically sync all devices and apps as soon as a jack app is started.

Fixes #3850
This commit is contained in:
Wim Taymans 2024-02-29 17:49:46 +01:00
parent d063ab7322
commit 06f63f489a
6 changed files with 63 additions and 8 deletions

View file

@ -6803,6 +6803,7 @@ static int transport_update(struct client* c, int active)
pw_log_info("%p: transport %d", c, active); pw_log_info("%p: transport %d", c, active);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
pw_properties_set(c->props, PW_KEY_NODE_SYNC, "true");
pw_properties_set(c->props, PW_KEY_NODE_TRANSPORT, pw_properties_set(c->props, PW_KEY_NODE_TRANSPORT,
active ? "true" : "false"); active ? "true" : "false");

View file

@ -232,6 +232,7 @@ context.objects = [
factory.name = support.node.driver factory.name = support.node.driver
node.name = Dummy-Driver node.name = Dummy-Driver
node.group = pipewire.dummy node.group = pipewire.dummy
node.sync-group = sync.dummy
priority.driver = 20000 priority.driver = 20000
#clock.id = monotonic # realtime | tai | monotonic-raw | boottime #clock.id = monotonic # realtime | tai | monotonic-raw | boottime
#clock.name = "clock.system.monotonic" #clock.name = "clock.system.monotonic"
@ -243,6 +244,7 @@ context.objects = [
node.name = Freewheel-Driver node.name = Freewheel-Driver
priority.driver = 19000 priority.driver = 19000
node.group = pipewire.freewheel node.group = pipewire.freewheel
node.sync-group = sync.dummy
node.freewheel = true node.freewheel = true
#freewheel.wait = 10 #freewheel.wait = 10
} }

View file

@ -33,6 +33,7 @@ PW_LOG_TOPIC_EXTERN(log_context);
#define PW_LOG_TOPIC_DEFAULT log_context #define PW_LOG_TOPIC_DEFAULT log_context
#define MAX_HOPS 64 #define MAX_HOPS 64
#define MAX_SYNC 4u
/** \cond */ /** \cond */
struct impl { struct impl {
@ -895,7 +896,8 @@ static inline int run_nodes(struct pw_context *context, struct pw_impl_node *nod
* This ensures that we only activate the paths from the runnable nodes to the * This ensures that we only activate the paths from the runnable nodes to the
* driver nodes and leave the other nodes idle. * driver nodes and leave the other nodes idle.
*/ */
static int collect_nodes(struct pw_context *context, struct pw_impl_node *node, struct spa_list *collect) static int collect_nodes(struct pw_context *context, struct pw_impl_node *node, struct spa_list *collect,
char **sync)
{ {
struct spa_list queue; struct spa_list queue;
struct pw_impl_node *n, *t; struct pw_impl_node *n, *t;
@ -920,6 +922,11 @@ static int collect_nodes(struct pw_context *context, struct pw_impl_node *node,
if (!n->active) if (!n->active)
continue; continue;
if (sync[0] != NULL) {
if (pw_strv_find_common(n->sync_groups, sync) < 0)
continue;
}
spa_list_for_each(p, &n->input_ports, link) { spa_list_for_each(p, &n->input_ports, link) {
spa_list_for_each(l, &p->links, input_link) { spa_list_for_each(l, &p->links, input_link) {
t = l->output->node; t = l->output->node;
@ -964,13 +971,15 @@ static int collect_nodes(struct pw_context *context, struct pw_impl_node *node,
} }
/* now go through all the nodes that have the same group and /* now go through all the nodes that have the same group and
* that are not yet visited */ * that are not yet visited */
if (n->groups != NULL || n->link_groups != NULL) { if (n->groups != NULL || n->link_groups != NULL || sync[0] != NULL) {
spa_list_for_each(t, &context->node_list, link) { spa_list_for_each(t, &context->node_list, link) {
if (t->exported || !t->active || t->visited) if (t->exported || !t->active || t->visited)
continue; continue;
if (pw_strv_find_common(t->groups, n->groups) < 0 && if (pw_strv_find_common(t->groups, n->groups) < 0 &&
pw_strv_find_common(t->link_groups, n->link_groups) < 0) pw_strv_find_common(t->link_groups, n->link_groups) < 0 &&
pw_strv_find_common(t->sync_groups, sync) < 0)
continue; continue;
pw_log_debug("%p: %s join group of %s", pw_log_debug("%p: %s join group of %s",
t, t->name, n->name); t, t->name, n->name);
t->visited = true; t->visited = true;
@ -1220,9 +1229,10 @@ int pw_context_recalc_graph(struct pw_context *context, const char *reason)
struct pw_impl_node *n, *s, *target, *fallback; struct pw_impl_node *n, *s, *target, *fallback;
const uint32_t *rates; const uint32_t *rates;
uint32_t max_quantum, min_quantum, def_quantum, lim_quantum, rate_quantum; uint32_t max_quantum, min_quantum, def_quantum, lim_quantum, rate_quantum;
uint32_t n_rates, def_rate; uint32_t n_rates, def_rate, n_sync;
bool freewheel, global_force_rate, global_force_quantum, transport_start; bool freewheel, global_force_rate, global_force_quantum, transport_start;
struct spa_list collect; struct spa_list collect;
char *sync[MAX_SYNC+1];
pw_log_info("%p: busy:%d reason:%s", context, impl->recalc, reason); pw_log_info("%p: busy:%d reason:%s", context, impl->recalc, reason);
@ -1236,11 +1246,23 @@ again:
freewheel = false; freewheel = false;
transport_start = false; transport_start = false;
/* clean up the flags first */ /* clean up the flags first and collect sync */
n_sync = 0;
sync[0] = NULL;
spa_list_for_each(n, &context->node_list, link) { spa_list_for_each(n, &context->node_list, link) {
n->visited = false; n->visited = false;
n->checked = 0; n->checked = 0;
n->runnable = n->always_process && n->active; n->runnable = n->always_process && n->active;
if (n->sync) {
for (uint32_t i = 0; n->sync_groups[i]; i++) {
if (n_sync >= MAX_SYNC)
break;
if (pw_strv_find(sync, n->sync_groups[i]) >= 0)
continue;
sync[n_sync++] = n->sync_groups[i];
sync[n_sync] = NULL;
}
}
} }
get_quantums(context, &def_quantum, &min_quantum, &max_quantum, &lim_quantum, &rate_quantum); get_quantums(context, &def_quantum, &min_quantum, &max_quantum, &lim_quantum, &rate_quantum);
@ -1260,7 +1282,7 @@ again:
if (!n->visited) { if (!n->visited) {
spa_list_init(&collect); spa_list_init(&collect);
collect_nodes(context, n, &collect); collect_nodes(context, n, &collect, sync);
move_to_driver(context, &collect, n); move_to_driver(context, &collect, n);
} }
/* from now on we are only interested in active driving nodes /* from now on we are only interested in active driving nodes
@ -1314,7 +1336,7 @@ again:
/* collect all nodes in this group */ /* collect all nodes in this group */
spa_list_init(&collect); spa_list_init(&collect);
collect_nodes(context, n, &collect); collect_nodes(context, n, &collect, sync);
driver = NULL; driver = NULL;
spa_list_for_each(t, &collect, sort_link) { spa_list_for_each(t, &collect, sort_link) {

View file

@ -47,6 +47,7 @@ struct impl {
char *group; char *group;
char *link_group; char *link_group;
char *sync_group;
}; };
#define pw_node_resource(r,m,v,...) pw_resource_call(r,struct pw_node_events,m,v,__VA_ARGS__) #define pw_node_resource(r,m,v,...) pw_resource_call(r,struct pw_node_events,m,v,__VA_ARGS__)
@ -480,6 +481,7 @@ clear_info(struct pw_impl_node *this)
{ {
pw_free_strv(this->groups); pw_free_strv(this->groups);
pw_free_strv(this->link_groups); pw_free_strv(this->link_groups);
pw_free_strv(this->sync_groups);
free(this->name); free(this->name);
free((char*)this->info.error); free((char*)this->info.error);
} }
@ -938,7 +940,7 @@ static void check_properties(struct pw_impl_node *node)
const char *str, *recalc_reason = NULL; const char *str, *recalc_reason = NULL;
struct spa_fraction frac; struct spa_fraction frac;
uint32_t value; uint32_t value;
bool driver, trigger, transport; bool driver, trigger, transport, sync;
struct match match; struct match match;
match = MATCH_INIT(node); match = MATCH_INIT(node);
@ -1021,6 +1023,26 @@ static void check_properties(struct pw_impl_node *node)
recalc_reason = "link group changed"; recalc_reason = "link group changed";
} }
/* sync group defines what nodes are part of the same sync */
str = pw_properties_get(node->properties, PW_KEY_NODE_SYNC_GROUP);
if (str == NULL)
str = "group.sync.0";
if (!spa_streq(str, impl->sync_group)) {
pw_log_info("%p: sync group '%s'->'%s'", node, impl->sync_group, str);
free(impl->sync_group);
impl->sync_group = str ? strdup(str) : NULL;
pw_free_strv(node->sync_groups);
node->sync_groups = impl->sync_group ?
pw_strv_parse(impl->sync_group, strlen(impl->sync_group), INT_MAX, NULL) : NULL;
recalc_reason = "sync group changed";
}
sync = pw_properties_get_bool(node->properties, PW_KEY_NODE_SYNC, false);
if (sync != node->sync) {
pw_log_info("%p: sync %d -> %d", node, node->sync, sync);
node->sync = sync;
recalc_reason = "sync changed";
}
transport = pw_properties_get_bool(node->properties, PW_KEY_NODE_TRANSPORT, false); transport = pw_properties_get_bool(node->properties, PW_KEY_NODE_TRANSPORT, false);
if (transport != node->transport) { if (transport != node->transport) {
pw_log_info("%p: transport %d -> %d", node, node->transport, transport); pw_log_info("%p: transport %d -> %d", node, node->transport, transport);
@ -2175,6 +2197,7 @@ void pw_impl_node_destroy(struct pw_impl_node *node)
spa_system_close(node->data_system, node->source.fd); spa_system_close(node->data_system, node->source.fd);
free(impl->group); free(impl->group);
free(impl->link_group); free(impl->link_group);
free(impl->sync_group);
free(impl); free(impl);
#ifdef HAVE_MALLOC_TRIM #ifdef HAVE_MALLOC_TRIM

View file

@ -139,6 +139,11 @@ extern "C" {
* in the same group are always scheduled * in the same group are always scheduled
* with the same driver. Can be an array of * with the same driver. Can be an array of
* group names. */ * group names. */
#define PW_KEY_NODE_SYNC_GROUP "node.sync-group" /**< the sync group this node is part of. Nodes
* in the same sync group are always scheduled
* together with the same driver when the sync
* is active. Can be an array of sync names. */
#define PW_KEY_NODE_SYNC "node.sync" /**< if the sync-group is active or not */
#define PW_KEY_NODE_TRANSPORT "node.transport" /**< if the transport is active or not */ #define PW_KEY_NODE_TRANSPORT "node.transport" /**< if the transport is active or not */
#define PW_KEY_NODE_EXCLUSIVE "node.exclusive" /**< node wants exclusive access to resources */ #define PW_KEY_NODE_EXCLUSIVE "node.exclusive" /**< node wants exclusive access to resources */
#define PW_KEY_NODE_AUTOCONNECT "node.autoconnect" /**< node wants to be automatically connected #define PW_KEY_NODE_AUTOCONNECT "node.autoconnect" /**< node wants to be automatically connected

View file

@ -650,6 +650,7 @@ struct pw_impl_node {
uint32_t priority_driver; /** priority for being driver */ uint32_t priority_driver; /** priority for being driver */
char **groups; /** groups to schedule this node in */ char **groups; /** groups to schedule this node in */
char **link_groups; /** groups this node is linked to */ char **link_groups; /** groups this node is linked to */
char **sync_groups; /** sync groups this node is in */
uint64_t spa_flags; uint64_t spa_flags;
unsigned int registered:1; unsigned int registered:1;
@ -683,6 +684,7 @@ struct pw_impl_node {
* trigger to start processing. */ * trigger to start processing. */
unsigned int can_suspend:1; unsigned int can_suspend:1;
unsigned int checked; /**< for sorting */ unsigned int checked; /**< for sorting */
unsigned int sync:1; /**< the sync-groups are active */
unsigned int transport:1; /**< the transport is active */ unsigned int transport:1; /**< the transport is active */
uint32_t port_user_data_size; /**< extra size for port user data */ uint32_t port_user_data_size; /**< extra size for port user data */