diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 255cbbca6..2c4d4ad21 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -47,6 +47,7 @@ #define MAX_BUFFERS 64 #define MAX_AREAS 1024 +#define MAX_IO 16 #define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS) #define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS) @@ -81,6 +82,11 @@ struct buffer { uint32_t memid; }; +struct io { + uint32_t id; + uint32_t memid; +}; + struct port { bool valid; struct spa_port_info info; @@ -89,10 +95,11 @@ struct port { bool have_format; uint32_t n_params; struct spa_pod **params; - struct spa_io_buffers *io; uint32_t n_buffers; struct buffer buffers[MAX_BUFFERS]; + + struct io ios[MAX_IO]; }; struct node { @@ -139,7 +146,6 @@ struct impl { struct pw_map io_map; struct pw_memblock *io_areas; - uint32_t io_memid; struct spa_hook node_listener; struct spa_hook resource_listener; @@ -185,6 +191,53 @@ static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t return m; } +static void clear_io(struct node *node, struct io *io) +{ + struct mem *m; + m = pw_array_get_unchecked(&node->impl->mems, io->memid, struct mem); + m->ref--; + io->id = SPA_ID_INVALID; +} + +static struct io *update_io(struct impl *impl, struct port *port, uint32_t id, uint32_t memid) +{ + int i; + struct io *io, *f = NULL; + + for (i = 0; i < MAX_IO; i++) { + io = &port->ios[i]; + if (io->id == SPA_ID_INVALID) + f = io; + else if (io->id == id) { + if (io->memid != memid) { + clear_io(&impl->node, io); + if (memid == SPA_ID_INVALID) + io->id = SPA_ID_INVALID; + } + goto found; + } + } + if (f == NULL) + return NULL; + + io = f; + io->id = id; + io->memid = memid; + + found: + return io; +} + +static void clear_ios(struct node *this, struct port *port) +{ + int i; + + for (i = 0; i < MAX_IO; i++) { + struct io *io = &port->ios[i]; + if (io->id != SPA_ID_INVALID) + clear_io(this, io); + } +} static int clear_buffers(struct node *this, struct port *port) { @@ -266,12 +319,12 @@ static int impl_node_set_param(struct spa_node *node, uint32_t id, uint32_t flag return SPA_RESULT_RETURN_ASYNC(this->seq++); } -static inline void do_flush(struct node *this) +static inline void send_process(struct node *this) { uint64_t cmd = 1; + pw_log_trace("client-node %p: send process", this); if (write(this->writefd, &cmd, 8) != 8) - spa_log_warn(this->log, "node %p: error flushing : %s", this, strerror(errno)); - + spa_log_warn(this->log, "node %p: error %s", this, strerror(errno)); } static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) @@ -384,12 +437,11 @@ do_update_port(struct node *this, { struct port *port; struct pw_type *t = this->impl->t; + int i; port = GET_PORT(this, direction, port_id); if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { - int i; - port->have_format = false; spa_log_info(this->log, "node %p: port %u update %d params", this, port_id, n_params); @@ -425,6 +477,8 @@ do_update_port(struct node *this, spa_log_info(this->log, "node %p: adding port %d", this, port_id); port->have_format = false; port->valid = true; + for (i = 0; i < MAX_IO; i++) + port->ios[i].id = SPA_ID_INVALID; if (direction == SPA_DIRECTION_INPUT) this->n_inputs++; @@ -443,6 +497,7 @@ clear_port(struct node *this, PW_CLIENT_NODE_PORT_UPDATE_PARAMS | PW_CLIENT_NODE_PORT_UPDATE_INFO, 0, NULL, NULL); clear_buffers(this, port); + clear_ios(this, port); } static void do_uninit_port(struct node *this, enum spa_direction direction, uint32_t port_id) @@ -597,6 +652,7 @@ static int do_port_set_io(struct impl *impl, struct pw_memblock *mem; struct mem *m; uint32_t memid, mem_offset, mem_size; + struct port *port; pw_log_debug("client-node %p: %s port %d.%d set io %p %zd", impl, direction == SPA_DIRECTION_INPUT ? "input" : "output", @@ -608,6 +664,8 @@ static int do_port_set_io(struct impl *impl, if (this->resource == NULL) return 0; + port = GET_PORT(this, direction, port_id); + if (data) { if ((mem = pw_memblock_find(data)) == NULL) return -EINVAL; @@ -625,6 +683,7 @@ static int do_port_set_io(struct impl *impl, memid = SPA_ID_INVALID; mem_offset = mem_size = 0; } + update_io(impl, port, id, memid); pw_client_node_resource_port_set_io(this->resource, this->seq, @@ -805,7 +864,7 @@ impl_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t bu pw_client_node_transport_add_message(impl->transport, (struct pw_client_node_message *) &PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id, buffer_id)); - do_flush(this); + send_process(this); return 0; } @@ -842,14 +901,14 @@ impl_node_port_send_command(struct spa_node *node, static int impl_node_process(struct spa_node *node) { struct node *this = SPA_CONTAINER_OF(node, struct node, node); - uint64_t cmd = 1; + struct impl *impl = this->impl; - pw_log_trace("client-node %p: send process", this); - - if (write(this->writefd, &cmd, 8) != 8) - spa_log_warn(this->log, "node %p: error flushing : %s", this, strerror(errno)); - - return SPA_STATUS_OK; + if (impl->this.node->driver) + return SPA_STATUS_HAVE_BUFFER; + else { + send_process(this); + return SPA_STATUS_OK; + } } static void @@ -1091,7 +1150,6 @@ static void node_initialized(void *data) struct pw_client_node *this = &impl->this; struct pw_node *node = this->node; struct pw_type *t = impl->t; - struct mem *m; if (this->resource == NULL) return; @@ -1113,8 +1171,7 @@ static void node_initialized(void *data) &impl->io_areas) < 0) return; - m = ensure_mem(impl, impl->io_areas->fd, t->data.MemFd, impl->io_areas->flags); - impl->io_memid = m->id; + ensure_mem(impl, impl->io_areas->fd, t->data.MemFd, impl->io_areas->flags); pw_log_debug("client-node %p: io areas %p", node, impl->io_areas->ptr); pw_client_node_resource_transport(this->resource, @@ -1218,11 +1275,18 @@ static void node_port_added(void *data, struct pw_port *port) port->owner_data = impl; } +static void node_finish(void *data) +{ + struct impl *impl = data; + send_process(&impl->node); +} + static const struct pw_node_events node_events = { PW_VERSION_NODE_EVENTS, .free = node_free, .initialized = node_initialized, .port_added = node_port_added, + .finish = node_finish, }; static const struct pw_resource_events resource_events = {