Fix stream restart

Make sure we always send a clock update before sending a start command.
Fix memory size.
Follow the state of the node to do allocation
Use the pinos node to control the state
When doing STREAMOFF, all buffers will be dequeued. requeue them buffers
for when we go to playing again.
This commit is contained in:
Wim Taymans 2016-09-13 10:34:32 +02:00
parent e34ef88dac
commit 021eccb8ad
7 changed files with 107 additions and 63 deletions

View file

@ -812,7 +812,6 @@ handle_node_command (PinosStream *stream,
g_debug ("stream %p: start", stream); g_debug ("stream %p: start", stream);
control_builder_init (stream, &builder); control_builder_init (stream, &builder);
add_request_clock_update (stream, &builder);
if (priv->direction == PINOS_DIRECTION_INPUT) if (priv->direction == PINOS_DIRECTION_INPUT)
add_need_input (stream, &builder, 0); add_need_input (stream, &builder, 0);
add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING);

View file

@ -407,7 +407,7 @@ on_add_buffer (GObject *gobject,
if (mem->fd) { if (mem->fd) {
gmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (mem->fd), gmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (mem->fd),
d->mem.offset + d->mem.size, GST_FD_MEMORY_FLAG_NONE); mem->size, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mem.offset, d->mem.size); gst_memory_resize (gmem, d->mem.offset, d->mem.size);
} else { } else {
gmem = gst_memory_new_wrapped (0, mem->ptr, mem->size, d->mem.offset, gmem = gst_memory_new_wrapped (0, mem->ptr, mem->size, d->mem.offset,

View file

@ -250,13 +250,13 @@ again:
} }
spa_debug_format (format); spa_debug_format (format);
spa_format_fixate (format); spa_format_fixate (format);
} else if (in_state == SPA_NODE_STATE_CONFIGURE) { } else if (in_state == SPA_NODE_STATE_CONFIGURE && out_state > SPA_NODE_STATE_CONFIGURE) {
/* only input needs format */ /* only input needs format */
if ((res = spa_node_port_get_format (this->output_node->node, this->output_port, (const SpaFormat **)&format)) < 0) { if ((res = spa_node_port_get_format (this->output_node->node, this->output_port, (const SpaFormat **)&format)) < 0) {
g_warning ("error get format output: %d", res); g_warning ("error get format output: %d", res);
goto error; goto error;
} }
} else if (out_state == SPA_NODE_STATE_CONFIGURE) { } else if (out_state == SPA_NODE_STATE_CONFIGURE && in_state > SPA_NODE_STATE_CONFIGURE) {
/* only output needs format */ /* only output needs format */
if ((res = spa_node_port_get_format (this->input_node->node, this->input_port, (const SpaFormat **)&format)) < 0) { if ((res = spa_node_port_get_format (this->input_node->node, this->input_port, (const SpaFormat **)&format)) < 0) {
g_warning ("error get format input: %d", res); g_warning ("error get format input: %d", res);
@ -297,7 +297,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
const SpaPortInfo *iinfo, *oinfo; const SpaPortInfo *iinfo, *oinfo;
SpaPortInfoFlags in_flags, out_flags; SpaPortInfoFlags in_flags, out_flags;
if (in_state < SPA_NODE_STATE_READY || out_state < SPA_NODE_STATE_READY) if (in_state != SPA_NODE_STATE_READY && out_state != SPA_NODE_STATE_READY)
return SPA_RESULT_OK; return SPA_RESULT_OK;
g_debug ("link %p: doing alloc buffers %p %p", this, this->output_node, this->input_node); g_debug ("link %p: doing alloc buffers %p %p", this, this->output_node, this->input_node);
@ -310,47 +310,51 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
g_warning ("error get port info: %d", res); g_warning ("error get port info: %d", res);
goto error; goto error;
} }
in_flags = iinfo->flags;
out_flags = oinfo->flags;
if (in_state == SPA_NODE_STATE_READY && out_state == SPA_NODE_STATE_READY) {
if ((out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
} else if ((out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
} else if ((out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) {
priv->n_in_buffers = 16;
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
if ((res = spa_buffer_alloc (oinfo->params, oinfo->n_params,
priv->in_buffers,
&priv->n_in_buffers)) < 0) {
g_warning ("error alloc buffers: %d", res);
goto error;
}
memcpy (priv->out_buffers, priv->in_buffers, priv->n_in_buffers * sizeof (SpaBuffer*));
priv->n_out_buffers = priv->n_in_buffers;
} else if ((out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
} else {
g_warning ("error no common allocation found");
res = SPA_RESULT_ERROR;
goto error;
}
} else if (in_state == SPA_NODE_STATE_READY && out_state > SPA_NODE_STATE_READY) {
out_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
} else if (out_state == SPA_NODE_STATE_READY && in_state > SPA_NODE_STATE_READY) {
in_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
} else
return SPA_RESULT_OK;
spa_debug_port_info (oinfo); spa_debug_port_info (oinfo);
spa_debug_port_info (iinfo); spa_debug_port_info (iinfo);
if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) &&
(iinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
} else if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) &&
(iinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
} else if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) &&
(iinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) {
priv->n_in_buffers = 16;
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
if ((res = spa_buffer_alloc (oinfo->params, oinfo->n_params,
priv->in_buffers,
&priv->n_in_buffers)) < 0) {
g_warning ("error alloc buffers: %d", res);
goto error;
}
memcpy (priv->out_buffers, priv->in_buffers, priv->n_in_buffers * sizeof (SpaBuffer*));
priv->n_out_buffers = priv->n_in_buffers;
} else if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) &&
(iinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
} else {
g_warning ("error no common allocation found");
res = SPA_RESULT_ERROR;
goto error;
}
if (in_state > SPA_NODE_STATE_READY)
in_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
if (out_state > SPA_NODE_STATE_READY)
out_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
priv->n_in_buffers = 16; priv->n_in_buffers = 16;
if ((res = spa_node_port_alloc_buffers (this->input_node->node, this->input_port, if ((res = spa_node_port_alloc_buffers (this->input_node->node, this->input_port,
@ -360,7 +364,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
goto error; goto error;
} }
} }
if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { else if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
priv->n_out_buffers = 16; priv->n_out_buffers = 16;
if ((res = spa_node_port_alloc_buffers (this->output_node->node, this->output_port, if ((res = spa_node_port_alloc_buffers (this->output_node->node, this->output_port,
iinfo->params, iinfo->n_params, iinfo->params, iinfo->n_params,
@ -376,7 +380,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
goto error; goto error;
} }
} }
if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
if ((res = spa_node_port_use_buffers (this->output_node->node, this->output_port, if ((res = spa_node_port_use_buffers (this->output_node->node, this->output_port,
priv->in_buffers, priv->n_in_buffers)) < 0) { priv->in_buffers, priv->n_in_buffers)) < 0) {
g_warning ("error use buffers: %d", res); g_warning ("error use buffers: %d", res);
@ -395,20 +399,16 @@ error:
static SpaResult static SpaResult
do_start (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) do_start (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
{ {
SpaNodeCommand cmd;
SpaResult res = SPA_RESULT_OK; SpaResult res = SPA_RESULT_OK;
cmd.type = SPA_NODE_COMMAND_START; if (in_state < SPA_NODE_STATE_PAUSED || out_state < SPA_NODE_STATE_PAUSED)
cmd.data = NULL; return SPA_RESULT_OK;
cmd.size = 0;
if (in_state == SPA_NODE_STATE_PAUSED) { if (in_state == SPA_NODE_STATE_PAUSED)
if ((res = spa_node_send_command (this->input_node->node, &cmd)) < 0) pinos_node_set_state (this->input_node, PINOS_NODE_STATE_RUNNING);
g_warning ("got error %d", res);
} if (out_state == SPA_NODE_STATE_PAUSED)
if (out_state == SPA_NODE_STATE_PAUSED) { pinos_node_set_state (this->output_node, PINOS_NODE_STATE_RUNNING);
if ((res = spa_node_send_command (this->output_node->node, &cmd)) < 0)
g_warning ("got error %d", res);
}
return res; return res;
} }

View file

@ -289,6 +289,21 @@ pause_node (PinosNode *this)
g_debug ("got error %d", res); g_debug ("got error %d", res);
} }
static void
start_node (PinosNode *this)
{
SpaResult res;
SpaNodeCommand cmd;
g_debug ("node %p: start node", this);
cmd.type = SPA_NODE_COMMAND_START;
cmd.data = NULL;
cmd.size = 0;
if ((res = spa_node_send_command (this->node, &cmd)) < 0)
g_debug ("got error %d", res);
}
static void static void
suspend_node (PinosNode *this) suspend_node (PinosNode *this)
{ {
@ -343,6 +358,8 @@ node_set_state (PinosNode *this,
break; break;
case PINOS_NODE_STATE_RUNNING: case PINOS_NODE_STATE_RUNNING:
send_clock_update (this);
start_node (this);
break; break;
case PINOS_NODE_STATE_ERROR: case PINOS_NODE_STATE_ERROR:
@ -424,7 +441,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
{ {
SpaPollItem *poll = event->data; SpaPollItem *poll = event->data;
g_debug ("node %p: add poll %d, n_fds %d", this, poll->id, poll->n_fds); g_debug ("node %p: add pollid %d, n_poll %d, n_fds %d", this, poll->id, priv->n_poll, poll->n_fds);
priv->poll[priv->n_poll] = *poll; priv->poll[priv->n_poll] = *poll;
priv->n_poll++; priv->n_poll++;
if (poll->n_fds) if (poll->n_fds)
@ -1215,6 +1232,8 @@ pinos_node_link (PinosNode *output_node,
if (output_node->priv->clock) if (output_node->priv->clock)
input_node->priv->clock = output_node->priv->clock; input_node->priv->clock = output_node->priv->clock;
g_debug ("node %p: clock %p", output_node, output_node->priv->clock);
output_port = get_free_node_port (output_node, PINOS_DIRECTION_OUTPUT); output_port = get_free_node_port (output_node, PINOS_DIRECTION_OUTPUT);
if (output_port == SPA_ID_INVALID) if (output_port == SPA_ID_INVALID)
output_port = output_node->priv->output_port_ids[0]; output_port = output_node->priv->output_port_ids[0];

View file

@ -256,7 +256,6 @@ spa_proxy_node_send_command (SpaNode *node,
spa_control_clear (&control); spa_control_clear (&control);
break; break;
break;
} }
} }
return SPA_RESULT_OK; return SPA_RESULT_OK;
@ -703,6 +702,9 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
if (!port->format) if (!port->format)
return SPA_RESULT_NO_FORMAT; return SPA_RESULT_NO_FORMAT;
if (port->n_buffers == n_buffers && port->buffers == buffers)
return SPA_RESULT_OK;
spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, sizeof (fds)); spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, sizeof (fds));
if (buffers == NULL || n_buffers == 0) { if (buffers == NULL || n_buffers == 0) {

View file

@ -272,8 +272,12 @@ spa_v4l2_source_node_send_command (SpaNode *node,
case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_FLUSH:
case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_DRAIN:
case SPA_NODE_COMMAND_MARKER: case SPA_NODE_COMMAND_MARKER:
case SPA_NODE_COMMAND_CLOCK_UPDATE:
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
case SPA_NODE_COMMAND_CLOCK_UPDATE:
{
return SPA_RESULT_OK;
}
} }
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
@ -592,8 +596,12 @@ spa_v4l2_source_node_port_alloc_buffers (SpaNode *node,
res = spa_v4l2_alloc_buffers (this, params, n_params, buffers, n_buffers); res = spa_v4l2_alloc_buffers (this, params, n_params, buffers, n_buffers);
if (state->have_buffers) if (state->have_buffers) {
update_state (this, SPA_NODE_STATE_PAUSED); if (state->started)
update_state (this, SPA_NODE_STATE_STREAMING);
else
update_state (this, SPA_NODE_STATE_PAUSED);
}
return res; return res;
} }

View file

@ -862,6 +862,13 @@ v4l2_on_fd_events (SpaPollNotifyData *data)
SpaNodeEvent event; SpaNodeEvent event;
SpaNodeEventHaveOutput ho; SpaNodeEventHaveOutput ho;
if (data->fds[0].revents & POLLERR) {
return -1;
}
if (mmap_read (this) < 0) if (mmap_read (this) < 0)
return 0; return 0;
@ -1171,22 +1178,31 @@ spa_v4l2_pause (SpaV4l2Source *this)
SpaV4l2State *state = &this->state[0]; SpaV4l2State *state = &this->state[0];
enum v4l2_buf_type type; enum v4l2_buf_type type;
SpaNodeEvent event; SpaNodeEvent event;
int i;
if (!state->started) if (!state->started)
return SPA_RESULT_OK; return SPA_RESULT_OK;
state->started = false;
event.type = SPA_NODE_EVENT_TYPE_REMOVE_POLL; event.type = SPA_NODE_EVENT_TYPE_REMOVE_POLL;
event.data = &state->poll; event.data = &state->poll;
event.size = sizeof (state->poll); event.size = sizeof (state->poll);
this->event_cb (&this->node, &event, this->user_data); this->event_cb (&this->node, &event, this->user_data);
state->started = false;
type = V4L2_BUF_TYPE_VIDEO_CAPTURE; type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) { if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) {
perror ("VIDIOC_STREAMOFF"); perror ("VIDIOC_STREAMOFF");
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
} }
for (i = 0; i < state->reqbuf.count; i++) {
V4l2Buffer *b;
b = &state->alloc_buffers[i];
if (!b->outstanding)
if (xioctl (state->fd, VIDIOC_QBUF, &b->v4l2_buffer) < 0)
perror ("VIDIOC_QBUF");
}
update_state (this, SPA_NODE_STATE_PAUSED); update_state (this, SPA_NODE_STATE_PAUSED);
return SPA_RESULT_OK; return SPA_RESULT_OK;