scheduling: don't use the graph helpers.

Simplify the scheduling by using simple lists and removing the
subgraphs etc..

Make the driver node trigger all nodes it manages and when they
complete, trigger the driver node to finish the graph.
This commit is contained in:
Wim Taymans 2019-03-06 20:33:55 +01:00
parent f0eb59bc75
commit b357b7a7da
8 changed files with 231 additions and 264 deletions

@ -1 +1 @@
Subproject commit 53693d1e328c2dde8fc167579f077e2e876c78d4
Subproject commit 42b4529c4caac63d521ea343d2029f59b47929b6

View file

@ -1598,18 +1598,13 @@ static const struct pw_resource_events resource_events = {
.pong = client_node_resource_pong,
};
static int root_impl_process(void *data, struct spa_graph_node *node)
static int process_node(void *data)
{
struct impl *impl = data;
pw_log_trace("client-node %p: process", impl);
return spa_node_process(&impl->node.node);
}
static const struct spa_graph_node_callbacks root_impl = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
.process = root_impl_process,
};
/** Create a new client node
* \param client an owner \ref pw_client
* \param id an id
@ -1672,7 +1667,8 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
this->node->remote = true;
this->flags = 0;
spa_graph_node_set_callbacks(&this->node->rt.root, &root_impl, this);
this->node->rt.target.signal = process_node;
this->node->rt.target.data = impl;
pw_resource_add_listener(this->resource,
&impl->resource_listener,

View file

@ -858,7 +858,7 @@ static int impl_node_process(struct spa_node *node)
trigger = status & SPA_STATUS_HAVE_BUFFER;
if (trigger && !impl->this.node->driver)
spa_graph_node_process(&impl->client_node->node->rt.root);
impl->client_node->node->rt.target.signal(impl->client_node->node->rt.target.data);
return status;
}
@ -1173,34 +1173,11 @@ static void node_initialized(void *data)
pw_client_node_registered(impl->client_node, impl->this.node->global->id);
}
static void node_peer_added(void *data, struct pw_node *peer)
{
struct impl *impl = data;
pw_node_emit_peer_added(impl->client_node->node, peer);
}
static void node_peer_removed(void *data, struct pw_node *peer)
{
struct impl *impl = data;
pw_node_emit_peer_removed(impl->client_node->node, peer);
}
static void node_driver_changed(void *data, struct pw_node *old, struct pw_node *driver)
{
struct impl *impl = data;
pw_log_debug("client-stream %p: driver changed %p->%p", &impl->this, old, driver);
impl->client_node->node->driver_node = driver;
pw_node_emit_driver_changed(impl->client_node->node, old, driver);
}
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = node_destroy,
.free = node_free,
.initialized = node_initialized,
.peer_added = node_peer_added,
.peer_removed = node_peer_removed,
.driver_changed = node_driver_changed,
};
/** Create a new client stream

View file

@ -79,8 +79,7 @@ struct mix {
};
struct link {
struct spa_graph_link link;
struct pw_node_activation *activation;
struct pw_node_target target;
int signalfd;
uint32_t mem_id;
};
@ -154,7 +153,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
pw_log_trace("remote %p: process %p", data->remote, proxy);
spa_graph_node_process(&data->node->rt.root);
data->node->rt.target.signal(data->node->rt.target.data);
}
}
@ -269,7 +268,7 @@ do_deactivate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct mix *mix = user_data;
spa_graph_port_remove(&mix->mix.port);
spa_list_remove(&mix->mix.rt_link);
return 0;
}
@ -290,7 +289,8 @@ do_activate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct mix *mix = user_data;
spa_graph_port_add(&mix->port->rt.mix_node, &mix->mix.port);
spa_list_append(&mix->port->rt.mix_list, &mix->mix.rt_link);
return 0;
}
@ -985,15 +985,14 @@ client_node_set_activation(void *object,
if (ptr) {
struct link *link;
link = pw_array_add(&data->links, sizeof(struct link));
link->activation = ptr;
link->target.activation = ptr;
link->signalfd = signalfd;
link->link.signal = link_signal_func;
link->link.signal_data = link;
spa_graph_link_add(&node->rt.root, &link->activation->state[0], &link->link);
link->link.state->required--;
link->target.signal = link_signal_func;
link->target.data = link;
link->target.activation->state[0].required--;
pw_log_debug("node %p: required %d, pending %d", node,
link->link.state->required,
link->link.state->pending);
link->target.activation->state[0].required,
link->target.activation->state[0].pending);
} else {
}
@ -1149,9 +1148,6 @@ static int node_ready(void *d, int status)
pw_log_trace("node %p: ready driver:%d exported:%d status:%d", node,
node->driver, node->exported, status);
if (status == SPA_STATUS_HAVE_BUFFER)
spa_graph_node_process(&node->rt.root);
if (write(data->rtwritefd, &cmd, sizeof(cmd)) != sizeof(cmd))
pw_log_warn("node %p: write failed %m", node);

View file

@ -505,18 +505,17 @@ param_filter(struct pw_link *this,
static int port_set_io(struct pw_link *this, struct pw_port *port, uint32_t id,
void *data, size_t size, struct pw_port_mix *mix)
{
struct spa_graph_port *p = &mix->port;
int res = 0;
mix->io = data;
pw_log_debug("link %p: %s port %p %d.%d set io: %d %p %zd", this,
pw_direction_as_string(port->direction),
port, port->port_id, p->port_id, id, data, size);
port, port->port_id, mix->port.port_id, id, data, size);
if (port->mix->port_set_io) {
if ((res = spa_node_port_set_io(port->mix,
p->direction,
p->port_id,
mix->port.direction,
mix->port.port_id,
id, data, size)) < 0)
pw_log_warn("port %p: can't set io: %s", port, spa_strerror(res));
}
@ -762,21 +761,16 @@ do_activate_link(struct spa_loop *loop,
{
struct pw_link *this = user_data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct spa_graph_port *in, *out;
pw_log_trace("link %p: activate", this);
out = &this->rt.out_mix.port;
in = &this->rt.in_mix.port;
spa_graph_port_add(&this->output->rt.mix_node, out);
spa_graph_port_add(&this->input->rt.mix_node, in);
spa_graph_port_link(out, in);
spa_list_append(&this->output->rt.mix_list, &this->rt.out_mix.rt_link);
spa_list_append(&this->input->rt.mix_list, &this->rt.in_mix.rt_link);
if (impl->inode != impl->onode) {
spa_graph_link_add(&impl->onode->rt.root,
impl->inode->rt.root.state,
&this->rt.link);
this->rt.target.activation = impl->inode->rt.activation;
spa_list_append(&impl->onode->rt.target_list, &this->rt.target.link);
this->rt.target.activation->state[0].required++;
}
return 0;
}
@ -973,18 +967,16 @@ do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
struct spa_graph_port *in, *out;
in = &this->rt.in_mix.port;
out = &this->rt.out_mix.port;
pw_log_trace("link %p: disable %p and %p", this, &this->rt.in_mix, &this->rt.out_mix);
pw_log_trace("link %p: disable %p and %p", this, in, out);
spa_list_remove(&this->rt.out_mix.rt_link);
spa_list_remove(&this->rt.in_mix.rt_link);
spa_graph_port_unlink(out);
spa_graph_port_remove(out);
spa_graph_port_remove(in);
if (this->input->node != this->output->node)
spa_graph_link_remove(&this->rt.link);
if (this->input->node != this->output->node) {
spa_list_remove(&this->rt.target.link);
this->rt.target.activation->state[0].required--;
}
return 0;
}
@ -1208,18 +1200,6 @@ static void try_unlink_controls(struct impl *impl, struct pw_port *port, struct
}
}
static int link_signal_node(void *data)
{
struct impl *impl = data;
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
impl->onode->rt.activation->status = FINISHED;
impl->onode->rt.activation->finish_time = SPA_TIMESPEC_TO_NSEC(&ts);
pw_log_trace("link %p finish %p, process %p", impl, impl->inode, impl->onode);
return spa_graph_node_process(&impl->inode->rt.root);
}
SPA_EXPORT
struct pw_link *pw_link_new(struct pw_core *core,
struct pw_port *output,
@ -1297,9 +1277,6 @@ struct pw_link *pw_link_new(struct pw_core *core,
pw_port_init_mix(output, &this->rt.out_mix);
pw_port_init_mix(input, &this->rt.in_mix);
this->rt.link.signal = link_signal_node;
this->rt.link.signal_data = impl;
if (this->feedback) {
impl->inode = output_node;
impl->onode = input_node;
@ -1309,6 +1286,9 @@ struct pw_link *pw_link_new(struct pw_core *core,
impl->inode = input_node;
}
this->rt.target.signal = impl->inode->rt.target.signal;
this->rt.target.data = impl->inode->rt.target.data;
pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl,
output_node, output->port_id, this->rt.out_mix.port.port_id,
input_node, input->port_id, this->rt.in_mix.port.port_id);

View file

@ -52,14 +52,6 @@ struct impl {
struct pw_work_queue *work;
struct spa_graph driver_graph;
struct spa_graph_state driver_state;
struct spa_graph graph;
struct spa_graph_state graph_state;
struct pw_node_activation node_activation;
uint32_t next_position;
int last_error;
@ -90,17 +82,44 @@ static void node_deactivate(struct pw_node *this)
}
}
static void add_node(struct pw_node *this, struct pw_node *driver)
{
if (this->rt.driver_target.activation == NULL) {
pw_log_trace("node %p: add to driver %p", this, driver);
/* signal the driver */
this->rt.driver_target.activation = driver->rt.activation;
this->rt.driver_target.node = driver;
this->rt.driver_target.data = driver;
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
this->rt.driver_target.activation->state[0].required++;
spa_list_append(&driver->rt.target_list, &this->rt.target.link);
this->rt.activation->state[0].required++;
}
}
static void remove_node(struct pw_node *this)
{
if (this->rt.driver_target.activation != NULL) {
pw_log_trace("node %p: remove from driver %p", this,
this->rt.driver_target.data);
spa_list_remove(&this->rt.driver_target.link);
this->rt.driver_target.activation->state[0].required--;
this->rt.driver_target.activation = NULL;
spa_list_remove(&this->rt.target.link);
this->rt.activation->state[0].required--;
}
}
static int
do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_node *this = user_data;
if (this->rt.root.graph != NULL) {
if (this->source.loop != NULL)
spa_loop_remove_source(loop, &this->source);
spa_graph_node_remove(&this->rt.root);
spa_graph_link_remove(&this->rt.driver_link);
this->rt.root.graph = NULL;
}
remove_node(this);
return 0;
}
@ -131,13 +150,9 @@ do_node_add(struct spa_loop *loop,
struct pw_node *this = user_data;
struct pw_node *driver = this->driver_node;
if (this->rt.root.graph == NULL) {
if (this->source.loop == NULL)
spa_loop_add_source(loop, &this->source);
spa_graph_node_add(driver->rt.driver, &this->rt.root);
spa_graph_link_add(&this->rt.root,
driver->rt.root.state,
&this->rt.driver_link);
}
add_node(this, driver);
return 0;
}
@ -449,28 +464,17 @@ do_move_nodes(struct spa_loop *loop,
struct impl *src = user_data;
struct impl *dst = *(struct impl **)data;
struct pw_node *this = &src->this, *driver = &dst->this;
struct spa_graph_node *n, *t;
struct pw_node_target *n, *t;
pw_log_trace("node %p: root %p driver:%p->%p", this,
&this->rt.root, src, dst);
pw_log_trace("node %p: driver:%p->%p", this, this, driver);
if (this->rt.root.graph != NULL) {
spa_graph_node_remove(&this->rt.root);
spa_graph_node_add(driver->rt.driver, &this->rt.root);
spa_graph_link_remove(&this->rt.driver_link);
spa_graph_link_add(&this->rt.root,
driver->rt.root.state,
&this->rt.driver_link);
}
remove_node(this);
add_node(this, driver);
spa_list_for_each_safe(n, t, &this->rt.driver->nodes, link) {
struct pw_node *pn = SPA_CONTAINER_OF(n, struct pw_node, rt.root);
spa_graph_node_remove(n);
spa_graph_node_add(driver->rt.driver, n);
spa_graph_link_remove(&pn->rt.driver_link);
spa_graph_link_add(&pn->rt.root,
driver->rt.root.state,
&pn->rt.driver_link);
spa_list_for_each_safe(n, t, &this->rt.target_list, link) {
struct pw_node *node = n->node;
remove_node(node);
add_node(node, driver);
}
return 0;
}
@ -577,6 +581,67 @@ static void check_properties(struct pw_node *node)
}
static int pw_node_trigger(struct pw_node *this, uint64_t nsec)
{
struct pw_node_target *t;
spa_debug("node %p: trigger %"PRIu64, this, nsec);
spa_list_for_each(t, &this->rt.target_list, link) {
struct pw_node_activation_state *state;
state = &t->activation->state[0];
spa_debug("node %p: state %p pending %d/%d", t->data, state,
state->pending, state->required);
if (pw_node_activation_state_dec(state, 1)) {
t->activation->status = TRIGGERED;
t->activation->signal_time = nsec;
t->signal(t->data);
}
}
return 0;
}
static inline int process_node(void *data)
{
struct pw_node *this = data;
struct timespec ts;
struct pw_port *p;
uint64_t nsec;
pw_log_trace("node %p: process", this);
clock_gettime(CLOCK_MONOTONIC, &ts);
this->rt.activation->status = AWAKE;
this->rt.activation->awake_time = SPA_TIMESPEC_TO_NSEC(&ts);
spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
spa_node_process(p->mix);
spa_node_process(this->node);
spa_list_for_each(p, &this->rt.output_mix, rt.node_link)
spa_node_process(p->mix);
clock_gettime(CLOCK_MONOTONIC, &ts);
nsec = SPA_TIMESPEC_TO_NSEC(&ts);
this->rt.activation->status = FINISHED;
this->rt.activation->finish_time = nsec;
pw_log_trace("node %p: finish waiting:%"PRIu64" processing:%"PRIu64, this,
this->rt.activation->awake_time - this->rt.activation->signal_time,
this->rt.activation->finish_time - this->rt.activation->awake_time);
if (this != this->driver_node) {
pw_node_trigger(this, nsec);
} else {
pw_log_trace("node %p: graph completed", this);
}
return 0;
}
static void node_on_fd_events(struct spa_source *source)
{
struct pw_node *this = source->data;
@ -593,44 +658,10 @@ static void node_on_fd_events(struct spa_source *source)
pw_log_warn("node %p: read %"PRIu64" failed %m", this, cmd);
pw_log_trace("node %p: got process", this);
spa_graph_node_process(&this->rt.root);
this->rt.target.signal(this->rt.target.data);
}
}
static inline int root_impl_sub_process(void *data, struct spa_graph_node *node)
{
struct spa_graph *graph = node->subgraph;
struct pw_node *this = data;
struct timespec ts;
pw_log_trace("node %p: sub process %p", this, graph);
clock_gettime(CLOCK_MONOTONIC, &ts);
this->rt.activation->status = AWAKE;
this->rt.activation->awake_time = SPA_TIMESPEC_TO_NSEC(&ts);
return spa_graph_run(graph);
}
static const struct spa_graph_node_callbacks root_impl = {
SPA_VERSION_GRAPH_NODE_CALLBACKS,
.process = root_impl_sub_process,
};
static int signal_driver(void *data)
{
struct impl *impl = data;
struct pw_node *this = &impl->this;
struct pw_node *driver = this->driver_node;
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
this->rt.activation->status = FINISHED;
this->rt.activation->finish_time = SPA_TIMESPEC_TO_NSEC(&ts);
pw_log_trace("node %p process driver %p", this, driver);
return spa_graph_node_process(&driver->rt.root);
}
SPA_EXPORT
struct pw_node *pw_node_new(struct pw_core *core,
const char *name,
@ -702,26 +733,20 @@ struct pw_node *pw_node_new(struct pw_core *core,
spa_list_init(&this->output_ports);
pw_map_init(&this->output_port_map, 64, 64);
this->rt.driver = &impl->driver_graph;
spa_list_init(&this->rt.input_mix);
spa_list_init(&this->rt.output_mix);
spa_list_init(&this->rt.target_list);
this->rt.activation = this->activation->ptr;
spa_graph_init(&impl->driver_graph, &impl->driver_state);
spa_graph_node_init(&this->rt.root, &this->rt.activation->state[0]);
spa_graph_init(&impl->graph, &impl->graph_state);
spa_graph_node_set_subgraph(&this->rt.root, &impl->graph);
spa_graph_node_set_callbacks(&this->rt.root, &root_impl, this);
impl->node_activation.state[0].status = SPA_STATUS_NEED_BUFFER;
spa_graph_node_init(&this->rt.node, &impl->node_activation.state[0]);
spa_graph_node_add(&impl->graph, &this->rt.node);
this->rt.target.activation = this->rt.activation;
this->rt.target.node = this;
this->rt.target.signal = process_node;
this->rt.target.data = this;
this->rt.driver_target.signal = process_node;
this->rt.activation->position.clock.rate = SPA_FRACTION(1, 48000);
this->rt.activation->position.size = DEFAULT_QUANTUM;
this->rt.driver_link.signal = signal_driver;
this->rt.driver_link.signal_data = impl;
check_properties(this);
this->driver_node = this;
@ -896,40 +921,44 @@ static int node_ready(void *data, int status)
{
struct pw_node *node = data;
struct pw_node *driver = node->driver_node;
struct spa_graph_port *p, *pp;
struct pw_port *p;
struct pw_node_target *t;
struct timespec ts;
uint64_t nsec;
pw_log_trace("node %p: ready driver:%d exported:%d %p status:%d", node,
node->driver, node->exported, driver, status);
if (driver->rt.root.graph == NULL) {
pw_log_error("node %p: no graph", node);
return -EINVAL;
spa_list_for_each(t, &driver->rt.target_list, link) {
pw_node_activation_state_reset(&t->activation->state[0]);
t->activation->status = NOT_TRIGGERED;
}
spa_graph_run(driver->rt.driver);
if (status == SPA_STATUS_HAVE_BUFFER) {
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link) {
if ((pp = p->peer) != NULL)
spa_node_process((struct spa_node*)pp->node->callbacks_data);
}
spa_graph_node_trigger(&driver->rt.root);
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
spa_node_process(p->mix);
}
spa_graph_link_trigger(&driver->rt.driver_link);
clock_gettime(CLOCK_MONOTONIC, &ts);
nsec = SPA_TIMESPEC_TO_NSEC(&ts);
driver->rt.activation->status = TRIGGERED;
driver->rt.activation->signal_time = nsec;
pw_node_trigger(driver, nsec);
return 0;
}
static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
{
struct pw_node *node = data;
struct spa_graph_port *p, *pp;
struct pw_port *p;
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) {
spa_list_for_each(p, &node->rt.input_mix, rt.node_link) {
if (p->port_id != port_id)
continue;
if ((pp = p->peer) != NULL)
spa_graph_node_reuse_buffer(pp->node, pp->port_id, buffer_id);
spa_node_port_reuse_buffer(p->mix, p->port_id, buffer_id);
break;
}
return 0;
@ -955,7 +984,6 @@ int pw_node_set_implementation(struct pw_node *node,
}
node->node = spa_node;
spa_graph_node_set_callbacks(&node->rt.node, &spa_graph_node_impl_default, spa_node);
spa_node_set_callbacks(node->node, &node_callbacks, node);
res = spa_node_add_listener(node->node, &node->listener, &node_events, node);

View file

@ -83,15 +83,13 @@ static int tee_process(struct spa_node *data)
{
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct pw_port_mix *mix;
struct spa_io_buffers *io = &this->rt.io;
pw_log_trace("port %p: tee input %d %d", this, io->status, io->buffer_id);
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
struct pw_port_mix *mix = SPA_CONTAINER_OF(p, struct pw_port_mix, port);
spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
pw_log_trace("port %p: port %d %p->%p %d", this,
p->port_id, io, mix->io, mix->io->buffer_id);
mix->port.port_id, io, mix->io, mix->io->buffer_id);
*mix->io = *io;
}
io->status = SPA_STATUS_NEED_BUFFER;
@ -103,12 +101,10 @@ static int tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t bu
{
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_port *p = &this->rt.mix_port, *pp;
if ((pp = p->peer) != NULL) {
pw_log_trace("port %p: tee reuse buffer %d %d", this, port_id, buffer_id);
spa_graph_node_reuse_buffer(pp->node, port_id, buffer_id);
}
pw_log_trace("port %p: tee reuse buffer %d %d", this, port_id, buffer_id);
spa_node_port_reuse_buffer(this->node->node, this->port_id, buffer_id);
return 0;
}
@ -123,17 +119,15 @@ static int schedule_mix_input(struct spa_node *data)
{
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_io_buffers *io = &this->rt.io;
struct pw_port_mix *mix;
if (PW_PORT_IS_CONTROL(this))
return SPA_STATUS_HAVE_BUFFER | SPA_STATUS_NEED_BUFFER;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
struct pw_port_mix *mix = SPA_CONTAINER_OF(p, struct pw_port_mix, port);
spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
pw_log_trace("port %p: mix input %d %p->%p %d %d", this,
p->port_id, mix->io, io, mix->io->status, mix->io->buffer_id);
mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id);
*io = *mix->io;
mix->io->status = SPA_STATUS_NEED_BUFFER;
break;
@ -158,14 +152,11 @@ static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, ui
{
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p, *pp;
struct pw_port_mix *mix;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if ((pp = p->peer) != NULL) {
pw_log_trace("port %p: reuse buffer %d %d", this, port_id, buffer_id);
spa_graph_node_reuse_buffer(pp->node, port_id, buffer_id);
}
spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
pw_log_trace("port %p: reuse buffer %d %d", this, port_id, buffer_id);
spa_node_port_reuse_buffer(this->node->node, port_id, buffer_id);
}
return 0;
}
@ -187,9 +178,8 @@ int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix)
port_id = pw_map_insert_new(&port->mix_port_map, mix);
spa_graph_port_init(&mix->port,
port->direction, port_id,
0);
mix->port.direction = port->direction;
mix->port.port_id = port_id;
spa_list_append(&port->mix_list, &mix->link);
port->n_mix++;
@ -327,27 +317,19 @@ struct pw_port *pw_port_new(enum pw_direction direction,
spa_list_init(&this->links);
spa_list_init(&this->mix_list);
spa_list_init(&this->rt.mix_list);
spa_list_init(&this->control_list[0]);
spa_list_init(&this->control_list[1]);
spa_hook_list_init(&this->listener_list);
spa_graph_port_init(&this->rt.port,
this->direction,
this->port_id,
0);
spa_graph_node_init(&this->rt.mix_node, &this->rt.mix_state);
this->rt.mix_state.status = SPA_STATUS_NEED_BUFFER;
impl->mix_node = this->direction == PW_DIRECTION_INPUT ?
schedule_mix_node :
schedule_tee_node;
pw_port_set_mix(this, &impl->mix_node, 0);
pw_map_init(&this->mix_port_map, 64, 64);
spa_graph_port_init(&this->rt.mix_port,
pw_direction_reverse(this->direction), 0,
0);
this->rt.io.status = SPA_STATUS_NEED_BUFFER;
if (info)
@ -373,8 +355,6 @@ int pw_port_set_mix(struct pw_port *port, struct spa_node *node, uint32_t flags)
pw_log_debug("port %p: mix node %p->%p", port, port->mix, node);
port->mix = node;
port->mix_flags = flags;
spa_graph_node_set_callbacks(&port->rt.mix_node,
&spa_graph_node_impl_default, port->mix);
return 0;
}
@ -437,23 +417,11 @@ static int do_add_port(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_port *this = user_data;
struct spa_graph_node *out, *in;
spa_graph_port_add(&this->node->rt.node, &this->rt.port);
spa_graph_port_add(&this->rt.mix_node, &this->rt.mix_port);
spa_graph_port_link(&this->rt.port, &this->rt.mix_port);
spa_graph_node_add(this->node->rt.node.graph, &this->rt.mix_node);
if (this->direction == PW_DIRECTION_INPUT) {
out = &this->rt.mix_node;
in = &this->node->rt.node;
} else {
out = &this->node->rt.node;
in = &this->rt.mix_node;
}
this->rt.mix_link.signal = spa_graph_link_signal_node;
this->rt.mix_link.signal_data = in;
spa_graph_link_add(out, in->state, &this->rt.mix_link);
if (this->direction == PW_DIRECTION_INPUT)
spa_list_append(&this->node->rt.input_mix, &this->rt.node_link);
else
spa_list_append(&this->node->rt.output_mix, &this->rt.node_link);
return 0;
}
@ -710,11 +678,7 @@ static int do_remove_port(struct spa_loop *loop,
{
struct pw_port *this = user_data;
spa_graph_link_remove(&this->rt.mix_link);
spa_graph_port_unlink(&this->rt.port);
spa_graph_port_remove(&this->rt.port);
spa_graph_port_remove(&this->rt.mix_port);
spa_graph_node_remove(&this->rt.mix_node);
spa_list_remove(&this->rt.node_link);
return 0;
}
@ -970,9 +934,9 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id,
return -EIO;
if (port->mix->port_use_buffers != NULL) {
struct spa_graph_port *p = &mix->port;
res = spa_node_port_use_buffers(port->mix,
p->direction, p->port_id, buffers, n_buffers);
mix->port.direction, mix->port.port_id,
buffers, n_buffers);
pw_log_debug("port %p: use buffers on mix: %p %d (%s)",
port, port->mix, res, spa_strerror(res));
}

View file

@ -45,8 +45,6 @@ extern "C" {
#define spa_debug pw_log_trace
#endif
#include <spa/graph/graph.h>
struct pw_command;
typedef int (*pw_command_func_t) (struct pw_command *command, struct pw_core *core, char **err);
@ -293,6 +291,27 @@ struct pw_module {
void *user_data; /**< module user_data */
};
struct pw_node_activation_state {
int status; /**< current status */
uint32_t required; /**< required number of signals */
uint32_t pending; /**< number of pending signals */
};
static inline void pw_node_activation_state_reset(struct pw_node_activation_state *state)
{
state->pending = state->required;
}
#define pw_node_activation_state_dec(s,c) (__atomic_sub_fetch(&(s)->pending, c, __ATOMIC_SEQ_CST) == 0)
struct pw_node_target {
struct spa_list link;
struct pw_node *node;
struct pw_node_activation *activation;
int (*signal) (void *data);
void *data;
};
struct pw_node_activation {
#define NOT_TRIGGERED 0
#define TRIGGERED 1
@ -305,7 +324,7 @@ struct pw_node_activation {
uint64_t finish_time;
struct spa_io_position position;
struct spa_graph_state state[2]; /* one current state and one next state */
struct pw_node_activation_state state[2]; /* one current state and one next state */
};
#define pw_node_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_node_events, m, v, ##__VA_ARGS__)
@ -381,11 +400,17 @@ struct pw_node {
struct {
struct spa_io_clock *clock; /**< io area of the clock or NULL */
struct spa_io_position *position;
struct spa_graph *driver;
struct spa_graph_node root;
struct pw_node_activation *activation;
struct spa_graph_node node;
struct spa_graph_link driver_link;
struct spa_list target_list; /* list of targets to signal after
* this node */
struct pw_node_target driver_target; /* driver target that we signal */
struct spa_list input_mix; /* our input ports (and mixers) */
struct spa_list output_mix; /* output ports (and mixers) */
struct pw_node_target target; /* our target that is signaled by the
driver */
struct spa_list driver_link; /* our link in driver */
} rt;
void *user_data; /**< extra user data */
@ -393,8 +418,12 @@ struct pw_node {
struct pw_port_mix {
struct spa_list link;
struct spa_list rt_link;
struct pw_port *p;
struct spa_graph_port port;
struct {
enum spa_direction direction;
uint32_t port_id;
} port;
struct spa_io_buffers *io;
uint32_t id;
};
@ -471,11 +500,8 @@ struct pw_port {
struct {
struct spa_io_buffers io; /**< io area of the port */
struct spa_io_clock clock; /**< io area of the clock */
struct spa_graph_port port; /**< this graph port, linked to mix_port */
struct spa_graph_port mix_port; /**< port from the mixer */
struct spa_graph_node mix_node; /**< mixer node */
struct spa_graph_link mix_link; /**< mixer link */
struct spa_graph_state mix_state; /**< mixer state */
struct spa_list mix_list;
struct spa_list node_link;
} rt; /**< data only accessed from the data thread */
void *owner_data; /**< extra owner data */
@ -510,7 +536,7 @@ struct pw_link {
struct {
struct pw_port_mix out_mix; /**< port added to the output mixer */
struct pw_port_mix in_mix; /**< port added to the input mixer */
struct spa_graph_link link; /**< nodes link */
struct pw_node_target target;
} rt;
void *user_data;