Remove port

Remove the port object. We don't use it as a dbus object and we don't
need it, we can use the link and node object directy.
Move poll code and event handler to the node.
This commit is contained in:
Wim Taymans 2016-09-02 19:51:23 +02:00
parent 463954a299
commit 7d3e46e7f9
22 changed files with 689 additions and 1848 deletions

View file

@ -19,6 +19,9 @@
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <gio/gio.h>
#include <gio/gunixfdlist.h>
@ -56,7 +59,14 @@ struct _PinosNodePrivate
PinosProperties *properties;
GHashTable *ports;
SpaPollFd fds[16];
unsigned int n_fds;
SpaPollItem poll;
gboolean running;
pthread_t thread;
GHashTable *links;
};
G_DEFINE_ABSTRACT_TYPE (PinosNode, pinos_node, G_TYPE_OBJECT);
@ -84,24 +94,21 @@ enum
static guint signals[LAST_SIGNAL] = { 0 };
static gboolean
node_set_state (PinosNode *node,
PinosNodeState state)
static PinosDirection
get_port_direction (PinosNode *node, guint id)
{
return FALSE;
}
PinosNodePrivate *priv = node->priv;
PinosDirection direction;
static void
do_remove_port (PinosPort *port, PinosNode *node)
{
pinos_node_remove_port (node, port);
direction = id < priv->max_input_ports ? PINOS_DIRECTION_INPUT : PINOS_DIRECTION_OUTPUT;
return direction;
}
static void
update_port_ids (PinosNode *node, gboolean create)
{
PinosNodePrivate *priv = node->priv;
guint i;
if (node->node == NULL)
return;
@ -120,147 +127,251 @@ update_port_ids (PinosNode *node, gboolean create)
priv->input_port_ids,
priv->max_output_ports,
priv->output_port_ids);
}
if (create) {
for (i = 0; i < priv->n_input_ports; i++)
pinos_node_add_port (node, priv->input_port_ids[i], NULL);
for (i = 0; i < priv->n_output_ports; i++)
pinos_node_add_port (node, priv->output_port_ids[i], NULL);
static void *
loop (void *user_data)
{
PinosNode *this = user_data;
PinosNodePrivate *priv = this->priv;
int r;
g_debug ("node %p: enter thread", this);
while (priv->running) {
SpaPollNotifyData ndata;
r = poll ((struct pollfd *) priv->fds, priv->n_fds, -1);
if (r < 0) {
if (errno == EINTR)
continue;
break;
}
if (r == 0) {
g_debug ("node %p: select timeout", this);
break;
}
if (priv->fds[0].revents & POLLIN) {
uint64_t u;
if (read (priv->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
g_warning ("node %p: failed to read fd", strerror (errno));
g_debug ("node %p: event signaled", this);
break;
}
if (priv->poll.after_cb) {
ndata.fds = priv->poll.fds;
ndata.n_fds = priv->poll.n_fds;
ndata.user_data = priv->poll.user_data;
priv->poll.after_cb (&ndata);
}
}
g_debug ("node %p: leave thread", this);
return NULL;
}
static void
start_thread (PinosNode *this)
{
PinosNodePrivate *priv = this->priv;
int err;
if (!priv->running) {
priv->running = true;
if ((err = pthread_create (&priv->thread, NULL, loop, this)) != 0) {
g_warning ("node %p: can't create thread", strerror (err));
priv->running = false;
}
}
}
static PinosPort *
node_add_port (PinosNode *node,
guint id,
GError **error)
static void
stop_thread (PinosNode *this)
{
PinosNodePrivate *priv = node->priv;
PinosPort *port;
PinosDirection direction;
PinosNodePrivate *priv = this->priv;
update_port_ids (node, FALSE);
if (priv->running) {
uint64_t u = 1;
direction = id < priv->max_input_ports ? PINOS_DIRECTION_INPUT : PINOS_DIRECTION_OUTPUT;
if (write (priv->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
g_warning ("node %p: failed to write fd", strerror (errno));
port = g_object_new (PINOS_TYPE_PORT,
"daemon", priv->daemon,
"node", node,
"direction", direction,
"id", id,
NULL);
if (port) {
g_hash_table_insert (priv->ports, GUINT_TO_POINTER (port->id), port);
g_signal_connect (port, "remove", (GCallback) do_remove_port, node);
g_signal_emit (node, signals[SIGNAL_PORT_ADDED], 0, port);
priv->running = false;
pthread_join (priv->thread, NULL);
}
return port;
}
static void
pause_node (PinosNode *this)
{
SpaResult res;
SpaCommand cmd;
g_debug ("node %p: pause node", this);
cmd.type = SPA_COMMAND_PAUSE;
if ((res = spa_node_send_command (this->node, &cmd)) < 0)
g_debug ("got error %d", res);
}
static void
suspend_node (PinosNode *this)
{
SpaResult res;
g_debug ("node %p: suspend node", this);
if ((res = spa_node_port_set_format (this->node, 0, 0, NULL)) < 0)
g_warning ("error unset format output: %d", res);
}
static gboolean
node_remove_port (PinosNode *node,
PinosPort *port)
node_set_state (PinosNode *this,
PinosNodeState state)
{
PinosNodePrivate *priv = node->priv;
g_debug ("node %p: set state %s", this, pinos_node_state_as_string (state));
g_debug ("node %p: removed port %u", node, port->id);
g_object_ref (port);
if (g_hash_table_remove (priv->ports, GUINT_TO_POINTER (port->id)))
g_signal_emit (node, signals[SIGNAL_PORT_REMOVED], 0, port);
g_object_unref (port);
switch (state) {
case PINOS_NODE_STATE_SUSPENDED:
suspend_node (this);
break;
case PINOS_NODE_STATE_INITIALIZING:
break;
case PINOS_NODE_STATE_IDLE:
pause_node (this);
break;
case PINOS_NODE_STATE_RUNNING:
break;
case PINOS_NODE_STATE_ERROR:
break;
}
pinos_node_update_state (this, state);
return TRUE;
}
static gboolean
handle_add_port (PinosNode1 *interface,
GDBusMethodInvocation *invocation,
PinosDirection arg_direction,
guint arg_id,
gpointer user_data)
static void
on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
{
PinosNode *node = user_data;
PinosNodePrivate *priv = node->priv;
const gchar *sender;
PinosPort *port;
GError *error = NULL;
PinosNode *this = user_data;
PinosNodePrivate *priv = this->priv;
sender = g_dbus_method_invocation_get_sender (invocation);
if (g_strcmp0 (priv->sender, sender) != 0)
goto not_allowed;
switch (event->type) {
case SPA_EVENT_TYPE_PORT_ADDED:
{
SpaEventPortAdded *pa = event->data;
port = pinos_node_add_port (node, arg_id, &error);
if (port == NULL)
goto no_port;
update_port_ids (this, FALSE);
g_debug ("node %p: add port %p", node, port);
g_dbus_method_invocation_return_value (invocation,
g_variant_new ("()"));
g_signal_emit (this, signals[SIGNAL_PORT_ADDED], 0, get_port_direction (this, pa->port_id),
pa->port_id);
break;
}
case SPA_EVENT_TYPE_PORT_REMOVED:
{
SpaEventPortRemoved *pr = event->data;
return TRUE;
update_port_ids (this, FALSE);
/* ERRORS */
not_allowed:
{
g_debug ("sender %s is not owner of node with sender %s", sender, priv->sender);
g_dbus_method_invocation_return_dbus_error (invocation,
"org.pinos.Error", "not node owner");
return TRUE;
}
no_port:
{
g_debug ("node %p: could create port", node);
g_dbus_method_invocation_return_dbus_error (invocation,
"org.pinos.Error", "can't create port");
return TRUE;
g_signal_emit (this, signals[SIGNAL_PORT_REMOVED], 0, pr->port_id);
break;
}
case SPA_EVENT_TYPE_STATE_CHANGE:
{
SpaEventStateChange *sc = event->data;
pinos_node_update_node_state (PINOS_NODE (this), sc->state);
switch (sc->state) {
case SPA_NODE_STATE_CONFIGURE:
{
GList *links, *walk;
links = pinos_node_get_links (this);
for (walk = links; walk; walk = g_list_next (walk)) {
PinosLink *link = walk->data;
pinos_link_activate (link);
}
g_list_free (links);
}
default:
break;
}
break;
}
case SPA_EVENT_TYPE_ADD_POLL:
{
SpaPollItem *poll = event->data;
unsigned int i;
g_debug ("node %p: add poll %d", this, poll->n_fds);
priv->poll = *poll;
priv->poll.fds = &priv->fds[priv->n_fds];
for (i = 0; i < poll->n_fds; i++)
priv->fds[priv->n_fds++] = poll->fds[i];
start_thread (this);
break;
}
case SPA_EVENT_TYPE_REMOVE_POLL:
{
SpaPollItem *poll = event->data;
g_debug ("node %p: remove poll %d", this, poll->n_fds);
priv->n_fds -= poll->n_fds;
stop_thread (this);
break;
}
case SPA_EVENT_TYPE_HAVE_OUTPUT:
{
PinosLink *link;
SpaOutputInfo oinfo[1] = { 0, };
SpaResult res;
if ((res = spa_node_port_pull_output (node, 1, oinfo)) < 0)
g_warning ("node %p: got pull error %d, %d", this, res, oinfo[0].status);
link = g_hash_table_lookup (priv->links, GUINT_TO_POINTER (oinfo[0].port_id));
if (link) {
SpaInputInfo iinfo[1];
iinfo[0].port_id = link->input_port;
iinfo[0].buffer_id = oinfo[0].buffer_id;
iinfo[0].flags = SPA_INPUT_FLAG_NONE;
if ((res = spa_node_port_push_input (link->input_node->node, 1, iinfo)) < 0)
g_warning ("node %p: error pushing buffer: %d, %d", this, res, iinfo[0].status);
}
break;
}
case SPA_EVENT_TYPE_REUSE_BUFFER:
{
PinosLink *link;
SpaResult res;
SpaEventReuseBuffer *rb = event->data;
link = g_hash_table_lookup (priv->links, GUINT_TO_POINTER (rb->port_id));
if (link) {
if ((res = spa_node_port_reuse_buffer (link->output_node->node,
link->output_port,
rb->buffer_id)) < 0)
g_warning ("node %p: error reuse buffer: %d", node, res);
}
break;
}
default:
g_debug ("node %p: got event %d", this, event->type);
break;
}
}
static gboolean
handle_remove_port (PinosNode1 *interface,
GDBusMethodInvocation *invocation,
guint arg_id,
gpointer user_data)
{
PinosNode *node = user_data;
PinosNodePrivate *priv = node->priv;
const gchar *sender;
PinosPort *port;
sender = g_dbus_method_invocation_get_sender (invocation);
if (g_strcmp0 (priv->sender, sender) != 0)
goto not_allowed;
port = pinos_node_find_port_by_id (node, arg_id);
if (port == NULL)
goto no_port;
if (!pinos_node_remove_port (node, port))
goto no_port;
g_debug ("node %p: remove port %u", node, arg_id);
g_dbus_method_invocation_return_value (invocation,
g_variant_new ("()"));
return TRUE;
not_allowed:
{
g_debug ("sender %s is not owner of node with sender %s", sender, priv->sender);
g_dbus_method_invocation_return_dbus_error (invocation,
"org.pinos.Error", "not node owner");
return TRUE;
}
no_port:
{
g_debug ("node %p: could remove port", node);
g_dbus_method_invocation_return_dbus_error (invocation,
"org.pinos.Error", "can't remove port");
return TRUE;
}
}
static gboolean
handle_remove (PinosNode1 *interface,
GDBusMethodInvocation *invocation,
@ -417,22 +528,31 @@ on_property_notify (GObject *obj,
static void
pinos_node_constructed (GObject * obj)
{
PinosNode *node = PINOS_NODE (obj);
PinosNodePrivate *priv = node->priv;
PinosNode *this = PINOS_NODE (obj);
PinosNodePrivate *priv = this->priv;
SpaResult res;
g_debug ("node %p: constructed", node);
g_debug ("node %p: constructed", this);
g_signal_connect (node, "notify", (GCallback) on_property_notify, node);
g_signal_connect (this, "notify", (GCallback) on_property_notify, this);
G_OBJECT_CLASS (pinos_node_parent_class)->constructed (obj);
update_port_ids (node, TRUE);
priv->fds[0].fd = eventfd (0, 0);
priv->fds[0].events = POLLIN | POLLPRI | POLLERR;
priv->fds[0].revents = 0;
priv->n_fds = 1;
if ((res = spa_node_set_event_callback (this->node, on_node_event, this)) < 0)
g_warning ("node %p: error setting callback", this);
update_port_ids (this, TRUE);
if (priv->sender == NULL) {
priv->sender = g_strdup (pinos_daemon_get_sender (priv->daemon));
}
on_property_notify (G_OBJECT (node), NULL, node);
on_property_notify (G_OBJECT (this), NULL, this);
node_register_object (node);
node_register_object (this);
}
static void
@ -441,12 +561,13 @@ pinos_node_dispose (GObject * obj)
PinosNode *node = PINOS_NODE (obj);
PinosNodePrivate *priv = node->priv;
pinos_node_set_state (node, PINOS_NODE_STATE_SUSPENDED);
g_debug ("node %p: dispose", node);
pinos_node_set_state (node, PINOS_NODE_STATE_SUSPENDED);
stop_thread (node);
node_unregister_object (node);
g_hash_table_unref (priv->ports);
g_hash_table_unref (priv->links);
G_OBJECT_CLASS (pinos_node_parent_class)->dispose (obj);
}
@ -582,8 +703,9 @@ pinos_node_class_init (PinosNodeClass * klass)
NULL,
g_cclosure_marshal_generic,
G_TYPE_NONE,
1,
PINOS_TYPE_PORT);
2,
PINOS_TYPE_DIRECTION,
G_TYPE_UINT);
signals[SIGNAL_PORT_REMOVED] = g_signal_new ("port-removed",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
@ -593,11 +715,9 @@ pinos_node_class_init (PinosNodeClass * klass)
g_cclosure_marshal_generic,
G_TYPE_NONE,
1,
PINOS_TYPE_PORT);
G_TYPE_UINT);
node_class->set_state = node_set_state;
node_class->add_port = node_add_port;
node_class->remove_port = node_remove_port;
}
static void
@ -607,22 +727,13 @@ pinos_node_init (PinosNode * node)
g_debug ("node %p: new", node);
priv->iface = pinos_node1_skeleton_new ();
g_signal_connect (priv->iface, "handle-add-port",
(GCallback) handle_add_port,
node);
g_signal_connect (priv->iface, "handle-remove-port",
(GCallback) handle_remove_port,
node);
g_signal_connect (priv->iface, "handle-remove",
(GCallback) handle_remove,
node);
priv->state = PINOS_NODE_STATE_SUSPENDED;
pinos_node1_set_state (priv->iface, PINOS_NODE_STATE_SUSPENDED);
priv->ports = g_hash_table_new_full (g_direct_hash,
g_direct_equal,
NULL,
(GDestroyNotify) g_object_unref);
priv->links = g_hash_table_new (g_direct_hash, g_direct_equal);
}
/**
@ -783,65 +894,6 @@ pinos_node_remove (PinosNode *node)
g_signal_emit (node, signals[SIGNAL_REMOVE], 0, NULL);
}
/**
* pinos_node_add_port:
* @node: a #PinosNode
* @direction: the direction of the port
* @error: location of #GError
*
* Add the #PinosPort to @node
*
* Returns: a new #PinosPort or %NULL
*/
PinosPort *
pinos_node_add_port (PinosNode *node,
guint id,
GError **error)
{
PinosNodeClass *klass;
PinosPort *port;
g_return_val_if_fail (PINOS_IS_NODE (node), NULL);
klass = PINOS_NODE_GET_CLASS (node);
if (!klass->add_port) {
g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, "add-port not implemented");
return NULL;
}
g_debug ("node %p: add port", node);
port = klass->add_port (node, id, error);
return port;
}
/**
* pinos_node_remove_port:
* @node: a #PinosNode
* @port: a #PinosPort
*
* Remove @port from @node
*
* Returns: %TRUE when the port was removed
*/
gboolean
pinos_node_remove_port (PinosNode *node, PinosPort *port)
{
PinosNodeClass *klass;
gboolean res = FALSE;
g_return_val_if_fail (PINOS_IS_NODE (node), FALSE);
g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
klass = PINOS_NODE_GET_CLASS (node);
if (!klass->remove_port)
return FALSE;
res = klass->remove_port (node, port);
return res;
}
/**
* pinos_node_get_free_port_id:
* @node: a #PinosNode
@ -872,11 +924,15 @@ pinos_node_get_free_port_id (PinosNode *node,
ports = priv->output_port_ids;
}
g_debug ("direction %d max %u, n %u\n", direction, max_ports, n_ports);
g_debug ("node %p: direction %d max %u, n %u", node, direction, max_ports, n_ports);
for (i = 0; i < n_ports; i++) {
if (free_port < ports[i])
break;
if (g_hash_table_lookup (priv->links, GUINT_TO_POINTER (free_port)) == NULL && free_port < max_ports)
return free_port;
free_port = ports[i] + 1;
}
if (free_port >= max_ports)
@ -885,43 +941,110 @@ pinos_node_get_free_port_id (PinosNode *node,
return free_port;
}
/**
* pinos_node_find_port_by_id:
* @node: a #PinosNode
* @id: a #PinosPort id
*
* Get the port with @id @node.
*
* Returns: a #PinosPort with @id or %NULL when not found
*/
PinosPort *
pinos_node_find_port_by_id (PinosNode *node, guint id)
static void
do_remove_link (PinosLink *link, PinosNode *node)
{
PinosNodePrivate *priv;
g_hash_table_remove (link->output_node->priv->links, GUINT_TO_POINTER (link->output_port));
if (g_hash_table_size (link->output_node->priv->links) == 0)
pinos_node_report_idle (link->output_node);
g_return_val_if_fail (PINOS_IS_NODE (node), NULL);
priv = node->priv;
return g_hash_table_lookup (priv->ports, GUINT_TO_POINTER (id));
g_hash_table_remove (link->input_node->priv->links, GUINT_TO_POINTER (link->input_port));
if (g_hash_table_size (link->input_node->priv->links) == 0)
pinos_node_report_idle (link->input_node);
}
/**
* pinos_node_get_ports:
* pinos_node_link:
* @output_node: a #PinosNode
* @output_port: a port
* @input_node: a #PinosNode
* @input_port: a port
* @format_filter: a format filter
* @properties: extra properties
*
* Make a link between @output_node and @input_node on the given ports.
*
* If the ports were already linked, the existing linke will be returned.
*
* If the source port was linked to a different destination node or port, it
* will be relinked.
*
* Returns: a new #PinosLink
*/
PinosLink *
pinos_node_link (PinosNode *output_node,
guint output_port,
PinosNode *input_node,
guint input_port,
GPtrArray *format_filter,
PinosProperties *properties)
{
PinosNodePrivate *priv;
PinosLink *link;
g_return_val_if_fail (PINOS_IS_NODE (output_node), NULL);
g_return_val_if_fail (PINOS_IS_NODE (input_node), NULL);
if (get_port_direction (output_node, output_port) != PINOS_DIRECTION_OUTPUT) {
PinosNode *tmp;
guint tmp_port;
tmp = output_node;
output_node = input_node;
input_node = tmp;
tmp_port = output_port;
output_port = input_port;
input_port = tmp_port;
}
priv = output_node->priv;
link = g_hash_table_lookup (priv->links, GUINT_TO_POINTER (output_port));
if (link) {
link->input_node = input_node;
link->input_port = input_port;
g_object_ref (link);
} else {
link = g_object_new (PINOS_TYPE_LINK,
"daemon", priv->daemon,
"output-node", output_node,
"output-port", output_port,
"input-node", input_node,
"input-port", input_port,
"format-filter", format_filter,
"properties", properties,
NULL);
g_signal_connect (link,
"remove",
(GCallback) do_remove_link,
output_node);
g_hash_table_insert (priv->links, GUINT_TO_POINTER (output_port), link);
g_hash_table_insert (input_node->priv->links, GUINT_TO_POINTER (input_port), link);
}
return link;
}
/**
* pinos_node_get_links:
* @node: a #PinosNode
*
* Get the ports in @node.
* Get the links in @node.
*
* Returns: a #GList of ports g_list_free after usage.
* Returns: a #GList of #PinosLink g_list_free after usage.
*/
GList *
pinos_node_get_ports (PinosNode *node)
pinos_node_get_links (PinosNode *node)
{
PinosNodePrivate *priv;
g_return_val_if_fail (PINOS_IS_NODE (node), NULL);
priv = node->priv;
return g_hash_table_get_values (priv->ports);
return g_hash_table_get_values (priv->links);
}
static void