client-node: split setup of transport and fds

We can initialize the transport as soon as we get async notify
from the client when setup completes. Node initialization happens
after that and then finaly the node is initialized and we can send
the transport and fds.
Small cleanups
This commit is contained in:
Wim Taymans 2017-10-25 16:47:43 +02:00
parent 4f33a37ac6
commit b25985a2b2
5 changed files with 43 additions and 60 deletions

View file

@ -137,7 +137,7 @@ struct spa_node_callbacks {
/** /**
* struct spa_node: * struct spa_node:
* *
* A struct spa_node is a component that can comsume and produce buffers. * A struct spa_node is a component that can consume and produce buffers.
* *
* *
* *
@ -585,7 +585,7 @@ struct spa_node {
* output from * output from
* *
* - set the status to SPA_RESULT_OK for the port you don't want more * - set the status to SPA_RESULT_OK for the port you don't want more
* buffer from. * buffers from.
* *
* Returns: #SPA_RESULT_OK on success or when the node is asynchronous * Returns: #SPA_RESULT_OK on success or when the node is asynchronous
* #SPA_RESULT_NEED_BUFFER for synchronous nodes when input * #SPA_RESULT_NEED_BUFFER for synchronous nodes when input

View file

@ -101,7 +101,7 @@ spa_pod_builder_raw(struct spa_pod_builder *builder, const void *data, uint32_t
return ref; return ref;
} }
static void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size) static inline void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size)
{ {
uint64_t zeroes = 0; uint64_t zeroes = 0;
size = SPA_ROUND_UP_N(size, 8) - size; size = SPA_ROUND_UP_N(size, 8) - size;

View file

@ -148,8 +148,6 @@ int main(int argc, char *argv[])
pw_main_loop_run(data.loop); pw_main_loop_run(data.loop);
if (data.node)
pw_node_destroy(data.node);
pw_core_destroy(data.core); pw_core_destroy(data.core);
pw_main_loop_destroy(data.loop); pw_main_loop_destroy(data.loop);

View file

@ -824,8 +824,8 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message
*io = impl->transport->outputs[i]; *io = impl->transport->outputs[i];
pw_log_trace("%d %d", io->status, io->buffer_id); pw_log_trace("%d %d", io->status, io->buffer_id);
impl->out_pending = false;
} }
impl->out_pending = false;
this->callbacks->have_output(this->callbacks_data); this->callbacks->have_output(this->callbacks_data);
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) {
for (i = 0; i < MAX_INPUTS; i++) { for (i = 0; i < MAX_INPUTS; i++) {
@ -851,12 +851,26 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
static void setup_transport(struct impl *impl)
{
uint32_t max_inputs = 0, max_outputs = 0, n_inputs = 0, n_outputs = 0;
spa_proxy_node_get_n_ports(&impl->proxy.node, &n_inputs, &max_inputs, &n_outputs, &max_outputs);
impl->transport = pw_client_node_transport_new(max_inputs, max_outputs);
impl->transport->area->n_input_ports = n_inputs;
impl->transport->area->n_output_ports = n_outputs;
}
static void static void
client_node_done(void *data, int seq, int res) client_node_done(void *data, int seq, int res)
{ {
struct impl *impl = data; struct impl *impl = data;
struct proxy *this = &impl->proxy; struct proxy *this = &impl->proxy;
if (seq == 0 && res == SPA_RESULT_OK)
setup_transport(impl);
this->callbacks->done(this->callbacks_data, seq, res); this->callbacks->done(this->callbacks_data, seq, res);
} }
@ -1028,59 +1042,6 @@ proxy_init(struct proxy *this,
return SPA_RESULT_RETURN_ASYNC(this->seq++); return SPA_RESULT_RETURN_ASYNC(this->seq++);
} }
static int client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
if (impl->fds[0] == -1) {
#if 0
if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) !=
0)
return SPA_RESULT_ERRNO;
impl->proxy.data_source.fd = impl->fds[0];
impl->proxy.writefd = impl->fds[0];
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[1];
#else
impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->proxy.data_source.fd = impl->fds[0];
impl->proxy.writefd = impl->fds[1];
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[0];
#endif
spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source);
pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd);
}
*readfd = impl->other_fds[0];
*writefd = impl->other_fds[1];
return SPA_RESULT_OK;
}
static void node_initialized(void *data)
{
struct impl *impl = data;
struct pw_client_node *this = &impl->this;
struct pw_node *node = this->node;
int readfd, writefd;
const struct pw_node_info *i = pw_node_get_info(node);
if (this->resource == NULL)
return;
impl->transport = pw_client_node_transport_new(i->max_input_ports, i->max_output_ports);
impl->transport->area->n_input_ports = i->n_input_ports;
impl->transport->area->n_output_ports = i->n_output_ports;
client_node_get_fds(this, &readfd, &writefd);
pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)),
readfd, writefd, impl->transport);
}
static int proxy_clear(struct proxy *this) static int proxy_clear(struct proxy *this)
{ {
uint32_t i; uint32_t i;
@ -1113,6 +1074,30 @@ static void client_node_resource_destroy(void *data)
pw_node_destroy(this->node); pw_node_destroy(this->node);
} }
static void node_initialized(void *data)
{
struct impl *impl = data;
struct pw_client_node *this = &impl->this;
struct pw_node *node = this->node;
if (this->resource == NULL)
return;
impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->proxy.data_source.fd = impl->fds[0];
impl->proxy.writefd = impl->fds[1];
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[0];
spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source);
pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd);
pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)),
impl->other_fds[0], impl->other_fds[1], impl->transport);
}
static void node_free(void *data) static void node_free(void *data)
{ {
struct impl *impl = data; struct impl *impl = data;

View file

@ -287,7 +287,7 @@ bool pw_port_add(struct pw_port *port, struct pw_node *node)
node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS; node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS;
} }
spa_node_port_set_io(node->node, port->direction, port_id, &port->io); spa_node_port_set_io(node->node, port->direction, port_id, port->rt.port.io);
port->rt.graph = node->rt.graph; port->rt.graph = node->rt.graph;
pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port); pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port);