node: keep better track of driver node

When a session disappears, try to move nodes to another session.
This commit is contained in:
Wim Taymans 2018-05-17 17:40:08 +02:00
parent 1bff5afe05
commit ef9fcd1093
7 changed files with 144 additions and 72 deletions

View file

@ -84,7 +84,6 @@ struct spa_graph_callbacks {
#define spa_graph_finish(g) ((g)->callbacks->finish((g)->callbacks_data))
struct spa_graph {
#define SPA_GRAPH_FLAG_DRIVER (1 << 0)
uint32_t flags; /* flags */
struct spa_graph_node *parent; /* parent node or NULL when driver */
struct spa_graph_state *state; /* state of graph */

View file

@ -689,11 +689,9 @@ static int impl_node_process(struct spa_node *node)
struct impl *impl = this->impl;
int status;
impl->client_node->node->driver_node = impl->this.node->driver_node;
impl->ctrl.min_size = impl->ctrl.max_size =
impl->this.node->driver_node->rt.quantum->size;
spa_log_trace(this->log, "%p: process %d", this, impl->ctrl.max_size);
if (impl->use_converter) {
@ -897,8 +895,9 @@ static void client_node_destroy(void *data)
struct impl *impl = data;
pw_log_debug("client-stream %p: destroy", &impl->this);
spa_hook_remove(&impl->client_node_listener);
pw_node_set_driver(impl->client_node->node, NULL);
spa_hook_remove(&impl->client_node_listener);
spa_hook_remove(&impl->node_listener);
pw_node_destroy(impl->this.node);
cleanup(impl);
@ -935,14 +934,22 @@ static void node_destroy(void *data)
pw_log_debug("client-stream %p: destroy", &impl->this);
spa_hook_remove(&impl->node_listener);
spa_hook_remove(&impl->client_node_listener);
pw_client_node_destroy(impl->client_node);
cleanup(impl);
}
static void node_driver_changed(void *data, struct pw_node *driver)
{
struct impl *impl = data;
impl->client_node->node->driver_node = driver;
}
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = node_destroy,
.driver_changed = node_driver_changed,
};
/** Create a new client stream

View file

@ -126,6 +126,9 @@ struct link_data {
};
static int handle_autoconnect(struct impl *impl, struct pw_node *node,
const struct pw_properties *props);
/** \endcond */
static void link_data_remove(struct link_data *data)
@ -151,10 +154,17 @@ static void session_destroy(struct session *sess)
spa_list_remove(&sess->l);
spa_hook_remove(&sess->node_listener);
spa_list_for_each_safe(ni, t, &sess->node_list, l)
node_info_free(ni);
if (sess->dsp)
if (sess->dsp) {
spa_hook_remove(&sess->dsp_listener);
pw_node_destroy(sess->dsp);
}
spa_list_for_each_safe(ni, t, &sess->node_list, l) {
pw_node_set_state(ni->node, PW_NODE_STATE_SUSPENDED);
pw_node_set_driver(ni->node, NULL);
handle_autoconnect(ni->impl, ni->node,
pw_node_get_properties(ni->node));
node_info_free(ni);
}
free(sess);
}
@ -306,7 +316,7 @@ static void reconfigure_session(struct session *sess)
sess->node->rt.quantum->rate.num = 1;
sess->node->rt.quantum->rate.denom = sess->sample_rate;
sess->node->rt.quantum->size = buffer_size;
sess->node->rt.quantum->size = sess->buffer_size;
pw_log_info("module %p: driver node:%p quantum:%d/%d",
impl, sess->node, sess->sample_rate, buffer_size);
@ -358,6 +368,8 @@ static int link_session_dsp(struct session *session)
}
pw_link_register(session->link, NULL, pw_module_get_global(impl->module), NULL);
reconfigure_session(session);
return 0;
}
@ -412,7 +424,7 @@ static int find_session(void *data, struct session *sess)
return 0;
}
static void handle_autoconnect(struct impl *impl, struct pw_node *node,
static int handle_autoconnect(struct impl *impl, struct pw_node *node,
const struct pw_properties *props)
{
struct pw_node *peer;
@ -423,6 +435,11 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node,
struct session *session;
struct node_info *info;
uint32_t sample_rate, buffer_size;
int res;
str = pw_properties_get(props, PW_NODE_PROP_AUTOCONNECT);
if (str == NULL || !pw_properties_parse_bool(str))
return 0;
sample_rate = DEFAULT_SAMPLE_RATE;
@ -455,16 +472,16 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node,
else if (strcmp(category, "Capture") == 0)
find.media_class = "Audio/Source";
else
return;
return -EINVAL;
}
else if (strcmp(media, "Video") == 0) {
if (strcmp(category, "Capture") == 0)
find.media_class = "Video/Source";
else
return;
return -EINVAL;
}
else
return;
return -EINVAL;
str = pw_properties_get(props, PW_NODE_PROP_TARGET_NODE);
if (str != NULL)
@ -479,7 +496,7 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node,
spa_list_for_each(session, &impl->session_list, l)
find_session(&find, session);
if (find.sess == NULL)
return;
return -ENOENT;
session = find.sess;
@ -488,24 +505,24 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node,
else if (strcmp(category, "Playback") == 0)
direction = PW_DIRECTION_INPUT;
else
return;
return -EINVAL;
if (exclusive || session->dsp == NULL) {
if (exclusive && !spa_list_is_empty(&session->node_list)) {
pw_log_warn("session busy, can't get exclusive access");
return;
return -EBUSY;
}
if (session->link != NULL) {
pw_log_warn("session busy with DSP");
return;
return -EBUSY;
}
peer = session->node;
session->exclusive = exclusive;
}
else {
if (session->link == NULL) {
if (link_session_dsp(session) < 0)
return;
if ((res = link_session_dsp(session)) < 0)
return res;
}
peer = session->dsp;
}
@ -520,13 +537,15 @@ static void handle_autoconnect(struct impl *impl, struct pw_node *node,
info->buffer_size = buffer_size;
spa_list_init(&info->links);
spa_list_append(&info->session->node_list, &info->l);
spa_list_append(&session->node_list, &info->l);
pw_node_add_listener(node, &info->node_listener, &node_info_events, info);
pw_node_for_each_port(peer, direction, on_peer_port, info);
reconfigure_session(session);
return 1;
}
static void node_destroy(void *data)
@ -647,9 +666,7 @@ static int on_global(void *data, struct pw_global *global)
properties = pw_node_get_properties(node);
str = pw_properties_get(properties, PW_NODE_PROP_AUTOCONNECT);
if (str != NULL && pw_properties_parse_bool(str)) {
handle_autoconnect(impl, node, properties);
if (handle_autoconnect(impl, node, properties) == 1) {
return 0;
}
else if ((str = pw_properties_get(properties, "media.class")) == NULL)

View file

@ -791,9 +791,11 @@ do_activate_link(struct spa_loop *loop,
spa_graph_port_add(&this->input->rt.mix_node, &this->rt.mix[SPA_DIRECTION_INPUT].port);
spa_graph_port_link(&this->rt.mix[SPA_DIRECTION_OUTPUT].port,
&this->rt.mix[SPA_DIRECTION_INPUT].port);
this->rt.link.signal_data = &this->input->node->rt.root;
spa_graph_link_add(&this->output->node->rt.root,
this->input->node->rt.root.state,
&this->rt.link);
return 0;
}
@ -1032,6 +1034,7 @@ do_deactivate_link(struct spa_loop *loop,
spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_OUTPUT].port);
spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_INPUT].port);
spa_graph_link_remove(&this->rt.link);
this->rt.link.signal_data = NULL;
return 0;
}
@ -1169,46 +1172,22 @@ move_graph(struct spa_graph *dst, struct spa_graph *src)
}
}
static int
do_join_graphs(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
static int find_driver(struct pw_link *this)
{
struct pw_link *this = user_data;
struct spa_graph *in_graph, *out_graph;
struct spa_graph_node *in_root, *out_root;
struct pw_node *out_driver, *in_driver;
in_root = &this->input->node->rt.root;
out_root = &this->output->node->rt.root;
out_driver = this->output->node->driver_node;
in_driver = this->input->node->driver_node;
in_graph = in_root->graph;
out_graph = out_root->graph;
pw_log_debug("link %p: drivers %p/%p", this, out_driver, in_driver);
pw_log_debug("link %p: roots %p/%p graphs %p/%p %d/%d", this,
in_root, out_root, in_graph, out_graph,
in_graph->flags, out_graph->flags);
if (out_driver == in_driver)
return 0;
if (in_graph != out_graph) {
struct spa_graph *src, *dst;
bool out_driver;
out_driver = SPA_FLAG_CHECK(out_graph->flags, SPA_GRAPH_FLAG_DRIVER);
if (out_driver) {
src = in_graph;
dst = out_graph;
this->input->node->driver_node = this->output->node->driver_node;
pw_log_debug("link %p: in_graph to out_graph %p", this, this->output->node);
}
else {
src = out_graph;
dst = in_graph;
this->output->node->driver_node = this->input->node->driver_node;
pw_log_debug("link %p: out_graph to in_graph %p", this, this->input->node);
}
move_graph(dst, src);
}
this->rt.link.signal = spa_graph_link_signal_node;
this->rt.link.signal_data = in_root;
if (out_driver->driver)
pw_node_set_driver(in_driver, out_driver);
else
pw_node_set_driver(out_driver, in_driver);
return 0;
}
@ -1291,12 +1270,13 @@ struct pw_link *pw_link_new(struct pw_core *core,
pw_port_init_mix(output, &this->rt.mix[SPA_DIRECTION_OUTPUT]);
pw_port_init_mix(input, &this->rt.mix[SPA_DIRECTION_INPUT]);
this->rt.link.signal = spa_graph_link_signal_node;
pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl,
output_node, output->port_id, this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
input_node, input->port_id, this->rt.mix[SPA_DIRECTION_INPUT].port.port_id);
pw_loop_invoke(output->node->data_loop,
do_join_graphs, SPA_ID_INVALID, NULL, 0, false, this);
find_driver(this);
spa_hook_list_call(&output->listener_list, struct pw_port_events, link_added, this);
spa_hook_list_call(&input->listener_list, struct pw_port_events, link_added, this);

View file

@ -403,6 +403,58 @@ int pw_node_initialized(struct pw_node *this)
return 0;
}
static int
do_move_nodes(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct impl *src = user_data;
struct pw_node *this = &src->this;
struct impl *dst = *(struct impl **)data;
struct spa_graph_node *n, *t;
spa_graph_node_remove(&this->rt.root);
spa_graph_node_add(&src->driver_graph, &this->rt.root);
spa_list_for_each_safe(n, t, &src->driver_graph.nodes, link) {
spa_graph_node_remove(n);
spa_graph_node_add(&dst->driver_graph, n);
}
return 0;
}
int pw_node_set_driver(struct pw_node *node, struct pw_node *driver)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
struct pw_node *n, *t;
pw_log_debug("node %p: driver:%p current:%p", node, driver, node->driver_node);
if (driver == NULL)
driver = node;
if (node->driver_node == driver)
return 0;
spa_list_remove(&node->driver_link);
spa_list_append(&driver->driver_list, &node->driver_link);
node->driver_node = driver;
spa_list_for_each_safe(n, t, &node->driver_list, driver_link) {
spa_list_remove(&n->driver_link);
spa_list_append(&driver->driver_list, &n->driver_link);
n->driver_node = driver;
spa_hook_list_call(&n->listener_list, struct pw_node_events,
driver_changed, driver);
pw_log_debug("node %p: add %p", driver, n);
}
pw_loop_invoke(node->data_loop,
do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_node *),
true, impl);
spa_hook_list_call(&node->listener_list, struct pw_node_events, driver_changed, driver);
return 0;
}
static void check_properties(struct pw_node *node)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
@ -418,13 +470,6 @@ static void check_properties(struct pw_node *node)
else
node->driver = false;
if (node->driver)
SPA_FLAG_SET(impl->driver_graph.flags, SPA_GRAPH_FLAG_DRIVER);
else
SPA_FLAG_UNSET(impl->driver_graph.flags, SPA_GRAPH_FLAG_DRIVER);
node->driver_node = node;
pw_log_debug("node %p: graph %p driver:%d", node, &impl->driver_graph, node->driver);
}
@ -478,6 +523,7 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->data_loop = core->data_loop;
spa_list_init(&this->driver_list);
spa_list_init(&this->resource_list);
spa_hook_list_init(&this->listener_list);
@ -490,6 +536,7 @@ struct pw_node *pw_node_new(struct pw_core *core,
spa_list_init(&this->output_ports);
pw_map_init(&this->output_port_map, 64, 64);
spa_graph_init(&impl->driver_graph, &impl->driver_state);
spa_graph_data_init(&impl->driver_data, &impl->driver_graph);
spa_graph_set_callbacks(&impl->driver_graph,
@ -498,7 +545,6 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->rt.driver = &impl->driver_graph;
this->rt.activation = &impl->root_activation;
spa_graph_node_init(&this->rt.root, &this->rt.activation->state);
spa_graph_node_add(&impl->driver_graph, &this->rt.root);
spa_graph_init(&impl->graph, &impl->graph_state);
spa_graph_data_init(&impl->graph_data, &impl->graph);
@ -517,6 +563,10 @@ struct pw_node *pw_node_new(struct pw_core *core,
check_properties(this);
this->driver_node = this;
spa_list_append(&this->driver_list, &this->driver_link);
spa_graph_node_add(&impl->driver_graph, &this->rt.root);
return this;
no_mem:
@ -682,6 +732,7 @@ void pw_node_destroy(struct pw_node *node)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
struct pw_resource *resource, *tmp;
struct pw_node *n, *t;
struct pw_port *port, *tmpp;
pw_log_debug("node %p: destroy", impl);
@ -689,12 +740,21 @@ void pw_node_destroy(struct pw_node *node)
pause_node(node);
pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node);
pw_log_debug("node %p: driver node %p", impl, node->driver_node);
if (node->registered) {
spa_list_remove(&node->link);
/* move all nodes driven by us to their own driver */
spa_list_for_each_safe(n, t, &node->driver_list, driver_link)
pw_node_set_driver(n, NULL);
if (node->driver_node != node) {
/* remove ourself from the (other) driver node */
spa_list_remove(&node->driver_link);
pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node);
}
if (node->registered)
spa_list_remove(&node->link);
pw_log_debug("node %p: unlink ports", node);
spa_list_for_each(port, &node->input_ports, link)
pw_port_unlink(port);
@ -703,11 +763,13 @@ void pw_node_destroy(struct pw_node *node)
pw_log_debug("node %p: destroy ports", node);
spa_list_for_each_safe(port, tmpp, &node->input_ports, link) {
spa_hook_list_call(&node->listener_list, struct pw_node_events, port_removed, port);
spa_hook_list_call(&node->listener_list, struct pw_node_events,
port_removed, port);
pw_port_destroy(port);
}
spa_list_for_each_safe(port, tmpp, &node->output_ports, link) {
spa_hook_list_call(&node->listener_list, struct pw_node_events, port_removed, port);
spa_hook_list_call(&node->listener_list, struct pw_node_events,
port_removed, port);
pw_port_destroy(port);
}

View file

@ -83,6 +83,9 @@ struct pw_node_events {
/** an event is emited */
void (*event) (void *data, const struct spa_event *event);
/** the driver of the node changed */
void (*driver_changed) (void *data, struct pw_node *driver);
/** the node wants to process the graph */
void (*process) (void *data);
/** the node has a buffer to reuse */

View file

@ -265,6 +265,8 @@ struct pw_node {
* is selected to drive the graph */
struct pw_node *driver_node;
struct spa_list driver_list;
struct spa_list driver_link;
struct spa_clock *clock; /**< handle to SPA clock if any */
struct spa_node *node; /**< SPA node implementation */
@ -615,6 +617,8 @@ int pw_node_update_ports(struct pw_node *node);
int pw_node_initialized(struct pw_node *node);
int pw_node_set_driver(struct pw_node *node, struct pw_node *driver);
/** Activate a link \memberof pw_link
* Starts the negotiation of formats and buffers on \a link and then
* starts data streaming */