node: improve scheduling

Do a pull in links that need a buffer.
When pulling, stop when we find a node that has buffers.
This commit is contained in:
Wim Taymans 2018-03-09 12:42:34 +01:00
parent ad6ecf6be8
commit e85f7501b5

View file

@ -377,37 +377,39 @@ static void node_need_input(void *data);
static int impl_node_process(struct pw_node *node)
{
struct spa_graph_port *p;
int res = 0;
int res = 0, old;
pw_log_trace("node %p: process %d", node, node->rt.activation->status);
old = node->rt.activation->status;
if (node->rt.activation->status != SPA_STATUS_HAVE_BUFFER) {
pw_log_trace("node %p: process %d", node, old);
if (old == SPA_STATUS_NEED_BUFFER) {
if (!spa_list_is_empty(&node->rt.node.ports[SPA_DIRECTION_INPUT])) {
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link)
spa_node_process_input(p->peer->node->implementation);
if (node->node->process_input)
res = node->rt.activation->status = spa_node_process_input(node->node);
res = spa_node_process_input(node->node);
}
else {
if (node->node->process_output)
res = node->rt.activation->status = spa_node_process_output(node->node);
res = spa_node_process_output(node->node);
}
} else if (node->rt.activation->status == SPA_STATUS_HAVE_BUFFER) {
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link)
spa_node_process_output(p->peer->node->implementation);
if (node->node->process_output)
res = node->rt.activation->status = spa_node_process_output(node->node);
}
pw_log_trace("node %p: process %d", node, res);
if (res == SPA_STATUS_HAVE_BUFFER)
if (res == SPA_STATUS_HAVE_BUFFER) {
node_have_output(node);
else if (res == SPA_STATUS_NEED_BUFFER)
node_need_input(node);
}
else if (res == SPA_STATUS_NEED_BUFFER) {
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link)
spa_node_process_output(p->peer->node->implementation);
}
pw_log_trace("node %p: process %d", node, res);
return node->rt.activation->status;
node->rt.activation->status = res;
return res;
}
static void check_properties(struct pw_node *node)
@ -576,45 +578,63 @@ static void node_need_input(void *data)
struct spa_graph_port *p;
pw_log_trace("node %p: need input", node);
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link)
spa_node_process_output(p->peer->node->implementation);
spa_hook_list_call(&node->listener_list, struct pw_node_events, need_input);
spa_list_init(&queue);
spa_list_init(&pending);
node->rt.activation->status = SPA_STATUS_NEED_BUFFER;
spa_list_append(&queue, &node->rt.sched_link);
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link)
spa_node_process_output(p->peer->node->implementation);
if (node->rt.sched_link.next == NULL)
spa_list_append(&queue, &node->rt.sched_link);
while (!spa_list_is_empty(&queue)) {
struct pw_link *l;
n = spa_list_first(&queue, struct pw_node, rt.sched_link);
spa_list_remove(&n->rt.sched_link);
n->rt.sched_link.next = NULL;
if (n != node);
spa_list_prepend(&pending, &n->rt.sched_link);
n->rt.activation->pending = n->rt.activation->required + 1;
pw_log_trace("node %p: add %d %d status %d", n,
n->rt.activation->pending, n->rt.activation->required,
n->rt.activation->status);
spa_list_prepend(&pending, &n->rt.sched_link);
n->rt.activation->pending = 1;
if (n->rt.activation->status == SPA_STATUS_HAVE_BUFFER)
continue;
n->rt.activation->pending += n->rt.activation->required;
pw_log_trace("node %p: add %d %d",
n, n->rt.activation->pending, n->rt.activation->required);
spa_list_for_each(l, &n->rt.links[SPA_DIRECTION_INPUT], rt.in_node_link) {
pn = l->output->node;
pw_log_trace("node %p: %p in %p %d", n, pn, l, l->io->status);
if (l->io->status == SPA_STATUS_OK) {
n->rt.activation->pending -= 1;
pw_log_trace("node %p: %p in %p %d %d", n, pn, l, l->io->status,
pn->rt.activation->status);
if (pn->rt.sched_link.next != NULL)
continue;
if (l->io->status == SPA_STATUS_NEED_BUFFER) {
spa_list_for_each(p, &pn->rt.node.ports[SPA_DIRECTION_OUTPUT], link)
spa_node_process_output(p->peer->node->implementation);
if (pn->node->process_output)
pn->rt.activation->status = spa_node_process_output(pn->node);
if (pn->rt.activation->status == SPA_STATUS_NEED_BUFFER) {
spa_list_for_each(p, &pn->rt.node.ports[SPA_DIRECTION_INPUT], link)
spa_node_process_output(p->peer->node->implementation);
}
} else {
n->rt.activation->pending--;
}
if (pn->rt.sched_link.next == NULL)
spa_list_append(&queue, &pn->rt.sched_link);
spa_list_append(&queue, &pn->rt.sched_link);
}
}
while (!spa_list_is_empty(&pending)) {
@ -622,6 +642,7 @@ static void node_need_input(void *data)
spa_list_remove(&n->rt.sched_link);
n->rt.sched_link.next = NULL;
pw_log_trace("schedule node %p: %d", n, n->rt.activation->status);
node_process(n);
}
}
@ -633,8 +654,6 @@ static void node_have_output(void *data)
struct spa_graph_port *p;
pw_log_trace("node %p: have output", node);
node->rt.activation->status = SPA_STATUS_HAVE_BUFFER;
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link)
spa_node_process_input(p->peer->node->implementation);