node: remove and add node from graph in idle/running

Add the node and all links to the graph when going to running and
remove them again when idle.
This commit is contained in:
Wim Taymans 2018-10-02 05:10:55 +02:00
parent cd1c7b74af
commit 0f69a7b767
6 changed files with 121 additions and 105 deletions

View file

@ -241,7 +241,7 @@ spa_graph_node_add(struct spa_graph *graph,
static inline void spa_graph_node_remove(struct spa_graph_node *node)
{
spa_debug("node %p remove", node);
spa_debug("node %p remove from graph %p", node, node->graph);
spa_graph_link_remove(&node->graph_link);
spa_list_remove(&node->link);
}

View file

@ -625,7 +625,9 @@ static int impl_node_process(struct spa_node *node)
input = this->io;
spa_return_val_if_fail(input != NULL, -EIO);
spa_log_trace(this->log, NAME " %p: process %d %d", this, input->status, input->buffer_id);
spa_log_trace(this->log, NAME " %p: process %d %d/%d", this, input->status,
input->buffer_id,
this->n_buffers);
if (input->status == SPA_STATUS_HAVE_BUFFER &&
input->buffer_id < this->n_buffers) {

View file

@ -788,7 +788,7 @@ static int impl_node_process(struct spa_node *node)
trigger = status & SPA_STATUS_HAVE_BUFFER;
if (trigger && !impl->this.node->driver)
spa_graph_run(impl->client_node->node->rt.root.graph);
spa_graph_run(impl->client_node->node->rt.driver);
return status;
}
@ -880,7 +880,7 @@ static void client_node_initialized(void *data)
else
exclusive = false;
impl->client_node->node->rt.driver = impl->this.node->rt.driver;
spa_graph_node_add(impl->client_node->node->rt.driver, &impl->client_node->node->rt.root);
impl->client_port = pw_node_find_port(impl->client_node->node, impl->direction, 0);
if (impl->client_port == NULL)

View file

@ -71,12 +71,46 @@ struct resource_data {
/** \endcond */
static void node_deactivate(struct pw_node *this)
{
struct pw_port *port;
static int do_pause_node(struct pw_node *this)
pw_log_debug("node %p: deactivate", this);
spa_list_for_each(port, &this->input_ports, link) {
struct pw_link *link;
spa_list_for_each(link, &port->links, input_link)
pw_link_deactivate(link);
}
spa_list_for_each(port, &this->output_ports, link) {
struct pw_link *link;
spa_list_for_each(link, &port->links, output_link)
pw_link_deactivate(link);
}
}
static int
do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_node *this = user_data;
if (this->rt.root.graph != NULL) {
spa_graph_node_remove(&this->rt.root);
this->rt.root.graph = NULL;
}
return 0;
}
static int pause_node(struct pw_node *this)
{
int res = 0;
if (this->info.state <= PW_NODE_STATE_IDLE)
return 0;
pw_log_debug("node %p: pause node", this);
node_deactivate(this);
pw_loop_invoke(this->data_loop, do_node_remove, 1, NULL, 0, true, this);
res = spa_node_send_command(this->node,
&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Pause));
if (res < 0)
@ -85,18 +119,23 @@ static int do_pause_node(struct pw_node *this)
return res;
}
static int pause_node(struct pw_node *this)
static int
do_node_add(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
if (this->info.state <= PW_NODE_STATE_IDLE)
return 0;
return do_pause_node(this);
struct pw_node *this = user_data;
if (this->rt.root.graph == NULL)
spa_graph_node_add(this->driver_node->rt.driver, &this->rt.root);
return 0;
}
static int start_node(struct pw_node *this)
{
int res = 0;
if (this->info.state >= PW_NODE_STATE_RUNNING)
return 0;
pw_log_debug("node %p: start node", this);
res = spa_node_send_command(this->node,
&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Start));
@ -129,6 +168,48 @@ static int suspend_node(struct pw_node *this)
return res;
}
static void node_update_state(struct pw_node *node, enum pw_node_state state, char *error)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
enum pw_node_state old;
struct pw_resource *resource;
old = node->info.state;
if (old == state)
return;
if (state == PW_NODE_STATE_ERROR) {
pw_log_error("node %p: update state from %s -> error (%s)", node,
pw_node_state_as_string(old), error);
} else {
pw_log_debug("node %p: update state from %s -> %s", node,
pw_node_state_as_string(old), pw_node_state_as_string(state));
}
if (node->info.error)
free((char*)node->info.error);
node->info.error = error;
node->info.state = state;
switch (state) {
case PW_NODE_STATE_RUNNING:
pw_loop_invoke(node->data_loop, do_node_add, 1, NULL, 0, true, node);
break;
default:
break;
}
pw_node_events_state_changed(node, old, state, error);
node->info.change_mask |= PW_NODE_CHANGE_MASK_STATE;
pw_node_events_info_changed(node, &node->info);
spa_list_for_each(resource, &node->resource_list, link)
pw_node_resource_info(resource, &node->info);
node->info.change_mask = 0;
}
static void node_unbind_func(void *data)
{
struct pw_resource *resource = data;
@ -405,7 +486,7 @@ int pw_node_initialized(struct pw_node *this)
{
pw_log_debug("node %p initialized", this);
pw_node_events_initialized(this);
pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL);
node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL);
return 0;
}
@ -418,8 +499,15 @@ do_move_nodes(struct spa_loop *loop,
struct impl *dst = *(struct impl **)data;
struct spa_graph_node *n, *t;
spa_graph_node_remove(&this->rt.root);
spa_graph_node_add(&dst->driver_graph, &this->rt.root);
pw_log_trace("node %p: root %p driver:%p", this, &this->rt.root, &dst->driver_graph);
if (this->rt.root.graph != NULL) {
spa_graph_node_remove(&this->rt.root);
spa_graph_node_add(&dst->driver_graph, &this->rt.root);
}
if (&src->driver_graph == &dst->driver_graph)
return 0;
spa_list_for_each_safe(n, t, &src->driver_graph.nodes, link) {
spa_graph_node_remove(n);
@ -445,25 +533,28 @@ static int recalc_quantum(struct pw_node *driver)
int pw_node_set_driver(struct pw_node *node, struct pw_node *driver)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
struct pw_node *n, *t;
struct pw_node *n, *t, *old;
pw_log_debug("node %p: driver:%p current:%p", node, driver, node->driver_node);
old = node->driver_node;
pw_log_debug("node %p: driver:%p current:%p", node, driver, old);
if (driver == NULL)
driver = node;
if (node->driver_node == driver)
return 0;
spa_list_remove(&node->driver_link);
spa_list_append(&driver->driver_list, &node->driver_link);
node->driver_node = driver;
spa_list_for_each_safe(n, t, &node->driver_list, driver_link) {
pw_log_debug("driver %p: add %p old %p", driver, n, n->driver_node);
if (n->driver_node == driver)
continue;
spa_list_remove(&n->driver_link);
spa_list_append(&driver->driver_list, &n->driver_link);
n->driver_node = driver;
pw_node_events_driver_changed(n, driver);
pw_log_debug("node %p: add %p", driver, n);
}
recalc_quantum(driver);
@ -472,7 +563,10 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver)
do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_node *),
true, impl);
pw_node_events_driver_changed(node, driver);
if (old != driver) {
node->driver_node = driver;
pw_node_events_driver_changed(node, driver);
}
return 0;
}
@ -592,7 +686,6 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->driver_node = this;
spa_list_append(&this->driver_list, &this->driver_link);
spa_graph_node_add(&impl->driver_graph, &this->rt.root);
return this;
@ -755,15 +848,6 @@ void pw_node_add_listener(struct pw_node *node,
spa_hook_list_append(&node->listener_list, listener, events, data);
}
static int
do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_node *this = user_data;
spa_graph_node_remove(&this->rt.root);
return 0;
}
/** Destroy a node
* \param node a node to destroy
*
@ -793,7 +877,6 @@ void pw_node_destroy(struct pw_node *node)
if (node->driver_node != node) {
/* remove ourself from the (other) driver node */
spa_list_remove(&node->driver_link);
pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node);
recalc_quantum(node->driver_node);
}
@ -966,24 +1049,7 @@ static void on_state_complete(struct pw_node *node, void *data, int res)
asprintf(&error, "error changing node state: %d", res);
state = PW_NODE_STATE_ERROR;
}
pw_node_update_state(node, state, error);
}
static void node_deactivate(struct pw_node *this)
{
struct pw_port *port;
pw_log_debug("node %p: deactivate", this);
spa_list_for_each(port, &this->input_ports, link) {
struct pw_link *link;
spa_list_for_each(link, &port->links, input_link)
pw_link_deactivate(link);
}
spa_list_for_each(port, &this->output_ports, link) {
struct pw_link *link;
spa_list_for_each(link, &port->links, output_link)
pw_link_deactivate(link);
}
node_update_state(node, state, error);
}
static void node_activate(struct pw_node *this)
@ -1037,7 +1103,7 @@ int pw_node_set_state(struct pw_node *node, enum pw_node_state state)
break;
case PW_NODE_STATE_IDLE:
if (!node->active)
if (node->active && impl->pause_on_idle)
res = pause_node(node);
break;
@ -1060,56 +1126,6 @@ int pw_node_set_state(struct pw_node *node, enum pw_node_state state)
return res;
}
/** Update the node state
* \param node a \ref pw_node
* \param state a \ref pw_node_state
* \param error error when \a state is \ref PW_NODE_STATE_ERROR
*
* Update the state of a node. This method is used from inside \a node
* itself.
*
* \memberof pw_node
*/
void pw_node_update_state(struct pw_node *node, enum pw_node_state state, char *error)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
enum pw_node_state old;
struct pw_resource *resource;
old = node->info.state;
if (old == state)
return;
if (state == PW_NODE_STATE_ERROR) {
pw_log_error("node %p: update state from %s -> error (%s)", node,
pw_node_state_as_string(old), error);
} else {
pw_log_debug("node %p: update state from %s -> %s", node,
pw_node_state_as_string(old), pw_node_state_as_string(state));
}
if (node->info.error)
free((char*)node->info.error);
node->info.error = error;
node->info.state = state;
if (state == PW_NODE_STATE_IDLE) {
if (impl->pause_on_idle)
do_pause_node(node);
node_deactivate(node);
}
pw_node_events_state_changed(node, old, state, error);
node->info.change_mask |= PW_NODE_CHANGE_MASK_STATE;
pw_node_events_info_changed(node, &node->info);
spa_list_for_each(resource, &node->resource_list, link)
pw_node_resource_info(resource, &node->info);
node->info.change_mask = 0;
}
int pw_node_set_active(struct pw_node *node, bool active)
{
bool old = node->active;

View file

@ -741,9 +741,6 @@ int pw_port_send_command(struct pw_port *port, bool block, const struct spa_comm
/** Change the state of the node */
int pw_node_set_state(struct pw_node *node, enum pw_node_state state);
/** Update the state of the node, mostly used by node implementations */
void pw_node_update_state(struct pw_node *node, enum pw_node_state state, char *error);
int pw_node_update_ports(struct pw_node *node);
int pw_node_initialized(struct pw_node *node);

View file

@ -503,7 +503,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
{
struct pw_proxy *proxy = user_data;
struct node_data *data = proxy->user_data;
struct spa_graph_node *node = &data->node->rt.node;
struct spa_graph_node *node = &data->node->rt.root;
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pw_log_warn("got error");
@ -518,7 +518,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
pw_log_warn("proxy %p: read %"PRIu64" failed %m", proxy, cmd);
pw_log_trace("remote %p: process", data->remote);
spa_graph_run(node->graph->parent->graph);
spa_graph_run(node->graph);
}
}
@ -1405,6 +1405,7 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
data->callbacks = *node->rt.root.callbacks;
spa_graph_node_set_callbacks(&node->rt.root, &impl_root, data);
spa_graph_link_add(&node->rt.root, &data->state, &data->link);
spa_graph_node_add(node->rt.driver, &node->rt.root);
node->exported = true;