diff --git a/spa/include/spa/node/node.h b/spa/include/spa/node/node.h index f9c48e874..d7149905a 100644 --- a/spa/include/spa/node/node.h +++ b/spa/include/spa/node/node.h @@ -67,6 +67,10 @@ struct spa_node_info { * PortConfig parameter */ #define SPA_NODE_FLAG_NEED_CONFIGURE (1u<<5) /**< node needs configuration before it can * be started. */ +#define SPA_NODE_FLAG_ASYNC (1u<<6) /**< the process function might not + * immediateley produce or consume data + * but might offload the work to a worker + * thread. */ uint64_t flags; struct spa_dict *props; /**< extra node properties */ struct spa_param_info *params; /**< parameter information */ diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c index 8f4cc922e..d75793cfa 100644 --- a/spa/plugins/audioconvert/audioadapter.c +++ b/spa/plugins/audioconvert/audioadapter.c @@ -83,6 +83,7 @@ struct impl { unsigned int have_format:1; unsigned int started:1; unsigned int driver:1; + unsigned int async:1; }; /** \endcond */ @@ -611,6 +612,8 @@ static void follower_info(void *data, const struct spa_node_info *info) { struct impl *this = data; + this->async = (info->flags & SPA_NODE_FLAG_ASYNC) != 0; + if (info->max_input_ports > 0) this->direction = SPA_DIRECTION_INPUT; else @@ -898,19 +901,18 @@ static int impl_node_process(void *object) /* an input node (sink). * First we run the converter to process the input for the follower * then if it produced data, we run the follower. */ - status = SPA_STATUS_HAVE_DATA; - do { - if (this->convert) { - status = spa_node_process(this->convert); - if (status <= 0) - status = SPA_STATUS_HAVE_DATA; - } + while (true) { + status = this->convert ? spa_node_process(this->convert) : 0; + /* schedule the follower when the converter needed + * a recycled buffer */ + if (status == -EPIPE || status == 0) + status = SPA_STATUS_HAVE_DATA; + else if (status < 0) + break; + if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) { /* as long as the converter produced something or - * is drained, process the follower. Also schedule - * the follower when the converter was in error - * because the follower might first need to recycle a - * buffer to the converter */ + * is drained, process the follower. */ fstatus = spa_node_process(this->follower); /* if the follower doesn't need more data or is * drained we can stop */ @@ -921,22 +923,28 @@ static int impl_node_process(void *object) /* the converter needs more data */ if ((status & SPA_STATUS_NEED_DATA)) break; - } while (status > 0); - } - - if (this->direction == SPA_DIRECTION_OUTPUT && - !this->driver && this->convert) { - status = SPA_STATUS_NEED_DATA; - do { + } + } else if (!this->driver) { + bool done; + while (true) { /* output node (source). First run the converter to make * sure we push out any queued data. Then when it needs * more data, schedule the follower. */ - if (this->convert) { - status = spa_node_process(this->convert); - if (status <= 0) - status = SPA_STATUS_NEED_DATA; - } - if ((status & SPA_STATUS_NEED_DATA)) { + status = this->convert ? spa_node_process(this->convert) : 0; + if (status == 0) + status = SPA_STATUS_NEED_DATA; + else if (status < 0) + break; + + done = (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)); + + /* when not async, we can return the data when we are done. + * In async mode we might first need to wake up the follower + * to asynchronously provide more data for the next round. */ + if (!this->async && done) + break; + + if (status & SPA_STATUS_NEED_DATA) { /* the converter needs more data, schedule the * follower */ fstatus = spa_node_process(this->follower); @@ -945,10 +953,13 @@ static int impl_node_process(void *object) if ((fstatus & SPA_STATUS_HAVE_DATA) == 0) break; } - /* converter produced something or is drained */ - if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) + /* converter produced something or is drained and we + * scheduled the follower above, we can stop now*/ + if (done) break; - } while (status > 0); + } + } else { + status = spa_node_process(this->follower); } spa_log_trace_fp(this->log, "%p: process status:%d", this, status); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 95c2ec825..9ac17a0b6 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -138,7 +138,7 @@ struct stream { unsigned int draining:1; unsigned int drained:1; unsigned int allow_mlock:1; - unsigned int warn_mlock:1; + unsigned int process_rt:1; }; static int get_param_index(uint32_t id) @@ -321,14 +321,12 @@ do_call_process(struct spa_loop *loop, static void call_process(struct stream *impl) { struct pw_stream *stream = &impl->this; - pw_log_trace(NAME" %p: call process", impl); - if (SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) { + pw_log_trace(NAME" %p: call process rt:%u", impl, impl->process_rt); + if (impl->process_rt) pw_stream_emit_process(stream); - } - else { + else pw_loop_invoke(impl->context->main_loop, do_call_process, 1, NULL, 0, false, impl); - } } static int @@ -429,7 +427,13 @@ static void emit_node_info(struct stream *d) info.max_output_ports = 1; } info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS; + /* we're always RT safe, if the stream was marked RT_PROCESS, + * the callback must be RT safe */ info.flags = SPA_NODE_FLAG_RT; + /* if the callback was not marked RT_PROCESS, we will offload + * the process callback in the main thread and we are ASYNC */ + if (!d->process_rt) + info.flags |= SPA_NODE_FLAG_ASYNC; spa_node_emit_info(&d->hooks, &info); } @@ -556,7 +560,7 @@ static int map_data(struct stream *impl, struct spa_data *data, int prot) range.offset, range.size, data->data); if (impl->allow_mlock && mlock(data->data, data->maxsize) < 0) { - pw_log(impl->warn_mlock ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG, + pw_log(impl->process_rt ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG, NAME" %p: Failed to mlock memory %p %u: %s", impl, data->data, data->maxsize, errno == ENOMEM ? @@ -805,12 +809,21 @@ again: copy_position(impl, impl->queued.outcount); if (!impl->draining && - !SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_DRIVER) && - spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) > 0) { - call_process(impl); - if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0 && - io->status == SPA_STATUS_NEED_DATA) - goto again; + !SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_DRIVER)) { + /* we're not draining, not a driver check if we need to get + * more buffers */ + if (!impl->process_rt) { + /* not realtime and we have a free buffer, trigger process so that we have + * data in the next round. */ + if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) > 0) + call_process(impl); + } else if (io->status == SPA_STATUS_NEED_DATA) { + /* realtime and we don't have a buffer, trigger process and try + * again when there is something in the queue now */ + call_process(impl); + if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0) + goto again; + } } pw_log_trace(NAME" %p: res %d", stream, res); @@ -1480,8 +1493,8 @@ pw_stream_connect(struct pw_stream *stream, if (flags & PW_STREAM_FLAG_DONT_RECONNECT) pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true"); - impl->warn_mlock = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS); - pw_properties_set(stream->properties, "mem.warn-mlock", impl->warn_mlock ? "true" : "false"); + impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS); + pw_properties_set(stream->properties, "mem.warn-mlock", impl->process_rt ? "true" : "false"); if ((pw_properties_get(stream->properties, PW_KEY_MEDIA_CLASS) == NULL)) { const char *media_type = pw_properties_get(stream->properties, PW_KEY_MEDIA_TYPE); diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index e20db1c47..6ecfed8ad 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -244,7 +244,8 @@ enum pw_stream_flags { PW_STREAM_FLAG_MAP_BUFFERS = (1 << 2), /**< mmap the buffers */ PW_STREAM_FLAG_DRIVER = (1 << 3), /**< be a driver */ PW_STREAM_FLAG_RT_PROCESS = (1 << 4), /**< call process from the realtime - * thread */ + * thread. You MUST use RT safe functions + * in the process callback. */ PW_STREAM_FLAG_NO_CONVERT = (1 << 5), /**< don't convert format */ PW_STREAM_FLAG_EXCLUSIVE = (1 << 6), /**< require exclusive access to the * device */