Make PinosPort struct

Keep track of the ports, their links and the allocated buffers in a
small struct managed by the node.
This commit is contained in:
Wim Taymans 2016-09-28 10:42:36 +02:00
parent c6861845a7
commit fe37e2bc1b
4 changed files with 364 additions and 417 deletions

View file

@ -34,8 +34,40 @@
(G_TYPE_INSTANCE_GET_PRIVATE ((node), PINOS_TYPE_NODE, PinosNodePrivate))
typedef struct {
PinosLink *link;
} NodeLink;
PinosPort port;
GPtrArray *links;
} NodePort;
static NodePort *
new_node_port (PinosNode *node, uint32_t port)
{
NodePort *np;
np = g_slice_new0 (NodePort);
np->port.node = node;
np->port.port = port;
np->links = g_ptr_array_new ();
return np;
}
static void
free_node_port (NodePort *np)
{
g_ptr_array_free (np->links, TRUE);
g_slice_free (NodePort, np);
}
static NodePort *
find_node_port (GList *ports, PinosNode *node, uint32_t port)
{
GList *walk;
for (walk = ports; walk; walk = g_list_next (walk)) {
NodePort *np = walk->data;
g_debug ("%p %d <-> %p %d", np->port.node, np->port.port, node, port);
if (np->port.node == node && np->port.port == port)
return np;
}
return NULL;
}
struct _PinosNodePrivate
{
@ -51,8 +83,10 @@ struct _PinosNodePrivate
unsigned int max_output_ports;
unsigned int n_input_ports;
unsigned int n_output_ports;
uint32_t *input_port_ids;
uint32_t *output_port_ids;
GList *input_ports;
GList *output_ports;
guint n_used_output_links;
guint n_used_input_links;
PinosNodeState state;
GError *error;
@ -62,11 +96,6 @@ struct _PinosNodePrivate
PinosRTLoop *loop;
GArray *output_links;
guint n_used_output_links;
GArray *input_links;
guint n_used_input_links;
SpaNodeEventAsyncComplete ac;
uint32_t pending_state_seq;
PinosNodeState pending_state;
@ -104,80 +133,108 @@ static void
update_port_ids (PinosNode *node, gboolean create)
{
PinosNodePrivate *priv = node->priv;
uint32_t *in_ports, *out_ports;
guint n_input_ports, n_output_ports;
guint i, j;
uint32_t *input_port_ids, *output_port_ids;
guint n_input_ports, n_output_ports, max_input_ports, max_output_ports;
guint i;
GList *ports;
if (node->node == NULL)
return;
n_input_ports = priv->n_input_ports;
n_output_ports = priv->n_output_ports;
in_ports = g_alloca (sizeof (uint32_t) * n_input_ports);
out_ports = g_alloca (sizeof (uint32_t) * n_output_ports);
memcpy (in_ports, priv->input_port_ids, sizeof (uint32_t) * n_input_ports);
memcpy (out_ports, priv->output_port_ids, sizeof (uint32_t) * n_output_ports);
spa_node_get_n_ports (node->node,
&priv->n_input_ports,
&priv->max_input_ports,
&priv->n_output_ports,
&priv->max_output_ports);
&n_input_ports,
&max_input_ports,
&n_output_ports,
&max_output_ports);
input_port_ids = g_alloca (sizeof (uint32_t) * n_input_ports);
output_port_ids = g_alloca (sizeof (uint32_t) * n_output_ports);
spa_node_get_port_ids (node->node,
max_input_ports,
input_port_ids,
max_output_ports,
output_port_ids);
g_debug ("node %p: update_port ids %u/%u, %u/%u", node,
n_input_ports, max_input_ports, n_output_ports, max_output_ports);
i = 0;
ports = priv->input_ports;
while (true) {
NodePort *p = (ports ? ports->data : NULL);
if (p && i < n_input_ports && p->port.port == input_port_ids[i]) {
i++;
ports = g_list_next (ports);
} else if ((p && i < n_input_ports && input_port_ids[i] < p->port.port) || i < n_input_ports) {
NodePort *np;
g_debug ("node %p: input port added %d", node, input_port_ids[i]);
np = new_node_port (node, input_port_ids[i]);
priv->input_ports = g_list_insert_before (priv->input_ports, ports, np);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_ADDED], 0, PINOS_DIRECTION_INPUT);
i++;
} else if (p) {
GList *next;
g_debug ("node %p: input port removed %d", node, p->port.port);
next = g_list_next (ports);
priv->input_ports = g_list_delete_link (priv->input_ports, ports);
ports = next;
free_node_port (p);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_REMOVED], 0, PINOS_DIRECTION_INPUT);
} else
break;
}
i = 0;
ports = priv->output_ports;
while (true) {
NodePort *p = (ports ? ports->data : NULL);
if (p && i < n_output_ports && p->port.port == output_port_ids[i]) {
i++;
ports = g_list_next (ports);
} else if ((p && i < n_output_ports && output_port_ids[i] < p->port.port) || i < n_output_ports) {
NodePort *np;
g_debug ("node %p: output port added %d", node, output_port_ids[i]);
np = new_node_port (node, output_port_ids[i]);
priv->output_ports = g_list_insert_before (priv->output_ports, ports, np);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_ADDED], 0, PINOS_DIRECTION_INPUT);
i++;
} else if (p) {
GList *next;
g_debug ("node %p: output port removed %d", node, p->port.port);
next = g_list_next (ports);
priv->output_ports = g_list_delete_link (priv->output_ports, ports);
ports = next;
free_node_port (p);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_REMOVED], 0, PINOS_DIRECTION_INPUT);
} else
break;
}
priv->max_input_ports = max_input_ports;
priv->max_output_ports = max_output_ports;
priv->n_input_ports = n_input_ports;
priv->n_output_ports = n_output_ports;
node->have_inputs = priv->n_input_ports > 0;
node->have_outputs = priv->n_output_ports > 0;
g_debug ("node %p: update_port ids %u/%u, %u/%u", node,
priv->n_input_ports, priv->max_input_ports, priv->n_output_ports, priv->max_output_ports);
priv->input_port_ids = g_realloc_n (priv->input_port_ids, priv->max_input_ports, sizeof (uint32_t));
priv->output_port_ids = g_realloc_n (priv->output_port_ids, priv->max_output_ports, sizeof (uint32_t));
spa_node_get_port_ids (node->node,
priv->max_input_ports,
priv->input_port_ids,
priv->max_output_ports,
priv->output_port_ids);
i = j = 0;
while (true) {
if (i < priv->n_input_ports && j < n_input_ports && priv->input_port_ids[i] == in_ports[j]) {
i++;
j++;
} else if ((i < priv->n_input_ports && j < n_input_ports &&
priv->input_port_ids[i] < in_ports[j]) || i < priv->n_input_ports) {
g_debug ("node %p: input port added %d", node, priv->input_port_ids[i]);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_ADDED], 0, PINOS_DIRECTION_INPUT);
i++;
} else if (j < n_input_ports) {
g_debug ("node %p: input port removed %d", node, in_ports[j]);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_REMOVED], 0, PINOS_DIRECTION_INPUT);
j++;
} else
break;
}
i = j = 0;
while (true) {
if (i < priv->n_output_ports && j < n_output_ports && priv->output_port_ids[i] == out_ports[j]) {
i++;
j++;
} else if ((i < priv->n_output_ports && j < n_output_ports &&
priv->output_port_ids[i] < out_ports[j]) || i < priv->n_output_ports) {
g_debug ("node %p: output port added %d", node, priv->output_port_ids[i]);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_ADDED], 0, PINOS_DIRECTION_OUTPUT);
i++;
} else if (j < n_output_ports) {
g_debug ("node %p: output port removed %d", node, out_ports[j]);
if (!priv->async_init)
g_signal_emit (node, signals[SIGNAL_PORT_REMOVED], 0, PINOS_DIRECTION_OUTPUT);
j++;
} else
break;
}
}
static SpaResult
@ -219,17 +276,23 @@ suspend_node (PinosNode *this)
{
PinosNodePrivate *priv = this->priv;
SpaResult res = SPA_RESULT_OK;
guint i;
GList *walk;
g_debug ("node %p: suspend node", this);
for (i = 0; i < priv->n_input_ports; i++) {
if ((res = spa_node_port_set_format (this->node, priv->input_port_ids[i], 0, NULL)) < 0)
for (walk = priv->input_ports; walk; walk = g_list_next (walk)) {
NodePort *p = walk->data;
if ((res = spa_node_port_set_format (this->node, p->port.port, 0, NULL)) < 0)
g_warning ("error unset format output: %d", res);
p->port.allocated = FALSE;
p->port.n_buffers = 0;
}
for (i = 0; i < priv->n_output_ports; i++) {
if ((res = spa_node_port_set_format (this->node, priv->output_port_ids[i], 0, NULL)) < 0)
for (walk = priv->output_ports; walk; walk = g_list_next (walk)) {
NodePort *p = walk->data;
if ((res = spa_node_port_set_format (this->node, p->port.port, 0, NULL)) < 0)
g_warning ("error unset format output: %d", res);
p->port.allocated = FALSE;
p->port.n_buffers = 0;
}
return res;
}
@ -327,11 +390,11 @@ do_read_link (PinosNode *this, PinosLink *link)
link->in_ready--;
iinfo[0].port_id = link->input_port;
iinfo[0].port_id = link->input->port;
iinfo[0].buffer_id = link->queue[areas[0].offset];
iinfo[0].flags = SPA_PORT_INPUT_FLAG_NONE;
if ((res = spa_node_port_push_input (link->input_node->node, 1, iinfo)) < 0)
if ((res = spa_node_port_push_input (link->input->node->node, 1, iinfo)) < 0)
g_warning ("node %p: error pushing buffer: %d, %d", this, res, iinfo[0].status);
else
pushed = TRUE;
@ -405,18 +468,18 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
}
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
{
guint i;
SpaNodeEventNeedInput *ni = event->data;
NodePort *p;
guint i;
for (i = 0; i < priv->input_links->len; i++) {
NodeLink *link = &g_array_index (priv->input_links, NodeLink, i);
PinosLink *pl = link->link;
if (!(p = find_node_port (priv->input_ports, this, ni->port_id)))
break;
if (pl == NULL || pl->input_node->node != node || pl->input_port != ni->port_id)
continue;
for (i = 0; i < p->links->len; i++) {
PinosLink *link = g_ptr_array_index (p->links, i);
pl->in_ready++;
do_read_link (this, pl);
link->in_ready++;
do_read_link (this, link);
}
break;
}
@ -425,8 +488,9 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
SpaNodeEventHaveOutput *ho = event->data;
SpaPortOutputInfo oinfo[1] = { 0, };
SpaResult res;
guint i;
gboolean pushed = FALSE;
NodePort *p;
guint i;
oinfo[0].port_id = ho->port_id;
@ -435,20 +499,19 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
break;
}
for (i = 0; i < priv->output_links->len; i++) {
NodeLink *link = &g_array_index (priv->output_links, NodeLink, i);
PinosLink *pl = link->link;
if (!(p = find_node_port (priv->output_ports, this, oinfo[0].port_id)))
break;
for (i = 0; i < p->links->len; i++) {
PinosLink *link = g_ptr_array_index (p->links, i);
SpaRingbufferArea areas[2];
if (pl == NULL || pl->output_node->node != node || pl->output_port != oinfo[0].port_id)
continue;
spa_ringbuffer_get_write_areas (&pl->ringbuffer, areas);
spa_ringbuffer_get_write_areas (&link->ringbuffer, areas);
if (areas[0].len > 0) {
pl->queue[areas[0].offset] = oinfo[0].buffer_id;
spa_ringbuffer_write_advance (&pl->ringbuffer, 1);
link->queue[areas[0].offset] = oinfo[0].buffer_id;
spa_ringbuffer_write_advance (&link->ringbuffer, 1);
pushed = do_read_link (this, pl);
pushed = do_read_link (this, link);
}
}
if (!pushed) {
@ -462,17 +525,17 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
{
SpaResult res;
SpaNodeEventReuseBuffer *rb = event->data;
NodePort *p;
guint i;
for (i = 0; i < priv->input_links->len; i++) {
NodeLink *link = &g_array_index (priv->input_links, NodeLink, i);
PinosLink *pl = link->link;
if (!(p = find_node_port (priv->input_ports, this, rb->port_id)))
break;
if (pl == NULL || pl->input_node->node != node || pl->input_port != rb->port_id)
continue;
for (i = 0; i < p->links->len; i++) {
PinosLink *link = g_ptr_array_index (p->links, i);
if ((res = spa_node_port_reuse_buffer (pl->output_node->node,
pl->output_port,
if ((res = spa_node_port_reuse_buffer (link->output->node->node,
link->output->port,
rb->buffer_id)) < 0)
g_warning ("node %p: error reuse buffer: %d", node, res);
}
@ -702,16 +765,13 @@ static void
pinos_node_dispose (GObject * obj)
{
PinosNode *node = PINOS_NODE (obj);
PinosNodePrivate *priv = node->priv;
//PinosNodePrivate *priv = node->priv;
g_debug ("node %p: dispose", node);
pinos_node_set_state (node, PINOS_NODE_STATE_SUSPENDED);
node_unregister_object (node);
g_array_free (priv->input_links, TRUE);
g_array_free (priv->output_links, TRUE);
G_OBJECT_CLASS (pinos_node_parent_class)->dispose (obj);
}
@ -729,8 +789,6 @@ pinos_node_finalize (GObject * obj)
g_clear_error (&priv->error);
if (priv->properties)
pinos_properties_free (priv->properties);
g_free (priv->input_port_ids);
g_free (priv->output_port_ids);
G_OBJECT_CLASS (pinos_node_parent_class)->finalize (obj);
}
@ -885,9 +943,6 @@ pinos_node_init (PinosNode * node)
priv->state = PINOS_NODE_STATE_CREATING;
priv->pending_state_seq = SPA_ID_INVALID;
pinos_node1_set_state (priv->iface, priv->state);
priv->input_links = g_array_new (FALSE, TRUE, sizeof (NodeLink));
priv->output_links = g_array_new (FALSE, TRUE, sizeof (NodeLink));
}
/**
@ -1048,41 +1103,6 @@ pinos_node_remove (PinosNode *node)
g_signal_emit (node, signals[SIGNAL_REMOVE], 0, NULL);
}
static uint32_t
get_free_node_port (PinosNode *node,
PinosDirection direction)
{
PinosNodePrivate *priv = node->priv;
guint i, free_port, n_ports, max_ports;
uint32_t *ports;
if (direction == PINOS_DIRECTION_INPUT) {
max_ports = priv->max_input_ports;
n_ports = priv->n_input_ports;
ports = priv->input_port_ids;
free_port = 0;
} else {
max_ports = priv->max_output_ports;
n_ports = priv->n_output_ports;
ports = priv->output_port_ids;
free_port = priv->max_input_ports;
}
if (max_ports == n_ports)
return SPA_ID_INVALID;
g_debug ("node %p: direction %d max %u, n %u", node, direction, max_ports, n_ports);
for (i = 0; i < n_ports; i++) {
if (free_port < ports[i])
break;
free_port = ports[i] + 1;
}
if (free_port >= max_ports)
return SPA_ID_INVALID;
return free_port;
}
/**
* pinos_node_get_free_port:
* @node: a #PinosNode
@ -1097,65 +1117,79 @@ pinos_node_get_free_port (PinosNode *node,
PinosDirection direction)
{
PinosNodePrivate *priv;
guint i, n_ports;
NodeLink *links;
guint free_port, n_ports, max_ports;
GList *ports, *walk;
g_return_val_if_fail (PINOS_IS_NODE (node), SPA_ID_INVALID);
priv = node->priv;
if (direction == PINOS_DIRECTION_INPUT) {
n_ports = priv->input_links->len;
links = (NodeLink *)priv->input_links->data;
max_ports = priv->max_input_ports;
n_ports = priv->n_input_ports;
ports = priv->input_ports;
free_port = 0;
} else {
n_ports = priv->output_links->len;
links = (NodeLink *)priv->output_links->data;
max_ports = priv->max_output_ports;
n_ports = priv->n_output_ports;
ports = priv->output_ports;
free_port = priv->max_input_ports;
}
for (i = 0; i < n_ports; i++) {
if (!links[i].link)
return i;
g_debug ("node %p: direction %d max %u, n %u", node, direction, max_ports, n_ports);
for (walk = ports; walk; walk = g_list_next (walk)) {
PinosPort *p = walk->data;
if (free_port < p->port)
break;
free_port = p->port + 1;
}
return n_ports;
if (free_port >= max_ports && ports) {
PinosPort *p = ports->data;
free_port = p->port;
} else
return SPA_ID_INVALID;
return free_port;
}
static void
do_remove_link (PinosLink *link, PinosNode *node)
{
guint i, n_links;
GArray *links;
NodePort *p;
PinosNode *n;
if (link->output_node) {
links = link->output_node->priv->output_links;
n_links = links->len;
for (i = 0; i < n_links; i++) {
NodeLink *l = &g_array_index (links, NodeLink, i);
if (l->link == link) {
l->link = NULL;
if (--link->output_node->priv->n_used_output_links == 0)
pinos_node_report_idle (link->output_node);
}
}
if (link->output) {
n = link->output->node;
if ((p = find_node_port (n->priv->output_ports, n, link->output->port)))
if (g_ptr_array_remove_fast (p->links, link))
n->priv->n_used_output_links--;
if (n->priv->n_used_output_links == 0 &&
n->priv->n_used_input_links == 0)
pinos_node_report_idle (n);
}
if (link->input_node) {
links = link->input_node->priv->input_links;
n_links = links->len;
for (i = 0; i < n_links; i++) {
NodeLink *l = &g_array_index (links, NodeLink, i);
if (l->link == link) {
l->link = NULL;
if (--link->input_node->priv->n_used_input_links == 0)
pinos_node_report_idle (link->input_node);
}
}
if (link->input->node) {
n = link->input->node;
if ((p = find_node_port (n->priv->input_ports, n, link->input->port)))
if (g_ptr_array_remove_fast (p->links, link))
n->priv->n_used_input_links--;
if (n->priv->n_used_output_links == 0 &&
n->priv->n_used_input_links == 0)
pinos_node_report_idle (n);
}
}
/**
* pinos_node_link:
* @output_node: a #PinosNode
* @output_id: an output link id
* @output_port: an output port
* @input_node: a #PinosNode
* @input_id: an input link id
* @input_port: an input port
* @format_filter: a format filter
* @properties: extra properties
* @error: an error or %NULL
@ -1171,85 +1205,74 @@ do_remove_link (PinosLink *link, PinosNode *node)
*/
PinosLink *
pinos_node_link (PinosNode *output_node,
guint output_id,
guint output_port,
PinosNode *input_node,
guint input_id,
guint input_port,
GPtrArray *format_filter,
PinosProperties *properties,
GError **error)
{
PinosNodePrivate *priv;
NodeLink *olink, *ilink;
PinosLink *pl;
NodePort *onp, *inp;
PinosLink *link = NULL;
guint i;
g_return_val_if_fail (PINOS_IS_NODE (output_node), NULL);
g_return_val_if_fail (PINOS_IS_NODE (input_node), NULL);
priv = output_node->priv;
g_debug ("node %p: link %u %p:%u", output_node, output_id, input_node, input_id);
g_debug ("node %p: link %u %p:%u", output_node, output_port, input_node, input_port);
if (output_node == input_node)
goto same_node;
if (output_id >= priv->output_links->len)
g_array_set_size (priv->output_links, output_id + 1);
if (input_id >= input_node->priv->input_links->len)
g_array_set_size (input_node->priv->input_links, input_id + 1);
olink = &g_array_index (priv->output_links, NodeLink, output_id);
ilink = &g_array_index (input_node->priv->input_links, NodeLink, input_id);
pl = olink->link;
if (pl) {
/* FIXME */
pl->input_node = input_node;
pl->input_id = input_id;
g_object_ref (pl);
} else {
uint32_t input_port, output_port;
output_port = get_free_node_port (output_node, PINOS_DIRECTION_OUTPUT);
if (output_port == SPA_ID_INVALID && output_node->priv->n_output_ports > 0)
output_port = output_node->priv->output_port_ids[0];
else
onp = find_node_port (priv->output_ports, output_node, output_port);
if (onp == NULL)
goto no_output_ports;
input_port = get_free_node_port (input_node, PINOS_DIRECTION_INPUT);
g_debug ("node %p: port %u, %u", input_node, input_port, input_node->priv->n_input_ports);
if (input_port == SPA_ID_INVALID && input_node->priv->n_input_ports > 0)
input_port = input_node->priv->input_port_ids[0];
else
for (i = 0; i < onp->links->len; i++) {
PinosLink *pl = g_ptr_array_index (onp->links, i);
if (pl->input->node == input_node && pl->input->port == input_port) {
link = pl;
break;
}
}
inp = find_node_port (input_node->priv->input_ports, input_node, input_port);
if (inp == NULL)
goto no_input_ports;
if (link) {
/* FIXME */
link->input->node = input_node;
link->input->port = input_port;
g_object_ref (link);
} else {
input_node->live = output_node->live;
if (output_node->clock)
input_node->clock = output_node->clock;
g_debug ("node %p: clock %p", output_node, output_node->clock);
pl = g_object_new (PINOS_TYPE_LINK,
link = g_object_new (PINOS_TYPE_LINK,
"daemon", priv->daemon,
"output-node", output_node,
"output-id", output_id,
"output-port", output_port,
"input-node", input_node,
"input-id", input_id,
"input-port", input_port,
"output-port", &onp->port,
"input-port", &inp->port,
"format-filter", format_filter,
"properties", properties,
NULL);
g_signal_connect (pl,
g_ptr_array_add (onp->links, link);
g_ptr_array_add (inp->links, link);
g_signal_connect (link,
"remove",
(GCallback) do_remove_link,
output_node);
output_node->priv->n_used_output_links++;
input_node->priv->n_used_input_links++;
olink->link = pl;
ilink->link = pl;
}
return pl;
return link;
same_node:
{