From 0f69a7b7675892dc760ff153fbbc50e62391fbab Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 2 Oct 2018 05:10:55 +0200 Subject: [PATCH] node: remove and add node from graph in idle/running Add the node and all links to the graph when going to running and remove them again when idle. --- spa/include/spa/graph/graph.h | 2 +- spa/plugins/alsa/alsa-sink.c | 4 +- .../module-client-node/client-stream.c | 4 +- src/pipewire/node.c | 208 ++++++++++-------- src/pipewire/private.h | 3 - src/pipewire/remote.c | 5 +- 6 files changed, 121 insertions(+), 105 deletions(-) diff --git a/spa/include/spa/graph/graph.h b/spa/include/spa/graph/graph.h index 5b6b45596..79adb20c1 100644 --- a/spa/include/spa/graph/graph.h +++ b/spa/include/spa/graph/graph.h @@ -241,7 +241,7 @@ spa_graph_node_add(struct spa_graph *graph, static inline void spa_graph_node_remove(struct spa_graph_node *node) { - spa_debug("node %p remove", node); + spa_debug("node %p remove from graph %p", node, node->graph); spa_graph_link_remove(&node->graph_link); spa_list_remove(&node->link); } diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index ec41b0ca5..dccdce9be 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -625,7 +625,9 @@ static int impl_node_process(struct spa_node *node) input = this->io; spa_return_val_if_fail(input != NULL, -EIO); - spa_log_trace(this->log, NAME " %p: process %d %d", this, input->status, input->buffer_id); + spa_log_trace(this->log, NAME " %p: process %d %d/%d", this, input->status, + input->buffer_id, + this->n_buffers); if (input->status == SPA_STATUS_HAVE_BUFFER && input->buffer_id < this->n_buffers) { diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index c5416ac5f..2fe53c286 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -788,7 +788,7 @@ static int impl_node_process(struct spa_node *node) trigger = status & SPA_STATUS_HAVE_BUFFER; if (trigger && !impl->this.node->driver) - spa_graph_run(impl->client_node->node->rt.root.graph); + spa_graph_run(impl->client_node->node->rt.driver); return status; } @@ -880,7 +880,7 @@ static void client_node_initialized(void *data) else exclusive = false; - impl->client_node->node->rt.driver = impl->this.node->rt.driver; + spa_graph_node_add(impl->client_node->node->rt.driver, &impl->client_node->node->rt.root); impl->client_port = pw_node_find_port(impl->client_node->node, impl->direction, 0); if (impl->client_port == NULL) diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 56b02c65c..ce5d02316 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -71,12 +71,46 @@ struct resource_data { /** \endcond */ +static void node_deactivate(struct pw_node *this) +{ + struct pw_port *port; -static int do_pause_node(struct pw_node *this) + pw_log_debug("node %p: deactivate", this); + spa_list_for_each(port, &this->input_ports, link) { + struct pw_link *link; + spa_list_for_each(link, &port->links, input_link) + pw_link_deactivate(link); + } + spa_list_for_each(port, &this->output_ports, link) { + struct pw_link *link; + spa_list_for_each(link, &port->links, output_link) + pw_link_deactivate(link); + } +} + +static int +do_node_remove(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct pw_node *this = user_data; + if (this->rt.root.graph != NULL) { + spa_graph_node_remove(&this->rt.root); + this->rt.root.graph = NULL; + } + return 0; +} + +static int pause_node(struct pw_node *this) { int res = 0; + if (this->info.state <= PW_NODE_STATE_IDLE) + return 0; + pw_log_debug("node %p: pause node", this); + node_deactivate(this); + pw_loop_invoke(this->data_loop, do_node_remove, 1, NULL, 0, true, this); + res = spa_node_send_command(this->node, &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Pause)); if (res < 0) @@ -85,18 +119,23 @@ static int do_pause_node(struct pw_node *this) return res; } -static int pause_node(struct pw_node *this) +static int +do_node_add(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) { - if (this->info.state <= PW_NODE_STATE_IDLE) - return 0; - - return do_pause_node(this); + struct pw_node *this = user_data; + if (this->rt.root.graph == NULL) + spa_graph_node_add(this->driver_node->rt.driver, &this->rt.root); + return 0; } static int start_node(struct pw_node *this) { int res = 0; + if (this->info.state >= PW_NODE_STATE_RUNNING) + return 0; + pw_log_debug("node %p: start node", this); res = spa_node_send_command(this->node, &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Start)); @@ -129,6 +168,48 @@ static int suspend_node(struct pw_node *this) return res; } +static void node_update_state(struct pw_node *node, enum pw_node_state state, char *error) +{ + struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); + enum pw_node_state old; + struct pw_resource *resource; + + old = node->info.state; + if (old == state) + return; + + if (state == PW_NODE_STATE_ERROR) { + pw_log_error("node %p: update state from %s -> error (%s)", node, + pw_node_state_as_string(old), error); + } else { + pw_log_debug("node %p: update state from %s -> %s", node, + pw_node_state_as_string(old), pw_node_state_as_string(state)); + } + + if (node->info.error) + free((char*)node->info.error); + node->info.error = error; + node->info.state = state; + + switch (state) { + case PW_NODE_STATE_RUNNING: + pw_loop_invoke(node->data_loop, do_node_add, 1, NULL, 0, true, node); + break; + default: + break; + } + + pw_node_events_state_changed(node, old, state, error); + + node->info.change_mask |= PW_NODE_CHANGE_MASK_STATE; + pw_node_events_info_changed(node, &node->info); + + spa_list_for_each(resource, &node->resource_list, link) + pw_node_resource_info(resource, &node->info); + + node->info.change_mask = 0; +} + static void node_unbind_func(void *data) { struct pw_resource *resource = data; @@ -405,7 +486,7 @@ int pw_node_initialized(struct pw_node *this) { pw_log_debug("node %p initialized", this); pw_node_events_initialized(this); - pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL); + node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL); return 0; } @@ -418,8 +499,15 @@ do_move_nodes(struct spa_loop *loop, struct impl *dst = *(struct impl **)data; struct spa_graph_node *n, *t; - spa_graph_node_remove(&this->rt.root); - spa_graph_node_add(&dst->driver_graph, &this->rt.root); + pw_log_trace("node %p: root %p driver:%p", this, &this->rt.root, &dst->driver_graph); + + if (this->rt.root.graph != NULL) { + spa_graph_node_remove(&this->rt.root); + spa_graph_node_add(&dst->driver_graph, &this->rt.root); + } + + if (&src->driver_graph == &dst->driver_graph) + return 0; spa_list_for_each_safe(n, t, &src->driver_graph.nodes, link) { spa_graph_node_remove(n); @@ -445,25 +533,28 @@ static int recalc_quantum(struct pw_node *driver) int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - struct pw_node *n, *t; + struct pw_node *n, *t, *old; - pw_log_debug("node %p: driver:%p current:%p", node, driver, node->driver_node); + old = node->driver_node; + + pw_log_debug("node %p: driver:%p current:%p", node, driver, old); if (driver == NULL) driver = node; - if (node->driver_node == driver) - return 0; spa_list_remove(&node->driver_link); spa_list_append(&driver->driver_list, &node->driver_link); - node->driver_node = driver; spa_list_for_each_safe(n, t, &node->driver_list, driver_link) { + pw_log_debug("driver %p: add %p old %p", driver, n, n->driver_node); + + if (n->driver_node == driver) + continue; + spa_list_remove(&n->driver_link); spa_list_append(&driver->driver_list, &n->driver_link); n->driver_node = driver; pw_node_events_driver_changed(n, driver); - pw_log_debug("node %p: add %p", driver, n); } recalc_quantum(driver); @@ -472,7 +563,10 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_node *), true, impl); - pw_node_events_driver_changed(node, driver); + if (old != driver) { + node->driver_node = driver; + pw_node_events_driver_changed(node, driver); + } return 0; } @@ -592,7 +686,6 @@ struct pw_node *pw_node_new(struct pw_core *core, this->driver_node = this; spa_list_append(&this->driver_list, &this->driver_link); - spa_graph_node_add(&impl->driver_graph, &this->rt.root); return this; @@ -755,15 +848,6 @@ void pw_node_add_listener(struct pw_node *node, spa_hook_list_append(&node->listener_list, listener, events, data); } -static int -do_node_remove(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct pw_node *this = user_data; - spa_graph_node_remove(&this->rt.root); - return 0; -} - /** Destroy a node * \param node a node to destroy * @@ -793,7 +877,6 @@ void pw_node_destroy(struct pw_node *node) if (node->driver_node != node) { /* remove ourself from the (other) driver node */ spa_list_remove(&node->driver_link); - pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node); recalc_quantum(node->driver_node); } @@ -966,24 +1049,7 @@ static void on_state_complete(struct pw_node *node, void *data, int res) asprintf(&error, "error changing node state: %d", res); state = PW_NODE_STATE_ERROR; } - pw_node_update_state(node, state, error); -} - -static void node_deactivate(struct pw_node *this) -{ - struct pw_port *port; - - pw_log_debug("node %p: deactivate", this); - spa_list_for_each(port, &this->input_ports, link) { - struct pw_link *link; - spa_list_for_each(link, &port->links, input_link) - pw_link_deactivate(link); - } - spa_list_for_each(port, &this->output_ports, link) { - struct pw_link *link; - spa_list_for_each(link, &port->links, output_link) - pw_link_deactivate(link); - } + node_update_state(node, state, error); } static void node_activate(struct pw_node *this) @@ -1037,7 +1103,7 @@ int pw_node_set_state(struct pw_node *node, enum pw_node_state state) break; case PW_NODE_STATE_IDLE: - if (!node->active) + if (node->active && impl->pause_on_idle) res = pause_node(node); break; @@ -1060,56 +1126,6 @@ int pw_node_set_state(struct pw_node *node, enum pw_node_state state) return res; } -/** Update the node state - * \param node a \ref pw_node - * \param state a \ref pw_node_state - * \param error error when \a state is \ref PW_NODE_STATE_ERROR - * - * Update the state of a node. This method is used from inside \a node - * itself. - * - * \memberof pw_node - */ -void pw_node_update_state(struct pw_node *node, enum pw_node_state state, char *error) -{ - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - enum pw_node_state old; - struct pw_resource *resource; - - old = node->info.state; - if (old == state) - return; - - if (state == PW_NODE_STATE_ERROR) { - pw_log_error("node %p: update state from %s -> error (%s)", node, - pw_node_state_as_string(old), error); - } else { - pw_log_debug("node %p: update state from %s -> %s", node, - pw_node_state_as_string(old), pw_node_state_as_string(state)); - } - - if (node->info.error) - free((char*)node->info.error); - node->info.error = error; - node->info.state = state; - - if (state == PW_NODE_STATE_IDLE) { - if (impl->pause_on_idle) - do_pause_node(node); - node_deactivate(node); - } - - pw_node_events_state_changed(node, old, state, error); - - node->info.change_mask |= PW_NODE_CHANGE_MASK_STATE; - pw_node_events_info_changed(node, &node->info); - - spa_list_for_each(resource, &node->resource_list, link) - pw_node_resource_info(resource, &node->info); - - node->info.change_mask = 0; -} - int pw_node_set_active(struct pw_node *node, bool active) { bool old = node->active; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index aa3f40927..3c7961c8c 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -741,9 +741,6 @@ int pw_port_send_command(struct pw_port *port, bool block, const struct spa_comm /** Change the state of the node */ int pw_node_set_state(struct pw_node *node, enum pw_node_state state); -/** Update the state of the node, mostly used by node implementations */ -void pw_node_update_state(struct pw_node *node, enum pw_node_state state, char *error); - int pw_node_update_ports(struct pw_node *node); int pw_node_initialized(struct pw_node *node); diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 98753d873..391fe0fac 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -503,7 +503,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) { struct pw_proxy *proxy = user_data; struct node_data *data = proxy->user_data; - struct spa_graph_node *node = &data->node->rt.node; + struct spa_graph_node *node = &data->node->rt.root; if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pw_log_warn("got error"); @@ -518,7 +518,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) pw_log_warn("proxy %p: read %"PRIu64" failed %m", proxy, cmd); pw_log_trace("remote %p: process", data->remote); - spa_graph_run(node->graph->parent->graph); + spa_graph_run(node->graph); } } @@ -1405,6 +1405,7 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, data->callbacks = *node->rt.root.callbacks; spa_graph_node_set_callbacks(&node->rt.root, &impl_root, data); spa_graph_link_add(&node->rt.root, &data->state, &data->link); + spa_graph_node_add(node->rt.driver, &node->rt.root); node->exported = true;