From a10d7a4e907e50a24c6bff67c5ca9d9a5707ff9d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 21 Mar 2018 17:59:54 +0100 Subject: [PATCH] remote: improve memory handling Better tracking of buffer memory and io areas Activate/deactivate areas --- src/pipewire/remote.c | 318 +++++++++++++++++++++++++----------------- 1 file changed, 190 insertions(+), 128 deletions(-) diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 902503dfc..8f46bc692 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -57,13 +57,17 @@ struct mem { void *ptr; }; +struct buffer_mem { + void *ptr; + struct pw_map_range map; + uint32_t mem_id; +}; + struct buffer { uint32_t id; struct spa_buffer *buf; - struct pw_map_range map; - void *ptr; + struct buffer_mem *mem; uint32_t n_mem; - uint32_t *mem; }; struct mix { @@ -72,6 +76,7 @@ struct mix { uint32_t mix_id; struct pw_port_mix mix; struct pw_array buffers; + bool active; }; struct node_data { @@ -519,40 +524,51 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) } } -static struct mem *find_mem(struct pw_array *mems, uint32_t id) +static struct mem *find_mem(struct node_data *data, uint32_t id) { struct mem *m; - - pw_array_for_each(m, mems) { + pw_array_for_each(m, &data->mems) { if (m->id == id) return m; } return NULL; } -static void *mem_map(struct node_data *data, struct mem *m, uint32_t offset, uint32_t size) +static struct mem *find_mem_ptr(struct node_data *data, void *ptr) { - if (m->ptr == NULL) { - pw_map_range_init(&m->map, offset, size, data->core->sc_pagesize); - - m->ptr = mmap(NULL, m->map.size, PROT_READ|PROT_WRITE, - MAP_SHARED, m->fd, m->map.offset); - - if (m->ptr == MAP_FAILED) { - pw_log_error("Failed to mmap memory %d %p: %m", size, m); - m->ptr = NULL; - return NULL; - } + struct mem *m; + pw_array_for_each(m, &data->mems) { + if (m->ptr == ptr) + return m; } - return SPA_MEMBER(m->ptr, m->map.start, void); + return NULL; } -static void mem_unmap(struct node_data *data, struct mem *m) + +static void *mem_map(struct node_data *data, struct pw_map_range *range, + int fd, int prot, uint32_t offset, uint32_t size) { - if (m->ptr != NULL) { - if (munmap(m->ptr, m->map.size) < 0) - pw_log_warn("failed to unmap: %m"); - m->ptr = NULL; + void *ptr; + + pw_map_range_init(range, offset, size, data->core->sc_pagesize); + + ptr = mmap(NULL, range->size, prot, MAP_SHARED, fd, range->offset); + if (ptr == MAP_FAILED) { + pw_log_error("remote %p: Failed to mmap memory %d: %m", data, size); + return NULL; } + ptr = SPA_MEMBER(ptr, range->start, void); + pw_log_debug("remote %p: fd %d mapped %d %d %p", data, fd, offset, size, ptr); + + return ptr; +} + +static void *mem_unmap(struct node_data *data, void *ptr, struct pw_map_range *range) +{ + if (ptr != NULL) { + if (munmap(SPA_MEMBER(ptr, -range->start, void), range->size) < 0) + pw_log_warn("failed to unmap: %m"); + } + return NULL; } static void clear_mem(struct node_data *data, struct mem *m) @@ -562,6 +578,8 @@ static void clear_mem(struct node_data *data, struct mem *m) int fd; struct mem *m2; + pw_log_debug("remote %p: clear mem %d", data, m->id); + fd = m->fd; m->fd = -1; m->id = SPA_ID_INVALID; @@ -573,7 +591,7 @@ static void clear_mem(struct node_data *data, struct mem *m) } } if (!has_ref) { - mem_unmap(data, m); + m->ptr = mem_unmap(data, m->ptr, &m->map); close(fd); } } @@ -607,6 +625,27 @@ static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id) pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64); } +static int +do_deactivate_mix(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct mix *mix = user_data; + spa_graph_port_remove(&mix->mix.port); + return 0; +} + +static int +deactivate_mix(struct node_data *data, struct mix *mix) +{ + if (mix->active) { + pw_log_debug("node %p: mix %p deactivate", data, mix); + pw_loop_invoke(data->core->data_loop, + do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix); + mix->active = false; + } + return 0; +} + static int do_activate_mix(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) @@ -616,6 +655,18 @@ do_activate_mix(struct spa_loop *loop, return 0; } +static int +activate_mix(struct node_data *data, struct mix *mix) +{ + if (!mix->active) { + pw_log_debug("node %p: mix %p activate", data, mix); + pw_loop_invoke(data->core->data_loop, + do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix); + mix->active = true; + } + return 0; +} + static struct mix *find_mix(struct node_data *data, enum spa_direction direction, uint32_t port_id, uint32_t mix_id) { @@ -651,9 +702,7 @@ static struct mix *ensure_mix(struct node_data *data, mix_init(mix, port, mix_id); spa_list_append(&data->mix[direction], &mix->link); - - pw_loop_invoke(data->core->data_loop, - do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix); + mix->active = false; return mix; } @@ -666,7 +715,7 @@ static void client_node_add_mem(void *object, struct node_data *data = proxy->user_data; struct mem *m; - m = find_mem(&data->mems, mem_id); + m = find_mem(data, mem_id); if (m) { pw_log_warn("duplicate mem %u, fd %d, flags %d", mem_id, memfd, flags); @@ -843,6 +892,33 @@ client_node_remove_port(void *object, uint32_t seq, enum spa_direction direction pw_log_warn("remove port not supported"); } +static void clear_buffers(struct node_data *data, struct mix *mix) +{ + struct pw_port *port = mix->port; + struct buffer *b; + int i; + + pw_log_debug("port %p: clear buffers", port); + pw_port_use_buffers(port, mix->mix_id, NULL, 0); + + pw_array_for_each(b, &mix->buffers) { + for (i = 0; i < b->n_mem; i++) { + struct buffer_mem *bm = &b->mem[i]; + struct mem *m; + + pw_log_debug("port %p: clear buffer %d mem %d", + port, b->id, bm->mem_id); + + m = find_mem(data, bm->mem_id); + if (m && --m->ref == 0) + clear_mem(data, m); + } + b->n_mem = 0; + free(b->buf); + } + mix->buffers.size = 0; +} + static void client_node_port_set_param(void *object, uint32_t seq, @@ -852,6 +928,7 @@ client_node_port_set_param(void *object, { struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; + struct pw_type *t = data->t; struct pw_port *port; int res; @@ -861,6 +938,16 @@ client_node_port_set_param(void *object, goto done; } + if (id == t->param.idFormat) { + if (param == NULL) { + struct mix *mix; + spa_list_for_each(mix, &data->mix[direction], link) { + if (mix->port->port_id == port_id) + clear_buffers(data, mix); + } + } + } + res = pw_port_set_param(port, id, flags, param); if (res < 0) goto done; @@ -873,37 +960,6 @@ client_node_port_set_param(void *object, pw_client_node_proxy_done(data->node_proxy, seq, res); } -static void clear_buffers(struct node_data *data, struct mix *mix) -{ - struct pw_port *port = mix->port; - struct buffer *bid; - int i; - - pw_log_debug("port %p: clear buffers", port); - pw_port_use_buffers(port, mix->mix_id, NULL, 0); - - pw_array_for_each(bid, &mix->buffers) { - if (bid->ptr != NULL) { - if (munmap(bid->ptr, bid->map.size) < 0) - pw_log_warn("failed to unmap: %m"); - } - if (bid->mem != NULL) { - for (i = 0; i < bid->n_mem; i++) { - struct mem *m; - m = find_mem(&data->mems, bid->mem[i]); - if (--m->ref == 0) - clear_mem(data, m); - } - bid->mem = NULL; - bid->n_mem = 0; - } - bid->ptr = NULL; - free(bid->buf); - bid->buf = NULL; - } - mix->buffers.size = 0; -} - static void client_node_port_use_buffers(void *object, uint32_t seq, @@ -916,8 +972,7 @@ client_node_port_use_buffers(void *object, uint32_t i, j, len; struct spa_buffer *b, **bufs; struct mix *mix; - struct pw_core *core = proxy->remote->core; - struct pw_type *t = &core->type; + struct pw_type *t = data->t; int res, prot; mix = ensure_mix(data, direction, port_id, mix_id); @@ -934,9 +989,12 @@ client_node_port_use_buffers(void *object, bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); for (i = 0; i < n_buffers; i++) { + struct buffer_mem bmem; + size_t size; off_t offset; + struct mem *m; - struct mem *m = find_mem(&data->mems, buffers[i].mem_id); + m = find_mem(data, buffers[i].mem_id); if (m == NULL) { pw_log_error("unknown memory id %u", buffers[i].mem_id); res = -EINVAL; @@ -946,59 +1004,56 @@ client_node_port_use_buffers(void *object, len = pw_array_get_len(&mix->buffers, struct buffer); bid = pw_array_add(&mix->buffers, sizeof(struct buffer)); - pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize); - - bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, m->fd, bid->map.offset); - if (bid->ptr == MAP_FAILED) { - bid->ptr = NULL; - pw_log_error("Failed to mmap memory %u %u %u %d: %m", - bid->map.offset, bid->map.size, buffers[i].mem_id, m->fd); + bmem.mem_id = m->id; + bmem.ptr = mem_map(data, &bmem.map, m->fd, prot, + buffers[i].offset, buffers[i].size); + if (bmem.ptr == NULL) { res = -errno; goto cleanup; } - if (mlock(bid->ptr, bid->map.size) < 0) + if (mlock(bmem.ptr, bmem.map.size) < 0) pw_log_warn("Failed to mlock memory %u %u: %m", - bid->map.offset, bid->map.size); + bmem.map.offset, bmem.map.size); - b = buffers[i].buffer; - - { - size_t size; - - size = sizeof(struct spa_buffer); - size += sizeof(uint32_t); - for (j = 0; j < buffers[i].buffer->n_metas; j++) - size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) { - size += sizeof(struct spa_data); - size += sizeof(uint32_t); - } - - b = bid->buf = malloc(size); - memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); - - b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, - struct spa_data); - bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, - uint32_t); - bid->n_mem = 0; - - m->ref++; - bid->mem[bid->n_mem++] = m->id; + size = sizeof(struct spa_buffer); + size += sizeof(struct buffer_mem); + for (j = 0; j < buffers[i].buffer->n_metas; j++) + size += sizeof(struct spa_meta); + for (j = 0; j < buffers[i].buffer->n_datas; j++) { + size += sizeof(struct spa_data); + size += sizeof(struct buffer_mem); } + + b = bid->buf = malloc(size); + if (b == NULL) { + res = -ENOMEM; + goto cleanup; + } + memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); + + b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); + b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + struct spa_data); + bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, + struct buffer_mem); + bid->n_mem = 0; + bid->id = b->id; + bid->mem[bid->n_mem++] = bmem; + m->ref++; + if (bid->id != len) { pw_log_warn("unexpected id %u found, expected %u", bid->id, len); } - pw_log_debug("add buffer %d %d %u %u", m->id, bid->id, bid->map.offset, bid->map.size); + pw_log_debug("add buffer %d %d %u %u", m->id, + bid->id, bmem.map.offset, bmem.map.size); - offset = bid->map.start; + offset = 0; for (j = 0; j < b->n_metas; j++) { struct spa_meta *m = &b->metas[j]; memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); - m->data = SPA_MEMBER(bid->ptr, offset, void); + m->data = SPA_MEMBER(bmem.ptr, offset, void); offset += m->size; } @@ -1007,27 +1062,32 @@ client_node_port_use_buffers(void *object, memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); d->chunk = - SPA_MEMBER(bid->ptr, offset + sizeof(struct spa_chunk) * j, + SPA_MEMBER(bmem.ptr, offset + sizeof(struct spa_chunk) * j, struct spa_chunk); if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { - uint32_t id = SPA_PTR_TO_UINT32(d->data); - struct mem *bm = find_mem(&data->mems, id); + uint32_t mem_id = SPA_PTR_TO_UINT32(d->data); + struct mem *bm = find_mem(data, mem_id); + struct buffer_mem bm2; if (bm == NULL) { - pw_log_error("unknown buffer mem %u", id); + pw_log_error("unknown buffer mem %u", mem_id); res = -EINVAL; goto cleanup; } - d->data = NULL; d->fd = bm->fd; bm->ref++; - bid->mem[bid->n_mem++] = bm->id; + bm2.mem_id = bm->id; + bm2.ptr = NULL; + d->data = bm2.ptr; + + bid->mem[bid->n_mem++] = bm2; + pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd); } else if (d->type == t->data.MemPtr) { - d->data = SPA_MEMBER(bid->ptr, - bid->map.start + SPA_PTR_TO_INT(d->data), void); + int offs = SPA_PTR_TO_INT(d->data); + d->data = SPA_MEMBER(bmem.ptr, offs, void); d->fd = -1; pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data); } else { @@ -1080,9 +1140,9 @@ client_node_port_set_io(void *object, struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; struct pw_core *core = proxy->remote->core; - struct pw_type *t = &core->type; + struct pw_type *t = data->t; struct mix *mix; - struct mem *mid; + struct mem *m; void *ptr; mix = ensure_mix(data, direction, port_id, mix_id); @@ -1094,22 +1154,34 @@ client_node_port_set_io(void *object, size = 0; } else { - mid = find_mem(&data->mems, memid); - if (mid == NULL) { + m = find_mem(data, memid); + if (m == NULL) { pw_log_warn("unknown memory id %u", memid); return; } - - if ((ptr = mem_map(data, mid, offset, size)) == NULL) - return; + if (m->ptr == NULL) { + m->ptr = mem_map(data, &m->map, m->fd, + PROT_READ|PROT_WRITE, offset, size); + if (m->ptr == NULL) + return; + } + m->ref++; + ptr = m->ptr; } - pw_log_debug("port %p: set io %s %p", mix->port, spa_type_map_get_type(core->type.map, id), ptr); if (id == t->io.Buffers) { + if (ptr == NULL && mix->mix.port.io) { + deactivate_mix(data, mix); + m = find_mem_ptr(data, mix->mix.port.io); + if (m && --m->ref == 0) + clear_mem(data, m); + } mix->mix.port.io = ptr; + if (ptr) + activate_mix(data, mix); } else { spa_node_port_set_io(mix->port->node->node, direction, port_id, @@ -1185,22 +1257,12 @@ static const struct pw_node_events node_events = { .finish = node_finish, }; -static int -do_deactivate_mix(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct mix *mix = user_data; - spa_graph_port_remove(&mix->mix.port); - return 0; -} - static void clear_mix(struct node_data *data, struct mix *mix) { clear_buffers(data, mix); pw_array_clear(&mix->buffers); - pw_loop_invoke(data->core->data_loop, - do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix); + deactivate_mix(data, mix); spa_list_remove(&mix->link); spa_list_append(&data->free_mix, &mix->link);