diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index f4a547ba6..126ea174a 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -137,7 +137,7 @@ struct spa_node_callbacks { /** * struct spa_node: * - * A struct spa_node is a component that can comsume and produce buffers. + * A struct spa_node is a component that can consume and produce buffers. * * * @@ -585,7 +585,7 @@ struct spa_node { * output from * * - set the status to SPA_RESULT_OK for the port you don't want more - * buffer from. + * buffers from. * * Returns: #SPA_RESULT_OK on success or when the node is asynchronous * #SPA_RESULT_NEED_BUFFER for synchronous nodes when input diff --git a/spa/include/spa/pod-builder.h b/spa/include/spa/pod-builder.h index 433b3a67f..b5c9999f5 100644 --- a/spa/include/spa/pod-builder.h +++ b/spa/include/spa/pod-builder.h @@ -101,7 +101,7 @@ spa_pod_builder_raw(struct spa_pod_builder *builder, const void *data, uint32_t return ref; } -static void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size) +static inline void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size) { uint64_t zeroes = 0; size = SPA_ROUND_UP_N(size, 8) - size; diff --git a/src/examples/export-spa.c b/src/examples/export-spa.c index 4a0d99630..9ae770b75 100644 --- a/src/examples/export-spa.c +++ b/src/examples/export-spa.c @@ -148,8 +148,6 @@ int main(int argc, char *argv[]) pw_main_loop_run(data.loop); - if (data.node) - pw_node_destroy(data.node); pw_core_destroy(data.core); pw_main_loop_destroy(data.loop); diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index ca51e700e..89ae972ab 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -824,8 +824,8 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message *io = impl->transport->outputs[i]; pw_log_trace("%d %d", io->status, io->buffer_id); - impl->out_pending = false; } + impl->out_pending = false; this->callbacks->have_output(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { for (i = 0; i < MAX_INPUTS; i++) { @@ -851,12 +851,26 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message return SPA_RESULT_OK; } +static void setup_transport(struct impl *impl) +{ + uint32_t max_inputs = 0, max_outputs = 0, n_inputs = 0, n_outputs = 0; + + spa_proxy_node_get_n_ports(&impl->proxy.node, &n_inputs, &max_inputs, &n_outputs, &max_outputs); + + impl->transport = pw_client_node_transport_new(max_inputs, max_outputs); + impl->transport->area->n_input_ports = n_inputs; + impl->transport->area->n_output_ports = n_outputs; +} + static void client_node_done(void *data, int seq, int res) { struct impl *impl = data; struct proxy *this = &impl->proxy; + if (seq == 0 && res == SPA_RESULT_OK) + setup_transport(impl); + this->callbacks->done(this->callbacks_data, seq, res); } @@ -1028,59 +1042,6 @@ proxy_init(struct proxy *this, return SPA_RESULT_RETURN_ASYNC(this->seq++); } -static int client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd) -{ - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - - if (impl->fds[0] == -1) { -#if 0 - if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) != - 0) - return SPA_RESULT_ERRNO; - - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[0]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[1]; -#else - impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[1]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[0]; -#endif - - spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); - pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); - } - *readfd = impl->other_fds[0]; - *writefd = impl->other_fds[1]; - - return SPA_RESULT_OK; -} - -static void node_initialized(void *data) -{ - struct impl *impl = data; - struct pw_client_node *this = &impl->this; - struct pw_node *node = this->node; - int readfd, writefd; - const struct pw_node_info *i = pw_node_get_info(node); - - if (this->resource == NULL) - return; - - impl->transport = pw_client_node_transport_new(i->max_input_ports, i->max_output_ports); - impl->transport->area->n_input_ports = i->n_input_ports; - impl->transport->area->n_output_ports = i->n_output_ports; - - client_node_get_fds(this, &readfd, &writefd); - - pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), - readfd, writefd, impl->transport); -} - static int proxy_clear(struct proxy *this) { uint32_t i; @@ -1113,6 +1074,30 @@ static void client_node_resource_destroy(void *data) pw_node_destroy(this->node); } + +static void node_initialized(void *data) +{ + struct impl *impl = data; + struct pw_client_node *this = &impl->this; + struct pw_node *node = this->node; + + if (this->resource == NULL) + return; + + impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->proxy.data_source.fd = impl->fds[0]; + impl->proxy.writefd = impl->fds[1]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; + + spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); + pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); + + pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), + impl->other_fds[0], impl->other_fds[1], impl->transport); +} + static void node_free(void *data) { struct impl *impl = data; diff --git a/src/pipewire/port.c b/src/pipewire/port.c index a3b3409bf..97771d614 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -287,7 +287,7 @@ bool pw_port_add(struct pw_port *port, struct pw_node *node) node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS; } - spa_node_port_set_io(node->node, port->direction, port_id, &port->io); + spa_node_port_set_io(node->node, port->direction, port_id, port->rt.port.io); port->rt.graph = node->rt.graph; pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port);