From c315c95e5593bdb9787e7f5769345e78bf213e90 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 2 Jul 2018 12:14:50 +0200 Subject: [PATCH] link: improve negotiation Separate negotiation and activation steps of the link. Keep track of the pending amount of links to negotiate before activating a node. --- src/pipewire/link.c | 110 +++++++++++++++++------------------------ src/pipewire/private.h | 5 ++ 2 files changed, 49 insertions(+), 66 deletions(-) diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 28258df48..e3695fc93 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -39,7 +39,7 @@ struct impl { struct pw_link this; - bool active; + bool prepare; bool have_io; bool activated; bool passive; @@ -68,6 +68,8 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state, enum pw_link_state old = link->state; if (state != old) { + struct pw_node *in = link->input->node, *out = link->output->node; + pw_log_debug("link %p: update state %s -> %s (%s)", link, pw_link_state_as_string(old), pw_link_state_as_string(state), error); @@ -76,7 +78,27 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state, free(link->error); link->error = error; - spa_hook_list_call(&link->listener_list, struct pw_link_events, state_changed, old, state, error); + spa_hook_list_call(&link->listener_list, struct pw_link_events, + state_changed, old, state, error); + + if (old != PW_LINK_STATE_PAUSED && state == PW_LINK_STATE_PAUSED) { + + pw_log_debug("link %p: %d %d %d %d", link, + out->n_ready_output_links, out->n_used_output_links, + in->n_ready_input_links, in->n_used_input_links); + + if (++out->n_ready_output_links == out->n_used_output_links) + pw_node_set_state(out, PW_NODE_STATE_RUNNING); + if (++in->n_ready_input_links == in->n_used_input_links) + pw_node_set_state(in, PW_NODE_STATE_RUNNING); + } + else if (old == PW_LINK_STATE_PAUSED && state < PW_LINK_STATE_PAUSED) { + if (--out->n_ready_output_links == 0) + pw_node_set_state(out, PW_NODE_STATE_IDLE); + if (--in->n_ready_input_links == 0) + pw_node_set_state(in, PW_NODE_STATE_IDLE); + } + } } @@ -104,18 +126,6 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) } } -static void complete_streaming(void *obj, void *data, int res, uint32_t id) -{ - struct pw_port *port = data; - if (SPA_RESULT_IS_OK(res)) { - port->state = PW_PORT_STATE_STREAMING; - pw_log_debug("port %p: state STREAMING", port); - } else { - port->state = PW_PORT_STATE_ERROR; - pw_log_warn("port %p: failed to go to STREAMING", port); - } -} - static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_state) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); @@ -800,55 +810,23 @@ do_activate_link(struct spa_loop *loop, return 0; } -static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state) +int pw_link_activate(struct pw_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - char *error = NULL; - int res; - struct pw_port *input, *output; - if (in_state < PW_PORT_STATE_PAUSED || out_state < PW_PORT_STATE_PAUSED) + pw_log_debug("link %p: activate %d %d", this, impl->activated, this->state); + + if (impl->activated) return 0; - pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL); + pw_link_prepare(this); - input = this->input; - output = this->output; - - if (!impl->activated) { - pw_loop_invoke(output->node->data_loop, + if (this->state == PW_LINK_STATE_PAUSED) { + pw_loop_invoke(this->output->node->data_loop, do_activate_link, SPA_ID_INVALID, NULL, 0, false, this); impl->activated = true; } - - - if (in_state == PW_PORT_STATE_PAUSED) { - if ((res = pw_node_set_state(input->node, PW_NODE_STATE_RUNNING)) < 0) { - asprintf(&error, "error starting input node: %d", res); - goto error; - } - - if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, input->node, res, complete_streaming, input); - else - complete_streaming(input->node, input, res, 0); - } - if (out_state == PW_PORT_STATE_PAUSED) { - if ((res = pw_node_set_state(output->node, PW_NODE_STATE_RUNNING)) < 0) { - asprintf(&error, "error starting output node: %d", res); - goto error; - } - - if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, output->node, res, complete_streaming, output); - else - complete_streaming(output->node, output, res, 0); - } return 0; - - error: - pw_link_update_state(this, PW_LINK_STATE_ERROR, error); - return res; } static int check_states(struct pw_link *this, void *user_data, int res) @@ -880,8 +858,8 @@ static int check_states(struct pw_link *this, void *user_data, int res) return -EIO; } - if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) { - pw_link_update_state(this, PW_LINK_STATE_RUNNING, NULL); + if (in_state == PW_PORT_STATE_PAUSED && out_state == PW_PORT_STATE_PAUSED) { + pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL); return 0; } @@ -891,9 +869,6 @@ static int check_states(struct pw_link *this, void *user_data, int res) if ((res = do_allocation(this, in_state, out_state)) != 0) goto exit; - if ((res = do_start(this, in_state, out_state)) != 0) - goto exit; - exit: if (SPA_RESULT_IS_ERROR(res)) { pw_log_debug("link %p: got error result %d", this, res); @@ -998,16 +973,16 @@ static void output_port_destroy(void *data) on_port_destroy(&impl->this, impl->this.output); } -int pw_link_activate(struct pw_link *this) +int pw_link_prepare(struct pw_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - pw_log_debug("link %p: activate %d", this, impl->active); + pw_log_debug("link %p: prepare %d", this, impl->prepare); - if (impl->active) + if (impl->prepare) return 0; - impl->active = true; + impl->prepare = true; this->output->node->n_used_output_links++; this->input->node->n_used_input_links++; @@ -1022,6 +997,7 @@ int pw_link_activate(struct pw_link *this) return 0; } + static int do_deactivate_link(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) @@ -1044,12 +1020,12 @@ int pw_link_deactivate(struct pw_link *this) struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_node *input_node, *output_node; - pw_log_debug("link %p: deactivate %d", this, impl->active); + pw_log_debug("link %p: deactivate %d %d", this, impl->prepare, impl->activated); - if (!impl->active) + if (!impl->prepare) return 0; - impl->active = false; + impl->prepare = false; if (impl->activated) { pw_loop_invoke(this->output->node->data_loop, do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this); @@ -1095,6 +1071,8 @@ int pw_link_deactivate(struct pw_link *this) } this->output->state = PW_PORT_STATE_PAUSED; + pw_link_update_state(this, PW_LINK_STATE_INIT, NULL); + return 0; } @@ -1364,7 +1342,7 @@ int pw_link_register(struct pw_link *link, if ((input_node->n_used_input_links >= input_node->idle_used_input_links || output_node->n_used_output_links >= output_node->idle_used_output_links) && input_node->active && output_node->active && !impl->passive) - pw_link_activate(link); + pw_link_prepare(link); return 0; } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 9e4ddfa23..b9658bfdb 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -278,11 +278,13 @@ struct pw_node { struct pw_map input_port_map; /**< map from port_id to port */ uint32_t n_used_input_links; /**< number of active input links */ uint32_t idle_used_input_links; /**< number of active input to be idle */ + uint32_t n_ready_input_links; /**< number of ready input links */ struct spa_list output_ports; /**< list of output ports */ struct pw_map output_port_map; /**< map from port_id to port */ uint32_t n_used_output_links; /**< number of active output links */ uint32_t idle_used_output_links; /**< number of active output to be idle */ + uint32_t n_ready_output_links; /**< number of ready output links */ struct spa_hook_list listener_list; @@ -622,6 +624,9 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver); void pw_node_process(struct pw_node *node, int status); +/** Prepare a link \memberof pw_link + * Starts the negotiation of formats and buffers on \a link */ +int pw_link_prepare(struct pw_link *link); /** starts streaming on a link */ int pw_link_activate(struct pw_link *link);