Add mapper

Ger rid of static ids for interfaces and replace with something we can
register dynamically
Implement logger.
This commit is contained in:
Wim Taymans 2016-10-07 13:14:32 +02:00
parent a68e5d5124
commit fc4fd1424a
43 changed files with 997 additions and 360 deletions

View file

@ -31,7 +31,8 @@
#include <spa/control.h>
#include <spa/debug.h>
#include <spa/node.h>
#include <spa/queue.h>
#include <spa/log.h>
#include <spa/id-map.h>
#include <spa/queue.h>
#include "../lib/memfd-wrappers.h"
@ -88,10 +89,18 @@ typedef struct {
SpaQueue ready;
} SpaProxyPort;
typedef struct {
uint32_t node;
} URI;
struct _SpaProxy {
SpaHandle handle;
SpaNode node;
URI uri;
SpaIDMap *map;
SpaLog *log;
SpaProxyProps props[2];
SpaNodeEventCallback event_cb;
@ -176,7 +185,7 @@ static SpaResult
clear_buffers (SpaProxy *this, SpaProxyPort *port)
{
if (port->n_buffers) {
fprintf (stderr, "proxy %p: clear buffers\n", this);
spa_log_info (this->log, "proxy %p: clear buffers\n", this);
munmap (port->buffer_mem_ptr, port->buffer_mem_size);
close (port->buffer_mem_fd);
@ -273,7 +282,7 @@ spa_proxy_node_send_command (SpaNode *node,
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control %d\n", this, res);
spa_log_error (this->log, "proxy %p: error writing control %d\n", this, res);
spa_control_clear (&control);
@ -295,7 +304,7 @@ spa_proxy_node_send_command (SpaNode *node,
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control %d\n", this, res);
spa_log_error (this->log, "proxy %p: error writing control %d\n", this, res);
spa_control_clear (&control);
break;
@ -421,7 +430,7 @@ do_update_port (SpaProxy *this,
}
if (!port->valid) {
fprintf (stderr, "proxy %p: adding port %d\n", this, pu->port_id);
spa_log_info (this->log, "proxy %p: adding port %d\n", this, pu->port_id);
port->format = NULL;
port->valid = true;
@ -462,7 +471,7 @@ do_uninit_port (SpaProxy *this,
{
SpaProxyPort *port;
fprintf (stderr, "proxy %p: removing port %d\n", this, port_id);
spa_log_info (this->log, "proxy %p: removing port %d\n", this, port_id);
if (direction == SPA_DIRECTION_INPUT) {
port = &this->in_ports[port_id];
this->n_inputs--;
@ -581,7 +590,7 @@ spa_proxy_node_port_set_format (SpaNode *node,
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control\n", this);
spa_log_error (this->log, "proxy %p: error writing control\n", this);
spa_control_clear (&control);
@ -709,7 +718,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
return SPA_RESULT_INVALID_ARGUMENTS;
this = (SpaProxy *) node->handle;
fprintf (stderr, "proxy %p: use buffers %p %u\n", this, buffers, n_buffers);
spa_log_info (this->log, "proxy %p: use buffers %p %u\n", this, buffers, n_buffers);
if (!CHECK_PORT (this, direction, port_id))
return SPA_RESULT_INVALID_PORT;
@ -770,7 +779,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
default:
b->buffer.datas[j].type = SPA_DATA_TYPE_INVALID;
b->buffer.datas[j].data = 0;
fprintf (stderr, "invalid memory type %d\n", d->type);
spa_log_error (this->log, "invalid memory type %d\n", d->type);
break;
}
}
@ -783,7 +792,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
port->buffer_mem_fd = memfd_create ("spa-memfd", MFD_CLOEXEC | MFD_ALLOW_SEALING);
if (ftruncate (port->buffer_mem_fd, size) < 0) {
fprintf (stderr, "Failed to truncate temporary file: %s\n", strerror (errno));
spa_log_error (this->log, "Failed to truncate temporary file: %s\n", strerror (errno));
close (port->buffer_mem_fd);
return SPA_RESULT_ERROR;
}
@ -791,7 +800,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
{
unsigned int seals = F_SEAL_GROW | F_SEAL_SHRINK | F_SEAL_SEAL;
if (fcntl (port->buffer_mem_fd, F_ADD_SEALS, seals) == -1) {
fprintf (stderr, "Failed to add seals: %s\n", strerror (errno));
spa_log_error (this->log, "Failed to add seals: %s\n", strerror (errno));
}
}
#endif
@ -848,7 +857,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control\n", this);
spa_log_error (this->log, "proxy %p: error writing control\n", this);
spa_control_clear (&control);
@ -897,7 +906,7 @@ copy_meta_in (SpaProxy *this, SpaProxyPort *port, uint32_t buffer_id)
for (i = 0; i < b->outbuf->n_datas; i++) {
b->outbuf->datas[i].size = b->buffer.datas[i].size;
if (b->outbuf->datas[i].type == SPA_DATA_TYPE_MEMPTR) {
fprintf (stderr, "memcpy in %zd\n", b->buffer.datas[i].size);
spa_log_info (this->log, "memcpy in %zd\n", b->buffer.datas[i].size);
memcpy (b->outbuf->datas[i].data, b->datas[i].data, b->buffer.datas[i].size);
}
}
@ -917,7 +926,7 @@ copy_meta_out (SpaProxy *this, SpaProxyPort *port, uint32_t buffer_id)
for (i = 0; i < b->outbuf->n_datas; i++) {
b->buffer.datas[i].size = b->outbuf->datas[i].size;
if (b->datas[i].type == SPA_DATA_TYPE_MEMPTR) {
fprintf (stderr, "memcpy out %zd\n", b->outbuf->datas[i].size);
spa_log_info (this->log, "memcpy out %zd\n", b->outbuf->datas[i].size);
memcpy (b->datas[i].data, b->outbuf->datas[i].data, b->outbuf->datas[i].size);
}
}
@ -948,7 +957,7 @@ spa_proxy_node_port_push_input (SpaNode *node,
for (i = 0; i < n_info; i++) {
if (!CHECK_IN_PORT (this, SPA_DIRECTION_INPUT, info[i].port_id)) {
printf ("invalid port %d\n", info[i].port_id);
spa_log_warn (this->log, "invalid port %d\n", info[i].port_id);
info[i].status = SPA_RESULT_INVALID_PORT;
have_error = true;
continue;
@ -986,7 +995,7 @@ spa_proxy_node_port_push_input (SpaNode *node,
return SPA_RESULT_HAVE_ENOUGH_INPUT;
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control\n", this);
spa_log_error (this->log, "proxy %p: error writing control\n", this);
spa_control_clear (&control);
@ -1011,7 +1020,7 @@ spa_proxy_node_port_pull_output (SpaNode *node,
for (i = 0; i < n_info; i++) {
if (!CHECK_OUT_PORT (this, SPA_DIRECTION_OUTPUT, info[i].port_id)) {
fprintf (stderr, "invalid port %u\n", info[i].port_id);
spa_log_warn (this->log, "invalid port %u\n", info[i].port_id);
info[i].status = SPA_RESULT_INVALID_PORT;
have_error = true;
continue;
@ -1072,7 +1081,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control %d\n", this, res);
spa_log_error (this->log, "proxy %p: error writing control %d\n", this, res);
spa_control_clear (&control);
@ -1085,12 +1094,16 @@ spa_proxy_node_port_push_event (SpaNode *node,
uint32_t port_id,
SpaNodeEvent *event)
{
SpaProxy *this;
if (node == NULL || node->handle == NULL || event == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = (SpaProxy *) node->handle;
switch (event->type) {
default:
fprintf (stderr, "unhandled event %d\n", event->type);
spa_log_warn (this->log, "unhandled event %d\n", event->type);
break;
}
return SPA_RESULT_NOT_IMPLEMENTED;
@ -1141,7 +1154,7 @@ parse_control (SpaProxy *this,
case SPA_CONTROL_CMD_SET_FORMAT:
case SPA_CONTROL_CMD_SET_PROPERTY:
case SPA_CONTROL_CMD_NODE_COMMAND:
fprintf (stderr, "proxy %p: got unexpected control %d\n", this, cmd);
spa_log_error (this->log, "proxy %p: got unexpected control %d\n", this, cmd);
break;
case SPA_CONTROL_CMD_NODE_UPDATE:
@ -1156,7 +1169,7 @@ parse_control (SpaProxy *this,
if (nu.change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS)
this->max_outputs = nu.max_output_ports;
fprintf (stderr, "proxy %p: got node update %d, max_in %u, max_out %u\n", this, cmd,
spa_log_info (this->log, "proxy %p: got node update %d, max_in %u, max_out %u\n", this, cmd,
this->max_inputs, this->max_outputs);
break;
@ -1167,7 +1180,7 @@ parse_control (SpaProxy *this,
SpaControlCmdPortUpdate pu;
bool remove;
fprintf (stderr, "proxy %p: got port update %d\n", this, cmd);
spa_log_info (this->log, "proxy %p: got port update %d\n", this, cmd);
if (spa_control_iter_parse_cmd (&it, &pu) < 0)
break;
@ -1186,7 +1199,7 @@ parse_control (SpaProxy *this,
case SPA_CONTROL_CMD_PORT_STATUS_CHANGE:
{
fprintf (stderr, "proxy %p: command not implemented %d\n", this, cmd);
spa_log_warn (this->log, "proxy %p: command not implemented %d\n", this, cmd);
break;
}
@ -1198,7 +1211,7 @@ parse_control (SpaProxy *this,
if (spa_control_iter_parse_cmd (&it, &sc) < 0)
break;
fprintf (stderr, "proxy %p: got node state change %d -> %d\n", this, old, sc.state);
spa_log_info (this->log, "proxy %p: got node state change %d -> %d\n", this, old, sc.state);
this->node.state = sc.state;
if (old == SPA_NODE_STATE_INIT)
send_async_complete (this, 0, SPA_RESULT_OK);
@ -1227,7 +1240,7 @@ parse_control (SpaProxy *this,
port = cmd.direction == SPA_DIRECTION_INPUT ? &this->in_ports[cmd.port_id] : &this->out_ports[cmd.port_id];
if (port->buffer_id != SPA_ID_INVALID)
fprintf (stderr, "proxy %p: unprocessed buffer: %d\n", this, port->buffer_id);
spa_log_warn (this->log, "proxy %p: unprocessed buffer: %d\n", this, port->buffer_id);
copy_meta_in (this, port, cmd.buffer_id);
@ -1263,7 +1276,7 @@ proxy_on_fd_events (SpaPollNotifyData *data)
int fds[16];
if ((res = spa_control_read (&control, data->fds[0].fd, buf, 1024, fds, 16)) < 0) {
fprintf (stderr, "proxy %p: failed to read control: %d\n", this, res);
spa_log_error (this->log, "proxy %p: failed to read control: %d\n", this, res);
return 0;
}
parse_control (this, &control);
@ -1310,14 +1323,11 @@ spa_proxy_get_interface (SpaHandle *handle,
if (handle == NULL || interface == 0)
return SPA_RESULT_INVALID_ARGUMENTS;
switch (interface_id) {
case SPA_INTERFACE_ID_NODE:
*interface = &this->node;
break;
default:
return SPA_RESULT_UNKNOWN_INTERFACE;
if (interface_id == this->uri.node)
*interface = &this->node;
else
return SPA_RESULT_UNKNOWN_INTERFACE;
}
return SPA_RESULT_OK;
}
@ -1348,10 +1358,11 @@ static SpaResult
proxy_init (const SpaHandleFactory *factory,
SpaHandle *handle,
const SpaDict *info,
const SpaSupport **support,
const SpaSupport *support,
unsigned int n_support)
{
SpaProxy *this;
unsigned int i;
if (factory == NULL || handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -1360,6 +1371,19 @@ proxy_init (const SpaHandleFactory *factory,
handle->clear = proxy_clear;
this = (SpaProxy *) handle;
for (i = 0; i < n_support; i++) {
if (strcmp (support[i].uri, SPA_ID_MAP_URI) == 0)
this->map = support[i].data;
else if (strcmp (support[i].uri, SPA_LOG_URI) == 0)
this->log = support[i].data;
}
if (this->map == NULL) {
spa_log_error (this->log, "an id-map is needed");
return SPA_RESULT_ERROR;
}
this->uri.node = spa_id_map_get_id (this->map, SPA_NODE_URI);
this->node = proxy_node;
this->node.handle = handle;
this->props[1].props.n_prop_info = PROP_ID_LAST;
@ -1384,10 +1408,7 @@ proxy_init (const SpaHandleFactory *factory,
static const SpaInterfaceInfo proxy_interfaces[] =
{
{ SPA_INTERFACE_ID_NODE,
SPA_INTERFACE_ID_NODE_NAME,
SPA_INTERFACE_ID_NODE_DESCRIPTION,
},
{ SPA_NODE_URI, },
};
static SpaResult