stream: work on stream upload

This commit is contained in:
Wim Taymans 2016-07-25 15:55:56 +02:00
parent f06b991a5a
commit b795fb851f
9 changed files with 224 additions and 36 deletions

View file

@ -20,6 +20,7 @@
#include <string.h>
#include <sys/socket.h>
#include <errno.h>
#include <sys/mman.h>
#include <gio/gunixfdlist.h>
@ -37,6 +38,15 @@
#define MAX_BUFFER_SIZE 1024
#define MAX_FDS 16
typedef struct {
guint32 id;
guint32 type;
int fd;
guint64 offset;
guint64 size;
void *data;
} MemBlock;
struct _PinosClientNodePrivate
{
int fd;
@ -50,6 +60,8 @@ struct _PinosClientNodePrivate
guint8 send_data[MAX_BUFFER_SIZE];
int send_fds[MAX_FDS];
GHashTable *mem_ids;
};
#define PINOS_CLIENT_NODE_GET_PRIVATE(obj) \
@ -100,6 +112,13 @@ pinos_client_node_set_property (GObject *_object,
}
}
static void
free_mem_block (MemBlock *b)
{
munmap (b->data, b->size);
g_slice_free (MemBlock, b);
}
static gboolean
parse_buffer (PinosClientNode *cnode,
PinosBuffer *pbuf)
@ -122,6 +141,8 @@ parse_buffer (PinosClientNode *cnode,
if (!pinos_buffer_iter_parse_format_change (&it, &p))
break;
g_debug ("format change port %d", p.port);
if (!(port = pinos_node_find_port (node, p.port)))
break;
@ -197,6 +218,62 @@ parse_buffer (PinosClientNode *cnode,
}
break;
}
case PINOS_PACKET_TYPE_ADD_MEM:
{
PinosPacketAddMem p;
MemBlock *b;
int fd;
if (!pinos_buffer_iter_parse_add_mem (&it, &p))
break;
fd = pinos_buffer_get_fd (pbuf, p.fd_index);
if (fd == -1)
break;
b = g_slice_new0 (MemBlock);
b->id = p.id;
b->type = p.type;
b->fd = fd;
b->data = mmap (NULL, p.size, PROT_READ, MAP_PRIVATE, fd, p.offset);
b->offset = p.offset;
b->size = p.size;
g_hash_table_insert (priv->mem_ids, GINT_TO_POINTER (p.id), b);
break;
}
case PINOS_PACKET_TYPE_REMOVE_MEM:
{
PinosPacketRemoveMem p;
if (!pinos_buffer_iter_parse_remove_mem (&it, &p))
break;
g_hash_table_remove (priv->mem_ids, GINT_TO_POINTER (p.id));
break;
}
case PINOS_PACKET_TYPE_PROCESS_MEM:
{
PinosPacketProcessMem p;
GError *error = NULL;
if (!pinos_buffer_iter_parse_process_mem (&it, &p))
break;
if (!(port = pinos_node_find_port (node, p.port)))
break;
if (!pinos_port_send_buffer (port, pbuf, &error)) {
g_warning ("client-node %p: port %p failed to receive buffer: %s", node, port, error->message);
g_clear_error (&error);
}
break;
}
case PINOS_PACKET_TYPE_HEADER:
{
break;
}
case PINOS_PACKET_TYPE_REUSE_MEM:
{
break;
@ -390,8 +467,10 @@ static void
pinos_client_node_finalize (GObject * object)
{
PinosClientNode *node = PINOS_CLIENT_NODE (object);
PinosClientNodePrivate *priv = node->priv;
g_debug ("client-node %p: finalize", node);
g_hash_table_unref (priv->mem_ids);
G_OBJECT_CLASS (pinos_client_node_parent_class)->finalize (object);
}
@ -427,7 +506,8 @@ pinos_client_node_class_init (PinosClientNodeClass * klass)
static void
pinos_client_node_init (PinosClientNode * node)
{
node->priv = PINOS_CLIENT_NODE_GET_PRIVATE (node);
PinosClientNodePrivate *priv = node->priv = PINOS_CLIENT_NODE_GET_PRIVATE (node);
g_debug ("client-node %p: new", node);
priv->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) free_mem_block);
}