From d728076d331a2692acae5357ff61a1b21e7c55c0 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 17 May 2022 13:03:06 +0200 Subject: [PATCH] audioconvert2: handle drain --- spa/plugins/audioconvert/audioconvert2.c | 86 +++++++++++++++--------- 1 file changed, 56 insertions(+), 30 deletions(-) diff --git a/spa/plugins/audioconvert/audioconvert2.c b/spa/plugins/audioconvert/audioconvert2.c index a46d5772f..9f69c4293 100644 --- a/spa/plugins/audioconvert/audioconvert2.c +++ b/spa/plugins/audioconvert/audioconvert2.c @@ -209,6 +209,7 @@ struct impl { unsigned int started:1; unsigned int peaks:1; unsigned int is_passthrough:1; + unsigned int drained:1; uint32_t empty_size; float *empty; @@ -1974,22 +1975,21 @@ static int impl_node_process(void *object) struct impl *this = object; const void *src_datas[MAX_PORTS], **in_datas; void *dst_datas[MAX_PORTS], *mon_datas[MAX_PORTS], **out_datas; - uint32_t i, j, n_src_datas = 0, n_dst_datas = 0, n_mon_datas = 0; - uint32_t n_samples, max_mon, max_out, quant_samples; + uint32_t i, j, n_src_datas = 0, n_dst_datas = 0, n_mon_datas = 0, remap; + uint32_t n_samples, max_mon, max_out, quant_samples, n_empty; struct port *port; struct buffer *buf, *out_bufs[MAX_PORTS]; struct spa_data *bd; struct dir *dir; - int tmp = 0; + int tmp = 0, res; bool in_passthrough, mix_passthrough, resample_passthrough, out_passthrough, end_passthrough; - bool flush_out; - uint32_t in_len, out_len, remap; + bool flush_out, draining = false; struct spa_io_buffers *io; dir = &this->dir[SPA_DIRECTION_INPUT]; in_passthrough = dir->conv.is_passthrough; - n_samples = UINT32_MAX; + n_samples = n_empty = UINT32_MAX; /* collect input port data */ for (i = 0; i < dir->n_ports; i++) { @@ -1999,11 +1999,21 @@ static int impl_node_process(void *object) spa_log_trace_fp(this->log, "%p: no io on input port %d", this, port->id); buf = NULL; - } else if (io->status != SPA_STATUS_HAVE_DATA || - io->buffer_id >= port->n_buffers) { - spa_log_trace_fp(this->log, "%p: empty input port %d %p %d %d %d", + } else if (io->status != SPA_STATUS_HAVE_DATA) { + if (io->status & SPA_STATUS_DRAINED) { + spa_log_debug(this->log, "%p: port %d drained", this, port->id); + draining = true; + } else { + spa_log_trace_fp(this->log, "%p: empty input port %d %p %d %d %d", + this, port->id, io, io->status, io->buffer_id, + port->n_buffers); + } + buf = NULL; + } else if (io->buffer_id >= port->n_buffers) { + spa_log_trace_fp(this->log, "%p: invalid input buffer port %d %p %d %d %d", this, port->id, io, io->status, io->buffer_id, port->n_buffers); + io->status = -EINVAL; buf = NULL; } else { buf = &port->buffers[io->buffer_id]; @@ -2013,8 +2023,9 @@ static int impl_node_process(void *object) for (j = 0; j < port->blocks; j++) { remap = dir->src_remap[n_src_datas++]; src_datas[remap] = SPA_PTR_ALIGN(this->empty, MAX_ALIGN, void); - spa_log_trace_fp(this->log, "%p: %d->%d", this, + spa_log_trace_fp(this->log, "%p: empty input %d->%d", this, i * port->blocks + j, remap); + n_empty = SPA_MIN(n_empty, this->empty_size / port->stride); } } else { for (j = 0; j < port->blocks; j++) { @@ -2029,7 +2040,7 @@ static int impl_node_process(void *object) src_datas[remap] = SPA_PTROFF(bd->data, offs, void); n_samples = SPA_MIN(n_samples, size / port->stride); - spa_log_trace_fp(this->log, "%p: %d:%d %d %d->%d", this, + spa_log_trace_fp(this->log, "%p: input %d:%d %d %d->%d", this, offs, size, n_samples, i * port->blocks + j, remap); } @@ -2060,11 +2071,15 @@ static int impl_node_process(void *object) else quant_samples = this->quantum_limit; + if (draining) + n_samples = SPA_MIN(n_empty, this->quantum_limit); + resample_passthrough = resample_is_passthrough(this); - if (n_samples == UINT32_MAX) { + if (n_samples == UINT32_MAX || this->drained) { + spa_log_debug(this->log, "%p: %d %d", this, n_samples, this->drained); /* no input, ask for more */ resample_update_rate_match(this, resample_passthrough, quant_samples, 0); - return SPA_STATUS_NEED_DATA; + return this->drained ? SPA_STATUS_DRAINED : SPA_STATUS_NEED_DATA; } dir = &this->dir[SPA_DIRECTION_OUTPUT]; @@ -2091,12 +2106,13 @@ static int impl_node_process(void *object) if (SPA_UNLIKELY(buf == NULL)) { for (j = 0; j < port->blocks; j++) { if (port->is_monitor) { - mon_datas[n_mon_datas++] = NULL; + remap = n_mon_datas++; + mon_datas[remap] = NULL; } else { remap = dir->dst_remap[n_dst_datas++]; dst_datas[remap] = SPA_PTR_ALIGN(this->scratch, MAX_ALIGN, void); } - spa_log_trace_fp(this->log, "%p: %d->%d %d", this, + spa_log_trace_fp(this->log, "%p: empty output %d->%d %d", this, i * port->blocks + j, remap, port->is_monitor); } } else { @@ -2106,14 +2122,15 @@ static int impl_node_process(void *object) bd->chunk->offset = 0; bd->chunk->size = 0; if (port->is_monitor) { - mon_datas[n_mon_datas++] = bd->data; + remap = n_mon_datas++; + mon_datas[remap] = bd->data; max_mon = SPA_MIN(max_mon, bd->maxsize / port->stride); } else { remap = dir->dst_remap[n_dst_datas++]; dst_datas[remap] = bd->data; max_out = SPA_MIN(max_out, bd->maxsize / port->stride); } - spa_log_trace_fp(this->log, "%p: %d:%d %d %d->%d", this, + spa_log_trace_fp(this->log, "%p: output %d:%d %d %d->%d", this, max_mon, max_out, n_samples, i * port->blocks + j, remap); } @@ -2152,30 +2169,32 @@ static int impl_node_process(void *object) flush_out = true; } - in_datas = (const void**)src_datas; if (!in_passthrough || end_passthrough) { if (end_passthrough) out_datas = (void **)dst_datas; else out_datas = (void **)this->tmp_datas[(tmp++) & 1]; - convert_process(&this->dir[SPA_DIRECTION_INPUT].conv, out_datas, in_datas, n_samples); + spa_log_trace_fp(this->log, "%p: convert %d %d", this, n_samples, end_passthrough); + convert_process(&this->dir[SPA_DIRECTION_INPUT].conv, out_datas, src_datas, n_samples); } else { - out_datas = (void **)in_datas; + out_datas = (void **)src_datas; } - in_datas = (const void**)out_datas; if (!mix_passthrough) { + in_datas = (const void**)out_datas; if (resample_passthrough && out_passthrough) out_datas = (void **)dst_datas; else out_datas = (void **)this->tmp_datas[(tmp++) & 1]; + spa_log_trace_fp(this->log, "%p: channelmix %d %d %d", this, n_samples, + resample_passthrough, out_passthrough); channelmix_process(&this->mix, out_datas, in_datas, n_samples); - } else { - out_datas = (void **)in_datas; } - in_datas = (const void**)out_datas; if (!resample_passthrough) { + uint32_t in_len, out_len; + + in_datas = (const void**)out_datas; if (out_passthrough) out_datas = (void **)dst_datas; else @@ -2184,16 +2203,19 @@ static int impl_node_process(void *object) in_len = n_samples; out_len = max_out; resample_process(&this->resample, in_datas, &in_len, out_datas, &out_len); + spa_log_trace_fp(this->log, "%p: resample %d->%d %d->%d %d", this, + n_samples, max_out, in_len, out_len, out_passthrough); + n_samples = out_len; } else { - out_datas = (void **)in_datas; - out_len = n_samples; + n_samples = max_out; } resample_update_rate_match(this, resample_passthrough, max_out, 0); - n_samples = out_len; - in_datas = (const void **)out_datas; - if (!out_passthrough) + if (!out_passthrough) { + in_datas = (const void**)out_datas; + spa_log_trace_fp(this->log, "%p: convert %d", this, n_samples); convert_process(&this->dir[SPA_DIRECTION_OUTPUT].conv, dst_datas, in_datas, n_samples); + } /* return input buffers */ dir = &this->dir[SPA_DIRECTION_INPUT]; @@ -2202,6 +2224,7 @@ static int impl_node_process(void *object) if ((io = port->io) != NULL) io->status = SPA_STATUS_NEED_DATA; } + res = SPA_STATUS_NEED_DATA; /* queue output buffers */ dir = &this->dir[SPA_DIRECTION_OUTPUT]; @@ -2230,7 +2253,10 @@ static int impl_node_process(void *object) spa_log_trace_fp(this->log, "out: %d %d %d", max_mon, n_samples, bd->chunk->size); } } - return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA; + res |= SPA_STATUS_HAVE_DATA; + this->drained = draining; + + return res; } static const struct spa_node_methods impl_node = {