diff --git a/pipewire/modules/module-autolink.c b/pipewire/modules/module-autolink.c index f4cb6132e..02746a2d7 100644 --- a/pipewire/modules/module-autolink.c +++ b/pipewire/modules/module-autolink.c @@ -159,9 +159,9 @@ static void try_link_port(struct pw_node *node, struct pw_port *port, struct nod goto error; if (port->direction == PW_DIRECTION_OUTPUT) - link = pw_port_link(port, target, NULL, NULL, &error); + link = pw_link_new(impl->core, port, target, NULL, NULL, &error); else - link = pw_port_link(target, port, NULL, NULL, &error); + link = pw_link_new(impl->core, target, port, NULL, NULL, &error); if (link == NULL) goto error; diff --git a/pipewire/modules/module-client-node/client-node.c b/pipewire/modules/module-client-node/client-node.c index f049054f8..2d8f4da33 100644 --- a/pipewire/modules/module-client-node/client-node.c +++ b/pipewire/modules/module-client-node/client-node.c @@ -782,7 +782,10 @@ static int spa_proxy_node_process_input(struct spa_node *node) } send_have_output(this); - return SPA_RESULT_OK; + if (this->callbacks->need_input) + return SPA_RESULT_OK; + else + return SPA_RESULT_NEED_BUFFER; } static int spa_proxy_node_process_output(struct spa_node *node) diff --git a/pipewire/modules/module-mixer.c b/pipewire/modules/module-mixer.c index 716d04916..ff133f846 100644 --- a/pipewire/modules/module-mixer.c +++ b/pipewire/modules/module-mixer.c @@ -157,7 +157,7 @@ static struct impl *module_new(struct pw_core *core, struct pw_properties *prope if (op == NULL) continue; - pw_port_link(op, ip, NULL, NULL, &error); + pw_link_new(core, op, ip, NULL, NULL, &error); } return impl; } diff --git a/pipewire/server/core.c b/pipewire/server/core.c index 2aed3bafc..9b8c981a8 100644 --- a/pipewire/server/core.c +++ b/pipewire/server/core.c @@ -291,6 +291,9 @@ struct pw_core *pw_core_new(struct pw_main_loop *main_loop, struct pw_properties pw_type_init(&this->type); pw_map_init(&this->objects, 128, 32); + spa_graph_init(&this->rt.graph); + spa_graph_scheduler_init(&this->rt.sched, &this->rt.graph); + spa_debug_set_type_map(this->type.map); impl->support[0] = SPA_SUPPORT_INIT(SPA_TYPE__TypeMap, this->type.map); diff --git a/pipewire/server/core.h b/pipewire/server/core.h index cd9a076cb..01950ca9b 100644 --- a/pipewire/server/core.h +++ b/pipewire/server/core.h @@ -25,6 +25,7 @@ extern "C" { #endif #include +#include struct pw_global; @@ -180,6 +181,11 @@ struct pw_core { /** Emited when a global is removed */ PW_SIGNAL(global_removed, (struct pw_listener *listener, struct pw_core *core, struct pw_global *global)); + + struct { + struct spa_graph_scheduler sched; + struct spa_graph graph; + } rt; }; struct pw_core * diff --git a/pipewire/server/link.c b/pipewire/server/link.c index 5f62d7c57..3f0855705 100644 --- a/pipewire/server/link.c +++ b/pipewire/server/link.c @@ -758,21 +758,59 @@ on_output_async_complete_notify(struct pw_listener *listener, pw_work_queue_complete(impl->work, node, seq, res); } +static int +do_remove_input(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + struct pw_port *port = ((struct pw_port **) data)[0]; + spa_graph_port_remove(port->rt.graph, &this->rt.in_port); + return SPA_RESULT_OK; +} + +static void input_remove(struct pw_link *this, struct pw_port *port) +{ + struct impl *impl = (struct impl *) this; + + pw_log_debug("link %p: remove input port %p", this, port); + pw_signal_remove(&impl->input_port_destroy); + pw_signal_remove(&impl->input_async_complete); + pw_loop_invoke(port->node->data_loop->loop, + do_remove_input, 1, sizeof(struct pw_port*), &port, true, this); +} + +static int +do_remove_output(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + struct pw_port *port = ((struct pw_port **) data)[0]; + spa_graph_port_remove(port->rt.graph, &this->rt.out_port); + return SPA_RESULT_OK; +} + +static void output_remove(struct pw_link *this, struct pw_port *port) +{ + struct impl *impl = (struct impl *) this; + + pw_log_debug("link %p: remove output port %p", this, port); + pw_signal_remove(&impl->output_port_destroy); + pw_signal_remove(&impl->output_async_complete); + pw_loop_invoke(port->node->data_loop->loop, + do_remove_output, 1, sizeof(struct pw_port*), &port, true, this); +} + static void on_port_destroy(struct pw_link *this, struct pw_port *port) { struct impl *impl = (struct impl *) this; struct pw_port *other; if (port == this->input) { - pw_log_debug("link %p: input port destroyed %p", this, port); - pw_signal_remove(&impl->input_port_destroy); - pw_signal_remove(&impl->input_async_complete); + input_remove(this, port); this->input = NULL; other = this->output; } else if (port == this->output) { - pw_log_debug("link %p: output port destroyed %p", this, port); - pw_signal_remove(&impl->output_port_destroy); - pw_signal_remove(&impl->output_async_complete); + output_remove(this, port); this->output = NULL; other = this->input; } else @@ -784,6 +822,7 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port) pw_log_debug("link %p: clear input allocated buffers on port %p", this, other); pw_port_use_buffers(other, NULL, 0); + impl->buffer_owner = NULL; } pw_signal_emit(&this->port_unlinked, this, port); @@ -863,18 +902,54 @@ link_bind_func(struct pw_global *global, struct pw_client *client, uint32_t vers return SPA_RESULT_NO_MEMORY; } +static int +do_add_link(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + struct pw_port *port = ((struct pw_port **) data)[0]; + + if (port->direction == PW_DIRECTION_OUTPUT) { + spa_graph_port_add(port->rt.graph, + &port->rt.mix_node, + &this->rt.out_port, + PW_DIRECTION_OUTPUT, + this->rt.out_port.port_id, + 0, + &this->io); + } else { + spa_graph_port_add(port->rt.graph, + &port->rt.mix_node, + &this->rt.in_port, + PW_DIRECTION_INPUT, + this->rt.in_port.port_id, + 0, + &this->io); + } + + return SPA_RESULT_OK; +} + struct pw_link *pw_link_new(struct pw_core *core, struct pw_port *output, struct pw_port *input, struct spa_format *format_filter, - struct pw_properties *properties) + struct pw_properties *properties, + char **error) { struct impl *impl; struct pw_link *this; + struct pw_node *input_node, *output_node; + + if (output == input) + goto same_ports; + + if (pw_link_find(output, input)) + goto link_exists; impl = calloc(1, sizeof(struct impl)); if (impl == NULL) - return NULL; + goto no_mem; this = &impl->this; pw_log_debug("link %p: new", this); @@ -888,6 +963,9 @@ struct pw_link *pw_link_new(struct pw_core *core, this->input = input; this->output = output; + input_node = input->node; + output_node = output->node; + spa_list_init(&this->resource_list); pw_signal_init(&this->port_unlinked); pw_signal_init(&this->state_changed); @@ -895,21 +973,33 @@ struct pw_link *pw_link_new(struct pw_core *core, impl->format_filter = format_filter; - pw_signal_add(&this->input->destroy_signal, + pw_signal_add(&input->destroy_signal, &impl->input_port_destroy, on_input_port_destroy); - pw_signal_add(&this->input->node->async_complete, + pw_signal_add(&input_node->async_complete, &impl->input_async_complete, on_input_async_complete_notify); - pw_signal_add(&this->output->destroy_signal, + pw_signal_add(&output->destroy_signal, &impl->output_port_destroy, on_output_port_destroy); - pw_signal_add(&this->output->node->async_complete, + pw_signal_add(&output_node->async_complete, &impl->output_async_complete, on_output_async_complete_notify); pw_log_debug("link %p: constructed %p:%d -> %p:%d", impl, - this->output->node, this->output->port_id, - this->input->node, this->input->port_id); + output_node, output->port_id, input_node, input->port_id); + + input_node->live = output_node->live; + if (output_node->clock) + input_node->clock = output_node->clock; + + pw_log_debug("link %p: output node %p clock %p, live %d", this, output_node, output_node->clock, + output_node->live); + + spa_list_insert(output->links.prev, &this->output_link); + spa_list_insert(input->links.prev, &this->input_link); + + output_node->n_used_output_links++; + input_node->n_used_input_links++; spa_list_insert(core->link_list.prev, &this->link); @@ -922,7 +1012,26 @@ struct pw_link *pw_link_new(struct pw_core *core, this->info.input_port_id = input ? input->port_id : -1; this->info.format = NULL; + spa_graph_port_link(output_node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port); + + pw_loop_invoke(output_node->data_loop->loop, + do_add_link, + SPA_ID_INVALID, sizeof(struct pw_port *), &output, false, this); + pw_loop_invoke(input_node->data_loop->loop, + do_add_link, + SPA_ID_INVALID, sizeof(struct pw_port *), &input, false, this); + return this; + + same_ports: + asprintf(error, "can't link the same ports"); + return NULL; + link_exists: + asprintf(error, "link already exists"); + return NULL; + no_mem: + asprintf(error, "no memory"); + return NULL; } static void clear_port_buffers(struct pw_link *link, struct pw_port *port) @@ -933,22 +1042,6 @@ static void clear_port_buffers(struct pw_link *link, struct pw_port *port) pw_port_use_buffers(port, NULL, 0); } -static int -do_link_remove(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) -{ - struct pw_link *this = user_data; - - if (this->rt.input) { - spa_list_remove(&this->rt.input_link); - this->rt.input = NULL; - } - if (this->rt.output) { - spa_list_remove(&this->rt.output_link); - this->rt.output = NULL; - } - return SPA_RESULT_OK; -} void pw_link_destroy(struct pw_link *link) { @@ -965,21 +1058,8 @@ void pw_link_destroy(struct pw_link *link) pw_resource_destroy(resource); if (link->input) { - pw_signal_remove(&impl->input_port_destroy); - pw_signal_remove(&impl->input_async_complete); + input_remove(link, link->input); - pw_loop_invoke(link->input->node->data_loop->loop, - do_link_remove, 1, 0, NULL, true, link); - } - if (link->output) { - pw_signal_remove(&impl->output_port_destroy); - pw_signal_remove(&impl->output_async_complete); - - pw_loop_invoke(link->output->node->data_loop->loop, - do_link_remove, 2, 0, NULL, true, link); - } - - if (link->input) { spa_list_remove(&link->input_link); link->input->node->n_used_input_links--; @@ -993,6 +1073,8 @@ void pw_link_destroy(struct pw_link *link) link->input = NULL; } if (link->output) { + output_remove(link, link->output); + spa_list_remove(&link->output_link); link->output->node->n_used_output_links--; @@ -1016,3 +1098,14 @@ void pw_link_destroy(struct pw_link *link) free(impl); } + +struct pw_link *pw_link_find(struct pw_port *output_port, struct pw_port *input_port) +{ + struct pw_link *pl; + + spa_list_for_each(pl, &output_port->links, output_link) { + if (pl->input == input_port) + return pl; + } + return NULL; +} diff --git a/pipewire/server/link.h b/pipewire/server/link.h index d9be699d2..f55d22580 100644 --- a/pipewire/server/link.h +++ b/pipewire/server/link.h @@ -68,19 +68,20 @@ struct pw_link { struct spa_list resource_list; /**< list of bound resources */ + struct spa_port_io io; /**< link io area */ + struct pw_port *output; /**< output port */ struct spa_list output_link; /**< link in output port links */ struct pw_port *input; /**< input port */ struct spa_list input_link; /**< link in input port links */ + /** Emited when the port is unlinked */ PW_SIGNAL(port_unlinked, (struct pw_listener *listener, struct pw_link *link, struct pw_port *port)); struct { - struct pw_port *input; - struct pw_port *output; - struct spa_list input_link; - struct spa_list output_link; + struct spa_graph_port out_port; + struct spa_graph_port in_port; } rt; }; @@ -92,11 +93,15 @@ pw_link_new(struct pw_core *core, /**< the core object */ struct pw_port *output, /**< an output port */ struct pw_port *input, /**< an input port */ struct spa_format *format_filter, /**< an optional format filter */ - struct pw_properties *properties /**< extra properties */); + struct pw_properties *properties /**< extra properties */, + char **error /**< error string */); /** Destroy a link \memberof pw_link */ void pw_link_destroy(struct pw_link *link); +/** Find the link between 2 ports \memberof pw_link */ +struct pw_link * pw_link_find(struct pw_port *output, struct pw_port *input); + /** Activate a link \memberof pw_link * Starts the negotiation of formats and buffers on \a link and then * starts data streaming */ diff --git a/pipewire/server/node.c b/pipewire/server/node.c index f69cd8f8f..a2daafa9e 100644 --- a/pipewire/server/node.c +++ b/pipewire/server/node.c @@ -21,6 +21,8 @@ #include #include +#include + #include "pipewire/client/pipewire.h" #include "pipewire/client/interfaces.h" @@ -237,63 +239,6 @@ static void send_clock_update(struct pw_node *this) pw_log_debug("got error %d", res); } -static int do_pull(struct pw_node *this) -{ - int res = SPA_RESULT_OK; - struct pw_port *inport; - bool have_output = false; - - spa_list_for_each(inport, &this->input_ports, link) { - struct pw_link *link; - struct pw_port *outport; - struct spa_port_io *pi; - struct spa_port_io *po; - - pi = &inport->io; - pw_log_trace("node %p: need input port %d, %d %d", this, - inport->port_id, pi->buffer_id, pi->status); - - if (pi->status != SPA_RESULT_NEED_BUFFER) - continue; - - spa_list_for_each(link, &inport->rt.links, rt.input_link) { - if (link->rt.input == NULL || link->rt.output == NULL) - continue; - - outport = link->rt.output; - po = &outport->io; - - /* pull */ - *po = *pi; - pi->buffer_id = SPA_ID_INVALID; - - pw_log_trace("node %p: process output %p %d", outport->node, po, - po->buffer_id); - - res = spa_node_process_output(outport->node->node); - - if (res == SPA_RESULT_NEED_BUFFER) { - res = do_pull(outport->node); - pw_log_trace("node %p: pull return %d", outport->node, res); - } - if (res == SPA_RESULT_HAVE_BUFFER) { - *pi = *po; - pw_log_trace("node %p: have output %d %d", this, pi->status, - pi->buffer_id); - have_output = true; - } else if (res < 0) { - pw_log_warn("node %p: got process output %d", outport->node, res); - } - - } - } - if (have_output) { - pw_log_trace("node %p: doing process input", this); - res = spa_node_process_input(this->node); - } - return res; -} - static void on_node_done(struct spa_node *node, int seq, int res, void *user_data) { struct impl *impl = user_data; @@ -319,49 +264,24 @@ static void on_node_need_input(struct spa_node *node, void *user_data) struct impl *impl = user_data; struct pw_node *this = &impl->this; - do_pull(this); + spa_graph_scheduler_pull(this->rt.sched, &this->rt.node); + while (spa_graph_scheduler_iterate(this->rt.sched)); } static void on_node_have_output(struct spa_node *node, void *user_data) { struct impl *impl = user_data; struct pw_node *this = &impl->this; - int res; - struct pw_port *outport; - spa_list_for_each(outport, &this->output_ports, link) { - struct pw_link *link; - struct spa_port_io *po; - - po = &outport->io; - if (po->buffer_id == SPA_ID_INVALID) - continue; - - pw_log_trace("node %p: have output %d", this, po->buffer_id); - - spa_list_for_each(link, &outport->rt.links, rt.output_link) { - struct pw_port *inport; - - if (link->rt.input == NULL || link->rt.output == NULL) - continue; - - inport = link->rt.input; - inport->io = *po; - - pw_log_trace("node %p: do process input %d", this, po->buffer_id); - - if ((res = spa_node_process_input(inport->node->node)) < 0) - pw_log_warn("node %p: got process input %d", inport->node, res); - - } - po->status = SPA_RESULT_NEED_BUFFER; - } - res = spa_node_process_output(this->node); + spa_graph_scheduler_push(this->rt.sched, &this->rt.node); + while (spa_graph_scheduler_iterate(this->rt.sched)); } static void on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id, void *user_data) { + +#if 0 struct impl *impl = user_data; struct pw_node *this = &impl->this; struct pw_port *inport; @@ -380,6 +300,7 @@ on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id outport->io.buffer_id = buffer_id; } } +#endif } static void node_unbind_func(void *data) @@ -476,11 +397,17 @@ static void init_complete(struct pw_node *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + spa_graph_node_add(this->rt.sched->graph, + &this->rt.node, + spa_graph_scheduler_default, + this->node); + update_port_ids(this); pw_log_debug("node %p: init completed", this); impl->async_init = false; spa_list_insert(this->core->node_list.prev, &this->link); + pw_core_add_global(this->core, this->owner, this->core->type.node, 0, this, node_bind_func, &this->global); @@ -529,6 +456,8 @@ struct pw_node *pw_node_new(struct pw_core *core, this->clock = clock; this->data_loop = core->data_loop; + this->rt.sched = &core->rt.sched; + spa_list_init(&this->resource_list); if (spa_node_set_callbacks(this->node, &node_callbacks, impl) < 0) @@ -585,26 +514,11 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_node *this = user_data; - struct pw_port *port, *tmp; pause_node(this); - spa_list_for_each_safe(port, tmp, &this->input_ports, link) { - struct pw_link *link, *tlink; - spa_list_for_each_safe(link, tlink, &port->rt.links, rt.input_link) { - pw_port_pause_rt(link->rt.input); - spa_list_remove(&link->rt.input_link); - link->rt.input = NULL; - } - } - spa_list_for_each_safe(port, tmp, &this->output_ports, link) { - struct pw_link *link, *tlink; - spa_list_for_each_safe(link, tlink, &port->rt.links, rt.output_link) { - pw_port_pause_rt(link->rt.output); - spa_list_remove(&link->rt.output_link); - link->rt.output = NULL; - } - } + spa_graph_node_remove(this->rt.sched->graph, &this->rt.node); + return SPA_RESULT_OK; } diff --git a/pipewire/server/node.h b/pipewire/server/node.h index c8ca05526..88b916e37 100644 --- a/pipewire/server/node.h +++ b/pipewire/server/node.h @@ -106,6 +106,12 @@ struct pw_node { struct pw_node *node, uint32_t seq, int res)); struct pw_data_loop *data_loop; /**< the data loop for this node */ + + struct { + struct spa_graph_scheduler *sched; + struct spa_graph_node node; + } rt; + }; /** Create a new node \memberof pw_node */ diff --git a/pipewire/server/port.c b/pipewire/server/port.c index cc7a23208..bfa0b3e67 100644 --- a/pipewire/server/port.c +++ b/pipewire/server/port.c @@ -33,6 +33,96 @@ struct impl { }; /** \endcond */ + +static int schedule_tee(struct spa_graph_node *node) +{ + int res; + struct pw_port *this = node->user_data; + struct spa_graph_port *p; + struct spa_port_io *io = this->rt.mix_port.io; + + if (node->action == SPA_GRAPH_ACTION_IN) { + if (spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) { + io->status = SPA_RESULT_NEED_BUFFER; + res = SPA_RESULT_NEED_BUFFER; + } + else { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) + *p->io = *io; + io->status = SPA_RESULT_OK; + io->buffer_id = SPA_ID_INVALID; + res = SPA_RESULT_HAVE_BUFFER; + } + } + else if (node->action == SPA_GRAPH_ACTION_OUT) { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) + *io = *p->io; + io->status = SPA_RESULT_NEED_BUFFER; + res = SPA_RESULT_NEED_BUFFER; + } + else + res = SPA_RESULT_ERROR; + + return res; +} + +static int schedule_mix(struct spa_graph_node *node) +{ + int res; + struct pw_port *this = node->user_data; + struct spa_graph_port *p; + struct spa_port_io *io = this->rt.mix_port.io; + + if (node->action == SPA_GRAPH_ACTION_IN) { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + *io = *p->io; + p->io->status = SPA_RESULT_OK; + p->io->buffer_id = SPA_ID_INVALID; + } + res = SPA_RESULT_HAVE_BUFFER; + } + else if (node->action == SPA_GRAPH_ACTION_OUT) { + io->status = SPA_RESULT_NEED_BUFFER; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) + *p->io = *io; + res = SPA_RESULT_NEED_BUFFER; + } + else + res = SPA_RESULT_ERROR; + + return res; +} + +static int do_add_port(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_port *this = user_data; + + spa_graph_port_add(this->rt.graph, + &this->node->rt.node, + &this->rt.port, + this->direction, + this->port_id, + 0, + &this->io); + spa_graph_node_add(this->rt.graph, + &this->rt.mix_node, + this->direction == PW_DIRECTION_INPUT ? schedule_mix : schedule_tee, + this); + spa_graph_port_add(this->rt.graph, + &this->rt.mix_node, + &this->rt.mix_port, + pw_direction_reverse(this->direction), + 0, + 0, + &this->io); + spa_graph_port_link(this->rt.graph, + &this->rt.port, + &this->rt.mix_port); + + return SPA_RESULT_OK; +} + struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id) { struct impl *impl; @@ -51,18 +141,46 @@ struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, u this->io.buffer_id = SPA_ID_INVALID; spa_list_init(&this->links); - spa_list_init(&this->rt.links); + pw_signal_init(&this->destroy_signal); + this->rt.graph = node->rt.sched->graph; + + pw_loop_invoke(node->data_loop->loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, this); + return this; } +static int do_remove_port(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_port *this = user_data; + struct spa_graph_port *p; + + spa_graph_port_unlink(this->rt.graph, + &this->rt.port); + spa_graph_port_remove(this->rt.graph, + &this->rt.port); + + spa_list_for_each(p, &this->rt.mix_node.ports[this->direction], link) + spa_graph_port_remove(this->rt.graph, p); + + spa_graph_port_remove(this->rt.graph, + &this->rt.mix_port); + spa_graph_node_remove(this->rt.graph, + &this->rt.mix_node); + + return SPA_RESULT_OK; +} + void pw_port_destroy(struct pw_port *port) { pw_log_debug("port %p: destroy", port); pw_signal_emit(&port->destroy_signal, port); + pw_loop_invoke(port->node->data_loop->loop, do_remove_port, SPA_ID_INVALID, 0, NULL, true, port); + spa_list_remove(&port->link); free(port); @@ -76,95 +194,6 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state) } } -static int -do_add_link(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) -{ - struct pw_port *this = user_data; - struct pw_link *link = ((struct pw_link **) data)[0]; - - if (this->direction == PW_DIRECTION_INPUT) { - spa_list_insert(this->rt.links.prev, &link->rt.input_link); - link->rt.input = this; - } else { - spa_list_insert(this->rt.links.prev, &link->rt.output_link); - link->rt.output = this; - } - - return SPA_RESULT_OK; -} - -static struct pw_link *find_link(struct pw_port *output_port, struct pw_port *input_port) -{ - struct pw_link *pl; - - spa_list_for_each(pl, &output_port->links, output_link) { - if (pl->input == input_port) - return pl; - } - return NULL; -} - -struct pw_link *pw_port_link(struct pw_port *output_port, - struct pw_port *input_port, - struct spa_format *format_filter, - struct pw_properties *properties, - char **error) -{ - struct pw_node *input_node, *output_node; - struct pw_link *link; - - output_node = output_port->node; - input_node = input_port->node; - - pw_log_debug("port link %p:%u -> %p:%u", output_node, output_port->port_id, input_node, - input_port->port_id); - - if (output_node == input_node) - goto same_node; - - if (!spa_list_is_empty(&input_port->links)) - goto was_linked; - - link = find_link(output_port, input_port); - - if (link == NULL) { - input_node->live = output_node->live; - if (output_node->clock) - input_node->clock = output_node->clock; - pw_log_debug("node %p: clock %p, live %d", output_node, output_node->clock, - output_node->live); - - link = pw_link_new(output_node->core, - output_port, input_port, format_filter, properties); - if (link == NULL) - goto no_mem; - - spa_list_insert(output_port->links.prev, &link->output_link); - spa_list_insert(input_port->links.prev, &link->input_link); - - output_node->n_used_output_links++; - input_node->n_used_input_links++; - - pw_loop_invoke(output_node->data_loop->loop, - do_add_link, - SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, output_port); - pw_loop_invoke(input_node->data_loop->loop, - do_add_link, - SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, input_port); - } - return link; - - same_node: - asprintf(error, "can't link a node to itself"); - return NULL; - was_linked: - asprintf(error, "input port was already linked"); - return NULL; - no_mem: - return NULL; -} - int pw_port_pause_rt(struct pw_port *port) { int res; @@ -181,63 +210,6 @@ int pw_port_pause_rt(struct pw_port *port) return res; } -static int -do_remove_link(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) -{ - struct pw_port *port = user_data; - struct pw_link *link = ((struct pw_link **) data)[0]; - - if (port->direction == PW_DIRECTION_INPUT) { - pw_port_pause_rt(link->rt.input); - spa_list_remove(&link->rt.input_link); - link->rt.input = NULL; - } else { - pw_port_pause_rt(link->rt.output); - spa_list_remove(&link->rt.output_link); - link->rt.output = NULL; - } - return SPA_RESULT_OK; -} - -int pw_port_unlink(struct pw_port *port, struct pw_link *link) -{ - int res; - struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); - struct pw_node *node = port->node; - - pw_log_debug("port %p: start unlink %p", port, link); - - res = pw_loop_invoke(node->data_loop->loop, - do_remove_link, impl->seq++, sizeof(struct pw_link *), &link, true, port); - - if (port->state > PW_PORT_STATE_PAUSED) - port_update_state (port, PW_PORT_STATE_PAUSED); - - pw_log_debug("port %p: finish unlink", port); - if (port->direction == PW_DIRECTION_OUTPUT) { - if (link->output) { - spa_list_remove(&link->output_link); - node->n_used_output_links--; - link->output = NULL; - } - } else { - if (link->input) { - spa_list_remove(&link->input_link); - node->n_used_input_links--; - link->input = NULL; - } - } - - if (!port->allocated) - pw_port_use_buffers(port, NULL, 0); - - if (node->n_used_output_links == 0 && node->n_used_input_links == 0) - pw_node_update_state(node, PW_NODE_STATE_IDLE, NULL); - - return res; -} - static int do_port_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) diff --git a/pipewire/server/port.h b/pipewire/server/port.h index bcd0babef..c3184db99 100644 --- a/pipewire/server/port.h +++ b/pipewire/server/port.h @@ -75,8 +75,10 @@ struct pw_port { void *multiplex; /**< optional port buffer mix/split */ struct { - struct spa_list links; /**< list of \ref pw_link only accessed from the - * data thread */ + struct spa_graph *graph; + struct spa_graph_port port; + struct spa_graph_port mix_port; + struct spa_graph_node mix_node; } rt; /**< data only accessed from the data thread */ }; @@ -88,20 +90,6 @@ pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id) /** Destroy a port \memberof pw_port */ void pw_port_destroy(struct pw_port *port); -/** Link two ports with an optional filter \memberof pw_port - * \return a newly allocated \ref pw_link or NULL and \a error is set. - * - * If the ports were already linked, the existing link will be returned. */ -struct pw_link * -pw_port_link(struct pw_port *output_port, /**< output port */ - struct pw_port *input_port, /**< input port */ - struct spa_format *format_filter, /**< optional filter */ - struct pw_properties *properties, /**< extra properties */ - char **error /**< result error message or NULL */); - -/** Unlink a port \memberof pw_port */ -int pw_port_unlink(struct pw_port *port, struct pw_link *link); - /** Set a format on a port \memberof pw_port */ int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format); diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h new file mode 100644 index 000000000..6d12714c4 --- /dev/null +++ b/spa/include/spa/graph-scheduler1.h @@ -0,0 +1,137 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_SCHEDULER_H__ +#define __SPA_GRAPH_SCHEDULER_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +struct spa_graph_scheduler { + struct spa_graph *graph; + struct spa_list pending; + struct spa_graph_node *node; +}; + +static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, + struct spa_graph *graph) +{ + sched->graph = graph; + spa_list_init(&sched->pending); + sched->node = NULL; +} + +static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +{ + int res; + struct spa_node *n = node->user_data; + + if (node->action == SPA_GRAPH_ACTION_IN) + res = spa_node_process_input(n); + else if (node->action == SPA_GRAPH_ACTION_OUT) + res = spa_node_process_output(n); + else + res = SPA_RESULT_ERROR; + + return res; +} + +static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched) +{ + bool res; + struct spa_graph *graph = sched->graph; + struct spa_graph_port *p; + struct spa_graph_node *n; + + res = !spa_list_is_empty(&graph->ready); + if (res) { + n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link); + + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + + debug("node %p action %d state %d\n", n, n->action, n->state); + + switch (n->action) { + case SPA_GRAPH_ACTION_IN: + case SPA_GRAPH_ACTION_OUT: + n->state = n->schedule(n); + debug("node %p scheduled action %d state %d\n", n, n->action, n->state); + if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node) + break; + n->action = SPA_GRAPH_ACTION_CHECK; + spa_list_insert(graph->ready.prev, &n->ready_link); + break; + + case SPA_GRAPH_ACTION_CHECK: + if (n->state == SPA_RESULT_NEED_BUFFER) { + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_node *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + if (pn != sched->node + || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { + pn->action = SPA_GRAPH_ACTION_OUT; + spa_list_insert(graph->ready.prev, + &pn->ready_link); + } + } else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; + } + } else if (n->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_graph_port_check(graph, p->peer); + } + break; + + default: + break; + } + res = !spa_list_is_empty(&graph->ready); + } + return res; +} + +static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + debug("node %p start pull\n", node); + node->action = SPA_GRAPH_ACTION_CHECK; + node->state = SPA_RESULT_NEED_BUFFER; + sched->node = node; + if (node->ready_link.next == NULL) + spa_list_insert(sched->graph->ready.prev, &node->ready_link); +} + +static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + debug("node %p start push\n", node); + node->action = SPA_GRAPH_ACTION_OUT; + sched->node = node; + if (node->ready_link.next == NULL) + spa_list_insert(sched->graph->ready.prev, &node->ready_link); +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_SCHEDULER_H__ */ diff --git a/spa/include/spa/graph-scheduler2.h b/spa/include/spa/graph-scheduler2.h new file mode 100644 index 000000000..4615bc507 --- /dev/null +++ b/spa/include/spa/graph-scheduler2.h @@ -0,0 +1,153 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_SCHEDULER_H__ +#define __SPA_GRAPH_SCHEDULER_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +{ + int res; + struct spa_node *n = node->user_data; + + if (node->action == SPA_GRAPH_ACTION_IN) + res = spa_node_process_input(n); + else if (node->action == SPA_GRAPH_ACTION_OUT) + res = spa_node_process_output(n); + else + res = SPA_RESULT_ERROR; + + return res; +} + +static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph) +{ + bool empty; + struct spa_graph_port *p; + struct spa_graph_node *n; + int iter = 1; + uint32_t action; + +next: + empty = spa_list_is_empty(&graph->ready); + if (empty && !spa_list_is_empty(&graph->pending)) { + debug("copy pending\n"); + spa_list_insert_list(&graph->ready, &graph->pending); + spa_list_init(&graph->pending); + empty = false; + } + if (iter-- == 0 || empty) + return !empty; + + n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link); + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + + action = n->action; + + debug("node %p action %d, state %d\n", n, action, n->state); + + switch (action) { + case SPA_GRAPH_ACTION_IN: + case SPA_GRAPH_ACTION_OUT: + case SPA_GRAPH_ACTION_END: + if (action == SPA_GRAPH_ACTION_END) + n->action = SPA_GRAPH_ACTION_OUT; + + n->state = n->schedule(n); + debug("node %p schedule %d res %d\n", n, action, n->state); + + if (action == SPA_GRAPH_ACTION_IN && n == graph->node) + break; + + if (action != SPA_GRAPH_ACTION_END) { + debug("node %p add ready for CHECK\n", n); + n->action = SPA_GRAPH_ACTION_CHECK; + spa_list_insert(graph->ready.prev, &n->ready_link); + } + else { + spa_graph_node_update(graph, n); + } + break; + + case SPA_GRAPH_ACTION_CHECK: + if (n->state == SPA_RESULT_NEED_BUFFER) { + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_node *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + if (pn != graph->node + || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { + pn->action = SPA_GRAPH_ACTION_OUT; + debug("node %p add ready OUT\n", n); + spa_list_insert(graph->ready.prev, + &pn->ready_link); + } + } else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; + } + } + else if (n->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_graph_port_check(graph, p->peer); + + debug("node %p add pending\n", n); + n->action = SPA_GRAPH_ACTION_END; + spa_list_insert(&graph->pending, &n->ready_link); + } + else if (n->state == SPA_RESULT_OK) { + spa_graph_node_update(graph, n); + } + break; + + default: + break; + } + goto next; +} + +static inline void spa_graph_scheduler_pull(struct spa_graph *graph, struct spa_graph_node *node) +{ + node->action = SPA_GRAPH_ACTION_CHECK; + node->state = SPA_RESULT_NEED_BUFFER; + graph->node = node; + debug("node %p start pull\n", node); + if (node->ready_link.next == NULL) + spa_list_insert(graph->ready.prev, &node->ready_link); +} + +static inline void spa_graph_scheduler_push(struct spa_graph *graph, struct spa_graph_node *node) +{ + node->action = SPA_GRAPH_ACTION_OUT; + graph->node = node; + debug("node %p start push\n", node); + if (node->ready_link.next == NULL) + spa_list_insert(graph->ready.prev, &node->ready_link); +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_SCHEDULER_H__ */ diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h new file mode 100644 index 000000000..405c51bc8 --- /dev/null +++ b/spa/include/spa/graph-scheduler3.h @@ -0,0 +1,173 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_SCHEDULER_H__ +#define __SPA_GRAPH_SCHEDULER_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +struct spa_graph_scheduler { + struct spa_graph *graph; + struct spa_list pending; + struct spa_graph_node *node; +}; + +static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, + struct spa_graph *graph) +{ + sched->graph = graph; + spa_list_init(&sched->pending); + sched->node = NULL; +} + +static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +{ + int res; + struct spa_node *n = node->user_data; + + if (node->action == SPA_GRAPH_ACTION_IN) + res = spa_node_process_input(n); + else if (node->action == SPA_GRAPH_ACTION_OUT) + res = spa_node_process_output(n); + else + res = SPA_RESULT_ERROR; + + return res; +} + +static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + struct spa_graph_port *p; + struct spa_graph_node *n, *t; + struct spa_list ready; + + debug("node %p start pull\n", node); + + spa_list_init(&ready); + + node->ready_in = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + struct spa_graph_port *pport = p->peer; + struct spa_graph_node *pnode = pport->node; + debug("node %p peer %p io %d\n", node, pnode, pport->io->status); + if (pport->io->status == SPA_RESULT_NEED_BUFFER) { + spa_list_insert(ready.prev, &pnode->ready_link); + } + else if (pport->io->status == SPA_RESULT_OK && !(pnode->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; + } + + spa_list_for_each_safe(n, t, &ready, ready_link) { + n->action = SPA_GRAPH_ACTION_OUT; + n->state = n->schedule(n); + debug("peer %p scheduled %d %d\n", n, n->action, n->state); + if (n->state == SPA_RESULT_NEED_BUFFER) + spa_graph_scheduler_pull(sched, n); + else { + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { + if (p->io->status == SPA_RESULT_HAVE_BUFFER) + node->ready_in++; + } + } + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + } + + debug("node %p %d %d\n", node, node->ready_in, node->required_in); + + if (node->required_in > 0 && node->ready_in == node->required_in) { + node->action = SPA_GRAPH_ACTION_IN; + node->state = node->schedule(node); + debug("node %p scheduled %d %d\n", node, node->action, node->state); + if (node->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { + if (p->io->status == SPA_RESULT_HAVE_BUFFER) + p->peer->node->ready_in++; + } + } + } +} + +static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched) +{ + return false; +} + + +static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node) +{ + struct spa_graph_port *p; + struct spa_graph_node *n, *t; + struct spa_list ready; + + debug("node %p start push\n", node); + + spa_list_init(&ready); + + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { + struct spa_graph_port *pport = p->peer; + struct spa_graph_node *pnode = pport->node; + if (pport->io->status == SPA_RESULT_HAVE_BUFFER) + pnode->ready_in++; + + debug("node %p peer %p io %d %d %d\n", node, pnode, pport->io->status, + pnode->ready_in, pnode->required_in); + + if (pnode->required_in > 0 && pnode->ready_in == pnode->required_in) + spa_list_insert(ready.prev, &pnode->ready_link); + } + + spa_list_for_each_safe(n, t, &ready, ready_link) { + n->action = SPA_GRAPH_ACTION_IN; + n->state = n->schedule(n); + debug("peer %p scheduled %d %d\n", n, n->action, n->state); + if (n->state == SPA_RESULT_HAVE_BUFFER) + spa_graph_scheduler_push(sched, n); + else { + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; + } + } + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + } + + node->action = SPA_GRAPH_ACTION_OUT; + node->state = node->schedule(node); + debug("node %p scheduled %d %d\n", node, node->action, node->state); + if (node->state == SPA_RESULT_NEED_BUFFER) { + node->ready_in = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + p->peer->node->ready_in++; + } + } +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_SCHEDULER_H__ */ diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index b65556618..4a8e9749f 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -26,6 +26,13 @@ extern "C" { #include #include +#include + +#if 0 +#define debug(...) printf(__VA_ARGS__) +#else +#define debug(...) +#endif struct spa_graph; struct spa_graph_node; @@ -34,7 +41,6 @@ struct spa_graph_port; struct spa_graph { struct spa_list nodes; struct spa_list ready; - struct spa_graph_node *node; }; typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node); @@ -73,20 +79,6 @@ static inline void spa_graph_init(struct spa_graph *graph) spa_list_init(&graph->ready); } -static inline int spa_graph_node_schedule_default(struct spa_graph_node *node) -{ - int res; - struct spa_node *n = node->user_data; - - if (node->action == SPA_GRAPH_ACTION_IN) - res = spa_node_process_input(n); - else if (node->action == SPA_GRAPH_ACTION_OUT) - res = spa_node_process_output(n); - else - res = SPA_RESULT_ERROR; - return res; -} - static inline void spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node, spa_graph_node_func_t schedule, void *user_data) @@ -94,12 +86,14 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node, spa_list_init(&node->ports[SPA_DIRECTION_INPUT]); spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]); node->flags = 0; - node->state = SPA_RESULT_OK; + node->state = SPA_RESULT_NEED_BUFFER; node->action = SPA_GRAPH_ACTION_OUT; node->schedule = schedule; node->user_data = user_data; + node->ready_link.next = NULL; spa_list_insert(graph->nodes.prev, &node->link); node->max_in = node->required_in = node->ready_in = 0; + debug("node %p add\n", node); } static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port) @@ -109,6 +103,8 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap if (port->io->status == SPA_RESULT_HAVE_BUFFER) node->ready_in++; + debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); + if (node->required_in > 0 && node->ready_in == node->required_in) { node->action = SPA_GRAPH_ACTION_IN; if (node->ready_link.next == NULL) @@ -119,6 +115,17 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap } } +static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) { + struct spa_graph_port *p; + + node->ready_in = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; + } + debug("node %p update %d ready\n", node, node->ready_in); +} + static inline void spa_graph_port_add(struct spa_graph *graph, struct spa_graph_node *node, @@ -128,13 +135,13 @@ spa_graph_port_add(struct spa_graph *graph, uint32_t flags, struct spa_port_io *io) { + debug("port %p add %d to node %p \n", port, direction, node); port->node = node; port->direction = direction; port->port_id = port_id; port->flags = flags; port->io = io; - port->peer = NULL; - spa_list_insert(node->ports[port->direction].prev, &port->link); + spa_list_insert(node->ports[direction].prev, &port->link); node->max_in++; if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT) node->required_in++; @@ -143,96 +150,36 @@ spa_graph_port_add(struct spa_graph *graph, static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node) { + debug("node %p remove\n", node); spa_list_remove(&node->link); + if (node->ready_link.next) + spa_list_remove(&node->ready_link); } static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_graph_port *port) { + debug("port %p remove\n", port); spa_list_remove(&port->link); + if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT) + port->node->required_in--; } static inline void spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct spa_graph_port *in) { + debug("port %p link to %p \n", out, in); out->peer = in; in->peer = out; } static inline void -spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *out, - struct spa_graph_port *in) +spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *port) { - out->peer = NULL; - in->peer = NULL; -} - -static inline bool spa_graph_node_iterate(struct spa_graph *graph) -{ - bool res; - struct spa_graph_port *p; - - res = !spa_list_is_empty(&graph->ready); - if (res) { - struct spa_graph_node *n = - spa_list_first(&graph->ready, struct spa_graph_node, ready_link); - - spa_list_remove(&n->ready_link); - n->ready_link.next = NULL; - - switch (n->action) { - case SPA_GRAPH_ACTION_IN: - case SPA_GRAPH_ACTION_OUT: - n->state = n->schedule(n); - if (n->action == SPA_GRAPH_ACTION_IN && n == graph->node) - break; - n->action = SPA_GRAPH_ACTION_CHECK; - spa_list_insert(graph->ready.prev, &n->ready_link); - break; - - case SPA_GRAPH_ACTION_CHECK: - if (n->state == SPA_RESULT_NEED_BUFFER) { - n->ready_in = 0; - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - struct spa_graph_node *pn = p->peer->node; - if (p->io->status == SPA_RESULT_NEED_BUFFER) { - if (pn != graph->node - || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { - pn->action = SPA_GRAPH_ACTION_OUT; - spa_list_insert(graph->ready.prev, - &pn->ready_link); - } - } else if (p->io->status == SPA_RESULT_OK) - n->ready_in++; - } - } else if (n->state == SPA_RESULT_HAVE_BUFFER) { - spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) - spa_graph_port_check(graph, p->peer); - } - break; - - default: - break; - } - res = !spa_list_is_empty(&graph->ready); + debug("port %p unlink from %p \n", port, port->peer); + if (port->peer) { + port->peer->peer = NULL; + port->peer = NULL; } - return res; -} - -static inline void spa_graph_node_pull(struct spa_graph *graph, struct spa_graph_node *node) -{ - node->action = SPA_GRAPH_ACTION_CHECK; - node->state = SPA_RESULT_NEED_BUFFER; - graph->node = node; - if (node->ready_link.next == NULL) - spa_list_insert(graph->ready.prev, &node->ready_link); -} - -static inline void spa_graph_node_push(struct spa_graph *graph, struct spa_graph_node *node) -{ - node->action = SPA_GRAPH_ACTION_OUT; - graph->node = node; - if (node->ready_link.next == NULL) - spa_list_insert(graph->ready.prev, &node->ready_link); } #ifdef __cplusplus diff --git a/spa/include/spa/list.h b/spa/include/spa/list.h index 4e6c70239..492bb4847 100644 --- a/spa/include/spa/list.h +++ b/spa/include/spa/list.h @@ -45,6 +45,14 @@ static inline void spa_list_insert(struct spa_list *list, struct spa_list *elem) elem->next->prev = elem; } +static inline void spa_list_insert_list(struct spa_list *list, struct spa_list *other) +{ + other->next->prev = list; + other->prev->next = list->next; + list->next->prev = other->prev; + list->next = other->next; +} + static inline void spa_list_remove(struct spa_list *elem) { elem->prev->next = elem->next; diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c index 404ea0eb4..f215da860 100644 --- a/spa/tests/test-graph.c +++ b/spa/tests/test-graph.c @@ -36,6 +36,7 @@ #include #include #include +#include static SPA_TYPE_MAP_IMPL(default_map, 4096); static SPA_LOG_IMPL(default_log); @@ -97,6 +98,7 @@ struct data { uint32_t n_support; struct spa_graph graph; + struct spa_graph_scheduler sched; struct spa_graph_node source_node; struct spa_graph_port source_out; struct spa_graph_port volume_in; @@ -229,9 +231,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data) { struct data *data = user_data; - spa_graph_node_pull(&data->graph, &data->sink_node); + spa_graph_scheduler_pull(&data->sched, &data->sink_node); - while (spa_graph_node_iterate(&data->graph)); + while (spa_graph_scheduler_iterate(&data->sched)); } static void @@ -338,12 +340,12 @@ static int make_nodes(struct data *data, const char *device) spa_node_port_set_io(data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]); spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]); - spa_graph_node_add(&data->graph, &data->source_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->source_node, spa_graph_scheduler_default, data->source); spa_graph_port_add(&data->graph, &data->source_node, &data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]); - spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_scheduler_default, data->volume); spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_volume_io[0]); @@ -353,7 +355,7 @@ static int make_nodes(struct data *data, const char *device) spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, data->sink); spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->volume_sink_io[0]); @@ -528,6 +530,7 @@ int main(int argc, char *argv[]) spa_graph_init(&data.graph); + spa_graph_scheduler_init(&data.sched, &data.graph); data.map = &default_map.map; data.log = &default_log.log; diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 99b614f0d..136ec4a64 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -99,6 +100,7 @@ struct data { uint32_t n_support; struct spa_graph graph; + struct spa_graph_scheduler sched; struct spa_graph_node source1_node; struct spa_graph_port source1_out; struct spa_graph_node source2_node; @@ -240,8 +242,8 @@ static void on_sink_need_input(struct spa_node *node, void *user_data) { struct data *data = user_data; #ifdef USE_GRAPH - spa_graph_node_pull(&data->graph, &data->sink_node); - while (spa_graph_node_iterate(&data->graph)); + spa_graph_scheduler_pull(&data->sched, &data->sink_node); + while (spa_graph_scheduler_iterate(&data->sched)); #else int res; @@ -416,17 +418,17 @@ static int make_nodes(struct data *data, const char *device) spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]); #ifdef USE_GRAPH - spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_scheduler_default, data->source1); spa_graph_port_add(&data->graph, &data->source1_node, &data->source1_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]); - spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_scheduler_default, data->source2); spa_graph_port_add(&data->graph, &data->source2_node, &data->source2_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]); - spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_scheduler_default, data->mix); spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[0], SPA_DIRECTION_INPUT, data->mix_ports[0], 0, &data->source1_mix_io[0]); @@ -439,7 +441,7 @@ static int make_nodes(struct data *data, const char *device) spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, data->sink); spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->mix_sink_io[0]); @@ -646,6 +648,7 @@ int main(int argc, char *argv[]) data.data_loop.invoke = do_invoke; spa_graph_init(&data.graph); + spa_graph_scheduler_init(&data.sched, &data.graph); if ((str = getenv("SPA_DEBUG"))) data.log->level = atoi(str); diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index eb22eff73..e2b0ee022 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -34,6 +34,7 @@ #include #include #include +#include #define MODE_SYNC_PUSH (1<<0) #define MODE_SYNC_PULL (1<<1) @@ -102,6 +103,7 @@ struct data { int iterations; struct spa_graph graph; + struct spa_graph_scheduler sched; struct spa_graph_node source_node; struct spa_graph_port source_out; struct spa_graph_port sink_in; @@ -223,8 +225,8 @@ static void on_sink_pull(struct data *data) spa_node_process_output(data->source); spa_node_process_input(data->sink); } else { - spa_graph_node_pull(&data->graph, &data->sink_node); - while (spa_graph_node_iterate(&data->graph)); + spa_graph_scheduler_pull(&data->sched, &data->sink_node); + while (spa_graph_scheduler_iterate(&data->sched)); } } @@ -235,8 +237,8 @@ static void on_source_push(struct data *data) spa_node_process_output(data->source); spa_node_process_input(data->sink); } else { - spa_graph_node_push(&data->graph, &data->source_node); - while (spa_graph_node_iterate(&data->graph)); + spa_graph_scheduler_push(&data->sched, &data->source_node); + while (spa_graph_scheduler_iterate(&data->sched)); } } @@ -365,12 +367,12 @@ static int make_nodes(struct data *data) spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]); spa_graph_node_add(&data->graph, &data->source_node, - spa_graph_node_schedule_default, data->source); + spa_graph_scheduler_default, data->source); data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; spa_graph_port_add(&data->graph, &data->source_node, &data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default, + spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, data->sink); data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; spa_graph_port_add(&data->graph, &data->sink_node, @@ -526,6 +528,7 @@ int main(int argc, char *argv[]) const char *str; spa_graph_init(&data.graph); + spa_graph_scheduler_init(&data.sched, &data.graph); data.map = &default_map.map; data.log = &default_log.log;