graph: fix the API a little

Add test for atomic graph updates
This commit is contained in:
Wim Taymans 2017-08-18 18:54:45 +02:00
parent c3d63983e1
commit eba2b82c8e
5 changed files with 368 additions and 97 deletions

View file

@ -26,10 +26,14 @@ extern "C" {
#include <spa/graph.h>
#define SPA_GRAPH_STATE_IN 0
#define SPA_GRAPH_STATE_OUT 1
#define SPA_GRAPH_STATE_CHECK_IN 2
#define SPA_GRAPH_STATE_CHECK_OUT 3
struct spa_graph_scheduler {
struct spa_graph *graph;
struct spa_list ready;
struct spa_list pending;
struct spa_graph_node *node;
};
@ -38,7 +42,6 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
{
sched->graph = graph;
spa_list_init(&sched->ready);
spa_list_init(&sched->pending);
sched->node = NULL;
}
@ -70,7 +73,7 @@ static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, s
debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in);
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
node->state = SPA_GRAPH_STATE_IN;
if (node->ready_link.next == NULL)
spa_list_insert(sched->ready.prev, &node->ready_link);
} else if (node->ready_link.next) {
@ -82,6 +85,7 @@ static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, s
static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
{
bool res;
int state;
struct spa_graph_port *p;
struct spa_graph_node *n;
@ -92,44 +96,48 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
debug("node %p action %d state %d\n", n, n->action, n->state);
debug("node %p state %d\n", n, n->state);
switch (n->action) {
case SPA_GRAPH_ACTION_IN:
n->state = n->callbacks->process_input(n->callbacks_data);
switch (n->state) {
case SPA_GRAPH_STATE_IN:
state = n->callbacks->process_input(n->callbacks_data);
if (state == SPA_RESULT_NEED_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_IN;
else if (state == SPA_RESULT_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
debug("node %p processed input state %d\n", n, n->state);
if (n == sched->node)
break;
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(sched->ready.prev, &n->ready_link);
break;
case SPA_GRAPH_ACTION_OUT:
n->state = n->callbacks->process_output(n->callbacks_data);
case SPA_GRAPH_STATE_OUT:
state = n->callbacks->process_output(n->callbacks_data);
if (state == SPA_RESULT_NEED_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_IN;
else if (state == SPA_RESULT_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
debug("node %p processed output state %d\n", n, n->state);
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(sched->ready.prev, &n->ready_link);
break;
case SPA_GRAPH_ACTION_CHECK:
if (n->state == SPA_RESULT_NEED_BUFFER) {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != sched->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->action = SPA_GRAPH_ACTION_OUT;
spa_list_insert(sched->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
} else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_scheduler_port_check(sched, p->peer);
case SPA_GRAPH_STATE_CHECK_IN:
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != sched->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->state = SPA_GRAPH_STATE_OUT;
spa_list_insert(sched->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
case SPA_GRAPH_STATE_CHECK_OUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_scheduler_port_check(sched, p->peer);
break;
default:
@ -143,8 +151,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
{
debug("node %p start pull\n", node);
node->action = SPA_GRAPH_ACTION_CHECK;
node->state = SPA_RESULT_NEED_BUFFER;
node->state = SPA_GRAPH_STATE_CHECK_IN;
sched->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->ready.prev, &node->ready_link);
@ -153,7 +160,7 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
{
debug("node %p start push\n", node);
node->action = SPA_GRAPH_ACTION_OUT;
node->state = SPA_GRAPH_STATE_OUT;
sched->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->ready.prev, &node->ready_link);

View file

@ -77,7 +77,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph)
struct spa_graph_port *p;
struct spa_graph_node *n;
int iter = 1;
uint32_t action;
uint32_t state;
next:
empty = spa_list_is_empty(&graph->ready);
@ -94,26 +94,29 @@ next:
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
action = n->action;
debug("node %p state %d\n", n, n->state);
debug("node %p action %d, state %d\n", n, action, n->state);
switch (n->state) {
case SPA_GRAPH_STATE_IN:
case SPA_GRAPH_STATE_OUT:
case SPA_GRAPH_STATE_END:
if (n->state == SPA_GRAPH_STATE_END)
n->state = SPA_GRAPH_STATE_OUT;
switch (action) {
case SPA_GRAPH_ACTION_IN:
case SPA_GRAPH_ACTION_OUT:
case SPA_GRAPH_ACTION_END:
if (action == SPA_GRAPH_ACTION_END)
n->action = SPA_GRAPH_ACTION_OUT;
state = n->schedule(n);
debug("node %p schedule %d res %d\n", n, action, state);
n->state = n->schedule(n);
debug("node %p schedule %d res %d\n", n, action, n->state);
if (action == SPA_GRAPH_ACTION_IN && n == graph->node)
if (n->state == SPA_GRAPH_STATE_IN && n == graph->node)
break;
if (action != SPA_GRAPH_ACTION_END) {
if (n->state != SPA_GRAPH_STATE_END) {
debug("node %p add ready for CHECK\n", n);
n->action = SPA_GRAPH_ACTION_CHECK;
if (state == SPA_RESULT_NEED_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_IN;
else if (state == SPA_RESULT_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
else if (state == SPA_RESULT_OK)
n->state = SPA_GRAPH_STATE_CHECK_OK;
spa_list_insert(graph->ready.prev, &n->ready_link);
}
else {
@ -121,34 +124,34 @@ next:
}
break;
case SPA_GRAPH_ACTION_CHECK:
if (n->state == SPA_RESULT_NEED_BUFFER) {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != graph->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->action = SPA_GRAPH_ACTION_OUT;
debug("node %p add ready OUT\n", n);
spa_list_insert(graph->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
case SPA_GRAPH_STATE_CHECK_IN:
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != graph->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->state = SPA_GRAPH_STATE_OUT;
debug("node %p add ready OUT\n", n);
spa_list_insert(graph->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
break;
debug("node %p add pending\n", n);
n->action = SPA_GRAPH_ACTION_END;
spa_list_insert(&graph->pending, &n->ready_link);
}
else if (n->state == SPA_RESULT_OK) {
spa_graph_node_update(graph, n);
}
case SPA_GRAPH_STATE_CHECK_OUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
debug("node %p add pending\n", n);
n->state = SPA_GRAPH_STATE_END;
spa_list_insert(&graph->pending, &n->ready_link);
break;
case SPA_GRAPH_STATE_CHECK_OK:
spa_graph_node_update(graph, n);
break;
default:

View file

@ -60,33 +60,32 @@ struct spa_graph_port_callbacks {
};
struct spa_graph_node {
struct spa_list link;
struct spa_list ready_link;
struct spa_list ports[2];
struct spa_list link; /**< link in graph nodes list */
struct spa_list ports[2]; /**< list of input and output ports */
struct spa_list ready_link; /**< link for scheduler */
#define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0)
uint32_t flags;
int state;
#define SPA_GRAPH_ACTION_CHECK 0
#define SPA_GRAPH_ACTION_IN 1
#define SPA_GRAPH_ACTION_OUT 2
uint32_t action;
uint32_t max_in;
uint32_t required_in;
uint32_t ready_in;
uint32_t flags; /**< node flags */
uint32_t required_in; /**< required number of ports */
uint32_t ready_in; /**< number of ports with data */
int state; /**< state of the node */
/** callbacks and data */
const struct spa_graph_node_callbacks *callbacks;
void *callbacks_data;
void *scheduler_data; /**< scheduler private data */
};
struct spa_graph_port {
struct spa_list link;
struct spa_graph_node *node;
enum spa_direction direction;
uint32_t port_id;
uint32_t flags;
struct spa_port_io *io;
struct spa_graph_port *peer;
struct spa_list link; /**< link in node port list */
struct spa_graph_node *node; /**< owner node */
enum spa_direction direction; /**< port direction */
uint32_t port_id; /**< port id */
uint32_t flags; /**< port flags */
struct spa_port_io *io; /**< io area of the port */
struct spa_graph_port *peer; /**< peer */
/** callbacks and data */
const struct spa_graph_port_callbacks *callbacks;
void *callbacks_data;
void *scheduler_data; /**< scheduler private data */
};
static inline void spa_graph_init(struct spa_graph *graph)
@ -100,7 +99,7 @@ spa_graph_node_init(struct spa_graph_node *node)
spa_list_init(&node->ports[SPA_DIRECTION_INPUT]);
spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]);
node->flags = 0;
node->max_in = node->required_in = node->ready_in = 0;
node->required_in = node->ready_in = 0;
debug("node %p init\n", node);
}
@ -118,9 +117,8 @@ spa_graph_node_add(struct spa_graph *graph,
struct spa_graph_node *node)
{
node->state = SPA_RESULT_NEED_BUFFER;
node->action = SPA_GRAPH_ACTION_OUT;
node->ready_link.next = NULL;
spa_list_insert(graph->nodes.prev, &node->link);
spa_list_append(&graph->nodes, &node->link);
debug("node %p add\n", node);
}
@ -153,8 +151,7 @@ spa_graph_port_add(struct spa_graph_node *node,
{
debug("port %p add to node %p\n", port, node);
port->node = node;
spa_list_insert(node->ports[port->direction].prev, &port->link);
node->max_in++;
spa_list_append(&node->ports[port->direction], &port->link);
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT)
node->required_in++;
}