diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index 76a699d5d..1f5ee1649 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -97,9 +97,15 @@ struct impl { struct pw_port *client_port; struct pw_port_mix client_port_mix; + struct spa_io_buffers *io; + struct spa_buffer **buffers; uint32_t n_buffers; struct pw_memblock *mem; + + struct { + struct spa_graph_link link; + } rt; }; /** \endcond */ @@ -542,16 +548,25 @@ impl_node_port_set_io(struct spa_node *node, { struct node *this; struct impl *impl; + struct pw_type *t; + int res; spa_return_val_if_fail(node != NULL, -EINVAL); this = SPA_CONTAINER_OF(node, struct node, node); impl = this->impl; + t = impl->t; if (direction != impl->direction) return -EINVAL; - return spa_node_port_set_io(impl->adapter, direction, port_id, id, data, size); + res = spa_node_port_set_io(impl->adapter, direction, port_id, id, data, size); + + if (id == t->io.Buffers && size >= sizeof(struct spa_io_buffers)) { + impl->io = data; + } + + return res; } static int @@ -659,17 +674,20 @@ static int impl_node_process(struct spa_node *node) if (impl->use_converter) status = spa_node_process(impl->adapter); - else + else { status = SPA_STATUS_HAVE_BUFFER | SPA_STATUS_NEED_BUFFER; + spa_log_trace(this->log, "%p: process %d/%d %d/%d", this, + impl->io->status, impl->io->buffer_id, + impl->client_port_mix.io->status, + impl->client_port_mix.io->buffer_id); + if (impl->direction == SPA_DIRECTION_INPUT) + *impl->client_port_mix.io = *impl->io; + else + *impl->io = *impl->client_port_mix.io; + } spa_log_trace(this->log, "%p: process %d", this, status); -// if (SPA_FLAG_CHECK(status, SPA_STATUS_HAVE_BUFFER)) -// spa_graph_node_trigger(&impl->this.node->rt.node); - - if (SPA_FLAG_CHECK(status, SPA_STATUS_NEED_BUFFER)) - spa_graph_run(impl->client_node->node->rt.node.graph); - return status; } @@ -736,6 +754,16 @@ static void client_node_initialized(void *data) impl->cnode = pw_node_get_implementation(impl->client_node->node); + spa_graph_node_remove(&impl->client_node->node->rt.root); + spa_graph_node_add(impl->this.node->rt.node.graph, &impl->client_node->node->rt.root); + + spa_graph_link_add(&impl->client_node->node->rt.root, + impl->this.node->rt.node.state, + &impl->rt.link); + impl->rt.link.signal = spa_graph_link_signal_node; + impl->rt.link.signal_data = &impl->this.node->rt.node; + impl->client_node->node->rt.driver = impl->this.node->rt.driver; + if ((res = spa_node_get_n_ports(impl->cnode, &n_input_ports, &max_input_ports, @@ -748,6 +776,20 @@ static void client_node_initialized(void *data) else impl->direction = SPA_DIRECTION_OUTPUT; + impl->client_port = pw_node_find_port(impl->client_node->node, impl->direction, 0); + if (impl->client_port == NULL) + return; + + if ((res = pw_port_init_mix(impl->client_port, &impl->client_port_mix)) < 0) + return; + + if ((res = spa_node_port_set_io(&impl->client_port->mix_node, + impl->direction, 0, + t->io.Buffers, + impl->client_port_mix.io, + sizeof(impl->client_port_mix.io))) < 0) + return; + state = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); if ((res = spa_node_port_enum_params(impl->cnode, @@ -770,27 +812,6 @@ static void client_node_initialized(void *data) "splitter", SPA_TYPE__Node, NULL, 0)) == NULL) return; - impl->client_port = pw_node_find_port(impl->client_node->node, impl->direction, 0); - if (impl->client_port == NULL) - return; - - if ((res = pw_port_init_mix(impl->client_port, &impl->client_port_mix)) < 0) - return; - - if ((res = spa_node_port_set_io(impl->adapter, - SPA_DIRECTION_INPUT, 0, - t->io.Buffers, - impl->client_port_mix.io, - sizeof(impl->client_port_mix.io))) < 0) - return; - - if ((res = spa_node_port_set_io(&impl->client_port->mix_node, - SPA_DIRECTION_OUTPUT, 0, - t->io.Buffers, - impl->client_port_mix.io, - sizeof(impl->client_port_mix.io))) < 0) - return; - impl->use_converter = true; } else { @@ -798,6 +819,14 @@ static void client_node_initialized(void *data) impl->use_converter = false; } + if (impl->use_converter) { + if ((res = spa_node_port_set_io(impl->adapter, + SPA_DIRECTION_REVERSE(impl->direction), 0, + t->io.Buffers, + impl->client_port_mix.io, + sizeof(impl->client_port_mix.io))) < 0) + return; + } pw_node_register(impl->this.node, NULL, NULL, NULL);