diff --git a/pinos/Makefile.am b/pinos/Makefile.am index 6c9eca56f..03210c4f6 100644 --- a/pinos/Makefile.am +++ b/pinos/Makefile.am @@ -207,6 +207,7 @@ lib_LTLIBRARIES += libpinoscore-@PINOS_MAJORMINOR@.la libpinoscore_@PINOS_MAJORMINOR@_la_SOURCES = \ server/channel.c server/channel.h \ server/client.c server/client.h \ + server/client-node.c server/client-node.h \ server/daemon.c server/daemon.h \ server/link.c server/link.h \ server/node.c server/node.h \ diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h index b035d305f..eda81a111 100644 --- a/pinos/client/buffer.h +++ b/pinos/client/buffer.h @@ -160,6 +160,7 @@ gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, /* add-mem packets */ /** * PinosPacketAddMem: + * @port: the port number * @id: the unique id of this memory block * @type: the memory block type * @fd_index: the index of the fd with the data @@ -169,6 +170,7 @@ gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, * A Packet that contains a memory block used for data transfer. */ typedef struct { + guint32 port; guint32 id; guint32 type; gint32 fd_index; @@ -184,11 +186,13 @@ gboolean pinos_buffer_builder_add_add_mem (PinosBufferBuilder *b /* remove-mem packets */ /** * PinosPacketRemoveMem: + * @port: the port number * @id: the unique id of the memory block * * Remove a memory block. */ typedef struct { + guint32 port; guint32 id; } PinosPacketRemoveMem; @@ -208,6 +212,7 @@ gboolean pinos_buffer_builder_add_remove_mem (PinosBufferBuilder *b * A Packet that contains the header. */ typedef struct { + guint32 port; guint32 flags; guint32 seq; gint64 pts; @@ -231,6 +236,7 @@ gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *buil * @size. */ typedef struct { + guint32 port; guint32 id; guint64 offset; guint64 size; @@ -249,6 +255,7 @@ gboolean pinos_buffer_builder_add_process_mem (PinosBufferBuilder * * Release the payload with @id */ typedef struct { + guint32 port; guint32 id; guint64 offset; guint64 size; @@ -269,6 +276,7 @@ gboolean pinos_buffer_builder_add_reuse_mem (PinosBufferBuilder *bui * A new format. */ typedef struct { + guint32 port; guint8 id; const gchar *format; } PinosPacketFormatChange; @@ -288,6 +296,7 @@ gboolean pinos_buffer_builder_add_format_change (PinosBufferBuilder * A new property change. */ typedef struct { + guint32 port; const gchar *key; const gchar *value; } PinosPacketPropertyChange; @@ -308,6 +317,7 @@ gboolean pinos_buffer_builder_add_property_change (PinosBufferBuilder * A refresh request packet. This packet is sent to trigger a new keyframe. */ typedef struct { + guint32 port; guint32 last_id; guint32 request_type; gint64 pts; diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 39d1fe831..4386ceea6 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -53,7 +53,7 @@ struct _PinosStreamPrivate GBytes *format; - GDBusProxy *channel; + GDBusProxy *node; gboolean disconnecting; PinosStreamMode mode; @@ -215,7 +215,7 @@ subscription_cb (PinosSubscribe *subscribe, switch (flags) { case PINOS_SUBSCRIPTION_FLAG_CHANNEL: if (event == PINOS_SUBSCRIPTION_EVENT_REMOVE) { - if (object == priv->channel && !priv->disconnecting) { + if (object == priv->node && !priv->disconnecting) { stream_set_state (stream, PINOS_STREAM_STATE_ERROR, g_error_new_literal (G_IO_ERROR, @@ -252,7 +252,7 @@ pinos_stream_finalize (GObject * object) g_debug ("free stream %p", stream); - g_clear_object (&priv->channel); + g_clear_object (&priv->node); if (priv->possible_formats) g_bytes_unref (priv->possible_formats); @@ -488,71 +488,6 @@ pinos_stream_get_error (PinosStream *stream) } -static void -on_channel_proxy (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - PinosStream *stream = user_data; - PinosStreamPrivate *priv = stream->priv; - PinosContext *context = priv->context; - GVariant *v; - gchar *str; - GError *error = NULL; - - priv->channel = pinos_subscribe_get_proxy_finish (context->priv->subscribe, - res, - &error); - if (priv->channel == NULL) - goto channel_failed; - - /* get the port we are connected to */ - v = g_dbus_proxy_get_cached_property (priv->channel, "Port"); - if (v) { - gsize len; - str = g_variant_dup_string (v, &len); - g_variant_unref (v); - - g_free (priv->path); - priv->path = str; - } - - v = g_dbus_proxy_get_cached_property (priv->channel, "PossibleFormats"); - if (v) { - gsize len; - str = g_variant_dup_string (v, &len); - g_variant_unref (v); - - if (priv->possible_formats) - g_bytes_unref (priv->possible_formats); - priv->possible_formats = g_bytes_new_take (str, len + 1); - - g_object_notify (G_OBJECT (stream), "possible-formats"); - } - v = g_dbus_proxy_get_cached_property (priv->channel, "Properties"); - if (v) { - if (priv->properties) - pinos_properties_free (priv->properties); - priv->properties = pinos_properties_from_variant (v); - g_variant_unref (v); - - g_object_notify (G_OBJECT (stream), "properties"); - } - - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); - g_object_unref (stream); - - return; - -channel_failed: - { - g_warning ("failed to get channel proxy: %s", error->message); - stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); - g_object_unref (stream); - return; - } -} - static gboolean parse_buffer (PinosStream *stream, PinosBuffer *pbuf) @@ -745,16 +680,46 @@ unhandle_socket (PinosStream *stream) static void -on_channel_created (GObject *source_object, - GAsyncResult *res, - gpointer user_data) +on_node_proxy (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + PinosContext *context = priv->context; + GError *error = NULL; + + priv->node = pinos_subscribe_get_proxy_finish (context->priv->subscribe, + res, + &error); + if (priv->node == NULL) + goto node_failed; + + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + g_object_unref (stream); + + return; + +node_failed: + { + g_warning ("failed to get node proxy: %s", error->message); + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + g_object_unref (stream); + return; + } +} + +static void +on_node_created (GObject *source_object, + GAsyncResult *res, + gpointer user_data) { PinosStream *stream = user_data; PinosStreamPrivate *priv = stream->priv; PinosContext *context = priv->context; GVariant *ret; GError *error = NULL; - const gchar *channel_path; + const gchar *node_path; GUnixFDList *fd_list; gint fd_idx, fd; @@ -766,7 +731,7 @@ on_channel_created (GObject *source_object, if (ret == NULL) goto create_failed; - g_variant_get (ret, "(&oh)", &channel_path, &fd_idx); + g_variant_get (ret, "(&oh)", &node_path, &fd_idx); g_variant_unref (ret); if ((fd = g_unix_fd_list_get (fd_list, fd_idx, &error)) < 0) @@ -778,10 +743,10 @@ on_channel_created (GObject *source_object, pinos_subscribe_get_proxy (context->priv->subscribe, PINOS_DBUS_SERVICE, - channel_path, - "org.pinos.Channel1", + node_path, + "org.pinos.Node1", NULL, - on_channel_proxy, + on_node_proxy, stream); return; @@ -812,18 +777,29 @@ do_connect (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; PinosContext *context = priv->context; + GVariantBuilder b; + GVariant *ports; + + g_variant_builder_init (&b, G_VARIANT_TYPE ("a(uusa{sv}s)")); + g_variant_builder_open (&b, G_VARIANT_TYPE ("(uusa{sv}s)")); + g_variant_builder_add (&b, "u", priv->direction); + g_variant_builder_add (&b, "u", 0); + g_variant_builder_add (&b, "s", g_bytes_get_data (priv->possible_formats, NULL)); + g_variant_builder_add_value (&b, pinos_properties_to_variant (priv->properties)); + g_variant_builder_add (&b, "s", priv->path); + g_variant_builder_close (&b); + ports = g_variant_builder_end (&b); g_dbus_proxy_call (context->priv->daemon, - "CreateChannel", - g_variant_new ("(sus@a{sv})", - (priv->path ? priv->path : ""), - priv->direction, - g_bytes_get_data (priv->possible_formats, NULL), - pinos_properties_to_variant (priv->properties)), + "CreateClientNode", + g_variant_new ("(s@a{sv}@a(uusa{sv}s))", + "client-node", + pinos_properties_to_variant (priv->properties), + ports), G_DBUS_CALL_FLAGS_NONE, -1, NULL, /* GCancellable *cancellable */ - on_channel_created, + on_node_created, stream); return FALSE; } @@ -981,7 +957,7 @@ pinos_stream_stop (PinosStream *stream) } static void -on_channel_removed (GObject *source_object, +on_node_removed (GObject *source_object, GAsyncResult *res, gpointer user_data) { @@ -990,10 +966,10 @@ on_channel_removed (GObject *source_object, GVariant *ret; GError *error = NULL; - g_assert (priv->channel == G_DBUS_PROXY (source_object)); + g_assert (priv->node == G_DBUS_PROXY (source_object)); priv->disconnecting = FALSE; - g_clear_object (&priv->channel); + g_clear_object (&priv->node); ret = g_dbus_proxy_call_finish (G_DBUS_PROXY (source_object), res, &error); if (ret == NULL) @@ -1021,13 +997,13 @@ do_disconnect (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; - g_dbus_proxy_call (priv->channel, + g_dbus_proxy_call (priv->node, "Remove", g_variant_new ("()"), G_DBUS_CALL_FLAGS_NONE, -1, NULL, /* GCancellable *cancellable */ - on_channel_removed, + on_node_removed, stream); return FALSE; @@ -1050,7 +1026,7 @@ pinos_stream_disconnect (PinosStream *stream) g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); priv = stream->priv; g_return_val_if_fail (priv->state >= PINOS_STREAM_STATE_READY, FALSE); - g_return_val_if_fail (priv->channel != NULL, FALSE); + g_return_val_if_fail (priv->node != NULL, FALSE); context = priv->context; g_return_val_if_fail (pinos_context_get_state (context) >= PINOS_CONTEXT_STATE_CONNECTED, FALSE); g_return_val_if_fail (!priv->disconnecting, FALSE); diff --git a/pinos/dbus/org.pinos.xml b/pinos/dbus/org.pinos.xml index 0d9633b8a..fe4076ecf 100644 --- a/pinos/dbus/org.pinos.xml +++ b/pinos/dbus/org.pinos.xml @@ -59,6 +59,14 @@ + + + + + + + + @@ -139,6 +147,16 @@ + + + + + + + + + +