node: keep separate array of links

Keep a separate array with the links that we only access and update from
the data-thread.
This commit is contained in:
Wim Taymans 2016-10-24 18:01:04 +02:00
parent d3dd90bb05
commit 73e6272488
4 changed files with 121 additions and 28 deletions

View file

@ -156,7 +156,6 @@ loop (void *user_data)
while (spa_ringbuffer_get_read_offset (&priv->buffer, &offset) > 0) {
InvokeItem *item = SPA_MEMBER (priv->buffer_data, offset, InvokeItem);
g_debug ("data-loop %p: invoke %d", this, item->seq);
item->func (p, true, item->seq, item->size, item->data, item->user_data);
spa_ringbuffer_read_advance (&priv->buffer, item->item_size);
}
@ -347,7 +346,10 @@ do_invoke (SpaPoll *poll,
wakeup_thread (this);
res = SPA_RESULT_RETURN_ASYNC (seq);
if (seq != SPA_ID_INVALID)
res = SPA_RESULT_RETURN_ASYNC (seq);
else
res = SPA_RESULT_OK;
}
return res;
}

View file

@ -92,6 +92,10 @@ struct _PinosNodePrivate
PinosDataLoop *data_loop;
PinosMainLoop *main_loop;
struct {
GPtrArray *links;
} rt;
};
G_DEFINE_TYPE (PinosNode, pinos_node, G_TYPE_OBJECT);
@ -380,14 +384,13 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
{
SpaNodeEventNeedInput *ni = (SpaNodeEventNeedInput *) event;
PinosPort *p;
guint i;
if (!(p = find_node_port (priv->input_ports, this, ni->port_id)))
break;
for (i = 0; i < priv->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (priv->rt.links, i);
for (i = 0; i < p->links->len; i++) {
PinosLink *link = g_ptr_array_index (p->links, i);
if (link->input == NULL || link->input->port != ni->port_id)
continue;
link->in_ready++;
do_read_link (this, link);
@ -400,7 +403,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
SpaPortOutputInfo oinfo[1] = { 0, };
SpaResult res;
gboolean pushed = FALSE;
PinosPort *p;
guint i;
oinfo[0].port_id = ho->port_id;
@ -410,13 +412,13 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
break;
}
if (!(p = find_node_port (priv->output_ports, this, oinfo[0].port_id)))
break;
for (i = 0; i < p->links->len; i++) {
PinosLink *link = g_ptr_array_index (p->links, i);
for (i = 0; i < priv->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (priv->rt.links, i);
SpaRingbufferArea areas[2];
if (link->output == NULL || link->output->port != ho->port_id)
continue;
spa_ringbuffer_get_write_areas (&link->ringbuffer, areas);
if (areas[0].len > 0) {
link->queue[areas[0].offset] = oinfo[0].buffer_id;
@ -436,16 +438,12 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
{
SpaResult res;
SpaNodeEventReuseBuffer *rb = (SpaNodeEventReuseBuffer *) event;
PinosPort *p;
guint i;
if (!(p = find_node_port (priv->input_ports, this, rb->port_id)))
break;
for (i = 0; i < priv->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (priv->rt.links, i);
for (i = 0; i < p->links->len; i++) {
PinosLink *link = g_ptr_array_index (p->links, i);
if (link->output == NULL)
if (link->output == NULL || link->output->port != rb->port_id)
continue;
if ((res = spa_node_port_reuse_buffer (link->output->node->node,
@ -876,6 +874,7 @@ pinos_node_init (PinosNode * node)
node);
priv->state = PINOS_NODE_STATE_CREATING;
pinos_node1_set_state (priv->iface, priv->state);
priv->rt.links = g_ptr_array_new_full (256, NULL);
}
/**
@ -1059,8 +1058,25 @@ pinos_node_get_free_port (PinosNode *node,
}
static SpaResult
do_remove_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosNode *this = user_data;
PinosNodePrivate *priv = this->priv;
PinosLink *link = ((PinosLink**)data)[0];
g_ptr_array_remove_fast (priv->rt.links, link);
return SPA_RESULT_OK;
}
static void
do_remove_link (PinosLink *link, PinosNode *node)
on_remove_link (PinosLink *link, PinosNode *node)
{
PinosPort *p;
PinosNode *n;
@ -1071,6 +1087,13 @@ do_remove_link (PinosLink *link, PinosNode *node)
if (g_ptr_array_remove_fast (p->links, link))
n->priv->n_used_output_links--;
spa_poll_invoke (&n->priv->data_loop->poll,
do_remove_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
n);
if (n->priv->n_used_output_links == 0 &&
n->priv->n_used_input_links == 0)
pinos_node_report_idle (n);
@ -1081,12 +1104,36 @@ do_remove_link (PinosLink *link, PinosNode *node)
if (g_ptr_array_remove_fast (p->links, link))
n->priv->n_used_input_links--;
spa_poll_invoke (&n->priv->data_loop->poll,
do_remove_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
n);
if (n->priv->n_used_output_links == 0 &&
n->priv->n_used_input_links == 0)
pinos_node_report_idle (n);
}
}
static SpaResult
do_add_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosNode *this = user_data;
PinosNodePrivate *priv = this->priv;
PinosLink *link = ((PinosLink**)data)[0];
g_ptr_array_add (priv->rt.links, link);
return SPA_RESULT_OK;
}
/**
* pinos_node_link:
* @output_node: a #PinosNode
@ -1157,16 +1204,29 @@ pinos_node_link (PinosNode *output_node,
"properties", properties,
NULL);
g_signal_connect (link,
"remove",
(GCallback) on_remove_link,
output_node);
g_ptr_array_add (output_port->links, link);
g_ptr_array_add (input_port->links, link);
g_signal_connect (link,
"remove",
(GCallback) do_remove_link,
output_node);
output_node->priv->n_used_output_links++;
input_node->priv->n_used_input_links++;
spa_poll_invoke (&priv->data_loop->poll,
do_add_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
output_node);
spa_poll_invoke (&input_node->priv->data_loop->poll,
do_add_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
input_node);
}
return link;

View file

@ -87,6 +87,21 @@ typedef struct {
} SpaPollItem;
/**
* SpaPollInvokeFunc:
* @poll: a #SpaPoll
* @async: If this function was called async
* @seq: sequence number
* @size: size of data
* @data: data
* @user_data: extra user data
*
* Function called from SpaPoll::invoke. If @async is %true, invoke returned
* an async return value and this function should possibly schedule an async
* reply.
*
* Returns: the result of the invoke function
*/
typedef SpaResult (*SpaPollInvokeFunc) (SpaPoll *poll,
bool async,
uint32_t seq,
@ -132,6 +147,22 @@ struct _SpaPoll {
SpaResult (*remove_item) (SpaPoll *poll,
SpaPollItem *item);
/**
* SpaPoll::invoke:
* @poll: a #SpaPoll
* @func: function to call
* @seq: sequence number
* @size: size of data
* @data: data
* @user_data: extra user data
*
* Invoke @func from the poll context of @poll. @seq, @data, @size and
* @user_data are passed to @func.
*
* Returns: The result of @func when this function is already called in
* the @poll context, otherwise an async return value with @seq or, when
* @seq is %SPA_ID_INVALID, %SPA_RETURN_OK.
*/
SpaResult (*invoke) (SpaPoll *poll,
SpaPollInvokeFunc func,
uint32_t seq,

View file

@ -260,7 +260,7 @@ do_start (SpaPoll *poll,
ac.res = res;
spa_poll_invoke (this->state[0].main_loop,
do_send_event,
seq,
SPA_ID_INVALID,
sizeof (ac),
&ac,
this);
@ -289,7 +289,7 @@ do_pause (SpaPoll *poll,
ac.res = res;
spa_poll_invoke (this->state[0].main_loop,
do_send_event,
seq,
SPA_ID_INVALID,
sizeof (ac),
&ac,
this);