Use remote to join nodes on remote graphs

Make a new method that can take a local node and run it in a remote
pipeline. This basically replaces all functionality of the streams
and more.
Add 2 examples for exporting a sink and a v4l2 node
Make some more things const
Cleanups
Make it possible to do things when the node needs scheduling. The
default node will schedule the local pipeline but the remote node might
also schedule the remote pipeline.
This commit is contained in:
Wim Taymans 2017-07-25 19:52:31 +02:00
parent 589e3d977c
commit 3d9f28c676
40 changed files with 1591 additions and 138 deletions

View file

@ -174,7 +174,7 @@ static inline void send_have_output(struct proxy *this)
do_flush(this);
}
static int spa_proxy_node_send_command(struct spa_node *node, struct spa_command *command)
static int spa_proxy_node_send_command(struct spa_node *node, const struct spa_command *command)
{
struct proxy *this;
int res = SPA_RESULT_OK;
@ -237,11 +237,11 @@ spa_proxy_node_get_n_ports(struct spa_node *node,
if (n_input_ports)
*n_input_ports = this->n_inputs;
if (max_input_ports)
*max_input_ports = this->max_inputs;
*max_input_ports = this->max_inputs == 0 ? this->n_inputs : this->max_inputs;
if (n_output_ports)
*n_output_ports = this->n_outputs;
if (max_output_ports)
*max_output_ports = this->max_outputs;
*max_output_ports = this->max_outputs == 0 ? this->n_outputs : this->max_outputs;
return SPA_RESULT_OK;
}
@ -298,6 +298,7 @@ do_update_port(struct proxy *this,
}
if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_POSSIBLE_FORMATS) {
spa_log_info(this->log, "proxy %p: %d formats", this, n_possible_formats);
for (i = 0; i < port->n_formats; i++)
free(port->formats[i]);
port->n_formats = n_possible_formats;
@ -307,12 +308,14 @@ do_update_port(struct proxy *this,
port->formats[i] = spa_format_copy(possible_formats[i]);
}
if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_FORMAT) {
spa_log_info(this->log, "proxy %p: update format %p", this, format);
if (port->format)
free(port->format);
port->format = spa_format_copy(format);
}
if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) {
spa_log_info(this->log, "proxy %p: update %d params", this, n_params);
for (i = 0; i < port->n_params; i++)
free(port->params[i]);
port->n_params = n_params;
@ -743,7 +746,7 @@ spa_proxy_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32
static int
spa_proxy_node_port_send_command(struct spa_node *node,
enum spa_direction direction,
uint32_t port_id, struct spa_command *command)
uint32_t port_id, const struct spa_command *command)
{
struct proxy *this;
@ -1034,6 +1037,38 @@ proxy_init(struct proxy *this,
return SPA_RESULT_RETURN_ASYNC(this->seq++);
}
static int client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
if (impl->fds[0] == -1) {
#if 0
if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) !=
0)
return SPA_RESULT_ERRNO;
impl->proxy.data_source.fd = impl->fds[0];
impl->proxy.writefd = impl->fds[0];
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[1];
#else
impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->proxy.data_source.fd = impl->fds[0];
impl->proxy.writefd = impl->fds[1];
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[0];
#endif
spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source);
pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd);
}
*readfd = impl->other_fds[0];
*writefd = impl->other_fds[1];
return SPA_RESULT_OK;
}
static void on_initialized(struct pw_listener *listener, struct pw_node *node)
{
struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, initialized);
@ -1044,11 +1079,11 @@ static void on_initialized(struct pw_listener *listener, struct pw_node *node)
if (this->resource == NULL)
return;
impl->transport = pw_transport_new(node->info.max_input_ports, node->info.max_output_ports);
impl->transport = pw_transport_new(node->info.max_input_ports, node->info.max_output_ports, 0);
impl->transport->area->n_input_ports = node->info.n_input_ports;
impl->transport->area->n_output_ports = node->info.n_output_ports;
pw_client_node_get_fds(this, &readfd, &writefd);
client_node_get_fds(this, &readfd, &writefd);
pw_transport_get_info(impl->transport, &info);
pw_client_node_resource_transport(this->resource, node->global->id,
@ -1181,46 +1216,3 @@ void pw_client_node_destroy(struct pw_client_node *node)
{
pw_resource_destroy(node->resource);
}
/** Get the set of fds for this \ref pw_client_node
*
* \param node a \ref pw_client_node
* \param[out] readfd an fd for reading
* \param[out] writefd an fd for writing
* \return 0 on success < 0 on error
*
* Create or return a previously created set of fds for \a node.
*
* \memberof pw_client_node
*/
int pw_client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
if (impl->fds[0] == -1) {
#if 0
if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) !=
0)
return SPA_RESULT_ERRNO;
impl->proxy.data_source.fd = impl->fds[0];
impl->proxy.writefd = impl->fds[0];
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[1];
#else
impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->proxy.data_source.fd = impl->fds[0];
impl->proxy.writefd = impl->fds[1];
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[0];
#endif
spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source);
pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd);
}
*readfd = impl->other_fds[0];
*writefd = impl->other_fds[1];
return SPA_RESULT_OK;
}