connection: split messages to work around fd limit

Split messages in multiple parts when there are too many fds to
send in one message.
This commit is contained in:
Wim Taymans 2018-09-11 12:01:19 +02:00
parent 968192f9a9
commit 593daa36d4
3 changed files with 66 additions and 49 deletions

View file

@ -31,6 +31,8 @@
#define HEIGHT 480 #define HEIGHT 480
#define BPP 3 #define BPP 3
#define MAX_BUFFERS 64
#include "sdl.h" #include "sdl.h"
struct data { struct data {
@ -170,7 +172,7 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
params[0] = spa_pod_builder_object(&b, params[0] = spa_pod_builder_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
SPA_PARAM_BUFFERS_buffers, &SPA_POD_CHOICE_RANGE_Int(8, 2, 16), SPA_PARAM_BUFFERS_buffers, &SPA_POD_CHOICE_RANGE_Int(8, 2, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, &SPA_POD_Int(1), SPA_PARAM_BUFFERS_blocks, &SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, &SPA_POD_Int(data->stride * data->format.size.height), SPA_PARAM_BUFFERS_size, &SPA_POD_Int(data->stride * data->format.size.height),
SPA_PARAM_BUFFERS_stride, &SPA_POD_Int(data->stride), SPA_PARAM_BUFFERS_stride, &SPA_POD_Int(data->stride),

View file

@ -33,7 +33,8 @@
#include "connection.h" #include "connection.h"
#define MAX_BUFFER_SIZE (1024 * 32) #define MAX_BUFFER_SIZE (1024 * 32)
#define MAX_FDS 28 #define MAX_FDS 1024
#define MAX_FDS_MSG 28
static bool debug_messages = 0; static bool debug_messages = 0;
@ -135,7 +136,7 @@ static bool refill_buffer(struct pw_protocol_native_connection *conn, struct buf
struct cmsghdr *cmsg; struct cmsghdr *cmsg;
struct msghdr msg = { 0 }; struct msghdr msg = { 0 };
struct iovec iov[1]; struct iovec iov[1];
char cmsgbuf[CMSG_SPACE(MAX_FDS * sizeof(int))]; char cmsgbuf[CMSG_SPACE(MAX_FDS_MSG * sizeof(int))];
int n_fds = 0; int n_fds = 0;
iov[0].iov_base = buf->buffer_data + buf->buffer_size; iov[0].iov_base = buf->buffer_data + buf->buffer_size;
@ -424,58 +425,75 @@ pw_protocol_native_connection_end(struct pw_protocol_native_connection *conn,
int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *conn) int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *conn)
{ {
struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
ssize_t len; ssize_t sent, outsize;
struct msghdr msg = { 0 }; struct msghdr msg = { 0 };
struct iovec iov[1]; struct iovec iov[1];
struct cmsghdr *cmsg; struct cmsghdr *cmsg;
char cmsgbuf[CMSG_SPACE(MAX_FDS * sizeof(int))]; char cmsgbuf[CMSG_SPACE(MAX_FDS_MSG * sizeof(int))];
int *cm, res = 0; int *cm, res = 0, *fds;
uint32_t i, fds_len; uint32_t i, fds_len, n_fds, outfds;
struct buffer *buf; struct buffer *buf;
void *data;
size_t size;
buf = &impl->out; buf = &impl->out;
data = buf->buffer_data;
size = buf->buffer_size;
fds = buf->fds;
n_fds = buf->n_fds;
if (buf->buffer_size == 0) while (size > 0) {
return 0; if (n_fds > MAX_FDS_MSG) {
outfds = MAX_FDS_MSG;
fds_len = buf->n_fds * sizeof(int); outsize = SPA_MIN(sizeof(uint32_t), size);
} else {
iov[0].iov_base = buf->buffer_data; outfds = n_fds;
iov[0].iov_len = buf->buffer_size; outsize = size;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
if (buf->n_fds > 0) {
msg.msg_control = cmsgbuf;
msg.msg_controllen = CMSG_SPACE(fds_len);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(fds_len);
cm = (int *) CMSG_DATA(cmsg);
for (i = 0; i < buf->n_fds; i++)
cm[i] = buf->fds[i] > 0 ? buf->fds[i] : -buf->fds[i];
msg.msg_controllen = cmsg->cmsg_len;
} else {
msg.msg_control = NULL;
msg.msg_controllen = 0;
}
while (true) {
len = sendmsg(conn->fd, &msg, MSG_NOSIGNAL | MSG_DONTWAIT);
if (len < 0) {
if (errno == EINTR)
continue;
else
goto send_error;
} }
break;
}
pw_log_trace("connection %p: %d written %zd bytes and %u fds", conn, conn->fd, len,
buf->n_fds);
buf->buffer_size -= len; fds_len = outfds * sizeof(int);
buf->n_fds = 0;
iov[0].iov_base = data;
iov[0].iov_len = outsize;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
if (outfds > 0) {
msg.msg_control = cmsgbuf;
msg.msg_controllen = CMSG_SPACE(fds_len);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(fds_len);
cm = (int *) CMSG_DATA(cmsg);
for (i = 0; i < outfds; i++)
cm[i] = fds[i] > 0 ? fds[i] : -fds[i];
msg.msg_controllen = cmsg->cmsg_len;
} else {
msg.msg_control = NULL;
msg.msg_controllen = 0;
}
while (true) {
sent = sendmsg(conn->fd, &msg, MSG_NOSIGNAL | MSG_DONTWAIT);
if (sent < 0) {
if (errno == EINTR)
continue;
else
goto send_error;
}
break;
}
pw_log_trace("connection %p: %d written %zd bytes and %u fds", conn, conn->fd, sent,
outfds);
size -= sent;
data += sent;
n_fds -= outfds;
fds += outfds;
}
buf->buffer_size = size;
buf->n_fds = n_fds;
return 0; return 0;

View file

@ -686,9 +686,6 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
if ((in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) || if ((in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) ||
(out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) { (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) {
minsize = 0; minsize = 0;
/* limit buffers to 16 because there is a limit on the number of
* fds in a message for now */
max_buffers = 16;
} }
data_sizes[0] = minsize; data_sizes[0] = minsize;