subscribe: rework some more

Track senders in the subscribe object and aggregate events from all
connected clients. This allows each client to get a complete view of all
the objects of pulsevideo. With all the source objects available, we can
then to the selection of the source on each client.

Remove the CreatSourceOutput on the Client1 object but let the client
select a good source and call CreateSourceOutput directly on the source.
This avoid going through the server to get a connection and the client
can just as well select a source.
Add a state property to the source and make it such that it can do async
state changes.

Remove the source provider object, each client can now directly see the
objects of another clients so there is no need for intermediate objects
in the server.
This commit is contained in:
Wim Taymans 2015-04-27 09:05:14 +02:00
parent 89c7955f3d
commit 417cd76f3e
17 changed files with 603 additions and 388 deletions

View file

@ -26,12 +26,16 @@
struct _PvSubscribePrivate
{
PvSubscriptionState state;
GDBusConnection *connection;
gchar *service;
PvSubscriptionFlags subscription_mask;
GDBusObjectManager *client_manager;
guint pending_subscribes;
GHashTable *senders;
};
@ -45,7 +49,8 @@ enum
PROP_0,
PROP_CONNECTION,
PROP_SERVICE,
PROP_SUBSCRIPTION_MASK
PROP_SUBSCRIPTION_MASK,
PROP_STATE,
};
enum
@ -56,6 +61,161 @@ enum
static guint signals[LAST_SIGNAL] = { 0 };
typedef struct {
PvSubscribe *subscribe;
gchar *sender;
guint id;
PvSubscribe *sender_subscribe;
GList *clients;
} SenderData;
static void
notify_subscription (PvSubscribe *subscribe,
GDBusObject *object,
GDBusInterface *interface,
PvSubscriptionEvent event);
static void
on_sender_subscription_event (PvSubscribe *sender_subscribe,
PvSubscriptionEvent event,
PvSubscriptionFlags flags,
GDBusProxy *object,
gpointer user_data)
{
SenderData *data = user_data;
PvSubscribe *subscribe = data->subscribe;
g_signal_emit (subscribe,
signals[SIGNAL_SUBSCRIPTION_EVENT],
0,
event,
flags,
object);
}
static void
subscription_set_state (PvSubscribe *subscribe, PvSubscriptionState state)
{
PvSubscribePrivate *priv = subscribe->priv;
if (state != priv->state) {
priv->state = state;
g_object_notify (G_OBJECT (subscribe), "state");
}
}
static void
on_sender_subscription_state (GObject *object,
GParamSpec *pspec,
gpointer user_data)
{
SenderData *data = user_data;
PvSubscribe *subscribe = data->subscribe;
PvSubscribePrivate *priv = subscribe->priv;
PvSubscriptionState state;
g_object_get (object, "state", &state, NULL);
switch (state) {
case PV_SUBSCRIPTION_STATE_READY:
if (--priv->pending_subscribes == 0)
subscription_set_state (subscribe, state);
break;
case PV_SUBSCRIPTION_STATE_ERROR:
subscription_set_state (subscribe, state);
break;
default:
break;
}
}
static void
client_name_appeared_handler (GDBusConnection *connection,
const gchar *name,
const gchar *name_owner,
gpointer user_data)
{
SenderData *data = user_data;
/* subscribe to Source events. We want to be notified when this new
* sender add/change/remove sources and outputs */
data->sender_subscribe = pv_subscribe_new ();
g_object_set (data->sender_subscribe, "service", data->sender,
"subscription-mask", PV_SUBSCRIPTION_FLAGS_ALL,
"connection", connection,
NULL);
g_signal_connect (data->sender_subscribe,
"subscription-event",
(GCallback) on_sender_subscription_event,
data);
g_signal_connect (data->sender_subscribe,
"notify::state",
(GCallback) on_sender_subscription_state,
data);
}
static void
remove_client (PvClient1 *client, SenderData *data)
{
g_signal_emit (data->subscribe,
signals[SIGNAL_SUBSCRIPTION_EVENT],
0,
PV_SUBSCRIPTION_EVENT_REMOVE,
PV_SUBSCRIPTION_FLAGS_CLIENT,
client);
}
static void
client_name_vanished_handler (GDBusConnection *connection,
const gchar *name,
gpointer user_data)
{
SenderData *data = user_data;
PvSubscribePrivate *priv = data->subscribe->priv;
g_print ("vanished client %s\n", name);
g_list_foreach (data->clients, (GFunc) remove_client, data);
g_hash_table_remove (priv->senders, data->sender);
if (data->sender_subscribe)
g_object_unref (data->sender_subscribe);
g_free (data->sender);
g_bus_unwatch_name (data->id);
g_free (data);
}
static SenderData *
sender_data_new (PvSubscribe *subscribe, const gchar *sender)
{
PvSubscribePrivate *priv = subscribe->priv;
SenderData *data;
g_print ("watch name %s\n", sender);
data = g_new0 (SenderData, 1);
data->subscribe = subscribe;
data->sender = g_strdup (sender);
data->id = g_bus_watch_name_on_connection (priv->connection,
sender,
G_BUS_NAME_WATCHER_FLAGS_NONE,
client_name_appeared_handler,
client_name_vanished_handler,
data,
NULL);
g_hash_table_insert (priv->senders, data->sender, data);
priv->pending_subscribes++;
return data;
}
static void
notify_subscription (PvSubscribe *subscribe,
GDBusObject *object,
@ -64,29 +224,81 @@ notify_subscription (PvSubscribe *subscribe,
{
PvSubscribePrivate *priv = subscribe->priv;
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_CLIENT) {
if ((interface == NULL && pv_object_peek_client1 (PV_OBJECT (object))) ||
PV_IS_CLIENT1_PROXY (interface))
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_DAEMON) {
PvDaemon1 *daemon;
if (interface == NULL)
daemon = pv_object_peek_daemon1 (PV_OBJECT (object));
else if (PV_IS_DAEMON1_PROXY (interface))
daemon = PV_DAEMON1 (interface);
else
daemon = NULL;
if (daemon) {
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_CLIENT, object);
PV_SUBSCRIPTION_FLAGS_DAEMON, daemon);
}
}
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_SOURCE_PROVIDER) {
if ((interface == NULL && pv_object_peek_source_provider1 (PV_OBJECT (object))) ||
PV_IS_SOURCE_PROVIDER1_PROXY (interface))
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_SOURCE_PROVIDER, object);
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_CLIENT) {
PvClient1 *client;
if (interface == NULL)
client = pv_object_peek_client1 (PV_OBJECT (object));
else if (PV_IS_CLIENT1_PROXY (interface))
client = PV_CLIENT1 (interface);
else
client = NULL;
if (client) {
const gchar *sender;
SenderData *data;
sender = pv_client1_get_name (client);
data = g_hash_table_lookup (priv->senders, sender);
if (data == NULL && event != PV_SUBSCRIPTION_EVENT_REMOVE) {
data = sender_data_new (subscribe, sender);
}
if (data) {
if (event == PV_SUBSCRIPTION_EVENT_NEW)
data->clients = g_list_prepend (data->clients, client);
else if (event == PV_SUBSCRIPTION_EVENT_REMOVE)
data->clients = g_list_remove (data->clients, client);
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_CLIENT, client);
}
}
}
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_SOURCE) {
if ((interface == NULL && pv_object_peek_source1 (PV_OBJECT (object))) ||
PV_IS_SOURCE1_PROXY (interface))
PvSource1 *source;
if (interface == NULL)
source = pv_object_peek_source1 (PV_OBJECT (object));
else if (PV_IS_SOURCE1_PROXY (interface))
source = PV_SOURCE1 (interface);
else
source = NULL;
if (source) {
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_SOURCE, object);
PV_SUBSCRIPTION_FLAGS_SOURCE, source);
}
}
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_SOURCE_OUTPUT) {
if ((interface == NULL && pv_object_peek_source_output1 (PV_OBJECT (object))) ||
PV_IS_SOURCE_OUTPUT1_PROXY (interface))
PvSourceOutput1 *output;
if (interface == NULL)
output = pv_object_peek_source_output1 (PV_OBJECT (object));
else if PV_IS_SOURCE_OUTPUT1_PROXY (interface)
output = PV_SOURCE_OUTPUT1 (interface);
else
output = NULL;
if (output) {
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_SOURCE_OUTPUT, object);
PV_SUBSCRIPTION_FLAGS_SOURCE_OUTPUT, output);
}
}
}
@ -184,13 +396,20 @@ on_client_manager_ready (GObject *source_object,
if (priv->client_manager == NULL)
goto manager_error;
g_print ("client manager %s %s\n",
g_dbus_object_manager_client_get_name (G_DBUS_OBJECT_MANAGER_CLIENT (priv->client_manager)),
g_dbus_object_manager_client_get_name_owner (G_DBUS_OBJECT_MANAGER_CLIENT (priv->client_manager)));
connect_client_signals (subscribe);
objects = g_dbus_object_manager_get_objects (G_DBUS_OBJECT_MANAGER (priv->client_manager));
for (walk = objects; walk ; walk = g_list_next (walk)) {
on_client_manager_object_added (G_DBUS_OBJECT_MANAGER (priv->client_manager),
walk->data,
subscribe);
}
connect_client_signals (subscribe);
if (--priv->pending_subscribes == 0)
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_READY);
return;
@ -199,6 +418,7 @@ manager_error:
{
g_warning ("could not create client manager: %s", error->message);
g_clear_error (&error);
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_ERROR);
return;
}
}
@ -208,6 +428,9 @@ install_subscription (PvSubscribe *subscribe)
{
PvSubscribePrivate *priv = subscribe->priv;
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_CONNECTING);
g_print ("new client manager for %s\n", priv->service);
pv_object_manager_client_new (priv->connection,
G_DBUS_OBJECT_MANAGER_CLIENT_FLAGS_NONE,
priv->service,
@ -215,6 +438,7 @@ install_subscription (PvSubscribe *subscribe)
NULL,
on_client_manager_ready,
subscribe);
priv->pending_subscribes++;
}
static void
@ -223,6 +447,7 @@ uninstall_subscription (PvSubscribe *subscribe)
PvSubscribePrivate *priv = subscribe->priv;
g_clear_object (&priv->client_manager);
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_UNCONNECTED);
}
static void
@ -247,6 +472,10 @@ pv_subscribe_get_property (GObject *_object,
g_value_set_flags (value, priv->subscription_mask);
break;
case PROP_STATE:
g_value_set_enum (value, priv->state);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (subscribe, prop_id, pspec);
break;
@ -296,6 +525,7 @@ pv_subscribe_finalize (GObject * object)
g_free (priv->service);
g_object_unref (priv->client_manager);
g_hash_table_unref (priv->senders);
G_OBJECT_CLASS (pv_subscribe_parent_class)->finalize (object);
}
@ -352,6 +582,20 @@ pv_subscribe_class_init (PvSubscribeClass * klass)
0,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
/**
* PvSubscribe:state
*
* The state of the subscription
*/
g_object_class_install_property (gobject_class,
PROP_STATE,
g_param_spec_enum ("state",
"State",
"The state",
PV_TYPE_SUBSCRIPTION_STATE,
PV_SUBSCRIPTION_STATE_UNCONNECTED,
G_PARAM_READABLE |
G_PARAM_STATIC_STRINGS));
/**
* PvSubscribe:subscription-event
* @subscribe: The #PvSubscribe emitting the signal.
@ -372,7 +616,7 @@ pv_subscribe_class_init (PvSubscribeClass * klass)
3,
PV_TYPE_SUBSCRIPTION_EVENT,
PV_TYPE_SUBSCRIPTION_FLAGS,
G_TYPE_DBUS_OBJECT_PROXY);
G_TYPE_DBUS_PROXY);
}
static void
@ -381,6 +625,8 @@ pv_subscribe_init (PvSubscribe * subscribe)
PvSubscribePrivate *priv = subscribe->priv = PV_SUBSCRIBE_GET_PRIVATE (subscribe);
priv->service = g_strdup (PV_DBUS_SERVICE);
priv->senders = g_hash_table_new (g_str_hash, g_str_equal);
priv->state = PV_SUBSCRIPTION_STATE_UNCONNECTED;
}
/**