work on state changes

Make things work better after errors
This commit is contained in:
Wim Taymans 2017-04-25 13:00:09 +02:00
parent baada0a330
commit 8804980c8f
7 changed files with 116 additions and 68 deletions

View file

@ -75,6 +75,51 @@ pinos_link_update_state (PinosLink *link,
}
}
static void
complete_ready (void *obj,
void *data,
SpaResult res,
uint32_t id)
{
PinosPort *port = data;
if (SPA_RESULT_IS_OK (res)) {
port->state = SPA_PORT_STATE_READY;
pinos_log_debug ("port %p: state READY", port);
}
else
pinos_log_warn ("port %p: failed to go to READY", port);
}
static void
complete_paused (void *obj,
void *data,
SpaResult res,
uint32_t id)
{
PinosPort *port = data;
if (SPA_RESULT_IS_OK (res)) {
port->state = SPA_PORT_STATE_PAUSED;
pinos_log_debug ("port %p: state PAUSED", port);
}
else
pinos_log_warn ("port %p: failed to go to PAUSED", port);
}
static void
complete_streaming (void *obj,
void *data,
SpaResult res,
uint32_t id)
{
PinosPort *port = data;
if (SPA_RESULT_IS_OK (res)) {
port->state = SPA_PORT_STATE_STREAMING;
pinos_log_debug ("port %p: state STREAMING", port);
}
else
pinos_log_warn ("port %p: failed to go to STREAMING", port);
}
static SpaResult
do_negotiate (PinosLink *this, uint32_t in_state, uint32_t out_state)
{
@ -121,8 +166,7 @@ do_negotiate (PinosLink *this, uint32_t in_state, uint32_t out_state)
asprintf (&error, "error set output format: %d", res);
goto error;
}
this->output->state = SPA_PORT_STATE_READY;
pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->output->node, res, complete_ready, this->output);
}
if (in_state == SPA_PORT_STATE_CONFIGURE) {
pinos_log_debug ("link %p: doing set format on input", this);
@ -134,8 +178,7 @@ do_negotiate (PinosLink *this, uint32_t in_state, uint32_t out_state)
asprintf (&error, "error set input format: %d", res2);
goto error;
}
this->input->state = SPA_PORT_STATE_READY;
pinos_work_queue_add (impl->work, this->input->node, res2, NULL, NULL);
pinos_work_queue_add (impl->work, this->input->node, res2, complete_ready, this->input);
res = res2 != SPA_RESULT_OK ? res2 : res;
}
return res;
@ -507,8 +550,7 @@ do_allocation (PinosLink *this, uint32_t in_state, uint32_t out_state)
asprintf (&error, "error alloc output buffers: %d", res);
goto error;
}
this->output->state = SPA_PORT_STATE_PAUSED;
pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->output->node, res, complete_paused, this->output);
this->output->buffers = impl->buffers;
this->output->n_buffers = impl->n_buffers;
this->output->allocated = true;
@ -524,8 +566,7 @@ do_allocation (PinosLink *this, uint32_t in_state, uint32_t out_state)
asprintf (&error, "error alloc input buffers: %d", res);
goto error;
}
this->input->state = SPA_PORT_STATE_PAUSED;
pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->input->node, res, complete_paused, this->input);
this->input->buffers = impl->buffers;
this->input->n_buffers = impl->n_buffers;
this->input->allocated = true;
@ -545,8 +586,7 @@ do_allocation (PinosLink *this, uint32_t in_state, uint32_t out_state)
asprintf (&error, "error use input buffers: %d", res);
goto error;
}
this->input->state = SPA_PORT_STATE_PAUSED;
pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->input->node, res, complete_paused, this->input);
this->input->buffers = impl->buffers;
this->input->n_buffers = impl->n_buffers;
this->input->allocated = false;
@ -561,8 +601,7 @@ do_allocation (PinosLink *this, uint32_t in_state, uint32_t out_state)
asprintf (&error, "error use output buffers: %d", res);
goto error;
}
this->output->state = SPA_PORT_STATE_PAUSED;
pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->output->node, res, complete_paused, this->output);
this->output->buffers = impl->buffers;
this->output->n_buffers = impl->n_buffers;
this->output->allocated = false;
@ -601,13 +640,11 @@ do_start (PinosLink *this, uint32_t in_state, uint32_t out_state)
if (in_state == SPA_PORT_STATE_PAUSED) {
res = pinos_node_set_state (this->input->node, PINOS_NODE_STATE_RUNNING);
pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL);
this->input->state = SPA_PORT_STATE_STREAMING;
pinos_work_queue_add (impl->work, this->input->node, res, complete_streaming, this->input);
}
if (out_state == SPA_PORT_STATE_PAUSED) {
res = pinos_node_set_state (this->output->node, PINOS_NODE_STATE_RUNNING);
this->output->state = SPA_PORT_STATE_STREAMING;
pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->output->node, res, complete_streaming, this->output);
}
}
return res;
@ -621,7 +658,6 @@ check_states (PinosLink *this,
PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this);
uint32_t in_state, out_state;
again:
if (this->state == PINOS_LINK_STATE_ERROR)
return SPA_RESULT_ERROR;
@ -646,14 +682,12 @@ again:
if ((res = do_start (this, in_state, out_state)) != SPA_RESULT_OK)
goto exit;
if (this->input->state != in_state)
goto again;
if (this->output->state != out_state)
goto again;
return SPA_RESULT_OK;
exit:
if (SPA_RESULT_IS_ERROR (res)) {
pinos_log_debug ("link %p: got error result %d", this, res);
return res;
}
pinos_work_queue_add (impl->work,
this,
SPA_RESULT_WAIT_SYNC,
@ -906,15 +940,16 @@ clear_port_buffers (PinosLink *link, PinosPort *port)
{
PinosLinkImpl *impl = SPA_CONTAINER_OF (link, PinosLinkImpl, this);
if (impl->buffer_owner != port) {
if (impl->buffer_owner != port && port->state > SPA_PORT_STATE_READY) {
pinos_log_debug ("link %p: clear buffers on port %p", link, port);
spa_node_port_use_buffers (port->node->node,
port->direction,
port->port_id,
NULL, 0);
port->state = SPA_PORT_STATE_READY;
port->buffers = NULL;
port->n_buffers = 0;
port->state = SPA_PORT_STATE_READY;
pinos_log_debug ("port %p: state READY", port);
}
}
@ -937,7 +972,7 @@ do_link_remove_done (SpaLoop *loop,
if (this->input->node->n_used_input_links == 0 &&
this->input->node->n_used_output_links == 0)
pinos_node_set_state (this->input->node, PINOS_NODE_STATE_IDLE);
pinos_node_update_state (this->input->node, PINOS_NODE_STATE_IDLE, NULL);
this->input = NULL;
}
@ -949,7 +984,7 @@ do_link_remove_done (SpaLoop *loop,
if (this->output->node->n_used_input_links == 0 &&
this->output->node->n_used_output_links == 0)
pinos_node_set_state (this->output->node, PINOS_NODE_STATE_IDLE);
pinos_node_update_state (this->output->node, PINOS_NODE_STATE_IDLE, NULL);
this->output = NULL;
}
@ -971,10 +1006,12 @@ do_link_remove (SpaLoop *loop,
PinosLink *this = user_data;
if (this->rt.input) {
pinos_port_pause_rt (this->rt.input);
spa_list_remove (&this->rt.input_link);
this->rt.input = NULL;
}
if (this->rt.output) {
pinos_port_pause_rt (this->rt.output);
spa_list_remove (&this->rt.output_link);
this->rt.output = NULL;
}

View file

@ -165,6 +165,9 @@ pause_node (PinosNode *this)
{
SpaResult res;
if (this->state <= PINOS_NODE_STATE_IDLE)
return SPA_RESULT_OK;
pinos_log_debug ("node %p: pause node", this);
{
SpaCommand cmd = SPA_COMMAND_INIT (this->core->type.command_node.Pause);
@ -649,6 +652,7 @@ do_node_remove (SpaLoop *loop,
spa_list_for_each_safe (port, tmp, &this->input_ports, link) {
PinosLink *link, *tlink;
spa_list_for_each_safe (link, tlink, &port->rt.links, rt.input_link) {
pinos_port_pause_rt (link->rt.input);
spa_list_remove (&link->rt.input_link);
link->rt.input = NULL;
}
@ -656,6 +660,7 @@ do_node_remove (SpaLoop *loop,
spa_list_for_each_safe (port, tmp, &this->output_ports, link) {
PinosLink *link, *tlink;
spa_list_for_each_safe (link, tlink, &port->rt.links, rt.output_link) {
pinos_port_pause_rt (link->rt.output);
spa_list_remove (&link->rt.output_link);
link->rt.output = NULL;
}

View file

@ -199,18 +199,22 @@ no_mem:
return NULL;
}
static SpaResult
pinos_port_pause (PinosPort *port)
SpaResult
pinos_port_pause_rt (PinosPort *port)
{
SpaCommand cmd = SPA_COMMAND_INIT (port->node->core->type.command_node.Pause);
SpaResult res;
if (port->state <= SPA_PORT_STATE_PAUSED)
return SPA_RESULT_OK;
port->state = SPA_PORT_STATE_PAUSED;
return spa_node_port_send_command (port->node->node,
res = spa_node_port_send_command (port->node->node,
port->direction,
port->port_id,
&cmd);
port->state = SPA_PORT_STATE_PAUSED;
pinos_log_debug ("port %p: state PAUSED", port);
return res;
}
static SpaResult
@ -240,15 +244,16 @@ do_remove_link_done (SpaLoop *loop,
}
}
if (!port->allocated) {
if (!port->allocated && port->state > SPA_PORT_STATE_READY) {
pinos_log_debug ("port %p: clear buffers on port", port);
spa_node_port_use_buffers (port->node->node,
port->direction,
port->port_id,
NULL, 0);
port->state = SPA_PORT_STATE_READY;
port->buffers = NULL;
port->n_buffers = 0;
port->state = SPA_PORT_STATE_READY;
pinos_log_debug ("port %p: state READY", port);
}
if (node->n_used_output_links == 0 &&
@ -273,9 +278,11 @@ do_remove_link (SpaLoop *loop,
SpaResult res;
if (port->direction == PINOS_DIRECTION_INPUT) {
pinos_port_pause_rt (link->rt.input);
spa_list_remove (&link->rt.input_link);
link->rt.input = NULL;
} else {
pinos_port_pause_rt (link->rt.output);
spa_list_remove (&link->rt.output_link);
link->rt.output = NULL;
}
@ -317,15 +324,18 @@ do_clear_buffers_done (SpaLoop *loop,
PinosPort *port = user_data;
SpaResult res;
pinos_log_debug ("port %p: clear buffers finish", port);
if (port->state <= SPA_PORT_STATE_READY)
return SPA_RESULT_OK;
pinos_log_debug ("port %p: clear buffers finish", port);
res = spa_node_port_use_buffers (port->node->node,
port->direction,
port->port_id,
NULL, 0);
port->state = SPA_PORT_STATE_READY;
port->buffers = NULL;
port->n_buffers = 0;
port->state = SPA_PORT_STATE_READY;
pinos_log_debug ("port %p: state READY", port);
return res;
}
@ -342,7 +352,7 @@ do_clear_buffers (SpaLoop *loop,
PinosNode *node = port->node;
SpaResult res;
pinos_port_pause (port);
pinos_port_pause_rt (port);
res = pinos_loop_invoke (node->core->main_loop->loop,
do_clear_buffers_done,

View file

@ -74,6 +74,7 @@ PinosLink * pinos_port_link (PinosPort *output
SpaResult pinos_port_unlink (PinosPort *port,
PinosLink *link);
SpaResult pinos_port_pause_rt (PinosPort *port);
SpaResult pinos_port_clear_buffers (PinosPort *port);

View file

@ -70,7 +70,7 @@ process_work_queue (SpaLoopUtils *utils,
spa_list_remove (&item->link);
if (item->func) {
pinos_log_debug ("work-queue %p: process work item %p %d", this, item->obj, item->seq);
pinos_log_debug ("work-queue %p: process work item %p %d %d", this, item->obj, item->seq, item->res);
item->func (item->obj, item->data, item->res, item->id);
}
spa_list_insert (impl->free_list.prev, &item->link);

View file

@ -235,15 +235,11 @@ do_pause_done (SpaLoop *loop,
void *user_data)
{
SpaV4l2Source *this = user_data;
SpaV4l2State *state = &this->state[0];
SpaEventNodeAsyncComplete *ac = data;
if (SPA_RESULT_IS_OK (ac->body.res.value))
ac->body.res.value = spa_v4l2_stream_off (this);
if (SPA_RESULT_IS_OK (ac->body.res.value)) {
state->started = false;
}
this->event_cb (&this->node, (SpaEvent *)ac, this->user_data);
return SPA_RESULT_OK;
@ -288,12 +284,8 @@ do_start_done (SpaLoop *loop,
void *user_data)
{
SpaV4l2Source *this = user_data;
SpaV4l2State *state = &this->state[0];
SpaEventNodeAsyncComplete *ac = data;
if (SPA_RESULT_IS_OK (ac->body.res.value)) {
state->started = true;
}
this->event_cb (&this->node, (SpaEvent *)ac, this->user_data);
return SPA_RESULT_OK;
@ -351,9 +343,6 @@ spa_v4l2_source_node_send_command (SpaNode *node,
if (state->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS;
if (state->started)
return SPA_RESULT_OK;
if ((res = spa_v4l2_stream_on (this)) < 0)
return res;
@ -373,9 +362,6 @@ spa_v4l2_source_node_send_command (SpaNode *node,
if (state->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS;
if (!state->started)
return SPA_RESULT_OK;
return spa_loop_invoke (this->state[0].data_loop,
do_pause,
++this->seq,

View file

@ -138,6 +138,21 @@ spa_v4l2_clear_buffers (SpaV4l2Source *this)
return SPA_RESULT_OK;
}
static SpaResult
spa_v4l2_port_set_enabled (SpaV4l2Source *this, bool enabled)
{
SpaV4l2State *state = &this->state[0];
if (state->source_enabled != enabled) {
spa_log_info (state->log, "v4l2: enabled %d", enabled);
state->source_enabled = enabled;
if (enabled)
spa_loop_add_source (state->data_loop, &state->source);
else
spa_loop_remove_source (state->data_loop, &state->source);
}
return SPA_RESULT_OK;
}
static int
spa_v4l2_close (SpaV4l2Source *this)
{
@ -149,10 +164,9 @@ spa_v4l2_close (SpaV4l2Source *this)
if (state->n_buffers > 0)
return 0;
spa_log_info (state->log, "v4l2: close");
spa_v4l2_port_set_enabled (this, false);
if (state->source_enabled)
spa_loop_remove_source (state->data_loop, &state->source);
spa_log_info (state->log, "v4l2: close");
if (close(state->fd))
perror ("close");
@ -1210,25 +1224,16 @@ spa_v4l2_stream_on (SpaV4l2Source *this)
SpaV4l2State *state = &this->state[0];
enum v4l2_buf_type type;
if (state->started)
return SPA_RESULT_OK;
type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMON, &type) < 0) {
spa_log_error (this->log, "VIDIOC_STREAMON: %s", strerror (errno));
return SPA_RESULT_ERROR;
}
return SPA_RESULT_OK;
}
state->started = true;
static SpaResult
spa_v4l2_port_set_enabled (SpaV4l2Source *this, bool enabled)
{
SpaV4l2State *state = &this->state[0];
if (state->source_enabled != enabled) {
state->source_enabled = enabled;
if (enabled)
spa_loop_add_source (state->data_loop, &state->source);
else
spa_loop_remove_source (state->data_loop, &state->source);
}
return SPA_RESULT_OK;
}
@ -1239,6 +1244,9 @@ spa_v4l2_stream_off (SpaV4l2Source *this)
enum v4l2_buf_type type;
int i;
if (!state->started)
return SPA_RESULT_OK;
type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) {
spa_log_error (this->log, "VIDIOC_STREAMOFF: %s", strerror (errno));
@ -1252,6 +1260,7 @@ spa_v4l2_stream_off (SpaV4l2Source *this)
if (xioctl (state->fd, VIDIOC_QBUF, &b->v4l2_buffer) < 0)
spa_log_warn (this->log, "VIDIOC_QBUF: %s", strerror (errno));
}
state->started = false;
return SPA_RESULT_OK;
}