mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-02 09:01:50 -05:00
node: add an ASYNC flag
The flag means that the process function might not complete synchronously. We can use this knowledge to improve the adapter. In sync mode we can pull before scheduling the converter. In async mode we need to schedule the follower after the converter to get the data ready for the next iteration. We can also improve the stream when it is operating async to schedule a process call when a new buffer is ready to be filled. This reduces a cycle latency from alsa and improves latency in pulse.
This commit is contained in:
parent
9dfd261c71
commit
d2d722efb3
4 changed files with 72 additions and 43 deletions
|
|
@ -67,6 +67,10 @@ struct spa_node_info {
|
|||
* PortConfig parameter */
|
||||
#define SPA_NODE_FLAG_NEED_CONFIGURE (1u<<5) /**< node needs configuration before it can
|
||||
* be started. */
|
||||
#define SPA_NODE_FLAG_ASYNC (1u<<6) /**< the process function might not
|
||||
* immediateley produce or consume data
|
||||
* but might offload the work to a worker
|
||||
* thread. */
|
||||
uint64_t flags;
|
||||
struct spa_dict *props; /**< extra node properties */
|
||||
struct spa_param_info *params; /**< parameter information */
|
||||
|
|
|
|||
|
|
@ -83,6 +83,7 @@ struct impl {
|
|||
unsigned int have_format:1;
|
||||
unsigned int started:1;
|
||||
unsigned int driver:1;
|
||||
unsigned int async:1;
|
||||
};
|
||||
|
||||
/** \endcond */
|
||||
|
|
@ -611,6 +612,8 @@ static void follower_info(void *data, const struct spa_node_info *info)
|
|||
{
|
||||
struct impl *this = data;
|
||||
|
||||
this->async = (info->flags & SPA_NODE_FLAG_ASYNC) != 0;
|
||||
|
||||
if (info->max_input_ports > 0)
|
||||
this->direction = SPA_DIRECTION_INPUT;
|
||||
else
|
||||
|
|
@ -898,19 +901,18 @@ static int impl_node_process(void *object)
|
|||
/* an input node (sink).
|
||||
* First we run the converter to process the input for the follower
|
||||
* then if it produced data, we run the follower. */
|
||||
status = SPA_STATUS_HAVE_DATA;
|
||||
do {
|
||||
if (this->convert) {
|
||||
status = spa_node_process(this->convert);
|
||||
if (status <= 0)
|
||||
status = SPA_STATUS_HAVE_DATA;
|
||||
}
|
||||
while (true) {
|
||||
status = this->convert ? spa_node_process(this->convert) : 0;
|
||||
/* schedule the follower when the converter needed
|
||||
* a recycled buffer */
|
||||
if (status == -EPIPE || status == 0)
|
||||
status = SPA_STATUS_HAVE_DATA;
|
||||
else if (status < 0)
|
||||
break;
|
||||
|
||||
if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) {
|
||||
/* as long as the converter produced something or
|
||||
* is drained, process the follower. Also schedule
|
||||
* the follower when the converter was in error
|
||||
* because the follower might first need to recycle a
|
||||
* buffer to the converter */
|
||||
* is drained, process the follower. */
|
||||
fstatus = spa_node_process(this->follower);
|
||||
/* if the follower doesn't need more data or is
|
||||
* drained we can stop */
|
||||
|
|
@ -921,22 +923,28 @@ static int impl_node_process(void *object)
|
|||
/* the converter needs more data */
|
||||
if ((status & SPA_STATUS_NEED_DATA))
|
||||
break;
|
||||
} while (status > 0);
|
||||
}
|
||||
|
||||
if (this->direction == SPA_DIRECTION_OUTPUT &&
|
||||
!this->driver && this->convert) {
|
||||
status = SPA_STATUS_NEED_DATA;
|
||||
do {
|
||||
}
|
||||
} else if (!this->driver) {
|
||||
bool done;
|
||||
while (true) {
|
||||
/* output node (source). First run the converter to make
|
||||
* sure we push out any queued data. Then when it needs
|
||||
* more data, schedule the follower. */
|
||||
if (this->convert) {
|
||||
status = spa_node_process(this->convert);
|
||||
if (status <= 0)
|
||||
status = SPA_STATUS_NEED_DATA;
|
||||
}
|
||||
if ((status & SPA_STATUS_NEED_DATA)) {
|
||||
status = this->convert ? spa_node_process(this->convert) : 0;
|
||||
if (status == 0)
|
||||
status = SPA_STATUS_NEED_DATA;
|
||||
else if (status < 0)
|
||||
break;
|
||||
|
||||
done = (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED));
|
||||
|
||||
/* when not async, we can return the data when we are done.
|
||||
* In async mode we might first need to wake up the follower
|
||||
* to asynchronously provide more data for the next round. */
|
||||
if (!this->async && done)
|
||||
break;
|
||||
|
||||
if (status & SPA_STATUS_NEED_DATA) {
|
||||
/* the converter needs more data, schedule the
|
||||
* follower */
|
||||
fstatus = spa_node_process(this->follower);
|
||||
|
|
@ -945,10 +953,13 @@ static int impl_node_process(void *object)
|
|||
if ((fstatus & SPA_STATUS_HAVE_DATA) == 0)
|
||||
break;
|
||||
}
|
||||
/* converter produced something or is drained */
|
||||
if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED))
|
||||
/* converter produced something or is drained and we
|
||||
* scheduled the follower above, we can stop now*/
|
||||
if (done)
|
||||
break;
|
||||
} while (status > 0);
|
||||
}
|
||||
} else {
|
||||
status = spa_node_process(this->follower);
|
||||
}
|
||||
spa_log_trace_fp(this->log, "%p: process status:%d", this, status);
|
||||
|
||||
|
|
|
|||
|
|
@ -138,7 +138,7 @@ struct stream {
|
|||
unsigned int draining:1;
|
||||
unsigned int drained:1;
|
||||
unsigned int allow_mlock:1;
|
||||
unsigned int warn_mlock:1;
|
||||
unsigned int process_rt:1;
|
||||
};
|
||||
|
||||
static int get_param_index(uint32_t id)
|
||||
|
|
@ -321,14 +321,12 @@ do_call_process(struct spa_loop *loop,
|
|||
static void call_process(struct stream *impl)
|
||||
{
|
||||
struct pw_stream *stream = &impl->this;
|
||||
pw_log_trace(NAME" %p: call process", impl);
|
||||
if (SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) {
|
||||
pw_log_trace(NAME" %p: call process rt:%u", impl, impl->process_rt);
|
||||
if (impl->process_rt)
|
||||
pw_stream_emit_process(stream);
|
||||
}
|
||||
else {
|
||||
else
|
||||
pw_loop_invoke(impl->context->main_loop,
|
||||
do_call_process, 1, NULL, 0, false, impl);
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
|
|
@ -429,7 +427,13 @@ static void emit_node_info(struct stream *d)
|
|||
info.max_output_ports = 1;
|
||||
}
|
||||
info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS;
|
||||
/* we're always RT safe, if the stream was marked RT_PROCESS,
|
||||
* the callback must be RT safe */
|
||||
info.flags = SPA_NODE_FLAG_RT;
|
||||
/* if the callback was not marked RT_PROCESS, we will offload
|
||||
* the process callback in the main thread and we are ASYNC */
|
||||
if (!d->process_rt)
|
||||
info.flags |= SPA_NODE_FLAG_ASYNC;
|
||||
spa_node_emit_info(&d->hooks, &info);
|
||||
}
|
||||
|
||||
|
|
@ -556,7 +560,7 @@ static int map_data(struct stream *impl, struct spa_data *data, int prot)
|
|||
range.offset, range.size, data->data);
|
||||
|
||||
if (impl->allow_mlock && mlock(data->data, data->maxsize) < 0) {
|
||||
pw_log(impl->warn_mlock ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG,
|
||||
pw_log(impl->process_rt ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG,
|
||||
NAME" %p: Failed to mlock memory %p %u: %s", impl,
|
||||
data->data, data->maxsize,
|
||||
errno == ENOMEM ?
|
||||
|
|
@ -805,12 +809,21 @@ again:
|
|||
copy_position(impl, impl->queued.outcount);
|
||||
|
||||
if (!impl->draining &&
|
||||
!SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_DRIVER) &&
|
||||
spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) > 0) {
|
||||
call_process(impl);
|
||||
if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0 &&
|
||||
io->status == SPA_STATUS_NEED_DATA)
|
||||
goto again;
|
||||
!SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_DRIVER)) {
|
||||
/* we're not draining, not a driver check if we need to get
|
||||
* more buffers */
|
||||
if (!impl->process_rt) {
|
||||
/* not realtime and we have a free buffer, trigger process so that we have
|
||||
* data in the next round. */
|
||||
if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) > 0)
|
||||
call_process(impl);
|
||||
} else if (io->status == SPA_STATUS_NEED_DATA) {
|
||||
/* realtime and we don't have a buffer, trigger process and try
|
||||
* again when there is something in the queue now */
|
||||
call_process(impl);
|
||||
if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0)
|
||||
goto again;
|
||||
}
|
||||
}
|
||||
|
||||
pw_log_trace(NAME" %p: res %d", stream, res);
|
||||
|
|
@ -1480,8 +1493,8 @@ pw_stream_connect(struct pw_stream *stream,
|
|||
if (flags & PW_STREAM_FLAG_DONT_RECONNECT)
|
||||
pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true");
|
||||
|
||||
impl->warn_mlock = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS);
|
||||
pw_properties_set(stream->properties, "mem.warn-mlock", impl->warn_mlock ? "true" : "false");
|
||||
impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS);
|
||||
pw_properties_set(stream->properties, "mem.warn-mlock", impl->process_rt ? "true" : "false");
|
||||
|
||||
if ((pw_properties_get(stream->properties, PW_KEY_MEDIA_CLASS) == NULL)) {
|
||||
const char *media_type = pw_properties_get(stream->properties, PW_KEY_MEDIA_TYPE);
|
||||
|
|
|
|||
|
|
@ -244,7 +244,8 @@ enum pw_stream_flags {
|
|||
PW_STREAM_FLAG_MAP_BUFFERS = (1 << 2), /**< mmap the buffers */
|
||||
PW_STREAM_FLAG_DRIVER = (1 << 3), /**< be a driver */
|
||||
PW_STREAM_FLAG_RT_PROCESS = (1 << 4), /**< call process from the realtime
|
||||
* thread */
|
||||
* thread. You MUST use RT safe functions
|
||||
* in the process callback. */
|
||||
PW_STREAM_FLAG_NO_CONVERT = (1 << 5), /**< don't convert format */
|
||||
PW_STREAM_FLAG_EXCLUSIVE = (1 << 6), /**< require exclusive access to the
|
||||
* device */
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue