Registry: implement registry

Make GET_REGISTRY method to create a registry resource, send global
added and removed to this resource.
Use map for storing proxies and resources.
This commit is contained in:
Wim Taymans 2016-11-30 19:09:09 +01:00
parent 3dada4731c
commit 7c29209023
15 changed files with 221 additions and 59 deletions

View file

@ -710,10 +710,16 @@ pinos_connection_parse_message (PinosConnection *conn,
memcpy (message, conn->in.data, sizeof (PinosMessageNotifyDone));
break;
case PINOS_MESSAGE_SUBSCRIBE:
if (conn->in.size < sizeof (PinosMessageSubscribe))
case PINOS_MESSAGE_GET_REGISTRY:
if (conn->in.size < sizeof (PinosMessageGetRegistry))
return false;
memcpy (message, conn->in.data, sizeof (PinosMessageSubscribe));
memcpy (message, conn->in.data, sizeof (PinosMessageGetRegistry));
break;
case PINOS_MESSAGE_BIND:
if (conn->in.size < sizeof (PinosMessageBind))
return false;
memcpy (message, conn->in.data, sizeof (PinosMessageBind));
break;
case PINOS_MESSAGE_NOTIFY_GLOBAL:
@ -749,6 +755,17 @@ pinos_connection_parse_message (PinosConnection *conn,
d->datafd = connection_get_fd (conn, d->datafd);
break;
}
case PINOS_MESSAGE_DESTROY:
if (conn->in.size < sizeof (PinosMessageDestroy))
return false;
memcpy (message, conn->in.data, sizeof (PinosMessageDestroy));
break;
case PINOS_MESSAGE_DESTROY_DONE:
if (conn->in.size < sizeof (PinosMessageDestroyDone))
return false;
memcpy (message, conn->in.data, sizeof (PinosMessageDestroyDone));
break;
/* C -> S */
case PINOS_MESSAGE_NODE_UPDATE:
@ -865,9 +882,14 @@ pinos_connection_add_message (PinosConnection *conn,
memcpy (p, message, sizeof (PinosMessageNotifyDone));
break;
case PINOS_MESSAGE_SUBSCRIBE:
p = connection_add_message (conn, dest_id, type, sizeof (PinosMessageSubscribe));
memcpy (p, message, sizeof (PinosMessageSubscribe));
case PINOS_MESSAGE_GET_REGISTRY:
p = connection_add_message (conn, dest_id, type, sizeof (PinosMessageGetRegistry));
memcpy (p, message, sizeof (PinosMessageGetRegistry));
break;
case PINOS_MESSAGE_BIND:
p = connection_add_message (conn, dest_id, type, sizeof (PinosMessageBind));
memcpy (p, message, sizeof (PinosMessageBind));
break;
case PINOS_MESSAGE_NOTIFY_GLOBAL:
@ -900,6 +922,16 @@ pinos_connection_add_message (PinosConnection *conn,
d->datafd = connection_add_fd (conn, d->datafd);
break;
}
case PINOS_MESSAGE_DESTROY:
p = connection_add_message (conn, dest_id, type, sizeof (PinosMessageDestroy));
memcpy (p, message, sizeof (PinosMessageDestroy));
break;
case PINOS_MESSAGE_DESTROY_DONE:
p = connection_add_message (conn, dest_id, type, sizeof (PinosMessageDestroyDone));
memcpy (p, message, sizeof (PinosMessageDestroyDone));
break;
/* C -> S */
case PINOS_MESSAGE_NODE_UPDATE:

View file

@ -37,8 +37,9 @@ typedef enum {
PINOS_MESSAGE_SYNC,
PINOS_MESSAGE_NOTIFY_DONE,
PINOS_MESSAGE_GET_REGISTRY,
PINOS_MESSAGE_SUBSCRIBE,
PINOS_MESSAGE_BIND,
PINOS_MESSAGE_NOTIFY_GLOBAL,
PINOS_MESSAGE_NOTIFY_GLOBAL_REMOVE,
@ -48,6 +49,9 @@ typedef enum {
PINOS_MESSAGE_CREATE_CLIENT_NODE,
PINOS_MESSAGE_CREATE_CLIENT_NODE_DONE,
PINOS_MESSAGE_DESTROY,
PINOS_MESSAGE_DESTROY_DONE,
/* client to server */
PINOS_MESSAGE_NODE_UPDATE,
PINOS_MESSAGE_PORT_UPDATE,
@ -84,10 +88,17 @@ typedef struct {
uint32_t seq;
} PinosMessageNotifyDone;
/* PINOS_MESSAGE_SUBSCRIBE */
/* PINOS_MESSAGE_GET_REGISTRY */
typedef struct {
uint32_t seq;
} PinosMessageSubscribe;
uint32_t new_id;
} PinosMessageGetRegistry;
/* PINOS_MESSAGE_BIND */
typedef struct {
uint32_t id;
uint32_t new_id;
} PinosMessageBind;
/* PINOS_MESSAGE_NOTIFY_GLOBAL */
typedef struct {
@ -106,7 +117,7 @@ typedef struct {
const char *factory_name;
const char *name;
SpaDict *props;
uint32_t id;
uint32_t new_id;
} PinosMessageCreateNode;
/* PINOS_MESSAGE_CREATE_NODE_DONE */
@ -119,7 +130,7 @@ typedef struct {
uint32_t seq;
const char *name;
SpaDict *props;
uint32_t id;
uint32_t new_id;
} PinosMessageCreateClientNode;
/* PINOS_MESSAGE_CREATE_CLIENT_NODE_DONE */
@ -128,6 +139,18 @@ typedef struct {
int datafd;
} PinosMessageCreateClientNodeDone;
/* PINOS_MESSAGE_DESTROY */
typedef struct {
uint32_t seq;
uint32_t id;
} PinosMessageDestroy;
/* PINOS_MESSAGE_DESTROY_DONE */
typedef struct {
uint32_t seq;
uint32_t id;
} PinosMessageDestroyDone;
/* PINOS_MESSAGE_NODE_UPDATE */
typedef struct {
#define PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS (1 << 0)

View file

@ -113,12 +113,32 @@ core_dispatch_func (void *object,
context_set_state (context, PINOS_CONTEXT_STATE_CONNECTED, NULL);
break;
}
default:
pinos_log_warn ("unhandled message %d", type);
break;
}
return SPA_RESULT_OK;
}
static SpaResult
registry_dispatch_func (void *object,
PinosMessageType type,
void *message,
void *data)
{
switch (type) {
case PINOS_MESSAGE_NOTIFY_GLOBAL:
{
PinosMessageNotifyGlobal *ng = message;
pinos_log_debug ("got global %u %s", ng->id, ng->type);
break;
}
case PINOS_MESSAGE_NOTIFY_GLOBAL_REMOVE:
{
PinosMessageNotifyGlobalRemove *ng = message;
pinos_log_debug ("got global remove %u", ng->id);
break;
}
default:
pinos_log_warn ("unhandled message %d", type);
break;
@ -292,7 +312,7 @@ pinos_context_connect (PinosContext *context)
socklen_t size;
const char *runtime_dir, *name = NULL;
int name_size, fd;
PinosMessageSubscribe sm;
PinosMessageGetRegistry grm;
context_set_state (context, PINOS_CONTEXT_STATE_CONNECTING, NULL);
@ -349,10 +369,17 @@ pinos_context_connect (PinosContext *context)
context->core_proxy->dispatch_func = core_dispatch_func;
context->core_proxy->dispatch_data = context;
sm.seq = 0;
context->registry_proxy = pinos_proxy_new (context,
SPA_ID_INVALID,
0);
context->registry_proxy->dispatch_func = registry_dispatch_func;
context->registry_proxy->dispatch_data = context;
grm.seq = 0;
grm.new_id = context->registry_proxy->id;
pinos_proxy_send_message (context->core_proxy,
PINOS_MESSAGE_SUBSCRIBE,
&sm,
PINOS_MESSAGE_GET_REGISTRY,
&grm,
true);
return true;
}

View file

@ -63,6 +63,7 @@ struct _PinosContext {
PinosLoop *loop;
PinosProxy *core_proxy;
PinosProxy *registry_proxy;
PinosMap objects;

View file

@ -45,7 +45,8 @@ struct _PinosMap {
#define pinos_map_get_size(m) pinos_array_get_len (&(m)->items, PinosMapItem)
#define pinos_map_get_item(m,id) pinos_array_get_unchecked(&(m)->items,id,PinosMapItem)
#define pinos_map_item_is_free(m,id) (pinos_map_get_item(m,id)->next & 0x1)
#define pinos_map_item_is_free(item) ((item)->next & 0x1)
#define pinos_map_id_is_free(m,id) (pinos_map_item_is_free (pinos_map_get_item(m,id)))
#define pinos_map_check_id(m,id) ((id) < pinos_map_get_size (m))
#define pinos_map_has_item(m,id) (pinos_map_check_id(m,id) && !pinos_map_item_is_free(m, id))
#define pinos_map_lookup_unchecked(m,id) pinos_map_get_item(m,id)->data
@ -104,6 +105,19 @@ pinos_map_lookup (PinosMap *map,
return NULL;
}
static inline void
pinos_map_for_each (PinosMap *map,
void (*func) (void *, void *),
void *data)
{
PinosMapItem *item;
pinos_array_for_each (item, &map->items) {
if (!pinos_map_item_is_free (item))
func (item->data, data);
}
}
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -607,7 +607,10 @@ stream_dispatch_func (void *object,
switch (type) {
case PINOS_MESSAGE_SYNC:
case PINOS_MESSAGE_SUBSCRIBE:
case PINOS_MESSAGE_GET_REGISTRY:
case PINOS_MESSAGE_BIND:
case PINOS_MESSAGE_DESTROY:
case PINOS_MESSAGE_DESTROY_DONE:
case PINOS_MESSAGE_CREATE_NODE:
case PINOS_MESSAGE_CREATE_CLIENT_NODE:
case PINOS_MESSAGE_NODE_UPDATE:
@ -899,7 +902,7 @@ pinos_stream_connect (PinosStream *stream,
items[0].key = "pinos.target.node";
items[0].value = port_path;
ccn.props = &dict;
ccn.id = impl->node_proxy->id;
ccn.new_id = impl->node_proxy->id;
pinos_proxy_send_message (stream->context->core_proxy,
PINOS_MESSAGE_CREATE_CLIENT_NODE,