From ef9fcd10937bbaa553044bee10b308652a7cecfd Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 17 May 2018 17:40:08 +0200 Subject: [PATCH] node: keep better track of driver node When a session disappears, try to move nodes to another session. --- spa/include/spa/graph/graph.h | 1 - .../module-client-node/client-stream.c | 13 ++- src/modules/module-media-session.c | 53 +++++++---- src/pipewire/link.c | 54 ++++-------- src/pipewire/node.c | 88 ++++++++++++++++--- src/pipewire/node.h | 3 + src/pipewire/private.h | 4 + 7 files changed, 144 insertions(+), 72 deletions(-) diff --git a/spa/include/spa/graph/graph.h b/spa/include/spa/graph/graph.h index 6562de562..70e20bef6 100644 --- a/spa/include/spa/graph/graph.h +++ b/spa/include/spa/graph/graph.h @@ -84,7 +84,6 @@ struct spa_graph_callbacks { #define spa_graph_finish(g) ((g)->callbacks->finish((g)->callbacks_data)) struct spa_graph { -#define SPA_GRAPH_FLAG_DRIVER (1 << 0) uint32_t flags; /* flags */ struct spa_graph_node *parent; /* parent node or NULL when driver */ struct spa_graph_state *state; /* state of graph */ diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index f180a4e7c..7c45ceafd 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -689,11 +689,9 @@ static int impl_node_process(struct spa_node *node) struct impl *impl = this->impl; int status; - impl->client_node->node->driver_node = impl->this.node->driver_node; impl->ctrl.min_size = impl->ctrl.max_size = impl->this.node->driver_node->rt.quantum->size; - spa_log_trace(this->log, "%p: process %d", this, impl->ctrl.max_size); if (impl->use_converter) { @@ -897,8 +895,9 @@ static void client_node_destroy(void *data) struct impl *impl = data; pw_log_debug("client-stream %p: destroy", &impl->this); - spa_hook_remove(&impl->client_node_listener); + pw_node_set_driver(impl->client_node->node, NULL); + spa_hook_remove(&impl->client_node_listener); spa_hook_remove(&impl->node_listener); pw_node_destroy(impl->this.node); cleanup(impl); @@ -935,14 +934,22 @@ static void node_destroy(void *data) pw_log_debug("client-stream %p: destroy", &impl->this); + spa_hook_remove(&impl->node_listener); spa_hook_remove(&impl->client_node_listener); pw_client_node_destroy(impl->client_node); cleanup(impl); } +static void node_driver_changed(void *data, struct pw_node *driver) +{ + struct impl *impl = data; + impl->client_node->node->driver_node = driver; +} + static const struct pw_node_events node_events = { PW_VERSION_NODE_EVENTS, .destroy = node_destroy, + .driver_changed = node_driver_changed, }; /** Create a new client stream diff --git a/src/modules/module-media-session.c b/src/modules/module-media-session.c index af8a73037..acbc11a55 100644 --- a/src/modules/module-media-session.c +++ b/src/modules/module-media-session.c @@ -126,6 +126,9 @@ struct link_data { }; +static int handle_autoconnect(struct impl *impl, struct pw_node *node, + const struct pw_properties *props); + /** \endcond */ static void link_data_remove(struct link_data *data) @@ -151,10 +154,17 @@ static void session_destroy(struct session *sess) spa_list_remove(&sess->l); spa_hook_remove(&sess->node_listener); - spa_list_for_each_safe(ni, t, &sess->node_list, l) - node_info_free(ni); - if (sess->dsp) + if (sess->dsp) { + spa_hook_remove(&sess->dsp_listener); pw_node_destroy(sess->dsp); + } + spa_list_for_each_safe(ni, t, &sess->node_list, l) { + pw_node_set_state(ni->node, PW_NODE_STATE_SUSPENDED); + pw_node_set_driver(ni->node, NULL); + handle_autoconnect(ni->impl, ni->node, + pw_node_get_properties(ni->node)); + node_info_free(ni); + } free(sess); } @@ -306,7 +316,7 @@ static void reconfigure_session(struct session *sess) sess->node->rt.quantum->rate.num = 1; sess->node->rt.quantum->rate.denom = sess->sample_rate; - sess->node->rt.quantum->size = buffer_size; + sess->node->rt.quantum->size = sess->buffer_size; pw_log_info("module %p: driver node:%p quantum:%d/%d", impl, sess->node, sess->sample_rate, buffer_size); @@ -358,6 +368,8 @@ static int link_session_dsp(struct session *session) } pw_link_register(session->link, NULL, pw_module_get_global(impl->module), NULL); + reconfigure_session(session); + return 0; } @@ -412,7 +424,7 @@ static int find_session(void *data, struct session *sess) return 0; } -static void handle_autoconnect(struct impl *impl, struct pw_node *node, +static int handle_autoconnect(struct impl *impl, struct pw_node *node, const struct pw_properties *props) { struct pw_node *peer; @@ -423,6 +435,11 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node, struct session *session; struct node_info *info; uint32_t sample_rate, buffer_size; + int res; + + str = pw_properties_get(props, PW_NODE_PROP_AUTOCONNECT); + if (str == NULL || !pw_properties_parse_bool(str)) + return 0; sample_rate = DEFAULT_SAMPLE_RATE; @@ -455,16 +472,16 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node, else if (strcmp(category, "Capture") == 0) find.media_class = "Audio/Source"; else - return; + return -EINVAL; } else if (strcmp(media, "Video") == 0) { if (strcmp(category, "Capture") == 0) find.media_class = "Video/Source"; else - return; + return -EINVAL; } else - return; + return -EINVAL; str = pw_properties_get(props, PW_NODE_PROP_TARGET_NODE); if (str != NULL) @@ -479,7 +496,7 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node, spa_list_for_each(session, &impl->session_list, l) find_session(&find, session); if (find.sess == NULL) - return; + return -ENOENT; session = find.sess; @@ -488,24 +505,24 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node, else if (strcmp(category, "Playback") == 0) direction = PW_DIRECTION_INPUT; else - return; + return -EINVAL; if (exclusive || session->dsp == NULL) { if (exclusive && !spa_list_is_empty(&session->node_list)) { pw_log_warn("session busy, can't get exclusive access"); - return; + return -EBUSY; } if (session->link != NULL) { pw_log_warn("session busy with DSP"); - return; + return -EBUSY; } peer = session->node; session->exclusive = exclusive; } else { if (session->link == NULL) { - if (link_session_dsp(session) < 0) - return; + if ((res = link_session_dsp(session)) < 0) + return res; } peer = session->dsp; } @@ -520,13 +537,15 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node, info->buffer_size = buffer_size; spa_list_init(&info->links); - spa_list_append(&info->session->node_list, &info->l); + spa_list_append(&session->node_list, &info->l); pw_node_add_listener(node, &info->node_listener, &node_info_events, info); pw_node_for_each_port(peer, direction, on_peer_port, info); reconfigure_session(session); + + return 1; } static void node_destroy(void *data) @@ -647,9 +666,7 @@ static int on_global(void *data, struct pw_global *global) properties = pw_node_get_properties(node); - str = pw_properties_get(properties, PW_NODE_PROP_AUTOCONNECT); - if (str != NULL && pw_properties_parse_bool(str)) { - handle_autoconnect(impl, node, properties); + if (handle_autoconnect(impl, node, properties) == 1) { return 0; } else if ((str = pw_properties_get(properties, "media.class")) == NULL) diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 27cc87b20..4ba00ad6b 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -791,9 +791,11 @@ do_activate_link(struct spa_loop *loop, spa_graph_port_add(&this->input->rt.mix_node, &this->rt.mix[SPA_DIRECTION_INPUT].port); spa_graph_port_link(&this->rt.mix[SPA_DIRECTION_OUTPUT].port, &this->rt.mix[SPA_DIRECTION_INPUT].port); + this->rt.link.signal_data = &this->input->node->rt.root; spa_graph_link_add(&this->output->node->rt.root, this->input->node->rt.root.state, &this->rt.link); + return 0; } @@ -1032,6 +1034,7 @@ do_deactivate_link(struct spa_loop *loop, spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_OUTPUT].port); spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_INPUT].port); spa_graph_link_remove(&this->rt.link); + this->rt.link.signal_data = NULL; return 0; } @@ -1169,46 +1172,22 @@ move_graph(struct spa_graph *dst, struct spa_graph *src) } } -static int -do_join_graphs(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) +static int find_driver(struct pw_link *this) { - struct pw_link *this = user_data; - struct spa_graph *in_graph, *out_graph; - struct spa_graph_node *in_root, *out_root; + struct pw_node *out_driver, *in_driver; - in_root = &this->input->node->rt.root; - out_root = &this->output->node->rt.root; + out_driver = this->output->node->driver_node; + in_driver = this->input->node->driver_node; - in_graph = in_root->graph; - out_graph = out_root->graph; + pw_log_debug("link %p: drivers %p/%p", this, out_driver, in_driver); - pw_log_debug("link %p: roots %p/%p graphs %p/%p %d/%d", this, - in_root, out_root, in_graph, out_graph, - in_graph->flags, out_graph->flags); + if (out_driver == in_driver) + return 0; - if (in_graph != out_graph) { - struct spa_graph *src, *dst; - bool out_driver; - - out_driver = SPA_FLAG_CHECK(out_graph->flags, SPA_GRAPH_FLAG_DRIVER); - - if (out_driver) { - src = in_graph; - dst = out_graph; - this->input->node->driver_node = this->output->node->driver_node; - pw_log_debug("link %p: in_graph to out_graph %p", this, this->output->node); - } - else { - src = out_graph; - dst = in_graph; - this->output->node->driver_node = this->input->node->driver_node; - pw_log_debug("link %p: out_graph to in_graph %p", this, this->input->node); - } - move_graph(dst, src); - } - this->rt.link.signal = spa_graph_link_signal_node; - this->rt.link.signal_data = in_root; + if (out_driver->driver) + pw_node_set_driver(in_driver, out_driver); + else + pw_node_set_driver(out_driver, in_driver); return 0; } @@ -1291,12 +1270,13 @@ struct pw_link *pw_link_new(struct pw_core *core, pw_port_init_mix(output, &this->rt.mix[SPA_DIRECTION_OUTPUT]); pw_port_init_mix(input, &this->rt.mix[SPA_DIRECTION_INPUT]); + this->rt.link.signal = spa_graph_link_signal_node; + pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl, output_node, output->port_id, this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id, input_node, input->port_id, this->rt.mix[SPA_DIRECTION_INPUT].port.port_id); - pw_loop_invoke(output->node->data_loop, - do_join_graphs, SPA_ID_INVALID, NULL, 0, false, this); + find_driver(this); spa_hook_list_call(&output->listener_list, struct pw_port_events, link_added, this); spa_hook_list_call(&input->listener_list, struct pw_port_events, link_added, this); diff --git a/src/pipewire/node.c b/src/pipewire/node.c index c500b3230..180a34290 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -403,6 +403,58 @@ int pw_node_initialized(struct pw_node *this) return 0; } +static int +do_move_nodes(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct impl *src = user_data; + struct pw_node *this = &src->this; + struct impl *dst = *(struct impl **)data; + struct spa_graph_node *n, *t; + + spa_graph_node_remove(&this->rt.root); + spa_graph_node_add(&src->driver_graph, &this->rt.root); + + spa_list_for_each_safe(n, t, &src->driver_graph.nodes, link) { + spa_graph_node_remove(n); + spa_graph_node_add(&dst->driver_graph, n); + } + return 0; +} + +int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) +{ + struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); + struct pw_node *n, *t; + + pw_log_debug("node %p: driver:%p current:%p", node, driver, node->driver_node); + + if (driver == NULL) + driver = node; + if (node->driver_node == driver) + return 0; + + spa_list_remove(&node->driver_link); + spa_list_append(&driver->driver_list, &node->driver_link); + node->driver_node = driver; + + spa_list_for_each_safe(n, t, &node->driver_list, driver_link) { + spa_list_remove(&n->driver_link); + spa_list_append(&driver->driver_list, &n->driver_link); + n->driver_node = driver; + spa_hook_list_call(&n->listener_list, struct pw_node_events, + driver_changed, driver); + pw_log_debug("node %p: add %p", driver, n); + } + pw_loop_invoke(node->data_loop, + do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_node *), + true, impl); + + spa_hook_list_call(&node->listener_list, struct pw_node_events, driver_changed, driver); + + return 0; +} + static void check_properties(struct pw_node *node) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); @@ -418,13 +470,6 @@ static void check_properties(struct pw_node *node) else node->driver = false; - if (node->driver) - SPA_FLAG_SET(impl->driver_graph.flags, SPA_GRAPH_FLAG_DRIVER); - else - SPA_FLAG_UNSET(impl->driver_graph.flags, SPA_GRAPH_FLAG_DRIVER); - - node->driver_node = node; - pw_log_debug("node %p: graph %p driver:%d", node, &impl->driver_graph, node->driver); } @@ -478,6 +523,7 @@ struct pw_node *pw_node_new(struct pw_core *core, this->data_loop = core->data_loop; + spa_list_init(&this->driver_list); spa_list_init(&this->resource_list); spa_hook_list_init(&this->listener_list); @@ -490,6 +536,7 @@ struct pw_node *pw_node_new(struct pw_core *core, spa_list_init(&this->output_ports); pw_map_init(&this->output_port_map, 64, 64); + spa_graph_init(&impl->driver_graph, &impl->driver_state); spa_graph_data_init(&impl->driver_data, &impl->driver_graph); spa_graph_set_callbacks(&impl->driver_graph, @@ -498,7 +545,6 @@ struct pw_node *pw_node_new(struct pw_core *core, this->rt.driver = &impl->driver_graph; this->rt.activation = &impl->root_activation; spa_graph_node_init(&this->rt.root, &this->rt.activation->state); - spa_graph_node_add(&impl->driver_graph, &this->rt.root); spa_graph_init(&impl->graph, &impl->graph_state); spa_graph_data_init(&impl->graph_data, &impl->graph); @@ -517,6 +563,10 @@ struct pw_node *pw_node_new(struct pw_core *core, check_properties(this); + this->driver_node = this; + spa_list_append(&this->driver_list, &this->driver_link); + spa_graph_node_add(&impl->driver_graph, &this->rt.root); + return this; no_mem: @@ -682,6 +732,7 @@ void pw_node_destroy(struct pw_node *node) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); struct pw_resource *resource, *tmp; + struct pw_node *n, *t; struct pw_port *port, *tmpp; pw_log_debug("node %p: destroy", impl); @@ -689,12 +740,21 @@ void pw_node_destroy(struct pw_node *node) pause_node(node); - pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node); + pw_log_debug("node %p: driver node %p", impl, node->driver_node); - if (node->registered) { - spa_list_remove(&node->link); + /* move all nodes driven by us to their own driver */ + spa_list_for_each_safe(n, t, &node->driver_list, driver_link) + pw_node_set_driver(n, NULL); + + if (node->driver_node != node) { + /* remove ourself from the (other) driver node */ + spa_list_remove(&node->driver_link); + pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node); } + if (node->registered) + spa_list_remove(&node->link); + pw_log_debug("node %p: unlink ports", node); spa_list_for_each(port, &node->input_ports, link) pw_port_unlink(port); @@ -703,11 +763,13 @@ void pw_node_destroy(struct pw_node *node) pw_log_debug("node %p: destroy ports", node); spa_list_for_each_safe(port, tmpp, &node->input_ports, link) { - spa_hook_list_call(&node->listener_list, struct pw_node_events, port_removed, port); + spa_hook_list_call(&node->listener_list, struct pw_node_events, + port_removed, port); pw_port_destroy(port); } spa_list_for_each_safe(port, tmpp, &node->output_ports, link) { - spa_hook_list_call(&node->listener_list, struct pw_node_events, port_removed, port); + spa_hook_list_call(&node->listener_list, struct pw_node_events, + port_removed, port); pw_port_destroy(port); } diff --git a/src/pipewire/node.h b/src/pipewire/node.h index 637ce6bc7..36687c861 100644 --- a/src/pipewire/node.h +++ b/src/pipewire/node.h @@ -83,6 +83,9 @@ struct pw_node_events { /** an event is emited */ void (*event) (void *data, const struct spa_event *event); + /** the driver of the node changed */ + void (*driver_changed) (void *data, struct pw_node *driver); + /** the node wants to process the graph */ void (*process) (void *data); /** the node has a buffer to reuse */ diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 6d813e949..619b16814 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -265,6 +265,8 @@ struct pw_node { * is selected to drive the graph */ struct pw_node *driver_node; + struct spa_list driver_list; + struct spa_list driver_link; struct spa_clock *clock; /**< handle to SPA clock if any */ struct spa_node *node; /**< SPA node implementation */ @@ -615,6 +617,8 @@ int pw_node_update_ports(struct pw_node *node); int pw_node_initialized(struct pw_node *node); +int pw_node_set_driver(struct pw_node *node, struct pw_node *driver); + /** Activate a link \memberof pw_link * Starts the negotiation of formats and buffers on \a link and then * starts data streaming */