From 32f039e2e5e8eeab7f29124bbd14c6fdaac94715 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 10 Apr 2019 16:19:10 +0200 Subject: [PATCH] core: rework how nodes are added to a driver Add a function to recalculate all nodes associated with a driver by iterating the graph for each driver node. We used to do this in an incremental way, which is easy to join graph but expensive to split. A full scan simplifies some things and we can't avoid it when we need to calculate latencies later. It will also simplifies assigning master and slave roles to drivers when the graphs are joined/split. When we link/unlink or add/remove nodes, recalc the graph. --- src/pipewire/core.c | 77 ++++++++++++++++++++++++++ src/pipewire/link.c | 26 ++------- src/pipewire/node.c | 123 ++++++++++++----------------------------- src/pipewire/private.h | 10 ++++ 4 files changed, 127 insertions(+), 109 deletions(-) diff --git a/src/pipewire/core.c b/src/pipewire/core.c index 645790c5f..c14e78e92 100644 --- a/src/pipewire/core.c +++ b/src/pipewire/core.c @@ -478,6 +478,7 @@ struct pw_core *pw_core_new(struct pw_loop *main_loop, spa_list_init(&this->control_list[0]); spa_list_init(&this->control_list[1]); spa_list_init(&this->export_list); + spa_list_init(&this->driver_list); spa_hook_list_init(&this->listener_list); if ((name = pw_properties_get(properties, PW_CORE_PROP_NAME)) == NULL) { @@ -932,3 +933,79 @@ struct pw_factory *pw_core_find_factory(struct pw_core *core, } return NULL; } + +static int collect_nodes(struct pw_node *driver) +{ + struct spa_list queue; + struct pw_node *n, *t; + struct pw_port *p; + struct pw_link *l; + uint32_t quantum = DEFAULT_QUANTUM; + + spa_list_init(&driver->driver_list); + + pw_log_info("driver %p: '%s'", driver, driver->info.name); + spa_list_init(&queue); + spa_list_append(&queue, &driver->sort_link); + driver->visited = true; + + spa_list_consume(n, &queue, sort_link) { + spa_list_remove(&n->sort_link); + spa_list_append(&driver->driver_list, &n->driver_link); + + pw_node_set_driver(n, driver); + + if (n->quantum_size > 0 && n->quantum_size < quantum) + quantum = n->quantum_size; + + spa_list_for_each(p, &n->input_ports, link) { + spa_list_for_each(l, &p->links, input_link) { + t = l->output->node; + if (!t->visited) { + t->visited = true; + spa_list_append(&queue, &t->sort_link); + } + } + } + spa_list_for_each(p, &n->output_ports, link) { + spa_list_for_each(l, &p->links, output_link) { + t = l->input->node; + if (!t->visited) { + t->visited = true; + spa_list_append(&queue, &t->sort_link); + } + } + } + } + quantum = SPA_MAX(quantum, MIN_QUANTUM); + + if (driver->rt.position && quantum != driver->rt.position->size) { + driver->rt.position->size = quantum; + } + + return 0; +} + +int pw_core_recalc_graph(struct pw_core *core) +{ + struct pw_node *n, *s; + + spa_list_for_each(n, &core->driver_list, core_driver_link) { + if (!n->visited) + collect_nodes(n); + } + + spa_list_for_each(n, &core->node_list, link) { + if (!n->visited) { + pw_log_info("unassigned node %p: '%s'", n, n->info.name); + } + n->visited = false; + } + + spa_list_for_each(n, &core->driver_list, core_driver_link) { + pw_log_info("driver %p: quantum:%d '%s'", n, n->rt.position->size, n->info.name); + spa_list_for_each(s, &n->driver_list, driver_link) + pw_log_info("slave %p: active:%d '%s'", s, s->active, s->info.name); + } + return 0; +} diff --git a/src/pipewire/link.c b/src/pipewire/link.c index a12eb4c67..20e2688bd 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -1102,26 +1102,6 @@ static const struct pw_node_events output_node_events = { .result = output_node_result, }; -static int find_driver(struct pw_link *this) -{ - struct pw_node *out_driver, *in_driver; - - out_driver = this->output->node->driver_node; - in_driver = this->input->node->driver_node; - - pw_log_debug("link %p: drivers %p/%p", this, out_driver, in_driver); - - if (out_driver == in_driver) - return 0; - - if (out_driver->driver) - pw_node_set_driver(in_driver, out_driver); - else - pw_node_set_driver(out_driver, in_driver); - - return 0; -} - static bool pw_node_can_reach(struct pw_node *output, struct pw_node *input) { struct pw_port *p; @@ -1288,8 +1268,6 @@ struct pw_link *pw_link_new(struct pw_core *core, output_node, output->port_id, this->rt.out_mix.port.port_id, input_node, input->port_id, this->rt.in_mix.port.port_id); - find_driver(this); - pw_port_emit_link_added(output, this); pw_port_emit_link_added(input, this); @@ -1297,6 +1275,8 @@ struct pw_link *pw_link_new(struct pw_core *core, pw_node_emit_peer_added(output_node, input_node); + pw_core_recalc_graph(core); + return this; no_io: @@ -1413,6 +1393,8 @@ void pw_link_destroy(struct pw_link *link) if (link->properties) pw_properties_free(link->properties); + pw_core_recalc_graph(link->core); + free(link->info.format); free(impl); } diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 92752fb93..903484637 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -43,9 +43,6 @@ #include "pipewire/type.h" #include "pipewire/work-queue.h" -#define DEFAULT_QUANTUM 1024u -#define MIN_QUANTUM 64u - /** \cond */ struct impl { struct pw_node this; @@ -541,39 +538,13 @@ do_move_nodes(struct spa_loop *loop, struct impl *src = user_data; struct pw_node *driver = *(struct pw_node **)data; struct pw_node *this = &src->this; - struct pw_node_target *n, *t; - pw_log_trace("node %p: driver:%p->%p", this, this, driver); + pw_log_trace("node %p: driver:%p->%p", this, this->driver_node, driver); if (this->source.loop != NULL) { remove_node(this); add_node(this, driver); } - - spa_list_for_each_safe(n, t, &this->rt.target_list, link) { - struct pw_node *node = n->node; - if (node && node->source.loop != NULL) { - remove_node(node); - add_node(node, driver); - } - } - return 0; -} - -static int recalc_quantum(struct pw_node *driver) -{ - uint32_t quantum = DEFAULT_QUANTUM; - struct pw_node *n; - - spa_list_for_each(n, &driver->driver_list, driver_link) { - if (n->quantum_size > 0 && n->quantum_size < quantum) - quantum = n->quantum_size; - } - quantum = SPA_MAX(quantum, MIN_QUANTUM); - if (driver->rt.position && quantum != driver->rt.position->size) { - driver->rt.position->size = quantum; - pw_log_info("node %p: driver quantum %d", driver, quantum); - } return 0; } @@ -581,57 +552,31 @@ SPA_EXPORT int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - struct pw_node *n, *old; - struct spa_list nodes; + struct pw_node *old = node->driver_node; int res; - old = node->driver_node; - if (driver == NULL) driver = node; - /* first move this node to a new list */ - spa_list_init(&nodes); - spa_list_remove(&node->driver_link); - spa_list_append(&nodes, &node->driver_link); - /* move all nodes managed by this node to list */ - spa_list_insert_list(&nodes, &node->driver_list); - /* clear current node list */ - spa_list_init(&node->driver_list); + if (old == driver) + return 0; - pw_log_debug("node %p: driver:%p current:%p", node, driver, old); + node->driver_node = driver; + pw_node_emit_driver_changed(node, old, driver); - /* now move all nodes to the (new) driver node */ - spa_list_consume(n, &nodes, driver_link) { - old = n->driver_node; - - pw_log_debug("driver %p: add %p old %p", driver, n, old); - - spa_list_remove(&n->driver_link); - spa_list_append(&driver->driver_list, &n->driver_link); - if (n->driver_node == driver) - continue; - - n->driver_node = driver; - pw_node_emit_driver_changed(n, old, driver); - - if ((res = spa_node_set_io(n->node, - SPA_IO_Position, - &driver->rt.activation->position, - sizeof(struct spa_io_position))) < 0) { - pw_log_warn("node %p: set position %s", n, spa_strerror(res)); - } else { - pw_log_trace("node %p: set position %p", n, &driver->rt.activation->position); - n->rt.position = &driver->rt.activation->position; - } + if ((res = spa_node_set_io(node->node, + SPA_IO_Position, + &driver->rt.activation->position, + sizeof(struct spa_io_position))) < 0) { + pw_log_warn("node %p: set position %s", node, spa_strerror(res)); + } else { + pw_log_trace("node %p: set position %p", node, &driver->rt.activation->position); + node->rt.position = &driver->rt.activation->position; } - recalc_quantum(driver); - pw_loop_invoke(node->data_loop, do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_node *), true, impl); - return 0; } @@ -649,6 +594,7 @@ static void check_properties(struct pw_node *node) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); const char *str; + bool driver; if ((str = pw_properties_get(node->properties, "node.pause-on-idle"))) impl->pause_on_idle = pw_properties_parse_bool(str); @@ -656,9 +602,17 @@ static void check_properties(struct pw_node *node) impl->pause_on_idle = true; if ((str = pw_properties_get(node->properties, "node.driver"))) - node->driver = pw_properties_parse_bool(str); + driver = pw_properties_parse_bool(str); else - node->driver = false; + driver = false; + + if (node->driver != driver) { + node->driver = driver; + if (driver) + spa_list_append(&node->core->driver_list, &node->core_driver_link); + else + spa_list_remove(&node->core_driver_link); + } if ((str = pw_properties_get(node->properties, "node.latency"))) { uint32_t num, denom; @@ -1165,7 +1119,6 @@ SPA_EXPORT void pw_node_destroy(struct pw_node *node) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - struct pw_node *n; struct pw_port *port; pw_log_debug("node %p: destroy", impl); @@ -1176,15 +1129,12 @@ void pw_node_destroy(struct pw_node *node) pw_log_debug("node %p: driver node %p", impl, node->driver_node); + if (node->driver) + spa_list_remove(&node->core_driver_link); + /* remove ourself from the (other) driver node */ spa_list_remove(&node->driver_link); - recalc_quantum(node->driver_node); - - /* move all nodes driven by us to their own driver */ - spa_list_consume(n, &node->driver_list, driver_link) - pw_node_set_driver(n, NULL); - if (node->registered) spa_list_remove(&node->link); @@ -1471,18 +1421,17 @@ int pw_node_set_active(struct pw_node *node, bool active) if (old != active) { pw_log_debug("node %p: %s", node, active ? "activate" : "deactivate"); + + if (!active) + pw_node_set_state(node, PW_NODE_STATE_IDLE); + node->active = active; pw_node_emit_active_changed(node, active); - if (active) { - if (node->enabled) - node_activate(node); - } - else { - node->active = true; - pw_node_set_state(node, PW_NODE_STATE_IDLE); - node->active = false; - } + if (active && node->enabled) + node_activate(node); + + pw_core_recalc_graph(node->core); } return 0; } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 69814c7b5..267233c3f 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -45,6 +45,9 @@ extern "C" { #define spa_debug pw_log_trace #endif +#define DEFAULT_QUANTUM 1024u +#define MIN_QUANTUM 64u + #define MAX_PARAMS 32 struct pw_command; @@ -185,6 +188,7 @@ struct pw_core { struct spa_list link_list; /**< list of links */ struct spa_list control_list[2]; /**< list of controls, indexed by direction */ struct spa_list export_list; /**< list of export types */ + struct spa_list driver_list; /**< list of driver nodes */ struct spa_hook_list listener_list; @@ -370,13 +374,17 @@ struct pw_node { int remote:1; /**< if the node is implemented remotely */ int master:1; /**< a master node is one of the driver nodes that * is selected to drive the graph */ + int visited:1; /**< for sorting */ uint32_t port_user_data_size; /**< extra size for port user data */ + struct spa_list core_driver_link; struct pw_node *driver_node; struct spa_list driver_list; struct spa_list driver_link; + struct spa_list sort_link; /**< link used to sort nodes */ + struct spa_node *node; /**< SPA node implementation */ struct spa_hook listener; @@ -744,6 +752,8 @@ pw_core_find_port(struct pw_core *core, const struct pw_export_type *pw_core_find_export_type(struct pw_core *core, uint32_t type); +int pw_core_recalc_graph(struct pw_core *core); + /** Create a new port \memberof pw_port * \return a newly allocated port */ struct pw_port *