Use graph to schedule things

Make real spa_graph nodes and ports and schedule those. This makes
it possible to add explicit tee and mixers in the real graph.
Rework the way we add and remove ports and nodes from the graph.
Remove confusing pw_port_link and merge core with pw_link_new()
Move scheduling in separate files, add some more graph-schedulers.
This commit is contained in:
Wim Taymans 2017-06-30 19:32:11 +02:00
parent 7297c18839
commit d2f877912a
19 changed files with 845 additions and 428 deletions

View file

@ -0,0 +1,137 @@
/* Simple Plugin API
* Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __SPA_GRAPH_SCHEDULER_H__
#define __SPA_GRAPH_SCHEDULER_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <spa/graph.h>
struct spa_graph_scheduler {
struct spa_graph *graph;
struct spa_list pending;
struct spa_graph_node *node;
};
static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
struct spa_graph *graph)
{
sched->graph = graph;
spa_list_init(&sched->pending);
sched->node = NULL;
}
static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
res = spa_node_process_output(n);
else
res = SPA_RESULT_ERROR;
return res;
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
{
bool res;
struct spa_graph *graph = sched->graph;
struct spa_graph_port *p;
struct spa_graph_node *n;
res = !spa_list_is_empty(&graph->ready);
if (res) {
n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
debug("node %p action %d state %d\n", n, n->action, n->state);
switch (n->action) {
case SPA_GRAPH_ACTION_IN:
case SPA_GRAPH_ACTION_OUT:
n->state = n->schedule(n);
debug("node %p scheduled action %d state %d\n", n, n->action, n->state);
if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node)
break;
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(graph->ready.prev, &n->ready_link);
break;
case SPA_GRAPH_ACTION_CHECK:
if (n->state == SPA_RESULT_NEED_BUFFER) {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != sched->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->action = SPA_GRAPH_ACTION_OUT;
spa_list_insert(graph->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
} else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
}
break;
default:
break;
}
res = !spa_list_is_empty(&graph->ready);
}
return res;
}
static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
{
debug("node %p start pull\n", node);
node->action = SPA_GRAPH_ACTION_CHECK;
node->state = SPA_RESULT_NEED_BUFFER;
sched->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->graph->ready.prev, &node->ready_link);
}
static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
{
debug("node %p start push\n", node);
node->action = SPA_GRAPH_ACTION_OUT;
sched->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->graph->ready.prev, &node->ready_link);
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* __SPA_GRAPH_SCHEDULER_H__ */

View file

@ -0,0 +1,153 @@
/* Simple Plugin API
* Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __SPA_GRAPH_SCHEDULER_H__
#define __SPA_GRAPH_SCHEDULER_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <spa/graph.h>
static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
res = spa_node_process_output(n);
else
res = SPA_RESULT_ERROR;
return res;
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph)
{
bool empty;
struct spa_graph_port *p;
struct spa_graph_node *n;
int iter = 1;
uint32_t action;
next:
empty = spa_list_is_empty(&graph->ready);
if (empty && !spa_list_is_empty(&graph->pending)) {
debug("copy pending\n");
spa_list_insert_list(&graph->ready, &graph->pending);
spa_list_init(&graph->pending);
empty = false;
}
if (iter-- == 0 || empty)
return !empty;
n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
action = n->action;
debug("node %p action %d, state %d\n", n, action, n->state);
switch (action) {
case SPA_GRAPH_ACTION_IN:
case SPA_GRAPH_ACTION_OUT:
case SPA_GRAPH_ACTION_END:
if (action == SPA_GRAPH_ACTION_END)
n->action = SPA_GRAPH_ACTION_OUT;
n->state = n->schedule(n);
debug("node %p schedule %d res %d\n", n, action, n->state);
if (action == SPA_GRAPH_ACTION_IN && n == graph->node)
break;
if (action != SPA_GRAPH_ACTION_END) {
debug("node %p add ready for CHECK\n", n);
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(graph->ready.prev, &n->ready_link);
}
else {
spa_graph_node_update(graph, n);
}
break;
case SPA_GRAPH_ACTION_CHECK:
if (n->state == SPA_RESULT_NEED_BUFFER) {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != graph->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->action = SPA_GRAPH_ACTION_OUT;
debug("node %p add ready OUT\n", n);
spa_list_insert(graph->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
}
else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
debug("node %p add pending\n", n);
n->action = SPA_GRAPH_ACTION_END;
spa_list_insert(&graph->pending, &n->ready_link);
}
else if (n->state == SPA_RESULT_OK) {
spa_graph_node_update(graph, n);
}
break;
default:
break;
}
goto next;
}
static inline void spa_graph_scheduler_pull(struct spa_graph *graph, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_CHECK;
node->state = SPA_RESULT_NEED_BUFFER;
graph->node = node;
debug("node %p start pull\n", node);
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}
static inline void spa_graph_scheduler_push(struct spa_graph *graph, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_OUT;
graph->node = node;
debug("node %p start push\n", node);
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* __SPA_GRAPH_SCHEDULER_H__ */

View file

@ -0,0 +1,173 @@
/* Simple Plugin API
* Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __SPA_GRAPH_SCHEDULER_H__
#define __SPA_GRAPH_SCHEDULER_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <spa/graph.h>
struct spa_graph_scheduler {
struct spa_graph *graph;
struct spa_list pending;
struct spa_graph_node *node;
};
static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
struct spa_graph *graph)
{
sched->graph = graph;
spa_list_init(&sched->pending);
sched->node = NULL;
}
static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
res = spa_node_process_output(n);
else
res = SPA_RESULT_ERROR;
return res;
}
static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
{
struct spa_graph_port *p;
struct spa_graph_node *n, *t;
struct spa_list ready;
debug("node %p start pull\n", node);
spa_list_init(&ready);
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_port *pport = p->peer;
struct spa_graph_node *pnode = pport->node;
debug("node %p peer %p io %d\n", node, pnode, pport->io->status);
if (pport->io->status == SPA_RESULT_NEED_BUFFER) {
spa_list_insert(ready.prev, &pnode->ready_link);
}
else if (pport->io->status == SPA_RESULT_OK && !(pnode->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
node->ready_in++;
}
spa_list_for_each_safe(n, t, &ready, ready_link) {
n->action = SPA_GRAPH_ACTION_OUT;
n->state = n->schedule(n);
debug("peer %p scheduled %d %d\n", n, n->action, n->state);
if (n->state == SPA_RESULT_NEED_BUFFER)
spa_graph_scheduler_pull(sched, n);
else {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
if (p->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready_in++;
}
}
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
}
debug("node %p %d %d\n", node, node->ready_in, node->required_in);
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
node->state = node->schedule(node);
debug("node %p scheduled %d %d\n", node, node->action, node->state);
if (node->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
if (p->io->status == SPA_RESULT_HAVE_BUFFER)
p->peer->node->ready_in++;
}
}
}
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
{
return false;
}
static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
{
struct spa_graph_port *p;
struct spa_graph_node *n, *t;
struct spa_list ready;
debug("node %p start push\n", node);
spa_list_init(&ready);
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
struct spa_graph_port *pport = p->peer;
struct spa_graph_node *pnode = pport->node;
if (pport->io->status == SPA_RESULT_HAVE_BUFFER)
pnode->ready_in++;
debug("node %p peer %p io %d %d %d\n", node, pnode, pport->io->status,
pnode->ready_in, pnode->required_in);
if (pnode->required_in > 0 && pnode->ready_in == pnode->required_in)
spa_list_insert(ready.prev, &pnode->ready_link);
}
spa_list_for_each_safe(n, t, &ready, ready_link) {
n->action = SPA_GRAPH_ACTION_IN;
n->state = n->schedule(n);
debug("peer %p scheduled %d %d\n", n, n->action, n->state);
if (n->state == SPA_RESULT_HAVE_BUFFER)
spa_graph_scheduler_push(sched, n);
else {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
node->ready_in++;
}
}
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
}
node->action = SPA_GRAPH_ACTION_OUT;
node->state = node->schedule(node);
debug("node %p scheduled %d %d\n", node, node->action, node->state);
if (node->state == SPA_RESULT_NEED_BUFFER) {
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
p->peer->node->ready_in++;
}
}
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* __SPA_GRAPH_SCHEDULER_H__ */

View file

@ -26,6 +26,13 @@ extern "C" {
#include <spa/defs.h>
#include <spa/list.h>
#include <spa/node.h>
#if 0
#define debug(...) printf(__VA_ARGS__)
#else
#define debug(...)
#endif
struct spa_graph;
struct spa_graph_node;
@ -34,7 +41,6 @@ struct spa_graph_port;
struct spa_graph {
struct spa_list nodes;
struct spa_list ready;
struct spa_graph_node *node;
};
typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node);
@ -73,20 +79,6 @@ static inline void spa_graph_init(struct spa_graph *graph)
spa_list_init(&graph->ready);
}
static inline int spa_graph_node_schedule_default(struct spa_graph_node *node)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
res = spa_node_process_output(n);
else
res = SPA_RESULT_ERROR;
return res;
}
static inline void
spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node,
spa_graph_node_func_t schedule, void *user_data)
@ -94,12 +86,14 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node,
spa_list_init(&node->ports[SPA_DIRECTION_INPUT]);
spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]);
node->flags = 0;
node->state = SPA_RESULT_OK;
node->state = SPA_RESULT_NEED_BUFFER;
node->action = SPA_GRAPH_ACTION_OUT;
node->schedule = schedule;
node->user_data = user_data;
node->ready_link.next = NULL;
spa_list_insert(graph->nodes.prev, &node->link);
node->max_in = node->required_in = node->ready_in = 0;
debug("node %p add\n", node);
}
static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port)
@ -109,6 +103,8 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready_in++;
debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in);
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
if (node->ready_link.next == NULL)
@ -119,6 +115,17 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap
}
}
static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) {
struct spa_graph_port *p;
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
node->ready_in++;
}
debug("node %p update %d ready\n", node, node->ready_in);
}
static inline void
spa_graph_port_add(struct spa_graph *graph,
struct spa_graph_node *node,
@ -128,13 +135,13 @@ spa_graph_port_add(struct spa_graph *graph,
uint32_t flags,
struct spa_port_io *io)
{
debug("port %p add %d to node %p \n", port, direction, node);
port->node = node;
port->direction = direction;
port->port_id = port_id;
port->flags = flags;
port->io = io;
port->peer = NULL;
spa_list_insert(node->ports[port->direction].prev, &port->link);
spa_list_insert(node->ports[direction].prev, &port->link);
node->max_in++;
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT)
node->required_in++;
@ -143,96 +150,36 @@ spa_graph_port_add(struct spa_graph *graph,
static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node)
{
debug("node %p remove\n", node);
spa_list_remove(&node->link);
if (node->ready_link.next)
spa_list_remove(&node->ready_link);
}
static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_graph_port *port)
{
debug("port %p remove\n", port);
spa_list_remove(&port->link);
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT)
port->node->required_in--;
}
static inline void
spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct spa_graph_port *in)
{
debug("port %p link to %p \n", out, in);
out->peer = in;
in->peer = out;
}
static inline void
spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *out,
struct spa_graph_port *in)
spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *port)
{
out->peer = NULL;
in->peer = NULL;
}
static inline bool spa_graph_node_iterate(struct spa_graph *graph)
{
bool res;
struct spa_graph_port *p;
res = !spa_list_is_empty(&graph->ready);
if (res) {
struct spa_graph_node *n =
spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
switch (n->action) {
case SPA_GRAPH_ACTION_IN:
case SPA_GRAPH_ACTION_OUT:
n->state = n->schedule(n);
if (n->action == SPA_GRAPH_ACTION_IN && n == graph->node)
break;
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(graph->ready.prev, &n->ready_link);
break;
case SPA_GRAPH_ACTION_CHECK:
if (n->state == SPA_RESULT_NEED_BUFFER) {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != graph->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->action = SPA_GRAPH_ACTION_OUT;
spa_list_insert(graph->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
} else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
}
break;
default:
break;
}
res = !spa_list_is_empty(&graph->ready);
debug("port %p unlink from %p \n", port, port->peer);
if (port->peer) {
port->peer->peer = NULL;
port->peer = NULL;
}
return res;
}
static inline void spa_graph_node_pull(struct spa_graph *graph, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_CHECK;
node->state = SPA_RESULT_NEED_BUFFER;
graph->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}
static inline void spa_graph_node_push(struct spa_graph *graph, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_OUT;
graph->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}
#ifdef __cplusplus

View file

@ -45,6 +45,14 @@ static inline void spa_list_insert(struct spa_list *list, struct spa_list *elem)
elem->next->prev = elem;
}
static inline void spa_list_insert_list(struct spa_list *list, struct spa_list *other)
{
other->next->prev = list;
other->prev->next = list->next;
list->next->prev = other->prev;
list->next = other->next;
}
static inline void spa_list_remove(struct spa_list *elem)
{
elem->prev->next = elem->next;

View file

@ -36,6 +36,7 @@
#include <spa/format-utils.h>
#include <spa/format-builder.h>
#include <spa/graph.h>
#include <spa/graph-scheduler1.h>
static SPA_TYPE_MAP_IMPL(default_map, 4096);
static SPA_LOG_IMPL(default_log);
@ -97,6 +98,7 @@ struct data {
uint32_t n_support;
struct spa_graph graph;
struct spa_graph_scheduler sched;
struct spa_graph_node source_node;
struct spa_graph_port source_out;
struct spa_graph_port volume_in;
@ -229,9 +231,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data)
{
struct data *data = user_data;
spa_graph_node_pull(&data->graph, &data->sink_node);
spa_graph_scheduler_pull(&data->sched, &data->sink_node);
while (spa_graph_node_iterate(&data->graph));
while (spa_graph_scheduler_iterate(&data->sched));
}
static void
@ -338,12 +340,12 @@ static int make_nodes(struct data *data, const char *device)
spa_node_port_set_io(data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]);
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]);
spa_graph_node_add(&data->graph, &data->source_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->source_node, spa_graph_scheduler_default,
data->source);
spa_graph_port_add(&data->graph, &data->source_node, &data->source_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_scheduler_default,
data->volume);
spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_in, SPA_DIRECTION_INPUT,
0, 0, &data->source_volume_io[0]);
@ -353,7 +355,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->graph, &data->volume_node,
&data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]);
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->volume_sink_io[0]);
@ -528,6 +530,7 @@ int main(int argc, char *argv[])
spa_graph_init(&data.graph);
spa_graph_scheduler_init(&data.sched, &data.graph);
data.map = &default_map.map;
data.log = &default_log.log;

View file

@ -31,6 +31,7 @@
#include <spa/log-impl.h>
#include <spa/loop.h>
#include <spa/graph.h>
#include <spa/graph-scheduler1.h>
#include <spa/type-map.h>
#include <spa/type-map-impl.h>
#include <spa/audio/format-utils.h>
@ -99,6 +100,7 @@ struct data {
uint32_t n_support;
struct spa_graph graph;
struct spa_graph_scheduler sched;
struct spa_graph_node source1_node;
struct spa_graph_port source1_out;
struct spa_graph_node source2_node;
@ -240,8 +242,8 @@ static void on_sink_need_input(struct spa_node *node, void *user_data)
{
struct data *data = user_data;
#ifdef USE_GRAPH
spa_graph_node_pull(&data->graph, &data->sink_node);
while (spa_graph_node_iterate(&data->graph));
spa_graph_scheduler_pull(&data->sched, &data->sink_node);
while (spa_graph_scheduler_iterate(&data->sched));
#else
int res;
@ -416,17 +418,17 @@ static int make_nodes(struct data *data, const char *device)
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]);
#ifdef USE_GRAPH
spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_scheduler_default,
data->source1);
spa_graph_port_add(&data->graph, &data->source1_node, &data->source1_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]);
spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_scheduler_default,
data->source2);
spa_graph_port_add(&data->graph, &data->source2_node, &data->source2_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]);
spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_scheduler_default,
data->mix);
spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[0], SPA_DIRECTION_INPUT,
data->mix_ports[0], 0, &data->source1_mix_io[0]);
@ -439,7 +441,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->graph, &data->mix_node,
&data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]);
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->mix_sink_io[0]);
@ -646,6 +648,7 @@ int main(int argc, char *argv[])
data.data_loop.invoke = do_invoke;
spa_graph_init(&data.graph);
spa_graph_scheduler_init(&data.sched, &data.graph);
if ((str = getenv("SPA_DEBUG")))
data.log->level = atoi(str);

View file

@ -34,6 +34,7 @@
#include <spa/format-utils.h>
#include <spa/format-builder.h>
#include <spa/graph.h>
#include <spa/graph-scheduler1.h>
#define MODE_SYNC_PUSH (1<<0)
#define MODE_SYNC_PULL (1<<1)
@ -102,6 +103,7 @@ struct data {
int iterations;
struct spa_graph graph;
struct spa_graph_scheduler sched;
struct spa_graph_node source_node;
struct spa_graph_port source_out;
struct spa_graph_port sink_in;
@ -223,8 +225,8 @@ static void on_sink_pull(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
spa_graph_node_pull(&data->graph, &data->sink_node);
while (spa_graph_node_iterate(&data->graph));
spa_graph_scheduler_pull(&data->sched, &data->sink_node);
while (spa_graph_scheduler_iterate(&data->sched));
}
}
@ -235,8 +237,8 @@ static void on_source_push(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
spa_graph_node_push(&data->graph, &data->source_node);
while (spa_graph_node_iterate(&data->graph));
spa_graph_scheduler_push(&data->sched, &data->source_node);
while (spa_graph_scheduler_iterate(&data->sched));
}
}
@ -365,12 +367,12 @@ static int make_nodes(struct data *data)
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]);
spa_graph_node_add(&data->graph, &data->source_node,
spa_graph_node_schedule_default, data->source);
spa_graph_scheduler_default, data->source);
data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
spa_graph_port_add(&data->graph, &data->source_node,
&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]);
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default,
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
spa_graph_port_add(&data->graph, &data->sink_node,
@ -526,6 +528,7 @@ int main(int argc, char *argv[])
const char *str;
spa_graph_init(&data.graph);
spa_graph_scheduler_init(&data.sched, &data.graph);
data.map = &default_map.map;
data.log = &default_log.log;