buffer: improve memory management

Add refcounting to buffers.
Use static memory on ports for buffer input and output memory.
Improve negotiation, let format and properties be negotiated on ports as
the data passes.
Improve some debug info.
This commit is contained in:
Wim Taymans 2016-05-17 20:14:06 +02:00
parent 4a5ed1e1f5
commit 0f67434ae8
21 changed files with 368 additions and 334 deletions

View file

@ -36,6 +36,15 @@
#define PINOS_PORT_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), PINOS_TYPE_PORT, PinosPortPrivate))
#if 0
#define PINOS_DEBUG_TRANSPORT(format,args...) g_debug(format,##args)
#else
#define PINOS_DEBUG_TRANSPORT(format,args...)
#endif
#define MAX_BUFFER_SIZE 1024
#define MAX_FDS 16
struct _PinosPortPrivate
{
PinosNode *node;
@ -49,7 +58,12 @@ struct _PinosPortPrivate
GSource *socket_source;
PinosBuffer net_buffer;
PinosBuffer recv_buffer;
guint8 recv_data[MAX_BUFFER_SIZE];
gint recv_fds[MAX_FDS];
guint8 send_data[MAX_BUFFER_SIZE];
gint send_fds[MAX_FDS];
PinosBuffer *buffer;
PinosPort *peers[16];
@ -367,7 +381,6 @@ pinos_port_link (PinosPort *source, PinosPort *destination)
source->priv->peers[source->priv->n_peers++] = destination;
destination->priv->peers[destination->priv->n_peers++] = source;
g_object_set (destination, "format", source->priv->format, NULL);
g_debug ("port %p: linked to %p", source, destination);
g_signal_emit (source, signals[SIGNAL_LINKED], 0, destination);
@ -444,27 +457,31 @@ pinos_port_get_n_links (PinosPort *port)
return port->priv->n_peers;
}
static gboolean
read_buffer (GSocket *socket,
PinosBuffer *buffer,
static PinosBuffer *
read_buffer (PinosPort *port,
GError **error)
{
PinosPortPrivate *priv = port->priv;
gssize len;
GInputVector ivec;
PinosStackHeader *hdr;
GSocketControlMessage **messages = NULL;
PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
PinosStackBuffer *sb = (PinosStackBuffer *) &priv->recv_buffer;
gint num_messages = 0;
gint flags = 0;
gsize need;
gint i;
need = sizeof (PinosStackHeader);
g_assert (sb->refcount == 0);
if (sb->max_size < need) {
sb->max_size = need;
sb->data = sb->free_data = g_realloc (sb->free_data, need);
}
sb->data = priv->recv_data;
sb->max_size = MAX_BUFFER_SIZE;
sb->size = 0;
sb->free_data = NULL;
sb->fds = priv->recv_fds;
sb->max_fds = MAX_FDS;
sb->n_fds = 0;
sb->free_fds = NULL;
hdr = sb->data;
@ -472,7 +489,7 @@ read_buffer (GSocket *socket,
ivec.buffer = hdr;
ivec.size = sizeof (PinosStackHeader);
len = g_socket_receive_message (socket,
len = g_socket_receive_message (priv->sockets[0],
NULL,
&ivec,
1,
@ -482,14 +499,15 @@ read_buffer (GSocket *socket,
NULL,
error);
if (len == -1)
return FALSE;
return NULL;
g_assert (len == sizeof (PinosStackHeader));
/* now we know the total length */
need += hdr->length;
need = sizeof (PinosStackHeader) + hdr->length;
if (sb->max_size < need) {
g_warning ("port %p: realloc receive memory %" G_GSIZE_FORMAT" -> %" G_GSIZE_FORMAT, port, sb->max_size, need);
sb->max_size = need;
hdr = sb->data = sb->free_data = g_realloc (sb->free_data, need);
}
@ -497,18 +515,19 @@ read_buffer (GSocket *socket,
if (hdr->length > 0) {
/* read data */
len = g_socket_receive (socket,
len = g_socket_receive (priv->sockets[0],
(gchar *)sb->data + sizeof (PinosStackHeader),
hdr->length,
NULL,
error);
if (len == -1)
return FALSE;
return NULL;
g_assert (len == hdr->length);
}
if (sb->max_fds < num_messages) {
g_warning ("port %p: realloc receive fds %d -> %d", port, sb->max_fds, num_messages);
sb->max_fds = num_messages;
sb->fds = sb->free_fds = g_realloc (sb->free_fds, num_messages * sizeof (int));
}
@ -524,14 +543,18 @@ read_buffer (GSocket *socket,
fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds);
for (j = 0; j < n_fds; j++)
sb->fds[i] = fds[i];
sb->n_fds = n_fds;
g_free (fds);
g_object_unref (msg);
}
g_free (messages);
sb->refcount = 1;
sb->magic = PSB_MAGIC;
return TRUE;
g_debug ("port %p: buffer %p init", &priv->recv_buffer, sb);
return &priv->recv_buffer;
}
@ -593,6 +616,73 @@ send_error:
}
}
static void
parse_control_buffer (PinosPort *port, PinosBuffer *buffer)
{
PinosPortPrivate *priv = port->priv;
PinosBufferIter it;
pinos_buffer_iter_init (&it, buffer);
while (pinos_buffer_iter_next (&it)) {
switch (pinos_buffer_iter_get_type (&it)) {
case PINOS_PACKET_TYPE_FORMAT_CHANGE:
{
PinosPacketFormatChange change;
if (!pinos_buffer_iter_parse_format_change (&it, &change))
continue;
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_bytes_new (change.format, strlen (change.format) + 1);
g_object_notify (G_OBJECT (port), "format");
break;
}
default:
break;
}
}
}
static gboolean
pinos_port_receive_buffer (PinosPort *port,
PinosBuffer *buffer,
GError **error)
{
PinosPortPrivate *priv = port->priv;
gboolean res;
if (priv->buffer)
goto buffer_queued;
PINOS_DEBUG_TRANSPORT ("port %p: receive buffer %p", port, buffer);
if (pinos_buffer_get_flags (buffer) & PINOS_BUFFER_FLAG_CONTROL)
parse_control_buffer (port, buffer);
if (priv->sockets[0]) {
PINOS_DEBUG_TRANSPORT ("port %p: write buffer %p", port, buffer);
res = write_buffer (priv->sockets[0], buffer, error);
}
else {
res = TRUE;
priv->buffer = buffer;
if (priv->received_buffer_cb)
priv->received_buffer_cb (port, priv->received_buffer_data);
priv->buffer = NULL;
}
return res;
/* ERRORS */
buffer_queued:
{
g_set_error (error,
G_IO_ERROR,
G_IO_ERROR_NOT_FOUND,
"buffer was already queued on port");
return FALSE;
}
}
static gboolean
on_socket_condition (GSocket *socket,
@ -605,27 +695,42 @@ on_socket_condition (GSocket *socket,
switch (condition) {
case G_IO_IN:
if (!read_buffer (socket, &priv->net_buffer, &error))
goto read_failed;
g_debug ("port %p: received buffer", port);
{
gint i;
PinosBuffer *buffer;
if (priv->direction == PINOS_DIRECTION_INPUT) {
priv->buffer = &priv->net_buffer;
if (priv->received_buffer_cb)
priv->received_buffer_cb (port, priv->received_buffer_data);
} else {
gint i;
for (i = 0; i < priv->n_peers; i++) {
PinosPort *peer = priv->peers[i];
if (peer == NULL)
continue;
buffer = read_buffer (port, &error);
if (buffer == NULL) {
g_warning ("port %p: failed to read buffer: %s", port, error->message);
g_clear_error (&error);
return TRUE;
}
g_debug ("port %p: send buffer %p to peer %p", port, &priv->net_buffer, peer);
if (!pinos_port_receive_buffer (peer, &priv->net_buffer, &error))
goto read_failed;
if (pinos_buffer_get_flags (buffer) & PINOS_BUFFER_FLAG_CONTROL)
parse_control_buffer (port, buffer);
PINOS_DEBUG_TRANSPORT ("port %p: read buffer %p", port, buffer);
if (priv->received_buffer_cb) {
PINOS_DEBUG_TRANSPORT ("port %p: notify buffer %p", port, buffer);
priv->buffer = buffer;
priv->received_buffer_cb (port, priv->received_buffer_data);
priv->buffer = NULL;
}
PINOS_DEBUG_TRANSPORT ("port %p: send to peer buffer %p", port, buffer);
for (i = 0; i < priv->n_peers; i++) {
PinosPort *peer = priv->peers[i];
if (peer == NULL)
continue;
if (!pinos_port_receive_buffer (peer, buffer, &error)) {
g_warning ("peer %p: failed to receive buffer: %s", peer, error->message);
g_clear_error (&error);
}
}
g_assert (pinos_buffer_unref (buffer) == FALSE);
break;
}
case G_IO_OUT:
g_warning ("can do IO OUT\n");
@ -635,13 +740,6 @@ on_socket_condition (GSocket *socket,
break;
}
return TRUE;
read_failed:
{
g_warning ("failed to read buffer: %s", error->message);
g_clear_error (&error);
return TRUE;
}
}
@ -669,62 +767,11 @@ unhandle_socket (PinosPort *port)
}
}
/**
* pinos_port_recieve_buffer:
* @port: a #PinosPort
* @buffer: a #PinosBuffer
* @error: a #GError or %NULL
*
* Receive a buffer on @port.
*
* Returns: %TRUE if the buffer could be accepted. %FALSE if port
* already has an unconsumed buffer.
*/
gboolean
pinos_port_receive_buffer (PinosPort *port,
PinosBuffer *buffer,
GError **error)
{
PinosPortPrivate *priv;
gboolean res;
g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
priv = port->priv;
if (priv->buffer)
goto buffer_queued;
if (priv->sockets[0]) {
g_debug ("port %p: receive buffer %p write to socket", port, buffer);
res = write_buffer (priv->sockets[0], buffer, error);
}
else {
g_debug ("port %p: receive buffer %p signal", port, buffer);
res = TRUE;
priv->buffer = buffer;
if (priv->received_buffer_cb)
priv->received_buffer_cb (port, priv->received_buffer_data);
}
return res;
/* ERRORS */
buffer_queued:
{
g_set_error (error,
G_IO_ERROR,
G_IO_ERROR_NOT_FOUND,
"buffer was already queued on port");
return FALSE;
}
}
/**
* pinos_port_peek_buffer:
* @port: a #PinosPort
*
* Check if there is a buffer on @port and peek it without dequeueing it
* from the port.
* Peek the buffer on @port.
*
* Returns: a #PinosBuffer or %NULL when no buffer has arrived on the pad.
*/
@ -739,27 +786,18 @@ pinos_port_peek_buffer (PinosPort *port)
return priv->buffer;
}
/**
* pinos_port_get_buffer:
* @port: a #PinosPort
*
* Get the buffer on @port. The buffer will no longer be queued on the port.
*
* Returns: a #PinosBuffer or %NULL when no buffer has arrived on the pad.
*/
PinosBuffer *
pinos_port_get_buffer (PinosPort *port)
void
pinos_port_buffer_builder_init (PinosPort *port,
PinosBufferBuilder *builder)
{
PinosPortPrivate *priv;
PinosBuffer *res;
g_return_val_if_fail (PINOS_IS_PORT (port), NULL);
g_return_if_fail (PINOS_IS_PORT (port));
priv = port->priv;
res = priv->buffer;
priv->buffer = NULL;
return res;
pinos_buffer_builder_init_into (builder,
priv->send_data, MAX_BUFFER_SIZE,
priv->send_fds, MAX_FDS);
}
/**
@ -785,15 +823,18 @@ pinos_port_send_buffer (PinosPort *port,
g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
priv = port->priv;
if (priv->direction == PINOS_DIRECTION_OUTPUT && priv->sockets[0]) {
g_debug ("port %p: send buffer %p write to socket", port, buffer);
if (pinos_buffer_get_flags (buffer) & PINOS_BUFFER_FLAG_CONTROL)
parse_control_buffer (port, buffer);
PINOS_DEBUG_TRANSPORT ("port %p: send buffer %p", port, buffer);
if (priv->sockets[0]) {
PINOS_DEBUG_TRANSPORT ("port %p: write buffer %p", port, buffer);
res = write_buffer (priv->sockets[0], buffer, error);
}
for (i = 0; i < priv->n_peers; i++) {
peer = priv->peers[i];
if (peer == NULL)
continue;
g_debug ("port %p: send buffer %p to peer %p", port, buffer, peer);
res = pinos_port_receive_buffer (peer, buffer, error);
}
return res;
@ -853,80 +894,6 @@ create_failed:
}
}
static void
set_format (PinosPort *port, GBytes *format)
{
PinosPortPrivate *priv = port->priv;
if (priv->format)
g_bytes_unref (priv->format);
priv->format = format;
g_debug ("port %p: set format", port);
if (priv->direction == PINOS_DIRECTION_OUTPUT) {
gint i;
for (i = 0; i < priv->n_peers; i++) {
PinosPort *peer = priv->peers[i];
if (peer == NULL)
continue;
set_format (peer, g_bytes_ref (format));
g_object_notify (G_OBJECT (peer), "format");
}
}
}
/**
* pinos_port_update_format:
* @port: a #PinosPort
* @format: a new format
* @error: a #GError or %NULL
*
* Update the format on @port.
*
* Returns: %TRUE on succes, @error is set when %FALSE is returned.
*/
gboolean
pinos_port_update_format (PinosPort *port, GBytes *format, GError **error)
{
PinosPortPrivate *priv;
gboolean res = TRUE;
g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
priv = port->priv;
if (priv->format)
g_bytes_unref (priv->format);
priv->format = format;
g_debug ("port %p: update format", port);
if (priv->direction == PINOS_DIRECTION_INPUT && priv->sockets[0]) {
PinosBufferBuilder builder;
PinosBuffer pbuf;
PinosPacketFormatChange fc;
pinos_buffer_builder_init (&builder);
fc.id = 0;
fc.format = g_bytes_get_data (format, NULL);
pinos_buffer_builder_add_format_change (&builder, &fc);
pinos_buffer_builder_end (&builder, &pbuf);
g_debug ("port %p: send format message %s", port, fc.format);
res = write_buffer (priv->sockets[0], &pbuf, error);
pinos_buffer_clear (&pbuf);
} else if (priv->direction == PINOS_DIRECTION_OUTPUT) {
gint i;
for (i = 0; i < priv->n_peers; i++) {
PinosPort *peer = priv->peers[i];
if (peer == NULL)
continue;
res = pinos_port_update_format (peer, g_bytes_ref (format), error);
}
}
return res;
}
static void
pinos_port_get_property (GObject *_object,
guint prop_id,
@ -983,7 +950,6 @@ pinos_port_set_property (GObject *_object,
switch (prop_id) {
case PROP_NODE:
priv->node = g_value_get_object (value);
g_debug ("port %p: set node %p %d", port, priv->node, G_OBJECT (priv->node)->ref_count);
break;
case PROP_SOCKET:
@ -1005,7 +971,9 @@ pinos_port_set_property (GObject *_object,
break;
case PROP_FORMAT:
set_format (port, g_value_dup_boxed (value));
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_value_dup_boxed (value);
break;
case PROP_PROPERTIES:
@ -1026,8 +994,8 @@ pinos_port_constructed (GObject * object)
PinosPort *port = PINOS_PORT (object);
PinosPortPrivate *priv = port->priv;
g_debug ("port %p: %s port constructed, node %p %d",
port, pinos_direction_as_string (priv->direction), priv->node, G_OBJECT (priv->node)->ref_count);
g_debug ("port %p: %s port constructed, node %p",
port, pinos_direction_as_string (priv->direction), priv->node);
if (priv->sockets[0])
handle_socket (port, priv->sockets[0]);
@ -1141,7 +1109,7 @@ pinos_port_class_init (PinosPortClass * klass)
"The format of the port",
G_TYPE_BYTES,
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
@ -1151,7 +1119,7 @@ pinos_port_class_init (PinosPortClass * klass)
"The properties of the port",
PINOS_TYPE_PROPERTIES,
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));