connection: Dynamically resize connection buffers

When using fixed-size connection buffers, if either the client or the
server sends messages faster than the other end can process them, the
connection buffers fill up, eventually killing the connection.

This can be a problem, for example, with Xwayland mapping a lot of
windows faster than the Wayland compositor can keep up with, or with a
high-rate mouse flooding the Wayland client with pointer events.

To avoid the issue, resize the connection buffers dynamically when they
get full.

Both data and fd buffers are resized on demand.
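
In practice each buffer's capacity is a power of two (1 << size_bits),
starting at the previous fixed size and doubling on demand up to an
optional per-buffer cap. A minimal sketch of the sizing rule, distilled
from the helpers in the diff below (the starting value of 12, i.e. the
former 4096-byte capacity, is an assumption):

    #include <stddef.h>
    #include <stdint.h>

    /* Double the capacity until net_size fits, but never exceed the
     * cap; max_size_bits == 0 means the buffer is unbounded. */
    static uint32_t
    bits_for_size(size_t net_size, uint32_t max_size_bits)
    {
            uint32_t bits = 12; /* assumed WL_BUFFER_DEFAULT_SIZE_POT */

            while (bits < 8 * sizeof(size_t) &&
                   ((size_t)1 << bits) < net_size)
                    bits++;
            if (max_size_bits > 0 && bits > max_size_bits)
                    bits = max_size_bits;
            return bits;
    }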

The default max buffer size is controlled via the wl_display interface,
while each client's connection buffer size can be adjusted individually
for finer control.

The purpose is to explicitly allow larger connection buffers for
specific clients such as Xwayland, or to set a larger buffer size for
the client with pointer focus to cope with a higher input event rate.
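
For instance, a compositor might do the following (a sketch only: the
server-side setters are assumed to be named
wl_display_set_default_max_buffer_size() and
wl_client_set_max_buffer_size(), added elsewhere in this series and
not visible in the excerpt below; the sizes are arbitrary):

    /* Raise the default cap for every client's connection buffers,
     * then give Xwayland an even larger one. */
    wl_display_set_default_max_buffer_size(display, 64 * 1024);
    wl_client_set_max_buffer_size(xwayland_client, 1024 * 1024);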

v0: Manuel:
   Dynamically resize connection buffers - Both data and fd buffers are
   resized on demand.
v1: Olivier
1. Add support for unbounded buffers on the client side and growable
   (yet limited) connection buffers on the server side.
2. Add the API to set the default maximum size and a limit for a given
   client.
3. Add tests for growable connection buffers and adjustable limits.
v2: Additional fixes by John:
1. Fix the size calculation in ring_buffer_check_space()
2. Fix wl_connection_read() to return gracefully once it has read up to
   the max buffer size, rather than returning an error.
3. If wl_connection_flush() fails with EAGAIN but the transmit
   ring-buffer has space remaining (or can be expanded),
   wl_connection_queue() should store the message rather than
   returning an error.
4. When the receive ring-buffer is at capacity but more data is
   available to be read, wl_connection_read() should attempt to
   expand the ring-buffer in order to read the remaining data.
v3: Thomas Lukaszewicz <tluk@chromium.org>
   Add a test for unbounded buffers
v4: Add a client API as well to force bounded buffers (unbounded
    by default) (Olivier); see the client-side sketch after this list
v5: Simplify ring_buffer_ensure_space() (Sebastian)
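
A client-side sketch of the v4 addition, assuming the setter is named
wl_display_set_max_buffer_size() (client buffers are unbounded by
default; a size of 0 keeps them unbounded):

    /* Opt back into bounded buffers, capping this connection's
     * buffers at 128 KiB. */
    struct wl_display *display = wl_display_connect(NULL);

    if (display != NULL)
            wl_display_set_max_buffer_size(display, 128 * 1024);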

Co-authored-by: Olivier Fourdan <ofourdan@redhat.com>
Co-authored-by: John Lindgren <john@jlindgren.net>
Co-authored-by: Sebastian Wick <sebastian@sebastianwick.net>
Signed-off-by: Manuel Stoeckl <code@mstoeckl.com>
Signed-off-by: Olivier Fourdan <ofourdan@redhat.com>
Signed-off-by: John Lindgren <john@jlindgren.net>
Signed-off-by: Sebastian Wick <sebastian@sebastianwick.net>
Closes: https://gitlab.freedesktop.org/wayland/wayland/-/issues/237
Manuel Stoeckl 2021-09-25 22:34:44 -04:00 committed by Simon Ser
parent 36cef8653f
commit d074d52902
9 changed files with 460 additions and 87 deletions

diff --git a/src/connection.c b/src/connection.c
--- a/src/connection.c
+++ b/src/connection.c

@@ -26,6 +26,7 @@
 #define _GNU_SOURCE
 #include <assert.h>
+#include <math.h>
 #include <stdlib.h>
 #include <stdint.h>
@@ -55,12 +56,12 @@ div_roundup(uint32_t n, size_t a)
 }
 
 struct wl_ring_buffer {
-        char data[4096];
-        uint32_t head, tail;
+        char *data;
+        size_t head, tail;
+        uint32_t size_bits;
+        uint32_t max_size_bits; /* 0 for unlimited */
 };
 
-#define MASK(i) ((i) & 4095)
-
 #define MAX_FDS_OUT 28
 #define CLEN (CMSG_LEN(MAX_FDS_OUT * sizeof(int32_t)))
@@ -71,26 +72,38 @@ struct wl_connection {
         int want_flush;
 };
 
+static inline size_t
+size_pot(uint32_t size_bits)
+{
+        assert(size_bits < 8 * sizeof(size_t));
+        return ((size_t)1) << size_bits;
+}
+
+static size_t
+ring_buffer_capacity(const struct wl_ring_buffer *b) {
+        return size_pot(b->size_bits);
+}
+
+static size_t
+ring_buffer_mask(const struct wl_ring_buffer *b, size_t i) {
+        size_t m = ring_buffer_capacity(b) - 1;
+        return i & m;
+}
+
 static int
 ring_buffer_put(struct wl_ring_buffer *b, const void *data, size_t count)
 {
-        uint32_t head, size;
-
-        if (count > sizeof(b->data)) {
-                wl_log("Data too big for buffer (%d > %d).\n",
-                       count, sizeof(b->data));
-                errno = E2BIG;
-                return -1;
-        }
+        size_t head, size;
 
         if (count == 0)
                 return 0;
 
-        head = MASK(b->head);
-        if (head + count <= sizeof b->data) {
+        head = ring_buffer_mask(b, b->head);
+        if (head + count <= ring_buffer_capacity(b)) {
                 memcpy(b->data + head, data, count);
         } else {
-                size = sizeof b->data - head;
+                size = ring_buffer_capacity(b) - head;
                 memcpy(b->data + head, data, size);
                 memcpy(b->data, (const char *) data + size, count - size);
         }
@@ -103,21 +116,21 @@ ring_buffer_put(struct wl_ring_buffer *b, const void *data, size_t count)
 static void
 ring_buffer_put_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
 {
-        uint32_t head, tail;
+        size_t head, tail;
 
-        head = MASK(b->head);
-        tail = MASK(b->tail);
+        head = ring_buffer_mask(b, b->head);
+        tail = ring_buffer_mask(b, b->tail);
         if (head < tail) {
                 iov[0].iov_base = b->data + head;
                 iov[0].iov_len = tail - head;
                 *count = 1;
         } else if (tail == 0) {
                 iov[0].iov_base = b->data + head;
-                iov[0].iov_len = sizeof b->data - head;
+                iov[0].iov_len = ring_buffer_capacity(b) - head;
                 *count = 1;
         } else {
                 iov[0].iov_base = b->data + head;
-                iov[0].iov_len = sizeof b->data - head;
+                iov[0].iov_len = ring_buffer_capacity(b) - head;
                 iov[1].iov_base = b->data;
                 iov[1].iov_len = tail;
                 *count = 2;
@@ -127,21 +140,21 @@ ring_buffer_put_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
 static void
 ring_buffer_get_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
 {
-        uint32_t head, tail;
+        size_t head, tail;
 
-        head = MASK(b->head);
-        tail = MASK(b->tail);
+        head = ring_buffer_mask(b, b->head);
+        tail = ring_buffer_mask(b, b->tail);
         if (tail < head) {
                 iov[0].iov_base = b->data + tail;
                 iov[0].iov_len = head - tail;
                 *count = 1;
         } else if (head == 0) {
                 iov[0].iov_base = b->data + tail;
-                iov[0].iov_len = sizeof b->data - tail;
+                iov[0].iov_len = ring_buffer_capacity(b) - tail;
                 *count = 1;
         } else {
                 iov[0].iov_base = b->data + tail;
-                iov[0].iov_len = sizeof b->data - tail;
+                iov[0].iov_len = ring_buffer_capacity(b) - tail;
                 iov[1].iov_base = b->data;
                 iov[1].iov_len = head;
                 *count = 2;
@@ -151,29 +164,158 @@ ring_buffer_get_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
 static void
 ring_buffer_copy(struct wl_ring_buffer *b, void *data, size_t count)
 {
-        uint32_t tail, size;
+        size_t tail, size;
 
         if (count == 0)
                 return;
 
-        tail = MASK(b->tail);
-        if (tail + count <= sizeof b->data) {
+        tail = ring_buffer_mask(b, b->tail);
+        if (tail + count <= ring_buffer_capacity(b)) {
                 memcpy(data, b->data + tail, count);
         } else {
-                size = sizeof b->data - tail;
+                size = ring_buffer_capacity(b) - tail;
                 memcpy(data, b->data + tail, size);
                 memcpy((char *) data + size, b->data, count - size);
         }
 }
 
-static uint32_t
+static size_t
 ring_buffer_size(struct wl_ring_buffer *b)
 {
         return b->head - b->tail;
 }
 
+static char *
+ring_buffer_tail(const struct wl_ring_buffer *b)
+{
+        return b->data + ring_buffer_mask(b, b->tail);
+}
+
+static uint32_t
+get_max_size_bits_for_size(size_t buffer_size)
+{
+        uint32_t max_size_bits = WL_BUFFER_DEFAULT_SIZE_POT;
+
+        /* buffer_size == 0 means unbound buffer size */
+        if (buffer_size == 0)
+                return 0;
+
+        while (max_size_bits < 8 * sizeof(size_t) && size_pot(max_size_bits) < buffer_size)
+                max_size_bits++;
+
+        return max_size_bits;
+}
+
+static int
+ring_buffer_allocate(struct wl_ring_buffer *b, size_t size_bits)
+{
+        char *new_data;
+
+        new_data = calloc(size_pot(size_bits), 1);
+        if (!new_data)
+                return -1;
+
+        ring_buffer_copy(b, new_data, ring_buffer_size(b));
+        free(b->data);
+        b->data = new_data;
+        b->size_bits = size_bits;
+        b->head = ring_buffer_size(b);
+        b->tail = 0;
+
+        return 0;
+}
+
+static size_t
+ring_buffer_get_bits_for_size(struct wl_ring_buffer *b, size_t net_size)
+{
+        size_t max_size_bits = get_max_size_bits_for_size(net_size);
+
+        if (max_size_bits < WL_BUFFER_DEFAULT_SIZE_POT)
+                max_size_bits = WL_BUFFER_DEFAULT_SIZE_POT;
+
+        if (b->max_size_bits > 0 && max_size_bits > b->max_size_bits)
+                max_size_bits = b->max_size_bits;
+
+        return max_size_bits;
+}
+
+static bool
+ring_buffer_is_max_size_reached(struct wl_ring_buffer *b)
+{
+        size_t net_size = ring_buffer_size(b) + 1;
+        size_t size_bits = ring_buffer_get_bits_for_size(b, net_size);
+
+        return net_size >= size_pot(size_bits);
+}
+
+static int
+ring_buffer_ensure_space(struct wl_ring_buffer *b, size_t count)
+{
+        size_t net_size = ring_buffer_size(b) + count;
+        size_t size_bits = ring_buffer_get_bits_for_size(b, net_size);
+
+        /* The 'size_bits' value represents the required size (in POT) to
+         * store 'net_size', which, depending on whether the buffers are
+         * bounded or not, might not be sufficient (i.e. we might have
+         * reached the maximum size allowed).
+         */
+        if (net_size > size_pot(size_bits)) {
+                wl_log("Data too big for buffer (%d + %zd > %zd).\n",
+                       ring_buffer_size(b), count, size_pot(size_bits));
+                errno = E2BIG;
+                return -1;
+        }
+
+        /* The following test is a short-cut to avoid reallocating a buffer
+         * of the same size.
+         */
+        if (size_bits == b->size_bits)
+                return 0;
+
+        /* Otherwise, we (re)allocate the buffer to match the required size */
+        return ring_buffer_allocate(b, size_bits);
+}
+
+static void
+ring_buffer_close_fds(struct wl_ring_buffer *buffer, int32_t count)
+{
+        int32_t i, *p;
+        size_t size, tail;
+
+        size = ring_buffer_capacity(buffer);
+        tail = ring_buffer_mask(buffer, buffer->tail);
+        p = (int32_t *) (buffer->data + tail);
+
+        for (i = 0; i < count; i++) {
+                if (p >= (int32_t *) (buffer->data + size))
+                        p = (int32_t *) buffer->data;
+                close(*p++);
+        }
+}
+
+void
+wl_connection_set_max_buffer_size(struct wl_connection *connection,
+                                  size_t max_buffer_size)
+{
+        uint32_t max_size_bits;
+
+        max_size_bits = get_max_size_bits_for_size(max_buffer_size);
+
+        connection->fds_in.max_size_bits = max_size_bits;
+        ring_buffer_ensure_space(&connection->fds_in, 0);
+
+        connection->fds_out.max_size_bits = max_size_bits;
+        ring_buffer_ensure_space(&connection->fds_out, 0);
+
+        connection->in.max_size_bits = max_size_bits;
+        ring_buffer_ensure_space(&connection->in, 0);
+
+        connection->out.max_size_bits = max_size_bits;
+        ring_buffer_ensure_space(&connection->out, 0);
+}
+
 struct wl_connection *
-wl_connection_create(int fd)
+wl_connection_create(int fd, size_t max_buffer_size)
 {
         struct wl_connection *connection;
@@ -181,6 +323,8 @@ wl_connection_create(int fd)
         if (connection == NULL)
                 return NULL;
 
+        wl_connection_set_max_buffer_size(connection, max_buffer_size);
+
         connection->fd = fd;
 
         return connection;
@@ -189,20 +333,20 @@ wl_connection_create(int fd)
 static void
 close_fds(struct wl_ring_buffer *buffer, int max)
 {
-        int32_t fds[sizeof(buffer->data) / sizeof(int32_t)], i, count;
         size_t size;
+        int32_t count;
 
         size = ring_buffer_size(buffer);
         if (size == 0)
                 return;
-        ring_buffer_copy(buffer, fds, size);
-        count = size / sizeof fds[0];
+        count = size / sizeof(int32_t);
         if (max > 0 && max < count)
                 count = max;
-        size = count * sizeof fds[0];
-        for (i = 0; i < count; i++)
-                close(fds[i]);
+        ring_buffer_close_fds(buffer, count);
+        size = count * sizeof(int32_t);
         buffer->tail += size;
 }
@@ -218,7 +362,13 @@ wl_connection_destroy(struct wl_connection *connection)
         int fd = connection->fd;
 
         close_fds(&connection->fds_out, -1);
+        free(connection->fds_out.data);
+        free(connection->out.data);
+
         close_fds(&connection->fds_in, -1);
+        free(connection->fds_in.data);
+        free(connection->in.data);
+
         free(connection);
 
         return fd;
@@ -262,7 +412,7 @@ static int
 decode_cmsg(struct wl_ring_buffer *buffer, struct msghdr *msg)
 {
         struct cmsghdr *cmsg;
-        size_t size, max, i;
+        size_t size, i;
         int overflow = 0;
 
         for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL;
@@ -272,8 +422,8 @@ decode_cmsg(struct wl_ring_buffer *buffer, struct msghdr *msg)
                         continue;
 
                 size = cmsg->cmsg_len - CMSG_LEN(0);
-                max = sizeof(buffer->data) - ring_buffer_size(buffer);
-                if (size > max || overflow) {
+                if (ring_buffer_ensure_space(buffer, size) < 0 || overflow) {
                         overflow = 1;
                         size /= sizeof(int32_t);
                         for (i = 0; i < size; i++)
@@ -299,17 +449,40 @@ wl_connection_flush(struct wl_connection *connection)
         char cmsg[CLEN];
         int len = 0, count;
         size_t clen;
-        uint32_t tail;
+        size_t tail;
 
         if (!connection->want_flush)
                 return 0;
 
         tail = connection->out.tail;
-        while (connection->out.head - connection->out.tail > 0) {
-                ring_buffer_get_iov(&connection->out, iov, &count);
-
+        while (ring_buffer_size(&connection->out) > 0) {
                 build_cmsg(&connection->fds_out, cmsg, &clen);
 
+                if (clen >= CLEN) {
+                        /* UNIX domain sockets allow sending file descriptors
+                         * using ancillary data.
+                         *
+                         * As per the UNIX domain sockets man page (man 7 unix),
+                         * "at least one byte of real data should be sent when
+                         * sending ancillary data".
+                         *
+                         * This is why we send only a single byte here, to ensure
+                         * all file descriptors are sent before the bytes are
+                         * cleared out.
+                         *
+                         * Otherwise this can fail to clear the file descriptors
+                         * first if individual messages are allowed to have 224
+                         * (8 bytes * MAX_FDS_OUT = 224) file descriptors.
+                         */
+                        iov[0].iov_base = ring_buffer_tail(&connection->out);
+                        iov[0].iov_len = 1;
+                        count = 1;
+                } else {
+                        ring_buffer_get_iov(&connection->out, iov, &count);
+                }
+
                 msg.msg_name = NULL;
                 msg.msg_namelen = 0;
                 msg.msg_iov = iov;
                 msg.msg_iovlen = count;
                 msg.msg_control = (clen > 0) ? cmsg : NULL;
@@ -347,35 +520,48 @@ wl_connection_read(struct wl_connection *connection)
         char cmsg[CLEN];
         int len, count, ret;
 
-        if (ring_buffer_size(&connection->in) >= sizeof(connection->in.data)) {
-                errno = EOVERFLOW;
-                return -1;
+        while (1) {
+                int data_size = ring_buffer_size(&connection->in);
+
+                /* Stop once we've read the max buffer size. */
+                if (ring_buffer_is_max_size_reached(&connection->in))
+                        return data_size;
+
+                if (ring_buffer_ensure_space(&connection->in, 1) < 0)
+                        return -1;
+
+                ring_buffer_put_iov(&connection->in, iov, &count);
+
+                msg.msg_name = NULL;
+                msg.msg_namelen = 0;
+                msg.msg_iov = iov;
+                msg.msg_iovlen = count;
+                msg.msg_control = cmsg;
+                msg.msg_controllen = sizeof cmsg;
+                msg.msg_flags = 0;
+
+                do {
+                        len = wl_os_recvmsg_cloexec(connection->fd, &msg, MSG_DONTWAIT);
+                } while (len < 0 && errno == EINTR);
+
+                if (len == 0) {
+                        /* EOF, return previously read data first */
+                        return data_size;
+                }
+
+                if (len < 0) {
+                        if (errno == EAGAIN && data_size > 0) {
+                                /* nothing new read, return previously read data */
+                                return data_size;
+                        }
+
+                        return len;
+                }
+
+                ret = decode_cmsg(&connection->fds_in, &msg);
+                if (ret)
+                        return -1;
+
+                connection->in.head += len;
         }
-
-        ring_buffer_put_iov(&connection->in, iov, &count);
-
-        msg.msg_name = NULL;
-        msg.msg_namelen = 0;
-        msg.msg_iov = iov;
-        msg.msg_iovlen = count;
-        msg.msg_control = cmsg;
-        msg.msg_controllen = sizeof cmsg;
-        msg.msg_flags = 0;
-
-        do {
-                len = wl_os_recvmsg_cloexec(connection->fd, &msg, MSG_DONTWAIT);
-        } while (len < 0 && errno == EINTR);
-
-        if (len <= 0)
-                return len;
-
-        ret = decode_cmsg(&connection->fds_in, &msg);
-        if (ret)
-                return -1;
-
-        connection->in.head += len;
-
-        return wl_connection_pending_input(connection);
 }
 
 int
@@ -394,13 +580,23 @@ int
 wl_connection_queue(struct wl_connection *connection,
                     const void *data, size_t count)
 {
-        if (connection->out.head - connection->out.tail +
-            count > ARRAY_LENGTH(connection->out.data)) {
+        /* We want to try to flush when the buffer reaches the default maximum
+         * size even if the buffer has been previously expanded.
+         *
+         * Otherwise the larger buffer will cause us to flush less frequently,
+         * which could increase lag.
+         *
+         * We'd like to flush often and get the buffer size back down if
+         * possible.
+         */
+        if (ring_buffer_size(&connection->out) + count > WL_BUFFER_DEFAULT_MAX_SIZE) {
                 connection->want_flush = 1;
-                if (wl_connection_flush(connection) < 0)
+                if (wl_connection_flush(connection) < 0 && errno != EAGAIN)
                         return -1;
         }
 
+        if (ring_buffer_ensure_space(&connection->out, count) < 0)
+                return -1;
+
         return ring_buffer_put(&connection->out, data, count);
 }
@@ -426,12 +622,15 @@ wl_connection_get_fd(struct wl_connection *connection)
 static int
 wl_connection_put_fd(struct wl_connection *connection, int32_t fd)
 {
-        if (ring_buffer_size(&connection->fds_out) == MAX_FDS_OUT * sizeof fd) {
+        if (ring_buffer_size(&connection->fds_out) >= MAX_FDS_OUT * sizeof fd) {
                 connection->want_flush = 1;
-                if (wl_connection_flush(connection) < 0)
+                if (wl_connection_flush(connection) < 0 && errno != EAGAIN)
                         return -1;
         }
 
+        if (ring_buffer_ensure_space(&connection->fds_out, sizeof fd) < 0)
+                return -1;
+
         return ring_buffer_put(&connection->fds_out, &fd, sizeof fd);
 }
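
Both knobs ultimately funnel into the two entry points shown above. A
minimal usage sketch from the library's point of view (variable names
illustrative):

    /* Create a connection whose buffers may grow up to
     * max_buffer_size bytes; 0 means unbounded. */
    struct wl_connection *conn = wl_connection_create(fd, max_buffer_size);

    if (conn == NULL)
            return -1;

    /* Later, e.g. when this client gains pointer focus, raise the
     * cap; the buffers themselves keep growing on demand, only up to
     * the new limit. */
    wl_connection_set_max_buffer_size(conn, 1024 * 1024);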