diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h index 7886e806a..6f5593b8b 100644 --- a/spa/include/spa/graph-scheduler1.h +++ b/spa/include/spa/graph-scheduler1.h @@ -48,14 +48,15 @@ static inline void spa_graph_data_init(struct spa_graph_data *data, static inline void spa_graph_data_port_check(struct spa_graph_data *data, struct spa_graph_port *port) { struct spa_graph_node *node = port->node; + uint32_t required = node->required[SPA_DIRECTION_INPUT]; if (port->io->status == SPA_RESULT_HAVE_BUFFER) - node->ready_in++; + node->ready[SPA_DIRECTION_INPUT]++; debug("port %p node %p check %d %d %d\n", port, node, - port->io->status, node->ready_in, node->required_in); + port->io->status, node->ready[SPA_DIRECTION_INPUT], required); - if (node->required_in > 0 && node->ready_in == node->required_in) { + if (required > 0 && node->ready[SPA_DIRECTION_INPUT] == required) { node->state = SPA_GRAPH_STATE_IN; if (node->ready_link.next == NULL) spa_list_insert(data->ready.prev, &node->ready_link); @@ -105,7 +106,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data) break; case SPA_GRAPH_STATE_CHECK_IN: - n->ready_in = 0; + n->ready[SPA_DIRECTION_INPUT] = 0; spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { struct spa_graph_node *pn = p->peer->node; if (p->io->status == SPA_RESULT_NEED_BUFFER) { @@ -116,7 +117,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data) &pn->ready_link); } } else if (p->io->status == SPA_RESULT_OK) - n->ready_in++; + n->ready[SPA_DIRECTION_INPUT]++; } case SPA_GRAPH_STATE_CHECK_OUT: spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) diff --git a/spa/include/spa/graph-scheduler2.h b/spa/include/spa/graph-scheduler2.h index 7353ac07e..a47f96631 100644 --- a/spa/include/spa/graph-scheduler2.h +++ b/spa/include/spa/graph-scheduler2.h @@ -46,11 +46,11 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap struct spa_graph_node *node = port->node; if (port->io->status == SPA_RESULT_HAVE_BUFFER) - node->ready_in++; + node->ready++; - debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); + debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready, node->required); - if (node->required_in > 0 && node->ready_in == node->required_in) { + if (node->required > 0 && node->ready == node->required) { node->action = SPA_GRAPH_ACTION_IN; if (node->ready_link.next == NULL) spa_list_insert(graph->ready.prev, &node->ready_link); @@ -63,12 +63,12 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) { struct spa_graph_port *p; - node->ready_in = 0; + node->ready = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - node->ready_in++; + node->ready++; } - debug("node %p update %d ready\n", node, node->ready_in); + debug("node %p update %d ready\n", node, node->ready); } static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph) @@ -125,7 +125,7 @@ next: break; case SPA_GRAPH_STATE_CHECK_IN: - n->ready_in = 0; + n->ready = 0; spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { struct spa_graph_node *pn = p->peer->node; if (p->io->status == SPA_RESULT_NEED_BUFFER) { @@ -137,7 +137,7 @@ next: &pn->ready_link); } } else if (p->io->status == SPA_RESULT_OK) - n->ready_in++; + n->ready++; } break; diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h index 64301c724..f9174d79c 100644 --- a/spa/include/spa/graph-scheduler3.h +++ b/spa/include/spa/graph-scheduler3.h @@ -36,20 +36,20 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n spa_list_init(&ready); - node->ready_in = 0; + node->ready[SPA_DIRECTION_INPUT] = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { struct spa_graph_port *pport; struct spa_graph_node *pnode; if ((pport = p->peer) == NULL) continue; pnode = pport->node; - debug("node %p peer %p io %d\n", node, pnode, pport->io->status); + debug("node %p peer %p io %d %d\n", node, pnode, pport->io->status, pport->io->buffer_id); if (pport->io->status == SPA_RESULT_NEED_BUFFER) { if (pnode->ready_link.next == NULL) spa_list_append(&ready, &pnode->ready_link); } else if (pport->io->status == SPA_RESULT_OK && !(pnode->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - node->ready_in++; + node->ready[SPA_DIRECTION_INPUT]++; } spa_list_for_each_safe(n, t, &ready, ready_link) { @@ -60,23 +60,23 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n else { spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { if (p->io->status == SPA_RESULT_HAVE_BUFFER) - node->ready_in++; + node->ready[SPA_DIRECTION_INPUT]++; } } spa_list_remove(&n->ready_link); n->ready_link.next = NULL; } - debug("node %p ready_in:%d required_in:%d\n", node, node->ready_in, node->required_in); + debug("node %p ready:%d required:%d\n", node, node->ready[SPA_DIRECTION_INPUT], node->required[SPA_DIRECTION_INPUT]); - if (node->required_in > 0 && node->ready_in == node->required_in) { + if (node->required[SPA_DIRECTION_INPUT] > 0 && node->ready[SPA_DIRECTION_INPUT] == node->required[SPA_DIRECTION_INPUT]) { node->state = spa_node_process_input(node->implementation); debug("node %p processed in %d\n", node, node->state); if (node->state == SPA_RESULT_HAVE_BUFFER) { spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { if (p->io->status == SPA_RESULT_HAVE_BUFFER) if (p->peer) - p->peer->node->ready_in++; + p->peer->node->ready[SPA_DIRECTION_INPUT]++; } } } @@ -100,12 +100,12 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node * continue; pnode = pport->node; if (pport->io->status == SPA_RESULT_HAVE_BUFFER) - pnode->ready_in++; + pnode->ready[SPA_DIRECTION_INPUT]++; debug("node %p peer %p io %d %d %d\n", node, pnode, pport->io->status, - pnode->ready_in, pnode->required_in); + pnode->ready[SPA_DIRECTION_INPUT], pnode->required[SPA_DIRECTION_INPUT]); - if (pnode->required_in > 0 && pnode->ready_in == pnode->required_in) + if (pnode->required > 0 && pnode->ready == pnode->required) if (pnode->ready_link.next == NULL) spa_list_append(&ready, &pnode->ready_link); } @@ -116,11 +116,11 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node * if (n->state == SPA_RESULT_HAVE_BUFFER) spa_graph_have_output(n->graph, n); else { - n->ready_in = 0; + n->ready[SPA_DIRECTION_INPUT] = 0; spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - n->ready_in++; + n->ready[SPA_DIRECTION_INPUT]++; } } spa_list_remove(&n->ready_link); @@ -130,10 +130,10 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node * node->state = spa_node_process_output(node->implementation); debug("node %p processed out %d\n", node, node->state); if (node->state == SPA_RESULT_NEED_BUFFER) { - node->ready_in = 0; + node->ready[SPA_DIRECTION_INPUT] = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - node->ready_in++; + node->ready[SPA_DIRECTION_INPUT]++; } } return SPA_RESULT_OK; diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index d2616196e..dc9df0bd8 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -65,8 +65,8 @@ struct spa_graph_node { struct spa_list ready_link; /**< link for scheduler */ #define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0) uint32_t flags; /**< node flags */ - uint32_t required_in; /**< required number of ports */ - uint32_t ready_in; /**< number of ports with data */ + uint32_t required[2]; /**< required number of ports */ + uint32_t ready[2]; /**< number of ports with data */ int state; /**< state of the node */ struct spa_node *implementation;/**< node implementation */ void *scheduler_data; /**< scheduler private data */ @@ -103,7 +103,8 @@ spa_graph_node_init(struct spa_graph_node *node) spa_list_init(&node->ports[SPA_DIRECTION_INPUT]); spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]); node->flags = 0; - node->required_in = node->ready_in = 0; + node->required[SPA_DIRECTION_INPUT] = node->ready[SPA_DIRECTION_INPUT] = 0; + node->required[SPA_DIRECTION_OUTPUT] = node->ready[SPA_DIRECTION_OUTPUT] = 0; debug("node %p init\n", node); } @@ -146,8 +147,8 @@ spa_graph_port_add(struct spa_graph_node *node, debug("port %p add to node %p\n", port, node); port->node = node; spa_list_append(&node->ports[port->direction], &port->link); - if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT) - node->required_in++; + if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL)) + node->required[port->direction]++; } static inline void spa_graph_node_remove(struct spa_graph_node *node) @@ -162,8 +163,8 @@ static inline void spa_graph_port_remove(struct spa_graph_port *port) { debug("port %p remove\n", port); spa_list_remove(&port->link); - if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT) - port->node->required_in--; + if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL)) + port->node->required[port->direction]--; } static inline void diff --git a/src/modules/module-jack/jack-node.c b/src/modules/module-jack/jack-node.c index e43010cf2..259ea8b56 100644 --- a/src/modules/module-jack/jack-node.c +++ b/src/modules/module-jack/jack-node.c @@ -299,7 +299,7 @@ static int driver_process_output(struct spa_node *node) out->outbuf->datas[0].chunk->size = ctrl->buffer_size * sizeof(int16_t) * 2; spa_hook_list_call(&nd->listener_list, struct pw_jack_node_events, push); - gn->ready_in = gn->required_in; + gn->ready[SPA_DIRECTION_INPUT] = gn->required[SPA_DIRECTION_OUTPUT] = 0; return SPA_RESULT_HAVE_BUFFER; } diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 47cdc1e6a..6d6af4c84 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -433,11 +433,11 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_ if (pn->state == SPA_RESULT_HAVE_BUFFER) spa_graph_have_output(data->node->rt.graph, pn); else { - pn->ready_in = 0; + pn->ready[SPA_DIRECTION_INPUT] = 0; spa_list_for_each(pp, &pn->ports[SPA_DIRECTION_INPUT], link) { if (pp->io->status == SPA_RESULT_OK && !(pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - pn->ready_in++; + pn->ready[SPA_DIRECTION_INPUT]++; } } } @@ -446,8 +446,10 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_ spa_list_for_each(port, &n->ports[SPA_DIRECTION_OUTPUT], link) { pn = port->peer->node; pn->state = spa_node_process_output(pn->implementation); - if (pn->state == SPA_RESULT_NEED_BUFFER) + pw_log_trace("node %p: process output %d", pn->implementation, pn->state); + if (pn->state == SPA_RESULT_NEED_BUFFER) { spa_graph_need_input(data->node->rt.graph, pn); + } } } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {