rework: make client and server nodes

work on making nodes and ports on the client.
This commit is contained in:
Wim Taymans 2016-05-12 17:03:28 +02:00
parent c67d3d7f04
commit 8407430891
34 changed files with 1500 additions and 3844 deletions

View file

@ -49,7 +49,8 @@ struct _PinosStreamPrivate
GBytes *format;
GDBusProxy *channel;
PinosNode *node;
PinosPort *port;
gboolean disconnecting;
PinosStreamMode mode;
@ -197,14 +198,14 @@ subscription_cb (PinosSubscribe *subscribe,
PinosStreamPrivate *priv = stream->priv;
switch (flags) {
case PINOS_SUBSCRIPTION_FLAG_CHANNEL:
case PINOS_SUBSCRIPTION_FLAG_NODE:
if (event == PINOS_SUBSCRIPTION_EVENT_REMOVE) {
if (object == priv->channel && !priv->disconnecting) {
if (object == priv->node && !priv->disconnecting) {
stream_set_state (stream,
PINOS_STREAM_STATE_ERROR,
g_error_new_literal (G_IO_ERROR,
G_IO_ERROR_CLOSED,
"Channel disappeared"));
"Node disappeared"));
}
}
break;
@ -237,7 +238,8 @@ pinos_stream_finalize (GObject * object)
g_debug ("free stream %p", stream);
g_clear_object (&priv->socket);
g_clear_object (&priv->channel);
g_clear_object (&priv->node);
g_clear_object (&priv->port);
if (priv->possible_formats)
g_bytes_unref (priv->possible_formats);
@ -493,64 +495,31 @@ pinos_stream_get_error (PinosStream *stream)
}
static void
on_channel_proxy (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
on_port_created (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
GVariant *v;
gchar *str;
GError *error = NULL;
priv->channel = pinos_subscribe_get_proxy_finish (context->priv->subscribe,
res,
&error);
if (priv->channel == NULL)
goto channel_failed;
g_assert (priv->node == PINOS_NODE (source_object));
/* get the port we are connected to */
v = g_dbus_proxy_get_cached_property (priv->channel, "Port");
if (v) {
gsize len;
str = g_variant_dup_string (v, &len);
g_variant_unref (v);
g_free (priv->path);
priv->path = str;
}
v = g_dbus_proxy_get_cached_property (priv->channel, "PossibleFormats");
if (v) {
gsize len;
str = g_variant_dup_string (v, &len);
g_variant_unref (v);
if (priv->possible_formats)
g_bytes_unref (priv->possible_formats);
priv->possible_formats = g_bytes_new_take (str, len + 1);
g_object_notify (G_OBJECT (stream), "possible-formats");
}
v = g_dbus_proxy_get_cached_property (priv->channel, "Properties");
if (v) {
if (priv->properties)
pinos_properties_free (priv->properties);
priv->properties = pinos_properties_from_variant (v);
g_variant_unref (v);
g_object_notify (G_OBJECT (stream), "properties");
}
priv->port = pinos_node_create_port_finish (priv->node,
res,
&error);
if (priv->port == NULL)
goto create_failed;
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
return;
channel_failed:
/* ERRORS */
create_failed:
{
g_warning ("failed to get channel proxy: %s", error->message);
g_warning ("failed to create port: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
@ -558,40 +527,33 @@ channel_failed:
}
static void
on_channel_created (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
on_node_created (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
GVariant *ret;
GError *error = NULL;
const gchar *channel_path;
g_assert (context->priv->client == G_DBUS_PROXY (source_object));
ret = g_dbus_proxy_call_finish (context->priv->client, res, &error);
if (ret == NULL)
priv->node = pinos_context_create_node_finish (context, res, &error);
if (priv->node == NULL)
goto create_failed;
g_variant_get (ret, "(&o)", &channel_path);
pinos_subscribe_get_proxy (context->priv->subscribe,
PINOS_DBUS_SERVICE,
channel_path,
"org.pinos.Channel1",
NULL,
on_channel_proxy,
pinos_node_create_port (priv->node,
priv->direction,
"client-port",
priv->possible_formats,
priv->properties,
NULL, /* GCancellable *cancellable */
on_port_created,
stream);
g_variant_unref (ret);
return;
/* ERRORS */
create_failed:
{
g_warning ("failed to get connect capture: %s", error->message);
g_warning ("failed to create node: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
@ -604,19 +566,12 @@ do_connect (PinosStream *stream)
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
g_dbus_proxy_call (context->priv->client,
"CreateChannel",
g_variant_new ("(uss@a{sv})",
priv->direction,
(priv->path ? priv->path : ""),
g_bytes_get_data (priv->possible_formats, NULL),
pinos_properties_to_variant (priv->properties)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_channel_created,
stream);
pinos_context_create_node (context,
"client-node",
priv->properties,
NULL, /* GCancellable *cancellable */
on_node_created,
stream);
return FALSE;
}
@ -670,6 +625,7 @@ pinos_stream_connect (PinosStream *stream,
static gboolean
do_connect_provide (PinosStream *stream)
{
#if 0
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
@ -683,6 +639,7 @@ do_connect_provide (PinosStream *stream)
NULL, /* GCancellable *cancellable */
on_channel_created,
stream);
#endif
return FALSE;
}
@ -726,39 +683,88 @@ pinos_stream_connect_provide (PinosStream *stream,
return TRUE;
}
static void
on_channel_removed (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
static gboolean
do_start (PinosStream *stream)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
GVariant *ret;
GError *error = NULL;
g_assert (priv->channel == G_DBUS_PROXY (source_object));
priv->disconnecting = FALSE;
g_clear_object (&priv->channel);
ret = g_dbus_proxy_call_finish (G_DBUS_PROXY (source_object), res, &error);
if (ret == NULL)
goto proxy_failed;
g_variant_unref (ret);
stream_set_state (stream, PINOS_STREAM_STATE_UNCONNECTED, NULL);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
g_object_unref (stream);
return;
/* ERRORS */
proxy_failed:
{
g_warning ("failed to disconnect: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
return FALSE;
}
/**
* pinos_stream_start:
* @stream: a #PinosStream
* @format: (transfer full): a #GBytes with format
* @mode: a #PinosStreamMode
*
* Start capturing from @stream in @format.
*
* When @mode is #PINOS_STREAM_MODE_SOCKET, you should connect to the notify::socket
* signal to obtain a readable socket with metadata and data.
*
* When @mode is #PINOS_STREAM_MODE_BUFFER, you should connect to the new-buffer
* signal and use pinos_stream_capture_buffer() to get the latest metadata and
* data.
*
* Returns: %TRUE on success.
*/
gboolean
pinos_stream_start (PinosStream *stream,
GBytes *format,
PinosStreamMode mode)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_READY, FALSE);
priv->mode = mode;
priv->format = format;
stream_set_state (stream, PINOS_STREAM_STATE_STARTING, NULL);
g_main_context_invoke (priv->context->priv->context,
(GSourceFunc) do_start,
g_object_ref (stream));
return TRUE;
}
static gboolean
do_stop (PinosStream *stream)
{
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
return FALSE;
}
/**
* pinos_stream_stop:
* @stream: a #PinosStream
*
* Stop capturing from @stream.
*
* Returns: %TRUE on success.
*/
gboolean
pinos_stream_stop (PinosStream *stream)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
g_main_context_invoke (priv->context->priv->context,
(GSourceFunc) do_stop,
g_object_ref (stream));
return TRUE;
}
static gboolean
@ -766,14 +772,7 @@ do_disconnect (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
g_dbus_proxy_call (priv->channel,
"Remove",
g_variant_new ("()"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_channel_removed,
stream);
pinos_node_remove (priv->node);
return FALSE;
}
@ -795,7 +794,7 @@ pinos_stream_disconnect (PinosStream *stream)
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state >= PINOS_STREAM_STATE_READY, FALSE);
g_return_val_if_fail (priv->channel != NULL, FALSE);
g_return_val_if_fail (priv->node != NULL, FALSE);
context = priv->context;
g_return_val_if_fail (pinos_context_get_state (context) >= PINOS_CONTEXT_STATE_READY, FALSE);
g_return_val_if_fail (!priv->disconnecting, FALSE);
@ -976,212 +975,6 @@ unhandle_socket (PinosStream *stream)
}
}
static void
on_stream_started (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
GUnixFDList *out_fd_list;
gint fd_idx, fd;
gchar *format;
GError *error = NULL;
GVariant *result, *properties;
result = g_dbus_proxy_call_with_unix_fd_list_finish (priv->channel,
&out_fd_list,
res,
&error);
if (result == NULL)
goto start_failed;
g_variant_get (result,
"(hs@a{sv})",
&fd_idx,
&format,
&properties);
g_variant_unref (result);
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_bytes_new_take (format, strlen (format) + 1);
g_object_notify (G_OBJECT (stream), "format");
if (priv->properties)
pinos_properties_free (priv->properties);
priv->properties = pinos_properties_from_variant (properties);
g_variant_unref (properties);
g_object_notify (G_OBJECT (stream), "properties");
if ((fd = g_unix_fd_list_get (out_fd_list, fd_idx, &error)) < 0)
goto fd_failed;
g_object_unref (out_fd_list);
handle_socket (stream, fd);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
g_object_unref (stream);
return;
/* ERRORS */
start_failed:
{
g_warning ("failed to start: %s", error->message);
goto exit_error;
}
fd_failed:
{
g_warning ("failed to get FD: %s", error->message);
g_object_unref (out_fd_list);
goto exit_error;
}
exit_error:
{
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
}
static gboolean
do_start (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
g_dbus_proxy_call (priv->channel,
"Start",
g_variant_new ("(s)", g_bytes_get_data (priv->format, NULL)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_stream_started,
stream);
return FALSE;
}
/**
* pinos_stream_start:
* @stream: a #PinosStream
* @format: (transfer full): a #GBytes with format
* @mode: a #PinosStreamMode
*
* Start capturing from @stream in @format.
*
* When @mode is #PINOS_STREAM_MODE_SOCKET, you should connect to the notify::socket
* signal to obtain a readable socket with metadata and data.
*
* When @mode is #PINOS_STREAM_MODE_BUFFER, you should connect to the new-buffer
* signal and use pinos_stream_capture_buffer() to get the latest metadata and
* data.
*
* Returns: %TRUE on success.
*/
gboolean
pinos_stream_start (PinosStream *stream,
GBytes *format,
PinosStreamMode mode)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_READY, FALSE);
priv->mode = mode;
priv->format = format;
stream_set_state (stream, PINOS_STREAM_STATE_STARTING, NULL);
g_main_context_invoke (priv->context->priv->context,
(GSourceFunc) do_start,
g_object_ref (stream));
return TRUE;
}
static void
on_stream_stopped (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
GVariant *ret;
GError *error = NULL;
ret = g_dbus_proxy_call_finish (priv->channel, res, &error);
if (ret == NULL)
goto call_failed;
g_variant_unref (ret);
unhandle_socket (stream);
g_clear_pointer (&priv->format, g_bytes_unref);
g_object_notify (G_OBJECT (stream), "format");
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
return;
/* ERRORS */
call_failed:
{
g_warning ("failed to release: %s", error->message);
stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error);
g_object_unref (stream);
return;
}
}
static gboolean
do_stop (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
g_dbus_proxy_call (priv->channel,
"Stop",
g_variant_new ("()"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_stream_stopped,
stream);
return FALSE;
}
/**
* pinos_stream_stop:
* @stream: a #PinosStream
*
* Stop capturing from @stream.
*
* Returns: %TRUE on success.
*/
gboolean
pinos_stream_stop (PinosStream *stream)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
g_main_context_invoke (priv->context->priv->context,
(GSourceFunc) do_stop,
g_object_ref (stream));
return TRUE;
}
/**
* pinos_stream_peek_buffer:
* @stream: a #PinosStream