From df59183a66c7cd844cbeb7e629a647e5e09fd2c0 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Sun, 22 Oct 2017 13:12:34 +0200 Subject: [PATCH] scheduler: add new scheduler Make port status SPA_RESULT_OK until events changes it and data processing can start Only start pulling on ports in the OK state Change we way we handle client-nodes, handle them async and continue processing after they signaled completion Add a new scheduler that decouples push and pull. It pushes to peer elements when all inputs are provided and pulls from nodes when all peer outputs are processed. --- spa/include/spa/graph-scheduler6.h | 148 +++++++++++++++++++ spa/include/spa/graph.h | 2 +- spa/include/spa/node.h | 2 +- spa/plugins/alsa/alsa-utils.c | 28 +++- spa/plugins/v4l2/v4l2-utils.c | 2 +- src/modules/module-client-node/client-node.c | 23 ++- src/pipewire/core.c | 2 +- src/pipewire/link.c | 3 +- src/pipewire/port.c | 3 +- src/pipewire/remote.c | 12 +- 10 files changed, 188 insertions(+), 37 deletions(-) create mode 100644 spa/include/spa/graph-scheduler6.h diff --git a/spa/include/spa/graph-scheduler6.h b/spa/include/spa/graph-scheduler6.h new file mode 100644 index 000000000..3b45a0c30 --- /dev/null +++ b/spa/include/spa/graph-scheduler6.h @@ -0,0 +1,148 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_SCHEDULER_H__ +#define __SPA_GRAPH_SCHEDULER_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *node) +{ + struct spa_graph_port *p; + struct spa_graph_node *n, *t; + struct spa_list ready; + + spa_debug("node %p start pull", node); + + spa_list_init(&ready); + + node->ready[SPA_DIRECTION_INPUT] = 0; + node->required[SPA_DIRECTION_INPUT] = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_port *pport; + struct spa_graph_node *pnode; + uint32_t prequired, pready; + + if ((pport = p->peer) == NULL) { + spa_debug("node %p port %p has no peer", node, p); + continue; + } + pnode = pport->node; + + if (pport->io->status == SPA_RESULT_NEED_BUFFER) { + pnode->ready[SPA_DIRECTION_OUTPUT]++; + node->required[SPA_DIRECTION_INPUT]++; + } + + pready = pnode->ready[SPA_DIRECTION_OUTPUT]; + prequired = pnode->required[SPA_DIRECTION_OUTPUT]; + + spa_debug("node %p peer %p io %d %d %d %d", node, pnode, pport->io->status, + pport->io->buffer_id, pready, prequired); + + if (prequired > 0 && pready >= prequired) + if (pnode->ready_link.next == NULL) + spa_list_append(&ready, &pnode->ready_link); + } + + spa_list_for_each_safe(n, t, &ready, ready_link) { + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + + n->state = spa_node_process_output(n->implementation); + + spa_debug("peer %p processed out %d", n, n->state); + if (n->state == SPA_RESULT_NEED_BUFFER) + spa_graph_need_input(n->graph, n); + else if (n->state == SPA_RESULT_HAVE_BUFFER) + spa_graph_have_output(n->graph, n); + } + return SPA_RESULT_OK; +} + +static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *node) +{ + struct spa_graph_port *p; + struct spa_list ready; + struct spa_graph_node *n, *t; + + spa_debug("node %p start push", node); + + spa_list_init(&ready); + + node->ready[SPA_DIRECTION_OUTPUT] = 0; + node->required[SPA_DIRECTION_OUTPUT] = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { + struct spa_graph_port *pport; + struct spa_graph_node *pnode; + uint32_t prequired, pready; + + if ((pport = p->peer) == NULL) { + spa_debug("node %p port %p has no peer", node, p); + continue; + } + pnode = pport->node; + + if (p->io->status == SPA_RESULT_HAVE_BUFFER) { + pnode->ready[SPA_DIRECTION_INPUT]++; + node->required[SPA_DIRECTION_OUTPUT]++; + } + + pready = pnode->ready[SPA_DIRECTION_INPUT]; + prequired = pnode->required[SPA_DIRECTION_INPUT]; + + spa_debug("node %p peer %p io %d %d %d", node, pnode, pport->io->status, + pready, prequired); + + if (prequired > 0 && pready >= prequired) + if (pnode->ready_link.next == NULL) + spa_list_append(&ready, &pnode->ready_link); + } + + spa_list_for_each_safe(n, t, &ready, ready_link) { + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + + n->state = spa_node_process_input(n->implementation); + + spa_debug("node %p chain processed in %d", n, n->state); + if (n->state == SPA_RESULT_HAVE_BUFFER) + spa_graph_have_output(n->graph, n); + else if (n->state == SPA_RESULT_NEED_BUFFER) + spa_graph_need_input(n->graph, n); + } + return SPA_RESULT_OK; +} + +static const struct spa_graph_callbacks spa_graph_impl_default = { + SPA_VERSION_GRAPH_CALLBACKS, + .need_input = spa_graph_impl_need_input, + .have_output = spa_graph_impl_have_output, +}; + + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_SCHEDULER_H__ */ diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index 10af2313a..f41be330e 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -116,7 +116,7 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node) { node->graph = graph; - node->state = SPA_RESULT_NEED_BUFFER; + node->state = SPA_RESULT_OK; node->ready_link.next = NULL; spa_list_append(&graph->nodes, &node->link); spa_debug("node %p add", node); diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index 58b4d09bc..f4a547ba6 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -56,7 +56,7 @@ struct spa_port_io { struct spa_range range; /**< the requested range */ }; -#define SPA_PORT_IO_INIT (struct spa_port_io) { SPA_RESULT_NEED_BUFFER, SPA_ID_INVALID, } +#define SPA_PORT_IO_INIT (struct spa_port_io) { SPA_RESULT_OK, SPA_ID_INVALID, } /** * struct spa_port_info diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index a85a80dea..426187f75 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -311,6 +311,22 @@ static int set_swparams(struct state *state) return 0; } +static inline void try_pull(struct state *state, snd_pcm_uframes_t frames, bool do_pull) +{ + struct spa_port_io *io = state->io; + + if (spa_list_is_empty(&state->ready) && do_pull) { + spa_log_trace(state->log, "alsa-util %p: %d", state, io->status); + if (io->status != SPA_RESULT_OK) + return; + io->status = SPA_RESULT_NEED_BUFFER; + io->range.offset = state->sample_count * state->frame_size; + io->range.min_size = state->threshold * state->frame_size; + io->range.max_size = frames * state->frame_size; + state->callbacks->need_input(state->callbacks_data); + } +} + static inline snd_pcm_uframes_t pull_frames(struct state *state, const snd_pcm_channel_area_t *my_areas, @@ -319,15 +335,9 @@ pull_frames(struct state *state, bool do_pull) { snd_pcm_uframes_t total_frames = 0, to_write = frames; - struct spa_port_io *io = state->io; - if (spa_list_is_empty(&state->ready) && do_pull) { - io->status = SPA_RESULT_NEED_BUFFER; - io->range.offset = state->sample_count * state->frame_size; - io->range.min_size = state->threshold * state->frame_size; - io->range.max_size = frames * state->frame_size; - state->callbacks->need_input(state->callbacks_data); - } + try_pull(state, frames, do_pull); + while (!spa_list_is_empty(&state->ready) && to_write > 0) { uint8_t *src, *dst; size_t n_bytes, n_frames, size; @@ -374,6 +384,8 @@ pull_frames(struct state *state, spa_log_trace(state->log, "alsa-util %p: reuse buffer %u", state, b->outbuf->id); state->callbacks->reuse_buffer(state->callbacks_data, 0, b->outbuf->id); state->ready_offset = 0; + + try_pull(state, frames, do_pull); } total_frames += n_frames; to_write -= n_frames; diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index aa1a7e464..73dcd4256 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -931,7 +931,7 @@ static int mmap_read(struct impl *this) d[0].chunk->stride = state->fmt.fmt.pix.bytesperline; b->outstanding = true; - if (io->status == SPA_RESULT_NEED_BUFFER) { + if (io->status != SPA_RESULT_HAVE_BUFFER) { io->buffer_id = b->outbuf->id; io->status = SPA_RESULT_HAVE_BUFFER; this->callbacks->have_output(this->callbacks_data); diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 181f7c537..2e7479939 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -756,16 +756,12 @@ static int spa_proxy_node_process_input(struct spa_node *node) impl = this->impl; for (i = 0; i < MAX_INPUTS; i++) { - struct spa_port_io *io = this->in_ports[i].io, tmp; + struct spa_port_io *io = this->in_ports[i].io; if (!io) continue; - tmp = impl->transport->inputs[i]; impl->transport->inputs[i] = *io; - if (res == SPA_RESULT_OK) - res = tmp.status; - *io = tmp; pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, impl->transport->inputs[i].status, impl->transport->inputs[i].buffer_id); @@ -793,16 +789,12 @@ static int spa_proxy_node_process_output(struct spa_node *node) impl = this->impl; for (i = 0; i < MAX_OUTPUTS; i++) { - struct spa_port_io *io = this->out_ports[i].io, tmp; + struct spa_port_io *io = this->out_ports[i].io; if (!io) continue; - tmp = impl->transport->outputs[i]; impl->transport->outputs[i] = *io; - if (res == SPA_RESULT_OK) - res = tmp.status; - *io = tmp; pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, impl->transport->outputs[i].status, impl->transport->outputs[i].buffer_id); @@ -828,12 +820,19 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message continue; *io = impl->transport->outputs[i]; - impl->transport->outputs[i].buffer_id = SPA_ID_INVALID; - impl->transport->outputs[i].status = SPA_RESULT_OK; pw_log_trace("%d %d", io->status, io->buffer_id); } this->callbacks->have_output(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { + for (i = 0; i < MAX_INPUTS; i++) { + struct spa_port_io *io = this->in_ports[i].io; + + if (!io) + continue; + + *io = impl->transport->inputs[i]; + pw_log_trace("%d %d", io->status, io->buffer_id); + } this->callbacks->need_input(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { if (impl->client_reuse) { diff --git a/src/pipewire/core.c b/src/pipewire/core.c index e5d3de0b2..9af267dfb 100644 --- a/src/pipewire/core.c +++ b/src/pipewire/core.c @@ -32,7 +32,7 @@ #include #include -#include +#include /** \cond */ struct resource_data { diff --git a/src/pipewire/link.c b/src/pipewire/link.c index dfa923070..7c80c0cde 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -1124,8 +1124,7 @@ struct pw_link *pw_link_new(struct pw_core *core, this->info.format = NULL; this->info.props = this->properties ? &this->properties->dict : NULL; - this->io.buffer_id = SPA_ID_INVALID; - this->io.status = SPA_RESULT_NEED_BUFFER; + this->io = SPA_PORT_IO_INIT; spa_graph_port_init(&this->rt.out_port, PW_DIRECTION_OUTPUT, diff --git a/src/pipewire/port.c b/src/pipewire/port.c index de8ea1006..c7217aee0 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -174,8 +174,7 @@ struct pw_port *pw_port_new(enum pw_direction direction, this->port_id = port_id; this->properties = properties; this->state = PW_PORT_STATE_INIT; - this->io.status = SPA_RESULT_NEED_BUFFER; - this->io.buffer_id = SPA_ID_INVALID; + this->io = SPA_PORT_IO_INIT; if (user_data_size > 0) this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void); diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 4c045e2f7..11a80c931 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -565,8 +565,7 @@ static void client_node_transport(void *object, uint32_t node_id, sizeof(struct port)); for (i = 0; i < data->trans->area->max_input_ports; i++) { - data->trans->inputs[i].status = SPA_RESULT_NEED_BUFFER; - data->trans->inputs[i].buffer_id = SPA_ID_INVALID; + data->trans->inputs[i] = SPA_PORT_IO_INIT; spa_graph_port_init(&data->in_ports[i].input, SPA_DIRECTION_INPUT, i, @@ -1108,22 +1107,17 @@ static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint3 static int impl_process_input(struct spa_node *node) { -#if 0 struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, out_node_impl); - node_have_output(data); -#endif pw_log_trace("node %p: process input", node); + node_have_output(data); return SPA_RESULT_OK; } static int impl_process_output(struct spa_node *node) { -#if 0 struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, in_node_impl); - node_need_input(data); - pw_log_trace("node %p: need input", node); -#endif pw_log_trace("node %p: process output", node); + node_need_input(data); return SPA_RESULT_OK; }