From eba2b82c8eea24a3fcb314059a9ec3ad244c098b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 18 Aug 2017 18:54:45 +0200 Subject: [PATCH] graph: fix the API a little Add test for atomic graph updates --- spa/include/spa/graph-scheduler1.h | 71 ++++---- spa/include/spa/graph-scheduler2.h | 85 +++++----- spa/include/spa/graph.h | 45 +++-- spa/tests/meson.build | 4 + spa/tests/test-graph2.c | 260 +++++++++++++++++++++++++++++ 5 files changed, 368 insertions(+), 97 deletions(-) create mode 100644 spa/tests/test-graph2.c diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h index 8fe0a6cd5..32c16bc0b 100644 --- a/spa/include/spa/graph-scheduler1.h +++ b/spa/include/spa/graph-scheduler1.h @@ -26,10 +26,14 @@ extern "C" { #include +#define SPA_GRAPH_STATE_IN 0 +#define SPA_GRAPH_STATE_OUT 1 +#define SPA_GRAPH_STATE_CHECK_IN 2 +#define SPA_GRAPH_STATE_CHECK_OUT 3 + struct spa_graph_scheduler { struct spa_graph *graph; struct spa_list ready; - struct spa_list pending; struct spa_graph_node *node; }; @@ -38,7 +42,6 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, { sched->graph = graph; spa_list_init(&sched->ready); - spa_list_init(&sched->pending); sched->node = NULL; } @@ -70,7 +73,7 @@ static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, s debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); if (node->required_in > 0 && node->ready_in == node->required_in) { - node->action = SPA_GRAPH_ACTION_IN; + node->state = SPA_GRAPH_STATE_IN; if (node->ready_link.next == NULL) spa_list_insert(sched->ready.prev, &node->ready_link); } else if (node->ready_link.next) { @@ -82,6 +85,7 @@ static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, s static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched) { bool res; + int state; struct spa_graph_port *p; struct spa_graph_node *n; @@ -92,44 +96,48 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched spa_list_remove(&n->ready_link); n->ready_link.next = NULL; - debug("node %p action %d state %d\n", n, n->action, n->state); + debug("node %p state %d\n", n, n->state); - switch (n->action) { - case SPA_GRAPH_ACTION_IN: - n->state = n->callbacks->process_input(n->callbacks_data); + switch (n->state) { + case SPA_GRAPH_STATE_IN: + state = n->callbacks->process_input(n->callbacks_data); + if (state == SPA_RESULT_NEED_BUFFER) + n->state = SPA_GRAPH_STATE_CHECK_IN; + else if (state == SPA_RESULT_HAVE_BUFFER) + n->state = SPA_GRAPH_STATE_CHECK_OUT; debug("node %p processed input state %d\n", n, n->state); if (n == sched->node) break; - n->action = SPA_GRAPH_ACTION_CHECK; spa_list_insert(sched->ready.prev, &n->ready_link); break; - case SPA_GRAPH_ACTION_OUT: - n->state = n->callbacks->process_output(n->callbacks_data); + case SPA_GRAPH_STATE_OUT: + state = n->callbacks->process_output(n->callbacks_data); + if (state == SPA_RESULT_NEED_BUFFER) + n->state = SPA_GRAPH_STATE_CHECK_IN; + else if (state == SPA_RESULT_HAVE_BUFFER) + n->state = SPA_GRAPH_STATE_CHECK_OUT; debug("node %p processed output state %d\n", n, n->state); - n->action = SPA_GRAPH_ACTION_CHECK; spa_list_insert(sched->ready.prev, &n->ready_link); break; - case SPA_GRAPH_ACTION_CHECK: - if (n->state == SPA_RESULT_NEED_BUFFER) { - n->ready_in = 0; - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - struct spa_graph_node *pn = p->peer->node; - if (p->io->status == SPA_RESULT_NEED_BUFFER) { - if (pn != sched->node - || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { - pn->action = SPA_GRAPH_ACTION_OUT; - spa_list_insert(sched->ready.prev, - &pn->ready_link); - } - } else if (p->io->status == SPA_RESULT_OK) - n->ready_in++; - } - } else if (n->state == SPA_RESULT_HAVE_BUFFER) { - spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) - spa_scheduler_port_check(sched, p->peer); + case SPA_GRAPH_STATE_CHECK_IN: + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_node *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + if (pn != sched->node + || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { + pn->state = SPA_GRAPH_STATE_OUT; + spa_list_insert(sched->ready.prev, + &pn->ready_link); + } + } else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; } + case SPA_GRAPH_STATE_CHECK_OUT: + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_scheduler_port_check(sched, p->peer); break; default: @@ -143,8 +151,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node) { debug("node %p start pull\n", node); - node->action = SPA_GRAPH_ACTION_CHECK; - node->state = SPA_RESULT_NEED_BUFFER; + node->state = SPA_GRAPH_STATE_CHECK_IN; sched->node = node; if (node->ready_link.next == NULL) spa_list_insert(sched->ready.prev, &node->ready_link); @@ -153,7 +160,7 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node) { debug("node %p start push\n", node); - node->action = SPA_GRAPH_ACTION_OUT; + node->state = SPA_GRAPH_STATE_OUT; sched->node = node; if (node->ready_link.next == NULL) spa_list_insert(sched->ready.prev, &node->ready_link); diff --git a/spa/include/spa/graph-scheduler2.h b/spa/include/spa/graph-scheduler2.h index 2b2f08a5c..7353ac07e 100644 --- a/spa/include/spa/graph-scheduler2.h +++ b/spa/include/spa/graph-scheduler2.h @@ -77,7 +77,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph) struct spa_graph_port *p; struct spa_graph_node *n; int iter = 1; - uint32_t action; + uint32_t state; next: empty = spa_list_is_empty(&graph->ready); @@ -94,26 +94,29 @@ next: spa_list_remove(&n->ready_link); n->ready_link.next = NULL; - action = n->action; + debug("node %p state %d\n", n, n->state); - debug("node %p action %d, state %d\n", n, action, n->state); + switch (n->state) { + case SPA_GRAPH_STATE_IN: + case SPA_GRAPH_STATE_OUT: + case SPA_GRAPH_STATE_END: + if (n->state == SPA_GRAPH_STATE_END) + n->state = SPA_GRAPH_STATE_OUT; - switch (action) { - case SPA_GRAPH_ACTION_IN: - case SPA_GRAPH_ACTION_OUT: - case SPA_GRAPH_ACTION_END: - if (action == SPA_GRAPH_ACTION_END) - n->action = SPA_GRAPH_ACTION_OUT; + state = n->schedule(n); + debug("node %p schedule %d res %d\n", n, action, state); - n->state = n->schedule(n); - debug("node %p schedule %d res %d\n", n, action, n->state); - - if (action == SPA_GRAPH_ACTION_IN && n == graph->node) + if (n->state == SPA_GRAPH_STATE_IN && n == graph->node) break; - if (action != SPA_GRAPH_ACTION_END) { + if (n->state != SPA_GRAPH_STATE_END) { debug("node %p add ready for CHECK\n", n); - n->action = SPA_GRAPH_ACTION_CHECK; + if (state == SPA_RESULT_NEED_BUFFER) + n->state = SPA_GRAPH_STATE_CHECK_IN; + else if (state == SPA_RESULT_HAVE_BUFFER) + n->state = SPA_GRAPH_STATE_CHECK_OUT; + else if (state == SPA_RESULT_OK) + n->state = SPA_GRAPH_STATE_CHECK_OK; spa_list_insert(graph->ready.prev, &n->ready_link); } else { @@ -121,34 +124,34 @@ next: } break; - case SPA_GRAPH_ACTION_CHECK: - if (n->state == SPA_RESULT_NEED_BUFFER) { - n->ready_in = 0; - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - struct spa_graph_node *pn = p->peer->node; - if (p->io->status == SPA_RESULT_NEED_BUFFER) { - if (pn != graph->node - || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { - pn->action = SPA_GRAPH_ACTION_OUT; - debug("node %p add ready OUT\n", n); - spa_list_insert(graph->ready.prev, - &pn->ready_link); - } - } else if (p->io->status == SPA_RESULT_OK) - n->ready_in++; - } + case SPA_GRAPH_STATE_CHECK_IN: + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_node *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + if (pn != graph->node + || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { + pn->state = SPA_GRAPH_STATE_OUT; + debug("node %p add ready OUT\n", n); + spa_list_insert(graph->ready.prev, + &pn->ready_link); + } + } else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; } - else if (n->state == SPA_RESULT_HAVE_BUFFER) { - spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) - spa_graph_port_check(graph, p->peer); + break; - debug("node %p add pending\n", n); - n->action = SPA_GRAPH_ACTION_END; - spa_list_insert(&graph->pending, &n->ready_link); - } - else if (n->state == SPA_RESULT_OK) { - spa_graph_node_update(graph, n); - } + case SPA_GRAPH_STATE_CHECK_OUT: + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_graph_port_check(graph, p->peer); + + debug("node %p add pending\n", n); + n->state = SPA_GRAPH_STATE_END; + spa_list_insert(&graph->pending, &n->ready_link); + break; + + case SPA_GRAPH_STATE_CHECK_OK: + spa_graph_node_update(graph, n); break; default: diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index e1be4e193..d1f6616c1 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -60,33 +60,32 @@ struct spa_graph_port_callbacks { }; struct spa_graph_node { - struct spa_list link; - struct spa_list ready_link; - struct spa_list ports[2]; + struct spa_list link; /**< link in graph nodes list */ + struct spa_list ports[2]; /**< list of input and output ports */ + struct spa_list ready_link; /**< link for scheduler */ #define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0) - uint32_t flags; - int state; -#define SPA_GRAPH_ACTION_CHECK 0 -#define SPA_GRAPH_ACTION_IN 1 -#define SPA_GRAPH_ACTION_OUT 2 - uint32_t action; - uint32_t max_in; - uint32_t required_in; - uint32_t ready_in; + uint32_t flags; /**< node flags */ + uint32_t required_in; /**< required number of ports */ + uint32_t ready_in; /**< number of ports with data */ + int state; /**< state of the node */ + /** callbacks and data */ const struct spa_graph_node_callbacks *callbacks; void *callbacks_data; + void *scheduler_data; /**< scheduler private data */ }; struct spa_graph_port { - struct spa_list link; - struct spa_graph_node *node; - enum spa_direction direction; - uint32_t port_id; - uint32_t flags; - struct spa_port_io *io; - struct spa_graph_port *peer; + struct spa_list link; /**< link in node port list */ + struct spa_graph_node *node; /**< owner node */ + enum spa_direction direction; /**< port direction */ + uint32_t port_id; /**< port id */ + uint32_t flags; /**< port flags */ + struct spa_port_io *io; /**< io area of the port */ + struct spa_graph_port *peer; /**< peer */ + /** callbacks and data */ const struct spa_graph_port_callbacks *callbacks; void *callbacks_data; + void *scheduler_data; /**< scheduler private data */ }; static inline void spa_graph_init(struct spa_graph *graph) @@ -100,7 +99,7 @@ spa_graph_node_init(struct spa_graph_node *node) spa_list_init(&node->ports[SPA_DIRECTION_INPUT]); spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]); node->flags = 0; - node->max_in = node->required_in = node->ready_in = 0; + node->required_in = node->ready_in = 0; debug("node %p init\n", node); } @@ -118,9 +117,8 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node) { node->state = SPA_RESULT_NEED_BUFFER; - node->action = SPA_GRAPH_ACTION_OUT; node->ready_link.next = NULL; - spa_list_insert(graph->nodes.prev, &node->link); + spa_list_append(&graph->nodes, &node->link); debug("node %p add\n", node); } @@ -153,8 +151,7 @@ spa_graph_port_add(struct spa_graph_node *node, { debug("port %p add to node %p\n", port, node); port->node = node; - spa_list_insert(node->ports[port->direction].prev, &port->link); - node->max_in++; + spa_list_append(&node->ports[port->direction], &port->link); if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT) node->required_in++; } diff --git a/spa/tests/meson.build b/spa/tests/meson.build index dd5fec970..eb908ae89 100644 --- a/spa/tests/meson.build +++ b/spa/tests/meson.build @@ -10,6 +10,10 @@ executable('test-graph', 'test-graph.c', include_directories : [spa_inc ], dependencies : [dl_lib, pthread_lib], install : false) +executable('test-graph2', 'test-graph2.c', + include_directories : [spa_inc ], + dependencies : [dl_lib, pthread_lib], + install : false) executable('test-perf', 'test-perf.c', include_directories : [spa_inc, spa_libinc ], dependencies : [dl_lib, pthread_lib], diff --git a/spa/tests/test-graph2.c b/spa/tests/test-graph2.c new file mode 100644 index 000000000..503fbc724 --- /dev/null +++ b/spa/tests/test-graph2.c @@ -0,0 +1,260 @@ +/* Spa + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static SPA_TYPE_MAP_IMPL(default_map, 4096); +static SPA_LOG_IMPL(default_log); + +struct type { + uint32_t node; +}; + +static inline void init_type(struct type *type, struct spa_type_map *map) +{ + type->node = spa_type_map_get_id(map, SPA_TYPE__Node); +} + +struct version { + uint16_t current; + uint16_t pending; +}; + +struct data { + struct spa_type_map *map; + struct spa_log *log; + struct spa_loop data_loop; + struct type type; + + int writers; + struct version version; + struct spa_graph graph[2]; + + struct spa_graph_node source_node[2]; + struct spa_graph_port source_out[2]; + struct spa_graph_port volume_in[2]; + struct spa_graph_node volume_node[2]; + struct spa_graph_port volume_out[2]; + struct spa_graph_port sink_in[2]; + struct spa_graph_node sink_node[2]; +}; + +static int copy_graph(struct data *data, int current) +{ + int c = (current)&1; + int v = (current+1)&1; + struct spa_graph *ng, *og; + struct spa_graph_node *nn, *on; + struct spa_graph_port *np, *op; + int d; + + d = (v - c); + + og = &data->graph[c]; + ng = &data->graph[v]; + spa_list_init(&ng->nodes); + + printf("copy graph %d -> %d\n", c, v); + spa_list_for_each(on, &og->nodes, link) { + nn = &on[d]; + *nn = *on; + spa_list_append(&ng->nodes, &nn->link); + + spa_list_init(&nn->ports[SPA_DIRECTION_INPUT]); + spa_list_for_each(op, &on->ports[SPA_DIRECTION_INPUT], link) { + np = &op[d]; + *np = *op; + np->node = nn; + np->peer = &op->peer[d]; + spa_list_append(&nn->ports[SPA_DIRECTION_INPUT], &np->link); + } + + spa_list_init(&nn->ports[SPA_DIRECTION_OUTPUT]); + spa_list_for_each(op, &on->ports[SPA_DIRECTION_OUTPUT], link) { + np = &op[d]; + *np = *op; + np->node = nn; + np->peer = &op->peer[d]; + spa_list_append(&nn->ports[SPA_DIRECTION_OUTPUT], &np->link); + } + } + return 0; +} + +static int start_write(struct data *data) +{ + if (data->writers++ == 0) { + printf("writer start %d %d\n", data->version.current, data->version.pending); + if (data->version.current == data->version.pending) + copy_graph(data, data->version.current); + data->version.pending = data->version.current; + } + return (data->version.current + 1) & 1; +} + +static int end_write(struct data *data) +{ + if (--data->writers == 0) { + data->version.pending++; + printf("writer end %d %d\n", data->version.current, data->version.pending); + } + return 0; +} + +static bool switch_graph(struct data *data) +{ + bool res = data->version.current != data->version.pending; + if (res) { + printf("switch graph %d -> %d\n", data->version.current, data->version.pending); + data->version.current = data->version.pending; + } + return res; +} + +static int print_graph(struct data *data, int v) +{ + struct spa_graph *g; + struct spa_graph_node *n; + struct spa_graph_port *p; + + g = &data->graph[v]; + printf("graph %p (version %d):\n", g, v); + + spa_list_for_each(n, &g->nodes, link) { + printf(" node %p\n", n); + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + printf(" in: %p -> %p\n", p, p->peer); + } + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { + printf(" out: %p -> %p\n", p, p->peer); + } + } + return 0; +} + +static int make_graph1(struct data *data) +{ + int v = start_write(data); + + spa_graph_node_init(&data->source_node[v]); + spa_graph_node_add(&data->graph[v], &data->source_node[v]); + spa_graph_port_add(&data->source_node[v], &data->source_out[v]); + + spa_graph_node_init(&data->volume_node[v]); + spa_graph_node_add(&data->graph[v], &data->volume_node[v]); + spa_graph_port_add(&data->volume_node[v], &data->volume_in[v]); + + spa_graph_port_link(&data->source_out[v], &data->volume_in[v]); + + spa_graph_port_add(&data->volume_node[v], &data->volume_out[v]); + + spa_graph_node_init(&data->sink_node[v]); + spa_graph_node_add(&data->graph[v], &data->sink_node[v]); + spa_graph_port_add(&data->sink_node[v], &data->sink_in[v]); + + spa_graph_port_link(&data->volume_out[v], &data->sink_in[v]); + + end_write(data); + + return 0; +} + +static int make_graph2(struct data *data) +{ + int v = start_write(data); + + spa_graph_port_unlink(&data->volume_in[v]); + spa_graph_port_unlink(&data->volume_out[v]); + spa_graph_node_remove(&data->volume_node[v]); + + spa_graph_port_link(&data->source_out[v], &data->sink_in[v]); + + end_write(data); + + return 0; +} + +static int make_graph3(struct data *data) +{ + int v = start_write(data); + + spa_graph_port_unlink(&data->source_out[v]); + + spa_graph_node_add(&data->graph[v], &data->volume_node[v]); + + spa_graph_port_link(&data->source_out[v], &data->volume_in[v]); + spa_graph_port_link(&data->volume_out[v], &data->sink_in[v]); + + end_write(data); + + return 0; +} + +int main(int argc, char *argv[]) +{ + struct data data = { NULL }; + const char *str; + + spa_graph_init(&data.graph[0]); + spa_graph_init(&data.graph[1]); + + data.map = &default_map.map; + data.log = &default_log.log; + + if ((str = getenv("SPA_DEBUG"))) + data.log->level = atoi(str); + + init_type(&data.type, data.map); + + print_graph(&data, 0); + print_graph(&data, 1); + make_graph1(&data); + print_graph(&data, 0); + print_graph(&data, 1); + switch_graph(&data); + print_graph(&data, 0); + print_graph(&data, 1); + make_graph2(&data); + print_graph(&data, 0); + print_graph(&data, 1); + switch_graph(&data); + make_graph3(&data); + print_graph(&data, 0); + print_graph(&data, 1); + +}