link: work on activate/deactivate

Recursively activate links when activating nodes
This commit is contained in:
Wim Taymans 2017-07-03 17:34:30 +02:00
parent d2f877912a
commit 9fa1df6f2c
13 changed files with 215 additions and 133 deletions

View file

@ -64,5 +64,5 @@ libpipewire = shared_library('pipewire-@0@'.format(apiversion), pipewire_sources
pipewire_dep = declare_dependency(link_with : libpipewire,
include_directories : [configinc, spa_inc],
dependencies : [pthread_lib],
dependencies : [pthread_lib,spalib_dep],
)

View file

@ -46,7 +46,7 @@ static const struct spa_handle_factory *find_factory(struct impl *impl)
char *filename;
const char *dir;
if ((dir = getenv("SPA_PLUIGIN_DIR")) == NULL)
if ((dir = getenv("SPA_PLUGIN_DIR")) == NULL)
dir = PLUGINDIR;
asprintf(&filename, "%s/%s.so", dir, AUDIOMIXER_LIB);
@ -157,6 +157,9 @@ static struct impl *module_new(struct pw_core *core, struct pw_properties *prope
if (op == NULL)
continue;
n->idle_used_input_links++;
node->idle_used_output_links++;
pw_link_new(core, op, ip, NULL, NULL, &error);
}
return impl;

View file

@ -37,7 +37,8 @@ struct pw_spa_node {
};
typedef int (*setup_node_t) (struct pw_core *core,
struct spa_node *spa_node, struct pw_properties *pw_props);
struct spa_node *spa_node,
struct pw_properties *pw_props);
struct pw_spa_node *
pw_spa_node_load(struct pw_core *core,

View file

@ -381,8 +381,8 @@ spa_node_param_filter(struct pw_link *this,
struct spa_pod_frame f;
uint32_t offset;
if (spa_node_port_enum_params
(out_node, SPA_DIRECTION_OUTPUT, out_port, oidx, &oparam) < 0)
if (spa_node_port_enum_params(out_node, SPA_DIRECTION_OUTPUT,
out_port, oidx, &oparam) < 0)
break;
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
@ -589,7 +589,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
}
if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(this->output, params, n_params,
if ((res = pw_port_alloc_buffers(this->output,
params, n_params,
impl->buffers, &impl->n_buffers)) < 0) {
asprintf(&error, "error alloc output buffers: %d", res);
goto error;
@ -602,7 +603,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
pw_log_debug("allocated %d buffers %p from output port", impl->n_buffers,
impl->buffers);
} else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(this->input, params, n_params,
if ((res = pw_port_alloc_buffers(this->input,
params, n_params,
impl->buffers, &impl->n_buffers)) < 0) {
asprintf(&error, "error alloc input buffers: %d", res);
goto error;
@ -619,7 +621,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
pw_log_debug("using %d buffers %p on input port", impl->n_buffers, impl->buffers);
if ((res = pw_port_use_buffers(this->input, impl->buffers, impl->n_buffers)) < 0) {
if ((res = pw_port_use_buffers(this->input,
impl->buffers, impl->n_buffers)) < 0) {
asprintf(&error, "error use input buffers: %d", res);
goto error;
}
@ -627,7 +630,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
pw_work_queue_add(impl->work, this->input->node, res, complete_paused, this->input);
} else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
pw_log_debug("using %d buffers %p on output port", impl->n_buffers, impl->buffers);
if ((res = pw_port_use_buffers(this->output, impl->buffers, impl->n_buffers)) < 0) {
if ((res = pw_port_use_buffers(this->output,
impl->buffers, impl->n_buffers)) < 0) {
asprintf(&error, "error use output buffers: %d", res);
goto error;
}
@ -758,6 +762,14 @@ on_output_async_complete_notify(struct pw_listener *listener,
pw_work_queue_complete(impl->work, node, seq, res);
}
static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
if (impl->buffer_owner != port)
pw_port_use_buffers(port, NULL, 0);
}
static int
do_remove_input(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
@ -775,8 +787,11 @@ static void input_remove(struct pw_link *this, struct pw_port *port)
pw_log_debug("link %p: remove input port %p", this, port);
pw_signal_remove(&impl->input_port_destroy);
pw_signal_remove(&impl->input_async_complete);
pw_loop_invoke(port->node->data_loop->loop,
do_remove_input, 1, sizeof(struct pw_port*), &port, true, this);
clear_port_buffers(this, this->input);
}
static int
@ -796,8 +811,11 @@ static void output_remove(struct pw_link *this, struct pw_port *port)
pw_log_debug("link %p: remove output port %p", this, port);
pw_signal_remove(&impl->output_port_destroy);
pw_signal_remove(&impl->output_async_complete);
pw_loop_invoke(port->node->data_loop->loop,
do_remove_output, 1, sizeof(struct pw_port*), &port, true, this);
clear_port_buffers(this, this->output);
}
static void on_port_destroy(struct pw_link *this, struct pw_port *port)
@ -807,11 +825,9 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port)
if (port == this->input) {
input_remove(this, port);
this->input = NULL;
other = this->output;
} else if (port == this->output) {
output_remove(this, port);
this->output = NULL;
other = this->input;
} else
return;
@ -820,7 +836,7 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port)
impl->buffers = NULL;
impl->n_buffers = 0;
pw_log_debug("link %p: clear input allocated buffers on port %p", this, other);
pw_log_debug("link %p: clear allocated buffers on port %p", this, other);
pw_port_use_buffers(other, NULL, 0);
impl->buffer_owner = NULL;
}
@ -845,6 +861,15 @@ static void on_output_port_destroy(struct pw_listener *listener, struct pw_port
on_port_destroy(&impl->this, port);
}
static int
do_activate_link(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_link *this = user_data;
spa_graph_port_link(this->output->node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port);
return SPA_RESULT_OK;
}
bool pw_link_activate(struct pw_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
@ -855,15 +880,70 @@ bool pw_link_activate(struct pw_link *this)
impl->active = true;
pw_log_debug("link %p: activate", this);
pw_loop_invoke(this->output->node->data_loop->loop,
do_activate_link, SPA_ID_INVALID, 0, NULL, false, this);
this->output->node->n_used_output_links++;
this->input->node->n_used_input_links++;
pw_work_queue_add(impl->work,
this, SPA_RESULT_WAIT_SYNC, (pw_work_func_t) check_states, this);
return true;
}
static int
do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_link *this = user_data;
spa_graph_port_unlink(this->output->node->rt.sched->graph, &this->rt.out_port);
return SPA_RESULT_OK;
}
bool pw_link_deactivate(struct pw_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_node *input_node, *output_node;
if (!impl->active)
return true;
impl->active = false;
pw_log_debug("link %p: deactivate", this);
pw_loop_invoke(this->output->node->data_loop->loop,
do_deactivate_link, SPA_ID_INVALID, 0, NULL, true, this);
input_node = this->input->node;
output_node = this->output->node;
input_node->n_used_input_links--;
output_node->n_used_output_links--;
pw_log_debug("link %p: in %d %d, out %d %d, %d %d %d %d", this,
input_node->n_used_input_links,
input_node->n_used_output_links,
output_node->n_used_input_links,
output_node->n_used_output_links,
input_node->idle_used_input_links,
input_node->idle_used_output_links,
output_node->idle_used_input_links,
output_node->idle_used_output_links);
if (input_node->n_used_input_links <= input_node->idle_used_input_links &&
input_node->n_used_output_links <= input_node->idle_used_output_links &&
input_node->info.state > PW_NODE_STATE_IDLE) {
pw_node_update_state(input_node, PW_NODE_STATE_IDLE, NULL);
this->input->state = PW_PORT_STATE_PAUSED;
}
if (output_node->n_used_input_links <= output_node->idle_used_input_links &&
output_node->n_used_output_links <= output_node->idle_used_output_links &&
output_node->info.state > PW_NODE_STATE_IDLE) {
pw_node_update_state(output_node, PW_NODE_STATE_IDLE, NULL);
this->output->state = PW_PORT_STATE_PAUSED;
}
return true;
}
@ -998,22 +1078,17 @@ struct pw_link *pw_link_new(struct pw_core *core,
spa_list_insert(output->links.prev, &this->output_link);
spa_list_insert(input->links.prev, &this->input_link);
output_node->n_used_output_links++;
input_node->n_used_input_links++;
spa_list_insert(core->link_list.prev, &this->link);
pw_core_add_global(core, NULL, core->type.link, 0, this, link_bind_func, &this->global);
this->info.id = this->global->id;
this->info.output_node_id = output ? output->node->global->id : -1;
this->info.output_node_id = output ? output_node->global->id : -1;
this->info.output_port_id = output ? output->port_id : -1;
this->info.input_node_id = input ? input->node->global->id : -1;
this->info.input_node_id = input ? input_node->global->id : -1;
this->info.input_port_id = input ? input->port_id : -1;
this->info.format = NULL;
spa_graph_port_link(output_node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port);
pw_loop_invoke(output_node->data_loop->loop,
do_add_link,
SPA_ID_INVALID, sizeof(struct pw_port *), &output, false, this);
@ -1034,15 +1109,6 @@ struct pw_link *pw_link_new(struct pw_core *core,
return NULL;
}
static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
if (impl->buffer_owner != port)
pw_port_use_buffers(port, NULL, 0);
}
void pw_link_destroy(struct pw_link *link)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
@ -1051,42 +1117,19 @@ void pw_link_destroy(struct pw_link *link)
pw_log_debug("link %p: destroy", impl);
pw_signal_emit(&link->destroy_signal, link);
pw_link_deactivate(link);
pw_global_destroy(link->global);
spa_list_remove(&link->link);
spa_list_for_each_safe(resource, tmp, &link->resource_list, link)
pw_resource_destroy(resource);
if (link->input) {
input_remove(link, link->input);
spa_list_remove(&link->input_link);
link->input->node->n_used_input_links--;
clear_port_buffers(link, link->input);
if (link->input->node->n_used_input_links == 0 &&
link->input->node->n_used_output_links == 0 &&
link->input->node->info.state > PW_NODE_STATE_IDLE)
pw_node_update_state(link->input->node, PW_NODE_STATE_IDLE, NULL);
link->input = NULL;
}
if (link->output) {
output_remove(link, link->output);
spa_list_remove(&link->output_link);
link->output->node->n_used_output_links--;
clear_port_buffers(link, link->output);
if (link->output->node->n_used_input_links == 0 &&
link->output->node->n_used_output_links == 0 &&
link->output->node->info.state > PW_NODE_STATE_IDLE)
pw_node_update_state(link->output->node, PW_NODE_STATE_IDLE, NULL);
link->output = NULL;
}
pw_work_queue_destroy(impl->work);

View file

@ -640,7 +640,7 @@ struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction di
port = spa_list_first(ports, struct pw_port, link);
/* for output we can reuse an existing port, for input only
* when there is a multiplex */
if (direction == PW_DIRECTION_INPUT && port->multiplex == NULL)
if (direction == PW_DIRECTION_INPUT && port->mix == NULL)
port = NULL;
}
}
@ -664,10 +664,28 @@ static void on_state_complete(struct pw_node *node, void *data, int res)
pw_node_update_state(node, state, error);
}
static void node_deactivate(struct pw_node *this)
{
struct pw_port *port;
pw_log_debug("node %p: deactivate", this);
spa_list_for_each(port, &this->input_ports, link) {
struct pw_link *link;
spa_list_for_each(link, &port->links, input_link)
pw_link_deactivate(link);
}
spa_list_for_each(port, &this->output_ports, link) {
struct pw_link *link;
spa_list_for_each(link, &port->links, output_link)
pw_link_deactivate(link);
}
}
static void node_activate(struct pw_node *this)
{
struct pw_port *port;
pw_log_debug("node %p: activate", this);
spa_list_for_each(port, &this->input_ports, link) {
struct pw_link *link;
spa_list_for_each(link, &port->links, input_link)
@ -754,6 +772,9 @@ void pw_node_update_state(struct pw_node *node, enum pw_node_state state, char *
node->info.error = error;
node->info.state = state;
if (state == PW_NODE_STATE_IDLE)
node_deactivate(node);
pw_signal_emit(&node->state_changed, node, old, state);
node->info.change_mask = 1 << 5;

View file

@ -84,10 +84,12 @@ struct pw_node {
struct spa_list input_ports; /**< list of input ports */
struct pw_port **input_port_map; /**< map from port_id to port */
uint32_t n_used_input_links; /**< number of active input links */
uint32_t idle_used_input_links; /**< number of active input to be idle */
struct spa_list output_ports; /**< list of output ports */
struct pw_port **output_port_map; /**< map from port_id to port */
uint32_t n_used_output_links; /**< number of active output links */
uint32_t idle_used_output_links; /**< number of active output to be idle */
/** Emited when a new port is added */
PW_SIGNAL(port_added, (struct pw_listener *listener,

View file

@ -85,6 +85,7 @@ static int schedule_mix(struct spa_graph_node *node)
io->status = SPA_RESULT_NEED_BUFFER;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link)
*p->io = *io;
io->buffer_id = SPA_ID_INVALID;
res = SPA_RESULT_NEED_BUFFER;
}
else
@ -194,28 +195,17 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state)
}
}
int pw_port_pause_rt(struct pw_port *port)
{
int res;
if (port->state <= PW_PORT_STATE_PAUSED)
return SPA_RESULT_OK;
res = spa_node_port_send_command(port->node->node,
port->direction,
port->port_id,
&SPA_COMMAND_INIT(port->node->core->type.command_node.
Pause));
port_update_state (port, PW_PORT_STATE_PAUSED);
return res;
}
static int
do_port_pause(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
return pw_port_pause_rt(port);
return spa_node_port_send_command(port->node->node,
port->direction,
port->port_id,
&SPA_COMMAND_INIT(port->node->core->type.command_node.
Pause));
}
int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format)
@ -223,7 +213,6 @@ int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *
int res;
res = spa_node_port_set_format(port->node->node, port->direction, port->port_id, flags, format);
pw_log_debug("port %p: set format %d", port, res);
if (!SPA_RESULT_IS_ASYNC(res)) {

View file

@ -72,7 +72,7 @@ struct pw_port {
struct spa_list links; /**< list of \ref pw_link */
void *multiplex; /**< optional port buffer mix/split */
void *mix; /**< optional port buffer mix/split */
struct {
struct spa_graph *graph;
@ -102,9 +102,6 @@ int pw_port_alloc_buffers(struct pw_port *port,
struct spa_buffer **buffers, uint32_t *n_buffers);
/** Pause a port, should be called from data thread \memberof pw_port */
int pw_port_pause_rt(struct pw_port *port);
#ifdef __cplusplus
}
#endif

View file

@ -28,6 +28,7 @@ extern "C" {
struct spa_graph_scheduler {
struct spa_graph *graph;
struct spa_list ready;
struct spa_list pending;
struct spa_graph_node *node;
};
@ -36,6 +37,7 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
struct spa_graph *graph)
{
sched->graph = graph;
spa_list_init(&sched->ready);
spa_list_init(&sched->pending);
sched->node = NULL;
}
@ -55,16 +57,34 @@ static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
return res;
}
static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, struct spa_graph_port *port)
{
struct spa_graph_node *node = port->node;
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready_in++;
debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in);
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
if (node->ready_link.next == NULL)
spa_list_insert(sched->ready.prev, &node->ready_link);
} else if (node->ready_link.next) {
spa_list_remove(&node->ready_link);
node->ready_link.next = NULL;
}
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
{
bool res;
struct spa_graph *graph = sched->graph;
struct spa_graph_port *p;
struct spa_graph_node *n;
res = !spa_list_is_empty(&graph->ready);
res = !spa_list_is_empty(&sched->ready);
if (res) {
n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
n = spa_list_first(&sched->ready, struct spa_graph_node, ready_link);
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
@ -79,7 +99,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node)
break;
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(graph->ready.prev, &n->ready_link);
spa_list_insert(sched->ready.prev, &n->ready_link);
break;
case SPA_GRAPH_ACTION_CHECK:
@ -91,7 +111,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
if (pn != sched->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->action = SPA_GRAPH_ACTION_OUT;
spa_list_insert(graph->ready.prev,
spa_list_insert(sched->ready.prev,
&pn->ready_link);
}
} else if (p->io->status == SPA_RESULT_OK)
@ -99,14 +119,14 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
}
} else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
spa_scheduler_port_check(sched, p->peer);
}
break;
default:
break;
}
res = !spa_list_is_empty(&graph->ready);
res = !spa_list_is_empty(&sched->ready);
}
return res;
}
@ -118,7 +138,7 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
node->state = SPA_RESULT_NEED_BUFFER;
sched->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->graph->ready.prev, &node->ready_link);
spa_list_insert(sched->ready.prev, &node->ready_link);
}
static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
@ -127,7 +147,7 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s
node->action = SPA_GRAPH_ACTION_OUT;
sched->node = node;
if (node->ready_link.next == NULL)
spa_list_insert(sched->graph->ready.prev, &node->ready_link);
spa_list_insert(sched->ready.prev, &node->ready_link);
}
#ifdef __cplusplus

View file

@ -41,6 +41,36 @@ static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
return res;
}
static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port)
{
struct spa_graph_node *node = port->node;
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready_in++;
debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in);
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
} else if (node->ready_link.next) {
spa_list_remove(&node->ready_link);
node->ready_link.next = NULL;
}
}
static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) {
struct spa_graph_port *p;
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
node->ready_in++;
}
debug("node %p update %d ready\n", node, node->ready_in);
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph)
{
bool empty;

View file

@ -28,7 +28,6 @@ extern "C" {
struct spa_graph_scheduler {
struct spa_graph *graph;
struct spa_list pending;
struct spa_graph_node *node;
};
@ -36,7 +35,6 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
struct spa_graph *graph)
{
sched->graph = graph;
spa_list_init(&sched->pending);
sched->node = NULL;
}
@ -67,8 +65,11 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
struct spa_graph_port *pport = p->peer;
struct spa_graph_node *pnode = pport->node;
struct spa_graph_port *pport;
struct spa_graph_node *pnode;
if ((pport = p->peer) == NULL)
continue;
pnode = pport->node;
debug("node %p peer %p io %d\n", node, pnode, pport->io->status);
if (pport->io->status == SPA_RESULT_NEED_BUFFER) {
spa_list_insert(ready.prev, &pnode->ready_link);
@ -102,6 +103,7 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
if (node->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
if (p->io->status == SPA_RESULT_HAVE_BUFFER)
if (p->peer)
p->peer->node->ready_in++;
}
}
@ -125,8 +127,11 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s
spa_list_init(&ready);
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
struct spa_graph_port *pport = p->peer;
struct spa_graph_node *pnode = pport->node;
struct spa_graph_port *pport;
struct spa_graph_node *pnode;
if ((pport = p->peer) == NULL)
continue;
pnode = pport->node;
if (pport->io->status == SPA_RESULT_HAVE_BUFFER)
pnode->ready_in++;
@ -161,6 +166,7 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
if (p->peer)
p->peer->node->ready_in++;
}
}

View file

@ -40,7 +40,6 @@ struct spa_graph_port;
struct spa_graph {
struct spa_list nodes;
struct spa_list ready;
};
typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node);
@ -76,7 +75,6 @@ struct spa_graph_port {
static inline void spa_graph_init(struct spa_graph *graph)
{
spa_list_init(&graph->nodes);
spa_list_init(&graph->ready);
}
static inline void
@ -96,36 +94,6 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node,
debug("node %p add\n", node);
}
static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port)
{
struct spa_graph_node *node = port->node;
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready_in++;
debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in);
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
} else if (node->ready_link.next) {
spa_list_remove(&node->ready_link);
node->ready_link.next = NULL;
}
}
static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) {
struct spa_graph_port *p;
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
node->ready_in++;
}
debug("node %p update %d ready\n", node, node->ready_in);
}
static inline void
spa_graph_port_add(struct spa_graph *graph,
struct spa_graph_node *node,
@ -135,7 +103,7 @@ spa_graph_port_add(struct spa_graph *graph,
uint32_t flags,
struct spa_port_io *io)
{
debug("port %p add %d to node %p \n", port, direction, node);
debug("port %p add type %d id %d to node %p \n", port, direction, port_id, node);
port->node = node;
port->direction = direction;
port->port_id = port_id;
@ -145,7 +113,6 @@ spa_graph_port_add(struct spa_graph *graph,
node->max_in++;
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT)
node->required_in++;
spa_graph_port_check(graph, port);
}
static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node)

View file

@ -256,6 +256,8 @@ static int impl_node_add_port(struct spa_node *node, enum spa_direction directio
if (this->last_port <= port_id)
this->last_port = port_id + 1;
spa_log_info(this->log, NAME " %p: add port %d", this, port_id);
return SPA_RESULT_OK;
}
@ -289,6 +291,7 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
this->last_port = i + 1;
}
spa_log_info(this->log, NAME " %p: remove port %d", this, port_id);
return SPA_RESULT_OK;
}
@ -414,11 +417,11 @@ impl_node_port_set_format(struct spa_node *node,
this->copy = this->ops.copy[CONV_F32_F32];
this->add = this->ops.add[CONV_F32_F32];
}
}
if (!port->have_format) {
this->n_formats++;
port->have_format = true;
spa_log_info(this->log, NAME " %p: set format on port %d", this, port_id);
}
}