From 4f680c224b756cca0727079b6ea8fb8058c642fe Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 1 Mar 2018 17:39:17 +0100 Subject: [PATCH] make per mix port io and buffers Move the io areas to a separate memory block. Make per link io areas for the ports Send per mix port io area and buffers --- src/daemon/pipewire.conf.in | 7 +- src/extensions/client-node.h | 15 +- src/modules/module-client-node/client-node.c | 190 +++++++++++------- .../module-client-node/protocol-native.c | 13 +- src/modules/module-client-node/transport.c | 19 -- src/modules/module-protocol-native.c | 2 +- src/pipewire/link.c | 57 ++++-- src/pipewire/mem.c | 4 +- src/pipewire/private.h | 11 + src/pipewire/remote.c | 45 ++--- src/pipewire/stream.c | 40 ++-- 11 files changed, 242 insertions(+), 161 deletions(-) diff --git a/src/daemon/pipewire.conf.in b/src/daemon/pipewire.conf.in index fb6cfdb4b..d4ca67d20 100644 --- a/src/daemon/pipewire.conf.in +++ b/src/daemon/pipewire.conf.in @@ -2,7 +2,7 @@ load-module libpipewire-module-rtkit load-module libpipewire-module-protocol-native load-module libpipewire-module-suspend-on-idle -#load-module libpipewire-module-spa-monitor alsa/libspa-alsa alsa-monitor alsa +load-module libpipewire-module-spa-monitor alsa/libspa-alsa alsa-monitor alsa load-module libpipewire-module-spa-monitor v4l2/libspa-v4l2 v4l2-monitor v4l2 #load-module libpipewire-module-spa-monitor bluez5/libspa-bluez5 bluez5-monitor bluez5 #load-module libpipewire-module-spa-node videotestsrc/libspa-videotestsrc videotestsrc videotestsrc Spa:POD:Object:Props:patternType=Spa:POD:Object:Props:patternType:snow @@ -10,6 +10,5 @@ load-module libpipewire-module-autolink #load-module libpipewire-module-mixer load-module libpipewire-module-client-node load-module libpipewire-module-flatpak -#load-module libpipewire-module-audio-dsp -#load-module libpipewire-module-link-factory -#load-module libpipewire-module-jack +load-module libpipewire-module-audio-dsp +load-module libpipewire-module-link-factory diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 7d7e06274..26ae63654 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -56,8 +56,6 @@ struct pw_client_node_area { */ struct pw_client_node_transport { struct pw_client_node_area *area; /**< the transport area */ - struct spa_io_buffers *inputs; /**< array of buffer input io */ - struct spa_io_buffers *outputs; /**< array of buffer output io */ void *input_data; /**< input memory for ringbuffer */ struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */ void *output_data; /**< output memory for ringbuffer */ @@ -308,6 +306,15 @@ struct pw_client_node_proxy_events { /** * Memory was added to a node * + * Memory is given to a node as an fd in \a memfd of a certain + * memory \a type. + * + * Further references to this fd will be made with the per memory + * unique identifier \a mem_id. + * + * Buffers or controls will reference the memory by \a mem_id and + * mapping the specified area will give access to the memory. + * * \param mem_id the id of the memory * \param type the memory type * \param memfd the fd of the memory @@ -406,6 +413,7 @@ struct pw_client_node_proxy_events { * \param seq a sequence number * \param direction a port direction * \param port_id the port id + * \param mix_id the mixer port id * \param n_buffer the number of buffers * \param buffers and array of buffer descriptions */ @@ -413,6 +421,7 @@ struct pw_client_node_proxy_events { uint32_t seq, enum spa_direction direction, uint32_t port_id, + uint32_t mix_id, uint32_t n_buffers, struct pw_client_node_buffer *buffers); /** @@ -433,6 +442,7 @@ struct pw_client_node_proxy_events { * \param seq a sequence number * \param direction the direction of the port * \param port_id the port id + * \param mix_id the mixer port id * \param id the id of the io area to set * \param mem_id the id of the memory to use * \param offset offset of io area in memory @@ -442,6 +452,7 @@ struct pw_client_node_proxy_events { uint32_t seq, enum spa_direction direction, uint32_t port_id, + uint32_t mix_id, uint32_t id, uint32_t mem_id, uint32_t offset, diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 8502a735f..0c75510d1 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -42,10 +42,11 @@ /** \cond */ -#define MAX_INPUTS 64 -#define MAX_OUTPUTS 64 +#define MAX_INPUTS 64 +#define MAX_OUTPUTS 64 -#define MAX_BUFFERS 64 +#define MAX_BUFFERS 64 +#define MAX_AREAS 1024 #define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS) #define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS) @@ -136,6 +137,9 @@ struct impl { struct pw_client_node_transport *transport; + struct pw_memblock *io_areas; + uint32_t io_memid; + struct spa_hook node_listener; struct spa_hook resource_listener; @@ -593,52 +597,7 @@ impl_node_port_set_io(struct spa_node *node, uint32_t id, void *data, size_t size) { - struct node *this; - struct impl *impl; - struct pw_type *t; - struct pw_memblock *mem; - struct mem *m; - uint32_t memid, mem_offset, mem_size; - - if (node == NULL) - return -EINVAL; - - this = SPA_CONTAINER_OF(node, struct node, node); - impl = this->impl; - t = impl->t; - - if (this->resource == NULL) - return 0; - - if (!CHECK_PORT(this, direction, port_id)) - return -EINVAL; - - if (data) { - if ((mem = pw_memblock_find(data)) == NULL) - return -EINVAL; - - mem_offset = SPA_PTRDIFF(data, mem->ptr); - mem_size = mem->size; - if (mem_size - mem_offset < size) - return -EINVAL; - - mem_offset += mem->offset; - m = ensure_mem(impl, mem->fd, t->data.MemFd, mem->flags); - memid = m->id; - } - else { - memid = SPA_ID_INVALID; - mem_offset = mem_size = 0; - } - - pw_client_node_resource_port_set_io(this->resource, - this->seq, - direction, port_id, - id, - memid, - mem_offset, mem_size); - - return SPA_RESULT_RETURN_ASYNC(this->seq++); + return -ENOTSUP; } static int @@ -750,7 +709,7 @@ impl_node_port_use_buffers(struct spa_node *node, pw_client_node_resource_port_use_buffers(this->resource, this->seq, - direction, port_id, + direction, port_id, 0, n_buffers, mb); return SPA_RESULT_RETURN_ASYNC(this->seq++); @@ -855,7 +814,6 @@ static int impl_node_process_input(struct spa_node *node) struct spa_io_buffers *io = p->io; pw_log_trace("set io status to %d %d", io->status, io->buffer_id); - impl->transport->inputs[p->port_id] = *io; /* explicitly recycle buffers when the client is not going to do it */ if (!client_reuse && (pp = p->peer)) @@ -876,29 +834,10 @@ static int impl_node_process_output(struct spa_node *node) { struct node *this; struct impl *impl; - struct spa_graph_node *n; - struct spa_graph_port *p; this = SPA_CONTAINER_OF(node, struct node, node); impl = this->impl; - n = &impl->this.node->rt.node; - if (impl->out_pending) - goto done; - - impl->out_pending = true; - - spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { - struct spa_io_buffers *io = p->io; - - impl->transport->outputs[p->port_id] = *io; - - pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, - impl->transport->outputs[p->port_id].status, - impl->transport->outputs[p->port_id].buffer_id); - } - - done: pw_client_node_transport_add_message(impl->transport, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT)); do_flush(this); @@ -916,18 +855,15 @@ static int handle_node_message(struct node *this, struct pw_client_node_message switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { case PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT: - spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { - *p->io = impl->transport->outputs[p->port_id]; - pw_log_trace("have output %d %d", p->io->status, p->io->buffer_id); - } impl->out_pending = false; this->callbacks->have_output(this->callbacks_data); break; case PW_CLIENT_NODE_MESSAGE_NEED_INPUT: - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - *p->io = impl->transport->inputs[p->port_id]; - pw_log_trace("need input %d %d", p->io->status, p->io->buffer_id); + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { + struct spa_graph_node *ni = p->peer->node; + spa_node_process_output(ni->implementation); + pw_log_trace("need input %p %d %d", p->io, p->io->status, p->io->buffer_id); } impl->input_ready++; this->callbacks->need_input(this->callbacks_data); @@ -1205,6 +1141,8 @@ static void node_initialized(void *data) struct impl *impl = data; struct pw_client_node *this = &impl->this; struct pw_node *node = this->node; + struct pw_type *t = impl->t; + struct mem *m; if (this->resource == NULL) return; @@ -1219,6 +1157,16 @@ static void node_initialized(void *data) spa_loop_add_source(impl->node.data_loop, &impl->node.data_source); pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); + if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | + PW_MEMBLOCK_FLAG_MAP_READWRITE | + PW_MEMBLOCK_FLAG_SEAL, + sizeof(struct spa_io_buffers) * MAX_AREAS, + &impl->io_areas) < 0) + return; + + m = ensure_mem(impl, impl->io_areas->fd, t->data.MemFd, impl->io_areas->flags); + impl->io_memid = m->id; + pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), impl->other_fds[0], @@ -1247,10 +1195,98 @@ static void node_free(void *data) free(impl); } +static void *port_get_io(void *data, uint32_t id, size_t size) +{ + struct impl *impl = data; + + return impl->io_areas->ptr; +} + +static const struct pw_port_implementation port_impl = { + PW_VERSION_PORT_IMPLEMENTATION, + .get_io = port_get_io, +}; + +static int mix_port_set_io(struct spa_node *node, + enum spa_direction direction, uint32_t port_id, + uint32_t id, void *data, size_t size) +{ + struct pw_port *p = SPA_CONTAINER_OF(node, struct pw_port, mix_node); + struct impl *impl = p->owner_data; + struct node *this = &impl->node; + struct pw_type *t = impl->t; + struct pw_memblock *mem; + struct mem *m; + uint32_t memid, mem_offset, mem_size; + + pw_log_debug("client-node %p: mix port %d set io %p, %zd", impl, port_id, data, size); + + p->rt.port.io = data; + p->rt.mix_port.io = data; + + if (this->resource == NULL) + return 0; + + if (!CHECK_PORT(this, direction, port_id)) + return -EINVAL; + + if (data) { + if ((mem = pw_memblock_find(data)) == NULL) + return -EINVAL; + + mem_offset = mem->offset; + mem_size = mem->size; + if (mem_size - mem_offset < size) + return -EINVAL; + + m = ensure_mem(impl, mem->fd, t->data.MemFd, mem->flags); + memid = m->id; + } + else { + memid = SPA_ID_INVALID; + mem_offset = mem_size = 0; + } + + pw_client_node_resource_port_set_io(this->resource, + this->seq, + direction, port_id, 0, + id, + memid, + mem_offset, mem_size); + + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int mix_port_process_input(struct spa_node *data) +{ + return SPA_STATUS_HAVE_BUFFER; +} + +static int mix_port_process_output(struct spa_node *data) +{ + return SPA_STATUS_NEED_BUFFER; +} + +static void node_port_added(void *data, struct pw_port *port) +{ + struct impl *impl = data; + + pw_log_debug("client-node %p: port added", &impl->this); + port->mix_node.port_set_io = mix_port_set_io; + port->mix_node.process_input = mix_port_process_input; + port->mix_node.process_output = mix_port_process_output; + + port->implementation = &port_impl; + port->implementation_data = impl; + + port->owner_data = impl; +} + static const struct pw_node_events node_events = { PW_VERSION_NODE_EVENTS, .free = node_free, .initialized = node_initialized, + .port_added = node_port_added, }; static const struct pw_resource_events resource_events = { diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c index a645e8009..e69d733cb 100644 --- a/src/modules/module-client-node/protocol-native.c +++ b/src/modules/module-client-node/protocol-native.c @@ -333,7 +333,7 @@ static int client_node_demarshal_port_use_buffers(void *object, void *data, size { struct pw_proxy *proxy = object; struct spa_pod_parser prs; - uint32_t seq, direction, port_id, n_buffers, data_id; + uint32_t seq, direction, port_id, mix_id, n_buffers, data_id; struct pw_client_node_buffer *buffers; int i, j; @@ -343,6 +343,7 @@ static int client_node_demarshal_port_use_buffers(void *object, void *data, size "i", &seq, "i", &direction, "i", &port_id, + "i", &mix_id, "i", &n_buffers, NULL) < 0) return -EINVAL; @@ -388,6 +389,7 @@ static int client_node_demarshal_port_use_buffers(void *object, void *data, size pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_use_buffers, seq, direction, port_id, + mix_id, n_buffers, buffers); return 0; } @@ -417,7 +419,7 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si { struct pw_proxy *proxy = object; struct spa_pod_parser prs; - uint32_t seq, direction, port_id, id, memid, off, sz; + uint32_t seq, direction, port_id, mix_id, id, memid, off, sz; spa_pod_parser_init(&prs, data, size, 0); if (spa_pod_parser_get(&prs, @@ -425,6 +427,7 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si "i", &seq, "i", &direction, "i", &port_id, + "i", &mix_id, "I", &id, "i", &memid, "i", &off, @@ -433,7 +436,7 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_set_io, seq, - direction, port_id, + direction, port_id, mix_id, id, memid, off, sz); return 0; @@ -588,6 +591,7 @@ client_node_marshal_port_use_buffers(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id, + uint32_t mix_id, uint32_t n_buffers, struct pw_client_node_buffer *buffers) { struct pw_resource *resource = object; @@ -601,6 +605,7 @@ client_node_marshal_port_use_buffers(void *object, "i", seq, "i", direction, "i", port_id, + "i", mix_id, "i", n_buffers, NULL); for (i = 0; i < n_buffers; i++) { @@ -659,6 +664,7 @@ client_node_marshal_port_set_io(void *object, uint32_t seq, uint32_t direction, uint32_t port_id, + uint32_t mix_id, uint32_t id, uint32_t memid, uint32_t offset, @@ -673,6 +679,7 @@ client_node_marshal_port_set_io(void *object, "i", seq, "i", direction, "i", port_id, + "i", mix_id, "I", id, "i", memid, "i", offset, diff --git a/src/modules/module-client-node/transport.c b/src/modules/module-client-node/transport.c index a8d214ba5..cbcdf8d4c 100644 --- a/src/modules/module-client-node/transport.c +++ b/src/modules/module-client-node/transport.c @@ -48,8 +48,6 @@ static size_t area_get_size(struct pw_client_node_area *area) { size_t size; size = sizeof(struct pw_client_node_area); - size += area->max_input_ports * sizeof(struct spa_io_buffers); - size += area->max_output_ports * sizeof(struct spa_io_buffers); size += sizeof(struct spa_ringbuffer); size += INPUT_BUFFER_SIZE; size += sizeof(struct spa_ringbuffer); @@ -64,12 +62,6 @@ static void transport_setup_area(void *p, struct pw_client_node_transport *trans trans->area = a = p; p = SPA_MEMBER(p, sizeof(struct pw_client_node_area), struct spa_io_buffers); - trans->inputs = p; - p = SPA_MEMBER(p, a->max_input_ports * sizeof(struct spa_io_buffers), void); - - trans->outputs = p; - p = SPA_MEMBER(p, a->max_output_ports * sizeof(struct spa_io_buffers), void); - trans->input_buffer = p; p = SPA_MEMBER(p, sizeof(struct spa_ringbuffer), void); @@ -85,17 +77,6 @@ static void transport_setup_area(void *p, struct pw_client_node_transport *trans static void transport_reset_area(struct pw_client_node_transport *trans) { - int i; - struct pw_client_node_area *a = trans->area; - - for (i = 0; i < a->max_input_ports; i++) { - trans->inputs[i].status = SPA_STATUS_OK; - trans->inputs[i].buffer_id = SPA_ID_INVALID; - } - for (i = 0; i < a->max_output_ports; i++) { - trans->outputs[i].status = SPA_STATUS_OK; - trans->outputs[i].buffer_id = SPA_ID_INVALID; - } spa_ringbuffer_init(trans->input_buffer); spa_ringbuffer_init(trans->output_buffer); } diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c index 912b842cd..9c111866d 100644 --- a/src/modules/module-protocol-native.c +++ b/src/modules/module-protocol-native.c @@ -903,7 +903,7 @@ static int module_init(struct pw_module *module, struct pw_properties *propertie pw_protocol_native_init(this); - pw_log_debug("protocol-native %p: new", this); + pw_log_debug("protocol-native %p: new %d", this, debug_messages); d = pw_protocol_get_user_data(this); d->protocol = this; diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 5862ef8a3..b513f2574 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -480,6 +480,47 @@ param_filter(struct pw_link *this, return num; } +static void port_set_io(struct pw_link *this, struct pw_port *port, void *data, size_t size, + struct spa_graph_port *p) +{ + struct pw_type *t = &this->core->type; + int res; + + p->io = data; + pw_log_debug("link %p: port %p %d.%d set io: %p", this, port, port->port_id, p->port_id, data); + if (port->mix_node.port_set_io) { + if ((res = spa_node_port_set_io(&port->mix_node, + p->direction, + p->port_id, + t->io.Buffers, + data, size)) < 0) + pw_log_warn("port %p: can't set io: %s", port, spa_strerror(res)); + } +} + +static int select_io(struct pw_link *this) +{ + struct spa_io_buffers *io; + struct pw_type *t = &this->core->type; + + if (this->output->implementation && this->output->implementation->get_io) + io = this->output->implementation->get_io(this->output->implementation_data, + t->io.Buffers, sizeof(struct spa_io_buffers)); + else if (this->input->implementation && this->input->implementation->get_io) + io = this->input->implementation->get_io(this->input->implementation_data, + t->io.Buffers, sizeof(struct spa_io_buffers)); + else + io = &this->io; + + if (io == NULL) + return -EIO; + + port_set_io(this, this->input, io, sizeof(struct spa_io_buffers), &this->rt.in_port); + port_set_io(this, this->output, io, sizeof(struct spa_io_buffers), &this->rt.out_port); + + return 0; +} + static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_state) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); @@ -500,17 +541,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s output = this->output; pw_log_debug("link %p: doing alloc buffers %p %p", this, output->node, input->node); - /* find out what's possible */ - if ((res = spa_node_port_get_info(output->node->node, output->direction, output->port_id, - &oinfo)) < 0) { - asprintf(&error, "error get output port info: %d", res); - goto error; - } - if ((res = spa_node_port_get_info(input->node->node, input->direction, input->port_id, - &iinfo)) < 0) { - asprintf(&error, "error get input port info: %d", res); - goto error; - } + oinfo = output->spa_info; + iinfo = input->spa_info; in_flags = iinfo->flags; out_flags = oinfo->flags; @@ -707,6 +739,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s goto error; } + select_io(this); + return 0; error: @@ -1287,7 +1321,6 @@ void pw_link_destroy(struct pw_link *link) spa_list_remove(&link->link); input_remove(link, link->input); - output_remove(link, link->output); if (link->global) { diff --git a/src/pipewire/mem.c b/src/pipewire/mem.c index a5767e07a..80e6886b2 100644 --- a/src/pipewire/mem.c +++ b/src/pipewire/mem.c @@ -138,7 +138,9 @@ int pw_memblock_map(struct pw_memblock *mem) } else { mem->ptr = NULL; } - pw_log_debug("mem %p: map", mem); + + pw_log_debug("mem %p: map to %p", mem, mem->ptr); + return 0; } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index f0e8ad22c..e58c597f3 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -300,6 +300,13 @@ struct pw_node { void *user_data; /**< extra user data */ }; +struct pw_port_implementation { +#define PW_VERSION_PORT_IMPLEMENTATION 0 + uint32_t version; + + void *(*get_io) (void *data, uint32_t id, size_t size); +}; + struct pw_port { struct spa_list link; /**< link in node port_list */ @@ -330,6 +337,9 @@ struct pw_port { struct spa_hook_list listener_list; + const struct pw_port_implementation *implementation; + void *implementation_data; + struct spa_node *mix; /**< optional port buffer mix/split */ struct spa_node mix_node; /**< mix node implementation */ struct pw_map mix_port_map; /**< map from port_id from mixer */ @@ -341,6 +351,7 @@ struct pw_port { struct spa_graph_node mix_node; /**< mixer node */ } rt; /**< data only accessed from the data thread */ + void *owner_data; /**< extra owner data */ void *user_data; /**< extra user data */ }; diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index dbaeaaf5b..25510c88b 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -718,20 +718,16 @@ static void client_node_transport(void *object, uint32_t node_id, for (i = 0; i < data->trans->area->max_input_ports; i++) { port_init(&data->in_ports[i]); - data->trans->inputs[i] = SPA_IO_BUFFERS_INIT; spa_graph_port_init(&data->in_ports[i].input, - SPA_DIRECTION_INPUT, - i, + SPA_DIRECTION_INPUT, i, 0, - &data->trans->inputs[i]); + NULL); spa_graph_port_init(&data->in_ports[i].output, - SPA_DIRECTION_OUTPUT, - i, + SPA_DIRECTION_OUTPUT, i, 0, - &data->trans->inputs[i]); + NULL); spa_graph_port_add(&data->in_node, &data->in_ports[i].output); spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input); - pw_log_info("transport in %d %p", i, &data->trans->inputs[i]); } spa_list_for_each(port, &data->node->input_ports, link) { spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input); @@ -740,20 +736,16 @@ static void client_node_transport(void *object, uint32_t node_id, for (i = 0; i < data->trans->area->max_output_ports; i++) { port_init(&data->out_ports[i]); - data->trans->outputs[i] = SPA_IO_BUFFERS_INIT; spa_graph_port_init(&data->out_ports[i].output, - SPA_DIRECTION_OUTPUT, - i, + SPA_DIRECTION_OUTPUT, i, 0, - &data->trans->outputs[i]); + NULL); spa_graph_port_init(&data->out_ports[i].input, - SPA_DIRECTION_INPUT, - i, + SPA_DIRECTION_INPUT, i, 0, - &data->trans->outputs[i]); + NULL); spa_graph_port_add(&data->out_node, &data->out_ports[i].input); spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input); - pw_log_info("transport out %d %p", i, &data->trans->inputs[i]); } spa_list_for_each(port, &data->node->output_ports, link) { spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output); @@ -893,7 +885,7 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com /* FIXME we should call process_output on the node and see what its * status is */ for (i = 0; i < data->trans->area->max_input_ports; i++) - data->trans->inputs[i].status = SPA_STATUS_NEED_BUFFER; + data->in_ports[i].input.io->status = SPA_STATUS_NEED_BUFFER; node_need_input(data); pw_client_node_proxy_done(data->node_proxy, seq, res); @@ -992,7 +984,7 @@ static void clear_buffers(struct node_data *data, struct port *port) static void client_node_port_use_buffers(void *object, uint32_t seq, - enum spa_direction direction, uint32_t port_id, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id, uint32_t n_buffers, struct pw_client_node_buffer *buffers) { struct pw_proxy *proxy = object; @@ -1156,6 +1148,7 @@ client_node_port_set_io(void *object, uint32_t seq, uint32_t direction, uint32_t port_id, + uint32_t mix_id, uint32_t id, uint32_t memid, uint32_t offset, @@ -1164,6 +1157,7 @@ client_node_port_set_io(void *object, struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; struct pw_core *core = proxy->remote->core; + struct pw_type *t = &core->type; struct port *port; struct mem_id *mid; void *ptr; @@ -1190,11 +1184,16 @@ client_node_port_set_io(void *object, pw_log_debug("port %p: set io %s %p", port, spa_type_map_get_type(core->type.map, id), ptr); - spa_node_port_set_io(port->port->node->node, - direction, port_id, - id, - ptr, - size); + if (id == t->io.Buffers) { + port->input.io = ptr; + port->output.io = ptr; + } else { + spa_node_port_set_io(port->port->node->node, + direction, port_id, + id, + ptr, + size); + } } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index dfbef3df9..b05ffbebc 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -591,22 +591,22 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod int i; for (i = 0; i < impl->trans->area->n_input_ports; i++) { - struct spa_io_buffers *input = &impl->trans->inputs[i]; + struct spa_io_buffers *io = impl->io; struct buffer_id *bid; uint32_t buffer_id; - buffer_id = input->buffer_id; + buffer_id = io->buffer_id; - pw_log_trace("stream %p: process input %d %d", stream, input->status, + pw_log_trace("stream %p: process input %d %d", stream, io->status, buffer_id); if ((bid = find_buffer(stream, buffer_id)) == NULL) continue; if (impl->client_reuse) - input->buffer_id = SPA_ID_INVALID; + io->buffer_id = SPA_ID_INVALID; - if (input->status == SPA_STATUS_HAVE_BUFFER) { + if (io->status == SPA_STATUS_HAVE_BUFFER) { bid->used = true; impl->in_new_buffer = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, @@ -614,7 +614,7 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod impl->in_new_buffer = false; } - input->status = SPA_STATUS_NEED_BUFFER; + io->status = SPA_STATUS_NEED_BUFFER; } send_need_input(stream); break; @@ -624,13 +624,13 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod int i; for (i = 0; i < impl->trans->area->n_output_ports; i++) { - struct spa_io_buffers *output = &impl->trans->outputs[i]; + struct spa_io_buffers *io = impl->io; - if (output->buffer_id == SPA_ID_INVALID) + if (io->buffer_id == SPA_ID_INVALID) continue; - reuse_buffer(stream, output->buffer_id); - output->buffer_id = SPA_ID_INVALID; + reuse_buffer(stream, io->buffer_id); + io->buffer_id = SPA_ID_INVALID; } pw_log_trace("stream %p: process output", stream); impl->in_need_buffer = true; @@ -747,7 +747,7 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma if (impl->direction == SPA_DIRECTION_INPUT) { for (i = 0; i < impl->trans->area->max_input_ports; i++) - impl->trans->inputs[i].status = SPA_STATUS_NEED_BUFFER; + impl->io->status = SPA_STATUS_NEED_BUFFER; send_need_input(stream); } else { @@ -861,7 +861,7 @@ client_node_add_mem(void *data, static void client_node_port_use_buffers(void *data, uint32_t seq, - enum spa_direction direction, uint32_t port_id, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id, uint32_t n_buffers, struct pw_client_node_buffer *buffers) { struct stream *impl = data; @@ -1020,6 +1020,7 @@ static void client_node_port_set_io(void *data, uint32_t seq, enum spa_direction direction, uint32_t port_id, + uint32_t mix_id, uint32_t id, uint32_t mem_id, uint32_t offset, @@ -1052,7 +1053,8 @@ static void client_node_port_set_io(void *data, if (id == t->io.Buffers) { impl->io = ptr; - pw_log_debug("stream %p: set io id %u %p", stream, id, ptr); + pw_log_debug("stream %p: %u.%u set io id %u %p", stream, + port_id, mix_id, id, ptr); } res = 0; @@ -1236,8 +1238,8 @@ int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) int i; for (i = 0; i < impl->trans->area->n_input_ports; i++) { - struct spa_io_buffers *input = &impl->trans->inputs[i]; - input->buffer_id = id; + struct spa_io_buffers *io = impl->io; + io->buffer_id = id; } } else { send_reuse_buffer(stream, id); @@ -1261,17 +1263,17 @@ int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer_id *bid; - if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) { + if (impl->io->buffer_id != SPA_ID_INVALID) { pw_log_debug("can't send %u, pending buffer %u", id, - impl->trans->outputs[0].buffer_id); + impl->io->buffer_id); return -EIO; } if ((bid = find_buffer(stream, id)) && !bid->used) { bid->used = true; spa_list_remove(&bid->link); - impl->trans->outputs[0].buffer_id = id; - impl->trans->outputs[0].status = SPA_STATUS_HAVE_BUFFER; + impl->io->buffer_id = id; + impl->io->status = SPA_STATUS_HAVE_BUFFER; pw_log_trace("stream %p: send buffer %d", stream, id); if (!impl->in_need_buffer) send_have_output(stream);