mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-02 09:01:50 -05:00
audioconvert2: add queueing
Use in and out offset to read/write partial input/output.
This commit is contained in:
parent
d728076d33
commit
2352ae33c3
1 changed files with 66 additions and 49 deletions
|
|
@ -206,6 +206,8 @@ struct impl {
|
||||||
struct volume volume;
|
struct volume volume;
|
||||||
double rate_scale;
|
double rate_scale;
|
||||||
|
|
||||||
|
uint32_t in_offset;
|
||||||
|
uint32_t out_offset;
|
||||||
unsigned int started:1;
|
unsigned int started:1;
|
||||||
unsigned int peaks:1;
|
unsigned int peaks:1;
|
||||||
unsigned int is_passthrough:1;
|
unsigned int is_passthrough:1;
|
||||||
|
|
@ -1976,20 +1978,20 @@ static int impl_node_process(void *object)
|
||||||
const void *src_datas[MAX_PORTS], **in_datas;
|
const void *src_datas[MAX_PORTS], **in_datas;
|
||||||
void *dst_datas[MAX_PORTS], *mon_datas[MAX_PORTS], **out_datas;
|
void *dst_datas[MAX_PORTS], *mon_datas[MAX_PORTS], **out_datas;
|
||||||
uint32_t i, j, n_src_datas = 0, n_dst_datas = 0, n_mon_datas = 0, remap;
|
uint32_t i, j, n_src_datas = 0, n_dst_datas = 0, n_mon_datas = 0, remap;
|
||||||
uint32_t n_samples, max_mon, max_out, quant_samples, n_empty;
|
uint32_t n_samples, max_in, max_mon, max_out, quant_samples, n_empty;
|
||||||
struct port *port;
|
struct port *port;
|
||||||
struct buffer *buf, *out_bufs[MAX_PORTS];
|
struct buffer *buf, *out_bufs[MAX_PORTS];
|
||||||
struct spa_data *bd;
|
struct spa_data *bd;
|
||||||
struct dir *dir;
|
struct dir *dir;
|
||||||
int tmp = 0, res;
|
int tmp = 0, res;
|
||||||
bool in_passthrough, mix_passthrough, resample_passthrough, out_passthrough, end_passthrough;
|
bool in_passthrough, mix_passthrough, resample_passthrough, out_passthrough, end_passthrough;
|
||||||
bool flush_out, draining = false;
|
bool flush_in = false, flush_out = false, draining = false;
|
||||||
struct spa_io_buffers *io;
|
struct spa_io_buffers *io;
|
||||||
|
|
||||||
dir = &this->dir[SPA_DIRECTION_INPUT];
|
dir = &this->dir[SPA_DIRECTION_INPUT];
|
||||||
in_passthrough = dir->conv.is_passthrough;
|
in_passthrough = dir->conv.is_passthrough;
|
||||||
|
|
||||||
n_samples = n_empty = UINT32_MAX;
|
n_samples = n_empty = max_in = UINT32_MAX;
|
||||||
|
|
||||||
/* collect input port data */
|
/* collect input port data */
|
||||||
for (i = 0; i < dir->n_ports; i++) {
|
for (i = 0; i < dir->n_ports; i++) {
|
||||||
|
|
@ -2002,7 +2004,7 @@ static int impl_node_process(void *object)
|
||||||
} else if (io->status != SPA_STATUS_HAVE_DATA) {
|
} else if (io->status != SPA_STATUS_HAVE_DATA) {
|
||||||
if (io->status & SPA_STATUS_DRAINED) {
|
if (io->status & SPA_STATUS_DRAINED) {
|
||||||
spa_log_debug(this->log, "%p: port %d drained", this, port->id);
|
spa_log_debug(this->log, "%p: port %d drained", this, port->id);
|
||||||
draining = true;
|
flush_in = draining = true;
|
||||||
} else {
|
} else {
|
||||||
spa_log_trace_fp(this->log, "%p: empty input port %d %p %d %d %d",
|
spa_log_trace_fp(this->log, "%p: empty input port %d %p %d %d %d",
|
||||||
this, port->id, io, io->status, io->buffer_id,
|
this, port->id, io, io->status, io->buffer_id,
|
||||||
|
|
@ -2026,6 +2028,7 @@ static int impl_node_process(void *object)
|
||||||
spa_log_trace_fp(this->log, "%p: empty input %d->%d", this,
|
spa_log_trace_fp(this->log, "%p: empty input %d->%d", this,
|
||||||
i * port->blocks + j, remap);
|
i * port->blocks + j, remap);
|
||||||
n_empty = SPA_MIN(n_empty, this->empty_size / port->stride);
|
n_empty = SPA_MIN(n_empty, this->empty_size / port->stride);
|
||||||
|
max_in = SPA_MIN(max_in, this->empty_size / port->stride);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (j = 0; j < port->blocks; j++) {
|
for (j = 0; j < port->blocks; j++) {
|
||||||
|
|
@ -2036,12 +2039,16 @@ static int impl_node_process(void *object)
|
||||||
|
|
||||||
offs = SPA_MIN(bd->chunk->offset, bd->maxsize);
|
offs = SPA_MIN(bd->chunk->offset, bd->maxsize);
|
||||||
size = SPA_MIN(bd->maxsize - offs, bd->chunk->size);
|
size = SPA_MIN(bd->maxsize - offs, bd->chunk->size);
|
||||||
|
max_in = SPA_MIN(max_in, size / port->stride);
|
||||||
|
|
||||||
|
offs = SPA_MIN(offs + this->in_offset * port->stride, bd->maxsize);
|
||||||
|
size = SPA_MIN(bd->maxsize - offs, bd->chunk->size);
|
||||||
|
|
||||||
src_datas[remap] = SPA_PTROFF(bd->data, offs, void);
|
src_datas[remap] = SPA_PTROFF(bd->data, offs, void);
|
||||||
n_samples = SPA_MIN(n_samples, size / port->stride);
|
n_samples = SPA_MIN(n_samples, size / port->stride);
|
||||||
|
|
||||||
spa_log_trace_fp(this->log, "%p: input %d:%d %d %d->%d", this,
|
spa_log_trace_fp(this->log, "%p: input %d:%d:%d %d %d %d->%d", this,
|
||||||
offs, size, n_samples,
|
offs, size, port->stride, max_in, n_samples,
|
||||||
i * port->blocks + j, remap);
|
i * port->blocks + j, remap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -2123,15 +2130,17 @@ static int impl_node_process(void *object)
|
||||||
bd->chunk->size = 0;
|
bd->chunk->size = 0;
|
||||||
if (port->is_monitor) {
|
if (port->is_monitor) {
|
||||||
remap = n_mon_datas++;
|
remap = n_mon_datas++;
|
||||||
mon_datas[remap] = bd->data;
|
mon_datas[remap] = SPA_PTROFF(bd->data,
|
||||||
|
this->out_offset * port->stride, void);
|
||||||
max_mon = SPA_MIN(max_mon, bd->maxsize / port->stride);
|
max_mon = SPA_MIN(max_mon, bd->maxsize / port->stride);
|
||||||
} else {
|
} else {
|
||||||
remap = dir->dst_remap[n_dst_datas++];
|
remap = dir->dst_remap[n_dst_datas++];
|
||||||
dst_datas[remap] = bd->data;
|
dst_datas[remap] = SPA_PTROFF(bd->data,
|
||||||
|
this->out_offset * port->stride, void);
|
||||||
max_out = SPA_MIN(max_out, bd->maxsize / port->stride);
|
max_out = SPA_MIN(max_out, bd->maxsize / port->stride);
|
||||||
}
|
}
|
||||||
spa_log_trace_fp(this->log, "%p: output %d:%d %d %d->%d", this,
|
spa_log_trace_fp(this->log, "%p: output %d:%d %d %d %d->%d", this,
|
||||||
max_mon, max_out, n_samples,
|
max_mon, max_out, this->out_offset, n_samples,
|
||||||
i * port->blocks + j, remap);
|
i * port->blocks + j, remap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -2160,7 +2169,7 @@ static int impl_node_process(void *object)
|
||||||
if (this->direction == SPA_DIRECTION_INPUT) {
|
if (this->direction == SPA_DIRECTION_INPUT) {
|
||||||
/* in split mode we need to output exactly the size of the
|
/* in split mode we need to output exactly the size of the
|
||||||
* duration so we don't try to flush early */
|
* duration so we don't try to flush early */
|
||||||
max_out = SPA_MIN(max_out, quant_samples);
|
n_samples = max_out = SPA_MIN(max_out, quant_samples);
|
||||||
flush_out = false;
|
flush_out = false;
|
||||||
} else {
|
} else {
|
||||||
/* in merge mode we consume one duration of samples and
|
/* in merge mode we consume one duration of samples and
|
||||||
|
|
@ -2191,6 +2200,7 @@ static int impl_node_process(void *object)
|
||||||
channelmix_process(&this->mix, out_datas, in_datas, n_samples);
|
channelmix_process(&this->mix, out_datas, in_datas, n_samples);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this->in_offset += n_samples;
|
||||||
if (!resample_passthrough) {
|
if (!resample_passthrough) {
|
||||||
uint32_t in_len, out_len;
|
uint32_t in_len, out_len;
|
||||||
|
|
||||||
|
|
@ -2206,10 +2216,9 @@ static int impl_node_process(void *object)
|
||||||
spa_log_trace_fp(this->log, "%p: resample %d->%d %d->%d %d", this,
|
spa_log_trace_fp(this->log, "%p: resample %d->%d %d->%d %d", this,
|
||||||
n_samples, max_out, in_len, out_len, out_passthrough);
|
n_samples, max_out, in_len, out_len, out_passthrough);
|
||||||
n_samples = out_len;
|
n_samples = out_len;
|
||||||
} else {
|
|
||||||
n_samples = max_out;
|
|
||||||
}
|
}
|
||||||
resample_update_rate_match(this, resample_passthrough, max_out, 0);
|
resample_update_rate_match(this, resample_passthrough, max_out, 0);
|
||||||
|
this->out_offset += n_samples;
|
||||||
|
|
||||||
if (!out_passthrough) {
|
if (!out_passthrough) {
|
||||||
in_datas = (const void**)out_datas;
|
in_datas = (const void**)out_datas;
|
||||||
|
|
@ -2217,45 +2226,53 @@ static int impl_node_process(void *object)
|
||||||
convert_process(&this->dir[SPA_DIRECTION_OUTPUT].conv, dst_datas, in_datas, n_samples);
|
convert_process(&this->dir[SPA_DIRECTION_OUTPUT].conv, dst_datas, in_datas, n_samples);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* return input buffers */
|
spa_log_trace_fp(this->log, "%d/%d %d/%d", this->in_offset, max_in, this->out_offset, max_out);
|
||||||
dir = &this->dir[SPA_DIRECTION_INPUT];
|
res = 0;
|
||||||
for (i = 0; i < dir->n_ports; i++) {
|
if (this->in_offset >= max_in || flush_in) {
|
||||||
port = GET_IN_PORT(this, i);
|
/* return input buffers */
|
||||||
if ((io = port->io) != NULL)
|
dir = &this->dir[SPA_DIRECTION_INPUT];
|
||||||
io->status = SPA_STATUS_NEED_DATA;
|
for (i = 0; i < dir->n_ports; i++) {
|
||||||
}
|
port = GET_IN_PORT(this, i);
|
||||||
res = SPA_STATUS_NEED_DATA;
|
if ((io = port->io) != NULL) {
|
||||||
|
spa_log_trace_fp(this->log, "return: input %d %d", port->id, io->buffer_id);
|
||||||
/* queue output buffers */
|
io->status = SPA_STATUS_NEED_DATA;
|
||||||
dir = &this->dir[SPA_DIRECTION_OUTPUT];
|
}
|
||||||
for (i = 0; i < dir->n_ports; i++) {
|
|
||||||
port = GET_OUT_PORT(this, i);
|
|
||||||
buf = out_bufs[i];
|
|
||||||
|
|
||||||
if (SPA_UNLIKELY((io = port->io) == NULL))
|
|
||||||
continue;
|
|
||||||
|
|
||||||
io->status = SPA_STATUS_HAVE_DATA;
|
|
||||||
io->buffer_id = buf ? buf->id : SPA_ID_INVALID;
|
|
||||||
|
|
||||||
if (buf == NULL)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
dequeue_buffer(this, port, buf);
|
|
||||||
|
|
||||||
for (j = 0; j < port->blocks; j++) {
|
|
||||||
bd = &buf->buf->datas[j];
|
|
||||||
if (port->is_monitor)
|
|
||||||
bd->chunk->size = max_mon * port->stride;
|
|
||||||
else
|
|
||||||
bd->chunk->size = n_samples * port->stride;
|
|
||||||
|
|
||||||
spa_log_trace_fp(this->log, "out: %d %d %d", max_mon, n_samples, bd->chunk->size);
|
|
||||||
}
|
}
|
||||||
|
this->in_offset = 0;
|
||||||
|
res |= SPA_STATUS_NEED_DATA;
|
||||||
}
|
}
|
||||||
res |= SPA_STATUS_HAVE_DATA;
|
|
||||||
this->drained = draining;
|
|
||||||
|
|
||||||
|
if (this->out_offset > 0 && (this->out_offset >= max_out || flush_out)) {
|
||||||
|
/* queue output buffers */
|
||||||
|
dir = &this->dir[SPA_DIRECTION_OUTPUT];
|
||||||
|
for (i = 0; i < dir->n_ports; i++) {
|
||||||
|
port = GET_OUT_PORT(this, i);
|
||||||
|
buf = out_bufs[i];
|
||||||
|
|
||||||
|
if (SPA_UNLIKELY((io = port->io) == NULL))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (buf == NULL)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
io->status = SPA_STATUS_HAVE_DATA;
|
||||||
|
io->buffer_id = buf->id;
|
||||||
|
|
||||||
|
for (j = 0; j < port->blocks; j++) {
|
||||||
|
bd = &buf->buf->datas[j];
|
||||||
|
if (port->is_monitor)
|
||||||
|
bd->chunk->size = max_mon * port->stride;
|
||||||
|
else
|
||||||
|
bd->chunk->size = n_samples * port->stride;
|
||||||
|
|
||||||
|
spa_log_trace_fp(this->log, "out: %d %d %d", max_mon, n_samples, bd->chunk->size);
|
||||||
|
}
|
||||||
|
dequeue_buffer(this, port, buf);
|
||||||
|
}
|
||||||
|
res |= SPA_STATUS_HAVE_DATA;
|
||||||
|
this->drained = draining;
|
||||||
|
this->out_offset = 0;
|
||||||
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue