From b25985a2b275aff304a6ffaddd8f745c5f3ac57f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 25 Oct 2017 16:47:43 +0200 Subject: [PATCH] client-node: split setup of transport and fds We can initialize the transport as soon as we get async notify from the client when setup completes. Node initialization happens after that and then finaly the node is initialized and we can send the transport and fds. Small cleanups --- spa/include/spa/node.h | 4 +- spa/include/spa/pod-builder.h | 2 +- src/examples/export-spa.c | 2 - src/modules/module-client-node/client-node.c | 93 ++++++++------------ src/pipewire/port.c | 2 +- 5 files changed, 43 insertions(+), 60 deletions(-) diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index f4a547ba6..126ea174a 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -137,7 +137,7 @@ struct spa_node_callbacks { /** * struct spa_node: * - * A struct spa_node is a component that can comsume and produce buffers. + * A struct spa_node is a component that can consume and produce buffers. * * * @@ -585,7 +585,7 @@ struct spa_node { * output from * * - set the status to SPA_RESULT_OK for the port you don't want more - * buffer from. + * buffers from. * * Returns: #SPA_RESULT_OK on success or when the node is asynchronous * #SPA_RESULT_NEED_BUFFER for synchronous nodes when input diff --git a/spa/include/spa/pod-builder.h b/spa/include/spa/pod-builder.h index 433b3a67f..b5c9999f5 100644 --- a/spa/include/spa/pod-builder.h +++ b/spa/include/spa/pod-builder.h @@ -101,7 +101,7 @@ spa_pod_builder_raw(struct spa_pod_builder *builder, const void *data, uint32_t return ref; } -static void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size) +static inline void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size) { uint64_t zeroes = 0; size = SPA_ROUND_UP_N(size, 8) - size; diff --git a/src/examples/export-spa.c b/src/examples/export-spa.c index 4a0d99630..9ae770b75 100644 --- a/src/examples/export-spa.c +++ b/src/examples/export-spa.c @@ -148,8 +148,6 @@ int main(int argc, char *argv[]) pw_main_loop_run(data.loop); - if (data.node) - pw_node_destroy(data.node); pw_core_destroy(data.core); pw_main_loop_destroy(data.loop); diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index ca51e700e..89ae972ab 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -824,8 +824,8 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message *io = impl->transport->outputs[i]; pw_log_trace("%d %d", io->status, io->buffer_id); - impl->out_pending = false; } + impl->out_pending = false; this->callbacks->have_output(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { for (i = 0; i < MAX_INPUTS; i++) { @@ -851,12 +851,26 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message return SPA_RESULT_OK; } +static void setup_transport(struct impl *impl) +{ + uint32_t max_inputs = 0, max_outputs = 0, n_inputs = 0, n_outputs = 0; + + spa_proxy_node_get_n_ports(&impl->proxy.node, &n_inputs, &max_inputs, &n_outputs, &max_outputs); + + impl->transport = pw_client_node_transport_new(max_inputs, max_outputs); + impl->transport->area->n_input_ports = n_inputs; + impl->transport->area->n_output_ports = n_outputs; +} + static void client_node_done(void *data, int seq, int res) { struct impl *impl = data; struct proxy *this = &impl->proxy; + if (seq == 0 && res == SPA_RESULT_OK) + setup_transport(impl); + this->callbacks->done(this->callbacks_data, seq, res); } @@ -1028,59 +1042,6 @@ proxy_init(struct proxy *this, return SPA_RESULT_RETURN_ASYNC(this->seq++); } -static int client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd) -{ - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - - if (impl->fds[0] == -1) { -#if 0 - if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) != - 0) - return SPA_RESULT_ERRNO; - - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[0]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[1]; -#else - impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[1]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[0]; -#endif - - spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); - pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); - } - *readfd = impl->other_fds[0]; - *writefd = impl->other_fds[1]; - - return SPA_RESULT_OK; -} - -static void node_initialized(void *data) -{ - struct impl *impl = data; - struct pw_client_node *this = &impl->this; - struct pw_node *node = this->node; - int readfd, writefd; - const struct pw_node_info *i = pw_node_get_info(node); - - if (this->resource == NULL) - return; - - impl->transport = pw_client_node_transport_new(i->max_input_ports, i->max_output_ports); - impl->transport->area->n_input_ports = i->n_input_ports; - impl->transport->area->n_output_ports = i->n_output_ports; - - client_node_get_fds(this, &readfd, &writefd); - - pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), - readfd, writefd, impl->transport); -} - static int proxy_clear(struct proxy *this) { uint32_t i; @@ -1113,6 +1074,30 @@ static void client_node_resource_destroy(void *data) pw_node_destroy(this->node); } + +static void node_initialized(void *data) +{ + struct impl *impl = data; + struct pw_client_node *this = &impl->this; + struct pw_node *node = this->node; + + if (this->resource == NULL) + return; + + impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->proxy.data_source.fd = impl->fds[0]; + impl->proxy.writefd = impl->fds[1]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; + + spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); + pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); + + pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), + impl->other_fds[0], impl->other_fds[1], impl->transport); +} + static void node_free(void *data) { struct impl *impl = data; diff --git a/src/pipewire/port.c b/src/pipewire/port.c index a3b3409bf..97771d614 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -287,7 +287,7 @@ bool pw_port_add(struct pw_port *port, struct pw_node *node) node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS; } - spa_node_port_set_io(node->node, port->direction, port_id, &port->io); + spa_node_port_set_io(node->node, port->direction, port_id, port->rt.port.io); port->rt.graph = node->rt.graph; pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port);