Make scheduler more generic

Add some callbacks to trigger push and pull in a graph
Remove the scheduler, make some implementations of graph push/pull
functions.
Add some properties to jack clients and nodes
Fix the parent of the clients.
Notify link format changes
This commit is contained in:
Wim Taymans 2017-08-20 18:33:07 +02:00
parent eba2b82c8e
commit bece3a013b
20 changed files with 248 additions and 158 deletions

View file

@ -31,39 +31,39 @@ extern "C" {
#define SPA_GRAPH_STATE_CHECK_IN 2
#define SPA_GRAPH_STATE_CHECK_OUT 3
struct spa_graph_scheduler {
struct spa_graph_data {
struct spa_graph *graph;
struct spa_list ready;
struct spa_graph_node *node;
};
static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
struct spa_graph *graph)
static inline void spa_graph_data_init(struct spa_graph_data *data,
struct spa_graph *graph)
{
sched->graph = graph;
spa_list_init(&sched->ready);
sched->node = NULL;
data->graph = graph;
spa_list_init(&data->ready);
data->node = NULL;
}
static inline int spa_graph_scheduler_input(void *data)
static inline int spa_graph_node_impl_input(void *data)
{
struct spa_node *n = data;
return spa_node_process_input(n);
}
static inline int spa_graph_scheduler_output(void *data)
static inline int spa_graph_node_impl_output(void *data)
{
struct spa_node *n = data;
return spa_node_process_output(n);
}
static const struct spa_graph_node_callbacks spa_graph_scheduler_default = {
static const struct spa_graph_node_callbacks spa_graph_node_impl_default = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
spa_graph_scheduler_input,
spa_graph_scheduler_output,
spa_graph_node_impl_input,
spa_graph_node_impl_output,
};
static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, struct spa_graph_port *port)
static inline void spa_graph_data_port_check(struct spa_graph_data *data, struct spa_graph_port *port)
{
struct spa_graph_node *node = port->node;
@ -75,23 +75,23 @@ static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, s
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->state = SPA_GRAPH_STATE_IN;
if (node->ready_link.next == NULL)
spa_list_insert(sched->ready.prev, &node->ready_link);
spa_list_insert(data->ready.prev, &node->ready_link);
} else if (node->ready_link.next) {
spa_list_remove(&node->ready_link);
node->ready_link.next = NULL;
}
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
static inline bool spa_graph_data_iterate(struct spa_graph_data *data)
{
bool res;
int state;
struct spa_graph_port *p;
struct spa_graph_node *n;
res = !spa_list_is_empty(&sched->ready);
res = !spa_list_is_empty(&data->ready);
if (res) {
n = spa_list_first(&sched->ready, struct spa_graph_node, ready_link);
n = spa_list_first(&data->ready, struct spa_graph_node, ready_link);
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
@ -106,9 +106,9 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
else if (state == SPA_RESULT_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
debug("node %p processed input state %d\n", n, n->state);
if (n == sched->node)
if (n == data->node)
break;
spa_list_insert(sched->ready.prev, &n->ready_link);
spa_list_append(&data->ready, &n->ready_link);
break;
case SPA_GRAPH_STATE_OUT:
@ -118,7 +118,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
else if (state == SPA_RESULT_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
debug("node %p processed output state %d\n", n, n->state);
spa_list_insert(sched->ready.prev, &n->ready_link);
spa_list_append(&data->ready, &n->ready_link);
break;
case SPA_GRAPH_STATE_CHECK_IN:
@ -126,10 +126,10 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
if (pn != sched->node
if (pn != data->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->state = SPA_GRAPH_STATE_OUT;
spa_list_insert(sched->ready.prev,
spa_list_append(&data->ready,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
@ -137,35 +137,52 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
}
case SPA_GRAPH_STATE_CHECK_OUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_scheduler_port_check(sched, p->peer);
spa_graph_data_port_check(data, p->peer);
break;
default:
break;
}
res = !spa_list_is_empty(&sched->ready);
res = !spa_list_is_empty(&data->ready);
}
return res;
}
static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *node)
{
struct spa_graph_data *d = data;
debug("node %p start pull\n", node);
node->state = SPA_GRAPH_STATE_CHECK_IN;
sched->node = node;
d->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->ready.prev, &node->ready_link);
spa_list_append(&d->ready, &node->ready_link);
while(spa_graph_data_iterate(data));
return SPA_RESULT_OK;
}
static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *node)
{
struct spa_graph_data *d = data;
debug("node %p start push\n", node);
node->state = SPA_GRAPH_STATE_OUT;
sched->node = node;
d->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->ready.prev, &node->ready_link);
spa_list_append(&d->ready, &node->ready_link);
while(spa_graph_data_iterate(data));
return SPA_RESULT_OK;
}
static const struct spa_graph_callbacks spa_graph_impl_default = {
SPA_VERSION_GRAPH_CALLBACKS,
.need_input = spa_graph_impl_need_input,
.have_output = spa_graph_impl_have_output,
};
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -26,37 +26,26 @@ extern "C" {
#include <spa/graph.h>
struct spa_graph_scheduler {
struct spa_graph *graph;
};
static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
struct spa_graph *graph)
{
sched->graph = graph;
}
static inline int spa_graph_node_scheduler_input(void *data)
static inline int spa_graph_node_impl_input(void *data)
{
struct spa_node *n = data;
return spa_node_process_input(n);
}
static inline int spa_graph_node_scheduler_output(void *data)
static inline int spa_graph_node_impl_output(void *data)
{
struct spa_node *n = data;
return spa_node_process_output(n);
}
static const struct spa_graph_node_callbacks spa_graph_node_scheduler_default = {
static const struct spa_graph_node_callbacks spa_graph_node_impl_default = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
spa_graph_node_scheduler_input,
spa_graph_node_scheduler_output,
spa_graph_node_impl_input,
spa_graph_node_impl_output,
};
static inline int spa_graph_port_scheduler_reuse_buffer(void *data,
uint32_t buffer_id)
static inline int spa_graph_port_impl_reuse_buffer(void *data,
uint32_t buffer_id)
{
struct spa_graph_port *port = data;
struct spa_node *node = port->node->callbacks_data;
@ -64,12 +53,12 @@ static inline int spa_graph_port_scheduler_reuse_buffer(void *data,
return spa_node_port_reuse_buffer(node, port->port_id, buffer_id);
}
static const struct spa_graph_port_callbacks spa_graph_port_scheduler_default = {
static const struct spa_graph_port_callbacks spa_graph_port_impl_default = {
SPA_VERSION_GRAPH_PORT_CALLBACKS,
spa_graph_port_scheduler_reuse_buffer,
spa_graph_port_impl_reuse_buffer,
};
static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *node)
{
struct spa_graph_port *p;
struct spa_graph_node *n, *t;
@ -99,7 +88,7 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
n->state = n->callbacks->process_output(n->callbacks_data);
debug("peer %p processed out %d\n", n, n->state);
if (n->state == SPA_RESULT_NEED_BUFFER)
spa_graph_scheduler_pull(sched, n);
spa_graph_need_input(n->graph, n);
else {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
if (p->io->status == SPA_RESULT_HAVE_BUFFER)
@ -123,43 +112,14 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
}
}
}
return SPA_RESULT_OK;
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
{
return false;
}
static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node);
static inline void spa_graph_scheduler_chain(struct spa_graph_scheduler *sched,
struct spa_list *ready)
{
struct spa_graph_node *n, *t;
struct spa_graph_port *p;
spa_list_for_each_safe(n, t, ready, ready_link) {
n->state = n->callbacks->process_input(n->callbacks_data);
debug("node %p chain processed in %d\n", n, n->state);
if (n->state == SPA_RESULT_HAVE_BUFFER)
spa_graph_scheduler_push(sched, n);
else {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
n->ready_in++;
}
}
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
}
}
static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *node)
{
struct spa_graph_port *p;
struct spa_list ready;
struct spa_graph_node *n, *t;
debug("node %p start push\n", node);
@ -182,7 +142,22 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s
spa_list_append(&ready, &pnode->ready_link);
}
spa_graph_scheduler_chain(sched, &ready);
spa_list_for_each_safe(n, t, &ready, ready_link) {
n->state = n->callbacks->process_input(n->callbacks_data);
debug("node %p chain processed in %d\n", n, n->state);
if (n->state == SPA_RESULT_HAVE_BUFFER)
spa_graph_have_output(n->graph, n);
else {
n->ready_in = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK &&
!(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
n->ready_in++;
}
}
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
}
node->state = node->callbacks->process_output(node->callbacks_data);
debug("node %p processed out %d\n", node, node->state);
@ -193,8 +168,16 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s
node->ready_in++;
}
}
return SPA_RESULT_OK;
}
static const struct spa_graph_callbacks spa_graph_impl_default = {
SPA_VERSION_GRAPH_CALLBACKS,
.need_input = spa_graph_impl_need_input,
.have_output = spa_graph_impl_have_output,
};
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -40,10 +40,23 @@ struct spa_graph;
struct spa_graph_node;
struct spa_graph_port;
struct spa_graph_callbacks {
#define SPA_VERSION_GRAPH_CALLBACKS 0
uint32_t version;
int (*need_input) (void *data, struct spa_graph_node *node);
int (*have_output) (void *data, struct spa_graph_node *node);
};
struct spa_graph {
struct spa_list nodes;
const struct spa_graph_callbacks *callbacks;
void *callbacks_data;
};
#define spa_graph_need_input(g,n) ((g)->callbacks->need_input((g)->callbacks_data, (n)))
#define spa_graph_have_output(g,n) ((g)->callbacks->have_output((g)->callbacks_data, (n)))
struct spa_graph_node_callbacks {
#define SPA_VERSION_GRAPH_NODE_CALLBACKS 0
uint32_t version;
@ -61,6 +74,7 @@ struct spa_graph_port_callbacks {
struct spa_graph_node {
struct spa_list link; /**< link in graph nodes list */
struct spa_graph *graph; /**< owner graph */
struct spa_list ports[2]; /**< list of input and output ports */
struct spa_list ready_link; /**< link for scheduler */
#define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0)
@ -93,6 +107,15 @@ static inline void spa_graph_init(struct spa_graph *graph)
spa_list_init(&graph->nodes);
}
static inline void
spa_graph_set_callbacks(struct spa_graph *graph,
const struct spa_graph_callbacks *callbacks,
void *data)
{
graph->callbacks = callbacks;
graph->callbacks_data = data;
}
static inline void
spa_graph_node_init(struct spa_graph_node *node)
{
@ -116,6 +139,7 @@ static inline void
spa_graph_node_add(struct spa_graph *graph,
struct spa_graph_node *node)
{
node->graph = graph;
node->state = SPA_RESULT_NEED_BUFFER;
node->ready_link.next = NULL;
spa_list_append(&graph->nodes, &node->link);

View file

@ -98,7 +98,7 @@ struct data {
uint32_t n_support;
struct spa_graph graph;
struct spa_graph_scheduler sched;
struct spa_graph_data graph_data;
struct spa_graph_node source_node;
struct spa_graph_port source_out;
struct spa_graph_port volume_in;
@ -230,10 +230,7 @@ static void on_sink_event(void *data, struct spa_event *event)
static void on_sink_need_input(void *_data)
{
struct data *data = _data;
spa_graph_scheduler_pull(&data->sched, &data->sink_node);
while (spa_graph_scheduler_iterate(&data->sched));
spa_graph_need_input(&data->graph, &data->sink_node);
}
static void
@ -340,13 +337,13 @@ static int make_nodes(struct data *data, const char *device)
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]);
spa_graph_node_init(&data->source_node);
spa_graph_node_set_callbacks(&data->source_node, &spa_graph_scheduler_default, data->source);
spa_graph_node_set_callbacks(&data->source_node, &spa_graph_node_impl_default, data->source);
spa_graph_node_add(&data->graph, &data->source_node);
spa_graph_port_init(&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_port_add(&data->source_node, &data->source_out);
spa_graph_node_init(&data->volume_node);
spa_graph_node_set_callbacks(&data->volume_node, &spa_graph_scheduler_default, data->volume);
spa_graph_node_set_callbacks(&data->volume_node, &spa_graph_node_impl_default, data->volume);
spa_graph_node_add(&data->graph, &data->volume_node);
spa_graph_port_init(&data->volume_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_port_add(&data->volume_node, &data->volume_in);
@ -357,7 +354,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->volume_node, &data->volume_out);
spa_graph_node_init(&data->sink_node);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_scheduler_default, data->sink);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink);
spa_graph_node_add(&data->graph, &data->sink_node);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->volume_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);
@ -532,7 +529,8 @@ int main(int argc, char *argv[])
spa_graph_init(&data.graph);
spa_graph_scheduler_init(&data.sched, &data.graph);
spa_graph_data_init(&data.graph_data, &data.graph);
spa_graph_set_callbacks(&data.graph, &spa_graph_impl_default, &data.graph_data);
data.map = &default_map.map;
data.log = &default_log.log;

View file

@ -100,7 +100,7 @@ struct data {
uint32_t n_support;
struct spa_graph graph;
struct spa_graph_scheduler sched;
struct spa_graph_data graph_data;
struct spa_graph_node source1_node;
struct spa_graph_port source1_out;
struct spa_graph_node source2_node;
@ -242,9 +242,7 @@ static void on_sink_need_input(void *_data)
{
struct data *data = _data;
#ifdef USE_GRAPH
spa_graph_scheduler_pull(&data->sched, &data->sink_node);
while (spa_graph_scheduler_iterate(&data->sched));
spa_graph_need_input(&data->graph, &data->sink_node);
#else
int res;
@ -416,19 +414,19 @@ static int make_nodes(struct data *data, const char *device)
#ifdef USE_GRAPH
spa_graph_node_init(&data->source1_node);
spa_graph_node_set_callbacks(&data->source1_node, &spa_graph_scheduler_default, data->source1);
spa_graph_node_set_callbacks(&data->source1_node, &spa_graph_node_impl_default, data->source1);
spa_graph_port_init(&data->source1_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]);
spa_graph_port_add(&data->source1_node, &data->source1_out);
spa_graph_node_add(&data->graph, &data->source1_node);
spa_graph_node_init(&data->source2_node);
spa_graph_node_set_callbacks(&data->source2_node, &spa_graph_scheduler_default, data->source2);
spa_graph_node_set_callbacks(&data->source2_node, &spa_graph_node_impl_default, data->source2);
spa_graph_port_init(&data->source2_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]);
spa_graph_port_add(&data->source2_node, &data->source2_out);
spa_graph_node_add(&data->graph, &data->source2_node);
spa_graph_node_init(&data->mix_node);
spa_graph_node_set_callbacks(&data->mix_node, &spa_graph_scheduler_default, data->mix);
spa_graph_node_set_callbacks(&data->mix_node, &spa_graph_node_impl_default, data->mix);
spa_graph_port_init(&data->mix_in[0], SPA_DIRECTION_INPUT,
data->mix_ports[0], 0, &data->source1_mix_io[0]);
spa_graph_port_add(&data->mix_node, &data->mix_in[0]);
@ -444,7 +442,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->mix_node, &data->mix_out);
spa_graph_node_init(&data->sink_node);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_scheduler_default, data->sink);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->mix_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);
spa_graph_node_add(&data->graph, &data->sink_node);
@ -651,7 +649,8 @@ int main(int argc, char *argv[])
data.data_loop.invoke = do_invoke;
spa_graph_init(&data.graph);
spa_graph_scheduler_init(&data.sched, &data.graph);
spa_graph_data_init(&data.graph_data, &data.graph);
spa_graph_set_callbacks(&data.graph, &spa_graph_impl_default, &data.graph_data);
if ((str = getenv("SPA_DEBUG")))
data.log->level = atoi(str);

View file

@ -103,7 +103,7 @@ struct data {
int iterations;
struct spa_graph graph;
struct spa_graph_scheduler sched;
struct spa_graph_data graph_data;
struct spa_graph_node source_node;
struct spa_graph_port source_out;
struct spa_graph_port sink_in;
@ -225,8 +225,7 @@ static void on_sink_pull(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
spa_graph_scheduler_pull(&data->sched, &data->sink_node);
while (spa_graph_scheduler_iterate(&data->sched));
spa_graph_need_input(&data->graph, &data->sink_node);
}
}
@ -237,8 +236,7 @@ static void on_source_push(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
spa_graph_scheduler_push(&data->sched, &data->source_node);
while (spa_graph_scheduler_iterate(&data->sched));
spa_graph_have_output(&data->graph, &data->source_node);
}
}
@ -364,7 +362,7 @@ static int make_nodes(struct data *data)
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]);
spa_graph_node_init(&data->source_node);
spa_graph_node_set_callbacks(&data->source_node, &spa_graph_scheduler_default, data->source);
spa_graph_node_set_callbacks(&data->source_node, &spa_graph_node_impl_default, data->source);
spa_graph_node_add(&data->graph, &data->source_node);
data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
@ -372,7 +370,7 @@ static int make_nodes(struct data *data)
spa_graph_port_add(&data->source_node, &data->source_out);
spa_graph_node_init(&data->sink_node);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_scheduler_default, data->sink);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink);
spa_graph_node_add(&data->graph, &data->sink_node);
data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
@ -529,7 +527,8 @@ int main(int argc, char *argv[])
const char *str;
spa_graph_init(&data.graph);
spa_graph_scheduler_init(&data.sched, &data.graph);
spa_graph_data_init(&data.graph_data, &data.graph);
spa_graph_set_callbacks(&data.graph, &spa_graph_impl_default, &data.graph_data);
data.map = &default_map.map;
data.log = &default_log.log;