link: improve negotiation

Separate negotiation and activation steps of the link.
Keep track of the pending amount of links to negotiate before activating
a node.
This commit is contained in:
Wim Taymans 2018-07-02 12:14:50 +02:00
parent 0122e15611
commit c315c95e55
2 changed files with 49 additions and 66 deletions

View file

@ -39,7 +39,7 @@
struct impl { struct impl {
struct pw_link this; struct pw_link this;
bool active; bool prepare;
bool have_io; bool have_io;
bool activated; bool activated;
bool passive; bool passive;
@ -68,6 +68,8 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state,
enum pw_link_state old = link->state; enum pw_link_state old = link->state;
if (state != old) { if (state != old) {
struct pw_node *in = link->input->node, *out = link->output->node;
pw_log_debug("link %p: update state %s -> %s (%s)", link, pw_log_debug("link %p: update state %s -> %s (%s)", link,
pw_link_state_as_string(old), pw_link_state_as_string(state), error); pw_link_state_as_string(old), pw_link_state_as_string(state), error);
@ -76,7 +78,27 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state,
free(link->error); free(link->error);
link->error = error; link->error = error;
spa_hook_list_call(&link->listener_list, struct pw_link_events, state_changed, old, state, error); spa_hook_list_call(&link->listener_list, struct pw_link_events,
state_changed, old, state, error);
if (old != PW_LINK_STATE_PAUSED && state == PW_LINK_STATE_PAUSED) {
pw_log_debug("link %p: %d %d %d %d", link,
out->n_ready_output_links, out->n_used_output_links,
in->n_ready_input_links, in->n_used_input_links);
if (++out->n_ready_output_links == out->n_used_output_links)
pw_node_set_state(out, PW_NODE_STATE_RUNNING);
if (++in->n_ready_input_links == in->n_used_input_links)
pw_node_set_state(in, PW_NODE_STATE_RUNNING);
}
else if (old == PW_LINK_STATE_PAUSED && state < PW_LINK_STATE_PAUSED) {
if (--out->n_ready_output_links == 0)
pw_node_set_state(out, PW_NODE_STATE_IDLE);
if (--in->n_ready_input_links == 0)
pw_node_set_state(in, PW_NODE_STATE_IDLE);
}
} }
} }
@ -104,18 +126,6 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
} }
} }
static void complete_streaming(void *obj, void *data, int res, uint32_t id)
{
struct pw_port *port = data;
if (SPA_RESULT_IS_OK(res)) {
port->state = PW_PORT_STATE_STREAMING;
pw_log_debug("port %p: state STREAMING", port);
} else {
port->state = PW_PORT_STATE_ERROR;
pw_log_warn("port %p: failed to go to STREAMING", port);
}
}
static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_state) static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_state)
{ {
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
@ -800,55 +810,23 @@ do_activate_link(struct spa_loop *loop,
return 0; return 0;
} }
static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state) int pw_link_activate(struct pw_link *this)
{ {
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
char *error = NULL;
int res;
struct pw_port *input, *output;
if (in_state < PW_PORT_STATE_PAUSED || out_state < PW_PORT_STATE_PAUSED) pw_log_debug("link %p: activate %d %d", this, impl->activated, this->state);
if (impl->activated)
return 0; return 0;
pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL); pw_link_prepare(this);
input = this->input; if (this->state == PW_LINK_STATE_PAUSED) {
output = this->output; pw_loop_invoke(this->output->node->data_loop,
if (!impl->activated) {
pw_loop_invoke(output->node->data_loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, this); do_activate_link, SPA_ID_INVALID, NULL, 0, false, this);
impl->activated = true; impl->activated = true;
} }
if (in_state == PW_PORT_STATE_PAUSED) {
if ((res = pw_node_set_state(input->node, PW_NODE_STATE_RUNNING)) < 0) {
asprintf(&error, "error starting input node: %d", res);
goto error;
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, input->node, res, complete_streaming, input);
else
complete_streaming(input->node, input, res, 0);
}
if (out_state == PW_PORT_STATE_PAUSED) {
if ((res = pw_node_set_state(output->node, PW_NODE_STATE_RUNNING)) < 0) {
asprintf(&error, "error starting output node: %d", res);
goto error;
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, output->node, res, complete_streaming, output);
else
complete_streaming(output->node, output, res, 0);
}
return 0; return 0;
error:
pw_link_update_state(this, PW_LINK_STATE_ERROR, error);
return res;
} }
static int check_states(struct pw_link *this, void *user_data, int res) static int check_states(struct pw_link *this, void *user_data, int res)
@ -880,8 +858,8 @@ static int check_states(struct pw_link *this, void *user_data, int res)
return -EIO; return -EIO;
} }
if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) { if (in_state == PW_PORT_STATE_PAUSED && out_state == PW_PORT_STATE_PAUSED) {
pw_link_update_state(this, PW_LINK_STATE_RUNNING, NULL); pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
return 0; return 0;
} }
@ -891,9 +869,6 @@ static int check_states(struct pw_link *this, void *user_data, int res)
if ((res = do_allocation(this, in_state, out_state)) != 0) if ((res = do_allocation(this, in_state, out_state)) != 0)
goto exit; goto exit;
if ((res = do_start(this, in_state, out_state)) != 0)
goto exit;
exit: exit:
if (SPA_RESULT_IS_ERROR(res)) { if (SPA_RESULT_IS_ERROR(res)) {
pw_log_debug("link %p: got error result %d", this, res); pw_log_debug("link %p: got error result %d", this, res);
@ -998,16 +973,16 @@ static void output_port_destroy(void *data)
on_port_destroy(&impl->this, impl->this.output); on_port_destroy(&impl->this, impl->this.output);
} }
int pw_link_activate(struct pw_link *this) int pw_link_prepare(struct pw_link *this)
{ {
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_debug("link %p: activate %d", this, impl->active); pw_log_debug("link %p: prepare %d", this, impl->prepare);
if (impl->active) if (impl->prepare)
return 0; return 0;
impl->active = true; impl->prepare = true;
this->output->node->n_used_output_links++; this->output->node->n_used_output_links++;
this->input->node->n_used_input_links++; this->input->node->n_used_input_links++;
@ -1022,6 +997,7 @@ int pw_link_activate(struct pw_link *this)
return 0; return 0;
} }
static int static int
do_deactivate_link(struct spa_loop *loop, do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
@ -1044,12 +1020,12 @@ int pw_link_deactivate(struct pw_link *this)
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_node *input_node, *output_node; struct pw_node *input_node, *output_node;
pw_log_debug("link %p: deactivate %d", this, impl->active); pw_log_debug("link %p: deactivate %d %d", this, impl->prepare, impl->activated);
if (!impl->active) if (!impl->prepare)
return 0; return 0;
impl->active = false; impl->prepare = false;
if (impl->activated) { if (impl->activated) {
pw_loop_invoke(this->output->node->data_loop, pw_loop_invoke(this->output->node->data_loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this); do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this);
@ -1095,6 +1071,8 @@ int pw_link_deactivate(struct pw_link *this)
} }
this->output->state = PW_PORT_STATE_PAUSED; this->output->state = PW_PORT_STATE_PAUSED;
pw_link_update_state(this, PW_LINK_STATE_INIT, NULL);
return 0; return 0;
} }
@ -1364,7 +1342,7 @@ int pw_link_register(struct pw_link *link,
if ((input_node->n_used_input_links >= input_node->idle_used_input_links || if ((input_node->n_used_input_links >= input_node->idle_used_input_links ||
output_node->n_used_output_links >= output_node->idle_used_output_links) && output_node->n_used_output_links >= output_node->idle_used_output_links) &&
input_node->active && output_node->active && !impl->passive) input_node->active && output_node->active && !impl->passive)
pw_link_activate(link); pw_link_prepare(link);
return 0; return 0;
} }

View file

@ -278,11 +278,13 @@ struct pw_node {
struct pw_map input_port_map; /**< map from port_id to port */ struct pw_map input_port_map; /**< map from port_id to port */
uint32_t n_used_input_links; /**< number of active input links */ uint32_t n_used_input_links; /**< number of active input links */
uint32_t idle_used_input_links; /**< number of active input to be idle */ uint32_t idle_used_input_links; /**< number of active input to be idle */
uint32_t n_ready_input_links; /**< number of ready input links */
struct spa_list output_ports; /**< list of output ports */ struct spa_list output_ports; /**< list of output ports */
struct pw_map output_port_map; /**< map from port_id to port */ struct pw_map output_port_map; /**< map from port_id to port */
uint32_t n_used_output_links; /**< number of active output links */ uint32_t n_used_output_links; /**< number of active output links */
uint32_t idle_used_output_links; /**< number of active output to be idle */ uint32_t idle_used_output_links; /**< number of active output to be idle */
uint32_t n_ready_output_links; /**< number of ready output links */
struct spa_hook_list listener_list; struct spa_hook_list listener_list;
@ -622,6 +624,9 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver);
void pw_node_process(struct pw_node *node, int status); void pw_node_process(struct pw_node *node, int status);
/** Prepare a link \memberof pw_link
* Starts the negotiation of formats and buffers on \a link */
int pw_link_prepare(struct pw_link *link);
/** starts streaming on a link */ /** starts streaming on a link */
int pw_link_activate(struct pw_link *link); int pw_link_activate(struct pw_link *link);