diff --git a/pipewire/client/meson.build b/pipewire/client/meson.build index d062d2614..a9c258b67 100644 --- a/pipewire/client/meson.build +++ b/pipewire/client/meson.build @@ -64,5 +64,5 @@ libpipewire = shared_library('pipewire-@0@'.format(apiversion), pipewire_sources pipewire_dep = declare_dependency(link_with : libpipewire, include_directories : [configinc, spa_inc], - dependencies : [pthread_lib], + dependencies : [pthread_lib,spalib_dep], ) diff --git a/pipewire/modules/module-mixer.c b/pipewire/modules/module-mixer.c index ff133f846..cf010f374 100644 --- a/pipewire/modules/module-mixer.c +++ b/pipewire/modules/module-mixer.c @@ -46,7 +46,7 @@ static const struct spa_handle_factory *find_factory(struct impl *impl) char *filename; const char *dir; - if ((dir = getenv("SPA_PLUIGIN_DIR")) == NULL) + if ((dir = getenv("SPA_PLUGIN_DIR")) == NULL) dir = PLUGINDIR; asprintf(&filename, "%s/%s.so", dir, AUDIOMIXER_LIB); @@ -157,6 +157,9 @@ static struct impl *module_new(struct pw_core *core, struct pw_properties *prope if (op == NULL) continue; + n->idle_used_input_links++; + node->idle_used_output_links++; + pw_link_new(core, op, ip, NULL, NULL, &error); } return impl; diff --git a/pipewire/modules/spa/spa-node.h b/pipewire/modules/spa/spa-node.h index c03bbd9c7..de857ad12 100644 --- a/pipewire/modules/spa/spa-node.h +++ b/pipewire/modules/spa/spa-node.h @@ -37,7 +37,8 @@ struct pw_spa_node { }; typedef int (*setup_node_t) (struct pw_core *core, - struct spa_node *spa_node, struct pw_properties *pw_props); + struct spa_node *spa_node, + struct pw_properties *pw_props); struct pw_spa_node * pw_spa_node_load(struct pw_core *core, diff --git a/pipewire/server/link.c b/pipewire/server/link.c index 3f0855705..3c90f7bbe 100644 --- a/pipewire/server/link.c +++ b/pipewire/server/link.c @@ -381,8 +381,8 @@ spa_node_param_filter(struct pw_link *this, struct spa_pod_frame f; uint32_t offset; - if (spa_node_port_enum_params - (out_node, SPA_DIRECTION_OUTPUT, out_port, oidx, &oparam) < 0) + if (spa_node_port_enum_params(out_node, SPA_DIRECTION_OUTPUT, + out_port, oidx, &oparam) < 0) break; if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) @@ -589,7 +589,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s } if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { - if ((res = pw_port_alloc_buffers(this->output, params, n_params, + if ((res = pw_port_alloc_buffers(this->output, + params, n_params, impl->buffers, &impl->n_buffers)) < 0) { asprintf(&error, "error alloc output buffers: %d", res); goto error; @@ -602,7 +603,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s pw_log_debug("allocated %d buffers %p from output port", impl->n_buffers, impl->buffers); } else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { - if ((res = pw_port_alloc_buffers(this->input, params, n_params, + if ((res = pw_port_alloc_buffers(this->input, + params, n_params, impl->buffers, &impl->n_buffers)) < 0) { asprintf(&error, "error alloc input buffers: %d", res); goto error; @@ -619,7 +621,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { pw_log_debug("using %d buffers %p on input port", impl->n_buffers, impl->buffers); - if ((res = pw_port_use_buffers(this->input, impl->buffers, impl->n_buffers)) < 0) { + if ((res = pw_port_use_buffers(this->input, + impl->buffers, impl->n_buffers)) < 0) { asprintf(&error, "error use input buffers: %d", res); goto error; } @@ -627,7 +630,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s pw_work_queue_add(impl->work, this->input->node, res, complete_paused, this->input); } else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { pw_log_debug("using %d buffers %p on output port", impl->n_buffers, impl->buffers); - if ((res = pw_port_use_buffers(this->output, impl->buffers, impl->n_buffers)) < 0) { + if ((res = pw_port_use_buffers(this->output, + impl->buffers, impl->n_buffers)) < 0) { asprintf(&error, "error use output buffers: %d", res); goto error; } @@ -758,6 +762,14 @@ on_output_async_complete_notify(struct pw_listener *listener, pw_work_queue_complete(impl->work, node, seq, res); } +static void clear_port_buffers(struct pw_link *link, struct pw_port *port) +{ + struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this); + + if (impl->buffer_owner != port) + pw_port_use_buffers(port, NULL, 0); +} + static int do_remove_input(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) @@ -775,8 +787,11 @@ static void input_remove(struct pw_link *this, struct pw_port *port) pw_log_debug("link %p: remove input port %p", this, port); pw_signal_remove(&impl->input_port_destroy); pw_signal_remove(&impl->input_async_complete); + pw_loop_invoke(port->node->data_loop->loop, do_remove_input, 1, sizeof(struct pw_port*), &port, true, this); + + clear_port_buffers(this, this->input); } static int @@ -796,8 +811,11 @@ static void output_remove(struct pw_link *this, struct pw_port *port) pw_log_debug("link %p: remove output port %p", this, port); pw_signal_remove(&impl->output_port_destroy); pw_signal_remove(&impl->output_async_complete); + pw_loop_invoke(port->node->data_loop->loop, do_remove_output, 1, sizeof(struct pw_port*), &port, true, this); + + clear_port_buffers(this, this->output); } static void on_port_destroy(struct pw_link *this, struct pw_port *port) @@ -807,11 +825,9 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port) if (port == this->input) { input_remove(this, port); - this->input = NULL; other = this->output; } else if (port == this->output) { output_remove(this, port); - this->output = NULL; other = this->input; } else return; @@ -820,7 +836,7 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port) impl->buffers = NULL; impl->n_buffers = 0; - pw_log_debug("link %p: clear input allocated buffers on port %p", this, other); + pw_log_debug("link %p: clear allocated buffers on port %p", this, other); pw_port_use_buffers(other, NULL, 0); impl->buffer_owner = NULL; } @@ -845,6 +861,15 @@ static void on_output_port_destroy(struct pw_listener *listener, struct pw_port on_port_destroy(&impl->this, port); } +static int +do_activate_link(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + spa_graph_port_link(this->output->node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port); + return SPA_RESULT_OK; +} + bool pw_link_activate(struct pw_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); @@ -855,15 +880,70 @@ bool pw_link_activate(struct pw_link *this) impl->active = true; pw_log_debug("link %p: activate", this); + pw_loop_invoke(this->output->node->data_loop->loop, + do_activate_link, SPA_ID_INVALID, 0, NULL, false, this); + + this->output->node->n_used_output_links++; + this->input->node->n_used_input_links++; + pw_work_queue_add(impl->work, this, SPA_RESULT_WAIT_SYNC, (pw_work_func_t) check_states, this); + return true; } +static int +do_deactivate_link(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_link *this = user_data; + spa_graph_port_unlink(this->output->node->rt.sched->graph, &this->rt.out_port); + return SPA_RESULT_OK; +} + bool pw_link_deactivate(struct pw_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + struct pw_node *input_node, *output_node; + + if (!impl->active) + return true; + impl->active = false; + pw_log_debug("link %p: deactivate", this); + pw_loop_invoke(this->output->node->data_loop->loop, + do_deactivate_link, SPA_ID_INVALID, 0, NULL, true, this); + + input_node = this->input->node; + output_node = this->output->node; + + input_node->n_used_input_links--; + output_node->n_used_output_links--; + + pw_log_debug("link %p: in %d %d, out %d %d, %d %d %d %d", this, + input_node->n_used_input_links, + input_node->n_used_output_links, + output_node->n_used_input_links, + output_node->n_used_output_links, + input_node->idle_used_input_links, + input_node->idle_used_output_links, + output_node->idle_used_input_links, + output_node->idle_used_output_links); + + if (input_node->n_used_input_links <= input_node->idle_used_input_links && + input_node->n_used_output_links <= input_node->idle_used_output_links && + input_node->info.state > PW_NODE_STATE_IDLE) { + pw_node_update_state(input_node, PW_NODE_STATE_IDLE, NULL); + this->input->state = PW_PORT_STATE_PAUSED; + } + + if (output_node->n_used_input_links <= output_node->idle_used_input_links && + output_node->n_used_output_links <= output_node->idle_used_output_links && + output_node->info.state > PW_NODE_STATE_IDLE) { + pw_node_update_state(output_node, PW_NODE_STATE_IDLE, NULL); + this->output->state = PW_PORT_STATE_PAUSED; + } + return true; } @@ -998,22 +1078,17 @@ struct pw_link *pw_link_new(struct pw_core *core, spa_list_insert(output->links.prev, &this->output_link); spa_list_insert(input->links.prev, &this->input_link); - output_node->n_used_output_links++; - input_node->n_used_input_links++; - spa_list_insert(core->link_list.prev, &this->link); pw_core_add_global(core, NULL, core->type.link, 0, this, link_bind_func, &this->global); this->info.id = this->global->id; - this->info.output_node_id = output ? output->node->global->id : -1; + this->info.output_node_id = output ? output_node->global->id : -1; this->info.output_port_id = output ? output->port_id : -1; - this->info.input_node_id = input ? input->node->global->id : -1; + this->info.input_node_id = input ? input_node->global->id : -1; this->info.input_port_id = input ? input->port_id : -1; this->info.format = NULL; - spa_graph_port_link(output_node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port); - pw_loop_invoke(output_node->data_loop->loop, do_add_link, SPA_ID_INVALID, sizeof(struct pw_port *), &output, false, this); @@ -1034,15 +1109,6 @@ struct pw_link *pw_link_new(struct pw_core *core, return NULL; } -static void clear_port_buffers(struct pw_link *link, struct pw_port *port) -{ - struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this); - - if (impl->buffer_owner != port) - pw_port_use_buffers(port, NULL, 0); -} - - void pw_link_destroy(struct pw_link *link) { struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this); @@ -1051,42 +1117,19 @@ void pw_link_destroy(struct pw_link *link) pw_log_debug("link %p: destroy", impl); pw_signal_emit(&link->destroy_signal, link); + pw_link_deactivate(link); + pw_global_destroy(link->global); spa_list_remove(&link->link); spa_list_for_each_safe(resource, tmp, &link->resource_list, link) pw_resource_destroy(resource); - if (link->input) { - input_remove(link, link->input); + input_remove(link, link->input); + spa_list_remove(&link->input_link); - spa_list_remove(&link->input_link); - link->input->node->n_used_input_links--; - - clear_port_buffers(link, link->input); - - if (link->input->node->n_used_input_links == 0 && - link->input->node->n_used_output_links == 0 && - link->input->node->info.state > PW_NODE_STATE_IDLE) - pw_node_update_state(link->input->node, PW_NODE_STATE_IDLE, NULL); - - link->input = NULL; - } - if (link->output) { - output_remove(link, link->output); - - spa_list_remove(&link->output_link); - link->output->node->n_used_output_links--; - - clear_port_buffers(link, link->output); - - if (link->output->node->n_used_input_links == 0 && - link->output->node->n_used_output_links == 0 && - link->output->node->info.state > PW_NODE_STATE_IDLE) - pw_node_update_state(link->output->node, PW_NODE_STATE_IDLE, NULL); - - link->output = NULL; - } + output_remove(link, link->output); + spa_list_remove(&link->output_link); pw_work_queue_destroy(impl->work); diff --git a/pipewire/server/node.c b/pipewire/server/node.c index a2daafa9e..54d90bf56 100644 --- a/pipewire/server/node.c +++ b/pipewire/server/node.c @@ -640,7 +640,7 @@ struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction di port = spa_list_first(ports, struct pw_port, link); /* for output we can reuse an existing port, for input only * when there is a multiplex */ - if (direction == PW_DIRECTION_INPUT && port->multiplex == NULL) + if (direction == PW_DIRECTION_INPUT && port->mix == NULL) port = NULL; } } @@ -664,10 +664,28 @@ static void on_state_complete(struct pw_node *node, void *data, int res) pw_node_update_state(node, state, error); } +static void node_deactivate(struct pw_node *this) +{ + struct pw_port *port; + + pw_log_debug("node %p: deactivate", this); + spa_list_for_each(port, &this->input_ports, link) { + struct pw_link *link; + spa_list_for_each(link, &port->links, input_link) + pw_link_deactivate(link); + } + spa_list_for_each(port, &this->output_ports, link) { + struct pw_link *link; + spa_list_for_each(link, &port->links, output_link) + pw_link_deactivate(link); + } +} + static void node_activate(struct pw_node *this) { struct pw_port *port; + pw_log_debug("node %p: activate", this); spa_list_for_each(port, &this->input_ports, link) { struct pw_link *link; spa_list_for_each(link, &port->links, input_link) @@ -754,6 +772,9 @@ void pw_node_update_state(struct pw_node *node, enum pw_node_state state, char * node->info.error = error; node->info.state = state; + if (state == PW_NODE_STATE_IDLE) + node_deactivate(node); + pw_signal_emit(&node->state_changed, node, old, state); node->info.change_mask = 1 << 5; diff --git a/pipewire/server/node.h b/pipewire/server/node.h index 88b916e37..379eb3643 100644 --- a/pipewire/server/node.h +++ b/pipewire/server/node.h @@ -84,10 +84,12 @@ struct pw_node { struct spa_list input_ports; /**< list of input ports */ struct pw_port **input_port_map; /**< map from port_id to port */ uint32_t n_used_input_links; /**< number of active input links */ + uint32_t idle_used_input_links; /**< number of active input to be idle */ struct spa_list output_ports; /**< list of output ports */ struct pw_port **output_port_map; /**< map from port_id to port */ uint32_t n_used_output_links; /**< number of active output links */ + uint32_t idle_used_output_links; /**< number of active output to be idle */ /** Emited when a new port is added */ PW_SIGNAL(port_added, (struct pw_listener *listener, diff --git a/pipewire/server/port.c b/pipewire/server/port.c index bfa0b3e67..2d8c81877 100644 --- a/pipewire/server/port.c +++ b/pipewire/server/port.c @@ -85,12 +85,13 @@ static int schedule_mix(struct spa_graph_node *node) io->status = SPA_RESULT_NEED_BUFFER; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) *p->io = *io; + io->buffer_id = SPA_ID_INVALID; res = SPA_RESULT_NEED_BUFFER; } else res = SPA_RESULT_ERROR; - return res; + return res; } static int do_add_port(struct spa_loop *loop, @@ -194,28 +195,17 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state) } } -int pw_port_pause_rt(struct pw_port *port) -{ - int res; - - if (port->state <= PW_PORT_STATE_PAUSED) - return SPA_RESULT_OK; - - res = spa_node_port_send_command(port->node->node, - port->direction, - port->port_id, - &SPA_COMMAND_INIT(port->node->core->type.command_node. - Pause)); - port_update_state (port, PW_PORT_STATE_PAUSED); - return res; -} - static int do_port_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_port *port = user_data; - return pw_port_pause_rt(port); + + return spa_node_port_send_command(port->node->node, + port->direction, + port->port_id, + &SPA_COMMAND_INIT(port->node->core->type.command_node. + Pause)); } int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format) @@ -223,7 +213,6 @@ int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format * int res; res = spa_node_port_set_format(port->node->node, port->direction, port->port_id, flags, format); - pw_log_debug("port %p: set format %d", port, res); if (!SPA_RESULT_IS_ASYNC(res)) { diff --git a/pipewire/server/port.h b/pipewire/server/port.h index c3184db99..be548ebc1 100644 --- a/pipewire/server/port.h +++ b/pipewire/server/port.h @@ -72,7 +72,7 @@ struct pw_port { struct spa_list links; /**< list of \ref pw_link */ - void *multiplex; /**< optional port buffer mix/split */ + void *mix; /**< optional port buffer mix/split */ struct { struct spa_graph *graph; @@ -102,9 +102,6 @@ int pw_port_alloc_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t *n_buffers); -/** Pause a port, should be called from data thread \memberof pw_port */ -int pw_port_pause_rt(struct pw_port *port); - #ifdef __cplusplus } #endif diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h index 6d12714c4..505d9f951 100644 --- a/spa/include/spa/graph-scheduler1.h +++ b/spa/include/spa/graph-scheduler1.h @@ -28,6 +28,7 @@ extern "C" { struct spa_graph_scheduler { struct spa_graph *graph; + struct spa_list ready; struct spa_list pending; struct spa_graph_node *node; }; @@ -36,6 +37,7 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, struct spa_graph *graph) { sched->graph = graph; + spa_list_init(&sched->ready); spa_list_init(&sched->pending); sched->node = NULL; } @@ -55,16 +57,34 @@ static inline int spa_graph_scheduler_default(struct spa_graph_node *node) return res; } +static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, struct spa_graph_port *port) +{ + struct spa_graph_node *node = port->node; + + if (port->io->status == SPA_RESULT_HAVE_BUFFER) + node->ready_in++; + + debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); + + if (node->required_in > 0 && node->ready_in == node->required_in) { + node->action = SPA_GRAPH_ACTION_IN; + if (node->ready_link.next == NULL) + spa_list_insert(sched->ready.prev, &node->ready_link); + } else if (node->ready_link.next) { + spa_list_remove(&node->ready_link); + node->ready_link.next = NULL; + } +} + static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched) { bool res; - struct spa_graph *graph = sched->graph; struct spa_graph_port *p; struct spa_graph_node *n; - res = !spa_list_is_empty(&graph->ready); + res = !spa_list_is_empty(&sched->ready); if (res) { - n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link); + n = spa_list_first(&sched->ready, struct spa_graph_node, ready_link); spa_list_remove(&n->ready_link); n->ready_link.next = NULL; @@ -79,7 +99,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node) break; n->action = SPA_GRAPH_ACTION_CHECK; - spa_list_insert(graph->ready.prev, &n->ready_link); + spa_list_insert(sched->ready.prev, &n->ready_link); break; case SPA_GRAPH_ACTION_CHECK: @@ -91,7 +111,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched if (pn != sched->node || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { pn->action = SPA_GRAPH_ACTION_OUT; - spa_list_insert(graph->ready.prev, + spa_list_insert(sched->ready.prev, &pn->ready_link); } } else if (p->io->status == SPA_RESULT_OK) @@ -99,14 +119,14 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched } } else if (n->state == SPA_RESULT_HAVE_BUFFER) { spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) - spa_graph_port_check(graph, p->peer); + spa_scheduler_port_check(sched, p->peer); } break; default: break; } - res = !spa_list_is_empty(&graph->ready); + res = !spa_list_is_empty(&sched->ready); } return res; } @@ -118,7 +138,7 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s node->state = SPA_RESULT_NEED_BUFFER; sched->node = node; if (node->ready_link.next == NULL) - spa_list_insert(sched->graph->ready.prev, &node->ready_link); + spa_list_insert(sched->ready.prev, &node->ready_link); } static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node) @@ -127,7 +147,7 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s node->action = SPA_GRAPH_ACTION_OUT; sched->node = node; if (node->ready_link.next == NULL) - spa_list_insert(sched->graph->ready.prev, &node->ready_link); + spa_list_insert(sched->ready.prev, &node->ready_link); } #ifdef __cplusplus diff --git a/spa/include/spa/graph-scheduler2.h b/spa/include/spa/graph-scheduler2.h index 4615bc507..2b2f08a5c 100644 --- a/spa/include/spa/graph-scheduler2.h +++ b/spa/include/spa/graph-scheduler2.h @@ -41,6 +41,36 @@ static inline int spa_graph_scheduler_default(struct spa_graph_node *node) return res; } +static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port) +{ + struct spa_graph_node *node = port->node; + + if (port->io->status == SPA_RESULT_HAVE_BUFFER) + node->ready_in++; + + debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); + + if (node->required_in > 0 && node->ready_in == node->required_in) { + node->action = SPA_GRAPH_ACTION_IN; + if (node->ready_link.next == NULL) + spa_list_insert(graph->ready.prev, &node->ready_link); + } else if (node->ready_link.next) { + spa_list_remove(&node->ready_link); + node->ready_link.next = NULL; + } +} + +static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) { + struct spa_graph_port *p; + + node->ready_in = 0; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; + } + debug("node %p update %d ready\n", node, node->ready_in); +} + static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph) { bool empty; diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h index 405c51bc8..bff4fc24f 100644 --- a/spa/include/spa/graph-scheduler3.h +++ b/spa/include/spa/graph-scheduler3.h @@ -28,7 +28,6 @@ extern "C" { struct spa_graph_scheduler { struct spa_graph *graph; - struct spa_list pending; struct spa_graph_node *node; }; @@ -36,7 +35,6 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, struct spa_graph *graph) { sched->graph = graph; - spa_list_init(&sched->pending); sched->node = NULL; } @@ -67,8 +65,11 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s node->ready_in = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { - struct spa_graph_port *pport = p->peer; - struct spa_graph_node *pnode = pport->node; + struct spa_graph_port *pport; + struct spa_graph_node *pnode; + if ((pport = p->peer) == NULL) + continue; + pnode = pport->node; debug("node %p peer %p io %d\n", node, pnode, pport->io->status); if (pport->io->status == SPA_RESULT_NEED_BUFFER) { spa_list_insert(ready.prev, &pnode->ready_link); @@ -102,7 +103,8 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s if (node->state == SPA_RESULT_HAVE_BUFFER) { spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { if (p->io->status == SPA_RESULT_HAVE_BUFFER) - p->peer->node->ready_in++; + if (p->peer) + p->peer->node->ready_in++; } } } @@ -125,8 +127,11 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s spa_list_init(&ready); spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { - struct spa_graph_port *pport = p->peer; - struct spa_graph_node *pnode = pport->node; + struct spa_graph_port *pport; + struct spa_graph_node *pnode; + if ((pport = p->peer) == NULL) + continue; + pnode = pport->node; if (pport->io->status == SPA_RESULT_HAVE_BUFFER) pnode->ready_in++; @@ -161,7 +166,8 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s node->ready_in = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - p->peer->node->ready_in++; + if (p->peer) + p->peer->node->ready_in++; } } } diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index 4a8e9749f..9283cad05 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -40,7 +40,6 @@ struct spa_graph_port; struct spa_graph { struct spa_list nodes; - struct spa_list ready; }; typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node); @@ -76,7 +75,6 @@ struct spa_graph_port { static inline void spa_graph_init(struct spa_graph *graph) { spa_list_init(&graph->nodes); - spa_list_init(&graph->ready); } static inline void @@ -96,36 +94,6 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node, debug("node %p add\n", node); } -static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port) -{ - struct spa_graph_node *node = port->node; - - if (port->io->status == SPA_RESULT_HAVE_BUFFER) - node->ready_in++; - - debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); - - if (node->required_in > 0 && node->ready_in == node->required_in) { - node->action = SPA_GRAPH_ACTION_IN; - if (node->ready_link.next == NULL) - spa_list_insert(graph->ready.prev, &node->ready_link); - } else if (node->ready_link.next) { - spa_list_remove(&node->ready_link); - node->ready_link.next = NULL; - } -} - -static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) { - struct spa_graph_port *p; - - node->ready_in = 0; - spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { - if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - node->ready_in++; - } - debug("node %p update %d ready\n", node, node->ready_in); -} - static inline void spa_graph_port_add(struct spa_graph *graph, struct spa_graph_node *node, @@ -135,7 +103,7 @@ spa_graph_port_add(struct spa_graph *graph, uint32_t flags, struct spa_port_io *io) { - debug("port %p add %d to node %p \n", port, direction, node); + debug("port %p add type %d id %d to node %p \n", port, direction, port_id, node); port->node = node; port->direction = direction; port->port_id = port_id; @@ -145,7 +113,6 @@ spa_graph_port_add(struct spa_graph *graph, node->max_in++; if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT) node->required_in++; - spa_graph_port_check(graph, port); } static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node) diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index 525c13f21..c0c482500 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -256,6 +256,8 @@ static int impl_node_add_port(struct spa_node *node, enum spa_direction directio if (this->last_port <= port_id) this->last_port = port_id + 1; + spa_log_info(this->log, NAME " %p: add port %d", this, port_id); + return SPA_RESULT_OK; } @@ -289,6 +291,7 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3 this->last_port = i + 1; } + spa_log_info(this->log, NAME " %p: remove port %d", this, port_id); return SPA_RESULT_OK; } @@ -414,11 +417,11 @@ impl_node_port_set_format(struct spa_node *node, this->copy = this->ops.copy[CONV_F32_F32]; this->add = this->ops.add[CONV_F32_F32]; } - } if (!port->have_format) { this->n_formats++; port->have_format = true; + spa_log_info(this->log, NAME " %p: set format on port %d", this, port_id); } }