diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h index 503f2706e..7886e806a 100644 --- a/spa/include/spa/graph-scheduler1.h +++ b/spa/include/spa/graph-scheduler1.h @@ -45,24 +45,6 @@ static inline void spa_graph_data_init(struct spa_graph_data *data, data->node = NULL; } -static inline int spa_graph_node_impl_input(void *data) -{ - struct spa_node *n = data; - return spa_node_process_input(n); -} - -static inline int spa_graph_node_impl_output(void *data) -{ - struct spa_node *n = data; - return spa_node_process_output(n); -} - -static const struct spa_graph_node_callbacks spa_graph_node_impl_default = { - SPA_VERSION_GRAPH_NODE_CALLBACKS, - spa_graph_node_impl_input, - spa_graph_node_impl_output, -}; - static inline void spa_graph_data_port_check(struct spa_graph_data *data, struct spa_graph_port *port) { struct spa_graph_node *node = port->node; @@ -70,7 +52,8 @@ static inline void spa_graph_data_port_check(struct spa_graph_data *data, struct if (port->io->status == SPA_RESULT_HAVE_BUFFER) node->ready_in++; - debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in); + debug("port %p node %p check %d %d %d\n", port, node, + port->io->status, node->ready_in, node->required_in); if (node->required_in > 0 && node->ready_in == node->required_in) { node->state = SPA_GRAPH_STATE_IN; @@ -100,7 +83,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data) switch (n->state) { case SPA_GRAPH_STATE_IN: - state = n->callbacks->process_input(n->callbacks_data); + state = spa_node_process_input(n->implementation); if (state == SPA_RESULT_NEED_BUFFER) n->state = SPA_GRAPH_STATE_CHECK_IN; else if (state == SPA_RESULT_HAVE_BUFFER) @@ -112,7 +95,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data) break; case SPA_GRAPH_STATE_OUT: - state = n->callbacks->process_output(n->callbacks_data); + state = spa_node_process_output(n->implementation); if (state == SPA_RESULT_NEED_BUFFER) n->state = SPA_GRAPH_STATE_CHECK_IN; else if (state == SPA_RESULT_HAVE_BUFFER) diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h index c7afbf698..64301c724 100644 --- a/spa/include/spa/graph-scheduler3.h +++ b/spa/include/spa/graph-scheduler3.h @@ -26,38 +26,6 @@ extern "C" { #include -static inline int spa_graph_node_impl_input(void *data) -{ - struct spa_node *n = data; - return spa_node_process_input(n); -} - -static inline int spa_graph_node_impl_output(void *data) -{ - struct spa_node *n = data; - return spa_node_process_output(n); -} - -static const struct spa_graph_node_callbacks spa_graph_node_impl_default = { - SPA_VERSION_GRAPH_NODE_CALLBACKS, - spa_graph_node_impl_input, - spa_graph_node_impl_output, -}; - -static inline int spa_graph_port_impl_reuse_buffer(void *data, - uint32_t buffer_id) -{ - struct spa_graph_port *port = data; - struct spa_node *node = port->node->callbacks_data; - debug("port %p reuse buffer %d\n", port, buffer_id); - return spa_node_port_reuse_buffer(node, port->port_id, buffer_id); -} - -static const struct spa_graph_port_callbacks spa_graph_port_impl_default = { - SPA_VERSION_GRAPH_PORT_CALLBACKS, - spa_graph_port_impl_reuse_buffer, -}; - static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *node) { struct spa_graph_port *p; @@ -85,7 +53,7 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n } spa_list_for_each_safe(n, t, &ready, ready_link) { - n->state = n->callbacks->process_output(n->callbacks_data); + n->state = spa_node_process_output(n->implementation); debug("peer %p processed out %d\n", n, n->state); if (n->state == SPA_RESULT_NEED_BUFFER) spa_graph_need_input(n->graph, n); @@ -102,7 +70,7 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n debug("node %p ready_in:%d required_in:%d\n", node, node->ready_in, node->required_in); if (node->required_in > 0 && node->ready_in == node->required_in) { - node->state = node->callbacks->process_input(node->callbacks_data); + node->state = spa_node_process_input(node->implementation); debug("node %p processed in %d\n", node, node->state); if (node->state == SPA_RESULT_HAVE_BUFFER) { spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { @@ -143,7 +111,7 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node * } spa_list_for_each_safe(n, t, &ready, ready_link) { - n->state = n->callbacks->process_input(n->callbacks_data); + n->state = spa_node_process_input(n->implementation); debug("node %p chain processed in %d\n", n, n->state); if (n->state == SPA_RESULT_HAVE_BUFFER) spa_graph_have_output(n->graph, n); @@ -159,7 +127,7 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node * n->ready_link.next = NULL; } - node->state = node->callbacks->process_output(node->callbacks_data); + node->state = spa_node_process_output(node->implementation); debug("node %p processed out %d\n", node, node->state); if (node->state == SPA_RESULT_NEED_BUFFER) { node->ready_in = 0; diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index eaf18a676..d2616196e 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -56,21 +56,7 @@ struct spa_graph { #define spa_graph_need_input(g,n) ((g)->callbacks->need_input((g)->callbacks_data, (n))) #define spa_graph_have_output(g,n) ((g)->callbacks->have_output((g)->callbacks_data, (n))) - -struct spa_graph_node_callbacks { -#define SPA_VERSION_GRAPH_NODE_CALLBACKS 0 - uint32_t version; - - int (*process_input) (void *data); - int (*process_output) (void *data); -}; - -struct spa_graph_port_callbacks { -#define SPA_VERSION_GRAPH_PORT_CALLBACKS 0 - uint32_t version; - - int (*reuse_buffer) (void *data, uint32_t buffer_id); -}; +#define spa_graph_reuse_buffer(g,n,p,i) ((g)->callbacks->reuse_buffer((g)->callbacks_data, (n),(p),(i))) struct spa_graph_node { struct spa_list link; /**< link in graph nodes list */ @@ -82,9 +68,7 @@ struct spa_graph_node { uint32_t required_in; /**< required number of ports */ uint32_t ready_in; /**< number of ports with data */ int state; /**< state of the node */ - /** callbacks and data */ - const struct spa_graph_node_callbacks *callbacks; - void *callbacks_data; + struct spa_node *implementation;/**< node implementation */ void *scheduler_data; /**< scheduler private data */ }; @@ -96,9 +80,6 @@ struct spa_graph_port { uint32_t flags; /**< port flags */ struct spa_port_io *io; /**< io area of the port */ struct spa_graph_port *peer; /**< peer */ - /** callbacks and data */ - const struct spa_graph_port_callbacks *callbacks; - void *callbacks_data; void *scheduler_data; /**< scheduler private data */ }; @@ -127,12 +108,10 @@ spa_graph_node_init(struct spa_graph_node *node) } static inline void -spa_graph_node_set_callbacks(struct spa_graph_node *node, - const struct spa_graph_node_callbacks *callbacks, - void *data) +spa_graph_node_set_implementation(struct spa_graph_node *node, + struct spa_node *implementation) { - node->callbacks = callbacks; - node->callbacks_data = data; + node->implementation = implementation; } static inline void @@ -160,15 +139,6 @@ spa_graph_port_init(struct spa_graph_port *port, port->io = io; } -static inline void -spa_graph_port_set_callbacks(struct spa_graph_port *port, - const struct spa_graph_port_callbacks *callbacks, - void *data) -{ - port->callbacks = callbacks; - port->callbacks_data = data; -} - static inline void spa_graph_port_add(struct spa_graph_node *node, struct spa_graph_port *port) diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 5a203b69f..8e950852c 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -347,7 +347,7 @@ static int loop_iterate(struct spa_loop_control *ctrl, int timeout) } } spa_list_for_each_safe(source, tmp, &impl->destroy_list, link) - free(source); + free(source); spa_list_init(&impl->destroy_list); diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c index cda37671b..30d6af5c6 100644 --- a/spa/tests/test-graph.c +++ b/spa/tests/test-graph.c @@ -337,13 +337,13 @@ static int make_nodes(struct data *data, const char *device) spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]); spa_graph_node_init(&data->source_node); - spa_graph_node_set_callbacks(&data->source_node, &spa_graph_node_impl_default, data->source); + spa_graph_node_set_implementation(&data->source_node, data->source); spa_graph_node_add(&data->graph, &data->source_node); spa_graph_port_init(&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]); spa_graph_port_add(&data->source_node, &data->source_out); spa_graph_node_init(&data->volume_node); - spa_graph_node_set_callbacks(&data->volume_node, &spa_graph_node_impl_default, data->volume); + spa_graph_node_set_implementation(&data->volume_node, data->volume); spa_graph_node_add(&data->graph, &data->volume_node); spa_graph_port_init(&data->volume_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_volume_io[0]); spa_graph_port_add(&data->volume_node, &data->volume_in); @@ -354,7 +354,7 @@ static int make_nodes(struct data *data, const char *device) spa_graph_port_add(&data->volume_node, &data->volume_out); spa_graph_node_init(&data->sink_node); - spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink); + spa_graph_node_set_implementation(&data->sink_node, data->sink); spa_graph_node_add(&data->graph, &data->sink_node); spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->volume_sink_io[0]); spa_graph_port_add(&data->sink_node, &data->sink_in); diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 2de5aade4..e86a6e089 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -414,19 +414,19 @@ static int make_nodes(struct data *data, const char *device) #ifdef USE_GRAPH spa_graph_node_init(&data->source1_node); - spa_graph_node_set_callbacks(&data->source1_node, &spa_graph_node_impl_default, data->source1); + spa_graph_node_set_implementation(&data->source1_node, data->source1); spa_graph_port_init(&data->source1_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]); spa_graph_port_add(&data->source1_node, &data->source1_out); spa_graph_node_add(&data->graph, &data->source1_node); spa_graph_node_init(&data->source2_node); - spa_graph_node_set_callbacks(&data->source2_node, &spa_graph_node_impl_default, data->source2); + spa_graph_node_set_implementation(&data->source2_node, data->source2); spa_graph_port_init(&data->source2_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]); spa_graph_port_add(&data->source2_node, &data->source2_out); spa_graph_node_add(&data->graph, &data->source2_node); spa_graph_node_init(&data->mix_node); - spa_graph_node_set_callbacks(&data->mix_node, &spa_graph_node_impl_default, data->mix); + spa_graph_node_set_implementation(&data->mix_node, data->mix); spa_graph_port_init(&data->mix_in[0], SPA_DIRECTION_INPUT, data->mix_ports[0], 0, &data->source1_mix_io[0]); spa_graph_port_add(&data->mix_node, &data->mix_in[0]); @@ -442,7 +442,7 @@ static int make_nodes(struct data *data, const char *device) spa_graph_port_add(&data->mix_node, &data->mix_out); spa_graph_node_init(&data->sink_node); - spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink); + spa_graph_node_set_implementation(&data->sink_node, data->sink); spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->mix_sink_io[0]); spa_graph_port_add(&data->sink_node, &data->sink_in); spa_graph_node_add(&data->graph, &data->sink_node); diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index 87e8792ce..228479e87 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -362,7 +362,7 @@ static int make_nodes(struct data *data) spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]); spa_graph_node_init(&data->source_node); - spa_graph_node_set_callbacks(&data->source_node, &spa_graph_node_impl_default, data->source); + spa_graph_node_set_implementation(&data->source_node, data->source); spa_graph_node_add(&data->graph, &data->source_node); data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; @@ -370,7 +370,7 @@ static int make_nodes(struct data *data) spa_graph_port_add(&data->source_node, &data->source_out); spa_graph_node_init(&data->sink_node); - spa_graph_node_set_callbacks(&data->sink_node, &spa_graph_node_impl_default, data->sink); + spa_graph_node_set_implementation(&data->sink_node, data->sink); spa_graph_node_add(&data->graph, &data->sink_node); data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c index 62061152b..1de693d5d 100644 --- a/src/examples/export-sink.c +++ b/src/examples/export-sink.c @@ -448,7 +448,7 @@ static void make_node(struct data *data) struct pw_properties *props; props = pw_properties_new( - //"pipewire.target.node", port_path, + "pipewire.target.node", data->path, "pipewire.autoconnect", "1", NULL); diff --git a/src/examples/export-v4l2.c b/src/examples/export-v4l2.c index a09f4b8d9..f270b0caa 100644 --- a/src/examples/export-v4l2.c +++ b/src/examples/export-v4l2.c @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -79,8 +80,6 @@ static void make_node(struct data *data) "spa.factory.name", "v4l2-source", NULL); data->node = pw_node_factory_create_node(factory, NULL, "v4l2-source", props); - pw_node_register(data->node); - pw_remote_export(data->remote, data->node); } @@ -110,6 +109,12 @@ static const struct pw_remote_events remote_events = { .state_changed = on_state_changed, }; +static void do_quit(void *data, int signal_number) +{ + struct data *d = data; + d->running = false; +} + int main(int argc, char *argv[]) { struct data data = { 0, }; @@ -117,6 +122,9 @@ int main(int argc, char *argv[]) pw_init(&argc, &argv); data.loop = pw_loop_new(NULL); + pw_loop_add_signal(data.loop, SIGINT, do_quit, &data); + pw_loop_add_signal(data.loop, SIGTERM, do_quit, &data); + data.running = true; data.core = pw_core_new(data.loop, NULL); data.t = pw_core_get_type(data.core); @@ -134,10 +142,14 @@ int main(int argc, char *argv[]) pw_loop_enter(data.loop); while (data.running) { - pw_loop_iterate(data.loop, -1); + pw_loop_iterate(data.loop, 100); } pw_loop_leave(data.loop); + pw_remote_destroy(data.remote); + if (data.node) + pw_node_destroy(data.node); + pw_core_destroy(data.core); pw_loop_destroy(data.loop); return 0; diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c index d281f676d..349f0128c 100644 --- a/src/examples/local-v4l2.c +++ b/src/examples/local-v4l2.c @@ -107,7 +107,7 @@ static void handle_events(struct data *data) while (SDL_PollEvent(&event)) { switch (event.type) { case SDL_QUIT: - exit(0); + data->running = false; break; } } @@ -487,10 +487,13 @@ int main(int argc, char *argv[]) pw_loop_enter(data.loop); while (data.running) { - pw_loop_iterate(data.loop, -1); + pw_loop_iterate(data.loop, 100); } pw_loop_leave(data.loop); + pw_link_destroy(data.link); + pw_node_destroy(data.node); + pw_core_destroy(data.core); pw_loop_destroy(data.loop); return 0; diff --git a/src/modules/module-jack.c b/src/modules/module-jack.c index cd178e441..197260e53 100644 --- a/src/modules/module-jack.c +++ b/src/modules/module-jack.c @@ -1124,7 +1124,7 @@ static void jack_node_pull(void *data) spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) continue; - pn->state = pn->callbacks->process_input(pn->callbacks_data); + pn->state = spa_node_process_input(pn->implementation); } } @@ -1154,7 +1154,7 @@ static void jack_node_push(void *data) spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) continue; - pn->state = pn->callbacks->process_output(pn->callbacks_data); + pn->state = spa_node_process_output(pn->implementation); } spa_list_for_each(node, &impl->rt.nodes, graph_link) { @@ -1163,25 +1163,25 @@ static void jack_node_push(void *data) spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) continue; - pn->state = pn->callbacks->process_output(pn->callbacks_data); + pn->state = spa_node_process_output(pn->implementation); } - n->state = n->callbacks->process_output(n->callbacks_data); + n->state = spa_node_process_output(n->implementation); /* mix inputs */ spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) continue; - pn->state = pn->callbacks->process_output(pn->callbacks_data); - pn->state = pn->callbacks->process_input(pn->callbacks_data); + pn->state = spa_node_process_output(pn->implementation); + pn->state = spa_node_process_input(pn->implementation); } - n->state = n->callbacks->process_input(n->callbacks_data); + n->state = spa_node_process_input(n->implementation); /* tee outputs */ spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) continue; - pn->state = pn->callbacks->process_input(pn->callbacks_data); + pn->state = spa_node_process_input(pn->implementation); } } diff --git a/src/modules/module-jack/jack-node.c b/src/modules/module-jack/jack-node.c index 008136f72..cf51c24db 100644 --- a/src/modules/module-jack/jack-node.c +++ b/src/modules/module-jack/jack-node.c @@ -100,6 +100,8 @@ struct port_data { bool driver_port; + struct spa_node mix_node; + struct spa_port_info info; struct spa_port_io *io; @@ -278,7 +280,7 @@ static int driver_process_output(struct spa_node *node) spa_hook_list_call(&nd->listener_list, struct pw_jack_node_events, pull); spa_list_for_each(p, &gn->ports[SPA_DIRECTION_INPUT], link) { - struct pw_port *port = p->callbacks_data; + struct pw_port *port = p->scheduler_data; struct port_data *ipd = pw_port_get_user_data(port); struct spa_port_io *in_io = ipd->io; struct buffer *in; @@ -327,7 +329,7 @@ static int node_process_input(struct spa_node *node) &server->synchro_table[ref_num]); spa_list_for_each(p, &gn->ports[SPA_DIRECTION_OUTPUT], link) { - struct pw_port *port = p->callbacks_data; + struct pw_port *port = p->scheduler_data; struct port_data *opd = pw_port_get_user_data(port); struct spa_port_io *out_io = opd->io; out_io->buffer_id = 0; @@ -346,7 +348,7 @@ static int node_process_output(struct spa_node *node) pw_log_trace(NAME " %p: process output", nd); spa_list_for_each(p, &gn->ports[SPA_DIRECTION_INPUT], link) { - struct pw_port *port = p->callbacks_data; + struct pw_port *port = p->scheduler_data; struct port_data *ipd = pw_port_get_user_data(port); struct spa_port_io *in_io = ipd->io; in_io->buffer_id = 0; @@ -580,18 +582,18 @@ static const struct spa_node node_impl = { .process_output = node_process_output, }; -static int schedule_mix_input(void *data) +static int schedule_mix_input(struct spa_node *_node) { - struct pw_jack_port *this = data; + struct port_data *pd = SPA_CONTAINER_OF(_node, struct port_data, mix_node); + struct pw_jack_port *this = &pd->port; struct spa_graph_node *node = &this->port->rt.mix_node; struct spa_graph_port *p; struct spa_port_io *io = this->port->rt.mix_port.io; - struct port_data *pd = SPA_CONTAINER_OF(this, struct port_data, port); size_t buffer_size = pd->node->node.server->engine_control->buffer_size; int layer = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { - struct pw_link *link = p->callbacks_data; + struct pw_link *link = p->scheduler_data; struct spa_buffer *inbuf; pw_log_trace("mix %p: input %d %d", node, p->io->buffer_id, link->output->n_buffers); @@ -616,9 +618,10 @@ static int schedule_mix_input(void *data) return SPA_RESULT_HAVE_BUFFER; } -static int schedule_mix_output(void *data) +static int schedule_mix_output(struct spa_node *_node) { - struct pw_jack_port *this = data; + struct port_data *pd = SPA_CONTAINER_OF(_node, struct port_data, mix_node); + struct pw_jack_port *this = &pd->port; struct spa_graph_node *node = &this->port->rt.mix_node; struct spa_graph_port *p; struct spa_port_io *io = this->port->rt.mix_port.io; @@ -631,10 +634,11 @@ static int schedule_mix_output(void *data) return SPA_RESULT_NEED_BUFFER; } -static const struct spa_graph_node_callbacks schedule_mix_node = { - SPA_VERSION_GRAPH_NODE_CALLBACKS, - schedule_mix_input, - schedule_mix_output, +static const struct spa_node schedule_mix_node = { + SPA_VERSION_NODE, + NULL, + .process_input = schedule_mix_input, + .process_output = schedule_mix_output, }; static void port_destroy(void *data) @@ -771,6 +775,9 @@ pw_jack_node_add_port(struct pw_jack_node *node, pw_port_add(port->port, node->node); + pd->mix_node = schedule_mix_node; + + { struct spa_buffer *b = &pd->buf; struct type *t = &pd->node->type; @@ -792,7 +799,7 @@ pw_jack_node_add_port(struct pw_jack_node *node, port->port->state = PW_PORT_STATE_PAUSED; } if (direction == PW_DIRECTION_INPUT) { - spa_graph_node_set_callbacks(&port->port->rt.mix_node, &schedule_mix_node, port); + spa_graph_node_set_implementation(&port->port->rt.mix_node, &pd->mix_node); } diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 985639327..fa3510299 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -1132,8 +1132,8 @@ struct pw_link *pw_link_new(struct pw_core *core, 0, &this->io); - this->rt.in_port.callbacks_data = this; - this->rt.out_port.callbacks_data = this; + this->rt.in_port.scheduler_data = this; + this->rt.out_port.scheduler_data = this; /* nodes can be in different data loops so we do this twice */ pw_loop_invoke(output_node->data_loop, do_add_link, diff --git a/src/pipewire/node.c b/src/pipewire/node.c index f2a5a0572..a87093b43 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -273,26 +273,6 @@ void pw_node_register(struct pw_node *this) pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL); } -static int -graph_impl_process_input(void *data) -{ - struct pw_node *this = data; - return spa_node_process_input(this->node); -} - -static int -graph_impl_process_output(void *data) -{ - struct pw_node *this = data; - return spa_node_process_output(this->node); -} - -static const struct spa_graph_node_callbacks graph_callbacks = { - SPA_VERSION_GRAPH_NODE_CALLBACKS, - .process_input = graph_impl_process_input, - .process_output = graph_impl_process_output, -}; - struct pw_node *pw_node_new(struct pw_core *core, struct pw_resource *owner, struct pw_global *parent, @@ -342,9 +322,6 @@ struct pw_node *pw_node_new(struct pw_core *core, pw_map_init(&this->output_port_map, 64, 64); spa_graph_node_init(&this->rt.node); - spa_graph_node_set_callbacks(&this->rt.node, - &graph_callbacks, - this); return this; @@ -446,9 +423,8 @@ static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id) if (p->port_id != port_id) continue; - pp = p->peer; - if (pp && pp->callbacks->reuse_buffer) - pp->callbacks->reuse_buffer(pp->callbacks_data, buffer_id); + if ((pp = p->peer) != NULL) + spa_node_port_reuse_buffer(pp->node->implementation, pp->port_id, buffer_id); break; } } @@ -468,6 +444,7 @@ void pw_node_set_implementation(struct pw_node *node, { node->node = spa_node; spa_node_set_callbacks(node->node, &node_callbacks, node); + spa_graph_node_set_implementation(&node->rt.node, spa_node); } struct spa_node *pw_node_get_implementation(struct pw_node *node) diff --git a/src/pipewire/port.c b/src/pipewire/port.c index f7ddc3563..95abdb532 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -28,6 +28,8 @@ /** \cond */ struct impl { struct pw_port this; + + struct spa_node mix_node; }; /** \endcond */ @@ -41,9 +43,10 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state) } } -static int schedule_tee_input(void *data) +static int schedule_tee_input(struct spa_node *data) { - struct pw_port *this = data; + struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node); + struct pw_port *this = &impl->this; struct spa_graph_node *node = &this->rt.mix_node; struct spa_graph_port *p; struct spa_port_io *io = this->rt.mix_port.io; @@ -63,9 +66,10 @@ static int schedule_tee_input(void *data) } return res; } -static int schedule_tee_output(void *data) +static int schedule_tee_output(struct spa_node *data) { - struct pw_port *this = data; + struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node); + struct pw_port *this = &impl->this; struct spa_graph_node *node = &this->rt.mix_node; struct spa_graph_port *p; struct spa_port_io *io = this->rt.mix_port.io; @@ -77,25 +81,23 @@ static int schedule_tee_output(void *data) return SPA_RESULT_NEED_BUFFER; } -static const struct spa_graph_node_callbacks schedule_tee_node = { - SPA_VERSION_GRAPH_NODE_CALLBACKS, - schedule_tee_input, - schedule_tee_output, -}; - -static int schedule_tee_reuse_buffer(void *data, uint32_t buffer_id) +static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id) { return SPA_RESULT_OK; } -static const struct spa_graph_port_callbacks schedule_tee_port = { - SPA_VERSION_GRAPH_PORT_CALLBACKS, - schedule_tee_reuse_buffer, +static const struct spa_node schedule_tee_node = { + SPA_VERSION_NODE, + NULL, + .process_input = schedule_tee_input, + .process_output = schedule_tee_output, + .port_reuse_buffer = schedule_tee_reuse_buffer, }; -static int schedule_mix_input(void *data) +static int schedule_mix_input(struct spa_node *data) { - struct pw_port *this = data; + struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node); + struct pw_port *this = &impl->this; struct spa_graph_node *node = &this->rt.mix_node; struct spa_graph_port *p; struct spa_port_io *io = this->rt.mix_port.io; @@ -111,9 +113,10 @@ static int schedule_mix_input(void *data) return SPA_RESULT_HAVE_BUFFER; } -static int schedule_mix_output(void *data) +static int schedule_mix_output(struct spa_node *data) { - struct pw_port *this = data; + struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node); + struct pw_port *this = &impl->this; struct spa_graph_node *node = &this->rt.mix_node; struct spa_graph_port *p; struct spa_port_io *io = this->rt.mix_port.io; @@ -126,19 +129,17 @@ static int schedule_mix_output(void *data) return SPA_RESULT_NEED_BUFFER; } -static const struct spa_graph_node_callbacks schedule_mix_node = { - SPA_VERSION_GRAPH_NODE_CALLBACKS, - schedule_mix_input, - schedule_mix_output, -}; - -static int schedule_mix_reuse_buffer(void *data, uint32_t buffer_id) +static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id) { return SPA_RESULT_OK; } -static const struct spa_graph_port_callbacks schedule_mix_port = { - SPA_VERSION_GRAPH_PORT_CALLBACKS, - schedule_mix_reuse_buffer, + +static const struct spa_node schedule_mix_node = { + SPA_VERSION_NODE, + NULL, + .process_input = schedule_mix_input, + .process_output = schedule_mix_output, + .port_reuse_buffer = schedule_mix_reuse_buffer, }; struct pw_port *pw_port_new(enum pw_direction direction, @@ -175,29 +176,24 @@ struct pw_port *pw_port_new(enum pw_direction direction, spa_hook_list_init(&this->listener_list); - spa_graph_port_set_callbacks(&this->rt.port, NULL, this); - spa_graph_port_init(&this->rt.port, this->direction, this->port_id, 0, &this->io); spa_graph_node_init(&this->rt.mix_node); - spa_graph_node_set_callbacks(&this->rt.mix_node, - this->direction == PW_DIRECTION_INPUT ? - &schedule_mix_node : - &schedule_tee_node, - this); + + impl->mix_node = this->direction == PW_DIRECTION_INPUT ? schedule_mix_node : schedule_tee_node; + spa_graph_node_set_implementation(&this->rt.mix_node, &impl->mix_node); spa_graph_port_init(&this->rt.mix_port, pw_direction_reverse(this->direction), 0, 0, &this->io); - spa_graph_port_set_callbacks(&this->rt.mix_port, - this->direction == PW_DIRECTION_INPUT ? - &schedule_mix_port : - &schedule_tee_port, - this); + + this->rt.mix_port.scheduler_data = this; + this->rt.port.scheduler_data = this; + return this; no_mem: diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index c7a14d57b..3a00a7ce2 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -78,6 +78,7 @@ struct node_data { struct spa_hook node_listener; struct pw_client_node_proxy *node_proxy; + struct spa_hook node_proxy_listener; struct spa_hook proxy_listener; struct pw_array mem_ids; @@ -430,7 +431,7 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_ /* process all input in the mixers */ spa_list_for_each(port, &n->ports[SPA_DIRECTION_INPUT], link) { pn = port->peer->node; - pn->state = pn->callbacks->process_input(pn->callbacks_data); + pn->state = spa_node_process_input(pn->implementation); if (pn->state == SPA_RESULT_HAVE_BUFFER) spa_graph_have_output(data->node->rt.graph, pn); else { @@ -444,7 +445,7 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_ } } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) { - n->callbacks->process_output(n->callbacks_data); + spa_node_process_output(n->implementation); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { } @@ -479,6 +480,28 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) } } +static void clean_transport(struct pw_proxy *proxy) +{ + struct node_data *data = proxy->user_data; + struct pw_port *port; + + if (data->trans == NULL) + return; + + spa_list_for_each(port, &data->node->input_ports, link) + spa_graph_port_remove(&data->in_ports[port->port_id]); + spa_list_for_each(port, &data->node->output_ports, link) + spa_graph_port_remove(&data->out_ports[port->port_id]); + + free(data->in_ports); + free(data->out_ports); + pw_client_node_transport_destroy(data->trans); + unhandle_socket(proxy); + close(data->rtwritefd); + + data->trans = NULL; +} + static void client_node_transport(void *object, uint32_t node_id, int readfd, int writefd, struct pw_client_node_transport *transport) @@ -488,14 +511,18 @@ static void client_node_transport(void *object, uint32_t node_id, struct pw_port *port; int i; + clean_transport(proxy); + data->node_id = node_id; data->trans = transport; pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u", proxy, data->trans, readfd, writefd, node_id); - data->in_ports = calloc(data->trans->area->max_input_ports, sizeof(struct spa_graph_port)); - data->out_ports = calloc(data->trans->area->max_output_ports, sizeof(struct spa_graph_port)); + data->in_ports = calloc(data->trans->area->max_input_ports, + sizeof(struct spa_graph_port)); + data->out_ports = calloc(data->trans->area->max_output_ports, + sizeof(struct spa_graph_port)); for (i = 0; i < data->trans->area->max_input_ports; i++) { spa_graph_port_init(&data->in_ports[i], @@ -521,8 +548,6 @@ static void client_node_transport(void *object, uint32_t node_id, data->rtreadfd = readfd; data->rtwritefd = writefd; - - unhandle_socket(proxy); data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop, data->rtreadfd, SPA_IO_ERR | SPA_IO_HUP, @@ -676,7 +701,7 @@ static void clear_mems(struct pw_proxy *proxy) struct mem_id *mid; pw_array_for_each(mid, &data->mem_ids) - clear_memid(mid); + clear_memid(mid); data->mem_ids.size = 0; } @@ -981,6 +1006,25 @@ static const struct pw_node_events node_events = { .have_output = node_have_output, }; +static void node_proxy_destroy(void *data) +{ + struct node_data *d = data; + struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy; + + clean_transport(proxy); + clear_buffers(proxy); + clear_mems(proxy); + pw_array_clear(&d->mem_ids); + pw_array_clear(&d->buffer_ids); + + spa_hook_remove(&d->node_listener); +} + +static const struct pw_proxy_events proxy_events = { + PW_VERSION_PROXY_EVENTS, + .destroy = node_proxy_destroy, +}; + struct pw_proxy *pw_remote_export(struct pw_remote *remote, struct pw_node *node) { @@ -1010,10 +1054,11 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, pw_array_init(&data->buffer_ids, 32); pw_array_ensure_size(&data->buffer_ids, sizeof(struct buffer_id) * 64); + pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data); pw_node_add_listener(node, &data->node_listener, &node_events, data); pw_client_node_proxy_add_listener(data->node_proxy, - &data->proxy_listener, + &data->node_proxy_listener, &client_node_events, proxy); do_node_init(proxy);