node: add port and node params

Add a new struct spa_param_info that lists the available params on
a node/port and if they are readable/writable/updated. We can use
this to replace and improve the PARAM_List and also to notify
property change and updates.

Update elements and code to deal with this new param stuff. Add
port and node info to most elements and signal changes.

Use async enum_params in -inspect and use the param info to know
which ones to enumerate.

Use the port info to know what parameters to update in the
remote-node.
This commit is contained in:
Wim Taymans 2019-02-27 16:43:01 +01:00
parent 3d25adc598
commit 499dd3ff22
52 changed files with 1979 additions and 1461 deletions

View file

@ -1,4 +1,4 @@
/* Spa ALSA Sink
/* Spa A2DP Sink
*
* Copyright © 2018 Wim Taymans
*
@ -63,6 +63,25 @@ struct buffer {
struct spa_list link;
};
struct port {
bool have_format;
struct spa_audio_info current_format;
int frame_size;
struct spa_port_info info;
struct spa_io_buffers *io;
struct spa_io_range *range;
struct spa_param_info params[8];
struct buffer buffers[MAX_BUFFERS];
unsigned int n_buffers;
struct spa_list free;
struct spa_list ready;
size_t ready_offset;
};
struct impl {
struct spa_handle handle;
struct spa_node node;
@ -76,28 +95,16 @@ struct impl {
const struct spa_node_callbacks *callbacks;
void *callbacks_data;
struct spa_node_info info;
struct spa_param_info params[8];
struct props props;
struct spa_bt_transport *transport;
struct port port;
bool opened;
bool have_format;
struct spa_audio_info current_format;
int frame_size;
struct spa_port_info info;
struct spa_io_buffers *io;
struct spa_io_range *range;
struct buffer buffers[MAX_BUFFERS];
unsigned int n_buffers;
struct spa_list free;
struct spa_list ready;
size_t ready_offset;
bool started;
struct spa_source source;
int timerfd;
@ -171,19 +178,6 @@ static int impl_node_enum_params(struct spa_node *node, int seq,
spa_pod_builder_init(&b, buffer, sizeof(buffer));
switch (id) {
case SPA_PARAM_List:
{
uint32_t list[] = { SPA_PARAM_PropInfo,
SPA_PARAM_Props };
if (result.index < SPA_N_ELEMENTS(list))
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
SPA_PARAM_LIST_id, SPA_POD_Id(list[result.index]));
else
return 0;
break;
}
case SPA_PARAM_PropInfo:
{
struct props *p = &this->props;
@ -337,9 +331,10 @@ static int encode_buffer(struct impl *this, const void *data, int size)
{
int processed;
ssize_t out_encoded;
struct port *port = &this->port;
spa_log_trace(this->log, "a2dp-sink %p: encode %d used %d, %d %d",
this, size, this->buffer_used, this->frame_size, this->write_size);
this, size, this->buffer_used, port->frame_size, this->write_size);
if (this->frame_count > MAX_FRAME_COUNT)
return -ENOSPC;
@ -351,8 +346,8 @@ static int encode_buffer(struct impl *this, const void *data, int size)
if (processed < 0)
return processed;
this->sample_count += processed / this->frame_size;
this->sample_time += processed / this->frame_size;
this->sample_count += processed / port->frame_size;
this->sample_time += processed / port->frame_size;
this->frame_count += processed / this->codesize;
this->buffer_used += out_encoded;
@ -428,6 +423,8 @@ static int add_data(struct impl *this, const void *data, int size)
static int set_bitpool(struct impl *this, int bitpool)
{
struct port *port = &this->port;
if (bitpool < this->min_bitpool)
bitpool = this->min_bitpool;
if (bitpool > this->max_bitpool)
@ -447,7 +444,8 @@ static int set_bitpool(struct impl *this, int bitpool)
- sizeof(struct rtp_header) - sizeof(struct rtp_payload) - 24;
this->write_size = this->transport->write_mtu
- sizeof(struct rtp_header) - sizeof(struct rtp_payload) - 24;
this->write_samples = (this->write_size / this->frame_length) * (this->codesize / this->frame_size);
this->write_samples = (this->write_size / this->frame_length) *
(this->codesize / port->frame_size);
return 0;
}
@ -469,27 +467,28 @@ static int flush_data(struct impl *this, uint64_t now_time)
uint64_t elapsed;
int64_t queued;
struct itimerspec ts;
struct port *port = &this->port;
total_frames = 0;
while (!spa_list_is_empty(&this->ready)) {
while (!spa_list_is_empty(&port->ready)) {
uint8_t *src;
uint32_t n_bytes, n_frames;
struct buffer *b;
struct spa_data *d;
uint32_t index, offs, avail, l0, l1;
b = spa_list_first(&this->ready, struct buffer, link);
b = spa_list_first(&port->ready, struct buffer, link);
d = b->buf->datas;
src = d[0].data;
index = d[0].chunk->offset + this->ready_offset;
avail = d[0].chunk->size - this->ready_offset;
avail /= this->frame_size;
index = d[0].chunk->offset + port->ready_offset;
avail = d[0].chunk->size - port->ready_offset;
avail /= port->frame_size;
offs = index % d[0].maxsize;
n_frames = avail;
n_bytes = n_frames * this->frame_size;
n_bytes = n_frames * port->frame_size;
l0 = SPA_MIN(n_bytes, d[0].maxsize - offs);
l1 = n_bytes - l0;
@ -500,16 +499,16 @@ static int flush_data(struct impl *this, uint64_t now_time)
if (n_bytes <= 0)
break;
n_frames = n_bytes / this->frame_size;
n_frames = n_bytes / port->frame_size;
this->ready_offset += n_bytes;
port->ready_offset += n_bytes;
if (this->ready_offset >= d[0].chunk->size) {
if (port->ready_offset >= d[0].chunk->size) {
spa_list_remove(&b->link);
b->outstanding = true;
spa_log_trace(this->log, "a2dp-sink %p: reuse buffer %u", this, b->id);
this->callbacks->reuse_buffer(this->callbacks_data, 0, b->id);
this->ready_offset = 0;
port->ready_offset = 0;
}
total_frames += n_frames;
@ -546,7 +545,7 @@ static int flush_data(struct impl *this, uint64_t now_time)
else
elapsed = 0;
elapsed = elapsed * this->current_format.info.raw.rate / SPA_NSEC_PER_SEC;
elapsed = elapsed * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC;
queued = this->sample_time - elapsed;
@ -559,7 +558,7 @@ static int flush_data(struct impl *this, uint64_t now_time)
this->sample_time = queued;
this->start_time = now_time;
}
if (!spa_list_is_empty(&this->ready) &&
if (!spa_list_is_empty(&port->ready) &&
now_time - this->last_error > SPA_NSEC_PER_SEC / 2) {
reduce_bitpool(this);
this->last_error = now_time;
@ -568,7 +567,7 @@ static int flush_data(struct impl *this, uint64_t now_time)
}
calc_timeout(queued,
FILL_FRAMES * this->write_samples,
this->current_format.info.raw.rate,
port->current_format.info.raw.rate,
&this->now, &ts.it_value);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
@ -605,9 +604,10 @@ static void a2dp_on_flush(struct spa_source *source)
static void a2dp_on_timeout(struct spa_source *source)
{
struct impl *this = source->data;
struct port *port = &this->port;
int err;
uint64_t exp, now_time;
struct spa_io_buffers *io = this->io;
struct spa_io_buffers *io = port->io;
if (this->started && read(this->timerfd, &exp, sizeof(uint64_t)) != sizeof(uint64_t))
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
@ -624,14 +624,14 @@ static void a2dp_on_timeout(struct spa_source *source)
this->start_time = now_time;
}
if (spa_list_is_empty(&this->ready)) {
if (spa_list_is_empty(&port->ready)) {
spa_log_trace(this->log, "a2dp-sink %p: %d", this, io->status);
io->status = SPA_STATUS_NEED_BUFFER;
if (this->range) {
this->range->offset = this->sample_count * this->frame_size;
this->range->min_size = this->threshold * this->frame_size;
this->range->max_size = this->write_samples * this->frame_size;
if (port->range) {
port->range->offset = this->sample_count * port->frame_size;
port->range->min_size = this->threshold * port->frame_size;
port->range->max_size = this->write_samples * port->frame_size;
}
this->callbacks->ready(this->callbacks_data, SPA_STATUS_NEED_BUFFER);
}
@ -823,18 +823,20 @@ static int do_stop(struct impl *this)
static int impl_node_send_command(struct spa_node *node, const struct spa_command *command)
{
struct impl *this;
struct port *port;
int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(command != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
port = &this->port;
switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_Start:
if (!this->have_format)
if (!port->have_format)
return -EIO;
if (this->n_buffers == 0)
if (port->n_buffers == 0)
return -EIO;
if ((res = do_start(this)) < 0)
@ -857,23 +859,18 @@ static const struct spa_dict_item node_info_items[] = {
static void emit_node_info(struct impl *this)
{
if (this->callbacks && this->callbacks->info) {
struct spa_node_info info;
info = SPA_NODE_INFO_INIT();
info.max_input_ports = 1;
info.change_mask = SPA_NODE_CHANGE_MASK_PROPS;
info.props = &SPA_DICT_INIT_ARRAY(node_info_items);
this->callbacks->info(this->callbacks_data, &info);
if (this->callbacks && this->callbacks->info && this->info.change_mask) {
this->info.props = &SPA_DICT_INIT_ARRAY(node_info_items);
this->callbacks->info(this->callbacks_data, &this->info);
this->info.change_mask = 0;
}
}
static void emit_port_info(struct impl *this)
static void emit_port_info(struct impl *this, struct port *port)
{
if (this->callbacks && this->callbacks->port_info && this->info.change_mask) {
this->callbacks->port_info(this->callbacks_data, SPA_DIRECTION_INPUT, 0, &this->info);
this->info.change_mask = 0;
if (this->callbacks && this->callbacks->port_info && port->info.change_mask) {
this->callbacks->port_info(this->callbacks_data, SPA_DIRECTION_INPUT, 0, &port->info);
port->info.change_mask = 0;
}
}
@ -892,7 +889,7 @@ impl_node_set_callbacks(struct spa_node *node,
this->callbacks_data = data;
emit_node_info(this);
emit_port_info(this);
emit_port_info(this, &this->port);
return 0;
}
@ -916,6 +913,7 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
{
struct impl *this;
struct port *port;
struct spa_pod *param;
struct spa_pod_builder b = { 0 };
uint8_t buffer[1024];
@ -930,6 +928,7 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
spa_return_val_if_fail(this->callbacks && this->callbacks->result, -EIO);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = &this->port;
result.id = id;
result.next = start;
@ -939,21 +938,6 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
spa_pod_builder_init(&b, buffer, sizeof(buffer));
switch (id) {
case SPA_PARAM_List:
{
uint32_t list[] = { SPA_PARAM_EnumFormat,
SPA_PARAM_Format,
SPA_PARAM_Buffers,
SPA_PARAM_Meta };
if (result.index < SPA_N_ELEMENTS(list))
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
SPA_PARAM_LIST_id, SPA_POD_Id(list[result.index]));
else
return 0;
break;
}
case SPA_PARAM_EnumFormat:
if (result.index > 0)
return 0;
@ -999,16 +983,16 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
break;
case SPA_PARAM_Format:
if (!this->have_format)
if (!port->have_format)
return -EIO;
if (result.index > 0)
return 0;
param = spa_format_audio_raw_build(&b, id, &this->current_format.info.raw);
param = spa_format_audio_raw_build(&b, id, &port->current_format.info.raw);
break;
case SPA_PARAM_Buffers:
if (!this->have_format)
if (!port->have_format)
return -EIO;
if (result.index > 0)
return 0;
@ -1018,17 +1002,14 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 2, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
this->props.min_latency * this->frame_size,
this->props.min_latency * this->frame_size,
INT32_MAX),
this->props.min_latency * port->frame_size,
this->props.min_latency * port->frame_size,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(0),
SPA_PARAM_BUFFERS_align, SPA_POD_Int(16));
break;
case SPA_PARAM_Meta:
if (!this->have_format)
return -EIO;
switch (result.index) {
case 0:
param = spa_pod_builder_add_object(&b,
@ -1057,28 +1038,26 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
return 0;
}
static int clear_buffers(struct impl *this)
static int clear_buffers(struct impl *this, struct port *port)
{
do_stop(this);
if (this->n_buffers > 0) {
spa_list_init(&this->ready);
this->n_buffers = 0;
if (port->n_buffers > 0) {
spa_list_init(&port->ready);
port->n_buffers = 0;
}
return 0;
}
static int port_set_format(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
static int port_set_format(struct impl *this, struct port *port,
uint32_t flags,
const struct spa_pod *format)
{
struct impl *this = SPA_CONTAINER_OF(node, struct impl, node);
int err;
if (format == NULL) {
spa_log_info(this->log, "clear format");
clear_buffers(this);
this->have_format = false;
clear_buffers(this, port);
port->have_format = false;
} else {
struct spa_audio_info info = { 0 };
@ -1092,16 +1071,26 @@ static int port_set_format(struct spa_node *node,
if (spa_format_audio_raw_parse(format, &info.info.raw) < 0)
return -EINVAL;
this->frame_size = info.info.raw.channels * 2;
port->frame_size = info.info.raw.channels * 2;
port->current_format = info;
port->have_format = true;
this->threshold = this->props.min_latency;
this->current_format = info;
this->have_format = true;
}
if (this->have_format) {
this->info.flags = SPA_PORT_FLAG_CAN_USE_BUFFERS | SPA_PORT_FLAG_LIVE;
this->info.rate = this->current_format.info.raw.rate;
if (port->have_format) {
port->info.change_mask |= SPA_PORT_CHANGE_MASK_FLAGS;
port->info.flags = SPA_PORT_FLAG_CAN_USE_BUFFERS | SPA_PORT_FLAG_LIVE;
port->info.change_mask |= SPA_PORT_CHANGE_MASK_RATE;
port->info.rate = port->current_format.info.raw.rate;
port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_READWRITE);
port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, SPA_PARAM_INFO_READ);
} else {
port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
}
emit_port_info(this, port);
return 0;
}
@ -1112,12 +1101,17 @@ impl_node_port_set_param(struct spa_node *node,
uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
struct impl *this;
struct port *port;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
spa_return_val_if_fail(CHECK_PORT(node, direction, port_id), -EINVAL);
port = &this->port;
if (id == SPA_PARAM_Format) {
return port_set_format(node, direction, port_id, flags, param);
return port_set_format(this, port, flags, param);
}
else
return -ENOENT;
@ -1129,23 +1123,24 @@ impl_node_port_use_buffers(struct spa_node *node,
uint32_t port_id, struct spa_buffer **buffers, uint32_t n_buffers)
{
struct impl *this;
struct port *port;
uint32_t i;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = &this->port;
spa_log_info(this->log, "use buffers %d", n_buffers);
if (!this->have_format)
if (!port->have_format)
return -EIO;
clear_buffers(this);
clear_buffers(this, port);
for (i = 0; i < n_buffers; i++) {
struct buffer *b = &this->buffers[i];
struct buffer *b = &port->buffers[i];
uint32_t type;
b->buf = buffers[i];
@ -1161,9 +1156,9 @@ impl_node_port_use_buffers(struct spa_node *node,
spa_log_error(this->log, NAME " %p: need mapped memory", this);
return -EINVAL;
}
this->threshold = buffers[i]->datas[0].maxsize / this->frame_size;
this->threshold = buffers[i]->datas[0].maxsize / port->frame_size;
}
this->n_buffers = n_buffers;
port->n_buffers = n_buffers;
return 0;
}
@ -1178,6 +1173,7 @@ impl_node_port_alloc_buffers(struct spa_node *node,
uint32_t *n_buffers)
{
struct impl *this;
struct port *port;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(buffers != NULL, -EINVAL);
@ -1185,8 +1181,9 @@ impl_node_port_alloc_buffers(struct spa_node *node,
this = SPA_CONTAINER_OF(node, struct impl, node);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = &this->port;
if (!this->have_format)
if (!port->have_format)
return -EIO;
return -ENOTSUP;
@ -1200,19 +1197,21 @@ impl_node_port_set_io(struct spa_node *node,
void *data, size_t size)
{
struct impl *this;
struct port *port;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = &this->port;
switch (id) {
case SPA_IO_Buffers:
this->io = data;
port->io = data;
break;
case SPA_IO_Range:
this->range = data;
port->range = data;
break;
default:
return -ENOENT;
@ -1228,30 +1227,32 @@ static int impl_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id,
static int impl_node_process(struct spa_node *node)
{
struct impl *this;
struct spa_io_buffers *input;
struct port *port;
struct spa_io_buffers *io;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
input = this->io;
spa_return_val_if_fail(input != NULL, -EIO);
port = &this->port;
io = port->io;
spa_return_val_if_fail(io != NULL, -EIO);
if (input->status == SPA_STATUS_HAVE_BUFFER && input->buffer_id < this->n_buffers) {
struct buffer *b = &this->buffers[input->buffer_id];
if (io->status == SPA_STATUS_HAVE_BUFFER && io->buffer_id < port->n_buffers) {
struct buffer *b = &port->buffers[io->buffer_id];
uint64_t now_time;
if (!b->outstanding) {
spa_log_warn(this->log, NAME " %p: buffer %u in use", this, input->buffer_id);
input->status = -EINVAL;
spa_log_warn(this->log, NAME " %p: buffer %u in use", this, io->buffer_id);
io->status = -EINVAL;
return -EINVAL;
}
spa_log_trace(this->log, NAME " %p: queue buffer %u", this, input->buffer_id);
spa_log_trace(this->log, NAME " %p: queue buffer %u", this, io->buffer_id);
spa_list_append(&this->ready, &b->link);
spa_list_append(&port->ready, &b->link);
b->outstanding = false;
this->threshold = SPA_MIN(b->buf->datas[0].chunk->size / this->frame_size,
this->threshold = SPA_MIN(b->buf->datas[0].chunk->size / port->frame_size,
this->props.max_latency);
clock_gettime(CLOCK_MONOTONIC, &this->now);
@ -1259,7 +1260,7 @@ static int impl_node_process(struct spa_node *node)
flush_data(this, now_time);
input->status = SPA_STATUS_OK;
io->status = SPA_STATUS_OK;
}
return SPA_STATUS_HAVE_BUFFER;
}
@ -1319,6 +1320,7 @@ impl_init(const struct spa_handle_factory *factory,
uint32_t n_support)
{
struct impl *this;
struct port *port;
uint32_t i;
spa_return_val_if_fail(factory != NULL, -EINVAL);
@ -1349,11 +1351,28 @@ impl_init(const struct spa_handle_factory *factory,
this->node = impl_node;
reset_props(&this->props);
this->info = SPA_PORT_INFO_INIT();
this->info.change_mask = SPA_PORT_CHANGE_MASK_FLAGS;
this->info.flags = SPA_PORT_FLAG_CAN_USE_BUFFERS;
this->info = SPA_NODE_INFO_INIT();
this->info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS;
this->info.flags = SPA_NODE_FLAG_RT;
this->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS;
this->params[0] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ);
this->params[1] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_READWRITE);
this->info.params = this->params;
this->info.n_params = 2;
spa_list_init(&this->ready);
port = &this->port;
port->info = SPA_PORT_INFO_INIT();
port->info.change_mask |= SPA_PORT_CHANGE_MASK_FLAGS;
port->info.flags = SPA_PORT_FLAG_CAN_USE_BUFFERS;
port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
port->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ);
port->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, SPA_PARAM_INFO_READ);
port->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ);
port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
port->info.params = port->params;
port->info.n_params = 5;
spa_list_init(&port->ready);
for (i = 0; info && i < info->n_items; i++) {
if (strcmp(info->items[i].key, "bluez5.transport") == 0)