seq: rework port handling

Dynamically allocate ports as we need them.
Use port lists to iterate active ports.
This commit is contained in:
Wim Taymans 2025-07-30 17:32:02 +02:00
parent 8f45cfcbc9
commit 6d07eaea1f
3 changed files with 102 additions and 71 deletions

View file

@ -294,14 +294,10 @@ static void emit_port_info(struct seq_state *this, struct seq_port *port, bool f
static void emit_stream_info(struct seq_state *this, struct seq_stream *stream, bool full)
{
uint32_t i;
for (i = 0; i < MAX_PORTS; i++) {
struct seq_port *port = &stream->ports[i];
if (port->valid)
struct seq_port *port;
spa_list_for_each(port, &stream->port_list, link)
emit_port_info(this, port, full);
}
}
static int
impl_node_add_listener(void *object,
@ -353,11 +349,9 @@ static int impl_node_sync(void *object, int seq)
static struct seq_port *find_port(struct seq_state *state,
struct seq_stream *stream, const snd_seq_addr_t *addr)
{
uint32_t i;
for (i = 0; i < stream->last_port; i++) {
struct seq_port *port = &stream->ports[i];
if (port->valid &&
port->addr.client == addr->client &&
struct seq_port *port;
spa_list_for_each(port, &stream->port_list, link) {
if (port->addr.client == addr->client &&
port->addr.port == addr->port)
return port;
}
@ -366,32 +360,36 @@ static struct seq_port *find_port(struct seq_state *state,
static struct seq_port *alloc_port(struct seq_state *state, struct seq_stream *stream)
{
struct seq_port *port;
uint32_t i;
for (i = 0; i < MAX_PORTS; i++) {
struct seq_port *port = &stream->ports[i];
if (!port->valid) {
if (stream->ports[i] == NULL)
break;
}
if (i == MAX_PORTS)
return NULL;
if (!spa_list_is_empty(&state->free_list)) {
port = spa_list_first(&state->free_list, struct seq_port, link);
spa_list_remove(&port->link);
} else {
port = calloc(1, sizeof(struct seq_port));
if (port == NULL)
return NULL;
}
port->id = i;
port->direction = stream->direction;
port->valid = true;
if (stream->last_port < i + 1)
stream->last_port = i + 1;
stream->ports[i] = port;
spa_list_append(&stream->port_list, &port->link);
return port;
}
}
return NULL;
}
static void free_port(struct seq_state *state, struct seq_stream *stream, struct seq_port *port)
{
port->valid = false;
if (port->id + 1 == stream->last_port) {
int i;
for (i = stream->last_port - 1; i >= 0; i--)
if (stream->ports[i].valid)
break;
stream->last_port = i + 1;
}
stream->ports[port->id] = NULL;
spa_list_remove(&port->link);
spa_list_append(&state->free_list, &port->link);
spa_node_emit_port_info(&state->hooks,
port->direction, port->id, NULL);
@ -751,6 +749,36 @@ impl_node_port_use_buffers(void *object,
return 0;
}
struct io_info {
struct seq_state *state;
struct seq_port *port;
void *data;
size_t size;
};
static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
struct seq_port *port = info->port;
struct seq_stream *stream = &info->state->streams[port->direction];
if (info->data == NULL || info->size < sizeof(struct spa_io_buffers)) {
port->io = NULL;
if (port->mixing) {
spa_list_remove(&port->mix_link);
port->mixing = false;
}
} else {
port->io = info->data;
if (!port->mixing) {
spa_list_append(&stream->mix_list, &port->mix_link);
port->mixing = true;
}
}
return 0;
}
static int
impl_node_port_set_io(void *object,
enum spa_direction direction,
@ -760,19 +788,25 @@ impl_node_port_set_io(void *object,
{
struct seq_state *this = object;
struct seq_port *port;
struct io_info info;
spa_return_val_if_fail(this != NULL, -EINVAL);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id);
info.state = this;
info.port = port;
info.data = data;
info.size = size;
spa_log_debug(this->log, "%p: io %d.%d %d %p %zd", this,
direction, port_id, id, data, size);
switch (id) {
case SPA_IO_Buffers:
port->io = data;
spa_loop_locked(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, &info);
break;
default:
return -ENOENT;

View file

@ -167,13 +167,17 @@ static int init_stream(struct seq_state *state, enum spa_direction direction)
return res;
}
snd_midi_event_no_status(stream->codec, 1);
memset(stream->ports, 0, sizeof(stream->ports));
spa_list_init(&stream->port_list);
spa_list_init(&stream->mix_list);
return 0;
}
static int uninit_stream(struct seq_state *state, enum spa_direction direction)
{
struct seq_stream *stream = &state->streams[direction];
spa_list_insert_list(&state->free_list, &stream->port_list);
spa_list_insert_list(&state->free_list, &stream->mix_list);
if (stream->codec)
snd_midi_event_free(stream->codec);
stream->codec = NULL;
@ -352,6 +356,7 @@ int spa_alsa_seq_open(struct seq_state *state)
if (state->opened)
return 0;
spa_list_init(&state->free_list);
init_stream(state, SPA_DIRECTION_INPUT);
init_stream(state, SPA_DIRECTION_OUTPUT);
@ -463,6 +468,7 @@ error_close:
int spa_alsa_seq_close(struct seq_state *state)
{
int res = 0;
struct seq_port *port;
if (!state->opened)
return 0;
@ -475,6 +481,10 @@ int spa_alsa_seq_close(struct seq_state *state)
uninit_stream(state, SPA_DIRECTION_INPUT);
uninit_stream(state, SPA_DIRECTION_OUTPUT);
spa_list_consume(port, &state->free_list, link) {
spa_list_remove(&port->link);
free(port);
}
spa_system_close(state->data_system, state->timerfd);
state->opened = false;
@ -497,11 +507,9 @@ static int set_timeout(struct seq_state *state, uint64_t time)
static struct seq_port *find_port(struct seq_state *state,
struct seq_stream *stream, const snd_seq_addr_t *addr)
{
uint32_t i;
for (i = 0; i < stream->last_port; i++) {
struct seq_port *port = &stream->ports[i];
if (port->valid &&
port->addr.client == addr->client &&
struct seq_port *port;
spa_list_for_each(port, &stream->mix_list, mix_link) {
if (port->addr.client == addr->client &&
port->addr.port == addr->port)
return port;
}
@ -595,15 +603,10 @@ static int prepare_buffer(struct seq_state *state, struct seq_port *port)
static int process_recycle(struct seq_state *state)
{
struct seq_stream *stream = &state->streams[SPA_DIRECTION_OUTPUT];
uint32_t i;
struct seq_port *port;
for (i = 0; i < stream->last_port; i++) {
struct seq_port *port = &stream->ports[i];
spa_list_for_each(port, &stream->mix_list, mix_link) {
struct spa_io_buffers *io = port->io;
if (!port->valid || io == NULL)
continue;
if (io->status != SPA_STATUS_HAVE_DATA &&
io->buffer_id < port->n_buffers) {
spa_alsa_seq_recycle_buffer(state, port, io->buffer_id);
@ -620,17 +623,16 @@ static int process_read(struct seq_state *state)
{
struct seq_stream *stream = &state->streams[SPA_DIRECTION_OUTPUT];
const bool ump = state->ump;
uint32_t i;
uint32_t *data;
uint8_t midi1_data[MAX_EVENT_SIZE];
uint32_t ump_data[MAX_EVENT_SIZE];
long size;
int res = -1;
struct seq_port *port;
/* copy all new midi events into their port buffers */
while (1) {
const snd_seq_addr_t *addr;
struct seq_port *port;
uint64_t ev_time, diff;
uint32_t offset;
void *event;
@ -753,13 +755,9 @@ done:
/* prepare a buffer on each port, some ports might have their
* buffer filled above */
res = 0;
for (i = 0; i < stream->last_port; i++) {
struct seq_port *port = &stream->ports[i];
spa_list_for_each(port, &stream->mix_list, mix_link) {
struct spa_io_buffers *io = port->io;
if (!port->valid || io == NULL)
continue;
if (prepare_buffer(state, port) >= 0) {
spa_pod_builder_pop(&port->builder, &port->frame);
@ -809,11 +807,10 @@ static int process_write(struct seq_state *state)
{
struct seq_stream *stream = &state->streams[SPA_DIRECTION_INPUT];
const bool ump = state->ump;
uint32_t i;
int err, res = 0;
struct seq_port *port;
for (i = 0; i < stream->last_port; i++) {
struct seq_port *port = &stream->ports[i];
spa_list_for_each(port, &stream->mix_list, mix_link) {
struct spa_io_buffers *io = port->io;
struct buffer *buffer;
struct spa_pod_sequence *pod;
@ -823,9 +820,6 @@ static int process_write(struct seq_state *state)
snd_seq_real_time_t out_rt;
bool first = true;
if (!port->valid || io == NULL)
continue;
if (io->status != SPA_STATUS_HAVE_DATA ||
io->buffer_id >= port->n_buffers)
continue;
@ -834,7 +828,7 @@ static int process_write(struct seq_state *state)
d = &buffer->buf->datas[0];
io->status = SPA_STATUS_NEED_DATA;
spa_node_call_reuse_buffer(&state->callbacks, i, io->buffer_id);
spa_node_call_reuse_buffer(&state->callbacks, port->id, io->buffer_id);
res |= SPA_STATUS_NEED_DATA;
pod = spa_pod_from_data(d->data, d->maxsize, d->chunk->offset, d->chunk->size);
@ -1078,15 +1072,12 @@ static void reset_buffers(struct seq_state *this, struct seq_port *port)
}
static void reset_stream(struct seq_state *this, struct seq_stream *stream, bool active)
{
uint32_t i;
for (i = 0; i < stream->last_port; i++) {
struct seq_port *port = &stream->ports[i];
if (port->valid) {
struct seq_port *port;
spa_list_for_each(port, &stream->port_list, link) {
reset_buffers(this, port);
spa_alsa_seq_activate_port(this, port, active);
}
}
}
static int set_timers(struct seq_state *state)
{

View file

@ -53,6 +53,8 @@ struct buffer {
};
struct seq_port {
struct spa_list link;
uint32_t id;
enum spa_direction direction;
snd_seq_addr_t addr;
@ -82,18 +84,21 @@ struct seq_port {
struct spa_audio_info current_format;
unsigned int have_format:1;
unsigned int valid:1;
unsigned int active:1;
struct spa_latency_info latency[2];
unsigned int mixing:1;
struct spa_list mix_link;
};
struct seq_stream {
enum spa_direction direction;
unsigned int caps;
snd_midi_event_t *codec;
struct seq_port ports[MAX_PORTS];
uint32_t last_port;
struct seq_port *ports[MAX_PORTS];
struct spa_list port_list;
struct spa_list mix_list;
};
struct seq_conn {
@ -159,17 +164,18 @@ struct seq_state {
unsigned int ump:1;
struct seq_stream streams[2];
struct spa_list free_list;
struct spa_dll dll;
};
#define VALID_DIRECTION(this,d) ((d) == SPA_DIRECTION_INPUT || (d) == SPA_DIRECTION_OUTPUT)
#define VALID_PORT(this,d,p) ((p) < MAX_PORTS && this->streams[d].ports[p].id == (p))
#define VALID_PORT(this,d,p) ((p) < MAX_PORTS && this->streams[d].ports[p] != NULL)
#define CHECK_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && VALID_PORT(this,d,p))
#define CHECK_OUT_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && VALID_PORT(this,d,p))
#define CHECK_PORT(this,d,p) (VALID_DIRECTION(this,d) && VALID_PORT(this,d,p))
#define GET_PORT(this,d,p) (&this->streams[d].ports[p])
#define GET_PORT(this,d,p) (this->streams[d].ports[p])
int spa_alsa_seq_open(struct seq_state *state);
int spa_alsa_seq_close(struct seq_state *state);