link: streamline state changes a little

port: add methods for things that can change the port state.
This commit is contained in:
Wim Taymans 2017-06-29 12:05:03 +02:00
parent 83e65c31ab
commit a1fdfdc818
4 changed files with 180 additions and 131 deletions

View file

@ -38,6 +38,8 @@
struct impl {
struct pw_link this;
bool active;
struct pw_work_queue *work;
struct spa_format *format_filter;
@ -160,34 +162,29 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
if (out_state == PW_PORT_STATE_CONFIGURE) {
pw_log_debug("link %p: doing set format on output", this);
if ((res = spa_node_port_set_format(this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port_id,
SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) {
if ((res = pw_port_set_format(this->output, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) {
asprintf(&error, "error set output format: %d", res);
goto error;
}
pw_work_queue_add(impl->work, this->output->node, res, complete_ready,
this->output);
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, this->output->node, res, complete_ready,
this->output);
}
if (in_state == PW_PORT_STATE_CONFIGURE) {
pw_log_debug("link %p: doing set format on input", this);
if ((res2 = spa_node_port_set_format(this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port_id,
SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) {
if ((res2 = pw_port_set_format(this->input, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) {
asprintf(&error, "error set input format: %d", res2);
goto error;
}
pw_work_queue_add(impl->work, this->input->node, res2, complete_ready, this->input);
res = res2 != SPA_RESULT_OK ? res2 : res;
if (SPA_RESULT_IS_ASYNC(res2))
pw_work_queue_add(impl->work, this->input->node, res2, complete_ready, this->input);
}
if (this->info.format)
free(this->info.format);
this->info.format = format;
return res;
return SPA_RESULT_OK;
error:
pw_link_update_state(this, PW_LINK_STATE_ERROR, error);
@ -592,39 +589,27 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
}
if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = spa_node_port_alloc_buffers(this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port_id,
params, n_params,
impl->buffers,
&impl->n_buffers)) < 0) {
if ((res = pw_port_alloc_buffers(this->output, params, n_params,
impl->buffers, &impl->n_buffers)) < 0) {
asprintf(&error, "error alloc output buffers: %d", res);
goto error;
}
pw_work_queue_add(impl->work, this->output->node, res, complete_paused,
this->output);
this->output->buffers = impl->buffers;
this->output->n_buffers = impl->n_buffers;
this->output->allocated = true;
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, this->output->node, res, complete_paused,
this->output);
this->output->buffer_mem = impl->buffer_mem;
impl->buffer_owner = this->output;
pw_log_debug("allocated %d buffers %p from output port", impl->n_buffers,
impl->buffers);
} else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = spa_node_port_alloc_buffers(this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port_id,
params, n_params,
impl->buffers,
&impl->n_buffers)) < 0) {
if ((res = pw_port_alloc_buffers(this->input, params, n_params,
impl->buffers, &impl->n_buffers)) < 0) {
asprintf(&error, "error alloc input buffers: %d", res);
goto error;
}
pw_work_queue_add(impl->work, this->input->node, res, complete_paused,
this->input);
this->input->buffers = impl->buffers;
this->input->n_buffers = impl->n_buffers;
this->input->allocated = true;
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, this->input->node, res, complete_paused,
this->input);
this->input->buffer_mem = impl->buffer_mem;
impl->buffer_owner = this->input;
pw_log_debug("allocated %d buffers %p from input port", impl->n_buffers,
@ -634,37 +619,26 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
pw_log_debug("using %d buffers %p on input port", impl->n_buffers, impl->buffers);
if ((res = spa_node_port_use_buffers(this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port_id,
impl->buffers, impl->n_buffers)) < 0) {
if ((res = pw_port_use_buffers(this->input, impl->buffers, impl->n_buffers)) < 0) {
asprintf(&error, "error use input buffers: %d", res);
goto error;
}
pw_work_queue_add(impl->work, this->input->node, res, complete_paused, this->input);
this->input->buffers = impl->buffers;
this->input->n_buffers = impl->n_buffers;
this->input->allocated = false;
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, this->input->node, res, complete_paused, this->input);
} else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
pw_log_debug("using %d buffers %p on output port", impl->n_buffers, impl->buffers);
if ((res = spa_node_port_use_buffers(this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port_id,
impl->buffers, impl->n_buffers)) < 0) {
if ((res = pw_port_use_buffers(this->output, impl->buffers, impl->n_buffers)) < 0) {
asprintf(&error, "error use output buffers: %d", res);
goto error;
}
pw_work_queue_add(impl->work, this->output->node, res, complete_paused,
this->output);
this->output->buffers = impl->buffers;
this->output->n_buffers = impl->n_buffers;
this->output->allocated = false;
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, this->output->node, res, complete_paused, this->output);
} else {
asprintf(&error, "no common buffer alloc found");
goto error;
}
return res;
return SPA_RESULT_OK;
error:
this->output->buffers = NULL;
@ -679,27 +653,43 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state)
{
int res = SPA_RESULT_OK;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
char *error = NULL;
int res;
if (in_state < PW_PORT_STATE_PAUSED || out_state < PW_PORT_STATE_PAUSED)
return SPA_RESULT_OK;
else if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) {
pw_link_update_state(this, PW_LINK_STATE_RUNNING, NULL);
} else {
pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
if (in_state == PW_PORT_STATE_PAUSED) {
res = pw_node_set_state(this->input->node, PW_NODE_STATE_RUNNING);
pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
if (in_state == PW_PORT_STATE_PAUSED) {
if ((res = pw_node_set_state(this->input->node, PW_NODE_STATE_RUNNING)) < 0) {
asprintf(&error, "error starting input node: %d", res);
goto error;
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, this->input->node, res, complete_streaming,
this->input);
else
complete_streaming(this->input->node, this->input, res, 0);
}
if (out_state == PW_PORT_STATE_PAUSED) {
if ((res = pw_node_set_state(this->output->node, PW_NODE_STATE_RUNNING)) < 0) {
asprintf(&error, "error starting output node: %d", res);
goto error;
}
if (out_state == PW_PORT_STATE_PAUSED) {
res = pw_node_set_state(this->output->node, PW_NODE_STATE_RUNNING);
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, this->output->node, res, complete_streaming,
this->output);
}
else
complete_streaming(this->output->node, this->output, res, 0);
}
return SPA_RESULT_OK;
error:
pw_link_update_state(this, PW_LINK_STATE_ERROR, error);
return res;
}
@ -721,11 +711,13 @@ static int check_states(struct pw_link *this, void *user_data, int res)
in_state = this->input->state;
out_state = this->output->state;
if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING)
return SPA_RESULT_OK;
pw_log_debug("link %p: input state %d, output state %d", this, in_state, out_state);
if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) {
pw_link_update_state(this, PW_LINK_STATE_RUNNING, NULL);
return SPA_RESULT_OK;
}
if ((res = do_negotiate(this, in_state, out_state)) != SPA_RESULT_OK)
goto exit;
@ -791,7 +783,7 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port)
impl->n_buffers = 0;
pw_log_debug("link %p: clear input allocated buffers on port %p", this, other);
pw_port_clear_buffers(other);
pw_port_use_buffers(other, NULL, 0);
}
pw_signal_emit(&this->port_unlinked, this, port);
@ -818,6 +810,11 @@ bool pw_link_activate(struct pw_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
if (impl->active)
return true;
impl->active = true;
pw_log_debug("link %p: activate", this);
pw_work_queue_add(impl->work,
this, SPA_RESULT_WAIT_SYNC, (pw_work_func_t) check_states, this);
@ -826,6 +823,8 @@ bool pw_link_activate(struct pw_link *this)
bool pw_link_deactivate(struct pw_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
impl->active = false;
return true;
}
@ -931,15 +930,8 @@ static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
if (impl->buffer_owner != port && port->state > PW_PORT_STATE_READY) {
pw_log_debug("link %p: clear buffers on port %p", link, port);
spa_node_port_use_buffers(port->node->node,
port->direction, port->port_id, NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
port->state = PW_PORT_STATE_READY;
pw_log_debug("port %p: state READY", port);
}
if (impl->buffer_owner != port)
pw_port_use_buffers(port, NULL, 0);
}
static int

View file

@ -91,9 +91,8 @@ static void update_port_ids(struct pw_node *node)
pw_log_debug("node %p: input port added %d", node, input_port_ids[i]);
np = pw_port_new(node, PW_DIRECTION_INPUT, input_port_ids[i]);
if ((res =
spa_node_port_set_io(node->node, SPA_DIRECTION_INPUT, np->port_id,
&np->io)) < 0)
if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_INPUT,
np->port_id, &np->io)) < 0)
pw_log_warn("node %p: can't set input IO %d", node, res);
spa_list_insert(ports, &np->link);
@ -133,9 +132,8 @@ static void update_port_ids(struct pw_node *node)
pw_log_debug("node %p: output port added %d", node, output_port_ids[i]);
np = pw_port_new(node, PW_DIRECTION_OUTPUT, output_port_ids[i]);
if ((res =
spa_node_port_set_io(node->node, SPA_DIRECTION_OUTPUT, np->port_id,
&np->io)) < 0)
if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_OUTPUT,
np->port_id, &np->io)) < 0)
pw_log_warn("node %p: can't set output IO %d", node, res);
spa_list_insert(ports, &np->link);
@ -198,29 +196,13 @@ static int suspend_node(struct pw_node *this)
pw_log_debug("node %p: suspend node", this);
spa_list_for_each(p, &this->input_ports, link) {
if ((res =
spa_node_port_set_format(this->node, SPA_DIRECTION_INPUT, p->port_id, 0,
NULL)) < 0)
if ((res = pw_port_set_format(p, 0, NULL)) < 0)
pw_log_warn("error unset format input: %d", res);
p->buffers = NULL;
p->n_buffers = 0;
if (p->allocated)
pw_memblock_free(&p->buffer_mem);
p->allocated = false;
p->state = PW_PORT_STATE_CONFIGURE;
}
spa_list_for_each(p, &this->output_ports, link) {
if ((res =
spa_node_port_set_format(this->node, SPA_DIRECTION_OUTPUT, p->port_id, 0,
NULL)) < 0)
if ((res = pw_port_set_format(p, 0, NULL)) < 0)
pw_log_warn("error unset format output: %d", res);
p->buffers = NULL;
p->n_buffers = 0;
if (p->allocated)
pw_memblock_free(&p->buffer_mem);
p->allocated = false;
p->state = PW_PORT_STATE_CONFIGURE;
}
return res;
}

View file

@ -68,6 +68,14 @@ void pw_port_destroy(struct pw_port *port)
free(port);
}
static void port_update_state(struct pw_port *port, enum pw_port_state state)
{
if (port->state != state) {
pw_log_debug("port %p: state %d -> %d", port, port->state, state);
port->state = state;
}
}
static int
do_add_link(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
@ -169,8 +177,7 @@ int pw_port_pause_rt(struct pw_port *port)
port->port_id,
&SPA_COMMAND_INIT(port->node->core->type.command_node.
Pause));
port->state = PW_PORT_STATE_PAUSED;
pw_log_debug("port %p: state PAUSED", port);
port_update_state (port, PW_PORT_STATE_PAUSED);
return res;
}
@ -204,6 +211,9 @@ int pw_port_unlink(struct pw_port *port, struct pw_link *link)
res = pw_loop_invoke(node->data_loop->loop,
do_remove_link, impl->seq++, sizeof(struct pw_link *), &link, true, port);
if (port->state > PW_PORT_STATE_PAUSED)
port_update_state (port, PW_PORT_STATE_PAUSED);
pw_log_debug("port %p: finish unlink", port);
if (port->direction == PW_DIRECTION_OUTPUT) {
if (link->output) {
@ -219,49 +229,105 @@ int pw_port_unlink(struct pw_port *port, struct pw_link *link)
}
}
if (!port->allocated && port->state > PW_PORT_STATE_READY) {
pw_log_debug("port %p: clear buffers on port", port);
spa_node_port_use_buffers(node->node,
port->direction, port->port_id, NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
port->state = PW_PORT_STATE_READY;
pw_log_debug("port %p: state READY", port);
}
if (!port->allocated)
pw_port_use_buffers(port, NULL, 0);
if (node->n_used_output_links == 0 && node->n_used_input_links == 0) {
if (node->n_used_output_links == 0 && node->n_used_input_links == 0)
pw_node_update_state(node, PW_NODE_STATE_IDLE, NULL);
}
return res;
}
static int
do_clear_buffers(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
do_port_pause(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
return pw_port_pause_rt(port);
}
int pw_port_clear_buffers(struct pw_port *port)
int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format)
{
int res;
res = spa_node_port_set_format(port->node->node, port->direction, port->port_id, flags, format);
pw_log_debug("port %p: set format %d", port, res);
if (!SPA_RESULT_IS_ASYNC(res)) {
if (format == NULL) {
port->buffers = NULL;
port->n_buffers = 0;
if (port->allocated)
pw_memblock_free(&port->buffer_mem);
port->allocated = false;
port_update_state (port, PW_PORT_STATE_CONFIGURE);
}
else {
port_update_state (port, PW_PORT_STATE_READY);
}
}
return res;
}
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers)
{
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
int res;
pw_log_debug("port %p: clear buffers", port);
res = pw_loop_invoke(port->node->data_loop->loop,
do_clear_buffers, impl->seq++, 0, NULL, true, port);
if (port->state <= PW_PORT_STATE_READY)
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
return SPA_RESULT_OK;
pw_log_debug("port %p: clear buffers finish", port);
res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
port->state = PW_PORT_STATE_READY;
pw_log_debug("port %p: state READY", port);
if (n_buffers > 0 && port->state < PW_PORT_STATE_READY)
return SPA_RESULT_NO_FORMAT;
if (port->state > PW_PORT_STATE_PAUSED) {
res = pw_loop_invoke(port->node->data_loop->loop,
do_port_pause, impl->seq++, 0, NULL, true, port);
port_update_state (port, PW_PORT_STATE_PAUSED);
}
pw_log_debug("port %p: use %d buffers", port, n_buffers);
res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, buffers, n_buffers);
port->buffers = buffers;
port->n_buffers = n_buffers;
if (port->allocated)
pw_memblock_free(&port->buffer_mem);
port->allocated = false;
if (port->n_buffers == 0)
port_update_state (port, PW_PORT_STATE_READY);
else if (!SPA_RESULT_IS_ASYNC(res))
port_update_state (port, PW_PORT_STATE_PAUSED);
return res;
}
int pw_port_alloc_buffers(struct pw_port *port,
struct spa_param **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers)
{
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
int res;
if (port->state < PW_PORT_STATE_READY)
return SPA_RESULT_NO_FORMAT;
if (port->state > PW_PORT_STATE_PAUSED) {
res = pw_loop_invoke(port->node->data_loop->loop,
do_port_pause, impl->seq++, 0, NULL, true, port);
port_update_state (port, PW_PORT_STATE_PAUSED);
}
pw_log_debug("port %p: alloc %d buffers", port, *n_buffers);
res = spa_node_port_alloc_buffers(port->node->node, port->direction, port->port_id,
params, n_params, buffers, n_buffers);
port->buffers = buffers;
port->n_buffers = *n_buffers;
port->allocated = true;
if (!SPA_RESULT_IS_ASYNC(res))
port_update_state (port, PW_PORT_STATE_PAUSED);
return res;
}

View file

@ -102,12 +102,21 @@ pw_port_link(struct pw_port *output_port, /**< output port */
/** Unlink a port \memberof pw_port */
int pw_port_unlink(struct pw_port *port, struct pw_link *link);
/** Set a format on a port \memberof pw_port */
int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format);
/** Use buffers on a port \memberof pw_port */
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers);
/** Allocate memory for buffers on a port \memberof pw_port */
int pw_port_alloc_buffers(struct pw_port *port,
struct spa_param **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers);
/** Pause a port, should be called from data thread \memberof pw_port */
int pw_port_pause_rt(struct pw_port *port);
/** Clear the buffers on a port \memberof pw_port */
int pw_port_clear_buffers(struct pw_port *port);
#ifdef __cplusplus
}
#endif