Improve upload

Only send a buffer when we have received a NEED_DATA message.
Add a signal to pulla buffer from the sink. Restructure the sink to use
a queue like the source and only push a buffer when we can.
Improve SpaData. Offset and size should be between 0 and maxsize, make
sure we clamp correctly when needed.
node_process_output completes the processing of the output after
receiving HAVE_OUTPUT for async elements. It instructs the node that
it now can produce more output.
This commit is contained in:
Wim Taymans 2016-12-20 16:51:57 +01:00
parent 8ce3f949e2
commit 5b0b9c43d0
18 changed files with 201 additions and 112 deletions

View file

@ -192,6 +192,7 @@ pinos_stream_new (PinosContext *context,
pinos_signal_init (&this->add_buffer);
pinos_signal_init (&this->remove_buffer);
pinos_signal_init (&this->new_buffer);
pinos_signal_init (&this->need_buffer);
this->state = PINOS_STREAM_STATE_UNCONNECTED;
@ -473,6 +474,9 @@ on_rtsocket_condition (SpaSource *source,
handle_rtnode_event (stream, ev);
}
}
if (cmd & PINOS_TRANSPORT_CMD_NEED_DATA) {
pinos_signal_emit (&stream->need_buffer, stream);
}
}
}
@ -1080,16 +1084,15 @@ pinos_stream_send_buffer (PinosStream *stream,
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
if ((bid = find_buffer (stream, id))) {
uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
bid->used = true;
impl->trans->outputs[0].buffer_id = id;
impl->trans->outputs[0].status = SPA_RESULT_OK;
write (impl->rtfd, &cmd, 1);
return true;
} else {
return true;
if (write (impl->rtfd, &cmd, 1) < 1)
perror ("write");
}
return true;
}

View file

@ -91,6 +91,8 @@ struct _PinosStream {
PINOS_SIGNAL (new_buffer, (PinosListener *listener,
PinosStream *stream,
uint32_t id));
PINOS_SIGNAL (need_buffer, (PinosListener *listener,
PinosStream *stream));
};
PinosStream * pinos_stream_new (PinosContext *context,

View file

@ -75,6 +75,7 @@ acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GST_OBJECT_LOCK (pool);
while (p->available.length == 0) {
GST_WARNING ("queue empty");
g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool));
}
*buffer = g_queue_pop_head (&p->available);

View file

@ -230,6 +230,8 @@ gst_pinos_sink_init (GstPinosSink * sink)
sink->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
(GDestroyNotify) gst_buffer_unref);
g_queue_init (&sink->queue);
sink->loop = pinos_loop_new ();
sink->main_loop = pinos_thread_main_loop_new (sink->loop, "pinos-sink-loop");
GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop);
@ -347,6 +349,7 @@ typedef struct {
SpaBuffer *buf;
SpaMetaHeader *header;
guint flags;
goffset offset;
} ProcessMemData;
static void
@ -403,13 +406,15 @@ on_add_buffer (PinosListener *listener,
case SPA_DATA_TYPE_DMABUF:
{
gmem = gst_fd_allocator_alloc (pinossink->allocator, dup (d->fd),
d->size, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->chunk->offset, d->chunk->size);
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->chunk->offset + d->mapoffset, d->chunk->size);
data.offset = d->mapoffset;
break;
}
case SPA_DATA_TYPE_MEMPTR:
gmem = gst_memory_new_wrapped (0, d->data, d->size, d->chunk->offset,
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, d->chunk->offset,
d->chunk->size, NULL, NULL);
data.offset = 0;
break;
default:
break;
@ -466,6 +471,51 @@ on_new_buffer (PinosListener *listener,
}
}
static void
do_send_buffer (GstPinosSink *pinossink)
{
GstBuffer *buffer;
ProcessMemData *data;
gboolean res;
guint i;
buffer = g_queue_pop_head (&pinossink->queue);
if (buffer == NULL)
return;
data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer),
process_mem_data_quark);
if (data->header) {
data->header->seq = GST_BUFFER_OFFSET (buffer);
data->header->pts = GST_BUFFER_PTS (buffer);
data->header->dts_offset = GST_BUFFER_DTS (buffer);
}
for (i = 0; i < data->buf->n_datas; i++) {
SpaData *d = &data->buf->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buffer, i);
d->chunk->offset = mem->offset - data->offset;
d->chunk->size = mem->size;
}
if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id)))
g_warning ("can't send buffer");
pinossink->need_ready--;
}
static void
on_need_buffer (PinosListener *listener,
PinosStream *stream)
{
GstPinosSink *pinossink = SPA_CONTAINER_OF (listener, GstPinosSink, stream_need_buffer);
pinossink->need_ready++;
GST_DEBUG ("need buffer %u", pinossink->need_ready);
do_send_buffer (pinossink);
}
static void
on_state_changed (PinosListener *listener,
PinosStream *stream)
@ -610,9 +660,6 @@ static GstFlowReturn
gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
GstPinosSink *pinossink;
gboolean res;
ProcessMemData *data;
guint i;
pinossink = GST_PINOS_SINK (bsink);
@ -633,25 +680,11 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
buffer = b;
}
data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer),
process_mem_data_quark);
if (data->header) {
data->header->seq = GST_BUFFER_OFFSET (buffer);
data->header->pts = GST_BUFFER_PTS (buffer);
data->header->dts_offset = GST_BUFFER_DTS (buffer);
}
for (i = 0; i < data->buf->n_datas; i++) {
SpaData *d = &data->buf->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buffer, i);
d->chunk->offset = mem->offset;
d->chunk->size = mem->size;
}
gst_buffer_ref (buffer);
g_queue_push_tail (&pinossink->queue, buffer);
if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id)))
g_warning ("can't send buffer");
if (pinossink->need_ready)
do_send_buffer (pinossink);
done:
pinos_thread_main_loop_unlock (pinossink->main_loop);
@ -707,6 +740,7 @@ gst_pinos_sink_start (GstBaseSink * basesink)
pinos_signal_add (&pinossink->stream->add_buffer, &pinossink->stream_add_buffer, on_add_buffer);
pinos_signal_add (&pinossink->stream->remove_buffer, &pinossink->stream_remove_buffer, on_remove_buffer);
pinos_signal_add (&pinossink->stream->new_buffer, &pinossink->stream_new_buffer, on_new_buffer);
pinos_signal_add (&pinossink->stream->need_buffer, &pinossink->stream_need_buffer, on_need_buffer);
pinos_thread_main_loop_unlock (pinossink->main_loop);
return TRUE;

View file

@ -89,6 +89,7 @@ struct _GstPinosSink {
PinosListener stream_add_buffer;
PinosListener stream_remove_buffer;
PinosListener stream_new_buffer;
PinosListener stream_need_buffer;
GstAllocator *allocator;
GstStructure *properties;
@ -96,6 +97,8 @@ struct _GstPinosSink {
GstPinosPool *pool;
GHashTable *buf_ids;
GQueue queue;
guint need_ready;
};
struct _GstPinosSinkClass {

View file

@ -173,13 +173,19 @@ clock_disabled:
}
}
static void
clear_queue (GstPinosSrc *pinossrc)
{
g_queue_foreach (&pinossrc->queue, (GFunc) gst_mini_object_unref, NULL);
g_queue_clear (&pinossrc->queue);
}
static void
gst_pinos_src_finalize (GObject * object)
{
GstPinosSrc *pinossrc = GST_PINOS_SRC (object);
g_queue_foreach (&pinossrc->queue, (GFunc) gst_mini_object_unref, NULL);
g_queue_clear (&pinossrc->queue);
clear_queue (pinossrc);
pinos_thread_main_loop_destroy (pinossrc->main_loop);
pinossrc->main_loop = NULL;
@ -340,6 +346,7 @@ typedef struct {
SpaBuffer *buf;
SpaMetaHeader *header;
guint flags;
goffset offset;
} ProcessMemData;
static void
@ -417,13 +424,15 @@ on_add_buffer (PinosListener *listener,
case SPA_DATA_TYPE_DMABUF:
{
gmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (d->fd),
d->size, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->chunk->offset, d->chunk->size);
d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->chunk->offset + d->mapoffset, d->chunk->size);
data.offset = d->mapoffset;
break;
}
case SPA_DATA_TYPE_MEMPTR:
gmem = gst_memory_new_wrapped (0, d->data, d->size, d->chunk->offset,
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, d->chunk->offset + d->mapoffset,
d->chunk->size, NULL, NULL);
data.offset = 0;
default:
break;
}
@ -488,7 +497,7 @@ on_new_buffer (PinosListener *listener,
for (i = 0; i < data->buf->n_datas; i++) {
SpaData *d = &data->buf->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buf, i);
mem->offset = d->chunk->offset;
mem->offset = d->chunk->offset + data->offset;
mem->size = d->chunk->size;
}
g_queue_push_tail (&pinossrc->queue, buf);
@ -937,13 +946,6 @@ gst_pinos_src_start (GstBaseSrc * basesrc)
return TRUE;
}
static void
clear_queue (GstPinosSrc *pinossrc)
{
g_queue_foreach (&pinossrc->queue, (GFunc) gst_mini_object_unref, NULL);
g_queue_clear (&pinossrc->queue);
}
static gboolean
gst_pinos_src_stop (GstBaseSrc * basesrc)
{

View file

@ -195,6 +195,11 @@ spa_proxy_node_send_command (SpaNode *node,
PINOS_MESSAGE_NODE_COMMAND,
&cnc,
true);
if (command->type == SPA_NODE_COMMAND_START) {
uint8_t cmd = PINOS_TRANSPORT_CMD_NEED_DATA;
write (this->data_source.fd, &cmd, 1);
}
res = SPA_RESULT_RETURN_ASYNC (cnc.seq);
break;
}
@ -686,8 +691,8 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
am.type = d->type;
am.memfd = d->fd;
am.flags = d->flags;
am.offset = d->offset;
am.size = d->size;
am.offset = d->mapoffset;
am.size = d->maxsize;
pinos_resource_send_message (this->resource,
PINOS_MESSAGE_ADD_MEM,
&am,
@ -698,7 +703,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
break;
case SPA_DATA_TYPE_MEMPTR:
b->buffer.datas[j].data = SPA_INT_TO_PTR (b->size);
b->size += d->size;
b->size += d->maxsize;
break;
default:
b->buffer.datas[j].type = SPA_DATA_TYPE_INVALID;
@ -863,8 +868,6 @@ static SpaResult
spa_proxy_node_process_input (SpaNode *node)
{
SpaProxy *this;
unsigned int i;
bool have_error = false;
uint8_t cmd;
if (node == NULL)
@ -872,22 +875,29 @@ spa_proxy_node_process_input (SpaNode *node)
this = SPA_CONTAINER_OF (node, SpaProxy, node);
for (i = 0; i < this->n_inputs; i++) {
SpaProxyPort *port = &this->in_ports[i];
SpaPortInput *input;
#if 0
{
unsigned int i;
bool have_error = false;
if ((input = port->io) == NULL)
continue;
for (i = 0; i < this->n_inputs; i++) {
SpaProxyPort *port = &this->in_ports[i];
SpaPortInput *input;
if (!CHECK_PORT_BUFFER (this, input->buffer_id, port)) {
input->status = SPA_RESULT_INVALID_BUFFER_ID;
have_error = true;
continue;
if ((input = port->io) == NULL)
continue;
if (!CHECK_PORT_BUFFER (this, input->buffer_id, port)) {
input->status = SPA_RESULT_INVALID_BUFFER_ID;
have_error = true;
continue;
}
copy_meta_out (this, port, input->buffer_id);
}
copy_meta_out (this, port, input->buffer_id);
if (have_error)
return SPA_RESULT_ERROR;
}
if (have_error)
return SPA_RESULT_ERROR;
#endif
cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
write (this->data_source.fd, &cmd, 1);
@ -899,28 +909,37 @@ static SpaResult
spa_proxy_node_process_output (SpaNode *node)
{
SpaProxy *this;
unsigned int i;
bool have_error = false;
uint8_t cmd;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
for (i = 0; i < this->n_outputs; i++) {
SpaProxyPort *port = &this->out_ports[i];
SpaPortOutput *output;
#if 0
{
unsigned int i;
bool have_error = false;
if ((output = port->io) == NULL)
continue;
for (i = 0; i < this->n_outputs; i++) {
SpaProxyPort *port = &this->out_ports[i];
SpaPortOutput *output;
copy_meta_in (this, port, output->buffer_id);
if ((output = port->io) == NULL)
continue;
if (output->status != SPA_RESULT_OK)
have_error = true;
copy_meta_in (this, port, output->buffer_id);
if (output->status != SPA_RESULT_OK)
have_error = true;
}
if (have_error)
return SPA_RESULT_ERROR;
}
if (have_error)
return SPA_RESULT_ERROR;
#endif
cmd = PINOS_TRANSPORT_CMD_NEED_DATA;
write (this->data_source.fd, &cmd, 1);
return SPA_RESULT_OK;
}
@ -1052,6 +1071,17 @@ proxy_on_data_fd_events (SpaSource *source)
}
if (cmd & PINOS_TRANSPORT_CMD_HAVE_DATA) {
SpaNodeEventHaveOutput ho;
unsigned int i;
for (i = 0; i < this->n_outputs; i++) {
SpaProxyPort *port = &this->out_ports[i];
SpaPortOutput *output;
if ((output = port->io) == NULL)
continue;
copy_meta_in (this, port, output->buffer_id);
}
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
ho.port_id = 0;

View file

@ -324,10 +324,10 @@ alloc_buffers (PinosLink *this,
d->type = SPA_DATA_TYPE_MEMFD;
d->flags = 0;
d->fd = mem->fd;
d->offset = 0;
d->size = mem->size;
d->data = mem->ptr;
d->chunk->offset = SPA_PTRDIFF (ddp, d->data);
d->mapoffset = SPA_PTRDIFF (ddp, mem->ptr);
d->maxsize = data_sizes[j];
d->data = SPA_MEMBER (mem->ptr, d->mapoffset, void);
d->chunk->offset = 0;
d->chunk->size = data_sizes[j];
d->chunk->stride = data_strides[j];
ddp += data_sizes[j];

View file

@ -328,11 +328,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
PinosPort *port = this->output_port_map[ho->port_id];
PinosLink *link;
if ((res = spa_node_process_output (node)) < 0) {
pinos_log_warn ("node %p: got pull error %d, %d", this, res, po->status);
break;
}
spa_list_for_each (link, &port->rt.links, rt.output_link) {
size_t offset;
@ -356,6 +351,10 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
if ((res = spa_node_port_reuse_buffer (node, ho->port_id, po->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", node, res);
}
if ((res = spa_node_process_output (node)) < 0) {
pinos_log_warn ("node %p: got pull error %d, %d", this, res, po->status);
break;
}
break;
}
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: