diff --git a/src/Makefile.am b/src/Makefile.am index 134eb869e..b502c657b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -60,6 +60,7 @@ dbuspolicy_DATA = \ ################################### enumtypesincludes = client/pv-context.h \ + client/pv-introspect.h \ client/pv-source.h \ client/pv-stream.h \ client/pv-subscribe.h @@ -159,6 +160,7 @@ pulsevideoinclude_HEADERS = \ client/pulsevideo.h \ client/pv-context.h \ client/pv-enumtypes.h \ + client/pv-introspect.h \ client/pv-stream.h \ client/pv-subscribe.h \ client/pv-source.h \ @@ -172,6 +174,7 @@ lib_LTLIBRARIES = \ libpulsevideo_@PV_MAJORMINOR@_la_SOURCES = \ client/pv-context.h client/pv-context.c \ client/pv-enumtypes.h client/pv-enumtypes.c \ + client/pv-introspect.h client/pv-introspect.c \ client/pv-stream.h client/pv-stream.c \ client/pulsevideo.c client/pulsevideo.h \ client/pv-source-output.c client/pv-source-output.h \ diff --git a/src/client/pulsevideo.h b/src/client/pulsevideo.h index e4e457e2c..368091fd5 100644 --- a/src/client/pulsevideo.h +++ b/src/client/pulsevideo.h @@ -25,6 +25,7 @@ #include #include #include +#include #define PV_DBUS_SERVICE "org.pulsevideo" #define PV_DBUS_OBJECT_PREFIX "/org/pulsevideo" diff --git a/src/client/pv-context.c b/src/client/pv-context.c index 03e24d632..5df3e4e93 100644 --- a/src/client/pv-context.c +++ b/src/client/pv-context.c @@ -25,31 +25,7 @@ #include "dbus/org-pulsevideo.h" -struct _PvContextPrivate -{ - gchar *name; - GVariant *properties; - - guint id; - GDBusConnection *connection; - - PvContextFlags flags; - PvContextState state; - - PvDaemon1 *daemon; - - PvClient1 *client; - - PvSubscriptionFlags subscription_mask; - PvSubscribe *subscribe; - - GList *sources; - - GDBusObjectManagerServer *server_manager; - - GError *error; -}; - +#include "client/pv-private.h" #define PV_CONTEXT_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), PV_TYPE_CONTEXT, PvContextPrivate)) @@ -67,6 +43,7 @@ subscription_cb (PvSubscribe *subscribe, enum { PROP_0, + PROP_MAIN_CONTEXT, PROP_NAME, PROP_PROPERTIES, PROP_STATE, @@ -92,6 +69,10 @@ pv_context_get_property (GObject *_object, PvContextPrivate *priv = context->priv; switch (prop_id) { + case PROP_MAIN_CONTEXT: + g_value_set_pointer (value, priv->context); + break; + case PROP_NAME: g_value_set_string (value, priv->name); break; @@ -128,6 +109,10 @@ pv_context_set_property (GObject *_object, PvContextPrivate *priv = context->priv; switch (prop_id) { + case PROP_MAIN_CONTEXT: + priv->context = g_value_get_pointer (value); + break; + case PROP_NAME: g_free (priv->name); priv->name = g_value_dup_string (value); @@ -173,6 +158,19 @@ pv_context_class_init (PvContextClass * klass) gobject_class->set_property = pv_context_set_property; gobject_class->get_property = pv_context_get_property; + /** + * PvContext:main-context + * + * The main context to use + */ + g_object_class_install_property (gobject_class, + PROP_MAIN_CONTEXT, + g_param_spec_pointer ("main-context", + "Main Context", + "The main context to use", + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); /** * PvContext:name * @@ -288,9 +286,9 @@ pv_context_init (PvContext * context) * Returns: a new unconnected #PvContext */ PvContext * -pv_context_new (const gchar *name, GVariant *properties) +pv_context_new (GMainContext *context, const gchar *name, GVariant *properties) { - return g_object_new (PV_TYPE_CONTEXT, "name", name, "properties", properties, NULL); + return g_object_new (PV_TYPE_CONTEXT, "main-context", context, "name", name, "properties", properties, NULL); } static void @@ -311,6 +309,8 @@ on_client_proxy (GObject *source_object, PvContextPrivate *priv = context->priv; GError *error = NULL; + g_assert (g_main_context_get_thread_default () == priv->context); + priv->client = pv_client1_proxy_new_finish (res, &error); if (priv->client == NULL) { priv->error = error; @@ -331,6 +331,8 @@ on_client_connected (GObject *source_object, GError *error = NULL; gchar *client_path; + g_assert (g_main_context_get_thread_default () == priv->context); + if (!pv_daemon1_call_connect_client_finish (priv->daemon, &client_path, res, &error)) { priv->error = error; context_set_state (context, PV_CONTEXT_STATE_ERROR); @@ -356,6 +358,8 @@ on_daemon_connected (GObject *source_object, PvContext *context = user_data; PvContextPrivate *priv = context->priv; + g_assert (g_main_context_get_thread_default () == priv->context); + context_set_state (context, PV_CONTEXT_STATE_REGISTERING); { @@ -385,6 +389,9 @@ subscription_cb (PvSubscribe *subscribe, g_print ("got event %d %d\n", event, flags); + g_assert (g_main_context_get_thread_default () == priv->context); + + switch (flags) { case PV_SUBSCRIPTION_FLAGS_DAEMON: priv->daemon = PV_DAEMON1 (object); @@ -394,7 +401,10 @@ subscription_cb (PvSubscribe *subscribe, break; case PV_SUBSCRIPTION_FLAGS_SOURCE: - priv->sources = g_list_prepend (priv->sources, object); + if (event == PV_SUBSCRIPTION_EVENT_NEW) + priv->sources = g_list_prepend (priv->sources, object); + else if (event == PV_SUBSCRIPTION_EVENT_REMOVE) + priv->sources = g_list_remove (priv->sources, object); break; case PV_SUBSCRIPTION_FLAGS_SOURCE_OUTPUT: @@ -417,9 +427,13 @@ subscription_state (GObject *object, gpointer user_data) { PvContext *context = user_data; + PvContextPrivate *priv = context->priv; PvSubscriptionState state; - g_object_get (object, "state", &state, NULL); + g_assert (g_main_context_get_thread_default () == priv->context); + g_assert (object == G_OBJECT (priv->subscribe)); + + state = pv_subscribe_get_state (priv->subscribe); g_print ("got subscription state %d\n", state); switch (state) { @@ -442,6 +456,10 @@ on_name_appeared (GDBusConnection *connection, PvContext *context = user_data; PvContextPrivate *priv = context->priv; + g_assert (g_main_context_get_thread_default () == priv->context); + + g_print ("context: on name appeared\n"); + priv->connection = connection; g_dbus_object_manager_server_set_connection (priv->server_manager, connection); @@ -457,6 +475,10 @@ on_name_vanished (GDBusConnection *connection, PvContext *context = user_data; PvContextPrivate *priv = context->priv; + g_assert (g_main_context_get_thread_default () == priv->context); + + g_print ("context: on name vanished\n"); + priv->connection = connection; g_dbus_object_manager_server_set_connection (priv->server_manager, connection); @@ -470,6 +492,26 @@ on_name_vanished (GDBusConnection *connection, } } +static gboolean +do_connect (PvContext *context) +{ + PvContextPrivate *priv = context->priv; + GBusNameWatcherFlags nw_flags; + + nw_flags = G_BUS_NAME_WATCHER_FLAGS_NONE; + if (!(priv->flags & PV_CONTEXT_FLAGS_NOAUTOSPAWN)) + nw_flags = G_BUS_NAME_WATCHER_FLAGS_AUTO_START; + + priv->id = g_bus_watch_name (G_BUS_TYPE_SESSION, + PV_DBUS_SERVICE, + nw_flags, + on_name_appeared, + on_name_vanished, + context, + NULL); + return FALSE; +} + /** * pv_context_connect: * @context: a #PvContext @@ -483,7 +525,6 @@ gboolean pv_context_connect (PvContext *context, PvContextFlags flags) { PvContextPrivate *priv; - GBusNameWatcherFlags nw_flags; g_return_val_if_fail (PV_IS_CONTEXT (context), FALSE); @@ -493,18 +534,8 @@ pv_context_connect (PvContext *context, PvContextFlags flags) priv->flags = flags; context_set_state (context, PV_CONTEXT_STATE_CONNECTING); + g_main_context_invoke (priv->context, (GSourceFunc) do_connect, context); - nw_flags = G_BUS_NAME_WATCHER_FLAGS_NONE; - if (!(flags & PV_CONTEXT_FLAGS_NOAUTOSPAWN)) - nw_flags = G_BUS_NAME_WATCHER_FLAGS_AUTO_START; - - priv->id = g_bus_watch_name (G_BUS_TYPE_SESSION, - PV_DBUS_SERVICE, - nw_flags, - on_name_appeared, - on_name_vanished, - context, - NULL); return TRUE; } @@ -617,7 +648,7 @@ pv_context_get_state (PvContext *context) } /** - * pv_context_error: + * pv_context_get_error: * @context: a #PvContext * * Get the current error of @context or %NULL when the context state @@ -626,7 +657,7 @@ pv_context_get_state (PvContext *context) * Returns: the last error or %NULL */ const GError * -pv_context_error (PvContext *context) +pv_context_get_error (PvContext *context) { PvContextPrivate *priv; @@ -636,22 +667,6 @@ pv_context_error (PvContext *context) return priv->error; } -/** - * pv_context_get_connection: - * @context: a #PvContext - * - * Get the #GDBusConnection of @context. - * - * Returns: the #GDBusConnection of @context or %NULL when not connected. - */ -GDBusConnection * -pv_context_get_connection (PvContext *context) -{ - g_return_val_if_fail (PV_IS_CONTEXT (context), NULL); - - return context->priv->connection; -} - GDBusProxy * pv_context_find_source (PvContext *context, const gchar *name, GVariant *props) { diff --git a/src/client/pv-context.h b/src/client/pv-context.h index 8e3106bbb..781a75a42 100644 --- a/src/client/pv-context.h +++ b/src/client/pv-context.h @@ -98,7 +98,9 @@ struct _PvContextClass { /* normal GObject stuff */ GType pv_context_get_type (void); -PvContext * pv_context_new (const gchar *name, GVariant *properties); +PvContext * pv_context_new (GMainContext *ctx, + const gchar *name, + GVariant *properties); gboolean pv_context_connect (PvContext *context, PvContextFlags flags); gboolean pv_context_disconnect (PvContext *context); @@ -107,12 +109,7 @@ gboolean pv_context_register_source (PvContext *context, PvSource gboolean pv_context_unregister_source (PvContext *context, PvSource *source); PvContextState pv_context_get_state (PvContext *context); -const GError * pv_context_error (PvContext *context); - -GDBusConnection * pv_context_get_connection (PvContext *context); - -GDBusProxy * pv_context_find_source (PvContext *context, const gchar *name, GVariant *props); - +const GError * pv_context_get_error (PvContext *context); G_END_DECLS diff --git a/src/client/pv-introspect.c b/src/client/pv-introspect.c new file mode 100644 index 000000000..4555ff8a3 --- /dev/null +++ b/src/client/pv-introspect.c @@ -0,0 +1,61 @@ +/* Pulsevideo + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "client/pulsevideo.h" + +#include "client/pv-context.h" +#include "client/pv-enumtypes.h" +#include "client/pv-subscribe.h" + +#include "dbus/org-pulsevideo.h" + +#include "client/pv-private.h" + +/** + * pv_context_list_source_info: + * @context: a connected #PvContext + * @flags: extra #PvSourceInfoFlags + * @cb: a #PvSourceInfoCallback + * @cancelable: a #GCancellable + * @user_data: user data passed to @cb + * + * Call @cb for each source. + */ +void +pv_context_list_source_info (PvContext *context, + PvSourceInfoFlags flags, + PvSourceInfoCallback cb, + GCancellable *cancellable, + gpointer user_data) +{ + GList *walk; + PvContextPrivate *priv = context->priv; + + for (walk = priv->sources; walk; walk = g_list_next (walk)) { + GDBusProxy *proxy = walk->data; + PvSourceInfo info; + + info.name = pv_source1_get_name (PV_SOURCE1 (proxy)); + info.properties = pv_source1_get_properties (PV_SOURCE1 (proxy)); + info.state = pv_source1_get_state (PV_SOURCE1 (proxy)); + + cb (context, &info, user_data); + } + cb (context, NULL, user_data); +} diff --git a/src/client/pv-introspect.h b/src/client/pv-introspect.h new file mode 100644 index 000000000..16ae00e6b --- /dev/null +++ b/src/client/pv-introspect.h @@ -0,0 +1,68 @@ +/* Pulsevideo + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __PV_INTROSPECT_H__ +#define __PV_INTROSPECT_H__ + +#include +#include + +#include + +G_BEGIN_DECLS + +/** + * PvSourceInfo: + * @name: the name of the source + * @properties: the properties of the source + * @state: the current state of the source + * + * The source information + */ +typedef struct { + const char *name; + GVariant *properties; + PvSourceState state; + GVariant *capabilities; +} PvSourceInfo; + +/** + * PvSourceInfoFlags: + * @PV_SOURCE_INFO_FLAGS_NONE: no flags + * @PV_SOURCE_INFO_FLAGS_CAPABILITIES: include capabilities + * + * Extra flags to pass to pv_context_get_source_info_list. + */ +typedef enum { + PV_SOURCE_INFO_FLAGS_NONE = 0, + PV_SOURCE_INFO_FLAGS_CAPABILITIES = (1 << 0) +} PvSourceInfoFlags; + +typedef gboolean (*PvSourceInfoCallback) (PvContext *c, const PvSourceInfo *info, gpointer userdata); + +void pv_context_list_source_info (PvContext *context, + PvSourceInfoFlags flags, + PvSourceInfoCallback cb, + GCancellable *cancellable, + gpointer user_data); + +G_END_DECLS + +#endif /* __PV_INTROSPECT_H__ */ + diff --git a/src/client/pv-private.h b/src/client/pv-private.h new file mode 100644 index 000000000..c7c850388 --- /dev/null +++ b/src/client/pv-private.h @@ -0,0 +1,48 @@ +/* Pulsevideo + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +struct _PvContextPrivate +{ + GMainContext *context; + + gchar *name; + GVariant *properties; + + guint id; + GDBusConnection *connection; + + PvContextFlags flags; + PvContextState state; + + PvDaemon1 *daemon; + + PvClient1 *client; + + PvSubscriptionFlags subscription_mask; + PvSubscribe *subscribe; + + GList *sources; + + GDBusObjectManagerServer *server_manager; + + GError *error; +}; + +GDBusProxy * pv_context_find_source (PvContext *context, const gchar *name, GVariant *props); + diff --git a/src/client/pv-stream.c b/src/client/pv-stream.c index f59ebae54..039c38547 100644 --- a/src/client/pv-stream.c +++ b/src/client/pv-stream.c @@ -26,6 +26,8 @@ #include "dbus/org-pulsevideo.h" +#include "client/pv-private.h" + struct _PvStreamPrivate { PvContext *context; @@ -33,9 +35,11 @@ struct _PvStreamPrivate GVariant *properties; gchar *target; PvStreamState state; + GError *error; gchar *source_output_path; PvSource1 *source; + GVariant *spec; PvSourceOutput1 *source_output; GSocket *socket; @@ -303,6 +307,23 @@ pv_stream_get_state (PvStream *stream) return stream->priv->state; } +/** + * pv_stream_get_error: + * @stream: a #PvStream + * + * Get the error of @stream. + * + * Returns: the error of @stream or %NULL when there is no error + */ +const GError * +pv_stream_get_error (PvStream *stream) +{ + g_return_val_if_fail (PV_IS_STREAM (stream), NULL); + + return stream->priv->error; +} + + static void on_request_reconfigure (PvSourceOutput1 *interface, GVariant *props, @@ -320,6 +341,8 @@ on_source_output1_proxy (GObject *source_object, PvStreamPrivate *priv = stream->priv; GError *error = NULL; + g_assert (g_main_context_get_thread_default () == priv->context->priv->context); + priv->source_output = pv_source_output1_proxy_new_finish (res, &error); if (priv->source_output == NULL) goto source_output_failed; @@ -331,9 +354,9 @@ on_source_output1_proxy (GObject *source_object, /* ERRORS */ source_output_failed: { + priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); g_error ("failed to get source output proxy: %s", error->message); - g_clear_error (&error); return; } } @@ -348,13 +371,15 @@ on_source_output_created (GObject *source_object, PvContext *context = priv->context; GError *error = NULL; + g_assert (g_main_context_get_thread_default () == priv->context->priv->context); + if (!pv_source1_call_create_source_output_finish (priv->source, &priv->source_output_path, res, &error)) goto create_failed; g_print ("got source-output %s\n", priv->source_output_path); - pv_source_output1_proxy_new (pv_context_get_connection (context), + pv_source_output1_proxy_new (context->priv->connection, G_DBUS_PROXY_FLAGS_NONE, g_dbus_proxy_get_name (G_DBUS_PROXY (priv->source)), priv->source_output_path, @@ -366,9 +391,9 @@ on_source_output_created (GObject *source_object, /* ERRORS */ create_failed: { + priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); g_print ("failed to get connect capture: %s", error->message); - g_clear_error (&error); return; } } @@ -382,11 +407,13 @@ on_source_output_removed (GObject *source_object, PvStreamPrivate *priv = stream->priv; GError *error = NULL; + g_assert (g_main_context_get_thread_default () == priv->context->priv->context); + if (!pv_source_output1_call_remove_finish (priv->source_output, res, &error)) { + priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); g_print ("failed to disconnect: %s", error->message); - g_clear_error (&error); return; } g_clear_pointer (&priv->source_output_path, g_free); @@ -398,6 +425,8 @@ remove_source_output (PvStream *stream) { PvStreamPrivate *priv = stream->priv; + g_assert (g_main_context_get_thread_default () == priv->context->priv->context); + pv_source_output1_call_remove (priv->source_output, NULL, /* GCancellable *cancellable */ on_source_output_removed, @@ -405,6 +434,21 @@ remove_source_output (PvStream *stream) return TRUE; } +static gboolean +do_connect_capture (PvStream *stream) +{ + PvStreamPrivate *priv = stream->priv; + + g_assert (g_main_context_get_thread_default () == priv->context->priv->context); + + pv_source1_call_create_source_output (priv->source, + priv->spec, /* GVariant *arg_props */ + NULL, /* GCancellable *cancellable */ + on_source_output_created, + stream); + return FALSE; +} + /** * pv_stream_connect_capture: * @stream: a #PvStream @@ -433,20 +477,17 @@ pv_stream_connect_capture (PvStream *stream, priv->target = g_strdup (source); - stream_set_state (stream, PV_STREAM_STATE_CONNECTING); - priv->source = PV_SOURCE1 (pv_context_find_source (context, priv->target, NULL)); if (priv->source == NULL) { g_warning ("can't find source"); - stream_set_state (stream, PV_STREAM_STATE_READY); return FALSE; } + priv->spec = spec; + + stream_set_state (stream, PV_STREAM_STATE_CONNECTING); + + g_main_context_invoke (context->priv->context, (GSourceFunc) do_connect_capture, stream); - pv_source1_call_create_source_output (priv->source, - spec, /* GVariant *arg_props */ - NULL, /* GCancellable *cancellable */ - on_source_output_created, - stream); return TRUE; } @@ -558,8 +599,7 @@ handle_socket (PvStream *stream, gint fd) source = g_socket_create_source (priv->socket, G_IO_IN, NULL); g_source_set_callback (source, (GSourceFunc) on_socket_data, stream, NULL); - g_print ("%p\n", g_main_context_get_thread_default ()); - priv->socket_id = g_source_attach (source, g_main_context_get_thread_default ()); + priv->socket_id = g_source_attach (source, priv->context->priv->context); g_source_unref (source); break; } @@ -572,9 +612,9 @@ handle_socket (PvStream *stream, gint fd) /* ERRORS */ socket_failed: { + priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); g_error ("failed to create socket: %s", error->message); - g_clear_error (&error); return; } } @@ -645,12 +685,29 @@ fd_failed: } exit_error: { + priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); - g_clear_error (&error); return; } } +static gboolean +do_start (PvStream *stream) +{ + PvStreamPrivate *priv = stream->priv; + GVariantBuilder builder; + + g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}")); + g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello")); + + pv_source_output1_call_start (priv->source_output, + g_variant_builder_end (&builder), /* GVariant *arg_properties */ + NULL, /* GCancellable *cancellable */ + on_stream_started, + stream); + return FALSE; +} + /** * pv_stream_start: * @stream: a #PvStream @@ -680,18 +737,9 @@ pv_stream_start (PvStream *stream, PvStreamMode mode) priv->mode = mode; stream_set_state (stream, PV_STREAM_STATE_STARTING); - { - GVariantBuilder builder; - g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}")); - g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello")); + g_main_context_invoke (priv->context->priv->context, (GSourceFunc) do_start, stream); - pv_source_output1_call_start (priv->source_output, - g_variant_builder_end (&builder), /* GVariant *arg_properties */ - NULL, /* GCancellable *cancellable */ - on_stream_started, - stream); - } return TRUE; } @@ -717,13 +765,25 @@ on_stream_stopped (GObject *source_object, /* ERRORS */ call_failed: { + priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); g_error ("failed to release: %s", error->message); - g_clear_error (&error); return; } } +static gboolean +do_stop (PvStream *stream) +{ + PvStreamPrivate *priv = stream->priv; + + pv_source_output1_call_stop (priv->source_output, + NULL, /* GCancellable *cancellable */ + on_stream_stopped, + stream); + + return FALSE; +} /** * pv_stream_stop: * @stream: a #PvStream @@ -742,10 +802,7 @@ pv_stream_stop (PvStream *stream) priv = stream->priv; g_return_val_if_fail (priv->state == PV_STREAM_STATE_STREAMING, FALSE); - pv_source_output1_call_stop (priv->source_output, - NULL, /* GCancellable *cancellable */ - on_stream_stopped, - stream); + g_main_context_invoke (priv->context->priv->context, (GSourceFunc) do_stop, stream); return TRUE; } diff --git a/src/client/pv-stream.h b/src/client/pv-stream.h index c6e62c674..ca5a7a9ee 100644 --- a/src/client/pv-stream.h +++ b/src/client/pv-stream.h @@ -98,6 +98,7 @@ PvStream * pv_stream_new (PvContext * context, GVariant * props); PvStreamState pv_stream_get_state (PvStream *stream); +const GError * pv_stream_get_error (PvStream *stream); gboolean pv_stream_connect_capture (PvStream *stream, const gchar *source, diff --git a/src/client/pv-subscribe.c b/src/client/pv-subscribe.c index 7ce4b4068..5340b5e8e 100644 --- a/src/client/pv-subscribe.c +++ b/src/client/pv-subscribe.c @@ -91,6 +91,8 @@ on_sender_subscription_event (PvSubscribe *sender_subscribe, SenderData *data = user_data; PvSubscribe *subscribe = data->subscribe; + g_print ("on sender subscription def: %p\n", g_main_context_get_thread_default ()); + g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, @@ -145,6 +147,11 @@ client_name_appeared_handler (GDBusConnection *connection, { SenderData *data = user_data; + g_print ("client name appeared def: %p\n", g_main_context_get_thread_default ()); + + if (!g_strcmp0 (name, g_dbus_connection_get_unique_name (connection))) + return; + g_print ("appeared client %s %p\n", name, data); /* subscribe to Source events. We want to be notified when this new * sender add/change/remove sources and outputs */ @@ -215,6 +222,7 @@ sender_data_new (PvSubscribe *subscribe, const gchar *sender) data->subscribe = subscribe; data->sender = g_strdup (sender); + g_print ("watch name def: %p\n", g_main_context_get_thread_default ()); g_print ("watch name %s %p\n", sender, data); data->id = g_bus_watch_name_on_connection (priv->connection, @@ -239,6 +247,9 @@ notify_subscription (PvSubscribe *subscribe, { PvSubscribePrivate *priv = subscribe->priv; + g_print ("notify subscription def: %p\n", g_main_context_get_thread_default ()); + //g_assert (g_main_context_get_thread_default ()); + if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_DAEMON) { PvDaemon1 *daemon; @@ -384,6 +395,8 @@ client_manager_appeared (PvSubscribe *subscribe) PvSubscribePrivate *priv = subscribe->priv; GList *objects, *walk; + g_print ("client manager appeared def: %p\n", g_main_context_get_thread_default ()); + objects = g_dbus_object_manager_get_objects (G_DBUS_OBJECT_MANAGER (priv->client_manager)); for (walk = objects; walk ; walk = g_list_next (walk)) { on_client_manager_object_added (G_DBUS_OBJECT_MANAGER (priv->client_manager), @@ -408,6 +421,8 @@ on_client_manager_name_owner (GObject *object, PvSubscribePrivate *priv = subscribe->priv; gchar *name_owner; + g_print ("client manager owner def: %p\n", g_main_context_get_thread_default ()); + g_object_get (priv->client_manager, "name-owner", &name_owner, NULL); g_print ("client manager %s %s\n", g_dbus_object_manager_client_get_name (G_DBUS_OBJECT_MANAGER_CLIENT (priv->client_manager)), @@ -427,6 +442,8 @@ connect_client_signals (PvSubscribe *subscribe) { PvSubscribePrivate *priv = subscribe->priv; + g_print ("add signals def: %p\n", g_main_context_get_thread_default ()); + g_signal_connect (priv->client_manager, "notify::name-owner", (GCallback) on_client_manager_name_owner, subscribe); g_signal_connect (priv->client_manager, "interface-added", @@ -452,6 +469,8 @@ on_client_manager_ready (GObject *source_object, PvSubscribePrivate *priv = subscribe->priv; GError *error = NULL; + g_print ("client manager ready def: %p\n", g_main_context_get_thread_default ()); + priv->client_manager = pv_object_manager_client_new_finish (res, &error); if (priv->client_manager == NULL) goto manager_error; @@ -481,6 +500,8 @@ install_subscription (PvSubscribe *subscribe) subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_CONNECTING); + g_print ("new client manager def: %p\n", g_main_context_get_thread_default ()); + g_print ("new client manager for %s\n", priv->service); pv_object_manager_client_new (priv->connection, G_DBUS_OBJECT_MANAGER_CLIENT_FLAGS_NONE, diff --git a/src/gst/gstpvsrc.c b/src/gst/gstpvsrc.c index c76e66615..ba01f7391 100644 --- a/src/gst/gstpvsrc.c +++ b/src/gst/gstpvsrc.c @@ -74,6 +74,7 @@ static void gst_pulsevideo_src_get_property (GObject * object, guint prop_id, static GstStateChangeReturn gst_pulsevideo_src_change_state (GstElement * element, GstStateChange transition); +static gboolean gst_pulsevideo_src_negotiate (GstBaseSrc * basesrc); static GstCaps *gst_pulsevideo_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter); static gboolean gst_pulsevideo_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps); static GstCaps *gst_pulsevideo_src_src_fixate (GstBaseSrc * bsrc, @@ -113,6 +114,7 @@ gst_pulsevideo_src_class_init (GstPulsevideoSrcClass * klass) gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&gst_pulsevideo_src_template)); + gstbasesrc_class->negotiate = gst_pulsevideo_src_negotiate; gstbasesrc_class->get_caps = gst_pulsevideo_src_getcaps; gstbasesrc_class->set_caps = gst_pulsevideo_src_setcaps; gstbasesrc_class->fixate = gst_pulsevideo_src_src_fixate; @@ -132,6 +134,8 @@ gst_pulsevideo_src_init (GstPulsevideoSrc * src) gst_base_src_set_live (GST_BASE_SRC (src), TRUE); src->fd_allocator = gst_fd_allocator_new (); + g_mutex_init (&src->lock); + g_cond_init (&src->cond); } static GstCaps * @@ -252,18 +256,9 @@ on_new_buffer (GObject *gobject, { GstPulsevideoSrc *pvsrc = user_data; - g_main_loop_quit (pvsrc->loop); -} - -static void -on_socket_notify (GObject *gobject, - GParamSpec *pspec, - gpointer user_data) -{ - GSocket *socket; - - g_object_get (gobject, "socket", &socket, NULL); - g_print ("got socket %p\n", socket); + g_mutex_lock (&pvsrc->lock); + g_cond_signal (&pvsrc->cond); + g_mutex_unlock (&pvsrc->lock); } static void @@ -274,30 +269,52 @@ on_stream_notify (GObject *gobject, PvStreamState state; GstPulsevideoSrc *pvsrc = user_data; - g_object_get (gobject, "state", &state, NULL); + g_mutex_lock (&pvsrc->lock); + state = pv_stream_get_state (pvsrc->stream); g_print ("got stream state %d\n", state); - - switch (state) { - case PV_STREAM_STATE_ERROR: - g_main_loop_quit (pvsrc->loop); - break; - case PV_STREAM_STATE_READY: - g_main_loop_quit (pvsrc->loop); - g_main_context_push_thread_default (pvsrc->context); - pv_stream_start (pvsrc->stream, PV_STREAM_MODE_BUFFER); - g_main_context_pop_thread_default (pvsrc->context); - break; - case PV_STREAM_STATE_STREAMING: - break; - default: - break; + if (state == PV_STREAM_STATE_ERROR) { + GST_ELEMENT_ERROR (pvsrc, RESOURCE, FAILED, + ("Failed to connect stream: %s", + pv_stream_get_error (pvsrc->stream)->message), (NULL)); } + g_cond_broadcast (&pvsrc->cond); + g_mutex_unlock (&pvsrc->lock); +} + +static gboolean +source_info_callback (PvContext *c, const PvSourceInfo *info, gpointer user_data) +{ + GstPulsevideoSrc *pvsrc = user_data; + + if (info == NULL) { + return TRUE; + } + + g_print ("source %s %p\n", info->name, pvsrc); + + + return TRUE; +} + +static gboolean +gst_pulsevideo_src_negotiate (GstBaseSrc * basesrc) +{ + GstPulsevideoSrc *pvsrc = GST_PULSEVIDEO_SRC (basesrc); + + pv_context_list_source_info (pvsrc->ctx, + PV_SOURCE_INFO_FLAGS_CAPABILITIES, + source_info_callback, + NULL, + pvsrc); + + return GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc); } static GstCaps * gst_pulsevideo_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter) { return GST_BASE_SRC_CLASS (parent_class)->get_caps (bsrc, filter); + } static gboolean @@ -324,10 +341,8 @@ gst_pulsevideo_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps) /* looks ok here */ pvsrc->info = info; - g_main_context_push_thread_default (pvsrc->context); pvsrc->stream = pv_stream_new (pvsrc->ctx, "test", NULL); g_signal_connect (pvsrc->stream, "notify::state", (GCallback) on_stream_notify, pvsrc); - g_signal_connect (pvsrc->stream, "notify::socket", (GCallback) on_socket_notify, pvsrc); g_signal_connect (pvsrc->stream, "new-buffer", (GCallback) on_new_buffer, pvsrc); g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}")); @@ -345,9 +360,22 @@ gst_pulsevideo_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps) // g_variant_new_string (gst_video_interlace_mode_to_string (info.interlace_mode))); pv_stream_connect_capture (pvsrc->stream, NULL, 0, g_variant_builder_end (&builder)); - g_main_context_pop_thread_default (pvsrc->context); - g_main_loop_run (pvsrc->loop); + g_mutex_lock (&pvsrc->lock); + while (TRUE) { + PvStreamState state = pv_stream_get_state (pvsrc->stream); + + if (state == PV_STREAM_STATE_READY) + break; + + if (state == PV_STREAM_STATE_ERROR) + goto connect_error; + + g_cond_wait (&pvsrc->cond, &pvsrc->lock); + } + g_mutex_unlock (&pvsrc->lock); + + pv_stream_start (pvsrc->stream, PV_STREAM_MODE_BUFFER); GST_DEBUG_OBJECT (pvsrc, "size %dx%d, %d/%d fps", info.width, info.height, info.fps_n, info.fps_d); @@ -365,6 +393,11 @@ unsupported_caps: GST_DEBUG_OBJECT (bsrc, "unsupported caps: %" GST_PTR_FORMAT, caps); return FALSE; } +connect_error: + { + g_mutex_unlock (&pvsrc->lock); + return FALSE; + } } static gboolean @@ -449,8 +482,10 @@ gst_pulsevideo_src_create (GstPushSrc * psrc, GstBuffer ** buffer) GST_VIDEO_FORMAT_UNKNOWN)) goto not_negotiated; - g_main_loop_run (pvsrc->loop); + g_mutex_lock (&pvsrc->lock); + g_cond_wait (&pvsrc->cond, &pvsrc->lock); pv_stream_capture_buffer (pvsrc->stream, &info); + g_mutex_unlock (&pvsrc->lock); *buffer = gst_buffer_new (); @@ -494,6 +529,11 @@ gst_pulsevideo_src_stop (GstBaseSrc * basesrc) static gpointer handle_mainloop (GstPulsevideoSrc *this) { + g_main_context_push_thread_default (this->context); + g_print ("run mainloop\n"); + g_main_loop_run (this->loop); + g_print ("quit mainloop\n"); + g_main_context_pop_thread_default (this->context); return NULL; } @@ -506,36 +546,50 @@ on_state_notify (GObject *gobject, GstPulsevideoSrc *pvsrc = user_data; PvContextState state; - g_object_get (gobject, "state", &state, NULL); + g_mutex_lock (&pvsrc->lock); + state = pv_context_get_state (pvsrc->ctx); g_print ("got context state %d\n", state); + g_cond_broadcast (&pvsrc->cond); + g_mutex_unlock (&pvsrc->lock); - switch (state) { - case PV_CONTEXT_STATE_ERROR: - g_main_loop_quit (pvsrc->loop); - GST_ELEMENT_ERROR (pvsrc, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pv_context_error (pvsrc->ctx)->message), (NULL)); - break; - case PV_CONTEXT_STATE_READY: - g_main_loop_quit (pvsrc->loop); - break; - default: - break; + if (state == PV_CONTEXT_STATE_ERROR) { + GST_ELEMENT_ERROR (pvsrc, RESOURCE, FAILED, + ("Failed to connect stream: %s", + pv_context_get_error (pvsrc->ctx)->message), (NULL)); } } static gboolean gst_pulsevideo_src_open (GstPulsevideoSrc * pvsrc) { - g_main_context_push_thread_default (pvsrc->context); - pvsrc->ctx = pv_context_new ("test-client", NULL); - g_signal_connect (pvsrc->ctx, "notify::state", (GCallback) on_state_notify, pvsrc); - pv_context_connect(pvsrc->ctx, PV_CONTEXT_FLAGS_NONE); - g_main_context_pop_thread_default (pvsrc->context); - g_main_loop_run (pvsrc->loop); + pvsrc->ctx = pv_context_new (pvsrc->context, "test-client", NULL); + g_signal_connect (pvsrc->ctx, "notify::state", (GCallback) on_state_notify, pvsrc); + + pv_context_connect(pvsrc->ctx, PV_CONTEXT_FLAGS_NONE); + + g_mutex_lock (&pvsrc->lock); + while (TRUE) { + PvContextState state = pv_context_get_state (pvsrc->ctx); + + if (state == PV_CONTEXT_STATE_READY) + break; + + if (state == PV_CONTEXT_STATE_ERROR) + goto connect_error; + + g_cond_wait (&pvsrc->cond, &pvsrc->lock); + } + g_mutex_unlock (&pvsrc->lock); return TRUE; + + /* ERRORS */ +connect_error: + { + g_mutex_unlock (&pvsrc->lock); + return FALSE; + } } static GstStateChangeReturn @@ -552,7 +606,10 @@ gst_pulsevideo_src_change_state (GstElement * element, GstStateChange transition this->thread = g_thread_new ("pulsevideo", (GThreadFunc) handle_mainloop, this); break; case GST_STATE_CHANGE_READY_TO_PAUSED: - gst_pulsevideo_src_open (this); + if (!gst_pulsevideo_src_open (this)) { + ret = GST_STATE_CHANGE_FAILURE; + goto exit; + } break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: /* uncork and start recording */ @@ -568,7 +625,6 @@ gst_pulsevideo_src_change_state (GstElement * element, GstStateChange transition switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - g_main_loop_quit (this->loop); break; case GST_STATE_CHANGE_PAUSED_TO_READY: break; @@ -582,6 +638,7 @@ gst_pulsevideo_src_change_state (GstElement * element, GstStateChange transition break; } +exit: return ret; } diff --git a/src/gst/gstpvsrc.h b/src/gst/gstpvsrc.h index b21fa8dbb..c3be945ba 100644 --- a/src/gst/gstpvsrc.h +++ b/src/gst/gstpvsrc.h @@ -27,6 +27,7 @@ #include #include +#include G_BEGIN_DECLS @@ -65,6 +66,9 @@ struct _GstPulsevideoSrc { PvContext *ctx; PvStream *stream; GstAllocator *fd_allocator; + + GMutex lock; + GCond cond; }; struct _GstPulsevideoSrcClass { diff --git a/src/server/pv-daemon.c b/src/server/pv-daemon.c index 2b01aa6b9..23bdaf5cd 100644 --- a/src/server/pv-daemon.c +++ b/src/server/pv-daemon.c @@ -147,14 +147,13 @@ name_acquired_handler (GDBusConnection *connection, PvDaemonPrivate *priv = daemon->priv; GDBusObjectManagerServer *manager = priv->server_manager; + export_server_object (daemon, manager); + g_object_set (priv->subscribe, "service", PV_DBUS_SERVICE, "subscription-mask", PV_SUBSCRIPTION_FLAGS_ALL, "connection", connection, NULL); - - export_server_object (daemon, manager); - g_dbus_object_manager_server_set_connection (manager, connection); } diff --git a/src/tests/test-client.c b/src/tests/test-client.c index 8ee4c0389..eb4a8c660 100644 --- a/src/tests/test-client.c +++ b/src/tests/test-client.c @@ -117,7 +117,7 @@ main (gint argc, gchar *argv[]) loop = g_main_loop_new (NULL, FALSE); - c = pv_context_new ("test-client", NULL); + c = pv_context_new (NULL, "test-client", NULL); g_signal_connect (c, "notify::state", (GCallback) on_state_notify, c); pv_context_connect(c, PV_CONTEXT_FLAGS_NONE); diff --git a/src/tests/test-subscribe.c b/src/tests/test-subscribe.c index d28f81b8f..4691e882e 100644 --- a/src/tests/test-subscribe.c +++ b/src/tests/test-subscribe.c @@ -88,7 +88,7 @@ main (gint argc, gchar *argv[]) loop = g_main_loop_new (NULL, FALSE); - c = pv_context_new ("test-client", NULL); + c = pv_context_new (NULL, "test-client", NULL); g_signal_connect (c, "notify::state", (GCallback) on_state_notify, c); pv_context_connect(c, PV_CONTEXT_FLAGS_NOFAIL); diff --git a/src/tests/test-v4l2.c b/src/tests/test-v4l2.c index 7eb4eaffc..6ed3969ee 100644 --- a/src/tests/test-v4l2.c +++ b/src/tests/test-v4l2.c @@ -63,7 +63,7 @@ main (gint argc, gchar *argv[]) loop = g_main_loop_new (NULL, FALSE); - c = pv_context_new ("test-client", NULL); + c = pv_context_new (NULL, "test-client", NULL); g_signal_connect (c, "notify::state", (GCallback) on_state_notify, c); pv_context_connect(c, PV_CONTEXT_FLAGS_NONE);