diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 52abbb959..ac51ba350 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -63,12 +63,21 @@ #define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers) +struct mem { + uint32_t id; + int ref; + int fd; + uint32_t type; + uint32_t flags; +}; + struct buffer { struct spa_buffer *outbuf; struct spa_buffer buffer; struct spa_meta metas[4]; struct spa_data datas[4]; bool outstanding; + struct mem *mem; }; struct port { @@ -112,7 +121,6 @@ struct node { uint32_t n_params; struct spa_pod **params; - uint32_t membase; uint32_t seq; }; @@ -131,6 +139,8 @@ struct impl { struct spa_hook node_listener; struct spa_hook resource_listener; + struct pw_array mems; + int fds[2]; int other_fds[2]; @@ -140,12 +150,67 @@ struct impl { /** \endcond */ +static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t flags) +{ + struct mem *m, *f = NULL; + + pw_array_for_each(m, &impl->mems) { + if (m->ref <= 0) + f = m; + else if (m->fd == fd) + goto found; + } + + if (f == NULL) { + m = pw_array_add(&impl->mems, sizeof(struct mem)); + m->id = pw_array_get_len(&impl->mems, struct mem) - 1; + m->ref = 0; + } + else { + m = f; + } + m->fd = fd; + m->type = type; + m->flags = flags; + + pw_client_node_resource_add_mem(impl->node.resource, + m->id, + type, + m->fd, + m->flags); + found: + m->ref++; + return m; +} + + static int clear_buffers(struct node *this, struct port *port) { - if (port->n_buffers) { - spa_log_info(this->log, "node %p: clear buffers", this); - port->n_buffers = 0; + uint32_t i, j; + struct impl *impl = this->impl; + struct pw_type *t = impl->t; + + for (i = 0; i < port->n_buffers; i++) { + struct buffer *b = &port->buffers[i]; + + spa_log_debug(this->log, "node %p: clear buffer %d", this, i); + + for (j = 0; j < b->buffer.n_datas; j++) { + struct spa_data *d = &b->datas[j]; + + if (d->type == t->data.DmaBuf || + d->type == t->data.MemFd) { + uint32_t id; + struct mem *m; + + id = SPA_PTR_TO_UINT32(b->buffer.datas[j].data); + m = pw_array_get_unchecked(&impl->mems, id, struct mem); + m->ref--; + } + } + b->mem->ref--; } + port->n_buffers = 0; return 0; } @@ -434,8 +499,8 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3 static int impl_node_port_get_info(struct spa_node *node, - enum spa_direction direction, - uint32_t port_id, const struct spa_port_info **info) + enum spa_direction direction, uint32_t port_id, + const struct spa_port_info **info) { struct node *this; struct port *port; @@ -528,15 +593,18 @@ impl_node_port_set_io(struct spa_node *node, void *data, size_t size) { struct node *this; + struct impl *impl; struct pw_type *t; struct pw_memblock *mem; + struct mem *m; uint32_t memid, mem_offset, mem_size; if (node == NULL) return -EINVAL; this = SPA_CONTAINER_OF(node, struct node, node); - t = this->impl->t; + impl = this->impl; + t = impl->t; if (this->resource == NULL) return 0; @@ -548,17 +616,13 @@ impl_node_port_set_io(struct spa_node *node, if ((mem = pw_memblock_find(data)) == NULL) return -EINVAL; - memid = this->membase++; mem_offset = mem->offset; - if (mem->size - mem_offset < size) + mem_size = mem->size; + if (mem_size - mem_offset < size) return -EINVAL; - mem_size = mem->size; - - pw_client_node_resource_add_mem(this->resource, - memid, - t->data.MemFd, - mem->fd, mem->flags); + m = ensure_mem(impl, mem->fd, t->data.MemFd, mem->flags); + memid = m->id; } else { memid = SPA_ID_INVALID; @@ -618,7 +682,8 @@ impl_node_port_use_buffers(struct spa_node *node, for (i = 0; i < n_buffers; i++) { struct buffer *b = &port->buffers[i]; - struct pw_memblock *m; + struct pw_memblock *mem; + struct mem *m; size_t data_size, size; void *baseptr; @@ -634,7 +699,7 @@ impl_node_port_use_buffers(struct spa_node *node, else return -EINVAL; - if ((m = pw_memblock_find(baseptr)) == NULL) + if ((mem = pw_memblock_find(baseptr)) == NULL) return -EINVAL; data_size = 0; @@ -648,15 +713,12 @@ impl_node_port_use_buffers(struct spa_node *node, data_size += d->maxsize; } - mb[i].buffer = &b->buffer; - mb[i].mem_id = this->membase++; - mb[i].offset = SPA_PTRDIFF(baseptr, m->ptr + m->offset); - mb[i].size = data_size; + b->mem = m = ensure_mem(impl, mem->fd, t->data.MemFd, mem->flags); - pw_client_node_resource_add_mem(this->resource, - mb[i].mem_id, - t->data.MemFd, - m->fd, m->flags); + mb[i].buffer = &b->buffer; + mb[i].mem_id = m->id; + mb[i].offset = SPA_PTRDIFF(baseptr, mem->ptr + mem->offset); + mb[i].size = data_size; for (j = 0; j < buffers[i]->n_metas; j++) memcpy(&b->buffer.metas[j], &buffers[i]->metas[j], sizeof(struct spa_meta)); @@ -670,13 +732,8 @@ impl_node_port_use_buffers(struct spa_node *node, if (d->type == t->data.DmaBuf || d->type == t->data.MemFd) { - pw_client_node_resource_add_mem(this->resource, - this->membase, - d->type, - d->fd, - d->flags); - b->buffer.datas[j].data = SPA_UINT32_TO_PTR(this->membase); - this->membase++; + m = ensure_mem(impl, d->fd, d->type, d->flags); + b->buffer.datas[j].data = SPA_UINT32_TO_PTR(m->id); } else if (d->type == t->data.MemPtr) { b->buffer.datas[j].data = SPA_INT_TO_PTR(size); size += d->maxsize; @@ -1053,9 +1110,9 @@ static const struct spa_node impl_node = { static int node_init(struct node *this, - struct spa_dict *info, - const struct spa_support *support, - uint32_t n_support) + struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) { uint32_t i; @@ -1158,6 +1215,8 @@ static void node_free(void *data) spa_hook_remove(&impl->node_listener); + pw_array_clear(&impl->mems); + if (impl->fds[0] != -1) close(impl->fds[0]); if (impl->fds[1] != -1) @@ -1215,6 +1274,8 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource, node_init(&impl->node, NULL, support, n_support); impl->node.impl = impl; + pw_array_init(&impl->mems, 64); + if ((name = pw_properties_get(properties, "node.name")) == NULL) name = "client-node";