graph: add new scheduler

Move the activation state to the graph
Replace an old scheduler with a new one that works with the new
activation states.
Remove the DISABLED port flags, we explicitly add and remove to
make ports enabled/disabled.
Find first compatible port for link
Remove the node based scheduler and use the spa one.
This commit is contained in:
Wim Taymans 2018-03-14 11:52:13 +01:00
parent c547baf952
commit 6eea9247ea
15 changed files with 293 additions and 328 deletions

View file

@ -43,6 +43,10 @@
#define MAX_PORTS 256
#define MAX_BUFFERS 8
#define DEFAULT_CHANNELS 2
#define DEFAULT_SAMPLE_RATE 44100
#define DEFAULT_BUFFER_SIZE 1024
struct type {
struct spa_type_media_type media_type;
struct spa_type_media_subtype media_subtype;
@ -77,7 +81,6 @@ struct impl {
struct buffer {
#define BUFFER_FLAG_OUT (1<<0)
#define BUFFER_FLAG_FILLED (1<<1)
uint32_t flags;
struct spa_list link;
struct spa_buffer *outbuf;
@ -218,7 +221,7 @@ static void recycle_buffer(struct node *n, struct port *p, uint32_t id)
if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) {
pw_log_trace("recycle buffer %d", id);
spa_list_append(&p->queue, &b->link);
SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT | BUFFER_FLAG_FILLED);
SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
}
}
@ -242,7 +245,8 @@ static void conv_f32_s16(int16_t *out, int index, float *in, int n_samples, int
else if (in[i] >= 1.0f)
*out = 32767;
else
*out = lrintf(in[i] * 32767.0f);
//*out = lrintf(in[i] * 32767.0f);
*out = in[i] * 32767.0f;
out += stride;
}
}
@ -294,19 +298,16 @@ static int node_process_input(struct spa_node *node)
return -EPIPE;
}
if (SPA_FLAG_CHECK(out->flags, BUFFER_FLAG_FILLED)) {
dequeue_buffer(n, out);
outio->buffer_id = out->outbuf->id;
outio->status = SPA_STATUS_HAVE_BUFFER;
dequeue_buffer(n, out);
outio->buffer_id = out->outbuf->id;
outio->status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace(NAME " %p: output buffer %d %d", this, out->outbuf->id, out->flags);
out->outbuf->datas[0].chunk->offset = 0;
out->outbuf->datas[0].chunk->size = n->buffer_size * sizeof(int16_t) * n->channels;
out->outbuf->datas[0].chunk->stride = 0;
out->outbuf->datas[0].chunk->offset = 0;
out->outbuf->datas[0].chunk->size = n->buffer_size * sizeof(int16_t) * 2;
out->outbuf->datas[0].chunk->stride = 0;
}
else {
outio->buffer_id = SPA_ID_INVALID;
outio->status = SPA_STATUS_HAVE_BUFFER;
}
return outio->status;
}
@ -340,6 +341,7 @@ static int node_process_output(struct spa_node *node)
continue;
inio->status = SPA_STATUS_NEED_BUFFER;
pw_log_trace(NAME " %p: port %d %d", this, i, inio->buffer_id);
}
outio->status = SPA_STATUS_NEED_BUFFER;
}
@ -459,8 +461,8 @@ static int port_enum_params(struct spa_node *node,
param = spa_pod_builder_object(&b,
id, t->param_buffers.Buffers,
":", t->param_buffers.size, "iru", n->buffer_size * sizeof(float),
SPA_POD_PROP_MIN_MAX(24, 4096),
":", t->param_buffers.size, "i", n->buffer_size * sizeof(float),
// SPA_POD_PROP_MIN_MAX(24, 4096),
":", t->param_buffers.stride, "i", 0,
":", t->param_buffers.buffers, "ir", 2,
SPA_POD_PROP_MIN_MAX(1, MAX_BUFFERS),
@ -543,8 +545,6 @@ static int port_use_buffers(struct spa_node *node, enum spa_direction direction,
b->flags = 0;
b->outbuf = buffers[i];
n->buffer_size = d[0].maxsize / 4;
if ((d[0].type == t->data.MemPtr ||
d[0].type == t->data.MemFd ||
d[0].type == t->data.DmaBuf) && d[0].data != NULL) {
@ -637,18 +637,17 @@ static int schedule_mix_input(struct spa_node *_node)
struct spa_graph_node *node = &port->rt.mix_node;
struct spa_graph_port *gp;
struct spa_io_buffers *io = port->rt.mix_port.io;
size_t buffer_size = p->node->buffer_size;
size_t buffer_size = n->buffer_size;
struct buffer *outb;
float *out = NULL;
int layer = 0;
int stride = n->n_in_ports;
pw_log_trace("port %p", port);
spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) {
struct pw_port_mix *mix = SPA_CONTAINER_OF(gp, struct pw_port_mix, port);
if (gp->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
pw_log_trace("mix %p: input %d %d/%d", node,
gp->io->status, gp->io->buffer_id, mix->n_buffers);
@ -656,36 +655,32 @@ static int schedule_mix_input(struct spa_node *_node)
continue;
if (layer++ == 0) {
io->buffer_id = gp->io->buffer_id;
io->status = SPA_STATUS_HAVE_BUFFER;
out = mix->buffers[io->buffer_id]->datas[0].data;
out = mix->buffers[gp->io->buffer_id]->datas[0].data;
}
else {
add_f32(out, mix->buffers[gp->io->buffer_id]->datas[0].data, buffer_size);
}
pw_log_trace("mix %p: input %p %p->%p %d %d", node,
gp, gp->io, io, gp->io->status, gp->io->buffer_id);
pw_log_trace("mix %p: input %p %p->%p %d %d %zd", node,
gp, gp->io, io, gp->io->status, gp->io->buffer_id, buffer_size);
}
outb = peek_buffer(n, outp);
if (outb == NULL)
return -EPIPE;
if (layer == 0) {
if (layer > 0)
conv_f32_s16(outb->ptr, port->port_id, out, buffer_size, stride);
else {
fill_s16(outb->ptr, port->port_id, buffer_size, stride);
}
else {
conv_f32_s16(outb->ptr, port->port_id, out, buffer_size, stride);
SPA_FLAG_SET(outb->flags, BUFFER_FLAG_FILLED);
}
return SPA_STATUS_HAVE_BUFFER;
}
static int schedule_mix_output(struct spa_node *_node)
{
struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node);
struct port *p = port->owner_data;
struct spa_graph_node *node = &port->rt.mix_node;
struct spa_graph_port *gp;
struct spa_io_buffers *io = port->rt.mix_port.io;
@ -693,9 +688,7 @@ static int schedule_mix_output(struct spa_node *_node)
spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) {
pw_log_trace("port %p: port %d %d %p->%p %d %d", port,
gp->port_id, gp->flags, io, gp->io, io->status, io->buffer_id);
if (gp->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
*gp->io = *io;
gp->io->status = io->status;
}
return io->status;
}
@ -708,9 +701,14 @@ static int schedule_mix_use_buffers(struct spa_node *_node,
{
struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node);
struct port *p = port->owner_data;
struct node *n = p->node;
pw_log_debug("port %p: %d use buffers %d %p", port, port_id, n_buffers, buffers);
if (n_buffers > 0) {
n->buffer_size = buffers[0]->datas[0].maxsize / sizeof(float);
}
return 0;
}
@ -763,8 +761,10 @@ static struct port *make_port(struct node *n, enum pw_direction direction,
if (direction == PW_DIRECTION_INPUT) {
n->in_ports[id] = p;
n->n_in_ports++;
port->mix_node = schedule_mix_node;
port->mix = &port->mix_node;
if (flags & PORT_FLAG_RAW_F32) {
port->mix_node = schedule_mix_node;
port->mix = &port->mix_node;
}
} else {
n->out_ports[id] = p;
n->n_out_ports++;
@ -790,7 +790,7 @@ static struct pw_node *make_node(struct impl *impl, const struct pw_properties *
snprintf(node_name, sizeof(node_name), "system_%s", alias);
for (i = 0; node_name[i]; i++) {
if (node_name[i] == ':')
if (node_name[i] == ':' || node_name[i] == ',')
node_name[i] = '_';
}
if ((alias = pw_properties_get(props, "alsa.card")) == NULL)
@ -804,9 +804,9 @@ static struct pw_node *make_node(struct impl *impl, const struct pw_properties *
n->node = node;
n->impl = impl;
n->node_impl = node_impl;
n->channels = 2;
n->sample_rate = 44100;
n->buffer_size = 1024 / sizeof(float);
n->channels = DEFAULT_CHANNELS;
n->sample_rate = DEFAULT_SAMPLE_RATE;
n->buffer_size = DEFAULT_BUFFER_SIZE;
pw_node_set_implementation(node, &n->node_impl);
p = make_port(n, direction, 0, 0, NULL);
@ -964,6 +964,7 @@ static int module_init(struct pw_module *module, struct pw_properties *propertie
pw_core_add_listener(core, &impl->core_listener, &core_events, impl);
pw_module_add_listener(module, &impl->module_listener, &module_events, impl);
return 0;
}