remove client object

Remove the client object, it is not very useful now that we have the
nodes.
Fix some properties on the proxy objects.
Use sendmsg and recvmsg directly because the GIO ones do allocations.
make pinos_properties_merge and use it to combine properties from nodes
and ports.
This commit is contained in:
Wim Taymans 2016-05-18 17:22:34 +02:00
parent 60475165d6
commit 5f10a933a1
22 changed files with 298 additions and 1030 deletions

View file

@ -56,14 +56,16 @@ struct _PinosPortPrivate
GBytes *format;
PinosProperties *properties;
int fd;
GSource *socket_source;
PinosBuffer recv_buffer;
guint8 recv_data[MAX_BUFFER_SIZE];
gint recv_fds[MAX_FDS];
int recv_fds[MAX_FDS];
guint8 send_data[MAX_BUFFER_SIZE];
gint send_fds[MAX_FDS];
int send_fds[MAX_FDS];
PinosBuffer *buffer;
PinosPort *peers[16];
@ -463,14 +465,13 @@ read_buffer (PinosPort *port,
{
PinosPortPrivate *priv = port->priv;
gssize len;
GInputVector ivec;
PinosStackHeader *hdr;
GSocketControlMessage **messages = NULL;
PinosStackBuffer *sb = (PinosStackBuffer *) &priv->recv_buffer;
gint num_messages = 0;
gint flags = 0;
gsize need;
gint i;
struct cmsghdr *cmsg;
struct msghdr msg = {0};
struct iovec iov[1];
char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))];
g_assert (sb->refcount == 0);
@ -479,28 +480,31 @@ read_buffer (PinosPort *port,
sb->size = 0;
sb->free_data = NULL;
sb->fds = priv->recv_fds;
sb->max_fds = MAX_FDS;
sb->max_fds = 0;
sb->n_fds = 0;
sb->free_fds = NULL;
hdr = sb->data;
/* read header first */
ivec.buffer = hdr;
ivec.size = sizeof (PinosStackHeader);
len = g_socket_receive_message (priv->sockets[0],
NULL,
&ivec,
1,
&messages,
&num_messages,
&flags,
NULL,
error);
if (len == -1)
return NULL;
/* read header and control messages first */
iov[0].iov_base = hdr;
iov[0].iov_len = sizeof (PinosStackHeader);;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsgbuf;
msg.msg_controllen = sizeof (cmsgbuf);
msg.msg_flags = MSG_CMSG_CLOEXEC;
while (TRUE) {
len = recvmsg (priv->fd, &msg, msg.msg_flags);
if (len < 0) {
if (errno == EINTR)
continue;
else
goto recv_error;
}
break;
}
g_assert (len == sizeof (PinosStackHeader));
/* now we know the total length */
@ -515,103 +519,94 @@ read_buffer (PinosPort *port,
if (hdr->length > 0) {
/* read data */
len = g_socket_receive (priv->sockets[0],
(gchar *)sb->data + sizeof (PinosStackHeader),
hdr->length,
NULL,
error);
if (len == -1)
return NULL;
while (TRUE) {
len = recv (priv->fd, (gchar *)sb->data + sizeof (PinosStackHeader), hdr->length, 0);
if (len < 0) {
if (errno == EINTR)
continue;
else
goto recv_error;
}
break;
}
g_assert (len == hdr->length);
}
if (sb->max_fds < num_messages) {
g_warning ("port %p: realloc receive fds %d -> %d", port, sb->max_fds, num_messages);
sb->max_fds = num_messages;
sb->fds = sb->free_fds = g_realloc (sb->free_fds, num_messages * sizeof (int));
}
/* handle control messages */
for (i = 0; i < num_messages; i++) {
GSocketControlMessage *msg = messages[i];
gint *fds, n_fds, j;
if (g_socket_control_message_get_msg_type (msg) != SCM_RIGHTS)
for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
continue;
fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds);
for (j = 0; j < n_fds; j++)
sb->fds[i] = fds[i];
sb->n_fds = n_fds;
g_free (fds);
g_object_unref (msg);
sb->n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int);
memcpy (sb->fds, CMSG_DATA (cmsg), sb->n_fds * sizeof (int));
}
g_free (messages);
sb->refcount = 1;
sb->magic = PSB_MAGIC;
PINOS_DEBUG_TRANSPORT ("port %p: buffer %p init", &priv->recv_buffer, sb);
return &priv->recv_buffer;
/* ERRORS */
recv_error:
{
g_set_error (error,
G_IO_ERROR,
g_io_error_from_errno (errno),
"could not recvmsg: %s", strerror (errno));
return NULL;
}
}
static gboolean
write_buffer (GSocket *socket,
write_buffer (PinosPort *port,
PinosBuffer *buffer,
GError **error)
{
gssize len;
PinosPortPrivate *priv = port->priv;
PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
GOutputVector ovec[1];
GSocketControlMessage *msg = NULL;
gint n_msg, i, flags = 0;
gssize len;
struct msghdr msg = {0};
struct iovec iov[1];
struct cmsghdr *cmsg;
char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))];
gint fds_len = sb->n_fds * sizeof (int);
g_return_val_if_fail (buffer != NULL, FALSE);
iov[0].iov_base = sb->data;
iov[0].iov_len = sb->size;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsgbuf;
msg.msg_controllen = CMSG_SPACE (fds_len);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN (fds_len);
memcpy(CMSG_DATA(cmsg), sb->fds, fds_len);
msg.msg_controllen = cmsg->cmsg_len;
ovec[0].buffer = sb->data;
ovec[0].size = sb->size;
if (sb->n_fds) {
msg = g_unix_fd_message_new ();
for (i = 0; i < sb->n_fds; i++) {
if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), sb->fds[i], error))
goto append_failed;
while (TRUE) {
len = sendmsg (priv->fd, &msg, 0);
if (len < 0) {
if (errno == EINTR)
continue;
else
goto send_error;
}
n_msg = 1;
break;
}
else {
n_msg = 0;
}
len = g_socket_send_message (socket,
NULL,
ovec,
1,
&msg,
n_msg,
flags,
NULL,
error);
g_clear_object (&msg);
if (len == -1)
goto send_error;
g_assert (len == (gssize) sb->size);
return TRUE;
append_failed:
{
g_warning ("failed to append fd: %s", error ? (*error)->message : "unknown reason" );
return FALSE;
}
/* ERRORS */
send_error:
{
g_warning ("failed to send message: %s", error ? (*error)->message : "unknown reason" );
g_set_error (error,
G_IO_ERROR,
g_io_error_from_errno (errno),
"could not sendmsg: %s", strerror (errno));
return FALSE;
}
}
@ -661,7 +656,7 @@ pinos_port_receive_buffer (PinosPort *port,
if (priv->sockets[0]) {
PINOS_DEBUG_TRANSPORT ("port %p: write buffer %p", port, buffer);
res = write_buffer (priv->sockets[0], buffer, error);
res = write_buffer (port, buffer, error);
}
else {
res = TRUE;
@ -750,6 +745,7 @@ handle_socket (PinosPort *port, GSocket *socket)
GMainContext *context = g_main_context_get_thread_default();
g_debug ("port %p: handle socket in context %p", port, context);
priv->fd = g_socket_get_fd (socket);
priv->socket_source = g_socket_create_source (socket, G_IO_IN, NULL);
g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, port, NULL);
g_source_attach (priv->socket_source, context);
@ -764,6 +760,7 @@ unhandle_socket (PinosPort *port)
if (priv->socket_source) {
g_source_destroy (priv->socket_source);
g_clear_pointer (&priv->socket_source, g_source_unref);
priv->fd = -1;
}
}
@ -819,6 +816,7 @@ pinos_port_send_buffer (PinosPort *port,
PinosPort *peer;
gboolean res = TRUE;
gint i;
GError *err = NULL;
g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
priv = port->priv;
@ -829,14 +827,20 @@ pinos_port_send_buffer (PinosPort *port,
PINOS_DEBUG_TRANSPORT ("port %p: send buffer %p", port, buffer);
if (priv->sockets[0]) {
PINOS_DEBUG_TRANSPORT ("port %p: write buffer %p", port, buffer);
res = write_buffer (priv->sockets[0], buffer, error);
res = write_buffer (port, buffer, &err);
}
for (i = 0; i < priv->n_peers; i++) {
peer = priv->peers[i];
if (peer == NULL)
continue;
res = pinos_port_receive_buffer (peer, buffer, error);
res = pinos_port_receive_buffer (peer, buffer, &err);
}
if (!res) {
if (error == NULL)
g_warning ("could not send buffer: %s", err->message);
g_propagate_error (error, err);
}
return res;
}