Rework how clients connect.

Add buffer flags. The idea is to make it possible to easily check when a
buffer contains control information that we need to parse to update the
port fields.
Make the client create remote nodes and ports and set up proxies for
them.
Make a port base class implementing most of the logic to pass buffers
locally and remotely.
Remove most code from stream.c, it's now in the port.
Make a portsink and portsrc that can write and read to/from any port. We
use these in the server to send and receive data.
Rework format negotiation. The final format is now sent in-line before
the data. The server will select a format on output ports.
This commit is contained in:
Wim Taymans 2016-05-17 09:38:30 +02:00
parent e85c3002f7
commit 4a5ed1e1f5
35 changed files with 3111 additions and 761 deletions

View file

@ -54,10 +54,6 @@ struct _PinosStreamPrivate
gboolean disconnecting;
PinosStreamMode mode;
GSocket *socket;
GSource *socket_source;
PinosStackBuffer buffer;
};
#define PINOS_STREAM_GET_PRIVATE(obj) \
@ -74,7 +70,6 @@ enum
PROP_STATE,
PROP_POSSIBLE_FORMATS,
PROP_FORMAT,
PROP_SOCKET,
};
enum
@ -119,10 +114,6 @@ pinos_stream_get_property (GObject *_object,
g_value_set_boxed (value, priv->format);
break;
case PROP_SOCKET:
g_value_set_object (value, priv->socket);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (stream, prop_id, pspec);
break;
@ -153,6 +144,13 @@ pinos_stream_set_property (GObject *_object,
priv->properties = g_value_dup_boxed (value);
break;
case PROP_FORMAT:
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_value_dup_boxed (value);
g_object_set (priv->port, "format", priv->format, NULL);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (stream, prop_id, pspec);
break;
@ -210,6 +208,9 @@ subscription_cb (PinosSubscribe *subscribe,
}
break;
case PINOS_SUBSCRIPTION_FLAG_PORT:
break;
default:
break;
}
@ -237,9 +238,7 @@ pinos_stream_finalize (GObject * object)
g_debug ("free stream %p", stream);
g_clear_object (&priv->socket);
g_clear_object (&priv->node);
g_clear_object (&priv->port);
if (priv->possible_formats)
g_bytes_unref (priv->possible_formats);
@ -258,9 +257,6 @@ pinos_stream_finalize (GObject * object)
g_clear_object (&priv->context);
g_free (priv->name);
g_free (priv->buffer.free_data);
g_free (priv->buffer.free_fds);
G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object);
}
@ -358,25 +354,8 @@ pinos_stream_class_init (PinosStreamClass * klass)
"Format",
"The format of the stream",
G_TYPE_BYTES,
G_PARAM_READABLE |
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
/**
* PinosStream:socket
*
* The socket of the stream. When doing pinos_stream_start() with
* #PINOS_STREAM_MODE_SOCKET, the socket will contain a data stream with
* meta data and anciliary data containing fds with the data.
*/
g_object_class_install_property (gobject_class,
PROP_SOCKET,
g_param_spec_object ("socket",
"Socket",
"The stream socket",
G_TYPE_SOCKET,
G_PARAM_READABLE |
G_PARAM_STATIC_STRINGS));
/**
* PinosStream:new-buffer
*
@ -494,6 +473,37 @@ pinos_stream_get_error (PinosStream *stream)
return stream->priv->error;
}
static void
on_received_buffer (PinosPort *port,
gpointer user_data)
{
PinosStream *stream = user_data;
g_debug ("buffer received");
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
}
static void
on_port_notify (GObject *object,
GParamSpec *pspec,
gpointer user_data)
{
PinosPort *port = PINOS_PORT (object);
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "format")) {
g_clear_pointer (&priv->format, g_bytes_unref);
g_object_get (port, "format", &priv->format, NULL);
g_object_notify (G_OBJECT (stream), "format");
}
if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "possible-formats")) {
g_clear_pointer (&priv->possible_formats, g_bytes_unref);
g_object_get (port, "possible-formats", &priv->possible_formats, NULL);
g_object_notify (G_OBJECT (stream), "possible-formats");
}
}
static void
on_port_created (GObject *source_object,
GAsyncResult *res,
@ -511,6 +521,11 @@ on_port_created (GObject *source_object,
if (priv->port == NULL)
goto create_failed;
on_port_notify (G_OBJECT (priv->port), NULL, stream);
g_signal_connect (priv->port, "notify", (GCallback) on_port_notify, stream);
pinos_port_set_received_buffer_cb (priv->port, on_received_buffer, stream, NULL);
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
@ -808,186 +823,19 @@ pinos_stream_disconnect (PinosStream *stream)
return TRUE;
}
static gboolean
on_socket_condition (GSocket *socket,
GIOCondition condition,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
switch (condition) {
case G_IO_IN:
{
gssize len;
GInputVector ivec;
PinosStackHeader *hdr;
GSocketControlMessage **messages = NULL;
gint num_messages = 0;
gint flags = 0;
gsize need;
GError *error = NULL;
gint i;
need = sizeof (PinosStackHeader);
if (priv->buffer.max_size < need) {
priv->buffer.max_size = need;
priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need);
}
hdr = priv->buffer.data;
/* read header first */
ivec.buffer = hdr;
ivec.size = sizeof (PinosStackHeader);
len = g_socket_receive_message (socket,
NULL,
&ivec,
1,
&messages,
&num_messages,
&flags,
NULL,
&error);
g_assert (len == sizeof (PinosStackHeader));
/* now we know the total length */
need += hdr->length;
if (priv->buffer.max_size < need) {
priv->buffer.max_size = need;
hdr = priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need);
}
priv->buffer.size = need;
if (hdr->length > 0) {
/* read data */
len = g_socket_receive (socket,
(gchar *)priv->buffer.data + sizeof (PinosStackHeader),
hdr->length,
NULL,
&error);
g_assert (len == hdr->length);
}
if (priv->buffer.max_fds < num_messages) {
priv->buffer.max_fds = num_messages;
priv->buffer.fds = priv->buffer.free_fds = g_realloc (priv->buffer.free_fds,
num_messages * sizeof (int));
}
/* handle control messages */
for (i = 0; i < num_messages; i++) {
GSocketControlMessage *msg = messages[i];
gint *fds, n_fds, j;
if (g_socket_control_message_get_msg_type (msg) != SCM_RIGHTS)
continue;
fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds);
for (j = 0; j < n_fds; j++)
priv->buffer.fds[i] = fds[i];
g_free (fds);
g_object_unref (msg);
}
g_free (messages);
priv->buffer.magic = PSB_MAGIC;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
priv->buffer.magic = 0;
priv->buffer.size = 0;
priv->buffer.n_fds = 0;
break;
}
case G_IO_OUT:
g_warning ("can do IO\n");
break;
default:
break;
}
return TRUE;
}
static void
handle_socket (PinosStream *stream, gint fd)
{
PinosStreamPrivate *priv = stream->priv;
GError *error = NULL;
priv->socket = g_socket_new_from_fd (fd, &error);
if (priv->socket == NULL)
goto socket_failed;
switch (priv->mode) {
case PINOS_STREAM_MODE_SOCKET:
g_object_notify (G_OBJECT (stream), "socket");
break;
case PINOS_STREAM_MODE_BUFFER:
{
priv->socket_source = g_socket_create_source (priv->socket, G_IO_IN, NULL);
g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL);
g_source_attach (priv->socket_source, priv->context->priv->context);
break;
}
default:
break;
}
return;
/* ERRORS */
socket_failed:
{
g_warning ("failed to create socket: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
return;
}
}
static void
unhandle_socket (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
switch (priv->mode) {
case PINOS_STREAM_MODE_SOCKET:
g_clear_object (&priv->socket);
g_object_notify (G_OBJECT (stream), "socket");
break;
case PINOS_STREAM_MODE_BUFFER:
if (priv->socket_source) {
g_source_destroy (priv->socket_source);
g_clear_pointer (&priv->socket_source, g_source_unref);
}
break;
default:
break;
}
}
/**
* pinos_stream_peek_buffer:
* pinos_stream_get_buffer:
* @stream: a #PinosStream
* @buffer: a #PinosBuffer
*
* Peek the next buffer from @stream. This function should be called from
* Get the next buffer from @stream. This function should be called from
* the new-buffer signal callback.
*
* Returns: %TRUE when @buffer contains valid information
*/
gboolean
pinos_stream_peek_buffer (PinosStream *stream,
PinosBuffer **buffer)
pinos_stream_get_buffer (PinosStream *stream,
PinosBuffer **buffer)
{
PinosStreamPrivate *priv;
@ -995,10 +843,9 @@ pinos_stream_peek_buffer (PinosStream *stream,
g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
g_return_val_if_fail (is_valid_buffer (&priv->buffer), FALSE);
//g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
*buffer = (PinosBuffer *) &priv->buffer;
*buffer = pinos_port_get_buffer (priv->port);
return TRUE;
}
@ -1023,13 +870,7 @@ pinos_stream_send_buffer (PinosStream *stream,
PinosBuffer *buffer)
{
PinosStreamPrivate *priv;
gssize len;
PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
GOutputVector ovec[1];
GSocketControlMessage *msg = NULL;
gint flags = 0;
GError *error = NULL;
gint i, n_msg;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (buffer != NULL, FALSE);
@ -1037,47 +878,17 @@ pinos_stream_send_buffer (PinosStream *stream,
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
ovec[0].buffer = sb->data;
ovec[0].size = sb->size;
if (sb->n_fds) {
n_msg = 1;
msg = g_unix_fd_message_new ();
for (i = 0; i < sb->n_fds; i++)
if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), sb->fds[i], &error))
goto append_failed;
}
else {
n_msg = 0;
}
len = g_socket_send_message (priv->socket,
NULL,
ovec,
1,
&msg,
n_msg,
flags,
NULL,
&error);
if (len == -1)
if (!pinos_port_send_buffer (priv->port, buffer, &error))
goto send_error;
g_assert (len == (gssize) sb->size);
return TRUE;
append_failed:
{
g_warning ("failed to append fd: %s", error->message);
g_object_unref (msg);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
return FALSE;
}
/* ERRORS */
send_error:
{
g_warning ("failed to send_message: %s", error->message);
g_warning ("failed to send message: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_clear_error (&error);
return FALSE;
}
}