From c1aa3b4625ca47266e2a6771cb80d937b3f365bb Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 1 Dec 2017 17:24:03 +0100 Subject: [PATCH] client-node: move to per node memory Remove the per-port memory and only use per-node memory. This will make it possible to share memory between ports in the future. Keep refs to memory on the buffers and free (close) the memory when no longer used. --- src/extensions/client-node.h | 52 +++-- src/modules/module-client-node/client-node.c | 48 ++--- .../module-client-node/protocol-native.c | 101 +++++----- src/pipewire/map.h | 2 +- src/pipewire/remote.c | 184 ++++++++++-------- src/pipewire/stream.c | 30 ++- 6 files changed, 218 insertions(+), 199 deletions(-) diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 0d3f40b46..63b804088 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -288,14 +288,14 @@ pw_client_node_proxy_destroy(struct pw_client_node_proxy *p) } -#define PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT 0 -#define PW_CLIENT_NODE_PROXY_EVENT_SET_PARAM 1 -#define PW_CLIENT_NODE_PROXY_EVENT_EVENT 2 -#define PW_CLIENT_NODE_PROXY_EVENT_COMMAND 3 -#define PW_CLIENT_NODE_PROXY_EVENT_ADD_PORT 4 -#define PW_CLIENT_NODE_PROXY_EVENT_REMOVE_PORT 5 -#define PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_PARAM 6 -#define PW_CLIENT_NODE_PROXY_EVENT_PORT_ADD_MEM 7 +#define PW_CLIENT_NODE_PROXY_EVENT_ADD_MEM 0 +#define PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT 1 +#define PW_CLIENT_NODE_PROXY_EVENT_SET_PARAM 2 +#define PW_CLIENT_NODE_PROXY_EVENT_EVENT 3 +#define PW_CLIENT_NODE_PROXY_EVENT_COMMAND 4 +#define PW_CLIENT_NODE_PROXY_EVENT_ADD_PORT 5 +#define PW_CLIENT_NODE_PROXY_EVENT_REMOVE_PORT 6 +#define PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_PARAM 7 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_USE_BUFFERS 8 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_COMMAND 9 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_IO 10 @@ -305,6 +305,19 @@ pw_client_node_proxy_destroy(struct pw_client_node_proxy *p) struct pw_client_node_proxy_events { #define PW_VERSION_CLIENT_NODE_PROXY_EVENTS 0 uint32_t version; + /** + * Memory was added to a node + * + * \param mem_id the id of the memory + * \param type the memory type + * \param memfd the fd of the memory + * \param flags flags for the \a memfd + */ + void (*add_mem) (void *object, + uint32_t mem_id, + uint32_t type, + int memfd, + uint32_t flags); /** * Notify of a new transport area * @@ -387,25 +400,6 @@ struct pw_client_node_proxy_events { uint32_t port_id, uint32_t id, uint32_t flags, const struct spa_pod *param); - /** - * Memory was added for a port - * - * \param direction a port direction - * \param port_id the port id - * \param mem_id the id of the memory - * \param type the memory type - * \param memfd the fd of the memory - * \param flags flags for the \a memfd - * \param offset valid offset of mapped memory from \a memfd - * \param size valid size of mapped memory from \a memfd - */ - void (*port_add_mem) (void *object, - enum spa_direction direction, - uint32_t port_id, - uint32_t mem_id, - uint32_t type, - int memfd, - uint32_t flags); /** * Notify the port of buffers * @@ -463,6 +457,8 @@ pw_client_node_proxy_add_listener(struct pw_client_node_proxy *p, pw_proxy_add_proxy_listener((struct pw_proxy*)p, listener, events, data); } +#define pw_client_node_resource_add_mem(r,...) \ + pw_resource_notify(r,struct pw_client_node_proxy_events,add_mem,__VA_ARGS__) #define pw_client_node_resource_transport(r,...) \ pw_resource_notify(r,struct pw_client_node_proxy_events,transport,__VA_ARGS__) #define pw_client_node_resource_set_param(r,...) \ @@ -477,8 +473,6 @@ pw_client_node_proxy_add_listener(struct pw_client_node_proxy *p, pw_resource_notify(r,struct pw_client_node_proxy_events,remove_port,__VA_ARGS__) #define pw_client_node_resource_port_set_param(r,...) \ pw_resource_notify(r,struct pw_client_node_proxy_events,port_set_param,__VA_ARGS__) -#define pw_client_node_resource_port_add_mem(r,...) \ - pw_resource_notify(r,struct pw_client_node_proxy_events,port_add_mem,__VA_ARGS__) #define pw_client_node_resource_port_use_buffers(r,...) \ pw_resource_notify(r,struct pw_client_node_proxy_events,port_use_buffers,__VA_ARGS__) #define pw_client_node_resource_port_command(r,...) \ diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index e7782904f..0142f62a4 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -68,8 +68,6 @@ struct buffer { struct spa_buffer buffer; struct spa_meta metas[4]; struct spa_data datas[4]; - off_t offset; - size_t size; bool outstanding; }; @@ -540,11 +538,10 @@ spa_proxy_node_port_set_io(struct spa_node *node, if ((mem = pw_memblock_find(data)) == NULL) return -EINVAL; - pw_client_node_resource_port_add_mem(this->resource, - direction, port_id, - memid, - t->data.MemFd, - mem->fd, mem->flags); + pw_client_node_resource_add_mem(this->resource, + memid, + t->data.MemFd, + mem->fd, mem->flags); pw_client_node_resource_port_set_io(this->resource, this->seq, @@ -567,7 +564,6 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, struct impl *impl; struct port *port; uint32_t i, j; - size_t n_mem; struct pw_client_node_buffer *mb; struct pw_type *t; @@ -598,11 +594,10 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, if (this->resource == NULL) return 0; - n_mem = this->membase; for (i = 0; i < n_buffers; i++) { struct buffer *b = &port->buffers[i]; struct pw_memblock *m; - size_t data_size; + size_t data_size, size; void *baseptr; b->outbuf = buffers[i]; @@ -632,21 +627,20 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, } mb[i].buffer = &b->buffer; - mb[i].mem_id = n_mem++; + mb[i].mem_id = this->membase++; mb[i].offset = SPA_PTRDIFF(baseptr, m->ptr + m->offset); mb[i].size = data_size; - pw_client_node_resource_port_add_mem(this->resource, - direction, - port_id, - mb[i].mem_id, - t->data.MemFd, - m->fd, m->flags); + pw_client_node_resource_add_mem(this->resource, + mb[i].mem_id, + t->data.MemFd, + m->fd, m->flags); for (j = 0; j < buffers[i]->n_metas; j++) memcpy(&b->buffer.metas[j], &buffers[i]->metas[j], sizeof(struct spa_meta)); b->buffer.n_metas = j; + size = 0; for (j = 0; j < buffers[i]->n_datas; j++) { struct spa_data *d = &buffers[i]->datas[j]; @@ -654,18 +648,16 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, if (d->type == t->data.DmaBuf || d->type == t->data.MemFd) { - pw_client_node_resource_port_add_mem(this->resource, - direction, - port_id, - n_mem, - d->type, - d->fd, - d->flags); - b->buffer.datas[j].data = SPA_UINT32_TO_PTR(n_mem); - n_mem++; + pw_client_node_resource_add_mem(this->resource, + this->membase, + d->type, + d->fd, + d->flags); + b->buffer.datas[j].data = SPA_UINT32_TO_PTR(this->membase); + this->membase++; } else if (d->type == t->data.MemPtr) { - b->buffer.datas[j].data = SPA_INT_TO_PTR(b->size); - b->size += d->maxsize; + b->buffer.datas[j].data = SPA_INT_TO_PTR(size); + size += d->maxsize; } else { b->buffer.datas[j].type = SPA_ID_INVALID; b->buffer.datas[j].data = 0; diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c index fbfec10c4..854475700 100644 --- a/src/modules/module-client-node/protocol-native.c +++ b/src/modules/module-client-node/protocol-native.c @@ -149,6 +149,31 @@ static void client_node_marshal_destroy(void *object) pw_protocol_native_end_proxy(proxy, b); } +static bool client_node_demarshal_add_mem(void *object, void *data, size_t size) +{ + struct pw_proxy *proxy = object; + struct spa_pod_parser prs; + uint32_t mem_id, type, memfd_idx, flags; + int memfd; + + spa_pod_parser_init(&prs, data, size, 0); + if (spa_pod_parser_get(&prs, + "[" + "i", &mem_id, + "I", &type, + "i", &memfd_idx, + "i", &flags, NULL) < 0) + return false; + + memfd = pw_protocol_native_get_proxy_fd(proxy, memfd_idx); + + pw_proxy_notify(proxy, struct pw_client_node_proxy_events, add_mem, + mem_id, + type, + memfd, flags); + return true; +} + static bool client_node_demarshal_transport(void *object, void *data, size_t size) { struct pw_proxy *proxy = object; @@ -294,34 +319,6 @@ static bool client_node_demarshal_port_set_param(void *object, void *data, size_ return true; } -static bool client_node_demarshal_port_add_mem(void *object, void *data, size_t size) -{ - struct pw_proxy *proxy = object; - struct spa_pod_parser prs; - uint32_t direction, port_id, mem_id, type, memfd_idx, flags; - int memfd; - - spa_pod_parser_init(&prs, data, size, 0); - if (spa_pod_parser_get(&prs, - "[" - "i", &direction, - "i", &port_id, - "i", &mem_id, - "I", &type, - "i", &memfd_idx, - "i", &flags, NULL) < 0) - return false; - - memfd = pw_protocol_native_get_proxy_fd(proxy, memfd_idx); - - pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_add_mem, direction, - port_id, - mem_id, - type, - memfd, flags); - return true; -} - static bool client_node_demarshal_port_use_buffers(void *object, void *data, size_t size) { struct pw_proxy *proxy = object; @@ -432,6 +429,26 @@ static bool client_node_demarshal_port_set_io(void *object, void *data, size_t s return true; } +static void +client_node_marshal_add_mem(void *object, + uint32_t mem_id, + uint32_t type, + int memfd, uint32_t flags) +{ + struct pw_resource *resource = object; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_ADD_MEM); + + spa_pod_builder_struct(b, + "i", mem_id, + "I", type, + "i", pw_protocol_native_add_resource_fd(resource, memfd), + "i", flags); + + pw_protocol_native_end_resource(resource, b); +} + static void client_node_marshal_transport(void *object, uint32_t node_id, int readfd, int writefd, struct pw_client_node_transport *transport) { @@ -556,30 +573,6 @@ client_node_marshal_port_set_param(void *object, pw_protocol_native_end_resource(resource, b); } -static void -client_node_marshal_port_add_mem(void *object, - enum spa_direction direction, - uint32_t port_id, - uint32_t mem_id, - uint32_t type, - int memfd, uint32_t flags) -{ - struct pw_resource *resource = object; - struct spa_pod_builder *b; - - b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_ADD_MEM); - - spa_pod_builder_struct(b, - "i", direction, - "i", port_id, - "i", mem_id, - "I", type, - "i", pw_protocol_native_add_resource_fd(resource, memfd), - "i", flags); - - pw_protocol_native_end_resource(resource, b); -} - static void client_node_marshal_port_use_buffers(void *object, uint32_t seq, @@ -838,6 +831,7 @@ static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_ static const struct pw_client_node_proxy_events pw_protocol_native_client_node_event_marshal = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, + &client_node_marshal_add_mem, &client_node_marshal_transport, &client_node_marshal_set_param, &client_node_marshal_event_event, @@ -845,13 +839,13 @@ static const struct pw_client_node_proxy_events pw_protocol_native_client_node_e &client_node_marshal_add_port, &client_node_marshal_remove_port, &client_node_marshal_port_set_param, - &client_node_marshal_port_add_mem, &client_node_marshal_port_use_buffers, &client_node_marshal_port_command, &client_node_marshal_port_set_io, }; static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_event_demarshal[] = { + { &client_node_demarshal_add_mem, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_transport, 0 }, { &client_node_demarshal_set_param, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_event_event, PW_PROTOCOL_NATIVE_REMAP }, @@ -859,7 +853,6 @@ static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_ { &client_node_demarshal_add_port, 0 }, { &client_node_demarshal_remove_port, 0 }, { &client_node_demarshal_port_set_param, PW_PROTOCOL_NATIVE_REMAP }, - { &client_node_demarshal_port_add_mem, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_use_buffers, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_command, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_set_io, PW_PROTOCOL_NATIVE_REMAP }, diff --git a/src/pipewire/map.h b/src/pipewire/map.h index 44b76f850..2e77cdb4a 100644 --- a/src/pipewire/map.h +++ b/src/pipewire/map.h @@ -132,7 +132,7 @@ static inline bool pw_map_insert_at(struct pw_map *map, uint32_t id, void *data) return true; } -/** Remove and item at index +/** Remove an item at index * \param map the map to remove from * \param id the index to remove * \memberof pw_map diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 7c911973a..d812e7535 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -50,6 +50,7 @@ struct mem_id { uint32_t id; int fd; uint32_t flags; + uint32_t ref; }; struct buffer_id { @@ -58,6 +59,8 @@ struct buffer_id { struct spa_buffer *buf; void *ptr; struct pw_map_range map; + uint32_t n_mem; + struct mem_id **mem; }; struct port { @@ -66,7 +69,6 @@ struct port { struct pw_port *port; - struct pw_array mem_ids; struct pw_array buffer_ids; bool in_order; }; @@ -88,6 +90,8 @@ struct node_data { struct spa_graph_node in_node; struct port *in_ports; + struct pw_array mem_ids; + struct pw_node *node; struct spa_hook node_listener; @@ -411,7 +415,7 @@ void pw_remote_disconnect(struct pw_remote *remote) pw_protocol_client_disconnect (remote->conn); spa_list_for_each_safe(proxy, t2, &remote->proxy_list, link) - pw_proxy_destroy(proxy); + pw_proxy_destroy(proxy); remote->core_proxy = NULL; pw_map_clear(&remote->objects); @@ -517,10 +521,42 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) } } +static struct mem_id *find_mem(struct pw_array *mem_ids, uint32_t id) +{ + struct mem_id *mid; + + pw_array_for_each(mid, mem_ids) { + if (mid->id == id) + return mid; + } + return NULL; +} + +static void clear_memid(struct node_data *data, struct mem_id *mid) +{ + if (mid->fd != -1) { + bool has_ref = false; + int fd; + + fd = mid->fd; + mid->fd = -1; + + pw_array_for_each(mid, &data->mem_ids) { + if (mid->fd == fd) { + has_ref = true; + break; + } + } + if (!has_ref) + close(fd); + } +} + static void clean_transport(struct pw_proxy *proxy) { struct node_data *data = proxy->user_data; struct pw_port *port; + struct mem_id *mid; if (data->trans == NULL) return; @@ -536,6 +572,10 @@ static void clean_transport(struct pw_proxy *proxy) spa_graph_port_remove(&data->out_ports[port->port_id].input); } + pw_array_for_each(mid, &data->mem_ids) + clear_memid(data, mid); + pw_array_clear(&data->mem_ids); + free(data->in_ports); free(data->out_ports); pw_client_node_transport_destroy(data->trans); @@ -546,8 +586,6 @@ static void clean_transport(struct pw_proxy *proxy) static void port_init(struct port *port) { - pw_array_init(&port->mem_ids, 64); - pw_array_ensure_size(&port->mem_ids, sizeof(struct mem_id) * 64); pw_array_init(&port->buffer_ids, 32); pw_array_ensure_size(&port->buffer_ids, sizeof(struct buffer_id) * 64); port->in_order = true; @@ -567,6 +605,30 @@ static struct port *find_port(struct node_data *data, enum spa_direction directi } } +static void client_node_add_mem(void *object, + uint32_t mem_id, + uint32_t type, int memfd, uint32_t flags) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct mem_id *m; + + m = find_mem(&data->mem_ids, mem_id); + if (m) { + pw_log_debug("update mem %u, fd %d, flags %d", + mem_id, memfd, flags); + clear_memid(data, m); + } else { + m = pw_array_add(&data->mem_ids, sizeof(struct mem_id)); + pw_log_debug("add mem %u, fd %d, flags %d", + mem_id, memfd, flags); + } + m->id = mem_id; + m->fd = memfd; + m->flags = flags; + m->ref = 0; +} + static void client_node_transport(void *object, uint32_t node_id, int readfd, int writefd, struct pw_client_node_transport *transport) @@ -836,34 +898,10 @@ client_node_port_set_param(void *object, pw_client_node_proxy_done(data->node_proxy, seq, res); } -static struct mem_id *find_mem(struct port *port, uint32_t id) -{ - struct mem_id *mid; - - pw_array_for_each(mid, &port->mem_ids) { - if (mid->id == id) - return mid; - } - return NULL; -} - -static void clear_memid(struct mem_id *mid) -{ - close(mid->fd); -} - -static void clear_mems(struct port *port) -{ - struct mem_id *mid; - - pw_array_for_each(mid, &port->mem_ids) - clear_memid(mid); - port->mem_ids.size = 0; -} - -static void clear_buffers(struct port *port) +static void clear_buffers(struct node_data *data, struct port *port) { struct buffer_id *bid; + int i; pw_log_debug("port %p: clear buffers", port); @@ -872,6 +910,14 @@ static void clear_buffers(struct port *port) if (munmap(bid->ptr, bid->map.size) < 0) pw_log_warn("failed to unmap: %m"); } + if (bid->mem != NULL) { + for (i = 0; i < bid->n_mem; i++) { + if (--bid->mem[i]->ref == 0) + clear_memid(data, bid->mem[i]); + } + bid->mem = NULL; + bid->n_mem = 0; + } bid->ptr = NULL; free(bid->buf); bid->buf = NULL; @@ -879,40 +925,6 @@ static void clear_buffers(struct port *port) port->buffer_ids.size = 0; } -static void clear_port(struct port *port) -{ - clear_buffers(port); - clear_mems(port); - pw_array_clear(&port->mem_ids); - pw_array_clear(&port->buffer_ids); -} - -static void -client_node_port_add_mem(void *object, - enum spa_direction direction, uint32_t port_id, - uint32_t mem_id, - uint32_t type, int memfd, uint32_t flags) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct mem_id *m; - struct port *port = find_port(data, direction, port_id); - - m = find_mem(port, mem_id); - if (m) { - pw_log_debug("update mem %u, fd %d, flags %d", - mem_id, memfd, flags); - clear_memid(m); - } else { - m = pw_array_add(&port->mem_ids, sizeof(struct mem_id)); - pw_log_debug("add mem %u, fd %d, flags %d", - mem_id, memfd, flags); - } - m->id = mem_id; - m->fd = memfd; - m->flags = flags; -} - static void client_node_port_use_buffers(void *object, uint32_t seq, @@ -940,14 +952,14 @@ client_node_port_use_buffers(void *object, prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); /* clear previous buffers */ - clear_buffers(port); + clear_buffers(data, port); bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); for (i = 0; i < n_buffers; i++) { off_t offset; - struct mem_id *mid = find_mem(port, buffers[i].mem_id); + struct mem_id *mid = find_mem(&data->mem_ids, buffers[i].mem_id); if (mid == NULL) { pw_log_warn("unknown memory id %u", buffers[i].mem_id); continue; @@ -975,18 +987,26 @@ client_node_port_use_buffers(void *object, size_t size; size = sizeof(struct spa_buffer); + size += sizeof(struct mem_id *); for (j = 0; j < buffers[i].buffer->n_metas; j++) size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) + for (j = 0; j < buffers[i].buffer->n_datas; j++) { size += sizeof(struct spa_data); + size += sizeof(struct mem_id *); + } b = bid->buf = malloc(size); memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = - SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, struct spa_data); + bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, + struct mem_id*); + bid->n_mem = 0; + + mid->ref++; + bid->mem[bid->n_mem++] = mid; } bid->id = b->id; @@ -1012,10 +1032,13 @@ client_node_port_use_buffers(void *object, struct spa_chunk); if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { - struct mem_id *bmid = find_mem(port, SPA_PTR_TO_UINT32(d->data)); + struct mem_id *bmid = find_mem(&data->mem_ids, + SPA_PTR_TO_UINT32(d->data)); d->data = NULL; d->fd = bmid->fd; + bmid->ref++; + bid->mem[bid->n_mem++] = bmid; pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); } else if (d->type == t->data.MemPtr) { d->data = SPA_MEMBER(bid->ptr, @@ -1031,9 +1054,6 @@ client_node_port_use_buffers(void *object, res = pw_port_use_buffers(port->port, bufs, n_buffers); - if (n_buffers == 0) - clear_mems(port); - done: pw_client_node_proxy_done(data->node_proxy, seq, res); @@ -1078,7 +1098,7 @@ client_node_port_set_io(void *object, if (port == NULL) return; - mid = find_mem(port, memid); + mid = find_mem(&data->mem_ids, memid); if (mid == NULL) { pw_log_warn("unknown memory id %u", memid); return; @@ -1097,12 +1117,12 @@ client_node_port_set_io(void *object, id, SPA_MEMBER(ptr, r.start, void), size); - } static const struct pw_client_node_proxy_events client_node_events = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, + .add_mem = client_node_add_mem, .transport = client_node_transport, .set_param = client_node_set_param, .event = client_node_event, @@ -1110,7 +1130,6 @@ static const struct pw_client_node_proxy_events client_node_events = { .add_port = client_node_add_port, .remove_port = client_node_remove_port, .port_set_param = client_node_port_set_param, - .port_add_mem = client_node_port_add_mem, .port_use_buffers = client_node_port_use_buffers, .port_command = client_node_port_command, .port_set_io = client_node_port_set_io, @@ -1166,6 +1185,12 @@ static const struct pw_node_events node_events = { .have_output = node_have_output, }; +static void clear_port(struct node_data *data, struct port *port) +{ + clear_buffers(data, port); + pw_array_clear(&port->buffer_ids); +} + static void node_proxy_destroy(void *_data) { struct node_data *data = _data; @@ -1174,9 +1199,9 @@ static void node_proxy_destroy(void *_data) if (data->trans) { for (i = 0; i < data->trans->area->max_input_ports; i++) - clear_port(&data->in_ports[i]); + clear_port(data, &data->in_ports[i]); for (i = 0; i < data->trans->area->max_output_ports; i++) - clear_port(&data->out_ports[i]); + clear_port(data, &data->out_ports[i]); } clean_transport(proxy); @@ -1243,6 +1268,9 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, data->in_node_impl = node_impl; data->out_node_impl = node_impl; + pw_array_init(&data->mem_ids, 64); + pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64); + spa_graph_node_init(&data->in_node); spa_graph_node_set_implementation(&data->in_node, &data->in_node_impl); spa_graph_node_init(&data->out_node); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 78c661966..0ac755d51 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -46,6 +46,7 @@ struct mem_id { uint32_t id; int fd; uint32_t flags; + uint32_t ref; }; struct buffer_id { @@ -55,6 +56,8 @@ struct buffer_id { struct spa_buffer *buf; void *ptr; struct pw_map_range map; + uint32_t n_mem; + struct mem_id **mem; }; struct stream { @@ -131,7 +134,7 @@ static void clear_mems(struct pw_stream *stream) struct mem_id *mid; pw_array_for_each(mid, &impl->mem_ids) - clear_memid(impl, mid); + clear_memid(impl, mid); impl->mem_ids.size = 0; } @@ -793,10 +796,9 @@ client_node_port_set_param(void *data, } static void -client_node_port_add_mem(void *data, - enum spa_direction direction, uint32_t port_id, - uint32_t mem_id, - uint32_t type, int memfd, uint32_t flags) +client_node_add_mem(void *data, + uint32_t mem_id, + uint32_t type, int memfd, uint32_t flags) { struct stream *impl = data; struct pw_stream *stream = &impl->this; @@ -871,18 +873,26 @@ client_node_port_use_buffers(void *data, size_t size; size = sizeof(struct spa_buffer); + size += sizeof(struct mem_id *); for (j = 0; j < buffers[i].buffer->n_metas; j++) size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) + for (j = 0; j < buffers[i].buffer->n_datas; j++) { size += sizeof(struct spa_data); + size += sizeof(struct mem_id *); + } b = bid->buf = malloc(size); memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = - SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, struct spa_data); + bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, + struct mem_id*); + bid->n_mem = 0; + + mid->ref++; + bid->mem[bid->n_mem++] = mid; } bid->id = b->id; @@ -913,6 +923,8 @@ client_node_port_use_buffers(void *data, struct mem_id *bmid = find_mem(stream, SPA_PTR_TO_UINT32(d->data)); d->data = NULL; d->fd = bmid->fd; + bmid->ref++; + bid->mem[bid->n_mem++] = bmid; pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); } else if (d->type == t->data.MemPtr) { d->data = SPA_MEMBER(bid->ptr, @@ -967,6 +979,7 @@ static void client_node_transport(void *data, uint32_t node_id, static const struct pw_client_node_proxy_events client_node_events = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, + .add_mem = client_node_add_mem, .transport = client_node_transport, .set_param = client_node_set_param, .event = client_node_event, @@ -974,7 +987,6 @@ static const struct pw_client_node_proxy_events client_node_events = { .add_port = client_node_add_port, .remove_port = client_node_remove_port, .port_set_param = client_node_port_set_param, - .port_add_mem = client_node_port_add_mem, .port_use_buffers = client_node_port_use_buffers, .port_command = client_node_port_command, };