Add gstreamer source element

Add a source gstreamer element
Expose error in context
Make it possible to set the source in the error state
Add properties to a stream and use those to get a source-output
Fix signal for new-buffer
Attach the socket source to the thread default mainloop
Make subscribe cancellable.
Propagate state and error in context.
Add bus handler for v4l2 source
Use negotiated properties to set capsfilter in v4l2
Fix subscribe in test-client
This commit is contained in:
Wim Taymans 2015-04-28 17:36:44 +02:00
parent e151150cad
commit 592e99a317
14 changed files with 934 additions and 38 deletions

View file

@ -46,6 +46,8 @@ struct _PvContextPrivate
GList *sources;
GDBusObjectManagerServer *server_manager;
GError *error;
};
@ -154,6 +156,7 @@ pv_context_finalize (GObject * object)
PvContextPrivate *priv = context->priv;
g_object_unref (priv->server_manager);
g_clear_error (&priv->error);
G_OBJECT_CLASS (pv_context_parent_class)->finalize (object);
}
@ -310,9 +313,9 @@ on_client_proxy (GObject *source_object,
priv->client = pv_client1_proxy_new_finish (res, &error);
if (priv->client == NULL) {
priv->error = error;
context_set_state (context, PV_CONTEXT_STATE_ERROR);
g_error ("failed to get client proxy: %s", error->message);
g_clear_error (&error);
return;
}
context_set_state (context, PV_CONTEXT_STATE_READY);
@ -329,9 +332,9 @@ on_client_connected (GObject *source_object,
gchar *client_path;
if (!pv_daemon1_call_connect_client_finish (priv->daemon, &client_path, res, &error)) {
priv->error = error;
context_set_state (context, PV_CONTEXT_STATE_ERROR);
g_error ("failed to connect client: %s", error->message);
g_clear_error (&error);
return;
}
@ -462,6 +465,7 @@ on_name_vanished (GDBusConnection *connection,
if (priv->flags & PV_CONTEXT_FLAGS_NOFAIL) {
context_set_state (context, PV_CONTEXT_STATE_CONNECTING);
} else {
priv->error = g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CLOSED, "Connection closed");
context_set_state (context, PV_CONTEXT_STATE_ERROR);
}
}
@ -514,9 +518,9 @@ on_client_disconnected (GObject *source_object,
GError *error = NULL;
if (!pv_client1_call_disconnect_finish (priv->client, res, &error)) {
priv->error = error;
context_set_state (context, PV_CONTEXT_STATE_ERROR);
g_error ("failed to disconnect client: %s", error->message);
g_clear_error (&error);
return;
}
context_set_state (context, PV_CONTEXT_STATE_UNCONNECTED);
@ -612,6 +616,26 @@ pv_context_get_state (PvContext *context)
return priv->state;
}
/**
* pv_context_error:
* @context: a #PvContext
*
* Get the current error of @context or %NULL when the context state
* is not #PV_CONTEXT_STATE_ERROR
*
* Returns: the last error or %NULL
*/
const GError *
pv_context_error (PvContext *context)
{
PvContextPrivate *priv;
g_return_val_if_fail (PV_IS_CONTEXT (context), NULL);
priv = context->priv;
return priv->error;
}
/**
* pv_context_get_connection:
* @context: a #PvContext

View file

@ -107,6 +107,7 @@ gboolean pv_context_register_source (PvContext *context, PvSource
gboolean pv_context_unregister_source (PvContext *context, PvSource *source);
PvContextState pv_context_get_state (PvContext *context);
const GError * pv_context_error (PvContext *context);
GDBusConnection * pv_context_get_connection (PvContext *context);

View file

@ -38,6 +38,8 @@ struct _PvSourcePrivate
gchar *name;
PvSourceState state;
GVariant *properties;
GError *error;
};
G_DEFINE_ABSTRACT_TYPE (PvSource, pv_source, G_TYPE_OBJECT);
@ -379,6 +381,20 @@ pv_source_update_state (PvSource *source, PvSourceState state)
}
}
void
pv_source_report_error (PvSource *source, GError *error)
{
PvSourcePrivate *priv;
g_return_if_fail (PV_IS_SOURCE (source));
priv = source->priv;
g_clear_error (&priv->error);
priv->error = error;
priv->state = PV_SOURCE_STATE_ERROR;
g_object_notify (G_OBJECT (source), "state");
}
PvSourceOutput *
pv_source_create_source_output (PvSource *source, GVariant *props, const gchar *prefix)
{

View file

@ -100,6 +100,7 @@ GVariant * pv_source_get_capabilities (PvSource *source, GVariant *p
gboolean pv_source_set_state (PvSource *source, PvSourceState state);
void pv_source_update_state (PvSource *source, PvSourceState state);
void pv_source_report_error (PvSource *source, GError *error);
PvSourceOutput * pv_source_create_source_output (PvSource *source, GVariant *props, const gchar *prefix);
gboolean pv_source_release_source_output (PvSource *source, PvSourceOutput *output);

View file

@ -30,6 +30,7 @@ struct _PvStreamPrivate
{
PvContext *context;
gchar *name;
GVariant *properties;
gchar *target;
PvStreamState state;
@ -54,6 +55,7 @@ enum
PROP_0,
PROP_CONTEXT,
PROP_NAME,
PROP_PROPERTIES,
PROP_STATE,
PROP_SOCKET
};
@ -84,6 +86,10 @@ pv_stream_get_property (GObject *_object,
g_value_set_string (value, priv->name);
break;
case PROP_PROPERTIES:
g_value_set_variant (value, priv->properties);
break;
case PROP_STATE:
g_value_set_enum (value, priv->state);
break;
@ -116,6 +122,12 @@ pv_stream_set_property (GObject *_object,
priv->name = g_value_dup_string (value);
break;
case PROP_PROPERTIES:
if (priv->properties)
g_variant_unref (priv->properties);
priv->properties = g_value_dup_variant (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (stream, prop_id, pspec);
break;
@ -172,6 +184,21 @@ pv_stream_class_init (PvStreamClass * klass)
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
/**
* PvStream:properties
*
* The properties of the stream as specified at construction time.
*/
g_object_class_install_property (gobject_class,
PROP_PROPERTIES,
g_param_spec_variant ("properties",
"Properties",
"The properties of the stream",
G_VARIANT_TYPE_VARIANT,
NULL,
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
/**
* PvStream:state
*
@ -236,18 +263,19 @@ pv_stream_init (PvStream * stream)
* pv_stream_new:
* @context: a #PvContext
* @name: a stream name
* @properties: stream properties
*
* Make a new unconnected #PvStream
*
* Returns: a new unconnected #PvStream
*/
PvStream *
pv_stream_new (PvContext * context, const gchar *name)
pv_stream_new (PvContext * context, const gchar *name, GVariant *props)
{
g_return_val_if_fail (PV_IS_CONTEXT (context), NULL);
g_return_val_if_fail (name != NULL, NULL);
return g_object_new (PV_TYPE_STREAM, "context", context, "name", name, NULL);
return g_object_new (PV_TYPE_STREAM, "context", context, "name", name, "properties", props, NULL);
}
static void
@ -390,13 +418,15 @@ remove_source_output (PvStream *stream)
gboolean
pv_stream_connect_capture (PvStream *stream,
const gchar *source,
PvStreamFlags flags)
PvStreamFlags flags,
GVariant *spec)
{
PvStreamPrivate *priv;
GVariantBuilder builder;
PvContext *context;
g_return_val_if_fail (PV_IS_STREAM (stream), FALSE);
g_return_val_if_fail (spec != NULL, FALSE);
priv = stream->priv;
context = priv->context;
g_return_val_if_fail (pv_context_get_state (context) == PV_CONTEXT_STATE_READY, FALSE);
@ -408,15 +438,13 @@ pv_stream_connect_capture (PvStream *stream,
priv->source = PV_SOURCE1 (pv_context_find_source (context, priv->target, NULL));
if (priv->source == NULL) {
g_warning ("can't find source");
stream_set_state (stream, PV_STREAM_STATE_READY);
return FALSE;
}
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
pv_source1_call_create_source_output (priv->source,
g_variant_builder_end (&builder), /* GVariant *arg_props */
NULL, /* GCancellable *cancellable */
spec, /* GVariant *arg_props */
NULL, /* GCancellable *cancellable */
on_source_output_created,
stream);
return TRUE;
@ -497,7 +525,7 @@ on_socket_data (GSocket *socket,
priv->info.size = msg.size;
priv->info.message = num_messages > 0 ? messages[0] : NULL;
g_signal_emit (stream, SIGNAL_NEW_BUFFER, 0, NULL);
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
break;
}
@ -530,7 +558,8 @@ handle_socket (PvStream *stream, gint fd)
source = g_socket_create_source (priv->socket, G_IO_IN, NULL);
g_source_set_callback (source, (GSourceFunc) on_socket_data, stream, NULL);
priv->socket_id = g_source_attach (source, NULL);
g_print ("%p\n", g_main_context_get_thread_default ());
priv->socket_id = g_source_attach (source, g_main_context_get_thread_default ());
g_source_unref (source);
break;
}

View file

@ -94,13 +94,15 @@ GType pv_stream_get_type (void);
PvStream * pv_stream_new (PvContext * context,
const gchar *name);
const gchar *name,
GVariant * props);
PvStreamState pv_stream_get_state (PvStream *stream);
gboolean pv_stream_connect_capture (PvStream *stream,
const gchar *source,
PvStreamFlags flags);
PvStreamFlags flags,
GVariant *spec);
gboolean pv_stream_disconnect (PvStream *stream);
gboolean pv_stream_start (PvStream *stream, PvStreamMode mode);

View file

@ -29,6 +29,7 @@ struct _PvSubscribePrivate
PvSubscriptionState state;
GDBusConnection *connection;
gchar *service;
GCancellable *cancellable;
PvSubscriptionFlags subscription_mask;
@ -36,6 +37,8 @@ struct _PvSubscribePrivate
guint pending_subscribes;
GHashTable *senders;
GError *error;
};
@ -68,6 +71,8 @@ typedef struct {
guint id;
PvSubscribe *sender_subscribe;
GList *clients;
gulong signal_event;
gulong signal_state;
} SenderData;
static void
@ -140,6 +145,7 @@ client_name_appeared_handler (GDBusConnection *connection,
{
SenderData *data = user_data;
g_print ("appeared client %s %p\n", name, data);
/* subscribe to Source events. We want to be notified when this new
* sender add/change/remove sources and outputs */
data->sender_subscribe = pv_subscribe_new ();
@ -148,11 +154,11 @@ client_name_appeared_handler (GDBusConnection *connection,
"connection", connection,
NULL);
g_signal_connect (data->sender_subscribe,
data->signal_event = g_signal_connect (data->sender_subscribe,
"subscription-event",
(GCallback) on_sender_subscription_event,
data);
g_signal_connect (data->sender_subscribe,
data->signal_state = g_signal_connect (data->sender_subscribe,
"notify::state",
(GCallback) on_sender_subscription_state,
data);
@ -175,18 +181,27 @@ client_name_vanished_handler (GDBusConnection *connection,
gpointer user_data)
{
SenderData *data = user_data;
PvSubscribePrivate *priv = data->subscribe->priv;
g_print ("vanished client %s\n", name);
g_print ("vanished client %s %p\n", name, data);
g_bus_unwatch_name (data->id);
}
static void
data_free (SenderData *data)
{
g_print ("free client %s %p\n", data->sender, data);
g_list_foreach (data->clients, (GFunc) remove_client, data);
g_hash_table_remove (priv->senders, data->sender);
g_hash_table_remove (data->subscribe->priv->senders, data->sender);
if (data->sender_subscribe)
if (data->sender_subscribe) {
g_signal_handler_disconnect (data->sender_subscribe, data->signal_event);
g_signal_handler_disconnect (data->sender_subscribe, data->signal_state);
g_object_unref (data->sender_subscribe);
}
g_free (data->sender);
g_bus_unwatch_name (data->id);
g_free (data);
}
@ -196,19 +211,19 @@ sender_data_new (PvSubscribe *subscribe, const gchar *sender)
PvSubscribePrivate *priv = subscribe->priv;
SenderData *data;
g_print ("watch name %s\n", sender);
data = g_new0 (SenderData, 1);
data->subscribe = subscribe;
data->sender = g_strdup (sender);
g_print ("watch name %s %p\n", sender, data);
data->id = g_bus_watch_name_on_connection (priv->connection,
sender,
G_BUS_NAME_WATCHER_FLAGS_NONE,
client_name_appeared_handler,
client_name_vanished_handler,
data,
NULL);
(GDestroyNotify) data_free);
g_hash_table_insert (priv->senders, data->sender, data);
priv->pending_subscribes++;
@ -444,14 +459,17 @@ on_client_manager_ready (GObject *source_object,
connect_client_signals (subscribe);
on_client_manager_name_owner (G_OBJECT (priv->client_manager), NULL, subscribe);
g_object_unref (subscribe);
return;
/* ERRORS */
manager_error:
{
g_warning ("could not create client manager: %s", error->message);
g_clear_error (&error);
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_ERROR);
priv->error = error;
g_object_unref (subscribe);
return;
}
}
@ -468,9 +486,9 @@ install_subscription (PvSubscribe *subscribe)
G_DBUS_OBJECT_MANAGER_CLIENT_FLAGS_NONE,
priv->service,
PV_DBUS_OBJECT_PREFIX,
NULL,
priv->cancellable,
on_client_manager_ready,
subscribe);
g_object_ref (subscribe));
priv->pending_subscribes++;
}
@ -480,6 +498,7 @@ uninstall_subscription (PvSubscribe *subscribe)
PvSubscribePrivate *priv = subscribe->priv;
g_clear_object (&priv->client_manager);
g_clear_error (&priv->error);
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_UNCONNECTED);
}
@ -557,9 +576,13 @@ pv_subscribe_finalize (GObject * object)
PvSubscribe *subscribe = PV_SUBSCRIBE (object);
PvSubscribePrivate *priv = subscribe->priv;
g_free (priv->service);
g_object_unref (priv->client_manager);
g_print ("cancel\n");
g_cancellable_cancel (priv->cancellable);
g_hash_table_unref (priv->senders);
if (priv->client_manager)
g_object_unref (priv->client_manager);
g_object_unref (priv->cancellable);
g_free (priv->service);
G_OBJECT_CLASS (pv_subscribe_parent_class)->finalize (object);
}
@ -661,6 +684,7 @@ pv_subscribe_init (PvSubscribe * subscribe)
priv->service = g_strdup (PV_DBUS_SERVICE);
priv->senders = g_hash_table_new (g_str_hash, g_str_equal);
priv->state = PV_SUBSCRIPTION_STATE_UNCONNECTED;
priv->cancellable = g_cancellable_new ();
}
/**
@ -677,3 +701,26 @@ pv_subscribe_new (void)
{
return g_object_new (PV_TYPE_SUBSCRIBE, NULL);
}
PvSubscriptionState
pv_subscribe_get_state (PvSubscribe *subscribe)
{
PvSubscribePrivate *priv;
g_return_val_if_fail (PV_IS_SUBSCRIBE (subscribe), PV_SUBSCRIPTION_STATE_ERROR);
priv = subscribe->priv;
return priv->state;
}
GError *
pv_subscribe_get_error (PvSubscribe *subscribe)
{
PvSubscribePrivate *priv;
g_return_val_if_fail (PV_IS_SUBSCRIBE (subscribe), NULL);
priv = subscribe->priv;
return priv->error;
}

View file

@ -80,9 +80,14 @@ struct _PvSubscribeClass {
};
/* normal GObject stuff */
GType pv_subscribe_get_type (void);
GType pv_subscribe_get_type (void);
PvSubscribe * pv_subscribe_new (void);
PvSubscriptionState pv_subscribe_get_state (PvSubscribe *subscribe);
GError * pv_subscribe_get_error (PvSubscribe *subscribe);
PvSubscribe * pv_subscribe_new (void);
G_END_DECLS