diff --git a/pinos/server/data-loop.c b/pinos/server/data-loop.c index 8a1db6e26..dabf627fa 100644 --- a/pinos/server/data-loop.c +++ b/pinos/server/data-loop.c @@ -156,7 +156,6 @@ loop (void *user_data) while (spa_ringbuffer_get_read_offset (&priv->buffer, &offset) > 0) { InvokeItem *item = SPA_MEMBER (priv->buffer_data, offset, InvokeItem); - g_debug ("data-loop %p: invoke %d", this, item->seq); item->func (p, true, item->seq, item->size, item->data, item->user_data); spa_ringbuffer_read_advance (&priv->buffer, item->item_size); } @@ -347,7 +346,10 @@ do_invoke (SpaPoll *poll, wakeup_thread (this); - res = SPA_RESULT_RETURN_ASYNC (seq); + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC (seq); + else + res = SPA_RESULT_OK; } return res; } diff --git a/pinos/server/node.c b/pinos/server/node.c index a51e0f382..34313ade7 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -92,6 +92,10 @@ struct _PinosNodePrivate PinosDataLoop *data_loop; PinosMainLoop *main_loop; + + struct { + GPtrArray *links; + } rt; }; G_DEFINE_TYPE (PinosNode, pinos_node, G_TYPE_OBJECT); @@ -380,14 +384,13 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) case SPA_NODE_EVENT_TYPE_NEED_INPUT: { SpaNodeEventNeedInput *ni = (SpaNodeEventNeedInput *) event; - PinosPort *p; guint i; - if (!(p = find_node_port (priv->input_ports, this, ni->port_id))) - break; + for (i = 0; i < priv->rt.links->len; i++) { + PinosLink *link = g_ptr_array_index (priv->rt.links, i); - for (i = 0; i < p->links->len; i++) { - PinosLink *link = g_ptr_array_index (p->links, i); + if (link->input == NULL || link->input->port != ni->port_id) + continue; link->in_ready++; do_read_link (this, link); @@ -400,7 +403,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) SpaPortOutputInfo oinfo[1] = { 0, }; SpaResult res; gboolean pushed = FALSE; - PinosPort *p; guint i; oinfo[0].port_id = ho->port_id; @@ -410,13 +412,13 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) break; } - if (!(p = find_node_port (priv->output_ports, this, oinfo[0].port_id))) - break; - - for (i = 0; i < p->links->len; i++) { - PinosLink *link = g_ptr_array_index (p->links, i); + for (i = 0; i < priv->rt.links->len; i++) { + PinosLink *link = g_ptr_array_index (priv->rt.links, i); SpaRingbufferArea areas[2]; + if (link->output == NULL || link->output->port != ho->port_id) + continue; + spa_ringbuffer_get_write_areas (&link->ringbuffer, areas); if (areas[0].len > 0) { link->queue[areas[0].offset] = oinfo[0].buffer_id; @@ -436,16 +438,12 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) { SpaResult res; SpaNodeEventReuseBuffer *rb = (SpaNodeEventReuseBuffer *) event; - PinosPort *p; guint i; - if (!(p = find_node_port (priv->input_ports, this, rb->port_id))) - break; + for (i = 0; i < priv->rt.links->len; i++) { + PinosLink *link = g_ptr_array_index (priv->rt.links, i); - for (i = 0; i < p->links->len; i++) { - PinosLink *link = g_ptr_array_index (p->links, i); - - if (link->output == NULL) + if (link->output == NULL || link->output->port != rb->port_id) continue; if ((res = spa_node_port_reuse_buffer (link->output->node->node, @@ -876,6 +874,7 @@ pinos_node_init (PinosNode * node) node); priv->state = PINOS_NODE_STATE_CREATING; pinos_node1_set_state (priv->iface, priv->state); + priv->rt.links = g_ptr_array_new_full (256, NULL); } /** @@ -1059,8 +1058,25 @@ pinos_node_get_free_port (PinosNode *node, } +static SpaResult +do_remove_link (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + PinosNode *this = user_data; + PinosNodePrivate *priv = this->priv; + PinosLink *link = ((PinosLink**)data)[0]; + + g_ptr_array_remove_fast (priv->rt.links, link); + + return SPA_RESULT_OK; +} + static void -do_remove_link (PinosLink *link, PinosNode *node) +on_remove_link (PinosLink *link, PinosNode *node) { PinosPort *p; PinosNode *n; @@ -1071,6 +1087,13 @@ do_remove_link (PinosLink *link, PinosNode *node) if (g_ptr_array_remove_fast (p->links, link)) n->priv->n_used_output_links--; + spa_poll_invoke (&n->priv->data_loop->poll, + do_remove_link, + SPA_ID_INVALID, + sizeof (PinosLink *), + &link, + n); + if (n->priv->n_used_output_links == 0 && n->priv->n_used_input_links == 0) pinos_node_report_idle (n); @@ -1081,12 +1104,36 @@ do_remove_link (PinosLink *link, PinosNode *node) if (g_ptr_array_remove_fast (p->links, link)) n->priv->n_used_input_links--; + spa_poll_invoke (&n->priv->data_loop->poll, + do_remove_link, + SPA_ID_INVALID, + sizeof (PinosLink *), + &link, + n); + if (n->priv->n_used_output_links == 0 && n->priv->n_used_input_links == 0) pinos_node_report_idle (n); } } +static SpaResult +do_add_link (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + PinosNode *this = user_data; + PinosNodePrivate *priv = this->priv; + PinosLink *link = ((PinosLink**)data)[0]; + + g_ptr_array_add (priv->rt.links, link); + + return SPA_RESULT_OK; +} + /** * pinos_node_link: * @output_node: a #PinosNode @@ -1157,16 +1204,29 @@ pinos_node_link (PinosNode *output_node, "properties", properties, NULL); + g_signal_connect (link, + "remove", + (GCallback) on_remove_link, + output_node); + g_ptr_array_add (output_port->links, link); g_ptr_array_add (input_port->links, link); - g_signal_connect (link, - "remove", - (GCallback) do_remove_link, - output_node); - output_node->priv->n_used_output_links++; input_node->priv->n_used_input_links++; + + spa_poll_invoke (&priv->data_loop->poll, + do_add_link, + SPA_ID_INVALID, + sizeof (PinosLink *), + &link, + output_node); + spa_poll_invoke (&input_node->priv->data_loop->poll, + do_add_link, + SPA_ID_INVALID, + sizeof (PinosLink *), + &link, + input_node); } return link; diff --git a/spa/include/spa/poll.h b/spa/include/spa/poll.h index b18316066..00153d8e5 100644 --- a/spa/include/spa/poll.h +++ b/spa/include/spa/poll.h @@ -87,6 +87,21 @@ typedef struct { } SpaPollItem; +/** + * SpaPollInvokeFunc: + * @poll: a #SpaPoll + * @async: If this function was called async + * @seq: sequence number + * @size: size of data + * @data: data + * @user_data: extra user data + * + * Function called from SpaPoll::invoke. If @async is %true, invoke returned + * an async return value and this function should possibly schedule an async + * reply. + * + * Returns: the result of the invoke function + */ typedef SpaResult (*SpaPollInvokeFunc) (SpaPoll *poll, bool async, uint32_t seq, @@ -132,6 +147,22 @@ struct _SpaPoll { SpaResult (*remove_item) (SpaPoll *poll, SpaPollItem *item); + /** + * SpaPoll::invoke: + * @poll: a #SpaPoll + * @func: function to call + * @seq: sequence number + * @size: size of data + * @data: data + * @user_data: extra user data + * + * Invoke @func from the poll context of @poll. @seq, @data, @size and + * @user_data are passed to @func. + * + * Returns: The result of @func when this function is already called in + * the @poll context, otherwise an async return value with @seq or, when + * @seq is %SPA_ID_INVALID, %SPA_RETURN_OK. + */ SpaResult (*invoke) (SpaPoll *poll, SpaPollInvokeFunc func, uint32_t seq, diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index ccd1b1033..ec503cbfa 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -260,7 +260,7 @@ do_start (SpaPoll *poll, ac.res = res; spa_poll_invoke (this->state[0].main_loop, do_send_event, - seq, + SPA_ID_INVALID, sizeof (ac), &ac, this); @@ -289,7 +289,7 @@ do_pause (SpaPoll *poll, ac.res = res; spa_poll_invoke (this->state[0].main_loop, do_send_event, - seq, + SPA_ID_INVALID, sizeof (ac), &ac, this);