diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 7ad5d1707..09da7d922 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -377,37 +377,39 @@ static void node_need_input(void *data); static int impl_node_process(struct pw_node *node) { struct spa_graph_port *p; - int res = 0; + int res = 0, old; - pw_log_trace("node %p: process %d", node, node->rt.activation->status); + old = node->rt.activation->status; - if (node->rt.activation->status != SPA_STATUS_HAVE_BUFFER) { + pw_log_trace("node %p: process %d", node, old); + + if (old == SPA_STATUS_NEED_BUFFER) { if (!spa_list_is_empty(&node->rt.node.ports[SPA_DIRECTION_INPUT])) { spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) spa_node_process_input(p->peer->node->implementation); if (node->node->process_input) - res = node->rt.activation->status = spa_node_process_input(node->node); + res = spa_node_process_input(node->node); } else { if (node->node->process_output) - res = node->rt.activation->status = spa_node_process_output(node->node); + res = spa_node_process_output(node->node); } - - } else if (node->rt.activation->status == SPA_STATUS_HAVE_BUFFER) { - spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link) - spa_node_process_output(p->peer->node->implementation); - - if (node->node->process_output) - res = node->rt.activation->status = spa_node_process_output(node->node); } + pw_log_trace("node %p: process %d", node, res); - if (res == SPA_STATUS_HAVE_BUFFER) + if (res == SPA_STATUS_HAVE_BUFFER) { node_have_output(node); - else if (res == SPA_STATUS_NEED_BUFFER) - node_need_input(node); + } + else if (res == SPA_STATUS_NEED_BUFFER) { + spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) + spa_node_process_output(p->peer->node->implementation); + } + pw_log_trace("node %p: process %d", node, res); - return node->rt.activation->status; + node->rt.activation->status = res; + + return res; } static void check_properties(struct pw_node *node) @@ -576,45 +578,63 @@ static void node_need_input(void *data) struct spa_graph_port *p; pw_log_trace("node %p: need input", node); + + spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) + spa_node_process_output(p->peer->node->implementation); + spa_hook_list_call(&node->listener_list, struct pw_node_events, need_input); spa_list_init(&queue); spa_list_init(&pending); node->rt.activation->status = SPA_STATUS_NEED_BUFFER; - spa_list_append(&queue, &node->rt.sched_link); - spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) - spa_node_process_output(p->peer->node->implementation); + if (node->rt.sched_link.next == NULL) + spa_list_append(&queue, &node->rt.sched_link); while (!spa_list_is_empty(&queue)) { struct pw_link *l; n = spa_list_first(&queue, struct pw_node, rt.sched_link); spa_list_remove(&n->rt.sched_link); + n->rt.sched_link.next = NULL; - if (n != node); - spa_list_prepend(&pending, &n->rt.sched_link); + n->rt.activation->pending = n->rt.activation->required + 1; + pw_log_trace("node %p: add %d %d status %d", n, + n->rt.activation->pending, n->rt.activation->required, + n->rt.activation->status); + + spa_list_prepend(&pending, &n->rt.sched_link); - n->rt.activation->pending = 1; if (n->rt.activation->status == SPA_STATUS_HAVE_BUFFER) continue; - n->rt.activation->pending += n->rt.activation->required; - pw_log_trace("node %p: add %d %d", - n, n->rt.activation->pending, n->rt.activation->required); - spa_list_for_each(l, &n->rt.links[SPA_DIRECTION_INPUT], rt.in_node_link) { pn = l->output->node; - pw_log_trace("node %p: %p in %p %d", n, pn, l, l->io->status); - if (l->io->status == SPA_STATUS_OK) { - n->rt.activation->pending -= 1; + pw_log_trace("node %p: %p in %p %d %d", n, pn, l, l->io->status, + pn->rt.activation->status); + + if (pn->rt.sched_link.next != NULL) continue; + + if (l->io->status == SPA_STATUS_NEED_BUFFER) { + spa_list_for_each(p, &pn->rt.node.ports[SPA_DIRECTION_OUTPUT], link) + spa_node_process_output(p->peer->node->implementation); + + if (pn->node->process_output) + pn->rt.activation->status = spa_node_process_output(pn->node); + + if (pn->rt.activation->status == SPA_STATUS_NEED_BUFFER) { + spa_list_for_each(p, &pn->rt.node.ports[SPA_DIRECTION_INPUT], link) + spa_node_process_output(p->peer->node->implementation); + } + + } else { + n->rt.activation->pending--; } - if (pn->rt.sched_link.next == NULL) - spa_list_append(&queue, &pn->rt.sched_link); + spa_list_append(&queue, &pn->rt.sched_link); } } while (!spa_list_is_empty(&pending)) { @@ -622,6 +642,7 @@ static void node_need_input(void *data) spa_list_remove(&n->rt.sched_link); n->rt.sched_link.next = NULL; + pw_log_trace("schedule node %p: %d", n, n->rt.activation->status); node_process(n); } } @@ -633,8 +654,6 @@ static void node_have_output(void *data) struct spa_graph_port *p; pw_log_trace("node %p: have output", node); - node->rt.activation->status = SPA_STATUS_HAVE_BUFFER; - spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link) spa_node_process_input(p->peer->node->implementation);