Rework the wire protocol

Send a command stream over the socket.
Implement a new buffer object that holds the data and commands.
Make iterator and builders to parse and construct buffers.
Rework gstreamer elements to use new API for creating and parsing
buffers.
Add _release_buffer to notify a stream when we are done processing the
buffer. This will eventually go all the way to the server and will allow
us to do more complicated buffer management.
This commit is contained in:
Wim Taymans 2015-08-24 16:41:04 +02:00
parent d0f3f3125b
commit c47fcd8105
14 changed files with 937 additions and 229 deletions

View file

@ -52,7 +52,7 @@ struct _PinosStreamPrivate
GSocket *socket;
GSource *socket_source;
PinosBufferInfo info;
PinosStackBuffer buffer;
};
#define PINOS_STREAM_GET_PRIVATE(obj) \
@ -238,6 +238,10 @@ pinos_stream_finalize (GObject * object)
g_clear_object (&priv->context);
g_free (priv->name);
if (priv->buffer.message)
g_object_unref (priv->buffer.message);
g_free (priv->buffer.data);
G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object);
}
@ -731,8 +735,6 @@ pinos_stream_disconnect (PinosStream *stream)
return TRUE;
}
#include <gst/wire-protocol.h>
static gboolean
on_socket_condition (GSocket *socket,
GIOCondition condition,
@ -746,14 +748,24 @@ on_socket_condition (GSocket *socket,
{
gssize len;
GInputVector ivec;
FDMessage msg;
PinosStackHeader *hdr;
GSocketControlMessage **messages = NULL;
gint num_messages = 0;
gint flags = 0;
gsize need;
GError *error = NULL;
ivec.buffer = &msg;
ivec.size = sizeof (msg);
need = sizeof (PinosStackHeader);
if (priv->buffer.allocated_size < need) {
priv->buffer.allocated_size = need;
priv->buffer.data = g_realloc (priv->buffer.data, need);
}
hdr = priv->buffer.data;
ivec.buffer = hdr;
ivec.size = sizeof (PinosStackHeader);
len = g_socket_receive_message (socket,
NULL,
@ -765,21 +777,31 @@ on_socket_condition (GSocket *socket,
NULL,
&error);
g_assert (len == sizeof (msg));
if (priv->info.message)
g_object_unref (priv->info.message);
g_assert (len == sizeof (PinosStackHeader));
if (num_messages == 0)
break;
priv->info.flags = msg.flags;
priv->info.seq = msg.seq;
priv->info.pts = msg.pts;
priv->info.dts_offset = msg.dts_offset;
priv->info.offset = msg.offset;
priv->info.size = msg.size;
priv->info.message = messages[0];
need += hdr->length;
if (priv->buffer.allocated_size < need) {
priv->buffer.allocated_size = need;
hdr = priv->buffer.data = g_realloc (priv->buffer.data, need);
}
priv->buffer.size = need;
if (priv->buffer.message)
g_object_unref (priv->buffer.message);
priv->buffer.message = messages[0];
len = g_socket_receive (socket,
(gchar *)priv->buffer.data + sizeof (PinosStackHeader),
hdr->length,
NULL,
&error);
g_assert (len == hdr->length);
priv->buffer.magic = PSB_MAGIC;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
break;
@ -1058,85 +1080,115 @@ pinos_stream_stop (PinosStream *stream)
/**
* pinos_stream_capture_buffer:
* @stream: a #PinosStream
* @info: a #PinosBufferInfo
* @buffer: a #PinosBuffer
*
* Capture the next buffer from @stream. This function should be called every
* time after the new-buffer callback has been emitted.
*
* Returns: %TRUE when @info contains valid information
* Returns: %TRUE when @buffer contains valid information
*/
gboolean
pinos_stream_capture_buffer (PinosStream *stream,
PinosBufferInfo *info)
pinos_stream_capture_buffer (PinosStream *stream,
PinosBuffer *buffer)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (info != NULL, FALSE);
g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
*info = priv->info;
memcpy (buffer, &priv->buffer, sizeof (PinosStackBuffer));
priv->buffer.data = NULL;
priv->buffer.allocated_size = 0;
priv->buffer.size = 0;
priv->buffer.message = NULL;
return TRUE;
}
/**
* pinos_stream_release_buffer:
* @stream: a #PinosStream
* @buffer: a #PinosBuffer
*
* Release @buffer back to @stream. This function should be called whenever the
* buffer is processed.
*/
void
pinos_stream_release_buffer (PinosStream *stream,
PinosBuffer *buffer)
{
PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (is_valid_buffer (buffer), FALSE);
priv = stream->priv;
if (priv->buffer.data == NULL) {
priv->buffer.data = sb->data;
priv->buffer.allocated_size = sb->allocated_size;
priv->buffer.size = 0;
}
else
g_free (sb->data);
if (sb->message)
g_object_unref (sb->message);
}
/**
* pinos_stream_provide_buffer:
* @stream: a #PinosStream
* @info: a #PinosBufferInfo
* @buffer: a #PinosBuffer
*
* Provide the next buffer from @stream. This function should be called every
* time a new frame becomes available.
*
* Returns: %TRUE when @info was handled
* Returns: %TRUE when @buffer was handled
*/
gboolean
pinos_stream_provide_buffer (PinosStream *stream,
PinosBufferInfo *info)
pinos_stream_provide_buffer (PinosStream *stream,
PinosBuffer *buffer)
{
PinosStreamPrivate *priv;
gssize len;
GOutputVector ovec;
FDMessage msg;
PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
GOutputVector ovec[1];
gint flags = 0;
GError *error = NULL;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (info != NULL, FALSE);
g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
msg.flags = info->flags;
msg.seq = info->seq;
msg.pts = info->pts;
msg.dts_offset = info->dts_offset;
msg.offset = info->offset;
msg.size = info->size;
ovec.buffer = &msg;
ovec.size = sizeof (msg);
ovec[0].buffer = sb->data;
ovec[0].size = sb->size;
len = g_socket_send_message (priv->socket,
NULL,
&ovec,
ovec,
1,
&info->message,
&sb->message,
1,
flags,
NULL,
&error);
if (info->message) {
g_object_unref (info->message);
info->message = NULL;
if (sb->message) {
g_object_unref (sb->message);
sb->message = NULL;
}
if (len == -1)
goto send_error;
g_assert (len == sizeof (msg));
g_assert (len == sb->size);
return TRUE;