diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c index 0c77a8549..501358ea7 100644 --- a/spa/plugins/audioconvert/audioadapter.c +++ b/spa/plugins/audioconvert/audioadapter.c @@ -1664,11 +1664,7 @@ static int impl_node_process(void *object) break; done = (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)); - - /* when not async, we can return the data when we are done. - * In async mode we might first need to wake up the follower - * to asynchronously provide more data for the next round. */ - if (!this->async && done) + if (done) break; if (status & SPA_STATUS_NEED_DATA) { @@ -1684,10 +1680,6 @@ static int impl_node_process(void *object) if ((fstatus & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) == 0) break; } - /* converter produced something or is drained and we - * scheduled the follower above, we can stop now*/ - if (done) - break; } if (!done) spa_node_call_xrun(&this->callbacks, 0, 0, NULL); diff --git a/src/pipewire/context.c b/src/pipewire/context.c index c3618c6be..33c51905e 100644 --- a/src/pipewire/context.c +++ b/src/pipewire/context.c @@ -510,6 +510,17 @@ struct pw_data_loop *pw_context_get_data_loop(struct pw_context *context) return impl->data_loop_impl; } +SPA_EXPORT +struct pw_loop *pw_context_find_loop(struct pw_context *context, const char *name) +{ + if (spa_strstartswith(name, "main-loop.")) + return context->main_loop; + else if (spa_strstartswith(name, "data-loop.")) + return context->data_loop; + else + return NULL; +} + SPA_EXPORT struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context) { diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index 6fb76a761..c3380804d 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -144,7 +144,6 @@ struct filter { unsigned int drained:1; unsigned int allow_mlock:1; unsigned int warn_mlock:1; - unsigned int process_rt:1; unsigned int trigger:1; int in_emit_param_changed; }; @@ -970,28 +969,12 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe return 0; } -static int -do_call_process(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct filter *impl = user_data; - struct pw_filter *filter = &impl->this; - pw_log_trace("%p: do process", filter); - pw_filter_emit_process(filter, filter->node->rt.position); - return 0; -} - static void call_process(struct filter *impl) { pw_log_trace_fp("%p: call process", impl); - if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_RT_PROCESS)) { - if (impl->rt_callbacks.funcs) - spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events, - process, 0, impl->this.node->rt.position); - } else { - pw_loop_invoke(impl->main_loop, - do_call_process, 1, NULL, 0, false, impl); - } + if (impl->rt_callbacks.funcs) + spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events, + process, 0, impl->this.node->rt.position); } static int @@ -1577,8 +1560,6 @@ pw_filter_connect(struct pw_filter *filter, pw_log_debug("%p: connect", filter); impl->flags = flags; - impl->process_rt = SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_RT_PROCESS); - impl->warn_mlock = pw_properties_get_bool(filter->properties, "mem.warn-mlock", impl->warn_mlock); impl->impl_node.iface = SPA_INTERFACE_INIT( @@ -1595,7 +1576,7 @@ pw_filter_connect(struct pw_filter *filter, impl->info.max_input_ports = UINT32_MAX; impl->info.max_output_ports = UINT32_MAX; impl->info.flags = SPA_NODE_FLAG_RT; - if (!impl->process_rt || SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_ASYNC)) + if (SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_ASYNC)) impl->info.flags |= SPA_NODE_FLAG_ASYNC; impl->info.props = &filter->properties->dict; impl->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, 0); @@ -1615,6 +1596,10 @@ pw_filter_connect(struct pw_filter *filter, impl->draining = false; filter_set_state(filter, PW_FILTER_STATE_CONNECTING, 0, NULL); + if (!SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_RT_PROCESS)) { + pw_properties_set(filter->properties, PW_KEY_NODE_DATA_LOOP, "main-loop.*"); + pw_properties_set(filter->properties, PW_KEY_NODE_ASYNC, "true"); + } if (flags & PW_FILTER_FLAG_DRIVER) pw_properties_set(filter->properties, PW_KEY_NODE_DRIVER, "true"); if (flags & PW_FILTER_FLAG_TRIGGER) { diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 06b8d3657..46188c3f2 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -1452,6 +1452,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, { struct impl *impl; struct pw_impl_node *this; + const char *str; size_t size; int res; @@ -1467,12 +1468,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, this = &impl->this; this->context = context; this->name = strdup("node"); - - this->data_loop = pw_context_get_data_loop(context)->loop; - this->data_system = this->data_loop->system; - - if (user_data_size > 0) - this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void); + this->source.fd = -1; if (properties == NULL) properties = pw_properties_new(NULL, NULL); @@ -1481,6 +1477,21 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, goto error_clean; } + if ((str = pw_properties_get(properties, PW_KEY_NODE_DATA_LOOP)) == NULL) + str = "data-loop.*"; + + this->data_loop = pw_context_find_loop(context, str); + if (this->data_loop == NULL) { + pw_log_error("unknown data-loop name '%s'", str); + res = -ENOENT; + goto error_clean; + } + + this->data_system = this->data_loop->system; + + if (user_data_size > 0) + this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void); + this->properties = properties; /* the eventfd used to signal the node */ @@ -1554,6 +1565,7 @@ error_clean: pw_memblock_unref(this->activation); if (this->source.fd != -1) spa_system_close(this->data_system, this->source.fd); + free(this->name); free(impl); error_exit: pw_properties_free(properties); diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index f59387c32..b28ff5038 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -279,9 +279,9 @@ static int tee_process(void *object) struct spa_io_buffers *io = &this->rt.io; uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1; - pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id); + pw_log_trace_fp("%p: tee input status:%d id:%d cycle:%d", this, io->status, io->buffer_id, cycle); spa_list_for_each(mix, &impl->mix_list, rt_link) { - pw_log_trace_fp("%p: port %d %p->%p %d", this, + pw_log_trace_fp("%p: port %d %p->%p id:%d", this, mix->port.port_id, io, mix->io[cycle], mix->io[cycle]->buffer_id); *mix->io[cycle] = *io; } @@ -321,9 +321,9 @@ static int schedule_mix_input(void *object) return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; spa_list_for_each(mix, &impl->mix_list, rt_link) { - pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this, + pw_log_trace_fp("%p: mix input %d %p->%p status:%d id:%d cycle:%d", this, mix->port.port_id, mix->io[cycle], io, - mix->io[cycle]->status, mix->io[cycle]->buffer_id); + mix->io[cycle]->status, mix->io[cycle]->buffer_id, cycle); *io = *mix->io[cycle]; mix->io[cycle]->status = SPA_STATUS_NEED_DATA; break; diff --git a/src/pipewire/keys.h b/src/pipewire/keys.h index 4b36cd522..1e77e8edd 100644 --- a/src/pipewire/keys.h +++ b/src/pipewire/keys.h @@ -183,6 +183,7 @@ extern "C" { #define PW_KEY_NODE_TRANSPORT_SYNC "node.transport.sync" /**< the node handles transport sync */ #define PW_KEY_NODE_DRIVER "node.driver" /**< node can drive the graph */ #define PW_KEY_NODE_ASYNC "node.async" /**< the node wants async scheduling */ +#define PW_KEY_NODE_DATA_LOOP "node.data-loop" /**< the data loops to run in */ #define PW_KEY_NODE_STREAM "node.stream" /**< node is a stream, the server side should * add a converter */ #define PW_KEY_NODE_VIRTUAL "node.virtual" /**< the node is some sort of virtual diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 5a5b205ba..570021b2f 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -1180,6 +1180,7 @@ int pw_proxy_init(struct pw_proxy *proxy, struct pw_core *core, const char *type void pw_proxy_remove(struct pw_proxy *proxy); int pw_context_recalc_graph(struct pw_context *context, const char *reason); +struct pw_loop *pw_context_find_loop(struct pw_context *context, const char *name); void pw_impl_port_update_info(struct pw_impl_port *port, const struct spa_port_info *info); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 6a60b2253..90ebadc88 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -147,7 +147,6 @@ struct stream { unsigned int drained:1; unsigned int allow_mlock:1; unsigned int warn_mlock:1; - unsigned int process_rt:1; unsigned int using_trigger:1; unsigned int trigger:1; unsigned int early_process:1; @@ -432,31 +431,14 @@ static inline uint32_t update_requested(struct stream *impl) return buffer->this.requested > 0 ? 1 : 0; } -static int -do_call_process(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct stream *impl = user_data; - struct pw_stream *stream = &impl->this; - pw_log_trace_fp("%p: do process", stream); - if (!impl->disconnecting) - pw_stream_emit_process(stream); - return 0; -} - static inline void call_process(struct stream *impl) { - pw_log_trace_fp("%p: call process rt:%u buffers:%d", impl, impl->process_rt, impl->n_buffers); + pw_log_trace_fp("%p: call process buffers:%d", impl, impl->n_buffers); if (impl->n_buffers == 0 || (impl->direction == SPA_DIRECTION_OUTPUT && update_requested(impl) <= 0)) return; - if (impl->process_rt) { - if (impl->rt_callbacks.funcs) - spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_stream_events, process, 0); - } else { - pw_loop_invoke(impl->main_loop, - do_call_process, 1, NULL, 0, false, impl); - } + if (impl->rt_callbacks.funcs) + spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_stream_events, process, 0); } static int @@ -689,11 +671,6 @@ static int impl_send_command(void *object, const struct spa_command *command) if (impl->io != NULL) impl->io->status = SPA_STATUS_NEED_DATA; } - else { - copy_position(impl, impl->queued.incount); - if (!impl->process_rt && !stream->node->driving) - call_process(impl); - } stream_set_state(stream, PW_STREAM_STATE_STREAMING, 0, NULL); } break; @@ -1089,16 +1066,6 @@ again: impl->drained = false; io->buffer_id = b->id; res = io->status = SPA_STATUS_HAVE_DATA; - /* we have a buffer, if we are not rt and don't follow - * any rate matching and there are no more - * buffers queued and there is a buffer to dequeue, ask for - * more buffers so that we have one in the next round. - * If we are using rate matching we need to wait until the - * rate matching node (audioconvert) has been scheduled to - * update the values. */ - ask_more = !impl->process_rt && impl->rate_match == NULL && - (impl->early_process || queue_is_empty(impl, &impl->queued)) && - !queue_is_empty(impl, &impl->dequeued); pw_log_trace_fp("%p: pop %d %p ask_more:%u %p", stream, b->id, io, ask_more, impl->rate_match); } else if (impl->draining || impl->drained) { @@ -1113,25 +1080,16 @@ again: pw_log_trace_fp("%p: no more buffers %p", stream, io); ask_more = true; } - } else { - ask_more = !impl->process_rt && - (impl->early_process || queue_is_empty(impl, &impl->queued)) && - !queue_is_empty(impl, &impl->dequeued); } copy_position(impl, impl->queued.outcount); - if (!impl->draining && !stream->node->driving) { + if (!impl->draining && !stream->node->driving && ask_more) { /* we're not draining, not a driver check if we need to get * more buffers */ - if (ask_more) { - call_process(impl); - /* realtime, we can try again now if there is something. - * non-realtime, we will have to try in the next round */ - if (impl->process_rt && - (impl->draining || !queue_is_empty(impl, &impl->queued))) - goto again; - } + call_process(impl); + if (impl->draining || !queue_is_empty(impl, &impl->queued)) + goto again; } pw_log_trace_fp("%p: res %d", stream, res); @@ -1911,7 +1869,6 @@ pw_stream_connect(struct pw_stream *stream, else impl->node_methods.process = impl_node_process_output; - impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS); impl->trigger_done_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_TRIGGER_DONE); impl->early_process = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_EARLY_PROCESS); @@ -1936,9 +1893,7 @@ pw_stream_connect(struct pw_stream *stream, /* we're always RT safe, if the stream was marked RT_PROCESS, * the callback must be RT safe */ impl->info.flags = SPA_NODE_FLAG_RT; - /* if the callback was not marked RT_PROCESS, we will offload - * the process callback in the main thread and we are ASYNC */ - if (!impl->process_rt || SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ASYNC)) + if (SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ASYNC)) impl->info.flags |= SPA_NODE_FLAG_ASYNC; impl->info.props = &stream->properties->dict; impl->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, 0); @@ -2001,6 +1956,10 @@ pw_stream_connect(struct pw_stream *stream, if (pw_properties_get(stream->properties, PW_KEY_NODE_DONT_RECONNECT) == NULL) pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true"); + if (!SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS)) { + pw_properties_set(stream->properties, PW_KEY_NODE_DATA_LOOP, "main-loop.*"); + pw_properties_set(stream->properties, PW_KEY_NODE_ASYNC, "true"); + } if (flags & PW_STREAM_FLAG_DRIVER) pw_properties_set(stream->properties, PW_KEY_NODE_DRIVER, "true"); if (flags & PW_STREAM_FLAG_TRIGGER) { @@ -2541,8 +2500,7 @@ do_trigger_driver(struct spa_loop *loop, struct stream *impl = user_data; int res; if (impl->direction == SPA_DIRECTION_OUTPUT) { - if (impl->process_rt) - call_process(impl); + call_process(impl); res = impl->node_methods.process(impl); } else { res = SPA_STATUS_NEED_DATA; @@ -2578,8 +2536,6 @@ int pw_stream_trigger_process(struct pw_stream *stream) if (impl->trigger) { pw_impl_node_trigger(stream->node); } else if (stream->node->driving) { - if (!impl->process_rt) - call_process(impl); res = pw_loop_invoke(impl->data_loop, do_trigger_driver, 1, NULL, 0, false, impl); } else {