remote: improve memory handling

Better tracking of buffer memory and io areas
Activate/deactivate areas
This commit is contained in:
Wim Taymans 2018-03-21 17:59:54 +01:00
parent a7341ce583
commit a10d7a4e90

View file

@ -57,13 +57,17 @@ struct mem {
void *ptr; void *ptr;
}; };
struct buffer_mem {
void *ptr;
struct pw_map_range map;
uint32_t mem_id;
};
struct buffer { struct buffer {
uint32_t id; uint32_t id;
struct spa_buffer *buf; struct spa_buffer *buf;
struct pw_map_range map; struct buffer_mem *mem;
void *ptr;
uint32_t n_mem; uint32_t n_mem;
uint32_t *mem;
}; };
struct mix { struct mix {
@ -72,6 +76,7 @@ struct mix {
uint32_t mix_id; uint32_t mix_id;
struct pw_port_mix mix; struct pw_port_mix mix;
struct pw_array buffers; struct pw_array buffers;
bool active;
}; };
struct node_data { struct node_data {
@ -519,40 +524,51 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
} }
} }
static struct mem *find_mem(struct pw_array *mems, uint32_t id) static struct mem *find_mem(struct node_data *data, uint32_t id)
{ {
struct mem *m; struct mem *m;
pw_array_for_each(m, &data->mems) {
pw_array_for_each(m, mems) {
if (m->id == id) if (m->id == id)
return m; return m;
} }
return NULL; return NULL;
} }
static void *mem_map(struct node_data *data, struct mem *m, uint32_t offset, uint32_t size) static struct mem *find_mem_ptr(struct node_data *data, void *ptr)
{ {
if (m->ptr == NULL) { struct mem *m;
pw_map_range_init(&m->map, offset, size, data->core->sc_pagesize); pw_array_for_each(m, &data->mems) {
if (m->ptr == ptr)
m->ptr = mmap(NULL, m->map.size, PROT_READ|PROT_WRITE, return m;
MAP_SHARED, m->fd, m->map.offset);
if (m->ptr == MAP_FAILED) {
pw_log_error("Failed to mmap memory %d %p: %m", size, m);
m->ptr = NULL;
return NULL;
}
} }
return SPA_MEMBER(m->ptr, m->map.start, void); return NULL;
} }
static void mem_unmap(struct node_data *data, struct mem *m)
static void *mem_map(struct node_data *data, struct pw_map_range *range,
int fd, int prot, uint32_t offset, uint32_t size)
{ {
if (m->ptr != NULL) { void *ptr;
if (munmap(m->ptr, m->map.size) < 0)
pw_log_warn("failed to unmap: %m"); pw_map_range_init(range, offset, size, data->core->sc_pagesize);
m->ptr = NULL;
ptr = mmap(NULL, range->size, prot, MAP_SHARED, fd, range->offset);
if (ptr == MAP_FAILED) {
pw_log_error("remote %p: Failed to mmap memory %d: %m", data, size);
return NULL;
} }
ptr = SPA_MEMBER(ptr, range->start, void);
pw_log_debug("remote %p: fd %d mapped %d %d %p", data, fd, offset, size, ptr);
return ptr;
}
static void *mem_unmap(struct node_data *data, void *ptr, struct pw_map_range *range)
{
if (ptr != NULL) {
if (munmap(SPA_MEMBER(ptr, -range->start, void), range->size) < 0)
pw_log_warn("failed to unmap: %m");
}
return NULL;
} }
static void clear_mem(struct node_data *data, struct mem *m) static void clear_mem(struct node_data *data, struct mem *m)
@ -562,6 +578,8 @@ static void clear_mem(struct node_data *data, struct mem *m)
int fd; int fd;
struct mem *m2; struct mem *m2;
pw_log_debug("remote %p: clear mem %d", data, m->id);
fd = m->fd; fd = m->fd;
m->fd = -1; m->fd = -1;
m->id = SPA_ID_INVALID; m->id = SPA_ID_INVALID;
@ -573,7 +591,7 @@ static void clear_mem(struct node_data *data, struct mem *m)
} }
} }
if (!has_ref) { if (!has_ref) {
mem_unmap(data, m); m->ptr = mem_unmap(data, m->ptr, &m->map);
close(fd); close(fd);
} }
} }
@ -607,6 +625,27 @@ static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id)
pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64); pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64);
} }
static int
do_deactivate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct mix *mix = user_data;
spa_graph_port_remove(&mix->mix.port);
return 0;
}
static int
deactivate_mix(struct node_data *data, struct mix *mix)
{
if (mix->active) {
pw_log_debug("node %p: mix %p deactivate", data, mix);
pw_loop_invoke(data->core->data_loop,
do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix);
mix->active = false;
}
return 0;
}
static int static int
do_activate_mix(struct spa_loop *loop, do_activate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
@ -616,6 +655,18 @@ do_activate_mix(struct spa_loop *loop,
return 0; return 0;
} }
static int
activate_mix(struct node_data *data, struct mix *mix)
{
if (!mix->active) {
pw_log_debug("node %p: mix %p activate", data, mix);
pw_loop_invoke(data->core->data_loop,
do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix);
mix->active = true;
}
return 0;
}
static struct mix *find_mix(struct node_data *data, static struct mix *find_mix(struct node_data *data,
enum spa_direction direction, uint32_t port_id, uint32_t mix_id) enum spa_direction direction, uint32_t port_id, uint32_t mix_id)
{ {
@ -651,9 +702,7 @@ static struct mix *ensure_mix(struct node_data *data,
mix_init(mix, port, mix_id); mix_init(mix, port, mix_id);
spa_list_append(&data->mix[direction], &mix->link); spa_list_append(&data->mix[direction], &mix->link);
mix->active = false;
pw_loop_invoke(data->core->data_loop,
do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix);
return mix; return mix;
} }
@ -666,7 +715,7 @@ static void client_node_add_mem(void *object,
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct mem *m; struct mem *m;
m = find_mem(&data->mems, mem_id); m = find_mem(data, mem_id);
if (m) { if (m) {
pw_log_warn("duplicate mem %u, fd %d, flags %d", pw_log_warn("duplicate mem %u, fd %d, flags %d",
mem_id, memfd, flags); mem_id, memfd, flags);
@ -843,6 +892,33 @@ client_node_remove_port(void *object, uint32_t seq, enum spa_direction direction
pw_log_warn("remove port not supported"); pw_log_warn("remove port not supported");
} }
static void clear_buffers(struct node_data *data, struct mix *mix)
{
struct pw_port *port = mix->port;
struct buffer *b;
int i;
pw_log_debug("port %p: clear buffers", port);
pw_port_use_buffers(port, mix->mix_id, NULL, 0);
pw_array_for_each(b, &mix->buffers) {
for (i = 0; i < b->n_mem; i++) {
struct buffer_mem *bm = &b->mem[i];
struct mem *m;
pw_log_debug("port %p: clear buffer %d mem %d",
port, b->id, bm->mem_id);
m = find_mem(data, bm->mem_id);
if (m && --m->ref == 0)
clear_mem(data, m);
}
b->n_mem = 0;
free(b->buf);
}
mix->buffers.size = 0;
}
static void static void
client_node_port_set_param(void *object, client_node_port_set_param(void *object,
uint32_t seq, uint32_t seq,
@ -852,6 +928,7 @@ client_node_port_set_param(void *object,
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_type *t = data->t;
struct pw_port *port; struct pw_port *port;
int res; int res;
@ -861,6 +938,16 @@ client_node_port_set_param(void *object,
goto done; goto done;
} }
if (id == t->param.idFormat) {
if (param == NULL) {
struct mix *mix;
spa_list_for_each(mix, &data->mix[direction], link) {
if (mix->port->port_id == port_id)
clear_buffers(data, mix);
}
}
}
res = pw_port_set_param(port, id, flags, param); res = pw_port_set_param(port, id, flags, param);
if (res < 0) if (res < 0)
goto done; goto done;
@ -873,37 +960,6 @@ client_node_port_set_param(void *object,
pw_client_node_proxy_done(data->node_proxy, seq, res); pw_client_node_proxy_done(data->node_proxy, seq, res);
} }
static void clear_buffers(struct node_data *data, struct mix *mix)
{
struct pw_port *port = mix->port;
struct buffer *bid;
int i;
pw_log_debug("port %p: clear buffers", port);
pw_port_use_buffers(port, mix->mix_id, NULL, 0);
pw_array_for_each(bid, &mix->buffers) {
if (bid->ptr != NULL) {
if (munmap(bid->ptr, bid->map.size) < 0)
pw_log_warn("failed to unmap: %m");
}
if (bid->mem != NULL) {
for (i = 0; i < bid->n_mem; i++) {
struct mem *m;
m = find_mem(&data->mems, bid->mem[i]);
if (--m->ref == 0)
clear_mem(data, m);
}
bid->mem = NULL;
bid->n_mem = 0;
}
bid->ptr = NULL;
free(bid->buf);
bid->buf = NULL;
}
mix->buffers.size = 0;
}
static void static void
client_node_port_use_buffers(void *object, client_node_port_use_buffers(void *object,
uint32_t seq, uint32_t seq,
@ -916,8 +972,7 @@ client_node_port_use_buffers(void *object,
uint32_t i, j, len; uint32_t i, j, len;
struct spa_buffer *b, **bufs; struct spa_buffer *b, **bufs;
struct mix *mix; struct mix *mix;
struct pw_core *core = proxy->remote->core; struct pw_type *t = data->t;
struct pw_type *t = &core->type;
int res, prot; int res, prot;
mix = ensure_mix(data, direction, port_id, mix_id); mix = ensure_mix(data, direction, port_id, mix_id);
@ -934,9 +989,12 @@ client_node_port_use_buffers(void *object,
bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); bufs = alloca(n_buffers * sizeof(struct spa_buffer *));
for (i = 0; i < n_buffers; i++) { for (i = 0; i < n_buffers; i++) {
struct buffer_mem bmem;
size_t size;
off_t offset; off_t offset;
struct mem *m;
struct mem *m = find_mem(&data->mems, buffers[i].mem_id); m = find_mem(data, buffers[i].mem_id);
if (m == NULL) { if (m == NULL) {
pw_log_error("unknown memory id %u", buffers[i].mem_id); pw_log_error("unknown memory id %u", buffers[i].mem_id);
res = -EINVAL; res = -EINVAL;
@ -946,59 +1004,56 @@ client_node_port_use_buffers(void *object,
len = pw_array_get_len(&mix->buffers, struct buffer); len = pw_array_get_len(&mix->buffers, struct buffer);
bid = pw_array_add(&mix->buffers, sizeof(struct buffer)); bid = pw_array_add(&mix->buffers, sizeof(struct buffer));
pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize); bmem.mem_id = m->id;
bmem.ptr = mem_map(data, &bmem.map, m->fd, prot,
bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, m->fd, bid->map.offset); buffers[i].offset, buffers[i].size);
if (bid->ptr == MAP_FAILED) { if (bmem.ptr == NULL) {
bid->ptr = NULL;
pw_log_error("Failed to mmap memory %u %u %u %d: %m",
bid->map.offset, bid->map.size, buffers[i].mem_id, m->fd);
res = -errno; res = -errno;
goto cleanup; goto cleanup;
} }
if (mlock(bid->ptr, bid->map.size) < 0) if (mlock(bmem.ptr, bmem.map.size) < 0)
pw_log_warn("Failed to mlock memory %u %u: %m", pw_log_warn("Failed to mlock memory %u %u: %m",
bid->map.offset, bid->map.size); bmem.map.offset, bmem.map.size);
b = buffers[i].buffer; size = sizeof(struct spa_buffer);
size += sizeof(struct buffer_mem);
{ for (j = 0; j < buffers[i].buffer->n_metas; j++)
size_t size; size += sizeof(struct spa_meta);
for (j = 0; j < buffers[i].buffer->n_datas; j++) {
size = sizeof(struct spa_buffer); size += sizeof(struct spa_data);
size += sizeof(uint32_t); size += sizeof(struct buffer_mem);
for (j = 0; j < buffers[i].buffer->n_metas; j++)
size += sizeof(struct spa_meta);
for (j = 0; j < buffers[i].buffer->n_datas; j++) {
size += sizeof(struct spa_data);
size += sizeof(uint32_t);
}
b = bid->buf = malloc(size);
memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer));
b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta);
b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas,
struct spa_data);
bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas,
uint32_t);
bid->n_mem = 0;
m->ref++;
bid->mem[bid->n_mem++] = m->id;
} }
b = bid->buf = malloc(size);
if (b == NULL) {
res = -ENOMEM;
goto cleanup;
}
memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer));
b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta);
b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas,
struct spa_data);
bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas,
struct buffer_mem);
bid->n_mem = 0;
bid->id = b->id; bid->id = b->id;
bid->mem[bid->n_mem++] = bmem;
m->ref++;
if (bid->id != len) { if (bid->id != len) {
pw_log_warn("unexpected id %u found, expected %u", bid->id, len); pw_log_warn("unexpected id %u found, expected %u", bid->id, len);
} }
pw_log_debug("add buffer %d %d %u %u", m->id, bid->id, bid->map.offset, bid->map.size); pw_log_debug("add buffer %d %d %u %u", m->id,
bid->id, bmem.map.offset, bmem.map.size);
offset = bid->map.start; offset = 0;
for (j = 0; j < b->n_metas; j++) { for (j = 0; j < b->n_metas; j++) {
struct spa_meta *m = &b->metas[j]; struct spa_meta *m = &b->metas[j];
memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta));
m->data = SPA_MEMBER(bid->ptr, offset, void); m->data = SPA_MEMBER(bmem.ptr, offset, void);
offset += m->size; offset += m->size;
} }
@ -1007,27 +1062,32 @@ client_node_port_use_buffers(void *object,
memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data));
d->chunk = d->chunk =
SPA_MEMBER(bid->ptr, offset + sizeof(struct spa_chunk) * j, SPA_MEMBER(bmem.ptr, offset + sizeof(struct spa_chunk) * j,
struct spa_chunk); struct spa_chunk);
if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) {
uint32_t id = SPA_PTR_TO_UINT32(d->data); uint32_t mem_id = SPA_PTR_TO_UINT32(d->data);
struct mem *bm = find_mem(&data->mems, id); struct mem *bm = find_mem(data, mem_id);
struct buffer_mem bm2;
if (bm == NULL) { if (bm == NULL) {
pw_log_error("unknown buffer mem %u", id); pw_log_error("unknown buffer mem %u", mem_id);
res = -EINVAL; res = -EINVAL;
goto cleanup; goto cleanup;
} }
d->data = NULL;
d->fd = bm->fd; d->fd = bm->fd;
bm->ref++; bm->ref++;
bid->mem[bid->n_mem++] = bm->id; bm2.mem_id = bm->id;
bm2.ptr = NULL;
d->data = bm2.ptr;
bid->mem[bid->n_mem++] = bm2;
pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd); pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd);
} else if (d->type == t->data.MemPtr) { } else if (d->type == t->data.MemPtr) {
d->data = SPA_MEMBER(bid->ptr, int offs = SPA_PTR_TO_INT(d->data);
bid->map.start + SPA_PTR_TO_INT(d->data), void); d->data = SPA_MEMBER(bmem.ptr, offs, void);
d->fd = -1; d->fd = -1;
pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data); pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data);
} else { } else {
@ -1080,9 +1140,9 @@ client_node_port_set_io(void *object,
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_core *core = proxy->remote->core; struct pw_core *core = proxy->remote->core;
struct pw_type *t = &core->type; struct pw_type *t = data->t;
struct mix *mix; struct mix *mix;
struct mem *mid; struct mem *m;
void *ptr; void *ptr;
mix = ensure_mix(data, direction, port_id, mix_id); mix = ensure_mix(data, direction, port_id, mix_id);
@ -1094,22 +1154,34 @@ client_node_port_set_io(void *object,
size = 0; size = 0;
} }
else { else {
mid = find_mem(&data->mems, memid); m = find_mem(data, memid);
if (mid == NULL) { if (m == NULL) {
pw_log_warn("unknown memory id %u", memid); pw_log_warn("unknown memory id %u", memid);
return; return;
} }
if (m->ptr == NULL) {
if ((ptr = mem_map(data, mid, offset, size)) == NULL) m->ptr = mem_map(data, &m->map, m->fd,
return; PROT_READ|PROT_WRITE, offset, size);
if (m->ptr == NULL)
return;
}
m->ref++;
ptr = m->ptr;
} }
pw_log_debug("port %p: set io %s %p", mix->port, pw_log_debug("port %p: set io %s %p", mix->port,
spa_type_map_get_type(core->type.map, id), ptr); spa_type_map_get_type(core->type.map, id), ptr);
if (id == t->io.Buffers) { if (id == t->io.Buffers) {
if (ptr == NULL && mix->mix.port.io) {
deactivate_mix(data, mix);
m = find_mem_ptr(data, mix->mix.port.io);
if (m && --m->ref == 0)
clear_mem(data, m);
}
mix->mix.port.io = ptr; mix->mix.port.io = ptr;
if (ptr)
activate_mix(data, mix);
} else { } else {
spa_node_port_set_io(mix->port->node->node, spa_node_port_set_io(mix->port->node->node,
direction, port_id, direction, port_id,
@ -1185,22 +1257,12 @@ static const struct pw_node_events node_events = {
.finish = node_finish, .finish = node_finish,
}; };
static int
do_deactivate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct mix *mix = user_data;
spa_graph_port_remove(&mix->mix.port);
return 0;
}
static void clear_mix(struct node_data *data, struct mix *mix) static void clear_mix(struct node_data *data, struct mix *mix)
{ {
clear_buffers(data, mix); clear_buffers(data, mix);
pw_array_clear(&mix->buffers); pw_array_clear(&mix->buffers);
pw_loop_invoke(data->core->data_loop, deactivate_mix(data, mix);
do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix);
spa_list_remove(&mix->link); spa_list_remove(&mix->link);
spa_list_append(&data->free_mix, &mix->link); spa_list_append(&data->free_mix, &mix->link);