mixer-dsp: rework the port management

Keep a list of active ports in the port_list. These are all ports added
with add_port and not yet removed. When a port is removed, move it to
the free_list and reuse the port later when needed.

Update a mix_list of ports when a valid io is set on a port. This then
makes it possible to more efficiently and safely iterate the ports in
the processing loop.
This commit is contained in:
Wim Taymans 2025-07-18 16:43:22 +02:00
parent ec5d2d2a29
commit 5cd7b1de16

View file

@ -56,6 +56,8 @@ struct buffer {
};
struct port {
struct spa_list link;
uint32_t direction;
uint32_t id;
@ -67,7 +69,6 @@ struct port {
struct spa_port_info info;
struct spa_param_info params[8];
unsigned int valid:1;
unsigned int have_format:1;
struct buffer buffers[MAX_BUFFERS];
@ -75,6 +76,9 @@ struct port {
struct spa_list queue;
size_t queued_bytes;
struct spa_list mix_link;
bool active:1;
};
struct impl {
@ -100,10 +104,10 @@ struct impl {
struct spa_hook_list hooks;
uint32_t port_count;
uint32_t last_port;
struct port *in_ports[MAX_PORTS];
struct port out_ports[1];
struct spa_list port_list;
struct spa_list free_list;
struct buffer *mix_buffers[MAX_PORTS];
const void *mix_datas[MAX_PORTS];
@ -114,12 +118,13 @@ struct impl {
unsigned int have_format:1;
unsigned int started:1;
struct spa_list mix_list;
};
#define PORT_VALID(p) ((p) != NULL && (p)->valid)
#define CHECK_ANY_IN(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == SPA_ID_INVALID)
#define CHECK_FREE_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_PORTS && !PORT_VALID(this->in_ports[(p)]))
#define CHECK_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_PORTS && PORT_VALID(this->in_ports[(p)]))
#define CHECK_FREE_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_PORTS && this->in_ports[(p)] == NULL)
#define CHECK_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_PORTS && this->in_ports[(p)] != NULL)
#define CHECK_OUT_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0)
#define CHECK_PORT(this,d,p) (CHECK_OUT_PORT(this,d,p) || CHECK_IN_PORT (this,d,p))
#define CHECK_PORT_ANY(this,d,p) (CHECK_ANY_IN(this,d,p) || CHECK_PORT(this,d,p))
@ -205,7 +210,7 @@ static int impl_node_add_listener(void *object,
{
struct impl *this = object;
struct spa_hook_list save;
uint32_t i;
struct port *port;
spa_return_val_if_fail(this != NULL, -EINVAL);
@ -213,10 +218,8 @@ static int impl_node_add_listener(void *object,
emit_node_info(this, true);
emit_port_info(this, GET_OUT_PORT(this, 0), true);
for (i = 0; i < this->last_port; i++) {
if (PORT_VALID(this->in_ports[i]))
emit_port_info(this, GET_IN_PORT(this, i), true);
}
spa_list_for_each(port, &this->port_list, link)
emit_port_info(this, port, true);
spa_hook_list_join(&this->hooks, &save);
@ -231,6 +234,18 @@ impl_node_set_callbacks(void *object,
return 0;
}
static struct port *get_free_port(struct impl *this)
{
struct port *port;
if (!spa_list_is_empty(&this->free_list)) {
port = spa_list_first(&this->free_list, struct port, link);
spa_list_remove(&port->link);
} else {
port = calloc(1, sizeof(struct port));
}
return port;
}
static int impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id,
const struct spa_dict *props)
{
@ -240,13 +255,8 @@ static int impl_node_add_port(void *object, enum spa_direction direction, uint32
spa_return_val_if_fail(this != NULL, -EINVAL);
spa_return_val_if_fail(CHECK_FREE_IN_PORT(this, direction, port_id), -EINVAL);
port = GET_IN_PORT (this, port_id);
if (port == NULL) {
port = calloc(1, sizeof(struct port));
if (port == NULL)
return -errno;
this->in_ports[port_id] = port;
}
if ((port = get_free_port(this)) == NULL)
return -errno;
port->direction = direction;
port->id = port_id;
@ -269,13 +279,10 @@ static int impl_node_add_port(void *object, enum spa_direction direction, uint32
port->info.params = port->params;
port->info.n_params = 5;
this->port_count++;
if (this->last_port <= port_id)
this->last_port = port_id + 1;
port->valid = true;
this->in_ports[port_id] = port;
spa_list_append(&this->port_list, &port->link);
spa_log_debug(this->log, "%p: add port %d:%d %d", this,
direction, port_id, this->last_port);
spa_log_debug(this->log, "%p: add port %d:%d", this, direction, port_id);
emit_port_info(this, port, true);
return 0;
@ -291,26 +298,18 @@ impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_
spa_return_val_if_fail(CHECK_IN_PORT(this, direction, port_id), -EINVAL);
port = GET_IN_PORT (this, port_id);
this->in_ports[port_id] = NULL;
spa_list_remove(&port->link);
port->valid = false;
this->port_count--;
if (port->have_format && this->have_format) {
if (--this->n_formats == 0)
this->have_format = false;
}
spa_memzero(port, sizeof(struct port));
spa_list_append(&this->free_list, &port->link);
if (port_id + 1 == this->last_port) {
int i;
for (i = this->last_port - 1; i >= 0; i--)
if (PORT_VALID(GET_IN_PORT(this, i)))
break;
this->last_port = i + 1;
}
spa_log_debug(this->log, "%p: remove port %d:%d %d", this,
direction, port_id, this->last_port);
spa_log_debug(this->log, "%p: remove port %d:%d", this,
direction, port_id);
spa_node_emit_port_info(&this->hooks, direction, port_id, NULL);
@ -633,6 +632,7 @@ impl_node_port_use_buffers(void *object,
}
struct io_info {
struct impl *impl;
struct port *port;
void *data;
size_t size;
@ -642,16 +642,28 @@ static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
if (info->size >= sizeof(struct spa_io_async_buffers)) {
struct spa_io_async_buffers *ab = info->data;
info->port->io[0] = &ab->buffers[info->port->direction];
info->port->io[1] = &ab->buffers[info->port->direction^1];
} else if (info->size >= sizeof(struct spa_io_buffers)) {
info->port->io[0] = info->data;
info->port->io[1] = info->data;
struct port *port = info->port;
if (info->data == NULL || info->size < sizeof(struct spa_io_buffers)) {
port->io[0] = NULL;
port->io[1] = NULL;
if (port->active) {
spa_list_remove(&port->mix_link);
port->active = false;
}
} else {
info->port->io[0] = NULL;
info->port->io[1] = NULL;
if (info->size >= sizeof(struct spa_io_async_buffers)) {
struct spa_io_async_buffers *ab = info->data;
port->io[0] = &ab->buffers[port->direction];
port->io[1] = &ab->buffers[port->direction^1];
} else {
port->io[0] = info->data;
port->io[1] = info->data;
}
if (!port->active) {
spa_list_append(&info->impl->mix_list, &port->mix_link);
port->active = true;
}
}
return 0;
}
@ -673,6 +685,7 @@ impl_node_port_set_io(void *object,
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id);
info.impl = this;
info.port = port;
info.data = data;
info.size = size;
@ -707,9 +720,9 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t
static int impl_node_process(void *object)
{
struct impl *this = object;
struct port *outport;
struct port *outport, *inport;
struct spa_io_buffers *outio;
uint32_t n_buffers, i, maxsize;
uint32_t n_buffers, maxsize;
struct buffer **buffers;
struct buffer *outb;
const void **datas;
@ -739,19 +752,12 @@ static int impl_node_process(void *object)
maxsize = UINT32_MAX;
for (i = 0; i < this->last_port; i++) {
struct port *inport = GET_IN_PORT(this, i);
struct spa_io_buffers *inio = NULL;
spa_list_for_each(inport, &this->mix_list, mix_link) {
struct spa_io_buffers *inio = inport->io[cycle];
struct buffer *inb;
struct spa_data *bd;
uint32_t size, offs;
if (SPA_UNLIKELY(!PORT_VALID(inport) || (inio = inport->io[cycle]) == NULL)) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d io:%p/%p/%d",
this, i, PORT_VALID(inport),
inport->io[0], inport->io[1], cycle);
continue;
}
if (inio->buffer_id >= inport->n_buffers ||
inio->status != SPA_STATUS_HAVE_DATA) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d "
@ -851,14 +857,20 @@ static int impl_get_interface(struct spa_handle *handle, const char *type, void
static int impl_clear(struct spa_handle *handle)
{
struct impl *this;
uint32_t i;
struct port *port;
spa_return_val_if_fail(handle != NULL, -EINVAL);
this = (struct impl *) handle;
for (i = 0; i < MAX_PORTS; i++)
free(this->in_ports[i]);
spa_list_consume(port, &this->port_list, link) {
spa_list_remove(&port->link);
free(port);
}
spa_list_consume(port, &this->free_list, link) {
spa_list_remove(&port->link);
free(port);
}
return 0;
}
@ -911,6 +923,9 @@ impl_init(const struct spa_handle_factory *factory,
}
spa_hook_list_init(&this->hooks);
spa_list_init(&this->port_list);
spa_list_init(&this->free_list);
spa_list_init(&this->mix_list);
this->node.iface = SPA_INTERFACE_INIT(
SPA_TYPE_INTERFACE_Node,
@ -923,7 +938,6 @@ impl_init(const struct spa_handle_factory *factory,
this->info.flags = SPA_NODE_FLAG_RT | SPA_NODE_FLAG_IN_DYNAMIC_PORTS;
port = GET_OUT_PORT(this, 0);
port->valid = true;
port->direction = SPA_DIRECTION_OUTPUT;
port->id = 0;
port->info = SPA_PORT_INFO_INIT();