Implement negotiation

Use generic byte blobs for formats. We currently use them to store
gstreamer caps but we could also use them to exchange serialized
GVariants if we want.

Make properties a variant dictionary
This commit is contained in:
Wim Taymans 2015-05-14 17:46:12 +02:00
parent ca7e4602f6
commit 4bc308835a
21 changed files with 620 additions and 582 deletions

View file

@ -138,8 +138,10 @@ pv_context_finalize (GObject * object)
PvContext *context = PV_CONTEXT (object);
PvContextPrivate *priv = context->priv;
g_object_unref (priv->server_manager);
g_free (priv->name);
g_clear_error (&priv->error);
if (priv->properties)
g_variant_unref (priv->properties);
G_OBJECT_CLASS (pv_context_parent_class)->finalize (object);
}
@ -192,7 +194,7 @@ pv_context_class_init (PvContextClass * klass)
g_param_spec_variant ("properties",
"Properties",
"Extra properties",
G_VARIANT_TYPE_VARIANT,
G_VARIANT_TYPE_DICTIONARY,
NULL,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
@ -267,7 +269,6 @@ pv_context_init (PvContext * context)
PvContextPrivate *priv = context->priv = PV_CONTEXT_GET_PRIVATE (context);
priv->state = PV_CONTEXT_STATE_UNCONNECTED;
priv->server_manager = g_dbus_object_manager_server_new (PV_DBUS_OBJECT_PREFIX);
priv->subscribe = pv_subscribe_new ();
g_object_set (priv->subscribe, "subscription-mask", PV_SUBSCRIPTION_FLAGS_ALL, NULL);
g_signal_connect (priv->subscribe, "subscription-event", (GCallback) subscription_cb, context);
@ -286,6 +287,15 @@ pv_context_init (PvContext * context)
PvContext *
pv_context_new (GMainContext *context, const gchar *name, GVariant *properties)
{
g_return_val_if_fail (name != NULL, NULL);
if (properties == NULL) {
GVariantBuilder builder;
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string (name));
properties = g_variant_builder_end (&builder);
}
return g_object_new (PV_TYPE_CONTEXT, "main-context", context, "name", name, "properties", properties, NULL);
}
@ -330,18 +340,14 @@ on_daemon_connected (GObject *source_object,
{
PvContext *context = user_data;
PvContextPrivate *priv = context->priv;
GVariantBuilder builder;
g_assert (g_main_context_get_thread_default () == priv->context);
context_set_state (context, PV_CONTEXT_STATE_REGISTERING);
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
g_dbus_proxy_call (priv->daemon,
"ConnectClient",
g_variant_new ("(@a{sv})", g_variant_builder_end (&builder)),
g_variant_new ("(@a{sv})", priv->properties),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL,
@ -436,7 +442,6 @@ on_name_appeared (GDBusConnection *connection,
g_print ("context: on name appeared\n");
priv->connection = connection;
g_dbus_object_manager_server_set_connection (priv->server_manager, connection);
g_object_set (priv->subscribe, "connection", priv->connection,
"service", name, NULL);
@ -455,7 +460,6 @@ on_name_vanished (GDBusConnection *connection,
g_print ("context: on name vanished\n");
priv->connection = connection;
g_dbus_object_manager_server_set_connection (priv->server_manager, connection);
g_object_set (priv->subscribe, "connection", connection, NULL);

View file

@ -41,6 +41,4 @@ struct _PvContextPrivate
PvSubscribe *subscribe;
GList *sources;
GDBusObjectManagerServer *server_manager;
};

View file

@ -17,6 +17,7 @@
* Boston, MA 02110-1301, USA.
*/
#include <string.h>
#include <gio/gunixfdlist.h>
#include "server/pv-daemon.h"
@ -35,14 +36,17 @@ struct _PvStreamPrivate
gchar *target;
PvStreamState state;
GError *error;
gboolean provide;
GBytes *accepted_formats;
GBytes *possible_formats;
GBytes *format;
gchar *source_output_path;
GVariant *spec;
GDBusProxy *source_output;
GSocket *socket;
PvStreamMode mode;
guint socket_id;
GSocket *socket;
GSource *socket_source;
PvBufferInfo info;
};
@ -59,7 +63,9 @@ enum
PROP_NAME,
PROP_PROPERTIES,
PROP_STATE,
PROP_SOCKET
PROP_POSSIBLE_FORMATS,
PROP_FORMAT,
PROP_SOCKET,
};
enum
@ -96,6 +102,14 @@ pv_stream_get_property (GObject *_object,
g_value_set_enum (value, priv->state);
break;
case PROP_POSSIBLE_FORMATS:
g_value_set_boxed (value, priv->possible_formats);
break;
case PROP_FORMAT:
g_value_set_boxed (value, priv->format);
break;
case PROP_SOCKET:
g_value_set_object (value, priv->socket);
break;
@ -216,7 +230,33 @@ pv_stream_class_init (PvStreamClass * klass)
PV_STREAM_STATE_UNCONNECTED,
G_PARAM_READABLE |
G_PARAM_STATIC_STRINGS));
/**
* PvStream:possible-formats
*
* The possible formats for the stream. this can only be used after connecting
* the stream for capture or provide.
*/
g_object_class_install_property (gobject_class,
PROP_POSSIBLE_FORMATS,
g_param_spec_boxed ("possible-formats",
"Possible Formats",
"The possbile formats of the stream",
G_TYPE_BYTES,
G_PARAM_READABLE |
G_PARAM_STATIC_STRINGS));
/**
* PvStream:formats
*
* The format of the stream. This will be set after starting the stream.
*/
g_object_class_install_property (gobject_class,
PROP_FORMAT,
g_param_spec_boxed ("format",
"Format",
"The format of the stream",
G_TYPE_BYTES,
G_PARAM_READABLE |
G_PARAM_STATIC_STRINGS));
/**
* PvStream:socket
*
@ -329,7 +369,7 @@ on_source_output_signal (GDBusProxy *proxy,
GVariant *parameters,
gpointer user_data)
{
g_print ("on source output signal\n");
g_print ("on source output signal %s %s\n", sender_name, signal_name);
}
static void
@ -340,6 +380,8 @@ on_source_output_proxy (GObject *source_object,
PvStream *stream = user_data;
PvStreamPrivate *priv = stream->priv;
PvContext *context = priv->context;
GVariant *v;
gchar *str;
GError *error = NULL;
priv->source_output = pv_subscribe_get_proxy_finish (context->priv->subscribe,
@ -348,6 +390,22 @@ on_source_output_proxy (GObject *source_object,
if (priv->source_output == NULL)
goto source_output_failed;
g_print ("got source-output %s\n", priv->source_output_path);
v = g_dbus_proxy_get_cached_property (priv->source_output, "PossibleFormats");
if (v) {
str = g_variant_dup_string (v, NULL);
g_variant_unref (v);
g_print ("got possible formats %s\n", str);
if (priv->possible_formats)
g_bytes_unref (priv->possible_formats);
priv->possible_formats = g_bytes_new_take (str, strlen (str) + 1);
g_object_notify (G_OBJECT (stream), "possible-formats");
}
g_signal_connect (priv->source_output,
"g-signal",
(GCallback) on_source_output_signal,
@ -384,7 +442,6 @@ on_source_output_created (GObject *source_object,
goto create_failed;
g_variant_get (ret, "(o)", &priv->source_output_path);
g_print ("got source-output %s\n", priv->source_output_path);
g_variant_unref (ret);
pv_subscribe_get_proxy (context->priv->subscribe,
@ -417,9 +474,9 @@ do_connect_capture (PvStream *stream)
g_dbus_proxy_call (context->priv->client,
"CreateSourceOutput",
g_variant_new ("(o@a{sv})",
g_variant_new ("(os)",
(priv->target ? priv->target : "/"),
priv->spec),
g_bytes_get_data (priv->accepted_formats, NULL)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
@ -444,20 +501,21 @@ gboolean
pv_stream_connect_capture (PvStream *stream,
const gchar *source,
PvStreamFlags flags,
GVariant *spec)
GBytes *accepted_formats)
{
PvStreamPrivate *priv;
PvContext *context;
g_return_val_if_fail (PV_IS_STREAM (stream), FALSE);
g_return_val_if_fail (spec != NULL, FALSE);
g_return_val_if_fail (accepted_formats != NULL, FALSE);
priv = stream->priv;
context = priv->context;
g_return_val_if_fail (pv_context_get_state (context) == PV_CONTEXT_STATE_READY, FALSE);
priv->target = g_strdup (source);
priv->spec = spec;
priv->accepted_formats = g_bytes_ref (accepted_formats);
priv->provide = FALSE;
stream_set_state (stream, PV_STREAM_STATE_CONNECTING);
@ -476,8 +534,7 @@ do_connect_provide (PvStream *stream)
g_dbus_proxy_call (context->priv->client,
"CreateSourceInput",
g_variant_new ("(@a{sv})",
priv->spec),
g_variant_new ("(s)", g_bytes_get_data (priv->possible_formats, NULL)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
@ -498,21 +555,22 @@ do_connect_provide (PvStream *stream)
* Returns: %TRUE on success.
*/
gboolean
pv_stream_connect_provide (PvStream *stream,
PvStreamFlags flags,
GVariant *spec)
pv_stream_connect_provide (PvStream *stream,
PvStreamFlags flags,
GBytes *possible_formats)
{
PvStreamPrivate *priv;
PvContext *context;
g_return_val_if_fail (PV_IS_STREAM (stream), FALSE);
g_return_val_if_fail (spec != NULL, FALSE);
g_return_val_if_fail (possible_formats != NULL, FALSE);
priv = stream->priv;
context = priv->context;
g_return_val_if_fail (pv_context_get_state (context) == PV_CONTEXT_STATE_READY, FALSE);
priv->spec = spec;
priv->possible_formats = g_bytes_ref (possible_formats);
priv->provide = TRUE;
stream_set_state (stream, PV_STREAM_STATE_CONNECTING);
@ -588,16 +646,15 @@ pv_stream_disconnect (PvStream *stream)
g_main_context_invoke (context->priv->context, (GSourceFunc) do_disconnect, stream);
return TRUE;
}
#include <gst/wire-protocol.h>
static gboolean
on_socket_data (GSocket *socket,
GIOCondition condition,
gpointer user_data)
on_socket_condition (GSocket *socket,
GIOCondition condition,
gpointer user_data)
{
PvStream *stream = user_data;
PvStreamPrivate *priv = stream->priv;
@ -642,6 +699,9 @@ on_socket_data (GSocket *socket,
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
break;
}
case G_IO_OUT:
g_print ("can do IO\n");
break;
default:
break;
@ -668,12 +728,11 @@ handle_socket (PvStream *stream, gint fd)
case PV_STREAM_MODE_BUFFER:
{
GSource *source;
source = g_socket_create_source (priv->socket, G_IO_IN, NULL);
g_source_set_callback (source, (GSourceFunc) on_socket_data, stream, NULL);
priv->socket_id = g_source_attach (source, priv->context->priv->context);
g_source_unref (source);
if (!priv->provide) {
priv->socket_source = g_socket_create_source (priv->socket, G_IO_IN, NULL);
g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL);
g_source_attach (priv->socket_source, priv->context->priv->context);
}
break;
}
@ -703,6 +762,13 @@ unhandle_socket (PvStream *stream)
g_object_notify (G_OBJECT (stream), "socket");
break;
case PV_STREAM_MODE_BUFFER:
if (priv->socket_source) {
g_source_destroy (priv->socket_source);
g_clear_pointer (&priv->socket_source, g_source_unref);
}
break;
default:
break;
}
@ -717,7 +783,7 @@ on_stream_started (GObject *source_object,
PvStreamPrivate *priv = stream->priv;
GUnixFDList *out_fd_list;
gint fd_idx, fd;
GVariant *out_props;
gchar *format;
GError *error = NULL;
GVariant *result;
@ -729,12 +795,17 @@ on_stream_started (GObject *source_object,
goto start_failed;
g_variant_get (result,
"(h@a{sv})",
"(hs)",
&fd_idx,
&out_props);
&format);
g_variant_unref (result);
g_variant_unref (out_props);
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_bytes_new (format, strlen (format) + 1);
g_object_notify (G_OBJECT (stream), "format");
if ((fd = g_unix_fd_list_get (out_fd_list, fd_idx, &error)) < 0)
goto fd_failed;
@ -768,14 +839,10 @@ static gboolean
do_start (PvStream *stream)
{
PvStreamPrivate *priv = stream->priv;
GVariantBuilder builder;
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
g_dbus_proxy_call (priv->source_output,
"Start",
g_variant_new ("(@a{sv})", g_variant_builder_end (&builder)),
g_variant_new ("(s)", g_bytes_get_data (priv->format, NULL)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
@ -802,7 +869,7 @@ do_start (PvStream *stream)
* Returns: %TRUE on success.
*/
gboolean
pv_stream_start (PvStream *stream, PvStreamMode mode)
pv_stream_start (PvStream *stream, GBytes *format, PvStreamMode mode)
{
PvStreamPrivate *priv;
@ -812,6 +879,7 @@ pv_stream_start (PvStream *stream, PvStreamMode mode)
g_return_val_if_fail (priv->state == PV_STREAM_STATE_READY, FALSE);
priv->mode = mode;
priv->format = g_bytes_ref (format);
stream_set_state (stream, PV_STREAM_STATE_STARTING);
@ -837,6 +905,8 @@ on_stream_stopped (GObject *source_object,
g_variant_unref (ret);
unhandle_socket (stream);
g_clear_pointer (&priv->format, g_free);
g_object_notify (G_OBJECT (stream), "format");
stream_set_state (stream, PV_STREAM_STATE_READY);
@ -932,6 +1002,11 @@ gboolean
pv_stream_provide_buffer (PvStream *stream, PvBufferInfo *info)
{
PvStreamPrivate *priv;
gssize len;
GOutputVector ovec;
FDMessage msg;
gint flags = 0;
GError *error = NULL;
g_return_val_if_fail (PV_IS_STREAM (stream), FALSE);
g_return_val_if_fail (info != NULL, FALSE);
@ -939,6 +1014,30 @@ pv_stream_provide_buffer (PvStream *stream, PvBufferInfo *info)
priv = stream->priv;
g_return_val_if_fail (priv->state == PV_STREAM_STATE_STREAMING, FALSE);
msg.flags = info->flags;
msg.seq = info->seq;
msg.pts = info->pts;
msg.dts_offset = info->dts_offset;
msg.offset = info->offset;
msg.size = info->size;
ovec.buffer = &msg;
ovec.size = sizeof (msg);
len = g_socket_send_message (priv->socket,
NULL,
&ovec,
1,
&info->message,
1,
flags,
NULL,
&error);
g_assert (len == sizeof (msg));
if (info->message)
g_object_unref (info->message);
return TRUE;
}

View file

@ -103,13 +103,15 @@ const GError * pv_stream_get_error (PvStream *stream);
gboolean pv_stream_connect_capture (PvStream *stream,
const gchar *source,
PvStreamFlags flags,
GVariant *spec);
GBytes *accepted_formats);
gboolean pv_stream_connect_provide (PvStream *stream,
PvStreamFlags flags,
GVariant *spec);
GBytes *possible_formats);
gboolean pv_stream_disconnect (PvStream *stream);
gboolean pv_stream_start (PvStream *stream, PvStreamMode mode);
gboolean pv_stream_start (PvStream *stream,
GBytes *format,
PvStreamMode mode);
gboolean pv_stream_stop (PvStream *stream);
gboolean pv_stream_capture_buffer (PvStream *stream,

View file

@ -142,6 +142,8 @@ on_proxy_created (GObject *source_object,
return;
}
g_print ("got proxy for %s:%s\n", data->object_path, data->interface_name);
g_signal_connect (data->proxy,
"g-properties-changed",
(GCallback) on_proxy_properties_changed,
@ -180,6 +182,8 @@ add_interface (PvSubscribe *subscribe,
priv->objects = g_list_prepend (priv->objects, data);
priv->pending_proxies++;
g_print ("making proxy for %s:%s\n", object_path, interface_name);
g_dbus_proxy_new (priv->connection,
G_DBUS_PROXY_FLAGS_NONE,
NULL, /* GDBusInterfaceInfo* */