diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h index 1b09bb5ed..2a1b2f9e4 100644 --- a/spa/include/spa/graph-scheduler3.h +++ b/spa/include/spa/graph-scheduler3.h @@ -130,11 +130,35 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched return false; } +static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node); + +static inline void spa_graph_scheduler_chain(struct spa_graph_scheduler *sched, + struct spa_list *ready) +{ + struct spa_graph_node *n, *t; + struct spa_graph_port *p; + + + spa_list_for_each_safe(n, t, ready, ready_link) { + n->state = n->methods->process_input(n, n->user_data); + debug("node %p chain processed in %d\n", n, n->state); + if (n->state == SPA_RESULT_HAVE_BUFFER) + spa_graph_scheduler_push(sched, n); + else { + n->ready_in = 0; + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + n->ready_in++; + } + } + spa_list_remove(&n->ready_link); + n->ready_link.next = NULL; + } +} static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node) { struct spa_graph_port *p; - struct spa_graph_node *n, *t; struct spa_list ready; debug("node %p start push\n", node); @@ -157,30 +181,15 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s spa_list_insert(ready.prev, &pnode->ready_link); } - spa_list_for_each_safe(n, t, &ready, ready_link) { - n->state = n->methods->process_input(n, n->user_data); - debug("peer %p processed in %d\n", n, n->state); - if (n->state == SPA_RESULT_HAVE_BUFFER) - spa_graph_scheduler_push(sched, n); - else { - n->ready_in = 0; - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - node->ready_in++; - } - } - spa_list_remove(&n->ready_link); - n->ready_link.next = NULL; - } + spa_graph_scheduler_chain(sched, &ready); node->state = node->methods->process_output(node, node->user_data); debug("node %p processed out %d\n", node, node->state); if (node->state == SPA_RESULT_NEED_BUFFER) { node->ready_in = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { - if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) - if (p->peer) - p->peer->node->ready_in++; + if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC)) + node->ready_in++; } } } diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c index a9ec4a17d..127642564 100644 --- a/src/examples/export-sink.c +++ b/src/examples/export-sink.c @@ -489,7 +489,7 @@ static void make_node(struct data *data) data->port->user_data = data; data->port->implementation = &impl_port; pw_port_add(data->port, data->node); - pw_node_export(data->node); + pw_node_register(data->node); pw_remote_export(data->remote, data->node); } diff --git a/src/examples/export-v4l2.c b/src/examples/export-v4l2.c index 8579a06ec..9a0d8eb34 100644 --- a/src/examples/export-v4l2.c +++ b/src/examples/export-v4l2.c @@ -81,7 +81,7 @@ static void make_node(struct data *data) "spa.factory.name", "v4l2-source", NULL); data->node = pw_node_factory_create_node(factory, NULL, "v4l2-source", props); - pw_node_export(data->node); + pw_node_register(data->node); pw_remote_export(data->remote, data->node); } diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c index 85167b070..aaeda6b65 100644 --- a/src/examples/local-v4l2.c +++ b/src/examples/local-v4l2.c @@ -472,7 +472,7 @@ static void make_nodes(struct data *data) data->port->user_data = data; data->port->implementation = &impl_port; pw_port_add(data->port, data->node); - pw_node_export(data->node); + pw_node_register(data->node); factory = pw_core_find_node_factory(data->core, "spa-node-factory"); props = pw_properties_new("spa.library.name", "v4l2/libspa-v4l2", diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 8f0eedc69..7f74d6361 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -156,24 +156,6 @@ static inline void do_flush(struct proxy *this) } -static inline void send_need_input(struct proxy *this) -{ - struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy); - - pw_transport_add_event(impl->transport, - &SPA_EVENT_INIT(impl->core->type.event_transport.NeedInput)); - do_flush(this); -} - -static inline void send_have_output(struct proxy *this) -{ - struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy); - - pw_transport_add_event(impl->transport, - &SPA_EVENT_INIT(impl->core->type.event_transport.HaveOutput)); - do_flush(this); -} - static int spa_proxy_node_send_command(struct spa_node *node, const struct spa_command *command) { struct proxy *this; @@ -195,9 +177,6 @@ static int spa_proxy_node_send_command(struct spa_node *node, const struct spa_c } else { /* send start */ pw_client_node_resource_node_command(this->resource, this->seq, command); - if (SPA_COMMAND_TYPE(command) == core->type.command_node.Start) - send_need_input(this); - res = SPA_RESULT_RETURN_ASYNC(this->seq++); } return res; @@ -780,9 +759,11 @@ static int spa_proxy_node_process_input(struct spa_node *node) pw_log_trace("%d %d", io->status, io->buffer_id); impl->transport->inputs[i] = *io; - io->status = SPA_RESULT_OK; + io->status = SPA_RESULT_NEED_BUFFER; } - send_have_output(this); + pw_transport_add_event(impl->transport, + &SPA_EVENT_INIT(impl->core->type.event_transport.ProcessInput)); + do_flush(this); if (this->callbacks->need_input) return SPA_RESULT_OK; @@ -795,45 +776,26 @@ static int spa_proxy_node_process_output(struct spa_node *node) struct proxy *this; struct impl *impl; int i; - bool send_need = false, flush = false; this = SPA_CONTAINER_OF(node, struct proxy, node); impl = this->impl; + pw_log_trace("process output"); + for (i = 0; i < MAX_OUTPUTS; i++) { - struct spa_port_io *io = this->out_ports[i].io, tmp; + struct spa_port_io *io = this->out_ports[i].io; if (!io) continue; - if (io->buffer_id != SPA_ID_INVALID) { - struct pw_event_transport_reuse_buffer rb = - PW_EVENT_TRANSPORT_REUSE_BUFFER_INIT(impl->core->type.event_transport. - ReuseBuffer, i, io->buffer_id); - - spa_log_trace(this->log, "reuse buffer %d", io->buffer_id); - - pw_transport_add_event(impl->transport, (struct spa_event *) &rb); - io->buffer_id = SPA_ID_INVALID; - flush = true; - } - - tmp = impl->transport->outputs[i]; impl->transport->outputs[i] = *io; - - pw_log_trace("%d %d %d %d", io->status, io->buffer_id, tmp.status, tmp.buffer_id); - - if (io->status == SPA_RESULT_NEED_BUFFER) - send_need = true; - - *io = tmp; + pw_log_trace("%d %d %d", io->status, io->buffer_id, io->status); } - if (send_need) - send_need_input(this); - else if (flush) - do_flush(this); + pw_transport_add_event(impl->transport, + &SPA_EVENT_INIT(impl->core->type.event_transport.ProcessOutput)); + do_flush(this); - return SPA_RESULT_HAVE_BUFFER; + return SPA_RESULT_OK; } static int handle_node_event(struct proxy *this, struct spa_event *event) diff --git a/src/modules/spa/spa-node.c b/src/modules/spa/spa-node.c index 20a21b355..d5bd60cc6 100644 --- a/src/modules/spa/spa-node.c +++ b/src/modules/spa/spa-node.c @@ -49,7 +49,6 @@ struct port { struct spa_node *node; }; - static int port_impl_enum_formats(struct pw_port *port, struct spa_format **format, const struct spa_format *filter, @@ -313,7 +312,7 @@ static void complete_init(struct impl *impl) { struct pw_node *this = impl->this; update_port_ids(impl); - pw_node_export(this); + pw_node_register(this); } static void on_node_done(struct spa_node *node, int seq, int res, void *user_data) diff --git a/src/pipewire/node.c b/src/pipewire/node.c index e2b8ceb38..efda00918 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -42,7 +42,7 @@ struct impl { struct pw_listener on_need_input; struct pw_listener on_have_output; - bool exported; + bool registered; }; /** \endcond */ @@ -270,12 +270,12 @@ do_node_add(struct spa_loop *loop, } -void pw_node_export(struct pw_node *this) +void pw_node_register(struct pw_node *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_core *core = this->core; - pw_log_debug("node %p: export", this); + pw_log_debug("node %p: register", this); update_info(this); @@ -286,7 +286,7 @@ void pw_node_export(struct pw_node *this) core->type.node, PW_VERSION_NODE, node_bind_func, this); - impl->exported = true; + impl->registered = true; pw_signal_emit(&this->initialized, this); pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL); @@ -419,7 +419,7 @@ void pw_node_destroy(struct pw_node *node) pw_loop_invoke(node->data_loop, do_node_remove, 1, 0, NULL, true, node); - if (impl->exported) { + if (impl->registered) { spa_list_remove(&node->link); pw_global_destroy(node->global); node->global = NULL; diff --git a/src/pipewire/node.h b/src/pipewire/node.h index 33c92b271..837a933fc 100644 --- a/src/pipewire/node.h +++ b/src/pipewire/node.h @@ -158,8 +158,8 @@ pw_node_new(struct pw_core *core, /**< the core */ struct pw_properties *properties, /**< extra properties */ size_t user_data_size /**< user data size */); -/** Complete initialization of the node */ -void pw_node_export(struct pw_node *node); +/** Complete initialization of the node and register */ +void pw_node_register(struct pw_node *node); /** Destroy a node */ void pw_node_destroy(struct pw_node *node); diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 0ac19830e..988f3e80b 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -52,6 +52,7 @@ static int schedule_tee_input(struct spa_graph_node *node, void *user_data) res = SPA_RESULT_NEED_BUFFER; } else { + pw_log_trace("tee input %d %d", io->status, io->buffer_id); spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) *p->io = *io; io->status = SPA_RESULT_OK; @@ -96,6 +97,7 @@ static int schedule_mix_input(struct spa_graph_node *node, void *user_data) struct spa_port_io *io = this->rt.mix_port.io; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + pw_log_trace("mix input %p %p->%p %d %d", p, p->io, io, p->io->status, p->io->buffer_id); *io = *p->io; p->io->status = SPA_RESULT_OK; p->io->buffer_id = SPA_ID_INVALID; @@ -297,6 +299,8 @@ int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_fo if (!SPA_RESULT_IS_ASYNC(res)) { if (format == NULL) { + if (port->buffers) + free(port->buffers); port->buffers = NULL; port->n_buffers = 0; if (port->allocated) @@ -334,6 +338,7 @@ int pw_port_set_param(struct pw_port *port, struct spa_param *param) int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) { int res; + size_t size; if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY) return SPA_RESULT_OK; @@ -350,7 +355,11 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3 pw_log_debug("port %p: use %d buffers", port, n_buffers); res = port->implementation->use_buffers(port, buffers, n_buffers); - port->buffers = buffers; + size = sizeof(struct spa_buffer *) * n_buffers; + + if (port->buffers) + free(port->buffers); + port->buffers = size ? memcpy(malloc(size), buffers, size) : NULL; port->n_buffers = n_buffers; if (port->allocated) pw_memblock_free(&port->buffer_mem); @@ -369,6 +378,7 @@ int pw_port_alloc_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t *n_buffers) { int res; + size_t size; if (port->state < PW_PORT_STATE_READY) return SPA_RESULT_NO_FORMAT; @@ -382,7 +392,11 @@ int pw_port_alloc_buffers(struct pw_port *port, pw_log_debug("port %p: alloc %d buffers", port, *n_buffers); res = port->implementation->alloc_buffers(port, params, n_params, buffers, n_buffers); - port->buffers = buffers; + size = sizeof(struct spa_buffer *) * *n_buffers; + + if (port->buffers) + free(port->buffers); + port->buffers = size ? memcpy(malloc(size), buffers, size) : NULL; port->n_buffers = *n_buffers; port->allocated = true; diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 16b9046c0..99c815436 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -81,6 +81,7 @@ struct node_data { struct trans_data { struct spa_graph_port *in_ports; struct spa_graph_port *out_ports; + /* memory for ports follows */ }; /** \endcond */ @@ -375,10 +376,18 @@ static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event) struct spa_graph_node *n = &data->node->rt.node; int res; - if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.HaveOutput) { - res = n->methods->process_input(n, n->user_data); + if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessInput) { + struct spa_list ready; + struct spa_graph_port *port; + + spa_list_init(&ready); + + spa_list_for_each(port, &n->ports[SPA_DIRECTION_INPUT], link) + spa_list_insert(ready.prev, &port->peer->node->ready_link); + + spa_graph_scheduler_chain(data->node->rt.sched, &ready); } - else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.NeedInput) { + else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessOutput) { res = n->methods->process_output(n, n->user_data); } else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ReuseBuffer) { @@ -450,6 +459,7 @@ static void client_node_transport(void *object, uint32_t node_id, i, 0, &data->trans->inputs[i]); + pw_log_info("transport in %d %p", i, &data->trans->inputs[i]); } spa_list_for_each(port, &data->node->input_ports, link) spa_graph_port_add(&port->rt.mix_node, &t->in_ports[port->port_id]); @@ -460,6 +470,7 @@ static void client_node_transport(void *object, uint32_t node_id, i, 0, &data->trans->outputs[i]); + pw_log_info("transport out %d %p", i, &data->trans->inputs[i]); } spa_list_for_each(port, &data->node->output_ports, link) spa_graph_port_add(&port->rt.mix_node, &t->out_ports[port->port_id]); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 9f82880b7..ff0d57d72 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -466,13 +466,13 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct pw_remote *remote = impl->this.remote; - if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.HaveOutput) { + if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessInput) { int i; for (i = 0; i < impl->trans->area->n_input_ports; i++) { struct spa_port_io *input = &impl->trans->inputs[i]; - pw_log_trace("stream %p: have output %d %d", stream, input->status, + pw_log_trace("stream %p: process input %d %d", stream, input->status, input->buffer_id); if (input->buffer_id == SPA_ID_INVALID) continue; @@ -481,7 +481,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even input->buffer_id = SPA_ID_INVALID; } send_need_input(stream); - } else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.NeedInput) { + } else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessOutput) { int i; for (i = 0; i < impl->trans->area->n_output_ports; i++) { @@ -493,7 +493,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even reuse_buffer(stream, output->buffer_id); output->buffer_id = SPA_ID_INVALID; } - pw_log_trace("stream %p: need input", stream); + pw_log_trace("stream %p: process output", stream); impl->in_need_buffer = true; pw_signal_emit(&stream->need_buffer, stream); impl->in_need_buffer = false; diff --git a/src/pipewire/transport.h b/src/pipewire/transport.h index a3e33692c..9f592de29 100644 --- a/src/pipewire/transport.h +++ b/src/pipewire/transport.h @@ -94,14 +94,18 @@ pw_transport_parse_event(struct pw_transport *trans, void *event); #define PW_TYPE_EVENT__Transport SPA_TYPE_EVENT_BASE "Transport" #define PW_TYPE_EVENT_TRANSPORT_BASE PW_TYPE_EVENT__Transport ":" -#define PW_TYPE_EVENT_TRANSPORT__HaveOutput PW_TYPE_EVENT_TRANSPORT_BASE "HaveOutput" -#define PW_TYPE_EVENT_TRANSPORT__NeedInput PW_TYPE_EVENT_TRANSPORT_BASE "NeedInput" -#define PW_TYPE_EVENT_TRANSPORT__ReuseBuffer PW_TYPE_EVENT_TRANSPORT_BASE "ReuseBuffer" +#define PW_TYPE_EVENT_TRANSPORT__HaveOutput PW_TYPE_EVENT_TRANSPORT_BASE "HaveOutput" +#define PW_TYPE_EVENT_TRANSPORT__NeedInput PW_TYPE_EVENT_TRANSPORT_BASE "NeedInput" +#define PW_TYPE_EVENT_TRANSPORT__ReuseBuffer PW_TYPE_EVENT_TRANSPORT_BASE "ReuseBuffer" +#define PW_TYPE_EVENT_TRANSPORT__ProcessInput PW_TYPE_EVENT_TRANSPORT_BASE "ProcessInput" +#define PW_TYPE_EVENT_TRANSPORT__ProcessOutput PW_TYPE_EVENT_TRANSPORT_BASE "ProcessOutput" struct pw_type_event_transport { uint32_t HaveOutput; uint32_t NeedInput; uint32_t ReuseBuffer; + uint32_t ProcessInput; + uint32_t ProcessOutput; }; static inline void @@ -111,6 +115,8 @@ pw_type_event_transport_map(struct spa_type_map *map, struct pw_type_event_trans type->HaveOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__HaveOutput); type->NeedInput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__NeedInput); type->ReuseBuffer = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ReuseBuffer); + type->ProcessInput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ProcessInput); + type->ProcessOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ProcessOutput); } }