graph: improve API a little

Node allow linking to multiple inputs when there is a mixer.
This commit is contained in:
Wim Taymans 2017-06-29 09:31:50 +02:00
parent 9ee0cd7d8d
commit 83e65c31ab
6 changed files with 79 additions and 49 deletions

View file

@ -709,41 +709,50 @@ struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction di
pw_log_debug("node %p: direction %d max %u, n %u", node, direction, max_ports, *n_ports);
/* first try to find an unlinked port */
spa_list_for_each(p, ports, link) {
if (spa_list_is_empty(&p->links)) {
port = p;
break;
}
if (spa_list_is_empty(&p->links))
return p;
}
if (port == NULL) {
/* no port, can we create one ? */
if (*n_ports < max_ports) {
for (i = 0; i < max_ports && port == NULL; i++) {
if (portmap[i] == NULL) {
pw_log_debug("node %p: creating port direction %d %u", node,
direction, i);
port = portmap[i] = pw_port_new(node, direction, i);
spa_list_insert(ports, &port->link);
(*n_ports)++;
if ((res = spa_node_add_port(node->node, direction, i)) < 0) {
pw_log_error("node %p: could not add port %d", node,
i);
} else {
spa_node_port_set_io(node->node, direction, i,
&port->io);
}
/* no port, can we create one ? */
if (*n_ports < max_ports) {
for (i = 0; i < max_ports; i++) {
if (portmap[i] == NULL) {
pw_log_debug("node %p: creating port direction %d %u", node, direction, i);
port = pw_port_new(node, direction, i);
if (port == NULL)
goto no_mem;
spa_list_insert(ports, &port->link);
if ((res = spa_node_add_port(node->node, direction, i)) < 0) {
pw_log_error("node %p: could not add port %d", node, i);
pw_port_destroy(port);
continue;
} else {
spa_node_port_set_io(node->node, direction, i, &port->io);
}
(*n_ports)++;
portmap[i] = port;
break;
}
} else {
/* for output we can reuse an existing port */
if (direction == PW_DIRECTION_OUTPUT && !spa_list_is_empty(ports)) {
port = spa_list_first(ports, struct pw_port, link);
}
}
} else {
if (!spa_list_is_empty(ports)) {
port = spa_list_first(ports, struct pw_port, link);
/* for output we can reuse an existing port, for input only
* when there is a mixer */
if (direction == PW_DIRECTION_INPUT && port->mixer == NULL)
port = NULL;
}
}
return port;
no_mem:
pw_log_error("node %p: can't create new port", node);
return NULL;
}
static void on_state_complete(struct pw_node *node, void *data, int res)

View file

@ -72,6 +72,8 @@ struct pw_port {
struct spa_list links; /**< list of \ref pw_link */
void *mixer; /**< optional port buffer mixer */
struct {
struct spa_list links; /**< list of \ref pw_link only accessed from the
* data thread */

View file

@ -34,6 +34,7 @@ struct spa_graph_port;
struct spa_graph {
struct spa_list nodes;
struct spa_list ready;
struct spa_graph_node *node;
};
typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node);
@ -74,14 +75,16 @@ static inline void spa_graph_init(struct spa_graph *graph)
static inline int spa_graph_node_schedule_default(struct spa_graph_node *node)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
return spa_node_process_input(n);
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
return spa_node_process_output(n);
res = spa_node_process_output(n);
else
return SPA_RESULT_ERROR;
res = SPA_RESULT_ERROR;
return res;
}
static inline void
@ -163,14 +166,13 @@ spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *out,
in->peer = NULL;
}
static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_graph_node *node)
static inline bool spa_graph_node_iterate(struct spa_graph *graph)
{
bool res;
struct spa_graph_port *p;
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
while (!spa_list_is_empty(&graph->ready)) {
res = !spa_list_is_empty(&graph->ready);
if (res) {
struct spa_graph_node *n =
spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
@ -181,8 +183,8 @@ static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_g
case SPA_GRAPH_ACTION_IN:
case SPA_GRAPH_ACTION_OUT:
n->state = n->schedule(n);
if (n->action == SPA_GRAPH_ACTION_IN && n == node)
continue;
if (n->action == SPA_GRAPH_ACTION_IN && n == graph->node)
break;
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(graph->ready.prev, &n->ready_link);
break;
@ -193,7 +195,7 @@ static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_g
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != node
if (pn != graph->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->action = SPA_GRAPH_ACTION_OUT;
spa_list_insert(graph->ready.prev,
@ -204,14 +206,33 @@ static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_g
}
} else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
spa_graph_port_check(graph, p->peer);
}
break;
default:
break;
}
res = !spa_list_is_empty(&graph->ready);
}
return res;
}
static inline void spa_graph_node_pull(struct spa_graph *graph, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_CHECK;
node->state = SPA_RESULT_NEED_BUFFER;
graph->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}
static inline void spa_graph_node_push(struct spa_graph *graph, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_OUT;
graph->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}
#ifdef __cplusplus

View file

@ -229,10 +229,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data)
{
struct data *data = user_data;
data->sink_node.action = SPA_GRAPH_ACTION_CHECK;
data->sink_node.state = SPA_RESULT_NEED_BUFFER;
spa_graph_node_pull(&data->graph, &data->sink_node);
spa_graph_node_schedule(&data->graph, &data->sink_node);
while (spa_graph_node_iterate(&data->graph));
}
static void

View file

@ -37,7 +37,7 @@
#include <spa/format-utils.h>
#include <spa/format-builder.h>
#undef USE_GRAPH
#define USE_GRAPH
static SPA_TYPE_MAP_IMPL(default_map, 4096);
static SPA_LOG_IMPL(default_log);
@ -240,10 +240,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data)
{
struct data *data = user_data;
#ifdef USE_GRAPH
data->sink_node.action = PROCESS_CHECK;
data->sink_node.state = SPA_RESULT_NEED_BUFFER;
spa_graph_node_pull(&data->graph, &data->sink_node);
while (spa_graph_node_iterate(&data->graph));
spa_graph_node_schedule(&data->graph, &data->sink_node);
#else
int res;

View file

@ -223,8 +223,8 @@ static void on_sink_pull(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
data->sink_node.action = SPA_GRAPH_ACTION_CHECK;
spa_graph_node_schedule(&data->graph, &data->sink_node);
spa_graph_node_pull(&data->graph, &data->sink_node);
while (spa_graph_node_iterate(&data->graph));
}
}
@ -235,8 +235,8 @@ static void on_source_push(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
data->source_node.action = SPA_GRAPH_ACTION_OUT;
spa_graph_node_schedule(&data->graph, &data->source_node);
spa_graph_node_push(&data->graph, &data->source_node);
while (spa_graph_node_iterate(&data->graph));
}
}