Optimize transport some more

We can optimize the transport some more if we allow the host to
configure the area used for transfering buffers. We can then also place
the current status in the area and avoid calling get_status(). We can
also allocate this area in shared memory, avoiding a memcpy in the
client-node.
This commit is contained in:
Wim Taymans 2016-11-07 18:23:09 +01:00
parent b774b99db5
commit e88a376d7c
21 changed files with 903 additions and 989 deletions

View file

@ -80,7 +80,7 @@ typedef struct {
SpaFormat *format;
unsigned int n_formats;
SpaFormat **formats;
SpaPortStatus status;
void *io;
unsigned int n_buffers;
ProxyBuffer buffers[MAX_BUFFERS];
@ -97,6 +97,8 @@ struct _SpaProxy
{
SpaNode node;
PinosNode *pnode;
SpaIDMap *map;
SpaLog *log;
SpaPoll *main_loop;
@ -119,8 +121,6 @@ struct _SpaProxy
SpaProxyPort in_ports[MAX_INPUTS];
SpaProxyPort out_ports[MAX_OUTPUTS];
PinosTransport *trans;
uint32_t seq;
};
@ -590,28 +590,41 @@ spa_proxy_node_port_set_props (SpaNode *node,
}
static SpaResult
spa_proxy_node_port_get_status (SpaNode *node,
SpaDirection direction,
uint32_t port_id,
const SpaPortStatus **status)
spa_proxy_node_port_set_input (SpaNode *node,
uint32_t port_id,
SpaPortInput *input)
{
SpaProxy *this;
SpaProxyPort *port;
if (node == NULL || status == NULL)
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
if (!CHECK_PORT (this, direction, port_id))
if (!CHECK_PORT (this, SPA_DIRECTION_INPUT, port_id))
return SPA_RESULT_INVALID_PORT;
port = direction == SPA_DIRECTION_INPUT ? &this->in_ports[port_id] : &this->out_ports[port_id];
this->in_ports[port_id].io = input;
if (!port->format)
return SPA_RESULT_NO_FORMAT;
return SPA_RESULT_OK;
}
*status = &port->status;
static SpaResult
spa_proxy_node_port_set_output (SpaNode *node,
uint32_t port_id,
SpaPortOutput *output)
{
SpaProxy *this;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
if (!CHECK_PORT (this, SPA_DIRECTION_OUTPUT, port_id))
return SPA_RESULT_INVALID_PORT;
this->out_ports[port_id].io = output;
return SPA_RESULT_OK;
}
@ -848,99 +861,6 @@ copy_meta_out (SpaProxy *this, SpaProxyPort *port, uint32_t buffer_id)
}
}
static SpaResult
spa_proxy_node_port_push_input (SpaNode *node,
unsigned int n_info,
SpaPortInputInfo *info)
{
SpaProxy *this;
unsigned int i;
bool have_error = false;
if (node == NULL || n_info == 0 || info == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
for (i = 0; i < n_info; i++) {
SpaProxyPort *port;
uint32_t port_id = info[i].port_id;
if (!CHECK_IN_PORT (this, SPA_DIRECTION_INPUT, port_id)) {
spa_log_warn (this->log, "invalid port %d", port_id);
info[i].status = SPA_RESULT_INVALID_PORT;
have_error = true;
continue;
}
port = &this->in_ports[port_id];
if (!port->format) {
info[i].status = SPA_RESULT_NO_FORMAT;
have_error = true;
continue;
}
if (!CHECK_PORT_BUFFER (this, info[i].buffer_id, port)) {
info[i].status = SPA_RESULT_INVALID_BUFFER_ID;
have_error = true;
continue;
}
copy_meta_out (this, port, info[i].buffer_id);
this->trans->input_info[i] = *info;
}
this->trans->area->n_input_info = n_info;
if (have_error)
return SPA_RESULT_ERROR;
pinos_transport_write_cmd (this->trans, PINOS_TRANSPORT_CMD_HAVE_DATA);
return SPA_RESULT_OK;
}
static SpaResult
spa_proxy_node_port_pull_output (SpaNode *node,
unsigned int n_info,
SpaPortOutputInfo *info)
{
SpaProxy *this;
unsigned int i;
bool have_error = false;
if (node == NULL || n_info == 0 || info == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
for (i = 0; i < n_info; i++) {
SpaProxyPort *port;
uint32_t port_id = info[i].port_id;
if (!CHECK_OUT_PORT (this, SPA_DIRECTION_OUTPUT, port_id)) {
info[i].status = SPA_RESULT_INVALID_PORT;
have_error = true;
continue;
}
port = &this->out_ports[port_id];
info[i] = this->trans->output_info[port_id];
if (!CHECK_PORT_BUFFER (this, info[i].buffer_id, port)) {
info[i].status = SPA_RESULT_INVALID_BUFFER_ID;
have_error = true;
continue;
}
copy_meta_in (this, port, info[i].buffer_id);
if (info[i].status != SPA_RESULT_OK)
have_error = true;
}
if (have_error)
return SPA_RESULT_ERROR;
return SPA_RESULT_OK;
}
static SpaResult
spa_proxy_node_port_reuse_buffer (SpaNode *node,
uint32_t port_id,
@ -948,11 +868,14 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
{
SpaProxy *this;
SpaNodeEventReuseBuffer rb;
PinosNode *pnode;
uint8_t cmd;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
pnode = this->pnode;
if (!CHECK_OUT_PORT (this, SPA_DIRECTION_OUTPUT, port_id))
return SPA_RESULT_INVALID_PORT;
@ -961,8 +884,9 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
rb.event.size = sizeof (rb);
rb.port_id = port_id;
rb.buffer_id = buffer_id;
pinos_transport_add_event (this->trans, &rb.event);
pinos_transport_write_cmd (this->trans, PINOS_TRANSPORT_CMD_HAVE_EVENT);
pinos_transport_add_event (pnode->transport, &rb.event);
cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT;
write (this->rtfds[0].fd, &cmd, 1);
return SPA_RESULT_OK;
}
@ -1000,6 +924,72 @@ spa_proxy_node_port_send_command (SpaNode *node,
return res;
}
static SpaResult
spa_proxy_node_process_input (SpaNode *node)
{
SpaProxy *this;
unsigned int i;
bool have_error = false;
uint8_t cmd;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
for (i = 0; i < this->n_inputs; i++) {
SpaProxyPort *port = &this->in_ports[i];
SpaPortInput *input;
if ((input = port->io) == NULL)
continue;
if (!CHECK_PORT_BUFFER (this, input->buffer_id, port)) {
input->status = SPA_RESULT_INVALID_BUFFER_ID;
have_error = true;
continue;
}
copy_meta_out (this, port, input->buffer_id);
}
if (have_error)
return SPA_RESULT_ERROR;
cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
write (this->rtfds[0].fd, &cmd, 1);
return SPA_RESULT_OK;
}
static SpaResult
spa_proxy_node_process_output (SpaNode *node)
{
SpaProxy *this;
unsigned int i;
bool have_error = false;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
for (i = 0; i < this->n_outputs; i++) {
SpaProxyPort *port = &this->out_ports[i];
SpaPortOutput *output;
if ((output = port->io) == NULL)
continue;
copy_meta_in (this, port, output->buffer_id);
if (output->status != SPA_RESULT_OK)
have_error = true;
}
if (have_error)
return SPA_RESULT_ERROR;
return SPA_RESULT_OK;
}
static SpaResult
handle_node_event (SpaProxy *this,
SpaNodeEvent *event)
@ -1047,8 +1037,6 @@ parse_connection (SpaProxy *this)
case PINOS_MESSAGE_NODE_UPDATE:
{
PinosMessageNodeUpdate nu;
PinosTransportInfo info;
PinosMessageTransportUpdate tu;
if (!pinos_connection_parse_message (conn, &nu))
break;
@ -1061,19 +1049,8 @@ parse_connection (SpaProxy *this)
spa_log_info (this->log, "proxy %p: got node update %d, max_in %u, max_out %u", this, type,
this->max_inputs, this->max_outputs);
this->trans = pinos_transport_new (this->max_inputs,
this->max_outputs,
this->rtfds[0].fd);
pinos_transport_get_info (this->trans, &info);
tu.memfd_index = pinos_connection_add_fd (conn, info.memfd, false);
tu.offset = info.offset;
tu.size = info.size;
pinos_connection_add_message (conn, PINOS_MESSAGE_TRANSPORT_UPDATE, &tu);
if (!pinos_connection_flush (conn))
spa_log_error (this->log, "proxy %p: error writing connection", this);
#if 0
#endif
break;
}
@ -1157,17 +1134,18 @@ static int
proxy_on_rtfd_events (SpaPollNotifyData *data)
{
SpaProxy *this = data->user_data;
PinosNode *pnode = this->pnode;
if (data->fds[0].revents & POLLIN) {
uint8_t cmd;
pinos_transport_read_cmd (this->trans, &cmd);
read (this->rtfds[0].fd, &cmd, 1);
if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) {
SpaNodeEvent event;
while (pinos_transport_next_event (this->trans, &event) == SPA_RESULT_OK) {
while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (event.size);
pinos_transport_parse_event (this->trans, ev);
pinos_transport_parse_event (pnode->transport, ev);
this->event_cb (&this->node, ev, this->user_data);
}
}
@ -1209,11 +1187,12 @@ static const SpaNode proxy_node = {
spa_proxy_node_port_set_props,
spa_proxy_node_port_use_buffers,
spa_proxy_node_port_alloc_buffers,
spa_proxy_node_port_get_status,
spa_proxy_node_port_push_input,
spa_proxy_node_port_pull_output,
spa_proxy_node_port_set_input,
spa_proxy_node_port_set_output,
spa_proxy_node_port_reuse_buffer,
spa_proxy_node_port_send_command,
spa_proxy_node_process_input,
spa_proxy_node_process_output,
};
static SpaResult
@ -1268,6 +1247,28 @@ proxy_init (SpaProxy *this,
return SPA_RESULT_RETURN_ASYNC (this->seq++);
}
static void
on_transport_changed (GObject *obj,
GParamSpec *pspec,
PinosClientNode *this)
{
PinosNode *node = PINOS_NODE (obj);
PinosClientNodePrivate *priv = this->priv;
PinosTransportInfo info;
PinosMessageTransportUpdate tu;
PinosConnection *conn = priv->proxy->conn;
pinos_transport_get_info (node->transport, &info);
tu.memfd_index = pinos_connection_add_fd (conn, info.memfd, false);
tu.offset = info.offset;
tu.size = info.size;
pinos_connection_add_message (conn, PINOS_MESSAGE_TRANSPORT_UPDATE, &tu);
if (!pinos_connection_flush (conn))
pinos_log_error ("client-node %p: error writing connection", this);
}
static SpaResult
proxy_clear (SpaProxy *this)
{
@ -1358,6 +1359,7 @@ pinos_client_node_constructed (GObject * object)
G_OBJECT_CLASS (pinos_client_node_parent_class)->constructed (object);
g_signal_connect (object, "notify::transport", (GCallback) on_transport_changed, this);
priv->proxy = (SpaProxy *) PINOS_NODE (this)->node;
}
@ -1408,7 +1410,6 @@ pinos_client_node_new (PinosDaemon *daemon,
p = g_malloc0 (sizeof (SpaProxy));
proxy_init (p, NULL, daemon->support, daemon->n_support);
node = g_object_new (PINOS_TYPE_CLIENT_NODE,
"daemon", daemon,
"client", client,
@ -1417,6 +1418,8 @@ pinos_client_node_new (PinosDaemon *daemon,
"node", p,
NULL);
p->pnode = node;
return node;
}