dynamically resize connection buffers

This commit is contained in:
Wim Taymans 2016-10-18 11:11:38 +02:00
parent d711e15f0a
commit 7ee66cfc35
3 changed files with 108 additions and 104 deletions

View file

@ -28,25 +28,27 @@
#include "connection.h" #include "connection.h"
#include "serialize.h" #include "serialize.h"
#define MAX_BUFFER_SIZE 4096 #define MAX_BUFFER_SIZE 1024
#define MAX_FDS 28 #define MAX_FDS 28
typedef struct { typedef struct {
uint8_t buffer_data[MAX_BUFFER_SIZE]; uint8_t *buffer_data;
size_t buffer_size; size_t buffer_size;
int fds[MAX_FDS]; size_t buffer_maxsize;
unsigned int n_fds; int fds[MAX_FDS];
unsigned int n_fds;
SpaControlCmd cmd; SpaControlCmd cmd;
off_t offset; off_t offset;
void *data; void *data;
size_t size; size_t size;
bool update;
} ConnectionBuffer; } ConnectionBuffer;
struct _SpaConnection { struct _SpaConnection {
ConnectionBuffer in, out; ConnectionBuffer in, out;
int fd; int fd;
bool update;
}; };
#if 0 #if 0
@ -58,26 +60,22 @@ struct _SpaConnection {
static bool static bool
read_length (uint8_t * data, unsigned int size, size_t * length, size_t * skip) read_length (uint8_t * data, unsigned int size, size_t * length, size_t * skip)
{ {
size_t len, offset;
uint8_t b; uint8_t b;
/* start reading the length, we need this to skip to the data later */ /* start reading the length, we need this to skip to the data later */
len = offset = 0; *length = *skip = 0;
do { do {
if (offset >= size) if (*skip >= size)
return false; return false;
b = data[offset++]; b = data[(*skip)++];
len = (len << 7) | (b & 0x7f); *length = (*length << 7) | (b & 0x7f);
} while (b & 0x80); } while (b & 0x80);
/* check remaining command size */ /* check remaining command size */
if (size - offset < len) if (size - *skip < *length)
return false; return false;
*length = len;
*skip = offset;
return true; return true;
} }
@ -160,12 +158,14 @@ connection_parse_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cm
#define MAX(a,b) ((a) > (b) ? (a) : (b)) #define MAX(a,b) ((a) > (b) ? (a) : (b))
static void * static void *
connection_ensure_size (SpaConnection *conn, size_t size) connection_ensure_size (SpaConnection *conn, ConnectionBuffer *buf, size_t size)
{ {
if (conn->out.buffer_size + size > MAX_BUFFER_SIZE) { if (buf->buffer_size + size > buf->buffer_maxsize) {
fprintf (stderr, "error connection %p: overflow\n", conn); buf->buffer_maxsize = buf->buffer_size + MAX_BUFFER_SIZE * ((size + MAX_BUFFER_SIZE-1) / MAX_BUFFER_SIZE);
buf->buffer_data = realloc (buf->buffer_data, buf->buffer_maxsize);
fprintf (stderr, "connection %p: resize buffer to %zd\n", conn, buf->buffer_maxsize);
} }
return (uint8_t *) conn->out.buffer_data + conn->out.buffer_size; return (uint8_t *) buf->buffer_data + buf->buffer_size;
} }
static void * static void *
@ -173,17 +173,18 @@ connection_add_cmd (SpaConnection *conn, SpaControlCmd cmd, size_t size)
{ {
uint8_t *p; uint8_t *p;
unsigned int plen; unsigned int plen;
ConnectionBuffer *buf = &conn->out;
plen = 1; plen = 1;
while (size >> (7 * plen)) while (size >> (7 * plen))
plen++; plen++;
/* 1 for cmd, plen for size and size for payload */ /* 1 for cmd, plen for size and size for payload */
p = connection_ensure_size (conn, 1 + plen + size); p = connection_ensure_size (conn, buf, 1 + plen + size);
conn->out.cmd = cmd; buf->cmd = cmd;
conn->out.offset = conn->out.buffer_size; buf->offset = buf->buffer_size;
conn->out.buffer_size += 1 + plen + size; buf->buffer_size += 1 + plen + size;
*p++ = cmd; *p++ = cmd;
/* write length */ /* write length */
@ -382,7 +383,7 @@ connection_add_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cm)
} }
static SpaResult static SpaResult
refill_buffer (SpaConnection *conn) refill_buffer (SpaConnection *conn, ConnectionBuffer *buf)
{ {
ssize_t len; ssize_t len;
struct cmsghdr *cmsg; struct cmsghdr *cmsg;
@ -390,13 +391,8 @@ refill_buffer (SpaConnection *conn)
struct iovec iov[1]; struct iovec iov[1];
char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))]; char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))];
conn->in.cmd = SPA_CONTROL_CMD_INVALID; iov[0].iov_base = buf->buffer_data + buf->buffer_size;
conn->in.offset = 0; iov[0].iov_len = buf->buffer_maxsize - buf->buffer_size;
conn->in.size = 0;
conn->in.buffer_size = 0;
iov[0].iov_base = conn->in.buffer_data;
iov[0].iov_len = MAX_BUFFER_SIZE;
msg.msg_iov = iov; msg.msg_iov = iov;
msg.msg_iovlen = 1; msg.msg_iovlen = 1;
msg.msg_control = cmsgbuf; msg.msg_control = cmsgbuf;
@ -417,17 +413,17 @@ refill_buffer (SpaConnection *conn)
if (len < 4) if (len < 4)
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
conn->in.buffer_size = len; buf->buffer_size += len;
/* handle control messages */ /* handle control messages */
for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) { for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
continue; continue;
conn->in.n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int); buf->n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int);
memcpy (conn->in.fds, CMSG_DATA (cmsg), conn->in.n_fds * sizeof (int)); memcpy (buf->fds, CMSG_DATA (cmsg), buf->n_fds * sizeof (int));
} }
SPA_DEBUG_CONTROL ("connection %p: %d read %zd bytes and %d fds\n", conn, conn->fd, len, conn->in.n_fds); SPA_DEBUG_CONTROL ("connection %p: %d read %zd bytes and %d fds\n", conn, conn->fd, len, buf->n_fds);
return SPA_RESULT_OK; return SPA_RESULT_OK;
@ -439,6 +435,24 @@ recv_error:
} }
} }
static void
clear_buffer (ConnectionBuffer *buf)
{
unsigned int i;
for (i = 0; i < buf->n_fds; i++) {
if (buf->fds[i] > 0) {
if (close (buf->fds[i]) < 0)
perror ("close");
}
}
buf->n_fds = 0;
buf->cmd = SPA_CONTROL_CMD_INVALID;
buf->offset = 0;
buf->size = 0;
buf->buffer_size = 0;
}
SpaConnection * SpaConnection *
spa_connection_new (int fd) spa_connection_new (int fd)
{ {
@ -446,10 +460,23 @@ spa_connection_new (int fd)
c = calloc (1, sizeof (SpaConnection)); c = calloc (1, sizeof (SpaConnection));
c->fd = fd; c->fd = fd;
c->out.buffer_data = malloc (MAX_BUFFER_SIZE);
c->out.buffer_maxsize = MAX_BUFFER_SIZE;
c->in.buffer_data = malloc (MAX_BUFFER_SIZE);
c->in.buffer_maxsize = MAX_BUFFER_SIZE;
c->in.update = true;
return c; return c;
} }
void
spa_connection_free (SpaConnection *conn)
{
free (conn->out.buffer_data);
free (conn->in.buffer_data);
free (conn);
}
/** /**
* spa_connection_has_next: * spa_connection_has_next:
* @iter: a #SpaConnection * @iter: a #SpaConnection
@ -463,44 +490,47 @@ spa_connection_has_next (SpaConnection *conn)
{ {
size_t len, size, skip; size_t len, size, skip;
uint8_t *data; uint8_t *data;
ConnectionBuffer *buf;
if (conn == NULL) if (conn == NULL)
return SPA_RESULT_INVALID_ARGUMENTS; return SPA_RESULT_INVALID_ARGUMENTS;
/* move to next packet */ buf = &conn->in;
conn->in.offset += conn->in.size;
if (conn->update) { /* move to next packet */
refill_buffer (conn); buf->offset += buf->size;
conn->update = false;
again:
if (buf->update) {
refill_buffer (conn, buf);
buf->update = false;
} }
/* now read packet */ /* now read packet */
data = conn->in.buffer_data; data = buf->buffer_data;
size = conn->in.buffer_size; size = buf->buffer_size;
if (conn->in.offset >= size) { if (buf->offset >= size) {
conn->update = true; clear_buffer (buf);
buf->update = true;
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
} }
data += conn->in.offset; data += buf->offset;
size -= conn->in.offset; size -= buf->offset;
if (size < 1)
return SPA_RESULT_ERROR;
conn->in.cmd = *data;
buf->cmd = *data;
data++; data++;
size--; size--;
if (!read_length (data, size, &len, &skip)) if (!read_length (data, size, &len, &skip)) {
return SPA_RESULT_ERROR; connection_ensure_size (conn, buf, len + skip);
buf->update = true;
conn->in.size = len; goto again;
conn->in.data = data + skip; }
conn->in.offset += 1 + skip; buf->size = len;
buf->data = data + skip;
buf->offset += 1 + skip;
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
@ -619,10 +649,7 @@ spa_connection_get_fd (SpaConnection *conn,
{ {
int fd; int fd;
if (conn == NULL) if (conn == NULL || conn->in.n_fds < index)
return -1;
if (conn->in.n_fds < index)
return -1; return -1;
fd = conn->in.fds[index]; fd = conn->in.fds[index];
@ -766,21 +793,24 @@ spa_connection_flush (SpaConnection *conn)
struct cmsghdr *cmsg; struct cmsghdr *cmsg;
char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))]; char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))];
int *cm, i, fds_len; int *cm, i, fds_len;
ConnectionBuffer *buf;
if (conn == NULL) if (conn == NULL)
return SPA_RESULT_INVALID_ARGUMENTS; return SPA_RESULT_INVALID_ARGUMENTS;
if (conn->out.buffer_size == 0) buf = &conn->out;
if (buf->buffer_size == 0)
return SPA_RESULT_OK; return SPA_RESULT_OK;
fds_len = conn->out.n_fds * sizeof (int); fds_len = buf->n_fds * sizeof (int);
iov[0].iov_base = conn->out.buffer_data; iov[0].iov_base = buf->buffer_data;
iov[0].iov_len = conn->out.buffer_size; iov[0].iov_len = buf->buffer_size;
msg.msg_iov = iov; msg.msg_iov = iov;
msg.msg_iovlen = 1; msg.msg_iovlen = 1;
if (conn->out.n_fds > 0) { if (buf->n_fds > 0) {
msg.msg_control = cmsgbuf; msg.msg_control = cmsgbuf;
msg.msg_controllen = CMSG_SPACE (fds_len); msg.msg_controllen = CMSG_SPACE (fds_len);
cmsg = CMSG_FIRSTHDR(&msg); cmsg = CMSG_FIRSTHDR(&msg);
@ -788,8 +818,8 @@ spa_connection_flush (SpaConnection *conn)
cmsg->cmsg_type = SCM_RIGHTS; cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN (fds_len); cmsg->cmsg_len = CMSG_LEN (fds_len);
cm = (int*)CMSG_DATA (cmsg); cm = (int*)CMSG_DATA (cmsg);
for (i = 0; i < conn->out.n_fds; i++) for (i = 0; i < buf->n_fds; i++)
cm[i] = conn->out.fds[i] > 0 ? conn->out.fds[i] : -conn->out.fds[i]; cm[i] = buf->fds[i] > 0 ? buf->fds[i] : -buf->fds[i];
msg.msg_controllen = cmsg->cmsg_len; msg.msg_controllen = cmsg->cmsg_len;
} else { } else {
msg.msg_control = NULL; msg.msg_control = NULL;
@ -806,10 +836,10 @@ spa_connection_flush (SpaConnection *conn)
} }
break; break;
} }
conn->out.buffer_size -= len; buf->buffer_size -= len;
conn->out.n_fds = 0; buf->n_fds = 0;
SPA_DEBUG_CONTROL ("connection %p: %d written %zd bytes and %u fds\n", conn, conn->fd, len, conn->out.n_fds); SPA_DEBUG_CONTROL ("connection %p: %d written %zd bytes and %u fds\n", conn, conn->fd, len, buf->n_fds);
return SPA_RESULT_OK; return SPA_RESULT_OK;
@ -824,27 +854,12 @@ send_error:
SpaResult SpaResult
spa_connection_clear (SpaConnection *conn) spa_connection_clear (SpaConnection *conn)
{ {
unsigned int i;
if (conn == NULL) if (conn == NULL)
return SPA_RESULT_INVALID_ARGUMENTS; return SPA_RESULT_INVALID_ARGUMENTS;
for (i = 0; i < conn->out.n_fds; i++) { clear_buffer (&conn->out);
if (conn->out.fds[i] > 0) { clear_buffer (&conn->in);
if (close (conn->out.fds[i]) < 0) conn->in.update = true;
perror ("close");
}
}
conn->out.n_fds = 0;
conn->out.buffer_size = 0;
for (i = 0; i < conn->in.n_fds; i++) {
if (conn->in.fds[i] > 0) {
if (close (conn->in.fds[i]) < 0)
perror ("close");
}
}
conn->in.n_fds = 0;
conn->in.buffer_size = 0;
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }

View file

@ -662,7 +662,7 @@ add_port_update (PinosStream *stream, uint32_t change_mask)
} }
static void static void
add_need_input (PinosStream *stream, uint32_t port_id) send_need_input (PinosStream *stream, uint32_t port_id)
{ {
PinosStreamPrivate *priv = stream->priv; PinosStreamPrivate *priv = stream->priv;
SpaControlCmdNodeEvent cne; SpaControlCmdNodeEvent cne;
@ -675,14 +675,6 @@ add_need_input (PinosStream *stream, uint32_t port_id)
ne.size = sizeof (ni); ne.size = sizeof (ni);
ni.port_id = port_id; ni.port_id = port_id;
spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne); spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne);
}
static void
send_need_input (PinosStream *stream, uint32_t port_id)
{
PinosStreamPrivate *priv = stream->priv;
add_need_input (stream, port_id);
if (spa_connection_flush (priv->rtconn) < 0) if (spa_connection_flush (priv->rtconn) < 0)
g_warning ("stream %p: error writing connection", stream); g_warning ("stream %p: error writing connection", stream);
@ -917,11 +909,8 @@ handle_node_command (PinosStream *stream,
if (spa_connection_flush (priv->conn) < 0) if (spa_connection_flush (priv->conn) < 0)
g_warning ("stream %p: error writing connection", stream); g_warning ("stream %p: error writing connection", stream);
if (priv->direction == SPA_DIRECTION_INPUT) { if (priv->direction == SPA_DIRECTION_INPUT)
add_need_input (stream, priv->port_id); send_need_input (stream, priv->port_id);
if (spa_connection_flush (priv->rtconn) < 0)
g_warning ("stream %p: error writing connection", stream);
}
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
break; break;

View file

@ -1048,7 +1048,7 @@ parse_connection (SpaProxy *this)
case SPA_CONTROL_CMD_SET_PROPERTY: case SPA_CONTROL_CMD_SET_PROPERTY:
case SPA_CONTROL_CMD_NODE_COMMAND: case SPA_CONTROL_CMD_NODE_COMMAND:
case SPA_CONTROL_CMD_PROCESS_BUFFER: case SPA_CONTROL_CMD_PROCESS_BUFFER:
spa_log_error (this->log, "proxy %p: got unexpected connection %d\n", this, cmd); spa_log_error (this->log, "proxy %p: got unexpected command %d\n", this, cmd);
break; break;
case SPA_CONTROL_CMD_NODE_UPDATE: case SPA_CONTROL_CMD_NODE_UPDATE: