audiomixer: add ringbuffer handling

This commit is contained in:
Wim Taymans 2017-10-31 16:01:26 +01:00
parent af99c196c9
commit 307239767c

View file

@ -28,6 +28,7 @@
#include <spa/format-builder.h>
#include <lib/format.h>
#include <lib/props.h>
#include <spa/param-alloc.h>
#include "conv.h"
@ -37,10 +38,13 @@
#define MAX_PORTS 128
struct buffer {
struct spa_buffer *outbuf;
bool outstanding;
struct spa_meta_header *h;
struct spa_list link;
bool outstanding;
struct spa_buffer *outbuf;
struct spa_meta_header *h;
struct spa_meta_ringbuffer *rb;
};
struct port {
@ -70,6 +74,8 @@ struct type {
struct spa_type_command_node command_node;
struct spa_type_meta meta;
struct spa_type_data data;
struct spa_type_param_alloc_buffers param_alloc_buffers;
struct spa_type_param_alloc_meta_enable param_alloc_meta_enable;
};
static inline void init_type(struct type *type, struct spa_type_map *map)
@ -83,6 +89,8 @@ static inline void init_type(struct type *type, struct spa_type_map *map)
spa_type_command_node_map(map, &type->command_node);
spa_type_meta_map(map, &type->meta);
spa_type_data_map(map, &type->data);
spa_type_param_alloc_buffers_map(map, &type->param_alloc_buffers);
spa_type_param_alloc_meta_enable_map(map, &type->param_alloc_meta_enable);
}
struct impl {
@ -107,6 +115,7 @@ struct impl {
bool have_format;
int n_formats;
struct spa_audio_info format;
uint32_t bpf;
mix_func_t copy;
mix_func_t add;
@ -393,16 +402,21 @@ impl_node_port_set_format(struct spa_node *node,
if (memcmp(&info, &this->format, sizeof(struct spa_audio_info)))
return SPA_RESULT_INVALID_MEDIA_TYPE;
} else {
this->have_format = true;
this->format = info;
if (info.info.raw.format == t->audio_format.S16) {
this->copy = this->ops.copy[CONV_S16_S16];
this->add = this->ops.add[CONV_S16_S16];
this->bpf = sizeof(int16_t) * info.info.raw.channels;
}
else if (info.info.raw.format == t->audio_format.F32) {
this->copy = this->ops.copy[CONV_F32_F32];
this->add = this->ops.add[CONV_F32_F32];
this->bpf = sizeof(float) * info.info.raw.channels;
}
else
return SPA_RESULT_INVALID_MEDIA_TYPE;
this->have_format = true;
this->format = info;
}
if (!port->have_format) {
this->n_formats++;
@ -478,7 +492,56 @@ impl_node_port_enum_params(struct spa_node *node,
uint32_t index,
struct spa_param **param)
{
return SPA_RESULT_NOT_IMPLEMENTED;
struct impl *this;
struct spa_pod_builder b = { NULL, };
struct type *t;
spa_return_val_if_fail(node != NULL, SPA_RESULT_INVALID_ARGUMENTS);
spa_return_val_if_fail(param != NULL, SPA_RESULT_INVALID_ARGUMENTS);
this = SPA_CONTAINER_OF(node, struct impl, node);
t = &this->type;
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), SPA_RESULT_INVALID_PORT);
spa_pod_builder_init(&b, this->format_buffer, sizeof(this->format_buffer));
switch (index) {
case 0:
*param = spa_pod_builder_param(&b,
t->param_alloc_buffers.Buffers,
":", t->param_alloc_buffers.size, "iru", 1024 * this->bpf,
2, 16 * this->bpf,
INT32_MAX / this->bpf,
":", t->param_alloc_buffers.stride, "i", 0,
":", t->param_alloc_buffers.buffers, "iru", 2,
2, 2, MAX_BUFFERS,
":", t->param_alloc_buffers.align, "i", 16);
break;
case 1:
*param = spa_pod_builder_param(&b,
t->param_alloc_meta_enable.MetaEnable,
":", t->param_alloc_meta_enable.type, "I", t->meta.Header,
":", t->param_alloc_meta_enable.size, "i", sizeof(struct spa_meta_header));
break;
case 2:
*param = spa_pod_builder_param(&b,
t->param_alloc_meta_enable.MetaEnable,
":", t->param_alloc_meta_enable.type, "I", t->meta.Ringbuffer,
":", t->param_alloc_meta_enable.size, "i", sizeof(struct spa_meta_ringbuffer),
":", t->param_alloc_meta_enable.ringbufferSize, "iru", 1024 * this->bpf,
2, 16 * this->bpf, INT32_MAX / this->bpf,
":", t->param_alloc_meta_enable.ringbufferStride, "i", 0,
":", t->param_alloc_meta_enable.ringbufferBlocks, "i", 1,
":", t->param_alloc_meta_enable.ringbufferAlign, "i", 16);
break;
default:
return SPA_RESULT_ENUM_END;
}
return SPA_RESULT_OK;
}
static int
@ -523,6 +586,7 @@ impl_node_port_use_buffers(struct spa_node *node,
b->outbuf = buffers[i];
b->outstanding = (direction == SPA_DIRECTION_INPUT);
b->h = spa_buffer_find_meta(buffers[i], t->meta.Header);
b->rb = spa_buffer_find_meta(buffers[i], t->meta.Ringbuffer);
if (!((d[0].type == t->data.MemPtr ||
d[0].type == t->data.MemFd ||
@ -619,32 +683,58 @@ add_port_data(struct impl *this, void *out, size_t outsize, struct port *port, i
size_t insize;
struct buffer *b;
struct spa_data *id;
uint32_t index = 0, offset, len1, len2;
b = spa_list_first(&port->queue, struct buffer, link);
id = b->outbuf->datas;
in = SPA_MEMBER(id[0].data, port->queued_offset + id[0].chunk->offset, void);
insize = id[0].chunk->size - port->queued_offset;
outsize = SPA_MIN(outsize, insize);
if (b->rb) {
insize = spa_ringbuffer_get_read_index(&b->rb->ringbuffer, &index);
outsize = SPA_MIN(outsize, insize);
if (layer == 0)
this->copy(out, in, outsize);
else
this->add(out, in, outsize);
offset = index % b->rb->ringbuffer.size;
if (offset + outsize > b->rb->ringbuffer.size)
len1 = b->rb->ringbuffer.size - offset;
else
len1 = outsize;
}
else {
offset = port->queued_offset + id[0].chunk->offset;
insize = id[0].chunk->size - offset;
outsize = SPA_MIN(outsize, insize);
len1 = outsize;
}
in = SPA_MEMBER(id[0].data, offset, void);
len2 = outsize - len1;
if (layer == 0) {
this->copy(out, in, len1);
if (len2 > 0)
this->copy(out + len1, in + len1, len2);
}
else {
this->add(out, in, len1);
if (len2 > 0)
this->add(out + len1, in + len1, len2);
}
if (b->rb)
spa_ringbuffer_read_update(&b->rb->ringbuffer, index + outsize);
port->queued_offset += outsize;
port->queued_bytes -= outsize;
if (outsize == insize) {
if (outsize == insize || b->rb) {
spa_log_trace(this->log, NAME " %p: return buffer %d on port %p %zd",
this, b->outbuf->id, port, outsize);
port->io->buffer_id = b->outbuf->id;
spa_list_remove(&b->link);
b->outstanding = true;
port->queued_offset = 0;
port->queued_bytes = 0;
} else {
spa_log_trace(this->log, NAME " %p: keeping buffer %d on port %p %zd %zd",
this, b->outbuf->id, port, port->queued_bytes, outsize);
port->queued_offset += outsize;
}
}
@ -655,6 +745,7 @@ static int mix_output(struct impl *this, size_t n_bytes)
struct port *outport;
struct spa_port_io *outio;
struct spa_data *od;
uint32_t index = 0, len1, len2 = 0, offset;
outport = GET_OUT_PORT(this, 0);
outio = outport->io;
@ -667,13 +758,31 @@ static int mix_output(struct impl *this, size_t n_bytes)
outbuf->outstanding = true;
od = outbuf->outbuf->datas;
n_bytes = SPA_MIN(n_bytes, od[0].maxsize);
od[0].chunk->offset = 0;
od[0].chunk->size = n_bytes;
od[0].chunk->stride = 0;
spa_log_trace(this->log, NAME " %p: dequeue output buffer %d %zd",
this, outbuf->outbuf->id, n_bytes);
if (outbuf->rb) {
int32_t filled, avail;
filled = spa_ringbuffer_get_write_index(&outbuf->rb->ringbuffer, &index);
avail = outbuf->rb->ringbuffer.size - filled;
offset = index % outbuf->rb->ringbuffer.size;
n_bytes = SPA_MIN(n_bytes, avail);
if (offset + n_bytes > outbuf->rb->ringbuffer.size)
len1 = outbuf->rb->ringbuffer.size - offset;
else
len1 = n_bytes;
len2 = n_bytes - len1;
spa_log_trace(this->log, NAME " %p: %d %d %d %ld %d %d", this, index, offset, avail, n_bytes, len1, len2);
} else {
n_bytes = SPA_MIN(n_bytes, od[0].maxsize);
offset = 0;
len1 = n_bytes;
}
spa_log_trace(this->log, NAME " %p: dequeue output buffer %d %zd %d %d %d",
this, outbuf->outbuf->id, n_bytes, offset, len1, len2);
for (layer = 0, i = 0; i < this->last_port; i++) {
struct port *in_port = GET_IN_PORT(this, i);
@ -686,8 +795,20 @@ static int mix_output(struct impl *this, size_t n_bytes)
in_port->queued_offset = 0;
continue;
}
add_port_data(this, od[0].data, n_bytes, in_port, layer++);
add_port_data(this, SPA_MEMBER(od[0].data, offset, void), len1, in_port, layer);
if (len2 > 0)
add_port_data(this, od[0].data, len2, in_port, layer);
layer++;
}
if (outbuf->rb) {
spa_ringbuffer_write_update(&outbuf->rb->ringbuffer, index + n_bytes);
}
else {
od[0].chunk->offset = 0;
od[0].chunk->size = n_bytes;
od[0].chunk->stride = 0;
}
outio->buffer_id = outbuf->outbuf->id;
outio->status = SPA_RESULT_HAVE_BUFFER;
@ -723,6 +844,7 @@ static int impl_node_process_input(struct spa_node *node)
if (inport->queued_bytes == 0 &&
inio->status == SPA_RESULT_HAVE_BUFFER && inio->buffer_id < inport->n_buffers) {
struct buffer *b = &inport->buffers[inio->buffer_id];
uint32_t index;
if (!b->outstanding) {
spa_log_warn(this->log, NAME " %p: buffer %u in use", this,
@ -736,7 +858,11 @@ static int impl_node_process_input(struct spa_node *node)
inio->status = SPA_RESULT_OK;
spa_list_append(&inport->queue, &b->link);
inport->queued_bytes += b->outbuf->datas[0].chunk->size;
if (b->rb)
inport->queued_bytes = spa_ringbuffer_get_read_index(&b->rb->ringbuffer, &index);
else
inport->queued_bytes += b->outbuf->datas[0].chunk->size;
spa_log_trace(this->log, NAME " %p: queue buffer %d on port %d %zd %zd",
this, b->outbuf->id, i, inport->queued_bytes, min_queued);