graph: use spa_node as implementation

Always use a spa_node as the graph implementation, implementing the
methods is just as easy.
Plug some mem leaks in remote
This commit is contained in:
Wim Taymans 2017-08-27 12:12:14 +02:00
parent 6953642ed5
commit 577f86be0d
16 changed files with 167 additions and 206 deletions

View file

@ -448,7 +448,7 @@ static void make_node(struct data *data)
struct pw_properties *props;
props = pw_properties_new(
//"pipewire.target.node", port_path,
"pipewire.target.node", data->path,
"pipewire.autoconnect", "1",
NULL);

View file

@ -19,6 +19,7 @@
#include <stdio.h>
#include <sys/mman.h>
#include <signal.h>
#include <spa/type-map.h>
#include <spa/format-utils.h>
@ -79,8 +80,6 @@ static void make_node(struct data *data)
"spa.factory.name", "v4l2-source", NULL);
data->node = pw_node_factory_create_node(factory, NULL, "v4l2-source", props);
pw_node_register(data->node);
pw_remote_export(data->remote, data->node);
}
@ -110,6 +109,12 @@ static const struct pw_remote_events remote_events = {
.state_changed = on_state_changed,
};
static void do_quit(void *data, int signal_number)
{
struct data *d = data;
d->running = false;
}
int main(int argc, char *argv[])
{
struct data data = { 0, };
@ -117,6 +122,9 @@ int main(int argc, char *argv[])
pw_init(&argc, &argv);
data.loop = pw_loop_new(NULL);
pw_loop_add_signal(data.loop, SIGINT, do_quit, &data);
pw_loop_add_signal(data.loop, SIGTERM, do_quit, &data);
data.running = true;
data.core = pw_core_new(data.loop, NULL);
data.t = pw_core_get_type(data.core);
@ -134,10 +142,14 @@ int main(int argc, char *argv[])
pw_loop_enter(data.loop);
while (data.running) {
pw_loop_iterate(data.loop, -1);
pw_loop_iterate(data.loop, 100);
}
pw_loop_leave(data.loop);
pw_remote_destroy(data.remote);
if (data.node)
pw_node_destroy(data.node);
pw_core_destroy(data.core);
pw_loop_destroy(data.loop);
return 0;

View file

@ -107,7 +107,7 @@ static void handle_events(struct data *data)
while (SDL_PollEvent(&event)) {
switch (event.type) {
case SDL_QUIT:
exit(0);
data->running = false;
break;
}
}
@ -487,10 +487,13 @@ int main(int argc, char *argv[])
pw_loop_enter(data.loop);
while (data.running) {
pw_loop_iterate(data.loop, -1);
pw_loop_iterate(data.loop, 100);
}
pw_loop_leave(data.loop);
pw_link_destroy(data.link);
pw_node_destroy(data.node);
pw_core_destroy(data.core);
pw_loop_destroy(data.loop);
return 0;

View file

@ -1124,7 +1124,7 @@ static void jack_node_pull(void *data)
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL))
continue;
pn->state = pn->callbacks->process_input(pn->callbacks_data);
pn->state = spa_node_process_input(pn->implementation);
}
}
@ -1154,7 +1154,7 @@ static void jack_node_push(void *data)
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL))
continue;
pn->state = pn->callbacks->process_output(pn->callbacks_data);
pn->state = spa_node_process_output(pn->implementation);
}
spa_list_for_each(node, &impl->rt.nodes, graph_link) {
@ -1163,25 +1163,25 @@ static void jack_node_push(void *data)
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL))
continue;
pn->state = pn->callbacks->process_output(pn->callbacks_data);
pn->state = spa_node_process_output(pn->implementation);
}
n->state = n->callbacks->process_output(n->callbacks_data);
n->state = spa_node_process_output(n->implementation);
/* mix inputs */
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL))
continue;
pn->state = pn->callbacks->process_output(pn->callbacks_data);
pn->state = pn->callbacks->process_input(pn->callbacks_data);
pn->state = spa_node_process_output(pn->implementation);
pn->state = spa_node_process_input(pn->implementation);
}
n->state = n->callbacks->process_input(n->callbacks_data);
n->state = spa_node_process_input(n->implementation);
/* tee outputs */
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL))
continue;
pn->state = pn->callbacks->process_input(pn->callbacks_data);
pn->state = spa_node_process_input(pn->implementation);
}
}

View file

@ -100,6 +100,8 @@ struct port_data {
bool driver_port;
struct spa_node mix_node;
struct spa_port_info info;
struct spa_port_io *io;
@ -278,7 +280,7 @@ static int driver_process_output(struct spa_node *node)
spa_hook_list_call(&nd->listener_list, struct pw_jack_node_events, pull);
spa_list_for_each(p, &gn->ports[SPA_DIRECTION_INPUT], link) {
struct pw_port *port = p->callbacks_data;
struct pw_port *port = p->scheduler_data;
struct port_data *ipd = pw_port_get_user_data(port);
struct spa_port_io *in_io = ipd->io;
struct buffer *in;
@ -327,7 +329,7 @@ static int node_process_input(struct spa_node *node)
&server->synchro_table[ref_num]);
spa_list_for_each(p, &gn->ports[SPA_DIRECTION_OUTPUT], link) {
struct pw_port *port = p->callbacks_data;
struct pw_port *port = p->scheduler_data;
struct port_data *opd = pw_port_get_user_data(port);
struct spa_port_io *out_io = opd->io;
out_io->buffer_id = 0;
@ -346,7 +348,7 @@ static int node_process_output(struct spa_node *node)
pw_log_trace(NAME " %p: process output", nd);
spa_list_for_each(p, &gn->ports[SPA_DIRECTION_INPUT], link) {
struct pw_port *port = p->callbacks_data;
struct pw_port *port = p->scheduler_data;
struct port_data *ipd = pw_port_get_user_data(port);
struct spa_port_io *in_io = ipd->io;
in_io->buffer_id = 0;
@ -580,18 +582,18 @@ static const struct spa_node node_impl = {
.process_output = node_process_output,
};
static int schedule_mix_input(void *data)
static int schedule_mix_input(struct spa_node *_node)
{
struct pw_jack_port *this = data;
struct port_data *pd = SPA_CONTAINER_OF(_node, struct port_data, mix_node);
struct pw_jack_port *this = &pd->port;
struct spa_graph_node *node = &this->port->rt.mix_node;
struct spa_graph_port *p;
struct spa_port_io *io = this->port->rt.mix_port.io;
struct port_data *pd = SPA_CONTAINER_OF(this, struct port_data, port);
size_t buffer_size = pd->node->node.server->engine_control->buffer_size;
int layer = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
struct pw_link *link = p->callbacks_data;
struct pw_link *link = p->scheduler_data;
struct spa_buffer *inbuf;
pw_log_trace("mix %p: input %d %d", node, p->io->buffer_id, link->output->n_buffers);
@ -616,9 +618,10 @@ static int schedule_mix_input(void *data)
return SPA_RESULT_HAVE_BUFFER;
}
static int schedule_mix_output(void *data)
static int schedule_mix_output(struct spa_node *_node)
{
struct pw_jack_port *this = data;
struct port_data *pd = SPA_CONTAINER_OF(_node, struct port_data, mix_node);
struct pw_jack_port *this = &pd->port;
struct spa_graph_node *node = &this->port->rt.mix_node;
struct spa_graph_port *p;
struct spa_port_io *io = this->port->rt.mix_port.io;
@ -631,10 +634,11 @@ static int schedule_mix_output(void *data)
return SPA_RESULT_NEED_BUFFER;
}
static const struct spa_graph_node_callbacks schedule_mix_node = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
schedule_mix_input,
schedule_mix_output,
static const struct spa_node schedule_mix_node = {
SPA_VERSION_NODE,
NULL,
.process_input = schedule_mix_input,
.process_output = schedule_mix_output,
};
static void port_destroy(void *data)
@ -771,6 +775,9 @@ pw_jack_node_add_port(struct pw_jack_node *node,
pw_port_add(port->port, node->node);
pd->mix_node = schedule_mix_node;
{
struct spa_buffer *b = &pd->buf;
struct type *t = &pd->node->type;
@ -792,7 +799,7 @@ pw_jack_node_add_port(struct pw_jack_node *node,
port->port->state = PW_PORT_STATE_PAUSED;
}
if (direction == PW_DIRECTION_INPUT) {
spa_graph_node_set_callbacks(&port->port->rt.mix_node, &schedule_mix_node, port);
spa_graph_node_set_implementation(&port->port->rt.mix_node, &pd->mix_node);
}

View file

@ -1132,8 +1132,8 @@ struct pw_link *pw_link_new(struct pw_core *core,
0,
&this->io);
this->rt.in_port.callbacks_data = this;
this->rt.out_port.callbacks_data = this;
this->rt.in_port.scheduler_data = this;
this->rt.out_port.scheduler_data = this;
/* nodes can be in different data loops so we do this twice */
pw_loop_invoke(output_node->data_loop, do_add_link,

View file

@ -273,26 +273,6 @@ void pw_node_register(struct pw_node *this)
pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL);
}
static int
graph_impl_process_input(void *data)
{
struct pw_node *this = data;
return spa_node_process_input(this->node);
}
static int
graph_impl_process_output(void *data)
{
struct pw_node *this = data;
return spa_node_process_output(this->node);
}
static const struct spa_graph_node_callbacks graph_callbacks = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
.process_input = graph_impl_process_input,
.process_output = graph_impl_process_output,
};
struct pw_node *pw_node_new(struct pw_core *core,
struct pw_resource *owner,
struct pw_global *parent,
@ -342,9 +322,6 @@ struct pw_node *pw_node_new(struct pw_core *core,
pw_map_init(&this->output_port_map, 64, 64);
spa_graph_node_init(&this->rt.node);
spa_graph_node_set_callbacks(&this->rt.node,
&graph_callbacks,
this);
return this;
@ -446,9 +423,8 @@ static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
if (p->port_id != port_id)
continue;
pp = p->peer;
if (pp && pp->callbacks->reuse_buffer)
pp->callbacks->reuse_buffer(pp->callbacks_data, buffer_id);
if ((pp = p->peer) != NULL)
spa_node_port_reuse_buffer(pp->node->implementation, pp->port_id, buffer_id);
break;
}
}
@ -468,6 +444,7 @@ void pw_node_set_implementation(struct pw_node *node,
{
node->node = spa_node;
spa_node_set_callbacks(node->node, &node_callbacks, node);
spa_graph_node_set_implementation(&node->rt.node, spa_node);
}
struct spa_node *pw_node_get_implementation(struct pw_node *node)

View file

@ -28,6 +28,8 @@
/** \cond */
struct impl {
struct pw_port this;
struct spa_node mix_node;
};
/** \endcond */
@ -41,9 +43,10 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state)
}
}
static int schedule_tee_input(void *data)
static int schedule_tee_input(struct spa_node *data)
{
struct pw_port *this = data;
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
@ -63,9 +66,10 @@ static int schedule_tee_input(void *data)
}
return res;
}
static int schedule_tee_output(void *data)
static int schedule_tee_output(struct spa_node *data)
{
struct pw_port *this = data;
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
@ -77,25 +81,23 @@ static int schedule_tee_output(void *data)
return SPA_RESULT_NEED_BUFFER;
}
static const struct spa_graph_node_callbacks schedule_tee_node = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
schedule_tee_input,
schedule_tee_output,
};
static int schedule_tee_reuse_buffer(void *data, uint32_t buffer_id)
static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
{
return SPA_RESULT_OK;
}
static const struct spa_graph_port_callbacks schedule_tee_port = {
SPA_VERSION_GRAPH_PORT_CALLBACKS,
schedule_tee_reuse_buffer,
static const struct spa_node schedule_tee_node = {
SPA_VERSION_NODE,
NULL,
.process_input = schedule_tee_input,
.process_output = schedule_tee_output,
.port_reuse_buffer = schedule_tee_reuse_buffer,
};
static int schedule_mix_input(void *data)
static int schedule_mix_input(struct spa_node *data)
{
struct pw_port *this = data;
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
@ -111,9 +113,10 @@ static int schedule_mix_input(void *data)
return SPA_RESULT_HAVE_BUFFER;
}
static int schedule_mix_output(void *data)
static int schedule_mix_output(struct spa_node *data)
{
struct pw_port *this = data;
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
@ -126,19 +129,17 @@ static int schedule_mix_output(void *data)
return SPA_RESULT_NEED_BUFFER;
}
static const struct spa_graph_node_callbacks schedule_mix_node = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
schedule_mix_input,
schedule_mix_output,
};
static int schedule_mix_reuse_buffer(void *data, uint32_t buffer_id)
static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
{
return SPA_RESULT_OK;
}
static const struct spa_graph_port_callbacks schedule_mix_port = {
SPA_VERSION_GRAPH_PORT_CALLBACKS,
schedule_mix_reuse_buffer,
static const struct spa_node schedule_mix_node = {
SPA_VERSION_NODE,
NULL,
.process_input = schedule_mix_input,
.process_output = schedule_mix_output,
.port_reuse_buffer = schedule_mix_reuse_buffer,
};
struct pw_port *pw_port_new(enum pw_direction direction,
@ -175,29 +176,24 @@ struct pw_port *pw_port_new(enum pw_direction direction,
spa_hook_list_init(&this->listener_list);
spa_graph_port_set_callbacks(&this->rt.port, NULL, this);
spa_graph_port_init(&this->rt.port,
this->direction,
this->port_id,
0,
&this->io);
spa_graph_node_init(&this->rt.mix_node);
spa_graph_node_set_callbacks(&this->rt.mix_node,
this->direction == PW_DIRECTION_INPUT ?
&schedule_mix_node :
&schedule_tee_node,
this);
impl->mix_node = this->direction == PW_DIRECTION_INPUT ? schedule_mix_node : schedule_tee_node;
spa_graph_node_set_implementation(&this->rt.mix_node, &impl->mix_node);
spa_graph_port_init(&this->rt.mix_port,
pw_direction_reverse(this->direction),
0,
0,
&this->io);
spa_graph_port_set_callbacks(&this->rt.mix_port,
this->direction == PW_DIRECTION_INPUT ?
&schedule_mix_port :
&schedule_tee_port,
this);
this->rt.mix_port.scheduler_data = this;
this->rt.port.scheduler_data = this;
return this;
no_mem:

View file

@ -78,6 +78,7 @@ struct node_data {
struct spa_hook node_listener;
struct pw_client_node_proxy *node_proxy;
struct spa_hook node_proxy_listener;
struct spa_hook proxy_listener;
struct pw_array mem_ids;
@ -430,7 +431,7 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_
/* process all input in the mixers */
spa_list_for_each(port, &n->ports[SPA_DIRECTION_INPUT], link) {
pn = port->peer->node;
pn->state = pn->callbacks->process_input(pn->callbacks_data);
pn->state = spa_node_process_input(pn->implementation);
if (pn->state == SPA_RESULT_HAVE_BUFFER)
spa_graph_have_output(data->node->rt.graph, pn);
else {
@ -444,7 +445,7 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_
}
}
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) {
n->callbacks->process_output(n->callbacks_data);
spa_node_process_output(n->implementation);
}
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
}
@ -479,6 +480,28 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
}
}
static void clean_transport(struct pw_proxy *proxy)
{
struct node_data *data = proxy->user_data;
struct pw_port *port;
if (data->trans == NULL)
return;
spa_list_for_each(port, &data->node->input_ports, link)
spa_graph_port_remove(&data->in_ports[port->port_id]);
spa_list_for_each(port, &data->node->output_ports, link)
spa_graph_port_remove(&data->out_ports[port->port_id]);
free(data->in_ports);
free(data->out_ports);
pw_client_node_transport_destroy(data->trans);
unhandle_socket(proxy);
close(data->rtwritefd);
data->trans = NULL;
}
static void client_node_transport(void *object, uint32_t node_id,
int readfd, int writefd,
struct pw_client_node_transport *transport)
@ -488,14 +511,18 @@ static void client_node_transport(void *object, uint32_t node_id,
struct pw_port *port;
int i;
clean_transport(proxy);
data->node_id = node_id;
data->trans = transport;
pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u",
proxy, data->trans, readfd, writefd, node_id);
data->in_ports = calloc(data->trans->area->max_input_ports, sizeof(struct spa_graph_port));
data->out_ports = calloc(data->trans->area->max_output_ports, sizeof(struct spa_graph_port));
data->in_ports = calloc(data->trans->area->max_input_ports,
sizeof(struct spa_graph_port));
data->out_ports = calloc(data->trans->area->max_output_ports,
sizeof(struct spa_graph_port));
for (i = 0; i < data->trans->area->max_input_ports; i++) {
spa_graph_port_init(&data->in_ports[i],
@ -521,8 +548,6 @@ static void client_node_transport(void *object, uint32_t node_id,
data->rtreadfd = readfd;
data->rtwritefd = writefd;
unhandle_socket(proxy);
data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop,
data->rtreadfd,
SPA_IO_ERR | SPA_IO_HUP,
@ -676,7 +701,7 @@ static void clear_mems(struct pw_proxy *proxy)
struct mem_id *mid;
pw_array_for_each(mid, &data->mem_ids)
clear_memid(mid);
clear_memid(mid);
data->mem_ids.size = 0;
}
@ -981,6 +1006,25 @@ static const struct pw_node_events node_events = {
.have_output = node_have_output,
};
static void node_proxy_destroy(void *data)
{
struct node_data *d = data;
struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy;
clean_transport(proxy);
clear_buffers(proxy);
clear_mems(proxy);
pw_array_clear(&d->mem_ids);
pw_array_clear(&d->buffer_ids);
spa_hook_remove(&d->node_listener);
}
static const struct pw_proxy_events proxy_events = {
PW_VERSION_PROXY_EVENTS,
.destroy = node_proxy_destroy,
};
struct pw_proxy *pw_remote_export(struct pw_remote *remote,
struct pw_node *node)
{
@ -1010,10 +1054,11 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
pw_array_init(&data->buffer_ids, 32);
pw_array_ensure_size(&data->buffer_ids, sizeof(struct buffer_id) * 64);
pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data);
pw_node_add_listener(node, &data->node_listener, &node_events, data);
pw_client_node_proxy_add_listener(data->node_proxy,
&data->proxy_listener,
&data->node_proxy_listener,
&client_node_events,
proxy);
do_node_init(proxy);