From eef6f380c1f93a9a3fdc3290cc7007a1ee7a68a0 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 24 Oct 2017 13:00:32 +0200 Subject: [PATCH] remote: handle per port buffers and memory --- src/pipewire/remote.c | 149 ++++++++++++++++++++++++++---------------- 1 file changed, 92 insertions(+), 57 deletions(-) diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 11a80c931..7d1688575 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -64,6 +64,12 @@ struct buffer_id { struct port { struct spa_graph_port output; struct spa_graph_port input; + + struct pw_port *port; + + struct pw_array mem_ids; + struct pw_array buffer_ids; + bool in_order; }; struct node_data { @@ -89,11 +95,6 @@ struct node_data { struct pw_client_node_proxy *node_proxy; struct spa_hook node_proxy_listener; struct spa_hook proxy_listener; - - struct pw_array mem_ids; - struct pw_array buffer_ids; - bool in_order; - }; /** \endcond */ @@ -537,10 +538,28 @@ static void clean_transport(struct pw_proxy *proxy) data->trans = NULL; } -struct port_info { - struct spa_graph_port internal; - struct spa_graph_port external; -}; +static void port_init(struct port *port) +{ + pw_array_init(&port->mem_ids, 64); + pw_array_ensure_size(&port->mem_ids, sizeof(struct mem_id) * 64); + pw_array_init(&port->buffer_ids, 32); + pw_array_ensure_size(&port->buffer_ids, sizeof(struct buffer_id) * 64); + port->in_order = true; +} + +static struct port *find_port(struct node_data *data, enum spa_direction direction, uint32_t port_id) +{ + if (direction == SPA_DIRECTION_INPUT) { + if (port_id > data->trans->area->max_input_ports) + return NULL; + return &data->in_ports[port_id]; + } + else { + if (port_id > data->trans->area->max_output_ports) + return NULL; + return &data->out_ports[port_id]; + } +} static void client_node_transport(void *object, uint32_t node_id, int readfd, int writefd, @@ -565,6 +584,7 @@ static void client_node_transport(void *object, uint32_t node_id, sizeof(struct port)); for (i = 0; i < data->trans->area->max_input_ports; i++) { + port_init(&data->in_ports[i]); data->trans->inputs[i] = SPA_PORT_IO_INIT; spa_graph_port_init(&data->in_ports[i].input, SPA_DIRECTION_INPUT, @@ -580,10 +600,14 @@ static void client_node_transport(void *object, uint32_t node_id, spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input); pw_log_info("transport in %d %p", i, &data->trans->inputs[i]); } - spa_list_for_each(port, &data->node->input_ports, link) + spa_list_for_each(port, &data->node->input_ports, link) { spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input); + data->in_ports[port->port_id].port = port; + } for (i = 0; i < data->trans->area->max_output_ports; i++) { + port_init(&data->out_ports[i]); + data->trans->outputs[i] = SPA_PORT_IO_INIT; spa_graph_port_init(&data->out_ports[i].output, SPA_DIRECTION_OUTPUT, i, @@ -598,8 +622,10 @@ static void client_node_transport(void *object, uint32_t node_id, spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input); pw_log_info("transport out %d %p", i, &data->trans->inputs[i]); } - spa_list_for_each(port, &data->node->output_ports, link) + spa_list_for_each(port, &data->node->output_ports, link) { spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output); + data->out_ports[port->port_id].port = port; + } data->rtwritefd = writefd; data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop, @@ -699,20 +725,20 @@ client_node_set_format(void *object, { struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; - struct pw_port *port; + struct port *port; int res; - port = pw_node_find_port(data->node, direction, port_id); - if (port == NULL) { + port = find_port(data, direction, port_id); + if (port == NULL || port->port == NULL) { res = SPA_RESULT_INVALID_PORT; goto done; } - res = pw_port_set_format(port, flags, format); + res = pw_port_set_format(port->port, flags, format); if (res != SPA_RESULT_OK) goto done; - add_port_update(proxy, port, + add_port_update(proxy, port->port, PW_CLIENT_NODE_PORT_UPDATE_FORMAT | PW_CLIENT_NODE_PORT_UPDATE_PARAMS | PW_CLIENT_NODE_PORT_UPDATE_INFO); @@ -731,12 +757,11 @@ client_node_set_param(void *object, pw_log_warn("set param not implemented"); } -static struct mem_id *find_mem(struct pw_proxy *proxy, uint32_t id) +static struct mem_id *find_mem(struct port *port, uint32_t id) { struct mem_id *mid; - struct node_data *data = proxy->user_data; - pw_array_for_each(mid, &data->mem_ids) { + pw_array_for_each(mid, &port->mem_ids) { if (mid->id == id) return mid; } @@ -751,28 +776,34 @@ static void clear_memid(struct mem_id *mid) close(mid->fd); } -static void clear_mems(struct pw_proxy *proxy) +static void clear_mems(struct port *port) { - struct node_data *data = proxy->user_data; struct mem_id *mid; - pw_array_for_each(mid, &data->mem_ids) + pw_array_for_each(mid, &port->mem_ids) clear_memid(mid); - data->mem_ids.size = 0; + port->mem_ids.size = 0; } -static void clear_buffers(struct pw_proxy *proxy) +static void clear_buffers(struct port *port) { - struct node_data *data = proxy->user_data; struct buffer_id *bid; - pw_log_debug("node %p: clear buffers", proxy); + pw_log_debug("port %p: clear buffers", port); - pw_array_for_each(bid, &data->buffer_ids) { + pw_array_for_each(bid, &port->buffer_ids) { free(bid->buf); bid->buf = NULL; } - data->buffer_ids.size = 0; + port->buffer_ids.size = 0; +} + +static void clear_port(struct port *port) +{ + clear_buffers(port); + clear_mems(port); + pw_array_clear(&port->mem_ids); + pw_array_clear(&port->buffer_ids); } static void @@ -785,14 +816,15 @@ client_node_add_mem(void *object, struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; struct mem_id *m; + struct port *port = find_port(data, direction, port_id); - m = find_mem(proxy, mem_id); + m = find_mem(port, mem_id); if (m) { pw_log_debug("update mem %u, fd %d, flags %d, off %d, size %d", mem_id, memfd, flags, offset, size); clear_memid(m); } else { - m = pw_array_add(&data->mem_ids, sizeof(struct mem_id)); + m = pw_array_add(&port->mem_ids, sizeof(struct mem_id)); pw_log_debug("add mem %u, fd %d, flags %d, off %d, size %d", mem_id, memfd, flags, offset, size); } @@ -815,24 +847,26 @@ client_node_use_buffers(void *object, struct buffer_id *bid; uint32_t i, j, len; struct spa_buffer *b, **bufs; - struct pw_port *port; - int res; + struct port *port; + int res, prot; - port = pw_node_find_port(data->node, direction, port_id); + port = find_port(data, direction, port_id); if (port == NULL) { res = SPA_RESULT_INVALID_PORT; goto done; } + prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); + /* clear previous buffers */ - clear_buffers(proxy); + clear_buffers(port); bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); for (i = 0; i < n_buffers; i++) { off_t offset; - struct mem_id *mid = find_mem(proxy, buffers[i].mem_id); + struct mem_id *mid = find_mem(port, buffers[i].mem_id); if (mid == NULL) { pw_log_warn("unknown memory id %u", buffers[i].mem_id); continue; @@ -840,7 +874,7 @@ client_node_use_buffers(void *object, if (mid->ptr == NULL) { mid->ptr = - mmap(NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, + mmap(NULL, mid->size + mid->offset, prot, MAP_SHARED, mid->fd, 0); if (mid->ptr == MAP_FAILED) { mid->ptr = NULL; @@ -849,8 +883,8 @@ client_node_use_buffers(void *object, continue; } } - len = pw_array_get_len(&data->buffer_ids, struct buffer_id); - bid = pw_array_add(&data->buffer_ids, sizeof(struct buffer_id)); + len = pw_array_get_len(&port->buffer_ids, struct buffer_id); + bid = pw_array_add(&port->buffer_ids, sizeof(struct buffer_id)); b = buffers[i].buffer; @@ -896,15 +930,19 @@ client_node_use_buffers(void *object, struct spa_chunk); if (d->type == proxy->remote->core->type.data.Id) { - struct mem_id *bmid = find_mem(proxy, SPA_PTR_TO_UINT32(d->data)); + struct mem_id *bmid = find_mem(port, SPA_PTR_TO_UINT32(d->data)); void *map; d->type = proxy->remote->core->type.data.MemFd; d->fd = bmid->fd; - map = mmap(NULL, d->maxsize + d->mapoffset, PROT_READ|PROT_WRITE, - MAP_SHARED, d->fd, 0); + map = mmap(NULL, d->maxsize + d->mapoffset, prot, MAP_SHARED, d->fd, 0); + if (map == MAP_FAILED) { + pw_log_error("data %d failed to mmap memory %m", j); + res = SPA_RESULT_ERROR; + goto done; + } d->data = SPA_MEMBER(map, d->mapoffset, uint8_t); - pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); + pw_log_debug(" data %d %u -> fd %d mem %p", j, bmid->id, bmid->fd, map); } else if (d->type == proxy->remote->core->type.data.MemPtr) { d->data = SPA_MEMBER(bid->buf_ptr, SPA_PTR_TO_INT(d->data), void); d->fd = -1; @@ -916,10 +954,10 @@ client_node_use_buffers(void *object, bufs[i] = b; } - res = pw_port_use_buffers(port, bufs, n_buffers); + res = pw_port_use_buffers(port->port, bufs, n_buffers); if (n_buffers == 0) - clear_mems(proxy); + clear_mems(port); done: pw_client_node_proxy_done(data->node_proxy, seq, res); @@ -1080,18 +1118,20 @@ static const struct pw_node_events node_events = { .have_output = node_have_output, }; -static void node_proxy_destroy(void *data) +static void node_proxy_destroy(void *_data) { - struct node_data *d = data; - struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy; + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*) data->node_proxy; + int i; + + for (i = 0; i < data->trans->area->max_input_ports; i++) + clear_port(&data->in_ports[i]); + for (i = 0; i < data->trans->area->max_output_ports; i++) + clear_port(&data->out_ports[i]); clean_transport(proxy); - clear_buffers(proxy); - clear_mems(proxy); - pw_array_clear(&d->mem_ids); - pw_array_clear(&d->buffer_ids); - spa_hook_remove(&d->node_listener); + spa_hook_remove(&data->node_listener); } static const struct pw_proxy_events proxy_events = { @@ -1159,11 +1199,6 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, spa_graph_node_init(&data->out_node); spa_graph_node_set_implementation(&data->out_node, &data->out_node_impl); - pw_array_init(&data->mem_ids, 64); - pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64); - pw_array_init(&data->buffer_ids, 32); - pw_array_ensure_size(&data->buffer_ids, sizeof(struct buffer_id) * 64); - pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data); pw_node_add_listener(node, &data->node_listener, &node_events, data);