reintroduce channels

Bring back the channel object. Making a node and port on the client side
was rather awkward because of the async nature of many methods. It feels
better to have a specific communication channel object to interface with
a server side port.
Use port activate/deactivate to start/stop streams
Remove links from the ports. We let other objects install a callback on
the port to receive and route buffers.
This commit is contained in:
Wim Taymans 2016-07-20 17:29:34 +02:00
parent eefe6aacb9
commit e167d30296
26 changed files with 2840 additions and 675 deletions

View file

@ -20,6 +20,7 @@
#include <sys/socket.h>
#include <string.h>
#include <gio/gio.h>
#include <gio/gunixfdlist.h>
#include <gio/gunixfdmessage.h>
@ -31,6 +32,9 @@
#include "pinos/client/private.h"
#define MAX_BUFFER_SIZE 1024
#define MAX_FDS 16
struct _PinosStreamPrivate
{
PinosContext *context;
@ -49,11 +53,18 @@ struct _PinosStreamPrivate
GBytes *format;
PinosNode *node;
PinosPort *port;
GDBusProxy *channel;
gboolean disconnecting;
PinosStreamMode mode;
GSocket *socket;
GSource *socket_source;
int fd;
PinosBuffer *buffer;
PinosBuffer recv_buffer;
guint8 recv_data[MAX_BUFFER_SIZE];
int recv_fds[MAX_FDS];
};
#define PINOS_STREAM_GET_PRIVATE(obj) \
@ -184,100 +195,6 @@ stream_set_state (PinosStream *stream,
}
}
static GDBusProxy *
get_proxy (PinosStream *stream, GList *list, const gchar *path)
{
GList *walk;
for (walk = list; walk; walk = g_list_next (walk)) {
GDBusProxy *proxy = walk->data;
if (!g_strcmp0 (g_dbus_proxy_get_object_path (proxy), path)) {
return proxy;
}
}
return NULL;
}
static GDBusProxy *
get_port_proxy (PinosStream *stream, const gchar *path)
{
return get_proxy (stream, stream->priv->context->priv->ports, path);
}
static GDBusProxy *
get_node_proxy (PinosStream *stream, const gchar *path)
{
return get_proxy (stream, stream->priv->context->priv->nodes, path);
}
static GDBusProxy *
get_peer_port_proxy (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
GDBusProxy *port_proxy;
GDBusProxy *peer_proxy = NULL;
if (priv->port == NULL)
return NULL;
g_object_get (priv->port, "proxy", &port_proxy, NULL);
if (port_proxy) {
GVariant *v;
v = g_dbus_proxy_get_cached_property (port_proxy, "Peers");
if (v) {
GVariantIter *iter;
gchar *peer_path;
g_variant_get (v, "ao", &iter);
if (g_variant_iter_next (iter, "&o", &peer_path, NULL)) {
peer_proxy = get_port_proxy (stream, peer_path);
}
g_variant_iter_free (iter);
}
}
return peer_proxy;
}
static GDBusProxy *
get_peer_node_proxy (PinosStream *stream)
{
GDBusProxy *peer_port;
GVariant *v;
GDBusProxy *node = NULL;
peer_port = get_peer_port_proxy (stream);
if (peer_port) {
v = g_dbus_proxy_get_cached_property (peer_port, "Node");
if (v) {
const gchar *node_path = g_variant_get_string (v, NULL);
node = get_node_proxy (stream, node_path);
g_variant_unref (v);
}
}
return node;
}
static void
merge_peer_properties (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
GDBusProxy *peer_node;
peer_node = get_peer_node_proxy (stream);
if (peer_node) {
GVariant *v = g_dbus_proxy_get_cached_property (peer_node, "Properties");
if (v) {
PinosProperties *props = pinos_properties_from_variant (v);
priv->properties = pinos_properties_merge (priv->properties, props);
pinos_properties_free (props);
g_variant_unref (v);
}
}
}
static void
subscription_cb (PinosSubscribe *subscribe,
PinosSubscriptionEvent event,
@ -289,28 +206,14 @@ subscription_cb (PinosSubscribe *subscribe,
PinosStreamPrivate *priv = stream->priv;
switch (flags) {
case PINOS_SUBSCRIPTION_FLAG_NODE:
case PINOS_SUBSCRIPTION_FLAG_CHANNEL:
if (event == PINOS_SUBSCRIPTION_EVENT_REMOVE) {
if (object == priv->node && !priv->disconnecting) {
if (object == priv->channel && !priv->disconnecting) {
stream_set_state (stream,
PINOS_STREAM_STATE_ERROR,
g_error_new_literal (G_IO_ERROR,
G_IO_ERROR_CLOSED,
"Node disappeared"));
}
} else if (event == PINOS_SUBSCRIPTION_EVENT_NEW ||
event == PINOS_SUBSCRIPTION_EVENT_CHANGE) {
if (object == get_peer_node_proxy (stream)) {
merge_peer_properties (stream);
}
}
break;
case PINOS_SUBSCRIPTION_FLAG_PORT:
if (event == PINOS_SUBSCRIPTION_EVENT_NEW ||
event == PINOS_SUBSCRIPTION_EVENT_CHANGE) {
if (object == get_peer_port_proxy (stream)) {
merge_peer_properties (stream);
"Channel disappeared"));
}
}
break;
@ -342,7 +245,7 @@ pinos_stream_finalize (GObject * object)
g_debug ("free stream %p", stream);
g_clear_object (&priv->node);
g_clear_object (&priv->channel);
if (priv->possible_formats)
g_bytes_unref (priv->possible_formats);
@ -577,12 +480,259 @@ pinos_stream_get_error (PinosStream *stream)
return stream->priv->error;
}
static void
on_channel_proxy (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
GVariant *v;
gchar *str;
GError *error = NULL;
priv->channel = pinos_subscribe_get_proxy_finish (context->priv->subscribe,
res,
&error);
if (priv->channel == NULL)
goto channel_failed;
/* get the port we are connected to */
v = g_dbus_proxy_get_cached_property (priv->channel, "Port");
if (v) {
gsize len;
str = g_variant_dup_string (v, &len);
g_variant_unref (v);
g_free (priv->path);
priv->path = str;
}
v = g_dbus_proxy_get_cached_property (priv->channel, "PossibleFormats");
if (v) {
gsize len;
str = g_variant_dup_string (v, &len);
g_variant_unref (v);
if (priv->possible_formats)
g_bytes_unref (priv->possible_formats);
priv->possible_formats = g_bytes_new_take (str, len + 1);
g_object_notify (G_OBJECT (stream), "possible-formats");
}
v = g_dbus_proxy_get_cached_property (priv->channel, "Properties");
if (v) {
if (priv->properties)
pinos_properties_free (priv->properties);
priv->properties = pinos_properties_from_variant (v);
g_variant_unref (v);
g_object_notify (G_OBJECT (stream), "properties");
}
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
return;
channel_failed:
{
g_warning ("failed to get channel proxy: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
}
static gboolean
on_socket_condition (GSocket *socket,
GIOCondition condition,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
switch (condition) {
case G_IO_IN:
{
PinosBuffer *buffer = &priv->recv_buffer;
GError *error = NULL;
if (!pinos_io_read_buffer (priv->fd,
buffer,
priv->recv_data,
MAX_BUFFER_SIZE,
priv->recv_fds,
MAX_FDS,
&error)) {
g_warning ("stream %p: failed to read buffer: %s", stream, error->message);
g_clear_error (&error);
return TRUE;
}
priv->buffer = buffer;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
priv->buffer = NULL;
g_assert (pinos_buffer_unref (buffer) == FALSE);
break;
}
case G_IO_OUT:
g_warning ("can do IO\n");
break;
default:
break;
}
return TRUE;
}
static void
handle_socket (PinosStream *stream, gint fd)
{
PinosStreamPrivate *priv = stream->priv;
GError *error = NULL;
priv->socket = g_socket_new_from_fd (fd, &error);
if (priv->socket == NULL)
goto socket_failed;
switch (priv->mode) {
case PINOS_STREAM_MODE_SOCKET:
g_object_notify (G_OBJECT (stream), "socket");
break;
case PINOS_STREAM_MODE_BUFFER:
{
priv->fd = g_socket_get_fd (priv->socket);
priv->socket_source = g_socket_create_source (priv->socket, G_IO_IN, NULL);
g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL);
g_source_attach (priv->socket_source, priv->context->priv->context);
break;
}
default:
break;
}
return;
/* ERRORS */
socket_failed:
{
g_warning ("failed to create socket: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
return;
}
}
static void
unhandle_socket (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
switch (priv->mode) {
case PINOS_STREAM_MODE_SOCKET:
g_clear_object (&priv->socket);
g_object_notify (G_OBJECT (stream), "socket");
break;
case PINOS_STREAM_MODE_BUFFER:
if (priv->socket_source) {
g_source_destroy (priv->socket_source);
g_clear_pointer (&priv->socket_source, g_source_unref);
}
break;
default:
break;
}
}
static void
on_channel_created (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
GVariant *ret;
GError *error = NULL;
const gchar *channel_path;
GUnixFDList *fd_list;
gint fd_idx, fd;
g_assert (context->priv->daemon == G_DBUS_PROXY (source_object));
ret = g_dbus_proxy_call_with_unix_fd_list_finish (context->priv->daemon,
&fd_list,
res, &error);
if (ret == NULL)
goto create_failed;
g_variant_get (ret, "(&oh)", &channel_path, &fd_idx);
g_variant_unref (ret);
if ((fd = g_unix_fd_list_get (fd_list, fd_idx, &error)) < 0)
goto fd_failed;
priv->fd = fd;
g_object_unref (fd_list);
pinos_subscribe_get_proxy (context->priv->subscribe,
PINOS_DBUS_SERVICE,
channel_path,
"org.pinos.Channel1",
NULL,
on_channel_proxy,
stream);
return;
/* ERRORS */
create_failed:
{
g_warning ("failed to connect: %s", error->message);
goto exit_error;
}
fd_failed:
{
g_warning ("failed to get FD: %s", error->message);
g_object_unref (fd_list);
goto exit_error;
}
exit_error:
{
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
}
static gboolean
do_connect (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
g_dbus_proxy_call (context->priv->daemon,
"CreateChannel",
g_variant_new ("(sus@a{sv})",
(priv->path ? priv->path : ""),
priv->direction,
g_bytes_get_data (priv->possible_formats, NULL),
pinos_properties_to_variant (priv->properties)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_channel_created,
stream);
return FALSE;
}
@ -633,11 +783,76 @@ pinos_stream_connect (PinosStream *stream,
return TRUE;
}
static void
on_stream_started (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
gchar *format;
GError *error = NULL;
GVariant *result, *properties;
result = g_dbus_proxy_call_finish (priv->channel,
res,
&error);
if (result == NULL)
goto start_failed;
g_variant_get (result,
"(s@a{sv})",
&format,
&properties);
g_variant_unref (result);
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_bytes_new_take (format, strlen (format) + 1);
g_object_notify (G_OBJECT (stream), "format");
if (priv->properties)
pinos_properties_free (priv->properties);
priv->properties = pinos_properties_from_variant (properties);
g_variant_unref (properties);
g_object_notify (G_OBJECT (stream), "properties");
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
g_object_unref (stream);
return;
/* ERRORS */
start_failed:
{
g_warning ("failed to start: %s", error->message);
goto exit_error;
}
exit_error:
{
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
}
static gboolean
do_start (PinosStream *stream)
{
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
g_object_unref (stream);
PinosStreamPrivate *priv = stream->priv;
handle_socket (stream, priv->fd);
g_dbus_proxy_call (priv->channel,
"Start",
g_variant_new ("(s)",
priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_stream_started,
stream);
return FALSE;
}
@ -683,11 +898,54 @@ pinos_stream_start (PinosStream *stream,
return TRUE;
}
static void
on_stream_stopped (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
GVariant *ret;
GError *error = NULL;
ret = g_dbus_proxy_call_finish (priv->channel, res, &error);
if (ret == NULL)
goto call_failed;
g_variant_unref (ret);
unhandle_socket (stream);
g_clear_pointer (&priv->format, g_bytes_unref);
g_object_notify (G_OBJECT (stream), "format");
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
return;
/* ERRORS */
call_failed:
{
g_warning ("failed to stop: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
}
static gboolean
do_stop (PinosStream *stream)
{
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
PinosStreamPrivate *priv = stream->priv;
g_dbus_proxy_call (priv->channel,
"Stop",
g_variant_new ("()"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_stream_stopped,
stream);
return FALSE;
}
@ -717,11 +975,56 @@ pinos_stream_stop (PinosStream *stream)
return TRUE;
}
static void
on_channel_removed (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
GVariant *ret;
GError *error = NULL;
g_assert (priv->channel == G_DBUS_PROXY (source_object));
priv->disconnecting = FALSE;
g_clear_object (&priv->channel);
ret = g_dbus_proxy_call_finish (G_DBUS_PROXY (source_object), res, &error);
if (ret == NULL)
goto proxy_failed;
g_variant_unref (ret);
stream_set_state (stream, PINOS_STREAM_STATE_UNCONNECTED, NULL);
g_object_unref (stream);
return;
/* ERRORS */
proxy_failed:
{
g_warning ("failed to disconnect: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
}
static gboolean
do_disconnect (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
g_dbus_proxy_call (priv->channel,
"Remove",
g_variant_new ("()"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_channel_removed,
stream);
return FALSE;
}
@ -742,7 +1045,7 @@ pinos_stream_disconnect (PinosStream *stream)
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state >= PINOS_STREAM_STATE_READY, FALSE);
g_return_val_if_fail (priv->node != NULL, FALSE);
g_return_val_if_fail (priv->channel != NULL, FALSE);
context = priv->context;
g_return_val_if_fail (pinos_context_get_state (context) >= PINOS_CONTEXT_STATE_CONNECTED, FALSE);
g_return_val_if_fail (!priv->disconnecting, FALSE);
@ -770,12 +1073,10 @@ pinos_stream_peek_buffer (PinosStream *stream)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (PINOS_IS_STREAM (stream), NULL);
priv = stream->priv;
//g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
return NULL;
return priv->buffer;
}
/**
@ -790,11 +1091,9 @@ pinos_stream_peek_buffer (PinosStream *stream)
void
pinos_stream_buffer_builder_init (PinosStream *stream, PinosBufferBuilder *builder)
{
PinosStreamPrivate *priv;
g_return_if_fail (PINOS_IS_STREAM (stream));
priv = stream->priv;
pinos_buffer_builder_init (builder);
}
/**
@ -825,5 +1124,10 @@ pinos_stream_send_buffer (PinosStream *stream,
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
if (!pinos_io_write_buffer (priv->fd, buffer, &error)) {
g_warning ("stream %p: failed to read buffer: %s", stream, error->message);
g_clear_error (&error);
return FALSE;
}
return TRUE;
}