diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index 075397761..35565e95c 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -46,6 +46,8 @@ #define MAX_SAMPLES 8192 #define MAX_BUFFERS 64 #define MAX_PORTS 128 +#define MAX_CHANNELS 64 +#define MAX_BUFFER_SIZE (MAX_SAMPLES * MAX_CHANNELS * 8) #define PORT_DEFAULT_VOLUME 1.0 #define PORT_DEFAULT_MUTE false @@ -63,12 +65,15 @@ static void port_props_reset(struct port_props *props) struct buffer { uint32_t id; +#define BUFFER_FLAG_QUEUED (1 << 0) + uint32_t flags; + struct spa_list link; - bool outstanding; - - struct spa_buffer *outbuf; - + struct spa_buffer *buffer; struct spa_meta_header *h; + struct spa_buffer buf; + struct spa_data datas[1]; + struct spa_chunk chunk[1]; }; struct port { @@ -78,12 +83,10 @@ struct port { struct port_props props; struct spa_io_buffers *io; - double *io_volume; - int32_t *io_mute; uint64_t info_all; struct spa_port_info info; - struct spa_param_info params[5]; + struct spa_param_info params[8]; unsigned int valid:1; unsigned int have_format:1; @@ -116,12 +119,16 @@ struct impl { struct port in_ports[MAX_PORTS]; struct port out_ports[1]; - bool have_format; int n_formats; struct spa_audio_info format; - uint32_t bpf; - bool started; + unsigned int have_format:1; + unsigned int started:1; + uint32_t stride; + uint32_t blocks; + + uint8_t empty[MAX_BUFFER_SIZE]; + }; #define CHECK_FREE_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_PORTS && !this->in_ports[(p)].valid) @@ -236,21 +243,19 @@ static int impl_node_add_port(void *object, enum spa_direction direction, uint32 spa_return_val_if_fail(CHECK_FREE_IN_PORT(this, direction, port_id), -EINVAL); port = GET_IN_PORT(this, port_id); - port->valid = true; port->direction = SPA_DIRECTION_INPUT; port->id = port_id; port_props_reset(&port->props); - port->io_volume = &port->props.volume; - port->io_mute = &port->props.mute; spa_list_init(&port->queue); port->info_all = SPA_PORT_CHANGE_MASK_FLAGS | SPA_PORT_CHANGE_MASK_PARAMS; port->info = SPA_PORT_INFO_INIT(); - port->info.flags = SPA_PORT_FLAG_REMOVABLE | - SPA_PORT_FLAG_OPTIONAL | - SPA_PORT_FLAG_IN_PLACE; + port->info.flags = SPA_PORT_FLAG_NO_REF | + SPA_PORT_FLAG_DYNAMIC_DATA | + SPA_PORT_FLAG_REMOVABLE | + SPA_PORT_FLAG_OPTIONAL; port->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ); port->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, SPA_PARAM_INFO_READ); port->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ); @@ -262,8 +267,10 @@ static int impl_node_add_port(void *object, enum spa_direction direction, uint32 this->port_count++; if (this->last_port <= port_id) this->last_port = port_id + 1; + port->valid = true; - spa_log_debug(this->log, NAME " %p: add port %d", this, port_id); + spa_log_debug(this->log, NAME " %p: add port %d:%d %d", this, + direction, port_id, this->last_port); emit_port_info(this, port, true); return 0; @@ -280,6 +287,7 @@ impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_ port = GET_IN_PORT(this, port_id); + port->valid = false; this->port_count--; if (port->have_format && this->have_format) { if (--this->n_formats == 0) @@ -290,13 +298,15 @@ impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_ if (port_id == this->last_port + 1) { int i; - for (i = this->last_port; i >= 0; i--) + for (i = this->last_port - 1; i >= 0; i--) if (GET_IN_PORT (this, i)->valid) break; this->last_port = i + 1; } - spa_log_debug(this->log, NAME " %p: remove port %d", this, port_id); + spa_log_debug(this->log, NAME " %p: remove port %d:%d %d", this, + direction, port_id, this->last_port); + spa_node_emit_port_info(&this->hooks, direction, port_id, NULL); return 0; @@ -325,10 +335,13 @@ static int port_enum_formats(void *object, SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio), SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_raw), - SPA_FORMAT_AUDIO_format, SPA_POD_CHOICE_ENUM_Int(3, + SPA_FORMAT_AUDIO_format, SPA_POD_CHOICE_ENUM_Int(6, + SPA_AUDIO_FORMAT_S8, SPA_AUDIO_FORMAT_S16, - SPA_AUDIO_FORMAT_S16, - SPA_AUDIO_FORMAT_F32), + SPA_AUDIO_FORMAT_S24, + SPA_AUDIO_FORMAT_S32, + SPA_AUDIO_FORMAT_F32, + SPA_AUDIO_FORMAT_F64), SPA_FORMAT_AUDIO_rate, SPA_POD_CHOICE_RANGE_Int(44100, 1, INT32_MAX), SPA_FORMAT_AUDIO_channels, SPA_POD_CHOICE_RANGE_Int(2, 1, INT32_MAX)); } @@ -369,10 +382,10 @@ impl_node_port_enum_params(void *object, int seq, switch (id) { case SPA_PARAM_EnumFormat: - if ((res = port_enum_formats(this, direction, port_id, - result.index, ¶m, &b)) <= 0) + if ((res = port_enum_formats(this, direction, port_id, result.index, ¶m, &b)) <= 0) return res; break; + case SPA_PARAM_Format: if (!port->have_format) return -EIO; @@ -381,6 +394,7 @@ impl_node_port_enum_params(void *object, int seq, param = spa_format_audio_raw_build(&b, id, &this->format.info.raw); break; + case SPA_PARAM_Buffers: if (!port->have_format) return -EIO; @@ -390,12 +404,12 @@ impl_node_port_enum_params(void *object, int seq, param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamBuffers, id, SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, MAX_BUFFERS), - SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), + SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(this->blocks), SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( - MAX_SAMPLES * this->bpf, - 16 * this->bpf, + MAX_SAMPLES * this->stride, + 16 * this->stride, INT32_MAX), - SPA_PARAM_BUFFERS_stride, SPA_POD_Int(0), + SPA_PARAM_BUFFERS_stride, SPA_POD_Int(this->stride), SPA_PARAM_BUFFERS_align, SPA_POD_Int(16)); break; case SPA_PARAM_Meta: @@ -447,6 +461,54 @@ static int clear_buffers(struct impl *this, struct port *port) return 0; } +static int queue_buffer(struct impl *this, struct port *port, struct buffer *b) +{ + if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_QUEUED)) + return -EINVAL; + + spa_list_append(&port->queue, &b->link); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_QUEUED); + spa_log_trace_fp(this->log, NAME " %p: queue buffer %d", this, b->id); + return 0; +} + +static struct buffer *dequeue_buffer(struct impl *this, struct port *port) +{ + struct buffer *b; + + if (spa_list_is_empty(&port->queue)) + return NULL; + + b = spa_list_first(&port->queue, struct buffer, link); + spa_list_remove(&b->link); + SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_QUEUED); + spa_log_trace_fp(this->log, NAME " %p: dequeue buffer %d", this, b->id); + return b; +} + +static int calc_width(struct spa_audio_info *info) +{ + switch (info->info.raw.format) { + case SPA_AUDIO_FORMAT_U8P: + case SPA_AUDIO_FORMAT_U8: + case SPA_AUDIO_FORMAT_S8P: + case SPA_AUDIO_FORMAT_S8: + case SPA_AUDIO_FORMAT_ALAW: + case SPA_AUDIO_FORMAT_ULAW: + return 1; + case SPA_AUDIO_FORMAT_S16P: + case SPA_AUDIO_FORMAT_S16: + case SPA_AUDIO_FORMAT_S16_OE: + return 2; + case SPA_AUDIO_FORMAT_S24P: + case SPA_AUDIO_FORMAT_S24: + case SPA_AUDIO_FORMAT_S24_OE: + return 3; + default: + return 4; + } +} + static int port_set_format(void *object, enum spa_direction direction, uint32_t port_id, @@ -490,13 +552,23 @@ static int port_set_format(void *object, if ((res = mix_ops_init(&this->ops)) < 0) return res; + this->stride = calc_width(&info); + + if (SPA_AUDIO_FORMAT_IS_PLANAR(info.info.raw.format)) { + this->blocks = info.info.raw.channels; + } else { + this->stride *= info.info.raw.channels; + this->blocks = 1; + } + this->have_format = true; this->format = info; } if (!port->have_format) { this->n_formats++; port->have_format = true; - spa_log_debug(this->log, NAME " %p: set format on port %d", this, port_id); + spa_log_debug(this->log, NAME " %p: set format on port %d", + this, port_id); } } port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; @@ -544,14 +616,16 @@ impl_node_port_use_buffers(void *object, uint32_t i; spa_return_val_if_fail(this != NULL, -EINVAL); + + spa_log_debug(this->log, NAME " %p: use %d buffers on port %d:%d", + this, n_buffers, direction, port_id); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); port = GET_PORT(this, direction, port_id); spa_return_val_if_fail(port->have_format, -EIO); - spa_log_debug(this->log, NAME " %p: use buffers %d on port %d", this, n_buffers, port_id); - clear_buffers(this, port); for (i = 0; i < n_buffers; i++) { @@ -559,9 +633,9 @@ impl_node_port_use_buffers(void *object, struct spa_data *d = buffers[i]->datas; b = &port->buffers[i]; + b->buffer = buffers[i]; + b->flags = 0; b->id = i; - b->outbuf = buffers[i]; - b->outstanding = (direction == SPA_DIRECTION_INPUT); b->h = spa_buffer_find_meta_data(buffers[i], SPA_META_Header, sizeof(*b->h)); if (d[0].data == NULL) { @@ -569,12 +643,15 @@ impl_node_port_use_buffers(void *object, buffers[i]); return -EINVAL; } - if (!b->outstanding) - spa_list_append(&port->queue, &b->link); + if (!SPA_IS_ALIGNED(d[0].data, 16)) { + spa_log_warn(this->log, NAME " %p: memory on buffer %d not aligned", this, i); + } + if (direction == SPA_DIRECTION_OUTPUT) + queue_buffer(this, port, b); - port->queued_bytes = 0; - if (port->io) - *port->io = SPA_IO_BUFFERS_INIT; + spa_log_debug(this->log, NAME " %p: port %d:%d buffer:%d n_data:%d data:%p maxsize:%d", + this, direction, port_id, i, + buffers[i]->n_datas, d[0].data, d[0].maxsize); } port->n_buffers = n_buffers; @@ -590,6 +667,10 @@ impl_node_port_set_io(void *object, struct port *port; spa_return_val_if_fail(this != NULL, -EINVAL); + + spa_log_debug(this->log, NAME " %p: port %d:%d io %d %p/%zd", this, + direction, port_id, id, data, size); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); port = GET_PORT(this, direction, port_id); @@ -604,152 +685,19 @@ impl_node_port_set_io(void *object, return 0; } -static void recycle_buffer(struct impl *this, uint32_t id) -{ - struct port *port = GET_OUT_PORT(this, 0); - struct buffer *b = &port->buffers[id]; - - if (!b->outstanding) - return; - - spa_list_append(&port->queue, &b->link); - b->outstanding = false; - spa_log_trace(this->log, NAME " %p: recycle buffer %d", this, id); -} - static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) { struct impl *this = object; + struct port *port; spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(CHECK_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL); + port = GET_OUT_PORT(this, 0); - recycle_buffer(this, buffer_id); + if (buffer_id >= port->n_buffers) + return -EINVAL; - return -ENOTSUP; -} - -static inline void -add_port_data(struct impl *this, void *out, size_t outsize, struct port *port, int layer) -{ - size_t insize; - struct buffer *b; - uint32_t index, offset, len1, len2, maxsize; - struct spa_data *d; - void *data; - double volume = *port->io_volume; - bool mute = *port->io_mute; - const void *s0[2], *s1[2]; - uint32_t n_src; - - b = spa_list_first(&port->queue, struct buffer, link); - - d = b->outbuf->datas; - - maxsize = d[0].maxsize; - data = d[0].data; - - insize = SPA_MIN(d[0].chunk->size, maxsize); - outsize = SPA_MIN(outsize, insize); - - index = d[0].chunk->offset + (insize - port->queued_bytes); - offset = index % maxsize; - - len1 = SPA_MIN(outsize, maxsize - offset); - len2 = outsize - len1; - - n_src = 0; - if (layer > 0) { - s0[n_src] = out; - s1[n_src] = SPA_PTROFF(out, len1, void); - n_src++; - } - s0[n_src] = SPA_PTROFF(data, offset, void); - s1[n_src] = data; - n_src++; - - if (volume < 0.001 || mute) { - /* silence, do nothing */ - } - else { - mix_ops_process(&this->ops, out, s0, n_src, len1); - if (len2 > 0) - mix_ops_process(&this->ops, SPA_PTROFF(out, len1, void), s1, n_src, len2); - } - port->queued_bytes -= outsize; - - if (port->queued_bytes == 0) { - spa_log_trace(this->log, NAME " %p: return buffer %d on port %d %zd", - this, b->id, port->id, outsize); - port->io->buffer_id = b->id; - spa_list_remove(&b->link); - b->outstanding = true; - } else { - spa_log_trace(this->log, NAME " %p: keeping buffer %d on port %d %zd %zd", - this, b->id, port->id, port->queued_bytes, outsize); - } -} - -static int mix_output(struct impl *this, size_t n_bytes) -{ - struct buffer *outbuf; - uint32_t i, layer; - struct port *outport; - struct spa_io_buffers *outio; - struct spa_data *od; - uint32_t avail, index, maxsize, len1, len2, offset; - - outport = GET_OUT_PORT(this, 0); - outio = outport->io; - - if (spa_list_is_empty(&outport->queue)) { - spa_log_trace(this->log, NAME " %p: out of buffers", this); - return -EPIPE; - } - - outbuf = spa_list_first(&outport->queue, struct buffer, link); - spa_list_remove(&outbuf->link); - outbuf->outstanding = true; - - od = outbuf->outbuf->datas; - maxsize = od[0].maxsize; - - avail = maxsize; - index = 0; - n_bytes = SPA_MIN(n_bytes, avail); - - offset = index % maxsize; - len1 = SPA_MIN(n_bytes, maxsize - offset); - len2 = n_bytes - len1; - - spa_log_trace(this->log, NAME " %p: dequeue output buffer %d %zd %d %d %d", - this, outbuf->id, n_bytes, offset, len1, len2); - - for (layer = 0, i = 0; i < this->last_port; i++) { - struct port *in_port = GET_IN_PORT(this, i); - - if (in_port->io == NULL || in_port->n_buffers == 0) - continue; - - if (in_port->queued_bytes == 0) { - spa_log_warn(this->log, NAME " %p: underrun stream %d", this, i); - continue; - } - - add_port_data(this, SPA_PTROFF(od[0].data, offset, void), len1, in_port, layer); - if (len2 > 0) - add_port_data(this, od[0].data, len2, in_port, layer); - layer++; - } - - od[0].chunk->offset = index; - od[0].chunk->size = n_bytes; - od[0].chunk->stride = 0; - - outio->buffer_id = outbuf->id; - outio->status = SPA_STATUS_HAVE_DATA; - - return SPA_STATUS_HAVE_DATA; + return queue_buffer(this, port, &port->buffers[buffer_id]); } static int impl_node_process(void *object) @@ -757,8 +705,10 @@ static int impl_node_process(void *object) struct impl *this = object; struct port *outport; struct spa_io_buffers *outio; - uint32_t i; - size_t min_queued = SIZE_MAX; + uint32_t n_samples, n_buffers, i, maxsize; + struct buffer **buffers; + struct buffer *outb; + const void **datas; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -766,49 +716,81 @@ static int impl_node_process(void *object) outio = outport->io; spa_return_val_if_fail(outio != NULL, -EIO); - spa_log_trace(this->log, NAME " %p: status %d", this, outio->status); + spa_log_trace_fp(this->log, NAME " %p: status %p %d %d", + this, outio, outio->status, outio->buffer_id); - if (outio->status == SPA_STATUS_HAVE_DATA) - goto done; + if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA)) + return outio->status; /* recycle */ - if (outio->buffer_id < outport->n_buffers) { - recycle_buffer(this, outio->buffer_id); + if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) { + queue_buffer(this, outport, &outport->buffers[outio->buffer_id]); outio->buffer_id = SPA_ID_INVALID; } - /* produce more output if possible */ + buffers = alloca(MAX_PORTS * sizeof(struct buffer *)); + datas = alloca(MAX_PORTS * sizeof(void *)); + n_buffers = 0; + + maxsize = MAX_SAMPLES * this->stride; + for (i = 0; i < this->last_port; i++) { struct port *inport = GET_IN_PORT(this, i); + struct spa_io_buffers *inio = NULL; + struct buffer *inb; - if (inport->io == NULL || inport->n_buffers == 0) + if (SPA_UNLIKELY(!inport->valid || + (inio = inport->io) == NULL || + inio->buffer_id >= inport->n_buffers || + inio->status != SPA_STATUS_HAVE_DATA)) { + spa_log_trace_fp(this->log, NAME " %p: skip input idx:%d valid:%d " + "io:%p status:%d buf_id:%d n_buffers:%d", this, + i, inport->valid, inio, + inio ? inio->status : -1, + inio ? inio->buffer_id : SPA_ID_INVALID, + inport->n_buffers); continue; - - if (inport->queued_bytes < min_queued) - min_queued = inport->queued_bytes; - } - if (min_queued != SIZE_MAX && min_queued > 0) { - outio->status = mix_output(this, min_queued); - } else { - /* take requested output range and apply to input */ - for (i = 0; i < this->last_port; i++) { - struct port *inport = GET_IN_PORT(this, i); - struct spa_io_buffers *inio; - - if ((inio = inport->io) == NULL || inport->n_buffers == 0) - continue; - - spa_log_trace(this->log, NAME " %p: port %d queued %zd, res %d", this, - i, inport->queued_bytes, inio->status); - - if (inport->queued_bytes == 0) { - inio->status = SPA_STATUS_NEED_DATA; - } } - outio->status = SPA_STATUS_NEED_DATA; + + inb = &inport->buffers[inio->buffer_id]; + maxsize = SPA_MIN(inb->buffer->datas[0].chunk->size, maxsize); + + spa_log_trace_fp(this->log, NAME " %p: mix input %d %p->%p %d %d %d", this, + i, inio, outio, inio->status, inio->buffer_id, maxsize); + + datas[n_buffers] = inb->buffer->datas[0].data; + buffers[n_buffers++] = inb; + inio->status = SPA_STATUS_NEED_DATA; } - done: - return outio->status; + + outb = dequeue_buffer(this, outport); + if (SPA_UNLIKELY(outb == NULL)) { + spa_log_trace(this->log, NAME " %p: out of buffers", this); + return -EPIPE; + } + + n_samples = maxsize / this->stride; + + if (n_buffers == 1) { + *outb->buffer = *buffers[0]->buffer; + } + else { + outb->buffer->n_datas = 1; + outb->buffer->datas = outb->datas; + outb->datas[0].data = this->empty; + outb->datas[0].chunk = outb->chunk; + outb->datas[0].chunk->offset = 0; + outb->datas[0].chunk->size = n_samples * this->stride; + outb->datas[0].chunk->stride = this->stride; + outb->datas[0].maxsize = maxsize; + + mix_ops_process(&this->ops, outb->datas[0].data, datas, n_buffers, n_samples); + } + + outio->buffer_id = outb->id; + outio->status = SPA_STATUS_HAVE_DATA; + + return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; } static const struct spa_node_methods impl_node = { @@ -899,9 +881,7 @@ impl_init(const struct spa_handle_factory *factory, this->info.max_input_ports = MAX_PORTS; this->info.max_output_ports = 1; this->info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS; - this->info.flags = SPA_NODE_FLAG_IN_DYNAMIC_PORTS | - SPA_NODE_FLAG_RT; - this->info.params = this->params; + this->info.flags = SPA_NODE_FLAG_RT | SPA_NODE_FLAG_IN_DYNAMIC_PORTS; port = GET_OUT_PORT(this, 0); port->valid = true; @@ -909,7 +889,7 @@ impl_init(const struct spa_handle_factory *factory, port->id = 0; port->info = SPA_PORT_INFO_INIT(); port->info.change_mask |= SPA_PORT_CHANGE_MASK_FLAGS; - port->info.flags = SPA_PORT_FLAG_NO_REF; + port->info.flags = SPA_PORT_FLAG_DYNAMIC_DATA; port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; port->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ); port->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, SPA_PARAM_INFO_READ);