remote: handle per port buffers and memory

This commit is contained in:
Wim Taymans 2017-10-24 13:00:32 +02:00
parent e969e8854b
commit eef6f380c1

View file

@ -64,6 +64,12 @@ struct buffer_id {
struct port {
struct spa_graph_port output;
struct spa_graph_port input;
struct pw_port *port;
struct pw_array mem_ids;
struct pw_array buffer_ids;
bool in_order;
};
struct node_data {
@ -89,11 +95,6 @@ struct node_data {
struct pw_client_node_proxy *node_proxy;
struct spa_hook node_proxy_listener;
struct spa_hook proxy_listener;
struct pw_array mem_ids;
struct pw_array buffer_ids;
bool in_order;
};
/** \endcond */
@ -537,10 +538,28 @@ static void clean_transport(struct pw_proxy *proxy)
data->trans = NULL;
}
struct port_info {
struct spa_graph_port internal;
struct spa_graph_port external;
};
static void port_init(struct port *port)
{
pw_array_init(&port->mem_ids, 64);
pw_array_ensure_size(&port->mem_ids, sizeof(struct mem_id) * 64);
pw_array_init(&port->buffer_ids, 32);
pw_array_ensure_size(&port->buffer_ids, sizeof(struct buffer_id) * 64);
port->in_order = true;
}
static struct port *find_port(struct node_data *data, enum spa_direction direction, uint32_t port_id)
{
if (direction == SPA_DIRECTION_INPUT) {
if (port_id > data->trans->area->max_input_ports)
return NULL;
return &data->in_ports[port_id];
}
else {
if (port_id > data->trans->area->max_output_ports)
return NULL;
return &data->out_ports[port_id];
}
}
static void client_node_transport(void *object, uint32_t node_id,
int readfd, int writefd,
@ -565,6 +584,7 @@ static void client_node_transport(void *object, uint32_t node_id,
sizeof(struct port));
for (i = 0; i < data->trans->area->max_input_ports; i++) {
port_init(&data->in_ports[i]);
data->trans->inputs[i] = SPA_PORT_IO_INIT;
spa_graph_port_init(&data->in_ports[i].input,
SPA_DIRECTION_INPUT,
@ -580,10 +600,14 @@ static void client_node_transport(void *object, uint32_t node_id,
spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input);
pw_log_info("transport in %d %p", i, &data->trans->inputs[i]);
}
spa_list_for_each(port, &data->node->input_ports, link)
spa_list_for_each(port, &data->node->input_ports, link) {
spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input);
data->in_ports[port->port_id].port = port;
}
for (i = 0; i < data->trans->area->max_output_ports; i++) {
port_init(&data->out_ports[i]);
data->trans->outputs[i] = SPA_PORT_IO_INIT;
spa_graph_port_init(&data->out_ports[i].output,
SPA_DIRECTION_OUTPUT,
i,
@ -598,8 +622,10 @@ static void client_node_transport(void *object, uint32_t node_id,
spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input);
pw_log_info("transport out %d %p", i, &data->trans->inputs[i]);
}
spa_list_for_each(port, &data->node->output_ports, link)
spa_list_for_each(port, &data->node->output_ports, link) {
spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output);
data->out_ports[port->port_id].port = port;
}
data->rtwritefd = writefd;
data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop,
@ -699,20 +725,20 @@ client_node_set_format(void *object,
{
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
struct pw_port *port;
struct port *port;
int res;
port = pw_node_find_port(data->node, direction, port_id);
if (port == NULL) {
port = find_port(data, direction, port_id);
if (port == NULL || port->port == NULL) {
res = SPA_RESULT_INVALID_PORT;
goto done;
}
res = pw_port_set_format(port, flags, format);
res = pw_port_set_format(port->port, flags, format);
if (res != SPA_RESULT_OK)
goto done;
add_port_update(proxy, port,
add_port_update(proxy, port->port,
PW_CLIENT_NODE_PORT_UPDATE_FORMAT |
PW_CLIENT_NODE_PORT_UPDATE_PARAMS |
PW_CLIENT_NODE_PORT_UPDATE_INFO);
@ -731,12 +757,11 @@ client_node_set_param(void *object,
pw_log_warn("set param not implemented");
}
static struct mem_id *find_mem(struct pw_proxy *proxy, uint32_t id)
static struct mem_id *find_mem(struct port *port, uint32_t id)
{
struct mem_id *mid;
struct node_data *data = proxy->user_data;
pw_array_for_each(mid, &data->mem_ids) {
pw_array_for_each(mid, &port->mem_ids) {
if (mid->id == id)
return mid;
}
@ -751,28 +776,34 @@ static void clear_memid(struct mem_id *mid)
close(mid->fd);
}
static void clear_mems(struct pw_proxy *proxy)
static void clear_mems(struct port *port)
{
struct node_data *data = proxy->user_data;
struct mem_id *mid;
pw_array_for_each(mid, &data->mem_ids)
pw_array_for_each(mid, &port->mem_ids)
clear_memid(mid);
data->mem_ids.size = 0;
port->mem_ids.size = 0;
}
static void clear_buffers(struct pw_proxy *proxy)
static void clear_buffers(struct port *port)
{
struct node_data *data = proxy->user_data;
struct buffer_id *bid;
pw_log_debug("node %p: clear buffers", proxy);
pw_log_debug("port %p: clear buffers", port);
pw_array_for_each(bid, &data->buffer_ids) {
pw_array_for_each(bid, &port->buffer_ids) {
free(bid->buf);
bid->buf = NULL;
}
data->buffer_ids.size = 0;
port->buffer_ids.size = 0;
}
static void clear_port(struct port *port)
{
clear_buffers(port);
clear_mems(port);
pw_array_clear(&port->mem_ids);
pw_array_clear(&port->buffer_ids);
}
static void
@ -785,14 +816,15 @@ client_node_add_mem(void *object,
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
struct mem_id *m;
struct port *port = find_port(data, direction, port_id);
m = find_mem(proxy, mem_id);
m = find_mem(port, mem_id);
if (m) {
pw_log_debug("update mem %u, fd %d, flags %d, off %d, size %d",
mem_id, memfd, flags, offset, size);
clear_memid(m);
} else {
m = pw_array_add(&data->mem_ids, sizeof(struct mem_id));
m = pw_array_add(&port->mem_ids, sizeof(struct mem_id));
pw_log_debug("add mem %u, fd %d, flags %d, off %d, size %d",
mem_id, memfd, flags, offset, size);
}
@ -815,24 +847,26 @@ client_node_use_buffers(void *object,
struct buffer_id *bid;
uint32_t i, j, len;
struct spa_buffer *b, **bufs;
struct pw_port *port;
int res;
struct port *port;
int res, prot;
port = pw_node_find_port(data->node, direction, port_id);
port = find_port(data, direction, port_id);
if (port == NULL) {
res = SPA_RESULT_INVALID_PORT;
goto done;
}
prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
/* clear previous buffers */
clear_buffers(proxy);
clear_buffers(port);
bufs = alloca(n_buffers * sizeof(struct spa_buffer *));
for (i = 0; i < n_buffers; i++) {
off_t offset;
struct mem_id *mid = find_mem(proxy, buffers[i].mem_id);
struct mem_id *mid = find_mem(port, buffers[i].mem_id);
if (mid == NULL) {
pw_log_warn("unknown memory id %u", buffers[i].mem_id);
continue;
@ -840,7 +874,7 @@ client_node_use_buffers(void *object,
if (mid->ptr == NULL) {
mid->ptr =
mmap(NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED,
mmap(NULL, mid->size + mid->offset, prot, MAP_SHARED,
mid->fd, 0);
if (mid->ptr == MAP_FAILED) {
mid->ptr = NULL;
@ -849,8 +883,8 @@ client_node_use_buffers(void *object,
continue;
}
}
len = pw_array_get_len(&data->buffer_ids, struct buffer_id);
bid = pw_array_add(&data->buffer_ids, sizeof(struct buffer_id));
len = pw_array_get_len(&port->buffer_ids, struct buffer_id);
bid = pw_array_add(&port->buffer_ids, sizeof(struct buffer_id));
b = buffers[i].buffer;
@ -896,15 +930,19 @@ client_node_use_buffers(void *object,
struct spa_chunk);
if (d->type == proxy->remote->core->type.data.Id) {
struct mem_id *bmid = find_mem(proxy, SPA_PTR_TO_UINT32(d->data));
struct mem_id *bmid = find_mem(port, SPA_PTR_TO_UINT32(d->data));
void *map;
d->type = proxy->remote->core->type.data.MemFd;
d->fd = bmid->fd;
map = mmap(NULL, d->maxsize + d->mapoffset, PROT_READ|PROT_WRITE,
MAP_SHARED, d->fd, 0);
map = mmap(NULL, d->maxsize + d->mapoffset, prot, MAP_SHARED, d->fd, 0);
if (map == MAP_FAILED) {
pw_log_error("data %d failed to mmap memory %m", j);
res = SPA_RESULT_ERROR;
goto done;
}
d->data = SPA_MEMBER(map, d->mapoffset, uint8_t);
pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd);
pw_log_debug(" data %d %u -> fd %d mem %p", j, bmid->id, bmid->fd, map);
} else if (d->type == proxy->remote->core->type.data.MemPtr) {
d->data = SPA_MEMBER(bid->buf_ptr, SPA_PTR_TO_INT(d->data), void);
d->fd = -1;
@ -916,10 +954,10 @@ client_node_use_buffers(void *object,
bufs[i] = b;
}
res = pw_port_use_buffers(port, bufs, n_buffers);
res = pw_port_use_buffers(port->port, bufs, n_buffers);
if (n_buffers == 0)
clear_mems(proxy);
clear_mems(port);
done:
pw_client_node_proxy_done(data->node_proxy, seq, res);
@ -1080,18 +1118,20 @@ static const struct pw_node_events node_events = {
.have_output = node_have_output,
};
static void node_proxy_destroy(void *data)
static void node_proxy_destroy(void *_data)
{
struct node_data *d = data;
struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy;
struct node_data *data = _data;
struct pw_proxy *proxy = (struct pw_proxy*) data->node_proxy;
int i;
for (i = 0; i < data->trans->area->max_input_ports; i++)
clear_port(&data->in_ports[i]);
for (i = 0; i < data->trans->area->max_output_ports; i++)
clear_port(&data->out_ports[i]);
clean_transport(proxy);
clear_buffers(proxy);
clear_mems(proxy);
pw_array_clear(&d->mem_ids);
pw_array_clear(&d->buffer_ids);
spa_hook_remove(&d->node_listener);
spa_hook_remove(&data->node_listener);
}
static const struct pw_proxy_events proxy_events = {
@ -1159,11 +1199,6 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
spa_graph_node_init(&data->out_node);
spa_graph_node_set_implementation(&data->out_node, &data->out_node_impl);
pw_array_init(&data->mem_ids, 64);
pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64);
pw_array_init(&data->buffer_ids, 32);
pw_array_ensure_size(&data->buffer_ids, sizeof(struct buffer_id) * 64);
pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data);
pw_node_add_listener(node, &data->node_listener, &node_events, data);