From d2f877912aabda296ea16fb12953ad39209b36ab Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 30 Jun 2017 19:32:11 +0200 Subject: [PATCH] Use graph to schedule things Make real spa_graph nodes and ports and schedule those. This makes it possible to add explicit tee and mixers in the real graph. Rework the way we add and remove ports and nodes from the graph. Remove confusing pw_port_link and merge core with pw_link_new() Move scheduling in separate files, add some more graph-schedulers. --- pipewire/modules/module-autolink.c | 4 +- .../modules/module-client-node/client-node.c | 5 +- pipewire/modules/module-mixer.c | 2 +- pipewire/server/core.c | 3 + pipewire/server/core.h | 6 + pipewire/server/link.c | 181 +++++++++--- pipewire/server/link.h | 15 +- pipewire/server/node.c | 124 ++------ pipewire/server/node.h | 6 + pipewire/server/port.c | 266 ++++++++---------- pipewire/server/port.h | 20 +- spa/include/spa/graph-scheduler1.h | 137 +++++++++ spa/include/spa/graph-scheduler2.h | 153 ++++++++++ spa/include/spa/graph-scheduler3.h | 173 ++++++++++++ spa/include/spa/graph.h | 127 +++------ spa/include/spa/list.h | 8 + spa/tests/test-graph.c | 13 +- spa/tests/test-mixer.c | 15 +- spa/tests/test-perf.c | 15 +- 19 files changed, 845 insertions(+), 428 deletions(-) create mode 100644 spa/include/spa/graph-scheduler1.h create mode 100644 spa/include/spa/graph-scheduler2.h create mode 100644 spa/include/spa/graph-scheduler3.h diff --git a/pipewire/modules/module-autolink.c b/pipewire/modules/module-autolink.c index f4cb6132e..02746a2d7 100644 --- a/pipewire/modules/module-autolink.c +++ b/pipewire/modules/module-autolink.c @@ -159,9 +159,9 @@ static void try_link_port(struct pw_node *node, struct pw_port *port, struct nod goto error; if (port->direction == PW_DIRECTION_OUTPUT) - link = pw_port_link(port, target, NULL, NULL, &error); + link = pw_link_new(impl->core, port, target, NULL, NULL, &error); else - link = pw_port_link(target, port, NULL, NULL, &error); + link = pw_link_new(impl->core, target, port, NULL, NULL, &error); if (link == NULL) goto error; diff --git a/pipewire/modules/module-client-node/client-node.c b/pipewire/modules/module-client-node/client-node.c index f049054f8..2d8f4da33 100644 --- a/pipewire/modules/module-client-node/client-node.c +++ b/pipewire/modules/module-client-node/client-node.c @@ -782,7 +782,10 @@ static int spa_proxy_node_process_input(struct spa_node *node) } send_have_output(this); - return SPA_RESULT_OK; + if (this->callbacks->need_input) + return SPA_RESULT_OK; + else + return SPA_RESULT_NEED_BUFFER; } static int spa_proxy_node_process_output(struct spa_node *node) diff --git a/pipewire/modules/module-mixer.c b/pipewire/modules/module-mixer.c index 716d04916..ff133f846 100644 --- a/pipewire/modules/module-mixer.c +++ b/pipewire/modules/module-mixer.c @@ -157,7 +157,7 @@ static struct impl *module_new(struct pw_core *core, struct pw_properties *prope if (op == NULL) continue; - pw_port_link(op, ip, NULL, NULL, &error); + pw_link_new(core, op, ip, NULL, NULL, &error); } return impl; } diff --git a/pipewire/server/core.c b/pipewire/server/core.c index 2aed3bafc..9b8c981a8 100644 --- a/pipewire/server/core.c +++ b/pipewire/server/core.c @@ -291,6 +291,9 @@ struct pw_core *pw_core_new(struct pw_main_loop *main_loop, struct pw_properties pw_type_init(&this->type); pw_map_init(&this->objects, 128, 32); + spa_graph_init(&this->rt.graph); + spa_graph_scheduler_init(&this->rt.sched, &this->rt.graph); + spa_debug_set_type_map(this->type.map); impl->support[0] = SPA_SUPPORT_INIT(SPA_TYPE__TypeMap, this->type.map); diff --git a/pipewire/server/core.h b/pipewire/server/core.h index cd9a076cb..01950ca9b 100644 --- a/pipewire/server/core.h +++ b/pipewire/server/core.h @@ -25,6 +25,7 @@ extern "C" { #endif #include +#include struct pw_global; @@ -180,6 +181,11 @@ struct pw_core { /** Emited when a global is removed */ PW_SIGNAL(global_removed, (struct pw_listener *listener, struct pw_core *core, struct pw_global *global)); + + struct { + struct spa_graph_scheduler sched; + struct spa_graph graph; + } rt; }; struct pw_core * diff --git a/pipewire/server/link.c b/pipewire/server/link.c index 5f62d7c57..3f0855705 100644 --- a/pipewire/server/link.c +++ b/pipewire/server/link.c @@ -758,21 +758,59 @@ on_output_async_complete_notify(struct pw_listener *listener, pw_work_queue_complete(impl->work, node, seq, res); } +static int +do_remove_input(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + struct pw_port *port = ((struct pw_port **) data)[0]; + spa_graph_port_remove(port->rt.graph, &this->rt.in_port); + return SPA_RESULT_OK; +} + +static void input_remove(struct pw_link *this, struct pw_port *port) +{ + struct impl *impl = (struct impl *) this; + + pw_log_debug("link %p: remove input port %p", this, port); + pw_signal_remove(&impl->input_port_destroy); + pw_signal_remove(&impl->input_async_complete); + pw_loop_invoke(port->node->data_loop->loop, + do_remove_input, 1, sizeof(struct pw_port*), &port, true, this); +} + +static int +do_remove_output(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + struct pw_port *port = ((struct pw_port **) data)[0]; + spa_graph_port_remove(port->rt.graph, &this->rt.out_port); + return SPA_RESULT_OK; +} + +static void output_remove(struct pw_link *this, struct pw_port *port) +{ + struct impl *impl = (struct impl *) this; + + pw_log_debug("link %p: remove output port %p", this, port); + pw_signal_remove(&impl->output_port_destroy); + pw_signal_remove(&impl->output_async_complete); + pw_loop_invoke(port->node->data_loop->loop, + do_remove_output, 1, sizeof(struct pw_port*), &port, true, this); +} + static void on_port_destroy(struct pw_link *this, struct pw_port *port) { struct impl *impl = (struct impl *) this; struct pw_port *other; if (port == this->input) { - pw_log_debug("link %p: input port destroyed %p", this, port); - pw_signal_remove(&impl->input_port_destroy); - pw_signal_remove(&impl->input_async_complete); + input_remove(this, port); this->input = NULL; other = this->output; } else if (port == this->output) { - pw_log_debug("link %p: output port destroyed %p", this, port); - pw_signal_remove(&impl->output_port_destroy); - pw_signal_remove(&impl->output_async_complete); + output_remove(this, port); this->output = NULL; other = this->input; } else @@ -784,6 +822,7 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port) pw_log_debug("link %p: clear input allocated buffers on port %p", this, other); pw_port_use_buffers(other, NULL, 0); + impl->buffer_owner = NULL; } pw_signal_emit(&this->port_unlinked, this, port); @@ -863,18 +902,54 @@ link_bind_func(struct pw_global *global, struct pw_client *client, uint32_t vers return SPA_RESULT_NO_MEMORY; } +static int +do_add_link(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + struct pw_port *port = ((struct pw_port **) data)[0]; + + if (port->direction == PW_DIRECTION_OUTPUT) { + spa_graph_port_add(port->rt.graph, + &port->rt.mix_node, + &this->rt.out_port, + PW_DIRECTION_OUTPUT, + this->rt.out_port.port_id, + 0, + &this->io); + } else { + spa_graph_port_add(port->rt.graph, + &port->rt.mix_node, + &this->rt.in_port, + PW_DIRECTION_INPUT, + this->rt.in_port.port_id, + 0, + &this->io); + } + + return SPA_RESULT_OK; +} + struct pw_link *pw_link_new(struct pw_core *core, struct pw_port *output, struct pw_port *input, struct spa_format *format_filter, - struct pw_properties *properties) + struct pw_properties *properties, + char **error) { struct impl *impl; struct pw_link *this; + struct pw_node *input_node, *output_node; + + if (output == input) + goto same_ports; + + if (pw_link_find(output, input)) + goto link_exists; impl = calloc(1, sizeof(struct impl)); if (impl == NULL) - return NULL; + goto no_mem; this = &impl->this; pw_log_debug("link %p: new", this); @@ -888,6 +963,9 @@ struct pw_link *pw_link_new(struct pw_core *core, this->input = input; this->output = output; + input_node = input->node; + output_node = output->node; + spa_list_init(&this->resource_list); pw_signal_init(&this->port_unlinked); pw_signal_init(&this->state_changed); @@ -895,21 +973,33 @@ struct pw_link *pw_link_new(struct pw_core *core, impl->format_filter = format_filter; - pw_signal_add(&this->input->destroy_signal, + pw_signal_add(&input->destroy_signal, &impl->input_port_destroy, on_input_port_destroy); - pw_signal_add(&this->input->node->async_complete, + pw_signal_add(&input_node->async_complete, &impl->input_async_complete, on_input_async_complete_notify); - pw_signal_add(&this->output->destroy_signal, + pw_signal_add(&output->destroy_signal, &impl->output_port_destroy, on_output_port_destroy); - pw_signal_add(&this->output->node->async_complete, + pw_signal_add(&output_node->async_complete, &impl->output_async_complete, on_output_async_complete_notify); pw_log_debug("link %p: constructed %p:%d -> %p:%d", impl, - this->output->node, this->output->port_id, - this->input->node, this->input->port_id); + output_node, output->port_id, input_node, input->port_id); + + input_node->live = output_node->live; + if (output_node->clock) + input_node->clock = output_node->clock; + + pw_log_debug("link %p: output node %p clock %p, live %d", this, output_node, output_node->clock, + output_node->live); + + spa_list_insert(output->links.prev, &this->output_link); + spa_list_insert(input->links.prev, &this->input_link); + + output_node->n_used_output_links++; + input_node->n_used_input_links++; spa_list_insert(core->link_list.prev, &this->link); @@ -922,7 +1012,26 @@ struct pw_link *pw_link_new(struct pw_core *core, this->info.input_port_id = input ? input->port_id : -1; this->info.format = NULL; + spa_graph_port_link(output_node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port); + + pw_loop_invoke(output_node->data_loop->loop, + do_add_link, + SPA_ID_INVALID, sizeof(struct pw_port *), &output, false, this); + pw_loop_invoke(input_node->data_loop->loop, + do_add_link, + SPA_ID_INVALID, sizeof(struct pw_port *), &input, false, this); + return this; + + same_ports: + asprintf(error, "can't link the same ports"); + return NULL; + link_exists: + asprintf(error, "link already exists"); + return NULL; + no_mem: + asprintf(error, "no memory"); + return NULL; } static void clear_port_buffers(struct pw_link *link, struct pw_port *port) @@ -933,22 +1042,6 @@ static void clear_port_buffers(struct pw_link *link, struct pw_port *port) pw_port_use_buffers(port, NULL, 0); } -static int -do_link_remove(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) -{ - struct pw_link *this = user_data; - - if (this->rt.input) { - spa_list_remove(&this->rt.input_link); - this->rt.input = NULL; - } - if (this->rt.output) { - spa_list_remove(&this->rt.output_link); - this->rt.output = NULL; - } - return SPA_RESULT_OK; -} void pw_link_destroy(struct pw_link *link) { @@ -965,21 +1058,8 @@ void pw_link_destroy(struct pw_link *link) pw_resource_destroy(resource); if (link->input) { - pw_signal_remove(&impl->input_port_destroy); - pw_signal_remove(&impl->input_async_complete); + input_remove(link, link->input); - pw_loop_invoke(link->input->node->data_loop->loop, - do_link_remove, 1, 0, NULL, true, link); - } - if (link->output) { - pw_signal_remove(&impl->output_port_destroy); - pw_signal_remove(&impl->output_async_complete); - - pw_loop_invoke(link->output->node->data_loop->loop, - do_link_remove, 2, 0, NULL, true, link); - } - - if (link->input) { spa_list_remove(&link->input_link); link->input->node->n_used_input_links--; @@ -993,6 +1073,8 @@ void pw_link_destroy(struct pw_link *link) link->input = NULL; } if (link->output) { + output_remove(link, link->output); + spa_list_remove(&link->output_link); link->output->node->n_used_output_links--; @@ -1016,3 +1098,14 @@ void pw_link_destroy(struct pw_link *link) free(impl); } + +struct pw_link *pw_link_find(struct pw_port *output_port, struct pw_port *input_port) +{ + struct pw_link *pl; + + spa_list_for_each(pl, &output_port->links, output_link) { + if (pl->input == input_port) + return pl; + } + return NULL; +} diff --git a/pipewire/server/link.h b/pipewire/server/link.h index d9be699d2..f55d22580 100644 --- a/pipewire/server/link.h +++ b/pipewire/server/link.h @@ -68,19 +68,20 @@ struct pw_link { struct spa_list resource_list; /**< list of bound resources */ + struct spa_port_io io; /**< link io area */ + struct pw_port *output; /**< output port */ struct spa_list output_link; /**< link in output port links */ struct pw_port *input; /**< input port */ struct spa_list input_link; /**< link in input port links */ + /** Emited when the port is unlinked */ PW_SIGNAL(port_unlinked, (struct pw_listener *listener, struct pw_link *link, struct pw_port *port)); struct { - struct pw_port *input; - struct pw_port *output; - struct spa_list input_link; - struct spa_list output_link; + struct spa_graph_port out_port; + struct spa_graph_port in_port; } rt; }; @@ -92,11 +93,15 @@ pw_link_new(struct pw_core *core, /**< the core object */ struct pw_port *output, /**< an output port */ struct pw_port *input, /**< an input port */ struct spa_format *format_filter, /**< an optional format filter */ - struct pw_properties *properties /**< extra properties */); + struct pw_properties *properties /**< extra properties */, + char **error /**< error string */); /** Destroy a link \memberof pw_link */ void pw_link_destroy(struct pw_link *link); +/** Find the link between 2 ports \memberof pw_link */ +struct pw_link * pw_link_find(struct pw_port *output, struct pw_port *input); + /** Activate a link \memberof pw_link * Starts the negotiation of formats and buffers on \a link and then * starts data streaming */ diff --git a/pipewire/server/node.c b/pipewire/server/node.c index f69cd8f8f..a2daafa9e 100644 --- a/pipewire/server/node.c +++ b/pipewire/server/node.c @@ -21,6 +21,8 @@ #include #include +#include + #include "pipewire/client/pipewire.h" #include "pipewire/client/interfaces.h" @@ -237,63 +239,6 @@ static void send_clock_update(struct pw_node *this) pw_log_debug("got error %d", res); } -static int do_pull(struct pw_node *this) -{ - int res = SPA_RESULT_OK; - struct pw_port *inport; - bool have_output = false; - - spa_list_for_each(inport, &this->input_ports, link) { - struct pw_link *link; - struct pw_port *outport; - struct spa_port_io *pi; - struct spa_port_io *po; - - pi = &inport->io; - pw_log_trace("node %p: need input port %d, %d %d", this, - inport->port_id, pi->buffer_id, pi->status); - - if (pi->status != SPA_RESULT_NEED_BUFFER) - continue; - - spa_list_for_each(link, &inport->rt.links, rt.input_link) { - if (link->rt.input == NULL || link->rt.output == NULL) - continue; - - outport = link->rt.output; - po = &outport->io; - - /* pull */ - *po = *pi; - pi->buffer_id = SPA_ID_INVALID; - - pw_log_trace("node %p: process output %p %d", outport->node, po, - po->buffer_id); - - res = spa_node_process_output(outport->node->node); - - if (res == SPA_RESULT_NEED_BUFFER) { - res = do_pull(outport->node); - pw_log_trace("node %p: pull return %d", outport->node, res); - } - if (res == SPA_RESULT_HAVE_BUFFER) { - *pi = *po; - pw_log_trace("node %p: have output %d %d", this, pi->status, - pi->buffer_id); - have_output = true; - } else if (res < 0) { - pw_log_warn("node %p: got process output %d", outport->node, res); - } - - } - } - if (have_output) { - pw_log_trace("node %p: doing process input", this); - res = spa_node_process_input(this->node); - } - return res; -} - static void on_node_done(struct spa_node *node, int seq, int res, void *user_data) { struct impl *impl = user_data; @@ -319,49 +264,24 @@ static void on_node_need_input(struct spa_node *node, void *user_data) struct impl *impl = user_data; struct pw_node *this = &impl->this; - do_pull(this); + spa_graph_scheduler_pull(this->rt.sched, &this->rt.node); + while (spa_graph_scheduler_iterate(this->rt.sched)); } static void on_node_have_output(struct spa_node *node, void *user_data) { struct impl *impl = user_data; struct pw_node *this = &impl->this; - int res; - struct pw_port *outport; - spa_list_for_each(outport, &this->output_ports, link) { - struct pw_link *link; - struct spa_port_io *po; - - po = &outport->io; - if (po->buffer_id == SPA_ID_INVALID) - continue; - - pw_log_trace("node %p: have output %d", this, po->buffer_id); - - spa_list_for_each(link, &outport->rt.links, rt.output_link) { - struct pw_port *inport; - - if (link->rt.input == NULL || link->rt.output == NULL) - continue; - - inport = link->rt.input; - inport->io = *po; - - pw_log_trace("node %p: do process input %d", this, po->buffer_id); - - if ((res = spa_node_process_input(inport->node->node)) < 0) - pw_log_warn("node %p: got process input %d", inport->node, res); - - } - po->status = SPA_RESULT_NEED_BUFFER; - } - res = spa_node_process_output(this->node); + spa_graph_scheduler_push(this->rt.sched, &this->rt.node); + while (spa_graph_scheduler_iterate(this->rt.sched)); } static void on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id, void *user_data) { + +#if 0 struct impl *impl = user_data; struct pw_node *this = &impl->this; struct pw_port *inport; @@ -380,6 +300,7 @@ on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id outport->io.buffer_id = buffer_id; } } +#endif } static void node_unbind_func(void *data) @@ -476,11 +397,17 @@ static void init_complete(struct pw_node *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + spa_graph_node_add(this->rt.sched->graph, + &this->rt.node, + spa_graph_scheduler_default, + this->node); + update_port_ids(this); pw_log_debug("node %p: init completed", this); impl->async_init = false; spa_list_insert(this->core->node_list.prev, &this->link); + pw_core_add_global(this->core, this->owner, this->core->type.node, 0, this, node_bind_func, &this->global); @@ -529,6 +456,8 @@ struct pw_node *pw_node_new(struct pw_core *core, this->clock = clock; this->data_loop = core->data_loop; + this->rt.sched = &core->rt.sched; + spa_list_init(&this->resource_list); if (spa_node_set_callbacks(this->node, &node_callbacks, impl) < 0) @@ -585,26 +514,11 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_node *this = user_data; - struct pw_port *port, *tmp; pause_node(this); - spa_list_for_each_safe(port, tmp, &this->input_ports, link) { - struct pw_link *link, *tlink; - spa_list_for_each_safe(link, tlink, &port->rt.links, rt.input_link) { - pw_port_pause_rt(link->rt.input); - spa_list_remove(&link->rt.input_link); - link->rt.input = NULL; - } - } - spa_list_for_each_safe(port, tmp, &this->output_ports, link) { - struct pw_link *link, *tlink; - spa_list_for_each_safe(link, tlink, &port->rt.links, rt.output_link) { - pw_port_pause_rt(link->rt.output); - spa_list_remove(&link->rt.output_link); - link->rt.output = NULL; - } - } + spa_graph_node_remove(this->rt.sched->graph, &this->rt.node); + return SPA_RESULT_OK; } diff --git a/pipewire/server/node.h b/pipewire/server/node.h index c8ca05526..88b916e37 100644 --- a/pipewire/server/node.h +++ b/pipewire/server/node.h @@ -106,6 +106,12 @@ struct pw_node { struct pw_node *node, uint32_t seq, int res)); struct pw_data_loop *data_loop; /**< the data loop for this node */ + + struct { + struct spa_graph_scheduler *sched; + struct spa_graph_node node; + } rt; + }; /** Create a new node \memberof pw_node */ diff --git a/pipewire/server/port.c b/pipewire/server/port.c index cc7a23208..bfa0b3e67 100644 --- a/pipewire/server/port.c +++ b/pipewire/server/port.c @@ -33,6 +33,96 @@ struct impl { }; /** \endcond */ + +static int schedule_tee(struct spa_graph_node *node) +{ + int res; + struct pw_port *this = node->user_data; + struct spa_graph_port *p; + struct spa_port_io *io = this->rt.mix_port.io; + + if (node->action == SPA_GRAPH_ACTION_IN) { + if (spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) { + io->status = SPA_RESULT_NEED_BUFFER; + res = SPA_RESULT_NEED_BUFFER; + } + else { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) + *p->io = *io; + io->status = SPA_RESULT_OK; + io->buffer_id = SPA_ID_INVALID; + res = SPA_RESULT_HAVE_BUFFER; + } + } + else if (node->action == SPA_GRAPH_ACTION_OUT) { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) + *io = *p->io; + io->status = SPA_RESULT_NEED_BUFFER; + res = SPA_RESULT_NEED_BUFFER; + } + else + res = SPA_RESULT_ERROR; + + return res; +} + +static int schedule_mix(struct spa_graph_node *node) +{ + int res; + struct pw_port *this = node->user_data; + struct spa_graph_port *p; + struct spa_port_io *io = this->rt.mix_port.io; + + if (node->action == SPA_GRAPH_ACTION_IN) { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + *io = *p->io; + p->io->status = SPA_RESULT_OK; + p->io->buffer_id = SPA_ID_INVALID; + } + res = SPA_RESULT_HAVE_BUFFER; + } + else if (node->action == SPA_GRAPH_ACTION_OUT) { + io->status = SPA_RESULT_NEED_BUFFER; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) + *p->io = *io; + res = SPA_RESULT_NEED_BUFFER; + } + else + res = SPA_RESULT_ERROR; + + return res; +} + +static int do_add_port(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_port *this = user_data; + + spa_graph_port_add(this->rt.graph, + &this->node->rt.node, + &this->rt.port, + this->direction, + this->port_id, + 0, + &this->io); + spa_graph_node_add(this->rt.graph, + &this->rt.mix_node, + this->direction == PW_DIRECTION_INPUT ? schedule_mix : schedule_tee, + this); + spa_graph_port_add(this->rt.graph, + &this->rt.mix_node, + &this->rt.mix_port, + pw_direction_reverse(this->direction), + 0, + 0, + &this->io); + spa_graph_port_link(this->rt.graph, + &this->rt.port, + &this->rt.mix_port); + + return SPA_RESULT_OK; +} + struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id) { struct impl *impl; @@ -51,18 +141,46 @@ struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, u this->io.buffer_id = SPA_ID_INVALID; spa_list_init(&this->links); - spa_list_init(&this->rt.links); + pw_signal_init(&this->destroy_signal); + this->rt.graph = node->rt.sched->graph; + + pw_loop_invoke(node->data_loop->loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, this); + return this; } +static int do_remove_port(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_port *this = user_data; + struct spa_graph_port *p; + + spa_graph_port_unlink(this->rt.graph, + &this->rt.port); + spa_graph_port_remove(this->rt.graph, + &this->rt.port); + + spa_list_for_each(p, &this->rt.mix_node.ports[this->direction], link) + spa_graph_port_remove(this->rt.graph, p); + + spa_graph_port_remove(this->rt.graph, + &this->rt.mix_port); + spa_graph_node_remove(this->rt.graph, + &this->rt.mix_node); + + return SPA_RESULT_OK; +} + void pw_port_destroy(struct pw_port *port) { pw_log_debug("port %p: destroy", port); pw_signal_emit(&port->destroy_signal, port); + pw_loop_invoke(port->node->data_loop->loop, do_remove_port, SPA_ID_INVALID, 0, NULL, true, port); + spa_list_remove(&port->link); free(port); @@ -76,95 +194,6 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state) } } -static int -do_add_link(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) -{ - struct pw_port *this = user_data; - struct pw_link *link = ((struct pw_link **) data)[0]; - - if (this->direction == PW_DIRECTION_INPUT) { - spa_list_insert(this->rt.links.prev, &link->rt.input_link); - link->rt.input = this; - } else { - spa_list_insert(this->rt.links.prev, &link->rt.output_link); - link->rt.output = this; - } - - return SPA_RESULT_OK; -} - -static struct pw_link *find_link(struct pw_port *output_port, struct pw_port *input_port) -{ - struct pw_link *pl; - - spa_list_for_each(pl, &output_port->links, output_link) { - if (pl->input == input_port) - return pl; - } - return NULL; -} - -struct pw_link *pw_port_link(struct pw_port *output_port, - struct pw_port *input_port, - struct spa_format *format_filter, - struct pw_properties *properties, - char **error) -{ - struct pw_node *input_node, *output_node; - struct pw_link *link; - - output_node = output_port->node; - input_node = input_port->node; - - pw_log_debug("port link %p:%u -> %p:%u", output_node, output_port->port_id, input_node, - input_port->port_id); - - if (output_node == input_node) - goto same_node; - - if (!spa_list_is_empty(&input_port->links)) - goto was_linked; - - link = find_link(output_port, input_port); - - if (link == NULL) { - input_node->live = output_node->live; - if (output_node->clock) - input_node->clock = output_node->clock; - pw_log_debug("node %p: clock %p, live %d", output_node, output_node->clock, - output_node->live); - - link = pw_link_new(output_node->core, - output_port, input_port, format_filter, properties); - if (link == NULL) - goto no_mem; - - spa_list_insert(output_port->links.prev, &link->output_link); - spa_list_insert(input_port->links.prev, &link->input_link); - - output_node->n_used_output_links++; - input_node->n_used_input_links++; - - pw_loop_invoke(output_node->data_loop->loop, - do_add_link, - SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, output_port); - pw_loop_invoke(input_node->data_loop->loop, - do_add_link, - SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, input_port); - } - return link; - - same_node: - asprintf(error, "can't link a node to itself"); - return NULL; - was_linked: - asprintf(error, "input port was already linked"); - return NULL; - no_mem: - return NULL; -} - int pw_port_pause_rt(struct pw_port *port) { int res; @@ -181,63 +210,6 @@ int pw_port_pause_rt(struct pw_port *port) return res; } -static int -do_remove_link(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) -{ - struct pw_port *port = user_data; - struct pw_link *link = ((struct pw_link **) data)[0]; - - if (port->direction == PW_DIRECTION_INPUT) { - pw_port_pause_rt(link->rt.input); - spa_list_remove(&link->rt.input_link); - link->rt.input = NULL; - } else { - pw_port_pause_rt(link->rt.output); - spa_list_remove(&link->rt.output_link); - link->rt.output = NULL; - } - return SPA_RESULT_OK; -} - -int pw_port_unlink(struct pw_port *port, struct pw_link *link) -{ - int res; - struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); - struct pw_node *node = port->node; - - pw_log_debug("port %p: start unlink %p", port, link); - - res = pw_loop_invoke(node->data_loop->loop, - do_remove_link, impl->seq++, sizeof(struct pw_link *), &link, true, port); - - if (port->state > PW_PORT_STATE_PAUSED) - port_update_state (port, PW_PORT_STATE_PAUSED); - - pw_log_debug("port %p: finish unlink", port); - if (port->direction == PW_DIRECTION_OUTPUT) { - if (link->output) { - spa_list_remove(&link->output_link); - node->n_used_output_links--; - link->output = NULL; - } - } else { - if (link->input) { - spa_list_remove(&link->input_link); - node->n_used_input_links--; - link->input = NULL; - } - } - - if (!port->allocated) - pw_port_use_buffers(port, NULL, 0); - - if (node->n_used_output_links == 0 && node->n_used_input_links == 0) - pw_node_update_state(node, PW_NODE_STATE_IDLE, NULL); - - return res; -} - static int do_port_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) diff --git a/pipewire/server/port.h b/pipewire/server/port.h index bcd0babef..c3184db99 100644 --- a/pipewire/server/port.h +++ b/pipewire/server/port.h @@ -75,8 +75,10 @@ struct pw_port { void *multiplex; /**< optional port buffer mix/split */ struct { - struct spa_list links; /**< list of \ref pw_link only accessed from the - * data thread */ + struct spa_graph *graph; + struct spa_graph_port port; + struct spa_graph_port mix_port; + struct spa_graph_node mix_node; } rt; /**< data only accessed from the data thread */ }; @@ -88,20 +90,6 @@ pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id) /** Destroy a port \memberof pw_port */ void pw_port_destroy(struct pw_port *port); -/** Link two ports with an optional filter \memberof pw_port - * \return a newly allocated \ref pw_link or NULL and \a error is set. - * - * If the ports were already linked, the existing link will be returned. */ -struct pw_link * -pw_port_link(struct pw_port *output_port, /**< output port */ - struct pw_port *input_port, /**< input port */ - struct spa_format *format_filter, /**< optional filter */ - struct pw_properties *properties, /**< extra properties */ - char **error /**< result error message or NULL */); - -/** Unlink a port \memberof pw_port */ -int pw_port_unlink(struct pw_port *port, struct pw_link *link); - /** Set a format on a port \memberof pw_port */ int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format); diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h new file mode 100644 index 000000000..6d12714c4 --- /dev/null +++ b/spa/include/spa/graph-scheduler1.h @@ -0,0 +1,137 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_SCHEDULER_H__ +#define __SPA_GRAPH_SCHEDULER_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +struct spa_graph_scheduler { + struct spa_graph *graph; + struct spa_list pending; + struct spa_graph_node *node; +}; + +static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, + struct spa_graph *graph) +{ + sched->graph = graph; + spa_list_init(&sched->pending); + sched->node = NULL; +} + +static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +{ + int res; + struct spa_node *n = node->user_data; + + if (node->action == SPA_GRAPH_ACTION_IN) + res = spa_node_process_input(n); + else if (node->action == SPA_GRAPH_ACTION_OUT) + res = spa_node_process_output(n); + else + res = SPA_RESULT_ERROR; + + return res; +} + +static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched) +{ + bool res; + struct spa_graph *graph = sched->graph; + struct spa_graph_port *p; + struct spa_graph_node *n; + + res = !spa_list_is_empty(&graph->ready); + if (res) { + n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link); + + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + + debug("node %p action %d state %d\n", n, n->action, n->state); + + switch (n->action) { + case SPA_GRAPH_ACTION_IN: + case SPA_GRAPH_ACTION_OUT: + n->state = n->schedule(n); + debug("node %p scheduled action %d state %d\n", n, n->action, n->state); + if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node) + break; + n->action = SPA_GRAPH_ACTION_CHECK; + spa_list_insert(graph->ready.prev, &n->ready_link); + break; + + case SPA_GRAPH_ACTION_CHECK: + if (n->state == SPA_RESULT_NEED_BUFFER) { + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_node *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + if (pn != sched->node + || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { + pn->action = SPA_GRAPH_ACTION_OUT; + spa_list_insert(graph->ready.prev, + &pn->ready_link); + } + } else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; + } + } else if (n->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_graph_port_check(graph, p->peer); + } + break; + + default: + break; + } + res = !spa_list_is_empty(&graph->ready); + } + return res; +} + +static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + debug("node %p start pull\n", node); + node->action = SPA_GRAPH_ACTION_CHECK; + node->state = SPA_RESULT_NEED_BUFFER; + sched->node = node; + if (node->ready_link.next == NULL) + spa_list_insert(sched->graph->ready.prev, &node->ready_link); +} + +static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + debug("node %p start push\n", node); + node->action = SPA_GRAPH_ACTION_OUT; + sched->node = node; + if (node->ready_link.next == NULL) + spa_list_insert(sched->graph->ready.prev, &node->ready_link); +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_SCHEDULER_H__ */ diff --git a/spa/include/spa/graph-scheduler2.h b/spa/include/spa/graph-scheduler2.h new file mode 100644 index 000000000..4615bc507 --- /dev/null +++ b/spa/include/spa/graph-scheduler2.h @@ -0,0 +1,153 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_SCHEDULER_H__ +#define __SPA_GRAPH_SCHEDULER_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +{ + int res; + struct spa_node *n = node->user_data; + + if (node->action == SPA_GRAPH_ACTION_IN) + res = spa_node_process_input(n); + else if (node->action == SPA_GRAPH_ACTION_OUT) + res = spa_node_process_output(n); + else + res = SPA_RESULT_ERROR; + + return res; +} + +static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph) +{ + bool empty; + struct spa_graph_port *p; + struct spa_graph_node *n; + int iter = 1; + uint32_t action; + +next: + empty = spa_list_is_empty(&graph->ready); + if (empty && !spa_list_is_empty(&graph->pending)) { + debug("copy pending\n"); + spa_list_insert_list(&graph->ready, &graph->pending); + spa_list_init(&graph->pending); + empty = false; + } + if (iter-- == 0 || empty) + return !empty; + + n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link); + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + + action = n->action; + + debug("node %p action %d, state %d\n", n, action, n->state); + + switch (action) { + case SPA_GRAPH_ACTION_IN: + case SPA_GRAPH_ACTION_OUT: + case SPA_GRAPH_ACTION_END: + if (action == SPA_GRAPH_ACTION_END) + n->action = SPA_GRAPH_ACTION_OUT; + + n->state = n->schedule(n); + debug("node %p schedule %d res %d\n", n, action, n->state); + + if (action == SPA_GRAPH_ACTION_IN && n == graph->node) + break; + + if (action != SPA_GRAPH_ACTION_END) { + debug("node %p add ready for CHECK\n", n); + n->action = SPA_GRAPH_ACTION_CHECK; + spa_list_insert(graph->ready.prev, &n->ready_link); + } + else { + spa_graph_node_update(graph, n); + } + break; + + case SPA_GRAPH_ACTION_CHECK: + if (n->state == SPA_RESULT_NEED_BUFFER) { + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_node *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + if (pn != graph->node + || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { + pn->action = SPA_GRAPH_ACTION_OUT; + debug("node %p add ready OUT\n", n); + spa_list_insert(graph->ready.prev, + &pn->ready_link); + } + } else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; + } + } + else if (n->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_graph_port_check(graph, p->peer); + + debug("node %p add pending\n", n); + n->action = SPA_GRAPH_ACTION_END; + spa_list_insert(&graph->pending, &n->ready_link); + } + else if (n->state == SPA_RESULT_OK) { + spa_graph_node_update(graph, n); + } + break; + + default: + break; + } + goto next; +} + +static inline void spa_graph_scheduler_pull(struct spa_graph *graph, struct spa_graph_node *node) +{ + node->action = SPA_GRAPH_ACTION_CHECK; + node->state = SPA_RESULT_NEED_BUFFER; + graph->node = node; + debug("node %p start pull\n", node); + if (node->ready_link.next == NULL) + spa_list_insert(graph->ready.prev, &node->ready_link); +} + +static inline void spa_graph_scheduler_push(struct spa_graph *graph, struct spa_graph_node *node) +{ + node->action = SPA_GRAPH_ACTION_OUT; + graph->node = node; + debug("node %p start push\n", node); + if (node->ready_link.next == NULL) + spa_list_insert(graph->ready.prev, &node->ready_link); +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_SCHEDULER_H__ */ diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h new file mode 100644 index 000000000..405c51bc8 --- /dev/null +++ b/spa/include/spa/graph-scheduler3.h @@ -0,0 +1,173 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_SCHEDULER_H__ +#define __SPA_GRAPH_SCHEDULER_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +struct spa_graph_scheduler { + struct spa_graph *graph; + struct spa_list pending; + struct spa_graph_node *node; +}; + +static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, + struct spa_graph *graph) +{ + sched->graph = graph; + spa_list_init(&sched->pending); + sched->node = NULL; +} + +static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +{ + int res; + struct spa_node *n = node->user_data; + + if (node->action == SPA_GRAPH_ACTION_IN) + res = spa_node_process_input(n); + else if (node->action == SPA_GRAPH_ACTION_OUT) + res = spa_node_process_output(n); + else + res = SPA_RESULT_ERROR; + + return res; +} + +static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + struct spa_graph_port *p; + struct spa_graph_node *n, *t; + struct spa_list ready; + + debug("node %p start pull\n", node); + + spa_list_init(&ready); + + node->ready_in = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_port *pport = p->peer; + struct spa_graph_node *pnode = pport->node; + debug("node %p peer %p io %d\n", node, pnode, pport->io->status); + if (pport->io->status == SPA_RESULT_NEED_BUFFER) { + spa_list_insert(ready.prev, &pnode->ready_link); + } + else if (pport->io->status == SPA_RESULT_OK && !(pnode->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; + } + + spa_list_for_each_safe(n, t, &ready, ready_link) { + n->action = SPA_GRAPH_ACTION_OUT; + n->state = n->schedule(n); + debug("peer %p scheduled %d %d\n", n, n->action, n->state); + if (n->state == SPA_RESULT_NEED_BUFFER) + spa_graph_scheduler_pull(sched, n); + else { + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { + if (p->io->status == SPA_RESULT_HAVE_BUFFER) + node->ready_in++; + } + } + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + } + + debug("node %p %d %d\n", node, node->ready_in, node->required_in); + + if (node->required_in > 0 && node->ready_in == node->required_in) { + node->action = SPA_GRAPH_ACTION_IN; + node->state = node->schedule(node); + debug("node %p scheduled %d %d\n", node, node->action, node->state); + if (node->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { + if (p->io->status == SPA_RESULT_HAVE_BUFFER) + p->peer->node->ready_in++; + } + } + } +} + +static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched) +{ + return false; +} + + +static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + struct spa_graph_port *p; + struct spa_graph_node *n, *t; + struct spa_list ready; + + debug("node %p start push\n", node); + + spa_list_init(&ready); + + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { + struct spa_graph_port *pport = p->peer; + struct spa_graph_node *pnode = pport->node; + if (pport->io->status == SPA_RESULT_HAVE_BUFFER) + pnode->ready_in++; + + debug("node %p peer %p io %d %d %d\n", node, pnode, pport->io->status, + pnode->ready_in, pnode->required_in); + + if (pnode->required_in > 0 && pnode->ready_in == pnode->required_in) + spa_list_insert(ready.prev, &pnode->ready_link); + } + + spa_list_for_each_safe(n, t, &ready, ready_link) { + n->action = SPA_GRAPH_ACTION_IN; + n->state = n->schedule(n); + debug("peer %p scheduled %d %d\n", n, n->action, n->state); + if (n->state == SPA_RESULT_HAVE_BUFFER) + spa_graph_scheduler_push(sched, n); + else { + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; + } + } + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + } + + node->action = SPA_GRAPH_ACTION_OUT; + node->state = node->schedule(node); + debug("node %p scheduled %d %d\n", node, node->action, node->state); + if (node->state == SPA_RESULT_NEED_BUFFER) { + node->ready_in = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + p->peer->node->ready_in++; + } + } +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_SCHEDULER_H__ */ diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index b65556618..4a8e9749f 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -26,6 +26,13 @@ extern "C" { #include #include +#include + +#if 0 +#define debug(...) printf(__VA_ARGS__) +#else +#define debug(...) +#endif struct spa_graph; struct spa_graph_node; @@ -34,7 +41,6 @@ struct spa_graph_port; struct spa_graph { struct spa_list nodes; struct spa_list ready; - struct spa_graph_node *node; }; typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node); @@ -73,20 +79,6 @@ static inline void spa_graph_init(struct spa_graph *graph) spa_list_init(&graph->ready); } -static inline int spa_graph_node_schedule_default(struct spa_graph_node *node) -{ - int res; - struct spa_node *n = node->user_data; - - if (node->action == SPA_GRAPH_ACTION_IN) - res = spa_node_process_input(n); - else if (node->action == SPA_GRAPH_ACTION_OUT) - res = spa_node_process_output(n); - else - res = SPA_RESULT_ERROR; - return res; -} - static inline void spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node, spa_graph_node_func_t schedule, void *user_data) @@ -94,12 +86,14 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node, spa_list_init(&node->ports[SPA_DIRECTION_INPUT]); spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]); node->flags = 0; - node->state = SPA_RESULT_OK; + node->state = SPA_RESULT_NEED_BUFFER; node->action = SPA_GRAPH_ACTION_OUT; node->schedule = schedule; node->user_data = user_data; + node->ready_link.next = NULL; spa_list_insert(graph->nodes.prev, &node->link); node->max_in = node->required_in = node->ready_in = 0; + debug("node %p add\n", node); } static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port) @@ -109,6 +103,8 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap if (port->io->status == SPA_RESULT_HAVE_BUFFER) node->ready_in++; + debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); + if (node->required_in > 0 && node->ready_in == node->required_in) { node->action = SPA_GRAPH_ACTION_IN; if (node->ready_link.next == NULL) @@ -119,6 +115,17 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap } } +static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) { + struct spa_graph_port *p; + + node->ready_in = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; + } + debug("node %p update %d ready\n", node, node->ready_in); +} + static inline void spa_graph_port_add(struct spa_graph *graph, struct spa_graph_node *node, @@ -128,13 +135,13 @@ spa_graph_port_add(struct spa_graph *graph, uint32_t flags, struct spa_port_io *io) { + debug("port %p add %d to node %p \n", port, direction, node); port->node = node; port->direction = direction; port->port_id = port_id; port->flags = flags; port->io = io; - port->peer = NULL; - spa_list_insert(node->ports[port->direction].prev, &port->link); + spa_list_insert(node->ports[direction].prev, &port->link); node->max_in++; if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT) node->required_in++; @@ -143,96 +150,36 @@ spa_graph_port_add(struct spa_graph *graph, static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node) { + debug("node %p remove\n", node); spa_list_remove(&node->link); + if (node->ready_link.next) + spa_list_remove(&node->ready_link); } static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_graph_port *port) { + debug("port %p remove\n", port); spa_list_remove(&port->link); + if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT) + port->node->required_in--; } static inline void spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct spa_graph_port *in) { + debug("port %p link to %p \n", out, in); out->peer = in; in->peer = out; } static inline void -spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *out, - struct spa_graph_port *in) +spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *port) { - out->peer = NULL; - in->peer = NULL; -} - -static inline bool spa_graph_node_iterate(struct spa_graph *graph) -{ - bool res; - struct spa_graph_port *p; - - res = !spa_list_is_empty(&graph->ready); - if (res) { - struct spa_graph_node *n = - spa_list_first(&graph->ready, struct spa_graph_node, ready_link); - - spa_list_remove(&n->ready_link); - n->ready_link.next = NULL; - - switch (n->action) { - case SPA_GRAPH_ACTION_IN: - case SPA_GRAPH_ACTION_OUT: - n->state = n->schedule(n); - if (n->action == SPA_GRAPH_ACTION_IN && n == graph->node) - break; - n->action = SPA_GRAPH_ACTION_CHECK; - spa_list_insert(graph->ready.prev, &n->ready_link); - break; - - case SPA_GRAPH_ACTION_CHECK: - if (n->state == SPA_RESULT_NEED_BUFFER) { - n->ready_in = 0; - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - struct spa_graph_node *pn = p->peer->node; - if (p->io->status == SPA_RESULT_NEED_BUFFER) { - if (pn != graph->node - || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { - pn->action = SPA_GRAPH_ACTION_OUT; - spa_list_insert(graph->ready.prev, - &pn->ready_link); - } - } else if (p->io->status == SPA_RESULT_OK) - n->ready_in++; - } - } else if (n->state == SPA_RESULT_HAVE_BUFFER) { - spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) - spa_graph_port_check(graph, p->peer); - } - break; - - default: - break; - } - res = !spa_list_is_empty(&graph->ready); + debug("port %p unlink from %p \n", port, port->peer); + if (port->peer) { + port->peer->peer = NULL; + port->peer = NULL; } - return res; -} - -static inline void spa_graph_node_pull(struct spa_graph *graph, struct spa_graph_node *node) -{ - node->action = SPA_GRAPH_ACTION_CHECK; - node->state = SPA_RESULT_NEED_BUFFER; - graph->node = node; - if (node->ready_link.next == NULL) - spa_list_insert(graph->ready.prev, &node->ready_link); -} - -static inline void spa_graph_node_push(struct spa_graph *graph, struct spa_graph_node *node) -{ - node->action = SPA_GRAPH_ACTION_OUT; - graph->node = node; - if (node->ready_link.next == NULL) - spa_list_insert(graph->ready.prev, &node->ready_link); } #ifdef __cplusplus diff --git a/spa/include/spa/list.h b/spa/include/spa/list.h index 4e6c70239..492bb4847 100644 --- a/spa/include/spa/list.h +++ b/spa/include/spa/list.h @@ -45,6 +45,14 @@ static inline void spa_list_insert(struct spa_list *list, struct spa_list *elem) elem->next->prev = elem; } +static inline void spa_list_insert_list(struct spa_list *list, struct spa_list *other) +{ + other->next->prev = list; + other->prev->next = list->next; + list->next->prev = other->prev; + list->next = other->next; +} + static inline void spa_list_remove(struct spa_list *elem) { elem->prev->next = elem->next; diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c index 404ea0eb4..f215da860 100644 --- a/spa/tests/test-graph.c +++ b/spa/tests/test-graph.c @@ -36,6 +36,7 @@ #include #include #include +#include static SPA_TYPE_MAP_IMPL(default_map, 4096); static SPA_LOG_IMPL(default_log); @@ -97,6 +98,7 @@ struct data { uint32_t n_support; struct spa_graph graph; + struct spa_graph_scheduler sched; struct spa_graph_node source_node; struct spa_graph_port source_out; struct spa_graph_port volume_in; @@ -229,9 +231,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data) { struct data *data = user_data; - spa_graph_node_pull(&data->graph, &data->sink_node); + spa_graph_scheduler_pull(&data->sched, &data->sink_node); - while (spa_graph_node_iterate(&data->graph)); + while (spa_graph_scheduler_iterate(&data->sched)); } static void @@ -338,12 +340,12 @@ static int make_nodes(struct data *data, const char *device) spa_node_port_set_io(data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]); spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]); - spa_graph_node_add(&data->graph, &data->source_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->source_node, spa_graph_scheduler_default, data->source); spa_graph_port_add(&data->graph, &data->source_node, &data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]); - spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_scheduler_default, data->volume); spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_volume_io[0]); @@ -353,7 +355,7 @@ static int make_nodes(struct data *data, const char *device) spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, data->sink); spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->volume_sink_io[0]); @@ -528,6 +530,7 @@ int main(int argc, char *argv[]) spa_graph_init(&data.graph); + spa_graph_scheduler_init(&data.sched, &data.graph); data.map = &default_map.map; data.log = &default_log.log; diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 99b614f0d..136ec4a64 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -99,6 +100,7 @@ struct data { uint32_t n_support; struct spa_graph graph; + struct spa_graph_scheduler sched; struct spa_graph_node source1_node; struct spa_graph_port source1_out; struct spa_graph_node source2_node; @@ -240,8 +242,8 @@ static void on_sink_need_input(struct spa_node *node, void *user_data) { struct data *data = user_data; #ifdef USE_GRAPH - spa_graph_node_pull(&data->graph, &data->sink_node); - while (spa_graph_node_iterate(&data->graph)); + spa_graph_scheduler_pull(&data->sched, &data->sink_node); + while (spa_graph_scheduler_iterate(&data->sched)); #else int res; @@ -416,17 +418,17 @@ static int make_nodes(struct data *data, const char *device) spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]); #ifdef USE_GRAPH - spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_scheduler_default, data->source1); spa_graph_port_add(&data->graph, &data->source1_node, &data->source1_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]); - spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_scheduler_default, data->source2); spa_graph_port_add(&data->graph, &data->source2_node, &data->source2_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]); - spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_scheduler_default, data->mix); spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[0], SPA_DIRECTION_INPUT, data->mix_ports[0], 0, &data->source1_mix_io[0]); @@ -439,7 +441,7 @@ static int make_nodes(struct data *data, const char *device) spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, data->sink); spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->mix_sink_io[0]); @@ -646,6 +648,7 @@ int main(int argc, char *argv[]) data.data_loop.invoke = do_invoke; spa_graph_init(&data.graph); + spa_graph_scheduler_init(&data.sched, &data.graph); if ((str = getenv("SPA_DEBUG"))) data.log->level = atoi(str); diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index eb22eff73..e2b0ee022 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -34,6 +34,7 @@ #include #include #include +#include #define MODE_SYNC_PUSH (1<<0) #define MODE_SYNC_PULL (1<<1) @@ -102,6 +103,7 @@ struct data { int iterations; struct spa_graph graph; + struct spa_graph_scheduler sched; struct spa_graph_node source_node; struct spa_graph_port source_out; struct spa_graph_port sink_in; @@ -223,8 +225,8 @@ static void on_sink_pull(struct data *data) spa_node_process_output(data->source); spa_node_process_input(data->sink); } else { - spa_graph_node_pull(&data->graph, &data->sink_node); - while (spa_graph_node_iterate(&data->graph)); + spa_graph_scheduler_pull(&data->sched, &data->sink_node); + while (spa_graph_scheduler_iterate(&data->sched)); } } @@ -235,8 +237,8 @@ static void on_source_push(struct data *data) spa_node_process_output(data->source); spa_node_process_input(data->sink); } else { - spa_graph_node_push(&data->graph, &data->source_node); - while (spa_graph_node_iterate(&data->graph)); + spa_graph_scheduler_push(&data->sched, &data->source_node); + while (spa_graph_scheduler_iterate(&data->sched)); } } @@ -365,12 +367,12 @@ static int make_nodes(struct data *data) spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]); spa_graph_node_add(&data->graph, &data->source_node, - spa_graph_node_schedule_default, data->source); + spa_graph_scheduler_default, data->source); data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; spa_graph_port_add(&data->graph, &data->source_node, &data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, data->sink); data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; spa_graph_port_add(&data->graph, &data->sink_node, @@ -526,6 +528,7 @@ int main(int argc, char *argv[]) const char *str; spa_graph_init(&data.graph); + spa_graph_scheduler_init(&data.sched, &data.graph); data.map = &default_map.map; data.log = &default_log.log;