link: track active state of nodes

Keep track of the node active state, when both are active, we can
prepare (negotiate) the link.

After a link has been prepared we can activate it. When we deactivate
the link, we don't need to prepare again.

When a port loses buffers or format, set the link back to the
unprepared state.

This fixes the case where:
 1) a node becomes inactive and goes to suspend, the link becomes
    unprepared
 2) the node becomes active again and need to be prepared again
This commit is contained in:
Wim Taymans 2020-04-28 11:45:52 +02:00
parent 68268a0116
commit f5e4a6b225
2 changed files with 57 additions and 46 deletions

View file

@ -47,7 +47,6 @@
struct impl {
struct pw_impl_link this;
unsigned int prepare:1;
unsigned int io_set:1;
unsigned int activated:1;
@ -117,9 +116,11 @@ static void pw_impl_link_update_state(struct pw_impl_link *link, enum pw_link_st
if (old != PW_LINK_STATE_PAUSED && state == PW_LINK_STATE_PAUSED) {
link->prepared = true;
link->preparing = false;
pw_context_recalc_graph(link->context, "link prepared");
} else if (old == PW_LINK_STATE_PAUSED && state < PW_LINK_STATE_PAUSED) {
link->prepared = false;
link->preparing = false;
pw_context_recalc_graph(link->context, "link unprepared");
}
}
@ -513,32 +514,30 @@ int pw_impl_link_activate(struct pw_impl_link *this)
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
int res;
pw_log_debug(NAME" %p: activate %d %d", this, impl->activated, this->info.state);
pw_log_debug(NAME" %p: activate activated:%d state:%s", this, impl->activated,
pw_link_state_as_string(this->info.state));
if (impl->activated)
if (impl->activated || !this->prepared)
return 0;
pw_impl_link_prepare(this);
if (!impl->io_set) {
if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.out_mix,
impl->output_destroyed)) < 0)
return res;
if (this->prepared) {
if (!impl->io_set) {
if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.out_mix,
impl->output_destroyed)) < 0)
return res;
if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.in_mix,
impl->input_destroyed)) < 0)
return res;
impl->io_set = true;
}
pw_loop_invoke(this->output->node->data_loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, this);
impl->activated = true;
pw_log_debug(NAME" %p: activated %d", this, impl->activated);
if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.in_mix,
impl->input_destroyed)) < 0)
return res;
impl->io_set = true;
}
pw_loop_invoke(this->output->node->data_loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, this);
impl->activated = true;
pw_log_info("(%s) activated", this->name);
return 0;
}
static void check_states(void *obj, void *user_data, int res, uint32_t id)
@ -565,9 +564,9 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
if (output->node->info.state == PW_NODE_STATE_ERROR ||
input->node->info.state == PW_NODE_STATE_ERROR) {
pw_log_warn(NAME" %p: one of the nodes is in error out:%d in:%d", this,
output->node->info.state,
input->node->info.state);
pw_log_warn(NAME" %p: one of the nodes is in error out:%s in:%s", this,
pw_node_state_as_string(output->node->info.state),
pw_node_state_as_string(input->node->info.state));
return;
}
@ -674,12 +673,12 @@ int pw_impl_link_prepare(struct pw_impl_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_debug(NAME" %p: prepare %d", this, impl->prepare);
pw_log_debug(NAME" %p: prepare busy:%d", this, this->preparing);
if (impl->prepare)
if (this->preparing)
return 0;
impl->prepare = true;
this->preparing = true;
pw_work_queue_add(impl->work,
this, -EBUSY, (pw_work_func_t) check_states, this);
@ -687,7 +686,6 @@ int pw_impl_link_prepare(struct pw_impl_link *this)
return 0;
}
static int
do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
@ -718,27 +716,22 @@ int pw_impl_link_deactivate(struct pw_impl_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_debug(NAME" %p: deactivate %d %d", this, impl->prepare, impl->activated);
pw_log_debug(NAME" %p: deactivate activated:%d", this, impl->activated);
if (!impl->prepare)
if (!impl->activated)
return 0;
impl->prepare = false;
this->prepared = false;
if (impl->activated) {
pw_loop_invoke(this->output->node->data_loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this);
pw_loop_invoke(this->output->node->data_loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this);
port_set_io(this, this->output, SPA_IO_Buffers, NULL, 0,
&this->rt.out_mix, impl->output_destroyed);
port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0,
&this->rt.in_mix, impl->input_destroyed);
port_set_io(this, this->output, SPA_IO_Buffers, NULL, 0,
&this->rt.out_mix, impl->output_destroyed);
port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0,
&this->rt.in_mix, impl->input_destroyed);
impl->io_set = false;
impl->activated = false;
}
pw_impl_link_update_state(this, PW_LINK_STATE_INIT, NULL);
impl->io_set = false;
impl->activated = false;
pw_log_info("(%s) deactivated", this->name);
return 0;
}
@ -772,11 +765,17 @@ error_resource:
static void port_state_changed(struct pw_impl_link *this, struct pw_impl_port *port, struct pw_impl_port *other,
enum pw_impl_port_state state, const char *error)
{
pw_log_debug(NAME" %p: port state %d", this, state);
switch (state) {
case PW_IMPL_PORT_STATE_ERROR:
pw_impl_link_update_state(this, PW_LINK_STATE_ERROR, error ? strdup(error) : NULL);
break;
default:
if (state < PW_IMPL_PORT_STATE_PAUSED) {
this->preparing = false;
this->prepared = false;
pw_impl_link_update_state(this, PW_LINK_STATE_INIT, NULL);
}
break;
}
}
@ -831,18 +830,29 @@ static void output_node_result(void *data, int seq, int res, uint32_t type, cons
struct pw_impl_port *port = impl->this.output;
pw_log_trace(NAME" %p: output port %p result seq:%d res:%d type:%u",
impl, port, seq, res, type);
node_result(impl, port, seq, res, type, result);
}
static void node_active_changed(void *data, bool active)
{
struct impl *impl = data;
struct pw_impl_link *this = &impl->this;
pw_log_debug(NAME" %p: input active:%d output active:%d", impl,
impl->inode->active, impl->onode->active);
if (impl->inode->active && impl->onode->active)
pw_impl_link_prepare(this);
}
static const struct pw_impl_node_events input_node_events = {
PW_VERSION_IMPL_NODE_EVENTS,
.result = input_node_result,
.active_changed = node_active_changed,
};
static const struct pw_impl_node_events output_node_events = {
PW_VERSION_IMPL_NODE_EVENTS,
.result = output_node_result,
.active_changed = node_active_changed,
};
static bool pw_impl_node_can_reach(struct pw_impl_node *output, struct pw_impl_node *input)

View file

@ -726,6 +726,7 @@ struct pw_impl_link {
unsigned int registered:1;
unsigned int feedback:1;
unsigned int preparing:1;
unsigned int prepared:1;
};