channelmix: add control port to process control sequences

Channelmix has now a control port that receives control sequences. Each Control
of those sequences are handled individually by channelmix in order to change
properties when a specific amount of samples have been processed.

Co-authored-by: Wim Taymans
This commit is contained in:
Julian Bouzas 2020-01-30 14:17:01 -05:00 committed by Wim Taymans
parent cab645d155
commit 5e37131cf8

View file

@ -31,6 +31,7 @@
#include <spa/support/cpu.h>
#include <spa/utils/list.h>
#include <spa/utils/names.h>
#include <spa/node/keys.h>
#include <spa/node/node.h>
#include <spa/node/io.h>
#include <spa/node/utils.h>
@ -50,6 +51,8 @@
#define MAX_BUFFERS 32
#define MAX_DATAS 32
#define MIN_CONTROL_BUFFER_SIZE 32768
struct impl;
#define DEFAULT_MUTE false
@ -119,6 +122,7 @@ struct impl {
struct spa_param_info params[8];
struct port control_port;
struct port in_port;
struct port out_port;
@ -128,10 +132,14 @@ struct impl {
uint32_t cpu_flags;
};
#define CHECK_PORT(this,d,id) (id == 0)
#define IS_CONTROL_PORT(this,d,id) (id == 1 && d == SPA_DIRECTION_INPUT)
#define IS_DATA_PORT(this,d,id) (id == 0)
#define CHECK_PORT(this,d,id) (IS_CONTROL_PORT(this,d,id) || IS_DATA_PORT(this,d,id))
#define GET_CONTROL_PORT(this,id) (&this->control_port)
#define GET_IN_PORT(this,id) (&this->in_port)
#define GET_OUT_PORT(this,id) (&this->out_port)
#define GET_PORT(this,d,id) (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(this,id) : GET_OUT_PORT(this,id))
#define GET_PORT(this,d,id) (IS_CONTROL_PORT(this,d,id) ? GET_CONTROL_PORT(this,id) : (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(this,id) : GET_OUT_PORT(this,id)))
#define _MASK(ch) (1ULL << SPA_AUDIO_CHANNEL_ ## ch)
#define STEREO (_MASK(FL)|_MASK(FR))
@ -437,6 +445,8 @@ impl_node_add_listener(void *object,
{
struct impl *this = object;
struct spa_hook_list save;
struct spa_dict_item items[1];
uint32_t n_items = 0;
spa_return_val_if_fail(this != NULL, -EINVAL);
@ -446,6 +456,11 @@ impl_node_add_listener(void *object,
emit_port_info(this, GET_IN_PORT(this, 0), true);
emit_port_info(this, GET_OUT_PORT(this, 0), true);
struct port *control_port = GET_CONTROL_PORT(this, 1);
items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_PORT_NAME, "control");
control_port->info.props = &SPA_DICT_INIT(items, n_items);
emit_port_info(this, control_port, true);
spa_hook_list_join(&this->hooks, &save);
return 0;
@ -481,6 +496,14 @@ static int port_enum_formats(void *object,
struct impl *this = object;
struct port *other;
if (IS_CONTROL_PORT(this, direction, port_id)) {
*param = spa_pod_builder_add_object(builder,
SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application),
SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control));
return 1;
}
other = GET_PORT(this, SPA_DIRECTION_REVERSE(direction), 0);
switch (index) {
@ -554,7 +577,13 @@ impl_node_port_enum_params(void *object, int seq,
if (result.index > 0)
return 0;
param = spa_format_audio_raw_build(&b, id, &port->format.info.raw);
if (IS_CONTROL_PORT(this, direction, port_id))
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application),
SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control));
else
param = spa_format_audio_raw_build(&b, id, &port->format.info.raw);
break;
case SPA_PARAM_Buffers:
@ -566,29 +595,45 @@ impl_node_port_enum_params(void *object, int seq,
if (result.index > 0)
return 0;
if (other->n_buffers > 0) {
buffers = other->n_buffers;
size = other->size / other->stride;
if (IS_CONTROL_PORT(this, direction, port_id)) {
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
MIN_CONTROL_BUFFER_SIZE,
MIN_CONTROL_BUFFER_SIZE,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_align, SPA_POD_Int(16));
} else {
buffers = 1;
size = MAX_SAMPLES;
}
if (other->n_buffers > 0) {
buffers = other->n_buffers;
size = other->size / other->stride;
} else {
buffers = 1;
size = MAX_SAMPLES;
}
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(buffers, 1, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(port->blocks),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
size * port->stride,
16 * port->stride,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->stride),
SPA_PARAM_BUFFERS_align, SPA_POD_Int(16));
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(buffers, 1, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(port->blocks),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
size * port->stride,
16 * port->stride,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->stride),
SPA_PARAM_BUFFERS_align, SPA_POD_Int(16));
}
break;
}
case SPA_PARAM_Meta:
switch (result.index) {
case 0:
if (IS_CONTROL_PORT(this, direction, port_id))
return -EINVAL;
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, id,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
@ -662,22 +707,28 @@ static int port_set_format(void *object,
if ((res = spa_format_parse(format, &info.media_type, &info.media_subtype)) < 0)
return res;
if (info.media_type != SPA_MEDIA_TYPE_audio ||
info.media_subtype != SPA_MEDIA_SUBTYPE_raw)
return -EINVAL;
if (IS_CONTROL_PORT(this, direction, port_id)) {
if (info.media_type != SPA_MEDIA_TYPE_application ||
info.media_subtype != SPA_MEDIA_SUBTYPE_control)
return -EINVAL;
} else {
if (info.media_type != SPA_MEDIA_TYPE_audio ||
info.media_subtype != SPA_MEDIA_SUBTYPE_raw)
return -EINVAL;
if (spa_format_audio_raw_parse(format, &info.info.raw) < 0)
return -EINVAL;
if (spa_format_audio_raw_parse(format, &info.info.raw) < 0)
return -EINVAL;
if (info.info.raw.format != SPA_AUDIO_FORMAT_F32P)
return -EINVAL;
if (info.info.raw.format != SPA_AUDIO_FORMAT_F32P)
return -EINVAL;
port->stride = sizeof(float);
port->blocks = info.info.raw.channels;
port->stride = sizeof(float);
port->blocks = info.info.raw.channels;
if (other->have_format) {
if ((res = setup_convert(this, direction, &info)) < 0)
return res;
if (other->have_format) {
if ((res = setup_convert(this, direction, &info)) < 0)
return res;
}
}
port->format = info;
port->have_format = true;
@ -704,15 +755,20 @@ impl_node_port_set_param(void *object,
uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
spa_return_val_if_fail(object != NULL, -EINVAL);
struct impl *this = object;
spa_return_val_if_fail(CHECK_PORT(object, direction, port_id), -EINVAL);
spa_return_val_if_fail(this != NULL, -EINVAL);
if (id == SPA_PARAM_Format) {
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
switch (id) {
case SPA_PARAM_Format:
return port_set_format(object, direction, port_id, flags, param);
default:
break;
}
else
return -ENOENT;
return -ENOENT;
}
static int
@ -730,7 +786,8 @@ impl_node_port_use_buffers(void *object,
port = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(port->have_format, -EIO);
if (IS_DATA_PORT(this, direction, port_id))
spa_return_val_if_fail(port->have_format, -EIO);
spa_log_debug(this->log, NAME " %p: use buffers %d on port %d", this, n_buffers, port_id);
@ -840,20 +897,136 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t
return 0;
}
static int update_control_sequence(struct spa_pod_sequence *ctrl_sequence,
uint32_t ctrl_size, uint32_t curr_ctrl, uint32_t curr_offset)
{
struct spa_pod_control *c;
struct spa_pod_builder b;
struct spa_pod_frame f[1];
uint32_t i = 0;
spa_return_val_if_fail(ctrl_sequence != NULL, -EINVAL);
spa_return_val_if_fail(ctrl_size > 0, -EINVAL);
spa_return_val_if_fail(curr_offset > 0, -EINVAL);
/* copy the sequence */
uint8_t old[ctrl_size];
memcpy(old, ctrl_sequence, ctrl_size);
/* reset the current sequence */
spa_pod_builder_init(&b, ctrl_sequence, ctrl_size);
spa_pod_builder_push_sequence(&b, &f[0], 0);
/* add the remaining controls */
SPA_POD_SEQUENCE_FOREACH(&((struct spa_io_sequence *)old)->sequence, c) {
switch (c->type) {
case SPA_CONTROL_Properties:
{
if (i >= curr_ctrl) {
spa_pod_builder_control(&b,
i == curr_ctrl ? curr_offset : c->offset,
SPA_CONTROL_Properties);
spa_pod_builder_primitive(&b, &c->value);
}
i++;
break;
}
default:
break;
}
}
spa_pod_builder_pop(&b, &f[0]);
return 0;
}
static int channelmix_process_control(struct impl *this, struct buffer *cbuf,
uint32_t n_dst, void * SPA_RESTRICT dst[n_dst],
uint32_t n_src, const void * SPA_RESTRICT src[n_src],
uint32_t n_samples)
{
struct spa_pod_sequence *ctrl_sequence;
uint32_t ctrl_size;
struct spa_pod_control *c;
uint32_t avail_samples = n_samples;
uint32_t i, j;
const float **s = (const float **)src;
float **d = (float **)dst;
/* get the control sequence */
spa_return_val_if_fail(cbuf != NULL, -EINVAL);
spa_return_val_if_fail(cbuf->outbuf->n_datas == 1, -EINVAL);
ctrl_sequence = (struct spa_pod_sequence *)cbuf->outbuf->datas[0].data;
ctrl_size = cbuf->outbuf->datas[0].maxsize;
i = 0;
SPA_POD_SEQUENCE_FOREACH(ctrl_sequence, c) {
switch (c->type) {
case SPA_CONTROL_Properties:
{
/* update the sequence and return if all samples were processed */
if (avail_samples == 0) {
update_control_sequence(ctrl_sequence, ctrl_size, i, c->offset);
return 0;
}
/* apply the sequence control props */
apply_props(this, &c->value);
if (c->offset > 0) {
/* get the minimum offset for this buffer */
uint32_t offset = SPA_MIN(avail_samples, c->offset);
/* process the control sequence */
channelmix_process(&this->mix, n_dst, dst, n_src, src, offset);
for (j = 0; j < n_src; j++)
s[j] += offset;
for (j = 0; j < n_dst; j++)
d[j] += offset;
/* update the sequence with the remaining offset samples */
if (avail_samples < c->offset) {
update_control_sequence(ctrl_sequence, ctrl_size, i, c->offset - avail_samples);
return 0;
}
/* decrease available samples */
avail_samples -= offset;
}
i++;
break;
}
default:
break;
}
}
/* process the remaining samples */
if (avail_samples > 0)
channelmix_process(&this->mix, n_dst, dst, n_src, src, avail_samples);
return 1;
}
static int impl_node_process(void *object)
{
struct impl *this = object;
struct port *outport, *inport;
struct spa_io_buffers *outio, *inio;
struct buffer *sbuf, *dbuf;
struct port *outport, *inport, *ctrlport;
struct spa_io_buffers *outio, *inio, *ctrlio;
struct buffer *sbuf, *dbuf, *cbuf = NULL;
spa_return_val_if_fail(this != NULL, -EINVAL);
outport = GET_OUT_PORT(this, 0);
inport = GET_IN_PORT(this, 0);
ctrlport = GET_CONTROL_PORT(this, 1);
outio = outport->io;
inio = inport->io;
ctrlio = ctrlport->io;
spa_return_val_if_fail(outio != NULL, -EIO);
spa_return_val_if_fail(inio != NULL, -EIO);
@ -880,6 +1053,11 @@ static int impl_node_process(void *object)
sbuf = &inport->buffers[inio->buffer_id];
if (ctrlio != NULL &&
ctrlio->status == SPA_STATUS_HAVE_DATA &&
ctrlio->buffer_id < ctrlport->n_buffers)
cbuf = &ctrlport->buffers[ctrlio->buffer_id];
{
uint32_t i, n_samples;
struct spa_buffer *sb = sbuf->outbuf, *db = dbuf->outbuf;
@ -889,7 +1067,7 @@ static int impl_node_process(void *object)
void *dst_datas[n_dst_datas];
bool is_passthrough;
is_passthrough = this->is_passthrough && this->mix.identity;
is_passthrough = this->is_passthrough && this->mix.identity && cbuf == NULL;
n_samples = sb->datas[0].chunk->size / inport->stride;
@ -904,9 +1082,18 @@ static int impl_node_process(void *object)
spa_log_trace_fp(this->log, NAME " %p: n_src:%d n_dst:%d n_samples:%d p:%d",
this, n_src_datas, n_dst_datas, n_samples, is_passthrough);
if (!is_passthrough)
channelmix_process(&this->mix, n_dst_datas, dst_datas,
n_src_datas, src_datas, n_samples);
if (!is_passthrough) {
if (cbuf != NULL) {
/* if return value is 1, the sequence has been processed */
if (channelmix_process_control(this, cbuf, n_dst_datas, dst_datas,
n_src_datas, src_datas, n_samples) == 1) {
ctrlio->status = SPA_STATUS_OK;
}
} else {
channelmix_process(&this->mix, n_dst_datas, dst_datas,
n_src_datas, src_datas, n_samples);
}
}
}
outio->status = SPA_STATUS_HAVE_DATA;
@ -998,6 +1185,8 @@ impl_init(const struct spa_handle_factory *factory,
SPA_NODE_CHANGE_MASK_PARAMS;
this->info = SPA_NODE_INFO_INIT();
this->info.flags = SPA_NODE_FLAG_RT;
this->info.max_input_ports = 2;
this->info.max_output_ports = 1;
this->params[0] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ);
this->params[1] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_READWRITE);
this->info.params = this->params;
@ -1037,6 +1226,23 @@ impl_init(const struct spa_handle_factory *factory,
port->info.n_params = 0;
spa_list_init(&port->queue);
port = GET_CONTROL_PORT(this, 1);
port->direction = SPA_DIRECTION_INPUT;
port->id = 1;
port->info_all = SPA_PORT_CHANGE_MASK_FLAGS |
SPA_PORT_CHANGE_MASK_PROPS |
SPA_PORT_CHANGE_MASK_PARAMS;
port->info = SPA_PORT_INFO_INIT();
port->info.flags = SPA_PORT_FLAG_NO_REF |
SPA_PORT_FLAG_DYNAMIC_DATA;
port->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ);
port->params[1] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ);
port->params[2] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
port->info.params = port->params;
port->info.n_params = 4;
spa_list_init(&port->queue);
return 0;
}