WIP: work on per driver graphs

Make a graph per driver node and let nodes that link to this join this
graph
Work on an idea for an even simpler scheduling mechanism.
This commit is contained in:
Wim Taymans 2018-03-16 07:50:22 +01:00
parent 2d77c4dc34
commit 933635f63f
17 changed files with 325 additions and 239 deletions

View file

@ -36,15 +36,12 @@ static inline void spa_graph_data_init(struct spa_graph_data *data,
data->graph = graph;
}
static inline int spa_graph_impl_process(void *data, struct spa_graph_node *node)
static inline int spa_graph_trigger(struct spa_graph *g, struct spa_graph_node *node)
{
struct spa_graph_data *d = (struct spa_graph_data *) data;
struct spa_graph *g = d->graph;
int old = node->state->status, res = 0;
uint32_t val;
spa_debug("node %p: pending %d required %d %d", node,
node->state->pending, node->state->required, old);
spa_debug("node %p: pending %d required %d", node,
node->state->pending, node->state->required);
if (node->state->pending == 0) {
spa_debug("node %p: nothing pending", node);
@ -52,25 +49,10 @@ static inline int spa_graph_impl_process(void *data, struct spa_graph_node *node
}
val = __atomic_sub_fetch(&node->state->pending, 1, __ATOMIC_SEQ_CST);
if (val == 0) {
if (old == SPA_STATUS_NEED_BUFFER &&
node->implementation->process_input) {
res = spa_node_process_input(node->implementation);
}
else {
res = spa_node_process_output(node->implementation);
}
spa_debug("node %p: process %d", node, res);
if (val == 0)
spa_graph_node_process(node);
if (res == SPA_STATUS_HAVE_BUFFER)
spa_graph_have_output(g, node);
node->state->status = res;
spa_debug("node %p: end %d", node, res);
}
return node->state->status;
}
static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *node)
@ -118,8 +100,9 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n
if (pn->sched_link.next != NULL)
continue;
if (pp->io->status == SPA_STATUS_NEED_BUFFER) {
pn->state->status = spa_node_process_output(pn->implementation);
if (pp->io->status == SPA_STATUS_NEED_BUFFER &&
pn->state->status == SPA_STATUS_HAVE_BUFFER) {
pn->state->status = spa_graph_node_process(pn);
} else {
n->state->pending--;
}
@ -132,7 +115,7 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n
n->sched_link.next = NULL;
spa_debug("schedule node %p: %d", n, n->state->status);
spa_graph_process(d->graph, n);
spa_graph_trigger(d->graph, n);
}
return 0;
}
@ -145,19 +128,22 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *
spa_debug("node %p start push", node);
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_process(d->graph, p->peer->node);
spa_graph_trigger(d->graph, p->peer->node);
return 0;
}
static inline int spa_graph_impl_run(void *data)
{
return 0;
}
static const struct spa_graph_callbacks spa_graph_impl_default = {
SPA_VERSION_GRAPH_CALLBACKS,
.need_input = spa_graph_impl_need_input,
.have_output = spa_graph_impl_have_output,
.process = spa_graph_impl_process,
};
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -26,158 +26,143 @@ extern "C" {
#include <spa/graph/graph.h>
static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
struct spa_graph_data {
struct spa_graph *graph;
};
static inline void spa_graph_data_init(struct spa_graph_data *data,
struct spa_graph *graph)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
res = spa_node_process_output(n);
else
res = -EBADF;
return res;
data->graph = graph;
}
static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port)
static inline int spa_graph_trigger(struct spa_graph *g, struct spa_graph_node *node)
{
struct spa_graph_node *node = port->node;
uint32_t val;
if (port->io->status == SPA_STATUS_HAVE_BUFFER)
node->ready++;
spa_debug("node %p: pending %d required %d", node,
node->state->pending, node->state->required);
spa_debug("port %p node %p check %d %d %d", port, node, port->io->status, node->ready, node->required);
if (node->required > 0 && node->ready == node->required) {
node->action = SPA_GRAPH_ACTION_IN;
if (node->ready_link.next == NULL)
spa_list_append(&graph->ready, &node->ready_link);
} else if (node->ready_link.next) {
spa_list_remove(&node->ready_link);
node->ready_link.next = NULL;
if (node->state->pending == 0) {
spa_debug("node %p: nothing pending", node);
return node->state->status;
}
val = __atomic_sub_fetch(&node->state->pending, 1, __ATOMIC_SEQ_CST);
if (val == 0)
spa_graph_node_process(node);
return node->state->status;
}
static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) {
struct spa_graph_port *p;
node->ready = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_STATUS_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
node->ready++;
}
spa_debug("node %p update %d ready", node, node->ready);
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph)
static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *node)
{
bool empty;
struct spa_graph_port *p;
struct spa_graph_node *n;
int iter = 1;
uint32_t state;
#if 0
struct spa_graph_data *d = (struct spa_graph_data *) data;
struct spa_list queue, pending;
struct spa_graph_node *n, *pn;
struct spa_graph_port *p, *pp;
next:
empty = spa_list_is_empty(&graph->ready);
if (empty && !spa_list_is_empty(&graph->pending)) {
spa_debug("copy pending");
spa_list_insert_list(&graph->ready, &graph->pending);
spa_list_init(&graph->pending);
empty = false;
}
if (iter-- == 0 || empty)
return !empty;
n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
spa_debug("node %p state %d", n, n->state);
switch (n->state) {
case SPA_GRAPH_STATE_IN:
case SPA_GRAPH_STATE_OUT:
case SPA_GRAPH_STATE_END:
if (n->state == SPA_GRAPH_STATE_END)
n->state = SPA_GRAPH_STATE_OUT;
state = n->schedule(n);
spa_debug("node %p schedule %d res %d", n, action, state);
if (n->state == SPA_GRAPH_STATE_IN && n == graph->node)
break;
if (n->state != SPA_GRAPH_STATE_END) {
spa_debug("node %p add ready for CHECK", n);
if (state == SPA_STATUS_NEED_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_IN;
else if (state == SPA_STATUS_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
else if (state == SPA_STATUS_OK)
n->state = SPA_GRAPH_STATE_CHECK_OK;
spa_list_append(&graph->ready, &n->ready_link);
}
else {
spa_graph_node_update(graph, n);
}
break;
case SPA_GRAPH_STATE_CHECK_IN:
n->ready = 0;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_node *pn = p->peer->node;
if (p->io->status == SPA_STATUS_NEED_BUFFER) {
if (pn != graph->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->state = SPA_GRAPH_STATE_OUT;
spa_debug("node %p add ready OUT", n);
spa_list_append(&graph->ready, &pn->ready_link);
}
} else if (p->io->status == SPA_STATUS_OK)
n->ready++;
}
break;
case SPA_GRAPH_STATE_CHECK_OUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
spa_debug("node %p add pending", n);
n->state = SPA_GRAPH_STATE_END;
spa_list_insert(&graph->pending, &n->ready_link);
break;
case SPA_GRAPH_STATE_CHECK_OK:
spa_graph_node_update(graph, n);
break;
default:
break;
}
goto next;
}
static inline void spa_graph_scheduler_pull(struct spa_graph *graph, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_CHECK;
node->state = SPA_STATUS_NEED_BUFFER;
graph->node = node;
spa_debug("node %p start pull", node);
if (node->ready_link.next == NULL)
spa_list_append(&graph->ready, &node->ready_link);
spa_list_init(&queue);
spa_list_init(&pending);
node->state->status = SPA_STATUS_NEED_BUFFER;
if (node->sched_link.next == NULL)
spa_list_append(&queue, &node->sched_link);
while (!spa_list_is_empty(&queue)) {
n = spa_list_first(&queue, struct spa_graph_node, sched_link);
spa_list_remove(&n->sched_link);
n->sched_link.next = NULL;
n->state->pending = n->state->required + 1;
spa_debug("node %p: add %d %d status %d", n,
n->state->pending, n->state->required,
n->state->status);
spa_list_prepend(&pending, &n->sched_link);
if (n->state->status == SPA_STATUS_HAVE_BUFFER)
continue;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
pp = p->peer;
if (pp == NULL)
continue;
pn = pp->node;
spa_debug("node %p: %p in io:%d state:%d %p", n, pn, pp->io->status,
pn->state->status, pn->sched_link.next);
if (pn->sched_link.next != NULL)
continue;
if (pp->io->status == SPA_STATUS_NEED_BUFFER &&
pn->state->status == SPA_STATUS_HAVE_BUFFER) {
pn->state->status = spa_graph_node_process(pn);
} else {
n->state->pending--;
}
spa_list_append(&queue, &pn->sched_link);
}
}
while (!spa_list_is_empty(&pending)) {
n = spa_list_first(&pending, struct spa_graph_node, sched_link);
spa_list_remove(&n->sched_link);
n->sched_link.next = NULL;
spa_debug("schedule node %p: %d", n, n->state->status);
spa_graph_trigger(d->graph, n);
}
#endif
return 0;
}
static inline void spa_graph_scheduler_push(struct spa_graph *graph, struct spa_graph_node *node)
static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *node)
{
node->action = SPA_GRAPH_ACTION_OUT;
graph->node = node;
struct spa_graph_data *d = (struct spa_graph_data *) data;
struct spa_graph_port *p;
spa_debug("node %p start push", node);
if (node->ready_link.next == NULL)
spa_list_append(&graph->ready, &node->ready_link);
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
if (p->peer)
spa_graph_trigger(d->graph, p->peer->node);
return 0;
}
static inline int spa_graph_impl_run(void *data)
{
struct spa_graph_data *d = (struct spa_graph_data *) data;
struct spa_graph *g = d->graph;
struct spa_graph_node *n;
spa_debug("graph %p run", d->graph);
spa_list_for_each(n, &g->nodes, link) {
n->state->pending = n->state->required + 1;
spa_debug("node %p: add %d %d status %d", n,
n->state->pending, n->state->required,
n->state->status);
}
spa_list_for_each(n, &g->nodes, link)
spa_graph_trigger(d->graph, n);
return 0;
}
static const struct spa_graph_callbacks spa_graph_impl_default = {
SPA_VERSION_GRAPH_CALLBACKS,
.need_input = spa_graph_impl_need_input,
.have_output = spa_graph_impl_have_output,
.run = spa_graph_impl_run,
};
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -43,9 +43,7 @@ struct spa_graph_callbacks {
int (*need_input) (void *data, struct spa_graph_node *node);
int (*have_output) (void *data, struct spa_graph_node *node);
int (*process) (void *data, struct spa_graph_node *node);
int (*reuse_buffer) (void *data, struct spa_graph_node *node,
uint32_t port_id, uint32_t buffer_id);
int (*run) (void *data);
};
struct spa_graph {
@ -56,8 +54,7 @@ struct spa_graph {
#define spa_graph_need_input(g,n) ((g)->callbacks->need_input((g)->callbacks_data, (n)))
#define spa_graph_have_output(g,n) ((g)->callbacks->have_output((g)->callbacks_data, (n)))
#define spa_graph_process(g,n) ((g)->callbacks->process((g)->callbacks_data, (n)))
#define spa_graph_reuse_buffer(g,n,p,i) ((g)->callbacks->reuse_buffer((g)->callbacks_data, (n),(p),(i)))
#define spa_graph_run(g) ((g)->callbacks->run((g)->callbacks_data))
struct spa_graph_state {
int status; /**< status of the node */
@ -65,18 +62,31 @@ struct spa_graph_state {
uint32_t pending; /**< number of input nodes pending */
};
struct spa_graph_node_callbacks {
#define SPA_VERSION_GRAPH_NODE_CALLBACKS 0
uint32_t version;
int (*process) (void *data, struct spa_graph_node *node);
int (*reuse_buffer) (void *data, struct spa_graph_node *node,
uint32_t port_id, uint32_t buffer_id);
};
struct spa_graph_node {
struct spa_list link; /**< link in graph nodes list */
struct spa_graph *graph; /**< owner graph */
struct spa_list ports[2]; /**< list of input and output ports */
#define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0)
uint32_t flags; /**< node flags */
struct spa_node *implementation;/**< node implementation */
struct spa_graph_state *state; /**< state of the node */
const struct spa_graph_node_callbacks *callbacks;
void *callbacks_data;
struct spa_list sched_link; /**< link for scheduler */
void *scheduler_data; /**< scheduler private data */
};
#define spa_graph_node_process(n) ((n)->callbacks->process((n)->callbacks_data, (n)))
#define spa_graph_node_reuse_buffer(n,p,i) ((n)->callbacks->reuse_buffer((n)->callbacks_data, (n),(p),(i)))
struct spa_graph_port {
struct spa_list link; /**< link in node port list */
struct spa_graph_node *node; /**< owner node */
@ -115,10 +125,12 @@ spa_graph_node_init(struct spa_graph_node *node, struct spa_graph_state *state)
}
static inline void
spa_graph_node_set_implementation(struct spa_graph_node *node,
struct spa_node *implementation)
spa_graph_node_set_callbacks(struct spa_graph_node *node,
const struct spa_graph_node_callbacks *callbacks,
void *callbacks_data)
{
node->implementation = implementation;
node->callbacks = callbacks;
node->callbacks_data = callbacks_data;
}
static inline void
@ -202,6 +214,45 @@ spa_graph_port_unlink(struct spa_graph_port *port)
}
}
static inline int spa_graph_node_impl_process(void *data, struct spa_graph_node *node)
{
struct spa_graph *g = node->graph;
struct spa_node *n = data;
//int old = node->state->status, res = 0;
int res = 0;
// if (old == SPA_STATUS_NEED_BUFFER && n->process_input &&
if (n->process_input &&
!spa_list_is_empty(&node->ports[SPA_DIRECTION_INPUT]))
res = spa_node_process_input(n);
else
res = spa_node_process_output(n);
spa_debug("node %p: process %d", node, res);
node->state->status = res;
if (res == SPA_STATUS_HAVE_BUFFER)
spa_graph_have_output(g, node);
spa_debug("node %p: end %d", node, res);
return res;
}
static inline int spa_graph_node_impl_reuse_buffer(void *data, struct spa_graph_node *node,
uint32_t port_id, uint32_t buffer_id)
{
struct spa_node *n = data;
return spa_node_port_reuse_buffer(n, port_id, buffer_id);
}
static const struct spa_graph_node_callbacks spa_graph_node_impl_default = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
.process = spa_graph_node_impl_process,
.reuse_buffer = spa_graph_node_impl_reuse_buffer,
};
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -862,6 +862,7 @@ static int impl_node_process_output(struct spa_node *node)
static const struct spa_dict_item info_items[] = {
{ "media.class", "Video/Source" },
{ "node.pause-on-idle", "false" },
{ "node.driver", "true" },
};
static const struct spa_dict info = {

View file

@ -399,13 +399,13 @@ static int make_nodes(struct data *data, const char *device)
&data->source_sink_io[0], sizeof(data->source_sink_io[0]));
spa_graph_node_init(&data->source_node, &data->source_state);
spa_graph_node_set_implementation(&data->source_node, data->source);
spa_graph_node_set_callbacks(&data->source_node, &spa_graph_node_impl_default, data->source);
spa_graph_node_add(&data->graph, &data->source_node);
spa_graph_port_init(&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]);
spa_graph_port_add(&data->source_node, &data->source_out);
spa_graph_node_init(&data->sink_node, &data->sink_state);
spa_graph_node_set_implementation(&data->sink_node, data->sink);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink);
spa_graph_node_add(&data->graph, &data->sink_node);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);

View file

@ -357,13 +357,13 @@ static int make_nodes(struct data *data, const char *device)
&data->volume_sink_io[0], sizeof(data->volume_sink_io[0]));
spa_graph_node_init(&data->source_node, &data->source_state);
spa_graph_node_set_implementation(&data->source_node, data->source);
spa_graph_node_set_callbacks(&data->source_node, &spa_graph_node_impl_default, data->source);
spa_graph_node_add(&data->graph, &data->source_node);
spa_graph_port_init(&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_port_add(&data->source_node, &data->source_out);
spa_graph_node_init(&data->volume_node, &data->volume_state);
spa_graph_node_set_implementation(&data->volume_node, data->volume);
spa_graph_node_set_callbacks(&data->volume_node, &spa_graph_node_impl_default, data->volume);
spa_graph_node_add(&data->graph, &data->volume_node);
spa_graph_port_init(&data->volume_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_port_add(&data->volume_node, &data->volume_in);
@ -374,7 +374,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->volume_node, &data->volume_out);
spa_graph_node_init(&data->sink_node, &data->sink_state);
spa_graph_node_set_implementation(&data->sink_node, data->sink);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink);
spa_graph_node_add(&data->graph, &data->sink_node);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->volume_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);

View file

@ -472,19 +472,19 @@ static int make_nodes(struct data *data, const char *device)
#ifdef USE_GRAPH
spa_graph_node_init(&data->source1_node, &data->source1_state);
spa_graph_node_set_implementation(&data->source1_node, data->source1);
spa_graph_node_set_callbacks(&data->source1_node, &spa_graph_node_impl_default, data->source1);
spa_graph_port_init(&data->source1_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]);
spa_graph_port_add(&data->source1_node, &data->source1_out);
spa_graph_node_add(&data->graph, &data->source1_node);
spa_graph_node_init(&data->source2_node, &data->source2_state);
spa_graph_node_set_implementation(&data->source2_node, data->source2);
spa_graph_node_set_callbacks(&data->source2_node, &spa_graph_node_impl_default, data->source2);
spa_graph_port_init(&data->source2_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]);
spa_graph_port_add(&data->source2_node, &data->source2_out);
spa_graph_node_add(&data->graph, &data->source2_node);
spa_graph_node_init(&data->mix_node, &data->mix_state);
spa_graph_node_set_implementation(&data->mix_node, data->mix);
spa_graph_node_set_callbacks(&data->mix_node, &spa_graph_node_impl_default, data->mix);
spa_graph_port_init(&data->mix_in[0], SPA_DIRECTION_INPUT,
data->mix_ports[0], 0, &data->source1_mix_io[0]);
spa_graph_port_add(&data->mix_node, &data->mix_in[0]);
@ -500,7 +500,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->mix_node, &data->mix_out);
spa_graph_node_init(&data->sink_node, &data->sink_state);
spa_graph_node_set_implementation(&data->sink_node, data->sink);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->mix_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);
spa_graph_node_add(&data->graph, &data->sink_node);

View file

@ -375,7 +375,7 @@ static int make_nodes(struct data *data)
&data->source_sink_io[0], sizeof(data->source_sink_io[0]));
spa_graph_node_init(&data->source_node, &data->source_state);
spa_graph_node_set_implementation(&data->source_node, data->source);
spa_graph_node_set_callbacks(&data->source_node, &spa_graph_node_impl_default, data->source);
spa_graph_node_add(&data->graph, &data->source_node);
data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
@ -383,7 +383,7 @@ static int make_nodes(struct data *data)
spa_graph_port_add(&data->source_node, &data->source_out);
spa_graph_node_init(&data->sink_node, &data->sink_state);
spa_graph_node_set_implementation(&data->sink_node, data->sink);
spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink);
spa_graph_node_add(&data->graph, &data->sink_node);
data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;