make separate sockets for data and control

This commit is contained in:
Wim Taymans 2016-10-14 19:23:05 +02:00
parent ee202e13e9
commit 184e0a300a
7 changed files with 419 additions and 92 deletions

View file

@ -67,6 +67,7 @@ struct _ProxyBuffer {
typedef struct {
SpaProps props;
int socketfd;
int rtsocketfd;
} SpaProxyProps;
typedef struct {
@ -100,6 +101,7 @@ struct _SpaProxy {
URI uri;
SpaIDMap *map;
SpaLog *log;
SpaPoll *main_loop;
SpaPoll *data_loop;
SpaProxyProps props[2];
@ -109,6 +111,8 @@ struct _SpaProxy {
SpaPollFd fds[1];
SpaPollItem poll;
SpaPollFd rtfds[1];
SpaPollItem rtpoll;
unsigned int max_inputs;
unsigned int n_inputs;
@ -122,6 +126,7 @@ struct _SpaProxy {
enum {
PROP_ID_SOCKET,
PROP_ID_RT_SOCKET,
PROP_ID_LAST,
};
@ -133,12 +138,19 @@ static const SpaPropInfo prop_info[PROP_ID_LAST] =
SPA_PROP_TYPE_INT, sizeof (int),
SPA_PROP_RANGE_TYPE_NONE, 0, NULL,
NULL },
{ PROP_ID_RT_SOCKET, offsetof (SpaProxyProps, rtsocketfd),
"rt-socket",
SPA_PROP_FLAG_READWRITE,
SPA_PROP_TYPE_INT, sizeof (int),
SPA_PROP_RANGE_TYPE_NONE, 0, NULL,
NULL },
};
static void
reset_proxy_props (SpaProxyProps *props)
{
props->socketfd = -1;
props->rtsocketfd = -1;
}
static SpaResult
@ -149,14 +161,32 @@ update_poll (SpaProxy *this, int socketfd)
p = &this->props[1];
if (p->socketfd != -1) {
spa_poll_remove_item (this->data_loop, &this->poll);
}
if (p->socketfd != -1)
spa_poll_remove_item (this->main_loop, &this->poll);
p->socketfd = socketfd;
if (p->socketfd != -1) {
this->fds[0].fd = p->socketfd;
spa_poll_add_item (this->data_loop, &this->poll);
spa_poll_add_item (this->main_loop, &this->poll);
}
return res;
}
static SpaResult
update_rtpoll (SpaProxy *this, int rtsocketfd)
{
SpaProxyProps *p;
SpaResult res = SPA_RESULT_OK;
p = &this->props[1];
if (p->rtsocketfd != -1)
spa_poll_remove_item (this->data_loop, &this->rtpoll);
p->rtsocketfd = rtsocketfd;
if (p->rtsocketfd != -1) {
this->rtfds[0].fd = p->rtsocketfd;
spa_poll_add_item (this->data_loop, &this->rtpoll);
}
return res;
}
@ -232,8 +262,11 @@ spa_proxy_node_set_props (SpaNode *node,
res = spa_props_copy_values (props, &np->props);
/* compare changes */
if (op->socketfd != np->socketfd)
res = update_poll (this, np->socketfd);
if (op->rtsocketfd != np->rtsocketfd)
res = update_rtpoll (this, np->rtsocketfd);
/* commit changes */
memcpy (op, np, sizeof (*np));
@ -780,64 +813,68 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
size += b->size;
}
/* make mem for the buffers */
port->buffer_mem_id = n_mem++;
port->buffer_mem_size = size;
port->buffer_mem_fd = memfd_create ("spa-memfd", MFD_CLOEXEC | MFD_ALLOW_SEALING);
if (n_buffers > 0) {
/* make mem for the buffers */
port->buffer_mem_id = n_mem++;
port->buffer_mem_size = size;
port->buffer_mem_fd = memfd_create ("spa-memfd", MFD_CLOEXEC | MFD_ALLOW_SEALING);
if (ftruncate (port->buffer_mem_fd, size) < 0) {
spa_log_error (this->log, "Failed to truncate temporary file: %s\n", strerror (errno));
close (port->buffer_mem_fd);
return SPA_RESULT_ERROR;
}
if (ftruncate (port->buffer_mem_fd, size) < 0) {
spa_log_error (this->log, "Failed to truncate temporary file: %s\n", strerror (errno));
close (port->buffer_mem_fd);
return SPA_RESULT_ERROR;
}
#if 0
{
unsigned int seals = F_SEAL_GROW | F_SEAL_SHRINK | F_SEAL_SEAL;
if (fcntl (port->buffer_mem_fd, F_ADD_SEALS, seals) == -1) {
spa_log_error (this->log, "Failed to add seals: %s\n", strerror (errno));
{
unsigned int seals = F_SEAL_GROW | F_SEAL_SHRINK | F_SEAL_SEAL;
if (fcntl (port->buffer_mem_fd, F_ADD_SEALS, seals) == -1) {
spa_log_error (this->log, "Failed to add seals: %s\n", strerror (errno));
}
}
}
#endif
p = port->buffer_mem_ptr = mmap (NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, port->buffer_mem_fd, 0);
p = port->buffer_mem_ptr = mmap (NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, port->buffer_mem_fd, 0);
for (i = 0; i < n_buffers; i++) {
ProxyBuffer *b = &port->buffers[i];
SpaBuffer *sb;
SpaMeta *sbm;
SpaData *sbd;
for (i = 0; i < n_buffers; i++) {
ProxyBuffer *b = &port->buffers[i];
SpaBuffer *sb;
SpaMeta *sbm;
SpaData *sbd;
spa_buffer_serialize (p, &b->buffer);
spa_buffer_serialize (p, &b->buffer);
sb = p;
b->buffer.datas = SPA_MEMBER (sb, SPA_PTR_TO_INT (sb->datas), SpaData);
sbm = SPA_MEMBER (sb, SPA_PTR_TO_INT (sb->metas), SpaMeta);
sbd = SPA_MEMBER (sb, SPA_PTR_TO_INT (sb->datas), SpaData);
sb = p;
b->buffer.datas = SPA_MEMBER (sb, SPA_PTR_TO_INT (sb->datas), SpaData);
sbm = SPA_MEMBER (sb, SPA_PTR_TO_INT (sb->metas), SpaMeta);
sbd = SPA_MEMBER (sb, SPA_PTR_TO_INT (sb->datas), SpaData);
for (j = 0; j < b->buffer.n_metas; j++)
b->metas[j].data = SPA_MEMBER (sb, SPA_PTR_TO_INT (sbm[j].data), void);
for (j = 0; j < b->buffer.n_metas; j++)
b->metas[j].data = SPA_MEMBER (sb, SPA_PTR_TO_INT (sbm[j].data), void);
for (j = 0; j < b->buffer.n_datas; j++) {
if (b->datas[j].type == SPA_DATA_TYPE_MEMPTR)
b->datas[j].data = SPA_MEMBER (sb, SPA_PTR_TO_INT (sbd[j].data), void);
for (j = 0; j < b->buffer.n_datas; j++) {
if (b->datas[j].type == SPA_DATA_TYPE_MEMPTR)
b->datas[j].data = SPA_MEMBER (sb, SPA_PTR_TO_INT (sbd[j].data), void);
}
p += b->size;
}
p += b->size;
}
am.direction = direction;
am.port_id = port_id;
am.mem_id = port->buffer_mem_id;
am.type = SPA_DATA_TYPE_MEMFD;
am.fd_index = spa_control_builder_add_fd (&builder, port->buffer_mem_fd, false);
am.flags = 0;
am.offset = 0;
am.size = size;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_MEM, &am);
am.direction = direction;
am.port_id = port_id;
am.mem_id = port->buffer_mem_id;
am.type = SPA_DATA_TYPE_MEMFD;
am.fd_index = spa_control_builder_add_fd (&builder, port->buffer_mem_fd, false);
am.flags = 0;
am.offset = 0;
am.size = size;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_MEM, &am);
memref = alloca (n_buffers * sizeof (SpaControlMemRef));
for (i = 0; i < n_buffers; i++) {
memref[i].mem_id = port->buffer_mem_id;
memref[i].offset = port->buffers[i].offset;
memref[i].size = port->buffers[i].size;
memref = alloca (n_buffers * sizeof (SpaControlMemRef));
for (i = 0; i < n_buffers; i++) {
memref[i].mem_id = port->buffer_mem_id;
memref[i].offset = port->buffers[i].offset;
memref[i].size = port->buffers[i].size;
}
} else {
memref = NULL;
}
port->n_buffers = n_buffers;
@ -988,7 +1025,7 @@ spa_proxy_node_port_push_input (SpaNode *node,
if (have_enough)
return SPA_RESULT_HAVE_ENOUGH_INPUT;
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
if ((res = spa_control_write (&control, this->rtfds[0].fd)) < 0)
spa_log_error (this->log, "proxy %p: error writing control\n", this);
spa_control_clear (&control);
@ -1074,7 +1111,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne);
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
if ((res = spa_control_write (&control, this->rtfds[0].fd)) < 0)
spa_log_error (this->log, "proxy %p: error writing control %d\n", this, res);
spa_control_clear (&control);
@ -1145,6 +1182,7 @@ parse_control (SpaProxy *this,
case SPA_CONTROL_CMD_SET_FORMAT:
case SPA_CONTROL_CMD_SET_PROPERTY:
case SPA_CONTROL_CMD_NODE_COMMAND:
case SPA_CONTROL_CMD_PROCESS_BUFFER:
spa_log_error (this->log, "proxy %p: got unexpected control %d\n", this, cmd);
break;
@ -1217,6 +1255,50 @@ parse_control (SpaProxy *this,
case SPA_CONTROL_CMD_USE_BUFFERS:
break;
case SPA_CONTROL_CMD_NODE_EVENT:
{
SpaControlCmdNodeEvent cne;
if (spa_control_iter_parse_cmd (&it, &cne) < 0)
break;
handle_node_event (this, cne.event);
break;
}
}
}
spa_control_iter_end (&it);
return SPA_RESULT_OK;
}
static SpaResult
parse_rtcontrol (SpaProxy *this,
SpaControl *ctrl)
{
SpaControlIter it;
spa_control_iter_init (&it, ctrl);
while (spa_control_iter_next (&it) == SPA_RESULT_OK) {
SpaControlCmd cmd = spa_control_iter_get_cmd (&it);
switch (cmd) {
case SPA_CONTROL_CMD_INVALID:
case SPA_CONTROL_CMD_NODE_UPDATE:
case SPA_CONTROL_CMD_PORT_UPDATE:
case SPA_CONTROL_CMD_NODE_STATE_CHANGE:
case SPA_CONTROL_CMD_PORT_STATUS_CHANGE:
case SPA_CONTROL_CMD_ADD_PORT:
case SPA_CONTROL_CMD_REMOVE_PORT:
case SPA_CONTROL_CMD_SET_FORMAT:
case SPA_CONTROL_CMD_SET_PROPERTY:
case SPA_CONTROL_CMD_NODE_COMMAND:
case SPA_CONTROL_CMD_ADD_MEM:
case SPA_CONTROL_CMD_REMOVE_MEM:
case SPA_CONTROL_CMD_USE_BUFFERS:
spa_log_error (this->log, "proxy %p: got unexpected control %d\n", this, cmd);
break;
case SPA_CONTROL_CMD_PROCESS_BUFFER:
{
SpaControlCmdProcessBuffer cmd;
@ -1266,7 +1348,7 @@ proxy_on_fd_events (SpaPollNotifyData *data)
uint8_t buf[1024];
int fds[16];
if ((res = spa_control_read (&control, data->fds[0].fd, buf, 1024, fds, 16)) < 0) {
if ((res = spa_control_read (&control, data->fds[0].fd, buf, sizeof (buf), fds, 16)) < 0) {
spa_log_error (this->log, "proxy %p: failed to read control: %d\n", this, res);
return 0;
}
@ -1276,6 +1358,26 @@ proxy_on_fd_events (SpaPollNotifyData *data)
return 0;
}
static int
proxy_on_rtfd_events (SpaPollNotifyData *data)
{
SpaProxy *this = data->user_data;
SpaResult res;
if (data->fds[0].revents & POLLIN) {
SpaControl control;
uint8_t buf[1024];
if ((res = spa_control_read (&control, data->fds[0].fd, buf, sizeof (buf), NULL, 0)) < 0) {
spa_log_error (this->log, "proxy %p: failed to read control: %d\n", this, res);
return 0;
}
parse_rtcontrol (this, &control);
spa_control_clear (&control);
}
return 0;
}
static const SpaNode proxy_node = {
sizeof (SpaNode),
NULL,
@ -1367,6 +1469,8 @@ proxy_init (const SpaHandleFactory *factory,
this->map = support[i].data;
else if (strcmp (support[i].uri, SPA_LOG_URI) == 0)
this->log = support[i].data;
else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0)
this->main_loop = support[i].data;
else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0)
this->data_loop = support[i].data;
}
@ -1398,6 +1502,18 @@ proxy_init (const SpaHandleFactory *factory,
this->poll.after_cb = proxy_on_fd_events;
this->poll.user_data = this;
this->rtfds[0].fd = -1;
this->rtfds[0].events = POLLIN | POLLPRI | POLLERR;
this->rtfds[0].revents = 0;
this->rtpoll.id = 0;
this->rtpoll.enabled = true;
this->rtpoll.fds = this->rtfds;
this->rtpoll.n_fds = 1;
this->rtpoll.idle_cb = NULL;
this->rtpoll.before_cb = NULL;
this->rtpoll.after_cb = proxy_on_rtfd_events;
this->rtpoll.user_data = this;
return SPA_RESULT_RETURN_ASYNC (this->seq++);
}