diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 828e73872..feefe537e 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -137,6 +137,7 @@ struct impl { struct pw_client_node_transport *transport; + struct pw_map io_map; struct pw_memblock *io_areas; uint32_t io_memid; @@ -600,7 +601,12 @@ static int do_port_set_io(struct impl *impl, struct mem *m; uint32_t memid, mem_offset, mem_size; - pw_log_debug("client-node %p: port %d.%d set io %p %zd", impl, port_id, mix_id, data, size); + pw_log_debug("client-node %p: %s port %d.%d set io %p %zd", impl, + direction == SPA_DIRECTION_INPUT ? "input" : "output", + port_id, mix_id, data, size); + + if (!CHECK_PORT(this, direction, port_id)) + return -EINVAL; if (this->resource == NULL) return 0; @@ -643,9 +649,6 @@ impl_node_port_set_io(struct spa_node *node, this = SPA_CONTAINER_OF(node, struct node, node); - if (!CHECK_PORT(this, direction, port_id)) - return -EINVAL; - return do_port_set_io(this->impl, direction, port_id, 0, id, data, size); } @@ -1215,6 +1218,7 @@ static void node_initialized(void *data) m = ensure_mem(impl, impl->io_areas->fd, t->data.MemFd, impl->io_areas->flags); impl->io_memid = m->id; + pw_log_debug("client-node %p: io areas %p", node, impl->io_areas->ptr); pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), @@ -1246,16 +1250,35 @@ static void node_free(void *data) free(impl); } -static void *port_get_io(void *data, uint32_t id, size_t size) +static int port_init_mix(void *data, struct pw_port_mix *mix) { struct impl *impl = data; + uint32_t ioid; - return impl->io_areas->ptr; + ioid = pw_map_insert_new(&impl->io_map, NULL); + + mix->port.io = SPA_MEMBER(impl->io_areas->ptr, ioid * sizeof(struct spa_io_buffers), void); + + pw_log_debug("client-node %p: init mix io %d %p", impl, ioid, mix->port.io); + + return 0; +} + +static int port_release_mix(void *data, struct pw_port_mix *mix) +{ + struct impl *impl = data; + uint32_t id; + + id = (mix->port.io - (struct spa_io_buffers*)impl->io_areas->ptr); + + pw_map_remove(&impl->io_map, id); + return 0; } static const struct pw_port_implementation port_impl = { PW_VERSION_PORT_IMPLEMENTATION, - .get_io = port_get_io, + .init_mix = port_init_mix, + .release_mix = port_release_mix, }; static int mix_port_set_io(struct spa_node *node, @@ -1264,16 +1287,12 @@ static int mix_port_set_io(struct spa_node *node, { struct pw_port *p = SPA_CONTAINER_OF(node, struct pw_port, mix_node); struct impl *impl = p->owner_data; - struct node *this = &impl->node; pw_log_debug("client-node %p: mix port %d set io %p, %zd", impl, port_id, data, size); p->rt.port.io = data; p->rt.mix_port.io = data; - if (!CHECK_PORT(this, direction, port_id)) - return -EINVAL; - return do_port_set_io(impl, direction, p->port_id, port_id, id, data, size); @@ -1355,6 +1374,7 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource, node_init(&impl->node, NULL, support, n_support); impl->node.impl = impl; + pw_map_init(&impl->io_map, 64, 64); pw_array_init(&impl->mems, 64); if ((name = pw_properties_get(properties, "node.name")) == NULL) diff --git a/src/pipewire/link.c b/src/pipewire/link.c index b513f2574..bf21518e9 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -40,6 +40,7 @@ struct impl { struct pw_link this; bool active; + bool have_io; struct pw_work_queue *work; @@ -480,14 +481,17 @@ param_filter(struct pw_link *this, return num; } -static void port_set_io(struct pw_link *this, struct pw_port *port, void *data, size_t size, +static int port_set_io(struct pw_link *this, struct pw_port *port, void *data, size_t size, struct spa_graph_port *p) { struct pw_type *t = &this->core->type; - int res; + int res = 0; p->io = data; - pw_log_debug("link %p: port %p %d.%d set io: %p", this, port, port->port_id, p->port_id, data); + pw_log_debug("link %p: %s port %p %d.%d set io: %p", this, + pw_direction_as_string(port->direction), + port, port->port_id, p->port_id, data); + if (port->mix_node.port_set_io) { if ((res = spa_node_port_set_io(&port->mix_node, p->direction, @@ -496,27 +500,33 @@ static void port_set_io(struct pw_link *this, struct pw_port *port, void *data, data, size)) < 0) pw_log_warn("port %p: can't set io: %s", port, spa_strerror(res)); } + return res; } static int select_io(struct pw_link *this) { + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct spa_io_buffers *io; - struct pw_type *t = &this->core->type; + int res; - if (this->output->implementation && this->output->implementation->get_io) - io = this->output->implementation->get_io(this->output->implementation_data, - t->io.Buffers, sizeof(struct spa_io_buffers)); - else if (this->input->implementation && this->input->implementation->get_io) - io = this->input->implementation->get_io(this->input->implementation_data, - t->io.Buffers, sizeof(struct spa_io_buffers)); - else - io = &this->io; + if (impl->have_io) + return 0; + io = this->rt.in_port.port.io; + if (io == NULL) + io = this->rt.out_port.port.io; if (io == NULL) return -EIO; - port_set_io(this, this->input, io, sizeof(struct spa_io_buffers), &this->rt.in_port); - port_set_io(this, this->output, io, sizeof(struct spa_io_buffers), &this->rt.out_port); + if ((res = port_set_io(this, this->input, io, + sizeof(struct spa_io_buffers), &this->rt.in_port.port)) < 0) + return res; + + if ((res = port_set_io(this, this->output, io, + sizeof(struct spa_io_buffers), &this->rt.out_port.port)) < 0) + return res; + + impl->have_io = true; return 0; } @@ -714,7 +724,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s if ((res = pw_port_use_buffers(output, allocation.buffers, allocation.n_buffers)) < 0) { - asprintf(&error, "error use output buffers: %d", res); + asprintf(&error, "link %p: error use output buffers: %s", this, + spa_strerror(res)); goto error; } if (SPA_RESULT_IS_ASYNC(res)) @@ -728,7 +739,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s if ((res = pw_port_use_buffers(input, allocation.buffers, allocation.n_buffers)) < 0) { - asprintf(&error, "error use input buffers: %d", res); + asprintf(&error, "link %p: error use input buffers: %s", this, + spa_strerror(res)); goto error; } if (SPA_RESULT_IS_ASYNC(res)) @@ -739,8 +751,10 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s goto error; } - select_io(this); - + if ((res = select_io(this)) < 0) { + asprintf(&error, "link %p: error can set io: %s", this, spa_strerror(res)); + goto error; + } return 0; error: @@ -755,8 +769,8 @@ do_activate_link(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; - SPA_FLAG_UNSET(this->rt.out_port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); - SPA_FLAG_UNSET(this->rt.in_port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + SPA_FLAG_UNSET(this->rt.out_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + SPA_FLAG_UNSET(this->rt.in_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); return 0; } @@ -892,7 +906,7 @@ do_remove_input(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; - spa_graph_port_remove(&this->rt.in_port); + spa_graph_port_remove(&this->rt.in_port.port); return 0; } @@ -907,7 +921,7 @@ static void input_remove(struct pw_link *this, struct pw_port *port) pw_loop_invoke(port->node->data_loop, do_remove_input, 1, NULL, 0, true, this); - pw_map_remove(&port->mix_port_map, this->rt.in_port.port_id); + pw_port_release_mix(port, &this->rt.in_port); spa_list_remove(&this->input_link); spa_hook_list_call(&this->input->listener_list, struct pw_port_events, link_removed, this); @@ -921,7 +935,7 @@ do_remove_output(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; - spa_graph_port_remove(&this->rt.out_port); + spa_graph_port_remove(&this->rt.out_port.port); return 0; } @@ -936,7 +950,7 @@ static void output_remove(struct pw_link *this, struct pw_port *port) pw_loop_invoke(port->node->data_loop, do_remove_output, 1, NULL, 0, true, this); - pw_map_remove(&port->mix_port_map, this->rt.out_port.port_id); + pw_port_release_mix(port, &this->rt.out_port); spa_list_remove(&this->output_link); spa_hook_list_call(&this->output->listener_list, struct pw_port_events, link_removed, this); @@ -990,8 +1004,8 @@ do_deactivate_link(struct spa_loop *loop, { struct pw_link *this = user_data; pw_log_trace("link %p: disable %p and %p", this, &this->rt.out_port, &this->rt.in_port); - SPA_FLAG_SET(this->rt.out_port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); - SPA_FLAG_SET(this->rt.in_port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + SPA_FLAG_SET(this->rt.out_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + SPA_FLAG_SET(this->rt.in_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); return 0; } @@ -1093,9 +1107,9 @@ do_add_link(struct spa_loop *loop, struct pw_port *port = ((struct pw_port **) data)[0]; if (port->direction == PW_DIRECTION_OUTPUT) { - spa_graph_port_add(&port->rt.mix_node, &this->rt.out_port); + spa_graph_port_add(&port->rt.mix_node, &this->rt.out_port.port); } else { - spa_graph_port_add(&port->rt.mix_node, &this->rt.in_port); + spa_graph_port_add(&port->rt.mix_node, &this->rt.in_port.port); } return 0; @@ -1197,27 +1211,14 @@ struct pw_link *pw_link_new(struct pw_core *core, this->io = SPA_IO_BUFFERS_INIT; - this->rt.out_port.port_id = pw_map_insert_new(&output->mix_port_map, NULL); - this->rt.in_port.port_id = pw_map_insert_new(&input->mix_port_map, NULL); + pw_port_init_mix(output, &this->rt.out_port); + pw_port_init_mix(input, &this->rt.in_port); pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl, - output_node, output->port_id, this->rt.out_port.port_id, - input_node, input->port_id, this->rt.in_port.port_id); + output_node, output->port_id, this->rt.out_port.port.port_id, + input_node, input->port_id, this->rt.in_port.port.port_id); - spa_graph_port_init(&this->rt.out_port, - PW_DIRECTION_OUTPUT, - this->rt.out_port.port_id, - SPA_GRAPH_PORT_FLAG_DISABLED, - &this->io); - spa_graph_port_init(&this->rt.in_port, - PW_DIRECTION_INPUT, - this->rt.in_port.port_id, - SPA_GRAPH_PORT_FLAG_DISABLED, - &this->io); - spa_graph_port_link(&this->rt.out_port, &this->rt.in_port); - - this->rt.in_port.scheduler_data = this; - this->rt.out_port.scheduler_data = this; + spa_graph_port_link(&this->rt.out_port.port, &this->rt.in_port.port); /* nodes can be in different data loops so we do this twice */ pw_loop_invoke(output_node->data_loop, do_add_link, diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 79badaa03..5c6880483 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -160,6 +160,45 @@ static const struct spa_node schedule_mix_node = { .port_reuse_buffer = schedule_mix_reuse_buffer, }; +int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix) +{ + uint32_t id; + int res = 0; + const struct pw_port_implementation *pi = port->implementation; + + id = pw_map_insert_new(&port->mix_port_map, NULL); + + spa_graph_port_init(&mix->port, + port->direction, id, + SPA_GRAPH_PORT_FLAG_DISABLED, + NULL); + + mix->port.scheduler_data = port; + + if (pi && pi->init_mix) + res = pi->init_mix(port->implementation_data, mix); + + pw_log_debug("port %p: init mix %d.%d io %p", port, + port->port_id, mix->port.port_id, mix->port.io); + + return res; +} +int pw_port_release_mix(struct pw_port *port, struct pw_port_mix *mix) +{ + int res = 0; + const struct pw_port_implementation *pi = port->implementation; + + pw_map_remove(&port->mix_port_map, mix->port.port_id); + + if (pi && pi->release_mix) + res = pi->release_mix(port->implementation_data, mix); + + pw_log_debug("port %p: release mix %d.%d", port, + port->port_id, mix->port.port_id); + + return res; +} + struct pw_port *pw_port_new(enum pw_direction direction, uint32_t port_id, struct pw_properties *properties, @@ -185,7 +224,7 @@ struct pw_port *pw_port_new(enum pw_direction direction, this->port_id = port_id; this->properties = properties; this->state = PW_PORT_STATE_INIT; - this->io = SPA_IO_BUFFERS_INIT; + this->rt.io = SPA_IO_BUFFERS_INIT; if (user_data_size > 0) this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void); @@ -204,7 +243,7 @@ struct pw_port *pw_port_new(enum pw_direction direction, this->direction, this->port_id, 0, - &this->io); + &this->rt.io); spa_graph_node_init(&this->rt.mix_node); this->mix_node = this->direction == PW_DIRECTION_INPUT ? @@ -217,7 +256,7 @@ struct pw_port *pw_port_new(enum pw_direction direction, pw_direction_reverse(this->direction), 0, 0, - &this->io); + &this->rt.io); this->rt.mix_port.scheduler_data = this; this->rt.port.scheduler_data = this; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index e58c597f3..4f00edced 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -212,38 +212,6 @@ static inline void free_allocation(struct allocation *alloc) alloc->n_buffers = 0; } -struct pw_link { - struct pw_core *core; /**< core object */ - struct spa_list link; /**< link in core link_list */ - struct pw_global *global; /**< global for this link */ - struct spa_hook global_listener; - bool registered; - - struct pw_link_info info; /**< introspectable link info */ - struct pw_properties *properties; /**< extra link properties */ - - enum pw_link_state state; /**< link state */ - char *error; /**< error message when state error */ - - struct spa_list resource_list; /**< list of bound resources */ - - struct spa_io_buffers io; /**< link io area */ - - struct pw_port *output; /**< output port */ - struct spa_list output_link; /**< link in output port links */ - struct pw_port *input; /**< input port */ - struct spa_list input_link; /**< link in input port links */ - - struct spa_hook_list listener_list; - - struct { - struct spa_graph_port out_port; - struct spa_graph_port in_port; - } rt; - - void *user_data; -}; - struct pw_module { struct pw_core *core; /**< the core object */ struct spa_list link; /**< link in the core module_list */ @@ -300,11 +268,18 @@ struct pw_node { void *user_data; /**< extra user data */ }; +struct pw_port_mix { + struct spa_graph_port port; + struct spa_buffer *buffers; + uint32_t n_buffers; +}; + struct pw_port_implementation { #define PW_VERSION_PORT_IMPLEMENTATION 0 uint32_t version; - void *(*get_io) (void *data, uint32_t id, size_t size); + int (*init_mix) (void *data, struct pw_port_mix *mix); + int (*release_mix) (void *data, struct pw_port_mix *mix); }; struct pw_port { @@ -326,8 +301,6 @@ struct pw_port { enum pw_port_state state; /**< state of the port */ - struct spa_io_buffers io; /**< io area of the port */ - bool allocated; /**< if buffers are allocated */ struct allocation allocation; @@ -346,6 +319,7 @@ struct pw_port { struct { struct spa_graph *graph; + struct spa_io_buffers io; /**< io area of the port */ struct spa_graph_port port; /**< this graph port, linked to mix_port */ struct spa_graph_port mix_port; /**< port from the mixer */ struct spa_graph_node mix_node; /**< mixer node */ @@ -355,6 +329,37 @@ struct pw_port { void *user_data; /**< extra user data */ }; +struct pw_link { + struct pw_core *core; /**< core object */ + struct spa_list link; /**< link in core link_list */ + struct pw_global *global; /**< global for this link */ + struct spa_hook global_listener; + bool registered; + + struct pw_link_info info; /**< introspectable link info */ + struct pw_properties *properties; /**< extra link properties */ + + enum pw_link_state state; /**< link state */ + char *error; /**< error message when state error */ + + struct spa_list resource_list; /**< list of bound resources */ + + struct spa_io_buffers io; /**< link io area if not provided by ports */ + + struct pw_port *output; /**< output port */ + struct spa_list output_link; /**< link in output port links */ + struct pw_port *input; /**< input port */ + struct spa_list input_link; /**< link in input port links */ + + struct spa_hook_list listener_list; + + struct { + struct pw_port_mix out_port; + struct pw_port_mix in_port; + } rt; + + void *user_data; +}; struct pw_resource { struct pw_core *core; /**< the core object */ @@ -520,6 +525,9 @@ void * pw_port_get_user_data(struct pw_port *port); /** Add a port to a node \memberof pw_port */ int pw_port_add(struct pw_port *port, struct pw_node *node); +int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix); +int pw_port_release_mix(struct pw_port *port, struct pw_port_mix *mix); + /** Unlink a port \memberof pw_port */ void pw_port_unlink(struct pw_port *port);