More hacking

Add connection message for PORT_COMMAND
Add rtkit support to ask for realtime priority
work on stream states and improve negotiation
Rework of port linking works, keep separate state for realtime threads
and use message passing to update the state.
Don't try to link nodes that are removed.
Open the device in the ALSA monitor to detect source or sink
Implement send_command as async methods on the plugins, use async
replies to sync start and stop.
Work on alsa sink.
Implement async PAUSE/START on v4l2 src. move the STREAMON/OFF calls to
the mainloop because they have high latency, add the poll descriptors
from the data loop.
This commit is contained in:
Wim Taymans 2016-10-28 16:56:33 +02:00
parent 984375c0b2
commit 3f4ccaaea2
32 changed files with 1335 additions and 545 deletions

View file

@ -53,18 +53,6 @@ free_node_port (PinosPort *np)
g_slice_free (PinosPort, np);
}
static PinosPort *
find_node_port (GList *ports, PinosNode *node, uint32_t port)
{
GList *walk;
for (walk = ports; walk; walk = g_list_next (walk)) {
PinosPort *np = walk->data;
if (np->node == node && np->port == port)
return np;
}
return NULL;
}
struct _PinosNodePrivate
{
PinosDaemon *daemon;
@ -74,6 +62,8 @@ struct _PinosNodePrivate
gchar *object_path;
gchar *name;
uint32_t seq;
gboolean async_init;
unsigned int max_input_ports;
unsigned int max_output_ports;
@ -340,14 +330,12 @@ do_read_link (SpaPoll *poll,
size_t offset;
SpaResult res;
if (spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) {
if (link->input == NULL)
return SPA_RESULT_OK;
while (link->in_ready > 0 && spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) {
SpaPortInputInfo iinfo[1];
if (link->in_ready <= 0 || link->input == NULL)
return SPA_RESULT_OK;
link->in_ready--;
iinfo[0].port_id = link->input->port;
iinfo[0].buffer_id = link->queue[offset];
iinfo[0].flags = SPA_PORT_INPUT_FLAG_NONE;
@ -356,6 +344,7 @@ do_read_link (SpaPoll *poll,
g_warning ("node %p: error pushing buffer: %d, %d", this, res, iinfo[0].status);
spa_ringbuffer_read_advance (&link->ringbuffer, 1);
link->in_ready--;
}
return SPA_RESULT_OK;
}
@ -379,8 +368,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
SpaNodeEventAsyncComplete *ac = (SpaNodeEventAsyncComplete *) event;
g_debug ("node %p: async complete event %d %d", this, ac->seq, ac->res);
pinos_main_loop_defer_complete (priv->main_loop, this, ac->seq, ac->res);
g_signal_emit (this, signals[SIGNAL_ASYNC_COMPLETE], 0, ac->seq, ac->res);
if (!pinos_main_loop_defer_complete (priv->main_loop, this, ac->seq, ac->res))
g_signal_emit (this, signals[SIGNAL_ASYNC_COMPLETE], 0, ac->seq, ac->res);
break;
}
@ -410,8 +399,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
SpaNodeEventHaveOutput *ho = (SpaNodeEventHaveOutput *) event;
SpaPortOutputInfo oinfo[1] = { 0, };
SpaResult res;
gboolean pushed = FALSE;
guint i;
gboolean pushed = FALSE;
oinfo[0].port_id = ho->port_id;
@ -422,9 +411,12 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
for (i = 0; i < priv->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (priv->rt.links, i);
PinosPort *output = link->output;
PinosPort *input = link->input;
size_t offset;
if (link->output == NULL || link->output->port != ho->port_id)
if (output == NULL || input == NULL ||
output->node->node != node || output->port != ho->port_id)
continue;
if (spa_ringbuffer_get_write_offset (&link->ringbuffer, &offset) > 0) {
@ -441,7 +433,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
}
}
if (!pushed) {
g_debug ("node %p: discarded buffer %u", this, oinfo[0].buffer_id);
if ((res = spa_node_port_reuse_buffer (node, oinfo[0].port_id, oinfo[0].buffer_id)) < 0)
g_warning ("node %p: error reuse buffer: %d", node, res);
}
@ -456,7 +447,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
for (i = 0; i < priv->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (priv->rt.links, i);
if (link->input == NULL || link->input->port != rb->port_id)
if (link->input == NULL || link->input->port != rb->port_id || link->output == NULL)
continue;
if ((res = spa_node_port_reuse_buffer (link->output->node->node,
@ -1015,7 +1006,11 @@ pinos_node_remove (PinosNode *node)
{
g_return_if_fail (PINOS_IS_NODE (node));
if (node->flags & PINOS_NODE_FLAG_REMOVING)
return;
g_debug ("node %p: remove", node);
node->flags |= PINOS_NODE_FLAG_REMOVING;
g_signal_emit (node, signals[SIGNAL_REMOVE], 0, NULL);
}
@ -1071,64 +1066,6 @@ pinos_node_get_free_port (PinosNode *node,
}
static SpaResult
do_remove_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosNode *this = user_data;
PinosNodePrivate *priv = this->priv;
PinosLink *link = ((PinosLink**)data)[0];
g_ptr_array_remove_fast (priv->rt.links, link);
return SPA_RESULT_OK;
}
static void
on_remove_link (PinosLink *link, PinosNode *node)
{
PinosPort *p;
PinosNode *n;
if (link->output) {
n = link->output->node;
if ((p = find_node_port (n->priv->output_ports, n, link->output->port)))
if (g_ptr_array_remove_fast (p->links, link))
n->priv->n_used_output_links--;
spa_poll_invoke (&n->priv->data_loop->poll,
do_remove_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
n);
if (n->priv->n_used_output_links == 0 &&
n->priv->n_used_input_links == 0)
pinos_node_report_idle (n);
}
if (link->input->node) {
n = link->input->node;
if ((p = find_node_port (n->priv->input_ports, n, link->input->port)))
if (g_ptr_array_remove_fast (p->links, link))
n->priv->n_used_input_links--;
spa_poll_invoke (&n->priv->data_loop->poll,
do_remove_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
n);
if (n->priv->n_used_output_links == 0 &&
n->priv->n_used_input_links == 0)
pinos_node_report_idle (n);
}
}
static SpaResult
do_add_link (SpaPoll *poll,
@ -1147,67 +1084,79 @@ do_add_link (SpaPoll *poll,
return SPA_RESULT_OK;
}
static PinosLink *
find_link (PinosPort *output_port, PinosPort *input_port)
{
guint i;
for (i = 0; i < output_port->links->len; i++) {
PinosLink *pl = g_ptr_array_index (output_port->links, i);
if (pl->input == input_port) {
return pl;
}
}
return NULL;
}
PinosLink *
pinos_port_get_link (PinosPort *output_port,
PinosPort *input_port)
{
g_return_val_if_fail (output_port != NULL, NULL);
g_return_val_if_fail (input_port != NULL, NULL);
return find_link (output_port, input_port);
}
/**
* pinos_node_link:
* @output_node: a #PinosNode
* pinos_port_link:
* @output_port: an output port
* @input_port: an input port
* @format_filter: a format filter
* @properties: extra properties
* @error: an error or %NULL
*
* Make a link between @output_port and @input_port with the given ids
* Make a link between @output_port and @input_port
*
* If the ports were already linked, the existing links will be returned.
*
* If the output id was linked to a different input node or id, it
* will be relinked.
*
* Returns: a new #PinosLink or %NULL and @error is set.
*/
PinosLink *
pinos_node_link (PinosNode *output_node,
PinosPort *output_port,
pinos_port_link (PinosPort *output_port,
PinosPort *input_port,
GPtrArray *format_filter,
PinosProperties *properties,
GError **error)
{
PinosNodePrivate *priv;
PinosNode *input_node;
PinosLink *link = NULL;
guint i;
PinosNode *input_node, *output_node;
PinosLink *link;
g_return_val_if_fail (PINOS_IS_NODE (output_node), NULL);
g_return_val_if_fail (output_port != NULL, NULL);
g_return_val_if_fail (input_port != NULL, NULL);
output_node = output_port->node;
priv = output_node->priv;
input_node = input_port->node;
g_debug ("node %p: link %u %p:%u", output_node, output_port->port, input_node, input_port->port);
g_debug ("port link %p:%u -> %p:%u", output_node, output_port->port, input_node, input_port->port);
if (output_node == input_node)
goto same_node;
for (i = 0; i < output_port->links->len; i++) {
PinosLink *pl = g_ptr_array_index (output_port->links, i);
if (pl->input->node == input_node && pl->input == input_port) {
link = pl;
break;
}
}
if (input_port->links->len > 0)
goto was_linked;
link = find_link (output_port, input_port);
if (link) {
/* FIXME */
link->input->node = input_node;
link->input = input_port;
g_object_ref (link);
} else {
input_node->live = output_node->live;
if (output_node->clock)
input_node->clock = output_node->clock;
g_debug ("node %p: clock %p", output_node, output_node->clock);
g_debug ("node %p: clock %p, live %d", output_node, output_node->clock, output_node->live);
link = g_object_new (PINOS_TYPE_LINK,
"daemon", priv->daemon,
@ -1217,11 +1166,6 @@ pinos_node_link (PinosNode *output_node,
"properties", properties,
NULL);
g_signal_connect (link,
"remove",
(GCallback) on_remove_link,
output_node);
g_ptr_array_add (output_port->links, link);
g_ptr_array_add (input_port->links, link);
@ -1251,6 +1195,187 @@ same_node:
"can't link a node to itself");
return NULL;
}
was_linked:
{
g_set_error (error,
PINOS_ERROR,
PINOS_ERROR_NODE_LINK,
"input port was already linked");
return NULL;
}
}
static SpaResult
pinos_port_pause (PinosPort *port)
{
SpaNodeCommand cmd;
cmd.type = SPA_NODE_COMMAND_PAUSE;
cmd.size = sizeof (cmd);
return spa_node_port_send_command (port->node->node,
port->direction,
port->port,
&cmd);
}
static SpaResult
do_remove_link_done (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
PinosNodePrivate *priv = this->priv;
PinosLink *link = ((PinosLink**)data)[0];
g_debug ("port %p: finish unlink", port);
if (port->direction == PINOS_DIRECTION_OUTPUT) {
if (g_ptr_array_remove_fast (port->links, link))
priv->n_used_output_links--;
link->output = NULL;
} else {
if (g_ptr_array_remove_fast (port->links, link))
priv->n_used_input_links--;
link->input = NULL;
}
if (priv->n_used_output_links == 0 &&
priv->n_used_input_links == 0) {
pinos_node_report_idle (this);
}
if (!port->allocated) {
g_debug ("port %p: clear buffers on port", port);
spa_node_port_use_buffers (port->node->node,
port->direction,
port->port,
NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
}
pinos_main_loop_defer_complete (priv->main_loop,
port,
seq,
SPA_RESULT_OK);
g_object_unref (link);
g_object_unref (port->node);
return SPA_RESULT_OK;
}
static SpaResult
do_remove_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
PinosNodePrivate *priv = this->priv;
PinosLink *link = ((PinosLink**)data)[0];
SpaResult res;
pinos_port_pause (port);
g_ptr_array_remove_fast (priv->rt.links, link);
res = spa_poll_invoke (&priv->main_loop->poll,
do_remove_link_done,
seq,
sizeof (PinosLink *),
&link,
port);
return res;
}
SpaResult
pinos_port_unlink (PinosPort *port, PinosLink *link)
{
SpaResult res;
g_debug ("port %p: start unlink %p", port, link);
g_object_ref (link);
g_object_ref (port->node);
res = spa_poll_invoke (&port->node->priv->data_loop->poll,
do_remove_link,
port->node->priv->seq++,
sizeof (PinosLink *),
&link,
port);
return res;
}
static SpaResult
do_clear_buffers_done (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
PinosNodePrivate *priv = this->priv;
SpaResult res;
g_debug ("port %p: clear buffers finish", port);
res = spa_node_port_use_buffers (port->node->node,
port->direction,
port->port,
NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
pinos_main_loop_defer_complete (priv->main_loop,
port,
seq,
res);
return res;
}
static SpaResult
do_clear_buffers (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
PinosNodePrivate *priv = this->priv;
SpaResult res;
pinos_port_pause (port);
res = spa_poll_invoke (&priv->main_loop->poll,
do_clear_buffers_done,
seq,
0, NULL,
port);
return res;
}
SpaResult
pinos_port_clear_buffers (PinosPort *port)
{
SpaResult res;
g_debug ("port %p: clear buffers", port);
res = spa_poll_invoke (&port->node->priv->data_loop->poll,
do_clear_buffers,
port->node->priv->seq++,
0, NULL,
port);
return res;
}
/**