stream: improve async handling

We can remove most of the special async handling in adapter, filter and
stream because this is now handled in the core.

Add a node.data-loop property to assign the node to a named data-loop.

Assign the non-rt stream and filter to the main loop. This means that
the node fd will be added to the main-loop and will be woken up directly
without having to wake up the RT thread and invoke the process callback
in the main-loop first. Because non-RT implies async, we can do all of
this like we do our rt processing because the output will only be used
in the next cycle.
This commit is contained in:
Wim Taymans 2024-04-18 12:20:38 +02:00
parent 34be6c76a6
commit e1e0a886d5
8 changed files with 57 additions and 99 deletions

View file

@ -1664,11 +1664,7 @@ static int impl_node_process(void *object)
break; break;
done = (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)); done = (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED));
if (done)
/* when not async, we can return the data when we are done.
* In async mode we might first need to wake up the follower
* to asynchronously provide more data for the next round. */
if (!this->async && done)
break; break;
if (status & SPA_STATUS_NEED_DATA) { if (status & SPA_STATUS_NEED_DATA) {
@ -1684,10 +1680,6 @@ static int impl_node_process(void *object)
if ((fstatus & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) == 0) if ((fstatus & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) == 0)
break; break;
} }
/* converter produced something or is drained and we
* scheduled the follower above, we can stop now*/
if (done)
break;
} }
if (!done) if (!done)
spa_node_call_xrun(&this->callbacks, 0, 0, NULL); spa_node_call_xrun(&this->callbacks, 0, 0, NULL);

View file

@ -510,6 +510,17 @@ struct pw_data_loop *pw_context_get_data_loop(struct pw_context *context)
return impl->data_loop_impl; return impl->data_loop_impl;
} }
SPA_EXPORT
struct pw_loop *pw_context_find_loop(struct pw_context *context, const char *name)
{
if (spa_strstartswith(name, "main-loop."))
return context->main_loop;
else if (spa_strstartswith(name, "data-loop."))
return context->data_loop;
else
return NULL;
}
SPA_EXPORT SPA_EXPORT
struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context) struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context)
{ {

View file

@ -144,7 +144,6 @@ struct filter {
unsigned int drained:1; unsigned int drained:1;
unsigned int allow_mlock:1; unsigned int allow_mlock:1;
unsigned int warn_mlock:1; unsigned int warn_mlock:1;
unsigned int process_rt:1;
unsigned int trigger:1; unsigned int trigger:1;
int in_emit_param_changed; int in_emit_param_changed;
}; };
@ -970,28 +969,12 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe
return 0; return 0;
} }
static int
do_call_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct filter *impl = user_data;
struct pw_filter *filter = &impl->this;
pw_log_trace("%p: do process", filter);
pw_filter_emit_process(filter, filter->node->rt.position);
return 0;
}
static void call_process(struct filter *impl) static void call_process(struct filter *impl)
{ {
pw_log_trace_fp("%p: call process", impl); pw_log_trace_fp("%p: call process", impl);
if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_RT_PROCESS)) { if (impl->rt_callbacks.funcs)
if (impl->rt_callbacks.funcs) spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events,
spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events, process, 0, impl->this.node->rt.position);
process, 0, impl->this.node->rt.position);
} else {
pw_loop_invoke(impl->main_loop,
do_call_process, 1, NULL, 0, false, impl);
}
} }
static int static int
@ -1577,8 +1560,6 @@ pw_filter_connect(struct pw_filter *filter,
pw_log_debug("%p: connect", filter); pw_log_debug("%p: connect", filter);
impl->flags = flags; impl->flags = flags;
impl->process_rt = SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_RT_PROCESS);
impl->warn_mlock = pw_properties_get_bool(filter->properties, "mem.warn-mlock", impl->warn_mlock); impl->warn_mlock = pw_properties_get_bool(filter->properties, "mem.warn-mlock", impl->warn_mlock);
impl->impl_node.iface = SPA_INTERFACE_INIT( impl->impl_node.iface = SPA_INTERFACE_INIT(
@ -1595,7 +1576,7 @@ pw_filter_connect(struct pw_filter *filter,
impl->info.max_input_ports = UINT32_MAX; impl->info.max_input_ports = UINT32_MAX;
impl->info.max_output_ports = UINT32_MAX; impl->info.max_output_ports = UINT32_MAX;
impl->info.flags = SPA_NODE_FLAG_RT; impl->info.flags = SPA_NODE_FLAG_RT;
if (!impl->process_rt || SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_ASYNC)) if (SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_ASYNC))
impl->info.flags |= SPA_NODE_FLAG_ASYNC; impl->info.flags |= SPA_NODE_FLAG_ASYNC;
impl->info.props = &filter->properties->dict; impl->info.props = &filter->properties->dict;
impl->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, 0); impl->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, 0);
@ -1615,6 +1596,10 @@ pw_filter_connect(struct pw_filter *filter,
impl->draining = false; impl->draining = false;
filter_set_state(filter, PW_FILTER_STATE_CONNECTING, 0, NULL); filter_set_state(filter, PW_FILTER_STATE_CONNECTING, 0, NULL);
if (!SPA_FLAG_IS_SET(flags, PW_FILTER_FLAG_RT_PROCESS)) {
pw_properties_set(filter->properties, PW_KEY_NODE_DATA_LOOP, "main-loop.*");
pw_properties_set(filter->properties, PW_KEY_NODE_ASYNC, "true");
}
if (flags & PW_FILTER_FLAG_DRIVER) if (flags & PW_FILTER_FLAG_DRIVER)
pw_properties_set(filter->properties, PW_KEY_NODE_DRIVER, "true"); pw_properties_set(filter->properties, PW_KEY_NODE_DRIVER, "true");
if (flags & PW_FILTER_FLAG_TRIGGER) { if (flags & PW_FILTER_FLAG_TRIGGER) {

View file

@ -1452,6 +1452,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
{ {
struct impl *impl; struct impl *impl;
struct pw_impl_node *this; struct pw_impl_node *this;
const char *str;
size_t size; size_t size;
int res; int res;
@ -1467,12 +1468,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
this = &impl->this; this = &impl->this;
this->context = context; this->context = context;
this->name = strdup("node"); this->name = strdup("node");
this->source.fd = -1;
this->data_loop = pw_context_get_data_loop(context)->loop;
this->data_system = this->data_loop->system;
if (user_data_size > 0)
this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void);
if (properties == NULL) if (properties == NULL)
properties = pw_properties_new(NULL, NULL); properties = pw_properties_new(NULL, NULL);
@ -1481,6 +1477,21 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
goto error_clean; goto error_clean;
} }
if ((str = pw_properties_get(properties, PW_KEY_NODE_DATA_LOOP)) == NULL)
str = "data-loop.*";
this->data_loop = pw_context_find_loop(context, str);
if (this->data_loop == NULL) {
pw_log_error("unknown data-loop name '%s'", str);
res = -ENOENT;
goto error_clean;
}
this->data_system = this->data_loop->system;
if (user_data_size > 0)
this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void);
this->properties = properties; this->properties = properties;
/* the eventfd used to signal the node */ /* the eventfd used to signal the node */
@ -1554,6 +1565,7 @@ error_clean:
pw_memblock_unref(this->activation); pw_memblock_unref(this->activation);
if (this->source.fd != -1) if (this->source.fd != -1)
spa_system_close(this->data_system, this->source.fd); spa_system_close(this->data_system, this->source.fd);
free(this->name);
free(impl); free(impl);
error_exit: error_exit:
pw_properties_free(properties); pw_properties_free(properties);

View file

@ -279,9 +279,9 @@ static int tee_process(void *object)
struct spa_io_buffers *io = &this->rt.io; struct spa_io_buffers *io = &this->rt.io;
uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1; uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1;
pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id); pw_log_trace_fp("%p: tee input status:%d id:%d cycle:%d", this, io->status, io->buffer_id, cycle);
spa_list_for_each(mix, &impl->mix_list, rt_link) { spa_list_for_each(mix, &impl->mix_list, rt_link) {
pw_log_trace_fp("%p: port %d %p->%p %d", this, pw_log_trace_fp("%p: port %d %p->%p id:%d", this,
mix->port.port_id, io, mix->io[cycle], mix->io[cycle]->buffer_id); mix->port.port_id, io, mix->io[cycle], mix->io[cycle]->buffer_id);
*mix->io[cycle] = *io; *mix->io[cycle] = *io;
} }
@ -321,9 +321,9 @@ static int schedule_mix_input(void *object)
return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
spa_list_for_each(mix, &impl->mix_list, rt_link) { spa_list_for_each(mix, &impl->mix_list, rt_link) {
pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this, pw_log_trace_fp("%p: mix input %d %p->%p status:%d id:%d cycle:%d", this,
mix->port.port_id, mix->io[cycle], io, mix->port.port_id, mix->io[cycle], io,
mix->io[cycle]->status, mix->io[cycle]->buffer_id); mix->io[cycle]->status, mix->io[cycle]->buffer_id, cycle);
*io = *mix->io[cycle]; *io = *mix->io[cycle];
mix->io[cycle]->status = SPA_STATUS_NEED_DATA; mix->io[cycle]->status = SPA_STATUS_NEED_DATA;
break; break;

View file

@ -183,6 +183,7 @@ extern "C" {
#define PW_KEY_NODE_TRANSPORT_SYNC "node.transport.sync" /**< the node handles transport sync */ #define PW_KEY_NODE_TRANSPORT_SYNC "node.transport.sync" /**< the node handles transport sync */
#define PW_KEY_NODE_DRIVER "node.driver" /**< node can drive the graph */ #define PW_KEY_NODE_DRIVER "node.driver" /**< node can drive the graph */
#define PW_KEY_NODE_ASYNC "node.async" /**< the node wants async scheduling */ #define PW_KEY_NODE_ASYNC "node.async" /**< the node wants async scheduling */
#define PW_KEY_NODE_DATA_LOOP "node.data-loop" /**< the data loops to run in */
#define PW_KEY_NODE_STREAM "node.stream" /**< node is a stream, the server side should #define PW_KEY_NODE_STREAM "node.stream" /**< node is a stream, the server side should
* add a converter */ * add a converter */
#define PW_KEY_NODE_VIRTUAL "node.virtual" /**< the node is some sort of virtual #define PW_KEY_NODE_VIRTUAL "node.virtual" /**< the node is some sort of virtual

View file

@ -1180,6 +1180,7 @@ int pw_proxy_init(struct pw_proxy *proxy, struct pw_core *core, const char *type
void pw_proxy_remove(struct pw_proxy *proxy); void pw_proxy_remove(struct pw_proxy *proxy);
int pw_context_recalc_graph(struct pw_context *context, const char *reason); int pw_context_recalc_graph(struct pw_context *context, const char *reason);
struct pw_loop *pw_context_find_loop(struct pw_context *context, const char *name);
void pw_impl_port_update_info(struct pw_impl_port *port, const struct spa_port_info *info); void pw_impl_port_update_info(struct pw_impl_port *port, const struct spa_port_info *info);

View file

@ -147,7 +147,6 @@ struct stream {
unsigned int drained:1; unsigned int drained:1;
unsigned int allow_mlock:1; unsigned int allow_mlock:1;
unsigned int warn_mlock:1; unsigned int warn_mlock:1;
unsigned int process_rt:1;
unsigned int using_trigger:1; unsigned int using_trigger:1;
unsigned int trigger:1; unsigned int trigger:1;
unsigned int early_process:1; unsigned int early_process:1;
@ -432,31 +431,14 @@ static inline uint32_t update_requested(struct stream *impl)
return buffer->this.requested > 0 ? 1 : 0; return buffer->this.requested > 0 ? 1 : 0;
} }
static int
do_call_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct stream *impl = user_data;
struct pw_stream *stream = &impl->this;
pw_log_trace_fp("%p: do process", stream);
if (!impl->disconnecting)
pw_stream_emit_process(stream);
return 0;
}
static inline void call_process(struct stream *impl) static inline void call_process(struct stream *impl)
{ {
pw_log_trace_fp("%p: call process rt:%u buffers:%d", impl, impl->process_rt, impl->n_buffers); pw_log_trace_fp("%p: call process buffers:%d", impl, impl->n_buffers);
if (impl->n_buffers == 0 || if (impl->n_buffers == 0 ||
(impl->direction == SPA_DIRECTION_OUTPUT && update_requested(impl) <= 0)) (impl->direction == SPA_DIRECTION_OUTPUT && update_requested(impl) <= 0))
return; return;
if (impl->process_rt) { if (impl->rt_callbacks.funcs)
if (impl->rt_callbacks.funcs) spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_stream_events, process, 0);
spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_stream_events, process, 0);
} else {
pw_loop_invoke(impl->main_loop,
do_call_process, 1, NULL, 0, false, impl);
}
} }
static int static int
@ -689,11 +671,6 @@ static int impl_send_command(void *object, const struct spa_command *command)
if (impl->io != NULL) if (impl->io != NULL)
impl->io->status = SPA_STATUS_NEED_DATA; impl->io->status = SPA_STATUS_NEED_DATA;
} }
else {
copy_position(impl, impl->queued.incount);
if (!impl->process_rt && !stream->node->driving)
call_process(impl);
}
stream_set_state(stream, PW_STREAM_STATE_STREAMING, 0, NULL); stream_set_state(stream, PW_STREAM_STATE_STREAMING, 0, NULL);
} }
break; break;
@ -1089,16 +1066,6 @@ again:
impl->drained = false; impl->drained = false;
io->buffer_id = b->id; io->buffer_id = b->id;
res = io->status = SPA_STATUS_HAVE_DATA; res = io->status = SPA_STATUS_HAVE_DATA;
/* we have a buffer, if we are not rt and don't follow
* any rate matching and there are no more
* buffers queued and there is a buffer to dequeue, ask for
* more buffers so that we have one in the next round.
* If we are using rate matching we need to wait until the
* rate matching node (audioconvert) has been scheduled to
* update the values. */
ask_more = !impl->process_rt && impl->rate_match == NULL &&
(impl->early_process || queue_is_empty(impl, &impl->queued)) &&
!queue_is_empty(impl, &impl->dequeued);
pw_log_trace_fp("%p: pop %d %p ask_more:%u %p", stream, b->id, io, pw_log_trace_fp("%p: pop %d %p ask_more:%u %p", stream, b->id, io,
ask_more, impl->rate_match); ask_more, impl->rate_match);
} else if (impl->draining || impl->drained) { } else if (impl->draining || impl->drained) {
@ -1113,25 +1080,16 @@ again:
pw_log_trace_fp("%p: no more buffers %p", stream, io); pw_log_trace_fp("%p: no more buffers %p", stream, io);
ask_more = true; ask_more = true;
} }
} else {
ask_more = !impl->process_rt &&
(impl->early_process || queue_is_empty(impl, &impl->queued)) &&
!queue_is_empty(impl, &impl->dequeued);
} }
copy_position(impl, impl->queued.outcount); copy_position(impl, impl->queued.outcount);
if (!impl->draining && !stream->node->driving) { if (!impl->draining && !stream->node->driving && ask_more) {
/* we're not draining, not a driver check if we need to get /* we're not draining, not a driver check if we need to get
* more buffers */ * more buffers */
if (ask_more) { call_process(impl);
call_process(impl); if (impl->draining || !queue_is_empty(impl, &impl->queued))
/* realtime, we can try again now if there is something. goto again;
* non-realtime, we will have to try in the next round */
if (impl->process_rt &&
(impl->draining || !queue_is_empty(impl, &impl->queued)))
goto again;
}
} }
pw_log_trace_fp("%p: res %d", stream, res); pw_log_trace_fp("%p: res %d", stream, res);
@ -1911,7 +1869,6 @@ pw_stream_connect(struct pw_stream *stream,
else else
impl->node_methods.process = impl_node_process_output; impl->node_methods.process = impl_node_process_output;
impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS);
impl->trigger_done_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_TRIGGER_DONE); impl->trigger_done_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_TRIGGER_DONE);
impl->early_process = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_EARLY_PROCESS); impl->early_process = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_EARLY_PROCESS);
@ -1936,9 +1893,7 @@ pw_stream_connect(struct pw_stream *stream,
/* we're always RT safe, if the stream was marked RT_PROCESS, /* we're always RT safe, if the stream was marked RT_PROCESS,
* the callback must be RT safe */ * the callback must be RT safe */
impl->info.flags = SPA_NODE_FLAG_RT; impl->info.flags = SPA_NODE_FLAG_RT;
/* if the callback was not marked RT_PROCESS, we will offload if (SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ASYNC))
* the process callback in the main thread and we are ASYNC */
if (!impl->process_rt || SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ASYNC))
impl->info.flags |= SPA_NODE_FLAG_ASYNC; impl->info.flags |= SPA_NODE_FLAG_ASYNC;
impl->info.props = &stream->properties->dict; impl->info.props = &stream->properties->dict;
impl->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, 0); impl->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, 0);
@ -2001,6 +1956,10 @@ pw_stream_connect(struct pw_stream *stream,
if (pw_properties_get(stream->properties, PW_KEY_NODE_DONT_RECONNECT) == NULL) if (pw_properties_get(stream->properties, PW_KEY_NODE_DONT_RECONNECT) == NULL)
pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true"); pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true");
if (!SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS)) {
pw_properties_set(stream->properties, PW_KEY_NODE_DATA_LOOP, "main-loop.*");
pw_properties_set(stream->properties, PW_KEY_NODE_ASYNC, "true");
}
if (flags & PW_STREAM_FLAG_DRIVER) if (flags & PW_STREAM_FLAG_DRIVER)
pw_properties_set(stream->properties, PW_KEY_NODE_DRIVER, "true"); pw_properties_set(stream->properties, PW_KEY_NODE_DRIVER, "true");
if (flags & PW_STREAM_FLAG_TRIGGER) { if (flags & PW_STREAM_FLAG_TRIGGER) {
@ -2541,8 +2500,7 @@ do_trigger_driver(struct spa_loop *loop,
struct stream *impl = user_data; struct stream *impl = user_data;
int res; int res;
if (impl->direction == SPA_DIRECTION_OUTPUT) { if (impl->direction == SPA_DIRECTION_OUTPUT) {
if (impl->process_rt) call_process(impl);
call_process(impl);
res = impl->node_methods.process(impl); res = impl->node_methods.process(impl);
} else { } else {
res = SPA_STATUS_NEED_DATA; res = SPA_STATUS_NEED_DATA;
@ -2578,8 +2536,6 @@ int pw_stream_trigger_process(struct pw_stream *stream)
if (impl->trigger) { if (impl->trigger) {
pw_impl_node_trigger(stream->node); pw_impl_node_trigger(stream->node);
} else if (stream->node->driving) { } else if (stream->node->driving) {
if (!impl->process_rt)
call_process(impl);
res = pw_loop_invoke(impl->data_loop, res = pw_loop_invoke(impl->data_loop,
do_trigger_driver, 1, NULL, 0, false, impl); do_trigger_driver, 1, NULL, 0, false, impl);
} else { } else {