From 2352ae33c31803506452f83711ce2e5c2135903b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 17 May 2022 17:21:22 +0200 Subject: [PATCH] audioconvert2: add queueing Use in and out offset to read/write partial input/output. --- spa/plugins/audioconvert/audioconvert2.c | 115 +++++++++++++---------- 1 file changed, 66 insertions(+), 49 deletions(-) diff --git a/spa/plugins/audioconvert/audioconvert2.c b/spa/plugins/audioconvert/audioconvert2.c index 9f69c4293..7427767ac 100644 --- a/spa/plugins/audioconvert/audioconvert2.c +++ b/spa/plugins/audioconvert/audioconvert2.c @@ -206,6 +206,8 @@ struct impl { struct volume volume; double rate_scale; + uint32_t in_offset; + uint32_t out_offset; unsigned int started:1; unsigned int peaks:1; unsigned int is_passthrough:1; @@ -1976,20 +1978,20 @@ static int impl_node_process(void *object) const void *src_datas[MAX_PORTS], **in_datas; void *dst_datas[MAX_PORTS], *mon_datas[MAX_PORTS], **out_datas; uint32_t i, j, n_src_datas = 0, n_dst_datas = 0, n_mon_datas = 0, remap; - uint32_t n_samples, max_mon, max_out, quant_samples, n_empty; + uint32_t n_samples, max_in, max_mon, max_out, quant_samples, n_empty; struct port *port; struct buffer *buf, *out_bufs[MAX_PORTS]; struct spa_data *bd; struct dir *dir; int tmp = 0, res; bool in_passthrough, mix_passthrough, resample_passthrough, out_passthrough, end_passthrough; - bool flush_out, draining = false; + bool flush_in = false, flush_out = false, draining = false; struct spa_io_buffers *io; dir = &this->dir[SPA_DIRECTION_INPUT]; in_passthrough = dir->conv.is_passthrough; - n_samples = n_empty = UINT32_MAX; + n_samples = n_empty = max_in = UINT32_MAX; /* collect input port data */ for (i = 0; i < dir->n_ports; i++) { @@ -2002,7 +2004,7 @@ static int impl_node_process(void *object) } else if (io->status != SPA_STATUS_HAVE_DATA) { if (io->status & SPA_STATUS_DRAINED) { spa_log_debug(this->log, "%p: port %d drained", this, port->id); - draining = true; + flush_in = draining = true; } else { spa_log_trace_fp(this->log, "%p: empty input port %d %p %d %d %d", this, port->id, io, io->status, io->buffer_id, @@ -2026,6 +2028,7 @@ static int impl_node_process(void *object) spa_log_trace_fp(this->log, "%p: empty input %d->%d", this, i * port->blocks + j, remap); n_empty = SPA_MIN(n_empty, this->empty_size / port->stride); + max_in = SPA_MIN(max_in, this->empty_size / port->stride); } } else { for (j = 0; j < port->blocks; j++) { @@ -2036,12 +2039,16 @@ static int impl_node_process(void *object) offs = SPA_MIN(bd->chunk->offset, bd->maxsize); size = SPA_MIN(bd->maxsize - offs, bd->chunk->size); + max_in = SPA_MIN(max_in, size / port->stride); + + offs = SPA_MIN(offs + this->in_offset * port->stride, bd->maxsize); + size = SPA_MIN(bd->maxsize - offs, bd->chunk->size); src_datas[remap] = SPA_PTROFF(bd->data, offs, void); n_samples = SPA_MIN(n_samples, size / port->stride); - spa_log_trace_fp(this->log, "%p: input %d:%d %d %d->%d", this, - offs, size, n_samples, + spa_log_trace_fp(this->log, "%p: input %d:%d:%d %d %d %d->%d", this, + offs, size, port->stride, max_in, n_samples, i * port->blocks + j, remap); } } @@ -2123,15 +2130,17 @@ static int impl_node_process(void *object) bd->chunk->size = 0; if (port->is_monitor) { remap = n_mon_datas++; - mon_datas[remap] = bd->data; + mon_datas[remap] = SPA_PTROFF(bd->data, + this->out_offset * port->stride, void); max_mon = SPA_MIN(max_mon, bd->maxsize / port->stride); } else { remap = dir->dst_remap[n_dst_datas++]; - dst_datas[remap] = bd->data; + dst_datas[remap] = SPA_PTROFF(bd->data, + this->out_offset * port->stride, void); max_out = SPA_MIN(max_out, bd->maxsize / port->stride); } - spa_log_trace_fp(this->log, "%p: output %d:%d %d %d->%d", this, - max_mon, max_out, n_samples, + spa_log_trace_fp(this->log, "%p: output %d:%d %d %d %d->%d", this, + max_mon, max_out, this->out_offset, n_samples, i * port->blocks + j, remap); } } @@ -2160,7 +2169,7 @@ static int impl_node_process(void *object) if (this->direction == SPA_DIRECTION_INPUT) { /* in split mode we need to output exactly the size of the * duration so we don't try to flush early */ - max_out = SPA_MIN(max_out, quant_samples); + n_samples = max_out = SPA_MIN(max_out, quant_samples); flush_out = false; } else { /* in merge mode we consume one duration of samples and @@ -2191,6 +2200,7 @@ static int impl_node_process(void *object) channelmix_process(&this->mix, out_datas, in_datas, n_samples); } + this->in_offset += n_samples; if (!resample_passthrough) { uint32_t in_len, out_len; @@ -2206,10 +2216,9 @@ static int impl_node_process(void *object) spa_log_trace_fp(this->log, "%p: resample %d->%d %d->%d %d", this, n_samples, max_out, in_len, out_len, out_passthrough); n_samples = out_len; - } else { - n_samples = max_out; } resample_update_rate_match(this, resample_passthrough, max_out, 0); + this->out_offset += n_samples; if (!out_passthrough) { in_datas = (const void**)out_datas; @@ -2217,45 +2226,53 @@ static int impl_node_process(void *object) convert_process(&this->dir[SPA_DIRECTION_OUTPUT].conv, dst_datas, in_datas, n_samples); } - /* return input buffers */ - dir = &this->dir[SPA_DIRECTION_INPUT]; - for (i = 0; i < dir->n_ports; i++) { - port = GET_IN_PORT(this, i); - if ((io = port->io) != NULL) - io->status = SPA_STATUS_NEED_DATA; - } - res = SPA_STATUS_NEED_DATA; - - /* queue output buffers */ - dir = &this->dir[SPA_DIRECTION_OUTPUT]; - for (i = 0; i < dir->n_ports; i++) { - port = GET_OUT_PORT(this, i); - buf = out_bufs[i]; - - if (SPA_UNLIKELY((io = port->io) == NULL)) - continue; - - io->status = SPA_STATUS_HAVE_DATA; - io->buffer_id = buf ? buf->id : SPA_ID_INVALID; - - if (buf == NULL) - continue; - - dequeue_buffer(this, port, buf); - - for (j = 0; j < port->blocks; j++) { - bd = &buf->buf->datas[j]; - if (port->is_monitor) - bd->chunk->size = max_mon * port->stride; - else - bd->chunk->size = n_samples * port->stride; - - spa_log_trace_fp(this->log, "out: %d %d %d", max_mon, n_samples, bd->chunk->size); + spa_log_trace_fp(this->log, "%d/%d %d/%d", this->in_offset, max_in, this->out_offset, max_out); + res = 0; + if (this->in_offset >= max_in || flush_in) { + /* return input buffers */ + dir = &this->dir[SPA_DIRECTION_INPUT]; + for (i = 0; i < dir->n_ports; i++) { + port = GET_IN_PORT(this, i); + if ((io = port->io) != NULL) { + spa_log_trace_fp(this->log, "return: input %d %d", port->id, io->buffer_id); + io->status = SPA_STATUS_NEED_DATA; + } } + this->in_offset = 0; + res |= SPA_STATUS_NEED_DATA; } - res |= SPA_STATUS_HAVE_DATA; - this->drained = draining; + if (this->out_offset > 0 && (this->out_offset >= max_out || flush_out)) { + /* queue output buffers */ + dir = &this->dir[SPA_DIRECTION_OUTPUT]; + for (i = 0; i < dir->n_ports; i++) { + port = GET_OUT_PORT(this, i); + buf = out_bufs[i]; + + if (SPA_UNLIKELY((io = port->io) == NULL)) + continue; + + if (buf == NULL) + continue; + + io->status = SPA_STATUS_HAVE_DATA; + io->buffer_id = buf->id; + + for (j = 0; j < port->blocks; j++) { + bd = &buf->buf->datas[j]; + if (port->is_monitor) + bd->chunk->size = max_mon * port->stride; + else + bd->chunk->size = n_samples * port->stride; + + spa_log_trace_fp(this->log, "out: %d %d %d", max_mon, n_samples, bd->chunk->size); + } + dequeue_buffer(this, port, buf); + } + res |= SPA_STATUS_HAVE_DATA; + this->drained = draining; + this->out_offset = 0; + } return res; }