From 2c76ec51937e38866290a710b1a88d16f2a37385 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 22 Apr 2020 17:09:42 +0200 Subject: [PATCH] context: simplify state changes Keep track of when a link is prepared, this is when the link has successfully negotiated a format and buffers. Only follow prepared links when collecting nodes in the graph. Set the state of the driver and its nodes based on how many active nodes the driver has. We don't have to do state changes on the nodes from the link anymore then and we can get rid of the counters. Only set the io on the mixer ports when prepared because we might need a special mixer element based on the format. Remove passive links for now. This fixes many cases where the graph would stall when linking/unlinking ports in various combinations. Fixes #221 --- src/pipewire/context.c | 27 +++++--- src/pipewire/impl-device.c | 1 + src/pipewire/impl-link.c | 126 +++++++------------------------------ src/pipewire/impl-node.c | 26 +++----- src/pipewire/private.h | 10 +-- 5 files changed, 52 insertions(+), 138 deletions(-) diff --git a/src/pipewire/context.c b/src/pipewire/context.c index 1c1693bfd..31383e2e8 100644 --- a/src/pipewire/context.c +++ b/src/pipewire/context.c @@ -772,7 +772,7 @@ static int collect_nodes(struct pw_impl_node *driver) spa_list_for_each(p, &n->input_ports, link) { spa_list_for_each(l, &p->links, input_link) { t = l->output->node; - if (!t->visited && t->active) { + if (l->prepared && !t->visited && t->active) { t->visited = true; spa_list_append(&queue, &t->sort_link); } @@ -781,7 +781,7 @@ static int collect_nodes(struct pw_impl_node *driver) spa_list_for_each(p, &n->output_ports, link) { spa_list_for_each(l, &p->links, output_link) { t = l->input->node; - if (!t->visited && t->active) { + if (l->prepared && !t->visited && t->active) { t->visited = true; spa_list_append(&queue, &t->sort_link); } @@ -810,7 +810,7 @@ int pw_context_recalc_graph(struct pw_context *context) * the unassigned nodes. */ target = NULL; spa_list_for_each(n, &context->driver_list, driver_link) { - uint32_t active_followers; + n->active_followers = 0; if (n->exported) continue; @@ -824,19 +824,18 @@ int pw_context_recalc_graph(struct pw_context *context) if (!n->master) continue; - active_followers = 0; spa_list_for_each(s, &n->follower_list, follower_link) { pw_log_debug(NAME" %p: driver %p: follower %p %s: %d", context, n, s, s->name, s->active); if (s != n && s->active) - active_followers++; + n->active_followers++; } pw_log_debug(NAME" %p: driver %p active followers %d", - context, n, active_followers); + context, n, n->active_followers); /* if the master has active followers, it is a target for our * unassigned nodes */ - if (active_followers > 0) { + if (n->active_followers > 0) { if (target == NULL) target = n; } @@ -871,9 +870,17 @@ int pw_context_recalc_graph(struct pw_context *context) /* assign final quantum and debug masters and followers */ spa_list_for_each(n, &context->driver_list, driver_link) { + enum pw_node_state state; + if (!n->master || n->exported) continue; + state = n->info.state; + if (n->active_followers > 0) + state = PW_NODE_STATE_RUNNING; + else if (state > PW_NODE_STATE_IDLE) + state = PW_NODE_STATE_IDLE; + if (n->rt.position && n->quantum_current != n->rt.position->clock.duration) { n->rt.position->clock.duration = n->quantum_current; pw_log_info("(%s-%u) new quantum:%u", @@ -882,9 +889,13 @@ int pw_context_recalc_graph(struct pw_context *context) pw_log_debug(NAME" %p: master %p quantum:%u '%s'", context, n, n->quantum_current, n->name); - spa_list_for_each(s, &n->follower_list, follower_link) + spa_list_for_each(s, &n->follower_list, follower_link) { pw_log_debug(NAME" %p: follower %p: active:%d '%s'", context, s, s->active, s->name); + if (s != n && s->active) + pw_impl_node_set_state(s, state); + } + pw_impl_node_set_state(n, state); } return 0; } diff --git a/src/pipewire/impl-device.c b/src/pipewire/impl-device.c index 01e89cd0e..5c4a4296f 100644 --- a/src/pipewire/impl-device.c +++ b/src/pipewire/impl-device.c @@ -140,6 +140,7 @@ struct pw_impl_device *pw_context_create_device(struct pw_context *context, } this = &impl->this; + this->name = strdup("device"); pw_log_debug(NAME" %p: new", this); if (properties == NULL) diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index 6bb3fd3d1..908cf081f 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -50,7 +50,6 @@ struct impl { unsigned int prepare:1; unsigned int io_set:1; unsigned int activated:1; - unsigned int passive:1; unsigned int output_destroyed:1; unsigned int input_destroyed:1; @@ -74,25 +73,6 @@ struct impl { /** \endcond */ -static void debug_link(struct pw_impl_link *link) -{ - struct pw_impl_node *in = link->input->node, *out = link->output->node; - - pw_log_debug(NAME" %p: %d %d %d out %d %d %d , %d %d %d in %d %d %d", link, - out->n_used_input_links, - out->n_ready_input_links, - out->idle_used_input_links, - out->n_used_output_links, - out->n_ready_output_links, - out->idle_used_output_links, - in->n_used_input_links, - in->n_ready_input_links, - in->idle_used_input_links, - in->n_used_output_links, - in->n_ready_output_links, - in->idle_used_output_links); -} - static void info_changed(struct pw_impl_link *link) { struct pw_resource *resource; @@ -109,7 +89,6 @@ static void info_changed(struct pw_impl_link *link) static void pw_impl_link_update_state(struct pw_impl_link *link, enum pw_link_state state, char *error) { enum pw_link_state old = link->info.state; - struct pw_impl_node *in = link->input->node, *out = link->output->node; if (state == old) return; @@ -136,24 +115,12 @@ static void pw_impl_link_update_state(struct pw_impl_link *link, enum pw_link_st link->info.change_mask |= PW_LINK_CHANGE_MASK_STATE; info_changed(link); - debug_link(link); - if (old != PW_LINK_STATE_PAUSED && state == PW_LINK_STATE_PAUSED) { - if (++out->n_ready_output_links == out->n_used_output_links && - out->n_ready_input_links == out->n_used_input_links) - pw_impl_node_set_state(out, PW_NODE_STATE_RUNNING); - if (++in->n_ready_input_links == in->n_used_input_links && - in->n_ready_output_links == in->n_used_output_links) - pw_impl_node_set_state(in, PW_NODE_STATE_RUNNING); - pw_impl_link_activate(link); - } - else if (old == PW_LINK_STATE_PAUSED && state < PW_LINK_STATE_PAUSED) { - if (--out->n_ready_output_links == 0 && - out->n_ready_input_links == 0) - pw_impl_node_set_state(out, PW_NODE_STATE_IDLE); - if (--in->n_ready_input_links == 0 && - in->n_ready_output_links == 0) - pw_impl_node_set_state(in, PW_NODE_STATE_IDLE); + link->prepared = true; + pw_context_recalc_graph(link->context); + } else if (old == PW_LINK_STATE_PAUSED && state < PW_LINK_STATE_PAUSED) { + link->prepared = false; + pw_context_recalc_graph(link->context); } } @@ -553,23 +520,24 @@ int pw_impl_link_activate(struct pw_impl_link *this) pw_impl_link_prepare(this); - if (!impl->io_set) { - if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io, - sizeof(struct spa_io_buffers), &this->rt.out_mix, - impl->output_destroyed)) < 0) - return res; + if (this->prepared) { + if (!impl->io_set) { + if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io, + sizeof(struct spa_io_buffers), &this->rt.out_mix, + impl->output_destroyed)) < 0) + return res; - if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io, - sizeof(struct spa_io_buffers), &this->rt.in_mix, - impl->input_destroyed)) < 0) - return res; - impl->io_set = true; - } - - if (this->info.state == PW_LINK_STATE_PAUSED) { + if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io, + sizeof(struct spa_io_buffers), &this->rt.in_mix, + impl->input_destroyed)) < 0) + return res; + impl->io_set = true; + } pw_loop_invoke(this->output->node->data_loop, do_activate_link, SPA_ID_INVALID, NULL, 0, false, this); + impl->activated = true; + pw_log_debug(NAME" %p: activated %d", this, impl->activated); } return 0; } @@ -713,16 +681,6 @@ int pw_impl_link_prepare(struct pw_impl_link *this) impl->prepare = true; - this->output->node->n_used_output_links++; - this->input->node->n_used_input_links++; - - if (impl->passive) { - this->output->node->idle_used_output_links++; - this->input->node->idle_used_input_links++; - } - - debug_link(this); - pw_work_queue_add(impl->work, this, -EBUSY, (pw_work_func_t) check_states, this); @@ -759,7 +717,6 @@ do_deactivate_link(struct spa_loop *loop, int pw_impl_link_deactivate(struct pw_impl_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - struct pw_impl_node *input_node, *output_node; pw_log_debug(NAME" %p: deactivate %d %d", this, impl->prepare, impl->activated); @@ -767,6 +724,7 @@ int pw_impl_link_deactivate(struct pw_impl_link *this) return 0; impl->prepare = false; + this->prepared = false; if (impl->activated) { pw_loop_invoke(this->output->node->data_loop, do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this); @@ -780,35 +738,6 @@ int pw_impl_link_deactivate(struct pw_impl_link *this) impl->activated = false; } - output_node = this->output->node; - input_node = this->input->node; - - output_node->n_used_output_links--; - input_node->n_used_input_links--; - - if (impl->passive) { - output_node->idle_used_output_links--; - input_node->idle_used_input_links--; - } - - debug_link(this); - - if (input_node->n_used_input_links <= input_node->idle_used_input_links && - input_node->n_used_output_links <= input_node->idle_used_output_links && - input_node->info.state > PW_NODE_STATE_IDLE) { - pw_impl_node_set_state(input_node, PW_NODE_STATE_IDLE); - pw_log_debug(NAME" %p: input port %p state %d -> %d", this, - this->input, this->input->state, PW_IMPL_PORT_STATE_PAUSED); - } - - if (output_node->n_used_input_links <= output_node->idle_used_input_links && - output_node->n_used_output_links <= output_node->idle_used_output_links && - output_node->info.state > PW_NODE_STATE_IDLE) { - pw_impl_node_set_state(output_node, PW_NODE_STATE_IDLE); - pw_log_debug(NAME" %p: output port %p state %d -> %d", this, - this->output, this->output->state, PW_IMPL_PORT_STATE_PAUSED); - } - pw_impl_link_update_state(this, PW_LINK_STATE_INIT, NULL); return 0; @@ -1050,7 +979,6 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, struct impl *impl; struct pw_impl_link *this; struct pw_impl_node *input_node, *output_node; - const char *str; int res; if (output == input) @@ -1094,10 +1022,6 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, this->output = output; this->input = input; - /* passive means that this link does not make the nodes active */ - if ((str = pw_properties_get(properties, PW_KEY_LINK_PASSIVE)) != NULL) - impl->passive = pw_properties_parse_bool(str); - spa_hook_list_init(&this->listener_list); impl->format_filter = format_filter; @@ -1111,8 +1035,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, input_node->live = output_node->live; - pw_log_debug(NAME" %p: output node %p live %d, passive %d, feedback %d", - this, output_node, output_node->live, impl->passive, this->feedback); + pw_log_debug(NAME" %p: output node %p live %d, feedback %d", + this, output_node, output_node->live, this->feedback); spa_list_append(&output->links, &this->output_link); spa_list_append(&input->links, &this->input_link); @@ -1261,11 +1185,7 @@ int pw_impl_link_register(struct pw_impl_link *link, pw_global_add_listener(link->global, &link->global_listener, &global_events, link); pw_global_register(link->global); - debug_link(link); - - if ((input_node->n_used_input_links >= input_node->idle_used_input_links || - output_node->n_used_output_links >= output_node->idle_used_output_links) && - input_node->active && output_node->active) + if (input_node->active && output_node->active) pw_impl_link_prepare(link); return 0; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 8ee90d606..72925c7c9 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -183,13 +183,7 @@ static int start_node(struct pw_impl_node *this) if (this->info.state >= PW_NODE_STATE_RUNNING) return 0; - pw_log_debug(NAME" %p: start node %d %d %d %d", this, this->n_ready_output_links, - this->n_used_output_links, this->n_ready_input_links, - this->n_used_input_links); - - if (this->n_ready_output_links != this->n_used_output_links || - this->n_ready_input_links != this->n_used_input_links) - return 0; + pw_log_debug(NAME" %p: start node", this); res = spa_node_send_command(this->node, &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Start)); @@ -984,6 +978,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, this = &impl->this; this->context = context; + this->name = strdup("node"); if (user_data_size > 0) this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void); @@ -1812,10 +1807,8 @@ int pw_impl_node_set_state(struct pw_impl_node *node, enum pw_node_state state) pw_node_state_as_string(state), node->active); - if (old == state) - return 0; - - pw_impl_node_emit_state_request(node, state); + if (old != state) + pw_impl_node_emit_state_request(node, state); switch (state) { case PW_NODE_STATE_CREATING: @@ -1846,8 +1839,9 @@ int pw_impl_node_set_state(struct pw_impl_node *node, enum pw_node_state state) if (SPA_RESULT_IS_ASYNC(res)) { res = spa_node_sync(node->node, res); } - pw_work_queue_add(impl->work, - node, res, on_state_complete, SPA_INT_TO_PTR(state)); + if (old != state) + pw_work_queue_add(impl->work, + node, res, on_state_complete, SPA_INT_TO_PTR(state)); return res; } @@ -1860,15 +1854,9 @@ int pw_impl_node_set_active(struct pw_impl_node *node, bool active) if (old != active) { pw_log_debug(NAME" %p: %s", node, active ? "activate" : "deactivate"); - if (!active) - pw_impl_node_set_state(node, PW_NODE_STATE_IDLE); - node->active = active; pw_impl_node_emit_active_changed(node, active); - if (active) - node_activate(node); - if (node->registered) pw_context_recalc_graph(node->context); } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 629ae3caf..3464ebbdc 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -532,6 +532,7 @@ struct pw_impl_node { struct pw_impl_node *driver_node; struct spa_list follower_list; struct spa_list follower_link; + uint32_t active_followers; struct spa_list sort_link; /**< link used to sort nodes */ @@ -543,14 +544,6 @@ struct pw_impl_node { struct spa_list output_ports; /**< list of output ports */ struct pw_map output_port_map; /**< map from port_id to port */ - uint32_t n_used_input_links; /**< number of active input links */ - uint32_t idle_used_input_links; /**< number of active input to be idle */ - uint32_t n_ready_input_links; /**< number of ready input links */ - - uint32_t n_used_output_links; /**< number of active output links */ - uint32_t idle_used_output_links; /**< number of active output to be idle */ - uint32_t n_ready_output_links; /**< number of ready output links */ - struct spa_hook_list listener_list; struct pw_loop *data_loop; /**< the data loop for this node */ @@ -735,6 +728,7 @@ struct pw_impl_link { unsigned int registered:1; unsigned int feedback:1; + unsigned int prepared:1; }; #define pw_resource_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_resource_events, m, v, ##__VA_ARGS__)