From b9a0b067beea461efea90a2285b6decb36251d13 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 3 Mar 2017 17:43:23 +0100 Subject: [PATCH] use interfaces in client --- pinos/client/connection.h | 20 +- pinos/client/context.c | 1093 ++++++++++++++++-------- pinos/client/context.h | 16 - pinos/client/interfaces.h | 13 + pinos/client/proxy.c | 42 - pinos/client/proxy.h | 22 +- pinos/client/stream.c | 583 +++++++------ pinos/modules/module-protocol-native.c | 206 ++--- pinos/server/client.h | 7 - pinos/server/resource.c | 3 - 10 files changed, 1163 insertions(+), 842 deletions(-) diff --git a/pinos/client/connection.h b/pinos/client/connection.h index 6493c677b..7e9890ee3 100644 --- a/pinos/client/connection.h +++ b/pinos/client/connection.h @@ -91,7 +91,7 @@ typedef enum { /* PINOS_MESSAGE_CLIENT_UPDATE */ typedef struct { - SpaDict *props; + const SpaDict *props; } PinosMessageClientUpdate; /* PINOS_MESSAGE_SYNC */ @@ -166,11 +166,11 @@ typedef struct { /* PINOS_MESSAGE_CREATE_NODE */ typedef struct { - uint32_t seq; - const char *factory_name; - const char *name; - SpaDict *props; - uint32_t new_id; + uint32_t seq; + const char *factory_name; + const char *name; + const SpaDict *props; + uint32_t new_id; } PinosMessageCreateNode; /* PINOS_MESSAGE_CREATE_NODE_DONE */ @@ -180,10 +180,10 @@ typedef struct { /* PINOS_MESSAGE_CREATE_CLIENT_NODE */ typedef struct { - uint32_t seq; - const char *name; - SpaDict *props; - uint32_t new_id; + uint32_t seq; + const char *name; + const SpaDict *props; + uint32_t new_id; } PinosMessageCreateClientNode; /* PINOS_MESSAGE_CREATE_CLIENT_NODE_DONE */ diff --git a/pinos/client/context.c b/pinos/client/context.c index 6989a8c3b..2dc38f4d2 100644 --- a/pinos/client/context.c +++ b/pinos/client/context.c @@ -37,9 +37,6 @@ typedef struct { SpaSource source; bool disconnecting; - - PinosSendFunc send_func; - void *send_data; } PinosContextImpl; /** @@ -96,326 +93,763 @@ context_set_state (PinosContext *context, } } -static SpaResult -core_dispatch_func (void *object, - PinosMessageType type, - void *message, - void *data) +typedef void (*MarshallFunc) (void *object, void *data, size_t size); + +static void +core_interface_client_update (void *object, + const SpaDict *props) { - PinosContextImpl *impl = data; - PinosContext *this = &impl->this; PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageClientUpdate m = { props }; - switch (type) { - case PINOS_MESSAGE_CORE_INFO: - { - PinosMessageCoreInfo *m = message; - PinosSubscriptionEvent event; - - pinos_log_debug ("got core info %d", type); - - if (proxy->user_data == NULL) - event = PINOS_SUBSCRIPTION_EVENT_NEW; - else - event = PINOS_SUBSCRIPTION_EVENT_CHANGE; - - proxy->user_data = pinos_core_info_update (proxy->user_data, m->info); - - pinos_signal_emit (&this->subscription, - this, - event, - proxy->type, - proxy->id); - break; - } - case PINOS_MESSAGE_NOTIFY_DONE: - { - PinosMessageNotifyDone *nd = message; - - if (nd->seq == 0) { - PinosMessageSync sync; - - sync.seq = 1; - pinos_proxy_send_message (this->core_proxy, - PINOS_MESSAGE_SYNC, - &sync, - true); - } else if (nd->seq == 1) { - context_set_state (this, PINOS_CONTEXT_STATE_CONNECTED, NULL); - } - break; - } - case PINOS_MESSAGE_ERROR: - { - PinosMessageError *m = message; - context_set_state (this, PINOS_CONTEXT_STATE_ERROR, m->error); - break; - } - case PINOS_MESSAGE_REMOVE_ID: - { - PinosMessageRemoveId *m = message; - PinosProxy *proxy; - - proxy = pinos_map_lookup (&this->objects, m->id); - if (proxy) { - pinos_log_debug ("context %p: object remove %u", this, m->id); - pinos_proxy_destroy (proxy); - } - break; - } - - default: - pinos_log_warn ("unhandled message %d", type); - break; - } - return SPA_RESULT_OK; + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_CLIENT_UPDATE, + &m); + pinos_connection_flush (impl->connection); } -static SpaResult -module_dispatch_func (void *object, - PinosMessageType type, - void *message, - void *data) +static void +core_interface_sync (void *object, + uint32_t seq) { - PinosContextImpl *impl = data; - PinosContext *this = &impl->this; PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageSync m = { seq }; - switch (type) { - case PINOS_MESSAGE_MODULE_INFO: - { - PinosMessageModuleInfo *m = message; - PinosSubscriptionEvent event; - - pinos_log_debug ("got module info %d", type); - if (proxy->user_data == NULL) - event = PINOS_SUBSCRIPTION_EVENT_NEW; - else - event = PINOS_SUBSCRIPTION_EVENT_CHANGE; - - proxy->user_data = pinos_module_info_update (proxy->user_data, m->info); - - pinos_signal_emit (&this->subscription, - this, - event, - proxy->type, - proxy->id); - break; - } - - default: - pinos_log_warn ("unhandled message %d", type); - break; - } - return SPA_RESULT_OK; + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_SYNC, + &m); + pinos_connection_flush (impl->connection); } -static SpaResult -node_dispatch_func (void *object, - PinosMessageType type, - void *message, - void *data) +static void +core_interface_get_registry (void *object, + uint32_t seq, + uint32_t new_id) { - PinosContextImpl *impl = data; - PinosContext *this = &impl->this; PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageGetRegistry m = { seq, new_id }; - switch (type) { - case PINOS_MESSAGE_NODE_INFO: - { - PinosMessageNodeInfo *m = message; - PinosSubscriptionEvent event; - - pinos_log_debug ("got node info %d", type); - if (proxy->user_data == NULL) - event = PINOS_SUBSCRIPTION_EVENT_NEW; - else - event = PINOS_SUBSCRIPTION_EVENT_CHANGE; - - proxy->user_data = pinos_node_info_update (proxy->user_data, m->info); - - pinos_signal_emit (&this->subscription, - this, - event, - proxy->type, - proxy->id); - break; - } - default: - pinos_log_warn ("unhandled message %d", type); - break; - } - return SPA_RESULT_OK; + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_GET_REGISTRY, + &m); + pinos_connection_flush (impl->connection); } -static SpaResult -client_dispatch_func (void *object, - PinosMessageType type, - void *message, - void *data) +static void +core_interface_create_node (void *object, + uint32_t seq, + const char *factory_name, + const char *name, + const SpaDict *props, + uint32_t new_id) { - PinosContextImpl *impl = data; - PinosContext *this = &impl->this; PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageCreateNode m = { seq, factory_name, name, props, new_id }; - switch (type) { - case PINOS_MESSAGE_CLIENT_INFO: - { - PinosMessageClientInfo *m = message; - PinosSubscriptionEvent event; - - pinos_log_debug ("got client info %d", type); - if (proxy->user_data == NULL) - event = PINOS_SUBSCRIPTION_EVENT_NEW; - else - event = PINOS_SUBSCRIPTION_EVENT_CHANGE; - - proxy->user_data = pinos_client_info_update (proxy->user_data, m->info); - - pinos_signal_emit (&this->subscription, - this, - event, - proxy->type, - proxy->id); - break; - } - default: - pinos_log_warn ("unhandled message %d", type); - break; - } - return SPA_RESULT_OK; + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_CREATE_NODE, + &m); + pinos_connection_flush (impl->connection); } -static SpaResult -link_dispatch_func (void *object, - PinosMessageType type, - void *message, - void *data) +static void +core_interface_create_client_node (void *object, + uint32_t seq, + const char *name, + const SpaDict *props, + uint32_t new_id) { - PinosContextImpl *impl = data; - PinosContext *this = &impl->this; PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageCreateClientNode m = { seq, name, props, new_id }; - switch (type) { - case PINOS_MESSAGE_LINK_INFO: - { - PinosMessageLinkInfo *m = message; - PinosSubscriptionEvent event; - - pinos_log_debug ("got link info %d", type); - if (proxy->user_data == NULL) - event = PINOS_SUBSCRIPTION_EVENT_NEW; - else - event = PINOS_SUBSCRIPTION_EVENT_CHANGE; - - proxy->user_data = pinos_link_info_update (proxy->user_data, m->info); - - pinos_signal_emit (&this->subscription, - this, - event, - proxy->type, - proxy->id); - break; - } - default: - pinos_log_warn ("unhandled message %d", type); - break; - } - return SPA_RESULT_OK; + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_CREATE_CLIENT_NODE, + &m); + pinos_connection_flush (impl->connection); } -static SpaResult -registry_dispatch_func (void *object, - PinosMessageType type, - void *message, - void *data) +static const PinosCoreInterface core_interface = { + &core_interface_client_update, + &core_interface_sync, + &core_interface_get_registry, + &core_interface_create_node, + &core_interface_create_client_node +}; + +static void +core_marshall_info (void *object, + void *data, + size_t size) { - PinosContextImpl *impl = data; - PinosContext *this = &impl->this; + PinosProxy *proxy = object; + PinosMessageCoreInfo *m = data; + pinos_core_notify_info (proxy, m->info); +} - switch (type) { - case PINOS_MESSAGE_NOTIFY_GLOBAL: - { - PinosMessageNotifyGlobal *ng = message; - PinosProxy *proxy = NULL; +static void +core_marshall_done (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageNotifyDone *m = data; + pinos_core_notify_done (proxy, m->seq); +} - pinos_log_debug ("got global %u %s", ng->id, ng->type); +static void +core_marshall_error (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageError *m = data; + pinos_core_notify_error (proxy, m->id, m->res, m->error); +} - if (!strcmp (ng->type, PINOS_NODE_URI)) { - proxy = pinos_proxy_new (this, - SPA_ID_INVALID, - this->uri.node); - if (proxy == NULL) - goto no_mem; +static void +core_marshall_remove_id (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageRemoveId *m = data; + pinos_core_notify_remove_id (proxy, m->id); +} - pinos_proxy_set_dispatch (proxy, - node_dispatch_func, - impl); - } else if (!strcmp (ng->type, PINOS_MODULE_URI)) { - proxy = pinos_proxy_new (this, - SPA_ID_INVALID, - this->uri.module); - if (proxy == NULL) - goto no_mem; +static const MarshallFunc core_marshall[] = { + [PINOS_MESSAGE_CORE_INFO] = &core_marshall_info, + [PINOS_MESSAGE_NOTIFY_DONE] = &core_marshall_done, + [PINOS_MESSAGE_ERROR] = &core_marshall_error, + [PINOS_MESSAGE_REMOVE_ID] = &core_marshall_remove_id, +}; - pinos_proxy_set_dispatch (proxy, - module_dispatch_func, - impl); - } else if (!strcmp (ng->type, PINOS_CLIENT_URI)) { - proxy = pinos_proxy_new (this, - SPA_ID_INVALID, - this->uri.client); - if (proxy == NULL) - goto no_mem; +static void +core_event_info (void *object, + PinosCoreInfo *info) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + PinosSubscriptionEvent event; - pinos_proxy_set_dispatch (proxy, - client_dispatch_func, - impl); - } else if (!strcmp (ng->type, PINOS_LINK_URI)) { - proxy = pinos_proxy_new (this, - SPA_ID_INVALID, - this->uri.link); - if (proxy == NULL) - goto no_mem; + pinos_log_debug ("got core info"); - pinos_proxy_set_dispatch (proxy, - link_dispatch_func, - impl); - } - if (proxy) { - PinosMessageBind m; + if (proxy->user_data == NULL) + event = PINOS_SUBSCRIPTION_EVENT_NEW; + else + event = PINOS_SUBSCRIPTION_EVENT_CHANGE; - m.id = ng->id; - m.new_id = proxy->id; - pinos_proxy_send_message (this->registry_proxy, - PINOS_MESSAGE_BIND, - &m, - true); - } - break; - } - case PINOS_MESSAGE_NOTIFY_GLOBAL_REMOVE: - { - PinosMessageNotifyGlobalRemove *ng = message; - pinos_log_debug ("got global remove %u", ng->id); + proxy->user_data = pinos_core_info_update (proxy->user_data, info); - pinos_signal_emit (&this->subscription, - this, - PINOS_SUBSCRIPTION_EVENT_REMOVE, - SPA_ID_INVALID, - ng->id); - break; - } - default: - pinos_log_warn ("unhandled message %d", type); - break; + pinos_signal_emit (&this->subscription, + this, + event, + proxy->type, + proxy->id); +} + +static void +core_event_done (void *object, + uint32_t seq) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + + if (seq == 0) { + pinos_core_do_sync (this->core_proxy, 1); + } else if (seq == 1) { + context_set_state (this, PINOS_CONTEXT_STATE_CONNECTED, NULL); } - return SPA_RESULT_OK; +} + +static void +core_event_error (void *object, + uint32_t id, + SpaResult res, + const char *error, ...) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + context_set_state (this, PINOS_CONTEXT_STATE_ERROR, error); +} + +static void +core_event_remove_id (void *object, + uint32_t id) +{ + PinosProxy *core_proxy = object; + PinosContext *this = core_proxy->context; + PinosProxy *proxy; + + proxy = pinos_map_lookup (&this->objects, id); + if (proxy) { + pinos_log_debug ("context %p: object remove %u", this, id); + pinos_proxy_destroy (proxy); + } +} + +static const PinosCoreEvent core_events = { + &core_event_info, + &core_event_done, + &core_event_error, + &core_event_remove_id +}; + +static void +module_marshall_info (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageModuleInfo *m = data; + pinos_module_notify_info (proxy, m->info); +} + +static const MarshallFunc module_marshall[] = { + [PINOS_MESSAGE_MODULE_INFO] = &module_marshall_info, +}; + +static void +module_event_info (void *object, + PinosModuleInfo *info) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + PinosSubscriptionEvent event; + + pinos_log_debug ("got module info"); + + if (proxy->user_data == NULL) + event = PINOS_SUBSCRIPTION_EVENT_NEW; + else + event = PINOS_SUBSCRIPTION_EVENT_CHANGE; + + proxy->user_data = pinos_module_info_update (proxy->user_data, info); + + pinos_signal_emit (&this->subscription, + this, + event, + proxy->type, + proxy->id); +} + +static const PinosModuleEvent module_events = { + &module_event_info, +}; + +static void +node_marshall_done (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageCreateNodeDone *m = data; + pinos_node_notify_done (proxy, m->seq); +} + +static void +node_marshall_info (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageNodeInfo *m = data; + pinos_node_notify_info (proxy, m->info); +} + +static const MarshallFunc node_marshall[] = { + [PINOS_MESSAGE_CREATE_NODE_DONE] = &node_marshall_done, + [PINOS_MESSAGE_NODE_INFO] = &node_marshall_info, +}; + +static void +node_event_done (void *object, + uint32_t seq) +{ +} + +static void +node_event_info (void *object, + PinosNodeInfo *info) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + PinosSubscriptionEvent event; + + pinos_log_debug ("got node info"); + + if (proxy->user_data == NULL) + event = PINOS_SUBSCRIPTION_EVENT_NEW; + else + event = PINOS_SUBSCRIPTION_EVENT_CHANGE; + + proxy->user_data = pinos_node_info_update (proxy->user_data, info); + + pinos_signal_emit (&this->subscription, + this, + event, + proxy->type, + proxy->id); +} + +static const PinosNodeEvent node_events = { + &node_event_done, + &node_event_info +}; + +static void +client_node_interface_update (void *object, + uint32_t change_mask, + unsigned int max_input_ports, + unsigned int max_output_ports, + const SpaProps *props) +{ + PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageNodeUpdate m = { change_mask, max_input_ports, max_output_ports, props }; + + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_NODE_UPDATE, + &m); + pinos_connection_flush (impl->connection); +} + +static void +client_node_interface_port_update (void *object, + SpaDirection direction, + uint32_t port_id, + uint32_t change_mask, + unsigned int n_possible_formats, + SpaFormat **possible_formats, + SpaFormat *format, + const SpaProps *props, + const SpaPortInfo *info) +{ + PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessagePortUpdate m = { direction, port_id, change_mask, n_possible_formats, + possible_formats, format, props, info }; + + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_PORT_UPDATE, + &m); + pinos_connection_flush (impl->connection); +} + +static void +client_node_interface_state_change (void *object, + SpaNodeState state) +{ + PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageNodeStateChange m = { state }; + + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_NODE_STATE_CHANGE, + &m); + pinos_connection_flush (impl->connection); +} + +static void +client_node_interface_event (void *object, + SpaNodeEvent *event) +{ + PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageNodeEvent m = { event }; + + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_NODE_EVENT, + &m); + pinos_connection_flush (impl->connection); +} + +static void +client_node_interface_destroy (void *object, + uint32_t seq) +{ + PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageDestroy m = { seq }; + + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_DESTROY, + &m); + pinos_connection_flush (impl->connection); +} + +const PinosClientNodeInterface client_node_interface = { + &client_node_interface_update, + &client_node_interface_port_update, + &client_node_interface_state_change, + &client_node_interface_event, + &client_node_interface_destroy +}; + +static void +client_node_mashall_done (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageCreateClientNodeDone *m = data; + pinos_client_node_notify_done (proxy, m->seq, m->datafd); +} + +static void +client_node_mashall_event (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageNodeEvent *m = data; + pinos_client_node_notify_event (proxy, m->event); +} + +static void +client_node_mashall_add_port (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageAddPort *m = data; + pinos_client_node_notify_add_port (proxy, m->seq, m->direction, m->port_id); +} + +static void +client_node_mashall_remove_port (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageRemovePort *m = data; + pinos_client_node_notify_remove_port (proxy, m->seq, m->direction, m->port_id); +} + +static void +client_node_mashall_set_format (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageSetFormat *m = data; + pinos_client_node_notify_set_format (proxy, m->seq, m->direction, m->port_id, + m->flags, m->format); +} + +static void +client_node_mashall_set_property (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageSetProperty *m = data; + pinos_client_node_notify_set_property (proxy, m->seq, m->id, m->size, m->value); +} + +static void +client_node_mashall_add_mem (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageAddMem *m = data; + pinos_client_node_notify_add_mem (proxy, + m->direction, + m->port_id, + m->mem_id, + m->type, + m->memfd, + m->flags, + m->offset, + m->size); +} + +static void +client_node_mashall_use_buffers (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageUseBuffers *m = data; + pinos_client_node_notify_use_buffers (proxy, + m->seq, + m->direction, + m->port_id, + m->n_buffers, + m->buffers); +} + +static void +client_node_mashall_node_command (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageNodeCommand *m = data; + pinos_client_node_notify_node_command (proxy, m->seq, m->command); +} + +static void +client_node_mashall_port_command (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessagePortCommand *m = data; + pinos_client_node_notify_port_command (proxy, m->port_id, m->command); +} + +static void +client_node_mashall_transport (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageTransportUpdate *m = data; + pinos_client_node_notify_transport (proxy, m->memfd, m->offset, m->size); +} + +const MarshallFunc client_node_marshall[] = { + [PINOS_MESSAGE_CREATE_CLIENT_NODE_DONE] = &client_node_mashall_done, + [PINOS_MESSAGE_NODE_EVENT] = &client_node_mashall_event, + [PINOS_MESSAGE_ADD_PORT] = &client_node_mashall_add_port, + [PINOS_MESSAGE_REMOVE_PORT] = &client_node_mashall_remove_port, + [PINOS_MESSAGE_SET_FORMAT] = &client_node_mashall_set_format, + [PINOS_MESSAGE_SET_PROPERTY] = &client_node_mashall_set_property, + [PINOS_MESSAGE_ADD_MEM] = &client_node_mashall_add_mem, + [PINOS_MESSAGE_USE_BUFFERS] = &client_node_mashall_use_buffers, + [PINOS_MESSAGE_NODE_COMMAND] = &client_node_mashall_node_command, + [PINOS_MESSAGE_PORT_COMMAND] = &client_node_mashall_port_command, + [PINOS_MESSAGE_TRANSPORT_UPDATE] = &client_node_mashall_transport +}; + +static void +client_marshall_info (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageClientInfo *m = data; + pinos_client_notify_info (proxy, m->info); +} + +static const MarshallFunc client_marshall[] = { + [PINOS_MESSAGE_CLIENT_INFO] = &client_marshall_info, +}; + +static void +client_event_info (void *object, + PinosClientInfo *info) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + PinosSubscriptionEvent event; + + pinos_log_debug ("got client info"); + + if (proxy->user_data == NULL) + event = PINOS_SUBSCRIPTION_EVENT_NEW; + else + event = PINOS_SUBSCRIPTION_EVENT_CHANGE; + + proxy->user_data = pinos_client_info_update (proxy->user_data, info); + + pinos_signal_emit (&this->subscription, + this, + event, + proxy->type, + proxy->id); +} + +static const PinosClientEvent client_events = { + &client_event_info +}; + +static void +link_marshall_info (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageLinkInfo *m = data; + pinos_link_notify_info (proxy, m->info); +} + +static const MarshallFunc link_marshall[] = { + [PINOS_MESSAGE_LINK_INFO] = &link_marshall_info, +}; + +static void +link_event_info (void *object, + PinosLinkInfo *info) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + PinosSubscriptionEvent event; + + pinos_log_debug ("got link info"); + + if (proxy->user_data == NULL) + event = PINOS_SUBSCRIPTION_EVENT_NEW; + else + event = PINOS_SUBSCRIPTION_EVENT_CHANGE; + + proxy->user_data = pinos_link_info_update (proxy->user_data, info); + + pinos_signal_emit (&this->subscription, + this, + event, + proxy->type, + proxy->id); +} + +static const PinosLinkEvent link_events = { + &link_event_info +}; + +static void +registry_marshall_global (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageNotifyGlobal *m = data; + pinos_registry_notify_global (proxy, m->id, m->type); +} + +static void +registry_marshall_global_remove (void *object, + void *data, + size_t size) +{ + PinosProxy *proxy = object; + PinosMessageNotifyGlobalRemove *m = data; + pinos_registry_notify_global_remove (proxy, m->id); +} + +static const MarshallFunc registry_marshall[] = { + [PINOS_MESSAGE_NOTIFY_GLOBAL] = ®istry_marshall_global, + [PINOS_MESSAGE_NOTIFY_GLOBAL_REMOVE] = ®istry_marshall_global_remove, +}; + +static void +registry_event_global (void *object, + uint32_t id, + const char *type) +{ + PinosProxy *registry_proxy = object; + PinosContext *this = registry_proxy->context; + PinosProxy *proxy = NULL; + + pinos_log_debug ("got global %u %s", id, type); + + if (!strcmp (type, PINOS_NODE_URI)) { + proxy = pinos_proxy_new (this, + SPA_ID_INVALID, + this->uri.node); + if (proxy == NULL) + goto no_mem; + + proxy->event = &node_events; + proxy->marshall = &node_marshall; + proxy->interface = NULL; + } else if (!strcmp (type, PINOS_MODULE_URI)) { + proxy = pinos_proxy_new (this, + SPA_ID_INVALID, + this->uri.module); + if (proxy == NULL) + goto no_mem; + + proxy->event = &module_events; + proxy->marshall = &module_marshall; + proxy->interface = NULL; + } else if (!strcmp (type, PINOS_CLIENT_URI)) { + proxy = pinos_proxy_new (this, + SPA_ID_INVALID, + this->uri.client); + if (proxy == NULL) + goto no_mem; + + proxy->event = &client_events; + proxy->marshall = &client_marshall; + proxy->interface = NULL; + } else if (!strcmp (type, PINOS_LINK_URI)) { + proxy = pinos_proxy_new (this, + SPA_ID_INVALID, + this->uri.link); + if (proxy == NULL) + goto no_mem; + + proxy->event = &link_events; + proxy->marshall = &link_marshall; + proxy->interface = NULL; + } + if (proxy) + pinos_registry_do_bind (this->registry_proxy, id, proxy->id); + + return; no_mem: - return SPA_RESULT_NO_MEMORY; + pinos_log_error ("context %p: failed to create proxy", this); + return; } +static void +registry_event_global_remove (void *object, + uint32_t id) +{ + PinosProxy *proxy = object; + PinosContext *this = proxy->context; + + pinos_log_debug ("got global remove %u", id); + + pinos_signal_emit (&this->subscription, + this, + PINOS_SUBSCRIPTION_EVENT_REMOVE, + SPA_ID_INVALID, + id); +} + +static const PinosRegistryEvent registry_events = { + ®istry_event_global, + ®istry_event_global_remove +}; + +static void +registry_bind (void *object, + uint32_t id, + uint32_t new_id) +{ + PinosProxy *proxy = object; + PinosContextImpl *impl = SPA_CONTAINER_OF (proxy->context, PinosContextImpl, this); + PinosMessageBind m = { id, new_id }; + + pinos_connection_add_message (impl->connection, + proxy->id, + PINOS_MESSAGE_BIND, + &m); + pinos_connection_flush (impl->connection); +} + +static const PinosRegistryInterface registry_interface = { + ®istry_bind +}; + static void on_context_data (SpaSource *source, int fd, @@ -441,6 +875,7 @@ on_context_data (SpaSource *source, while (pinos_connection_get_next (conn, &type, &id, &size)) { void *p = alloca (size); PinosProxy *proxy; + const MarshallFunc *marshall; if (!pinos_connection_parse_message (conn, p)) { pinos_log_error ("context %p: failed to parse message", this); @@ -452,33 +887,18 @@ on_context_data (SpaSource *source, pinos_log_error ("context %p: could not find proxy %u", this, id); continue; } - pinos_log_debug ("context %p: object dispatch %u", this, id); + pinos_log_debug ("context %p: object marshall %u, %u", this, id, type); + + marshall = proxy->marshall; + if (marshall[type]) + marshall[type] (proxy, p, size); + else + pinos_log_error ("context %p: function %d not implemented", this, type); - pinos_proxy_dispatch (proxy, type, p); } } } -static SpaResult -context_send_func (void *object, - uint32_t id, - PinosMessageType type, - void *message, - bool flush, - void *data) -{ - PinosContextImpl *impl = SPA_CONTAINER_OF (data, PinosContextImpl, this); - - pinos_connection_add_message (impl->connection, - id, - type, - message); - if (flush) - pinos_connection_flush (impl->connection); - - return SPA_RESULT_OK; -} - /** * pinos_context_new: * @context: a #GMainContext to run in @@ -522,10 +942,6 @@ pinos_context_new (PinosLoop *loop, this->state = PINOS_CONTEXT_STATE_UNCONNECTED; - pinos_context_set_send (this, - context_send_func, - this); - pinos_map_init (&this->objects, 64); spa_list_init (&this->stream_list); @@ -643,8 +1059,6 @@ pinos_context_connect_fd (PinosContext *context, int fd) { PinosContextImpl *impl = SPA_CONTAINER_OF (context, PinosContextImpl, this); - PinosMessageClientUpdate cu; - PinosMessageGetRegistry grm; context_set_state (context, PINOS_CONTEXT_STATE_CONNECTING, NULL); @@ -667,15 +1081,12 @@ pinos_context_connect_fd (PinosContext *context, if (context->core_proxy == NULL) goto no_proxy; - pinos_proxy_set_dispatch (context->core_proxy, - core_dispatch_func, - impl); + context->core_proxy->event = &core_events; + context->core_proxy->interface = &core_interface; + context->core_proxy->marshall = &core_marshall; - cu.props = &context->properties->dict; - pinos_proxy_send_message (context->core_proxy, - PINOS_MESSAGE_CLIENT_UPDATE, - &cu, - true); + pinos_core_do_client_update (context->core_proxy, + &context->properties->dict); context->registry_proxy = pinos_proxy_new (context, SPA_ID_INVALID, @@ -683,16 +1094,13 @@ pinos_context_connect_fd (PinosContext *context, if (context->registry_proxy == NULL) goto no_registry; - pinos_proxy_set_dispatch (context->registry_proxy, - registry_dispatch_func, - impl); + context->registry_proxy->event = ®istry_events; + context->registry_proxy->interface = ®istry_interface; + context->registry_proxy->marshall = ®istry_marshall; - grm.seq = 0; - grm.new_id = context->registry_proxy->id; - pinos_proxy_send_message (context->core_proxy, - PINOS_MESSAGE_GET_REGISTRY, - &grm, - true); + pinos_core_do_get_registry (context->core_proxy, + 0, + context->registry_proxy->id); return true; no_registry: @@ -740,35 +1148,6 @@ pinos_context_disconnect (PinosContext *context) return true; } -void -pinos_context_set_send (PinosContext *context, - PinosSendFunc func, - void *data) -{ - PinosContextImpl *impl = SPA_CONTAINER_OF (context, PinosContextImpl, this); - - impl->send_func = func; - impl->send_data = data; -} - - -SpaResult -pinos_context_send_message (PinosContext *context, - PinosProxy *proxy, - uint32_t opcode, - void *message, - bool flush) -{ - PinosContextImpl *impl = SPA_CONTAINER_OF (context, PinosContextImpl, this); - - if (impl->send_func) - return impl->send_func (proxy, proxy->id, opcode, message, flush, impl->send_data); - - pinos_log_error ("context %p: send func not implemented", context); - - return SPA_RESULT_NOT_IMPLEMENTED; -} - void pinos_context_get_core_info (PinosContext *context, PinosCoreInfoCallback cb, diff --git a/pinos/client/context.h b/pinos/client/context.h index b6fa3e356..6af1bb608 100644 --- a/pinos/client/context.h +++ b/pinos/client/context.h @@ -33,12 +33,6 @@ typedef struct _PinosContext PinosContext; #include #include -typedef SpaResult (*PinosSendFunc) (void *object, - uint32_t id, - uint32_t opcode, - void *message, - bool flush, - void *data); /** * PinosContextState: * @PINOS_CONTEXT_STATE_ERROR: context is in error @@ -99,16 +93,6 @@ PinosContext * pinos_context_new (PinosLoop *loop, PinosProperties *properties); void pinos_context_destroy (PinosContext *context); -void pinos_context_set_send (PinosContext *context, - PinosSendFunc func, - void *data); - -SpaResult pinos_context_send_message (PinosContext *context, - PinosProxy *proxy, - uint32_t opcode, - void *message, - bool flush); - bool pinos_context_connect (PinosContext *context); bool pinos_context_connect_fd (PinosContext *context, int fd); diff --git a/pinos/client/interfaces.h b/pinos/client/interfaces.h index 17467c48f..73cd783e2 100644 --- a/pinos/client/interfaces.h +++ b/pinos/client/interfaces.h @@ -61,6 +61,12 @@ typedef struct { uint32_t new_id); } PinosCoreInterface; +#define pinos_core_do_client_update(r,...) ((PinosCoreInterface*)r->interface)->client_update(r,__VA_ARGS__) +#define pinos_core_do_sync(r,...) ((PinosCoreInterface*)r->interface)->sync(r,__VA_ARGS__) +#define pinos_core_do_get_registry(r,...) ((PinosCoreInterface*)r->interface)->get_registry(r,__VA_ARGS__) +#define pinos_core_do_create_node(r,...) ((PinosCoreInterface*)r->interface)->create_node(r,__VA_ARGS__) +#define pinos_core_do_create_client_node(r,...) ((PinosCoreInterface*)r->interface)->create_client_node(r,__VA_ARGS__) + typedef struct { void (*info) (void *object, PinosCoreInfo *info); @@ -87,6 +93,8 @@ typedef struct { uint32_t new_id); } PinosRegistryInterface; +#define pinos_registry_do_bind(r,...) ((PinosRegistryInterface*)r->interface)->bind(r,__VA_ARGS__) + typedef struct { void (*global) (void *object, uint32_t id, @@ -159,6 +167,11 @@ typedef struct { uint32_t seq); } PinosClientNodeInterface; +#define pinos_client_node_do_update(r,...) ((PinosClientNodeInterface*)r->interface)->update(r,__VA_ARGS__) +#define pinos_client_node_do_port_update(r,...) ((PinosClientNodeInterface*)r->interface)->port_update(r,__VA_ARGS__) +#define pinos_client_node_do_state_change(r,...) ((PinosClientNodeInterface*)r->interface)->state_change(r,__VA_ARGS__) +#define pinos_client_node_do_event(r,...) ((PinosClientNodeInterface*)r->interface)->event(r,__VA_ARGS__) +#define pinos_client_node_do_destroy(r,...) ((PinosClientNodeInterface*)r->interface)->destroy(r,__VA_ARGS__) typedef struct { void (*done) (void *object, diff --git a/pinos/client/proxy.c b/pinos/client/proxy.c index cb35ab1ef..460ea4ebe 100644 --- a/pinos/client/proxy.c +++ b/pinos/client/proxy.c @@ -22,9 +22,6 @@ typedef struct { PinosProxy this; - - PinosDispatchFunc dispatch_func; - void *dispatch_data; } PinosProxyImpl; PinosProxy * @@ -67,42 +64,3 @@ pinos_proxy_destroy (PinosProxy *proxy) pinos_log_debug ("proxy %p: free", proxy); free (impl); } - -void -pinos_proxy_set_dispatch (PinosProxy *proxy, - PinosDispatchFunc func, - void *data) -{ - PinosProxyImpl *impl = SPA_CONTAINER_OF (proxy, PinosProxyImpl, this); - - impl->dispatch_func = func; - impl->dispatch_data = data; -} - -SpaResult -pinos_proxy_send_message (PinosProxy *proxy, - uint32_t opcode, - void *message, - bool flush) -{ - return pinos_context_send_message (proxy->context, - proxy, - opcode, - message, - flush); -} - -SpaResult -pinos_proxy_dispatch (PinosProxy *proxy, - uint32_t opcode, - void *message) -{ - PinosProxyImpl *impl = SPA_CONTAINER_OF (proxy, PinosProxyImpl, this); - - if (impl->dispatch_func) - return impl->dispatch_func (proxy, opcode, message, impl->dispatch_data); - - pinos_log_error ("proxy %p: dispatch func not implemented", proxy); - - return SPA_RESULT_NOT_IMPLEMENTED; -} diff --git a/pinos/client/proxy.h b/pinos/client/proxy.h index f58699976..9c8990f53 100644 --- a/pinos/client/proxy.h +++ b/pinos/client/proxy.h @@ -26,11 +26,6 @@ extern "C" { typedef struct _PinosProxy PinosProxy; - -typedef SpaResult (*PinosDispatchFunc) (void *object, - uint32_t opcode, - void *message, - void *data); #include #include @@ -43,6 +38,10 @@ struct _PinosProxy { void *user_data; + const void *interface; + const void *event; + const void *marshall; + PINOS_SIGNAL (destroy_signal, (PinosListener *listener, PinosProxy *proxy)); }; @@ -52,21 +51,8 @@ PinosProxy * pinos_proxy_new (PinosContext *contex uint32_t type); void pinos_proxy_destroy (PinosProxy *proxy); -void pinos_proxy_set_dispatch (PinosProxy *proxy, - PinosDispatchFunc func, - void *data); - -SpaResult pinos_proxy_send_message (PinosProxy *proxy, - uint32_t opcode, - void *message, - bool flush); - -SpaResult pinos_proxy_dispatch (PinosProxy *proxy, - uint32_t opcode, - void *message); #ifdef __cplusplus } #endif - #endif /* __PINOS_PROXY_H__ */ diff --git a/pinos/client/stream.c b/pinos/client/stream.c index a34ca8b4c..5bcbacd05 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -307,33 +307,29 @@ static void add_node_update (PinosStream *stream, uint32_t change_mask, bool flush) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - PinosMessageNodeUpdate nu = { 0, }; + unsigned int max_input_ports = 0, max_output_ports = 0; - nu.change_mask = change_mask; if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS) - nu.max_input_ports = impl->direction == SPA_DIRECTION_INPUT ? 1 : 0; + max_input_ports = impl->direction == SPA_DIRECTION_INPUT ? 1 : 0; if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS) - nu.max_output_ports = impl->direction == SPA_DIRECTION_OUTPUT ? 1 : 0; - nu.props = NULL; + max_output_ports = impl->direction == SPA_DIRECTION_OUTPUT ? 1 : 0; - pinos_proxy_send_message (impl->node_proxy, - PINOS_MESSAGE_NODE_UPDATE, - &nu, - flush); + pinos_client_node_do_update (impl->node_proxy, + change_mask, + max_input_ports, + max_output_ports, + NULL); } static void add_state_change (PinosStream *stream, SpaNodeState state, bool flush) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - PinosMessageNodeStateChange sc; if (impl->node_state != state) { - sc.state = impl->node_state = state; - pinos_proxy_send_message (impl->node_proxy, - PINOS_MESSAGE_NODE_STATE_CHANGE, - &sc, - flush); + impl->node_state = state; + pinos_client_node_do_state_change (impl->node_proxy, + state); } } @@ -341,26 +337,16 @@ static void add_port_update (PinosStream *stream, uint32_t change_mask, bool flush) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - PinosMessagePortUpdate pu = { 0, };; - pu.direction = impl->direction; - pu.port_id = impl->port_id; - pu.change_mask = change_mask; - if (change_mask & PINOS_MESSAGE_PORT_UPDATE_POSSIBLE_FORMATS) { - pu.n_possible_formats = impl->n_possible_formats; - pu.possible_formats = impl->possible_formats; - } - if (change_mask & PINOS_MESSAGE_PORT_UPDATE_FORMAT) { - pu.format = impl->format; - } - pu.props = NULL; - if (change_mask & PINOS_MESSAGE_PORT_UPDATE_INFO) { - pu.info = &impl->port_info; - } - pinos_proxy_send_message (impl->node_proxy, - PINOS_MESSAGE_PORT_UPDATE, - &pu, - flush); + pinos_client_node_do_port_update (impl->node_proxy, + impl->direction, + impl->port_id, + change_mask, + impl->n_possible_formats, + impl->possible_formats, + impl->format, + NULL, + &impl->port_info); } static inline void @@ -401,19 +387,15 @@ static void add_request_clock_update (PinosStream *stream, bool flush) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - PinosMessageNodeEvent cne; SpaNodeEventRequestClockUpdate rcu; - cne.event = &rcu.event; rcu.event.type = SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE; rcu.event.size = sizeof (rcu); rcu.update_mask = SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME; rcu.timestamp = 0; rcu.offset = 0; - pinos_proxy_send_message (impl->node_proxy, - PINOS_MESSAGE_NODE_EVENT, - &cne, - flush); + pinos_client_node_do_event (impl->node_proxy, + &rcu.event); } static void @@ -423,18 +405,14 @@ add_async_complete (PinosStream *stream, bool flush) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - PinosMessageNodeEvent cne; SpaNodeEventAsyncComplete ac; - cne.event = &ac.event; ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; ac.event.size = sizeof (ac); ac.seq = seq; ac.res = res; - pinos_proxy_send_message (impl->node_proxy, - PINOS_MESSAGE_NODE_EVENT, - &cne, - flush); + pinos_client_node_do_event (impl->node_proxy, + &ac.event); } static void @@ -678,220 +656,283 @@ handle_node_command (PinosStream *stream, return true; } -static SpaResult -stream_dispatch_func (void *object, - PinosMessageType type, - void *message, - void *data) +static void +client_node_done (void *object, + uint32_t seq, + int datafd) { - PinosStream *stream = data; - PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + PinosProxy *proxy = object; + PinosStream *stream = proxy->user_data; - switch (type) { - case PINOS_MESSAGE_CREATE_NODE_DONE: - pinos_log_warn ("create node done %d", type); - break; - - case PINOS_MESSAGE_CREATE_CLIENT_NODE_DONE: - { - PinosMessageCreateClientNodeDone *cnd = message; - - pinos_log_warn ("create client node done %d", type); - handle_socket (stream, cnd->datafd); - do_node_init (stream); - break; - } - - case PINOS_MESSAGE_ADD_PORT: - case PINOS_MESSAGE_REMOVE_PORT: - pinos_log_warn ("add/remove port not supported"); - break; - - case PINOS_MESSAGE_SET_FORMAT: - { - PinosMessageSetFormat *p = message; - void *mem; - - if (impl->format) - free (impl->format); - mem = malloc (pinos_serialize_format_get_size (p->format)); - impl->format = pinos_serialize_format_copy_into (mem, p->format); - - impl->pending_seq = p->seq; - - pinos_signal_emit (&stream->format_changed, stream, impl->format); - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); - break; - } - case PINOS_MESSAGE_SET_PROPERTY: - pinos_log_warn ("set property not implemented"); - break; - - case PINOS_MESSAGE_ADD_MEM: - { - PinosMessageAddMem *p = message; - MemId *m; - - m = find_mem (stream, p->mem_id); - if (m) { - pinos_log_debug ("update mem %u, fd %d, flags %d, off %zd, size %zd", - p->mem_id, p->memfd, p->flags, p->offset, p->size); - clear_memid (m); - } else { - m = pinos_array_add (&impl->mem_ids, sizeof (MemId)); - pinos_log_debug ("add mem %u, fd %d, flags %d, off %zd, size %zd", - p->mem_id, p->memfd, p->flags, p->offset, p->size); - } - m->id = p->mem_id; - m->fd = p->memfd; - m->flags = p->flags; - m->ptr = NULL; - m->offset = p->offset; - m->size = p->size; - break; - } - case PINOS_MESSAGE_USE_BUFFERS: - { - PinosMessageUseBuffers *p = message; - BufferId *bid; - unsigned int i, j, len; - SpaBuffer *b; - - /* clear previous buffers */ - clear_buffers (stream); - - for (i = 0; i < p->n_buffers; i++) { - off_t offset = 0; - - MemId *mid = find_mem (stream, p->buffers[i].mem_id); - if (mid == NULL) { - pinos_log_warn ("unknown memory id %u", p->buffers[i].mem_id); - continue; - } - - if (mid->ptr == NULL) { - mid->ptr = mmap (NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0); - if (mid->ptr == MAP_FAILED) { - mid->ptr = NULL; - pinos_log_warn ("Failed to mmap memory %zd %p: %s", mid->size, mid, strerror (errno)); - continue; - } - mid->ptr = SPA_MEMBER (mid->ptr, mid->offset, void); - } - len = pinos_array_get_len (&impl->buffer_ids, BufferId); - bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId)); - bid->used = false; - - b = p->buffers[i].buffer; - - bid->buf_ptr = SPA_MEMBER (mid->ptr, p->buffers[i].offset, void); - { - size_t size; - - size = pinos_serialize_buffer_get_size (p->buffers[i].buffer); - b = bid->buf = malloc (size); - pinos_serialize_buffer_copy_into (b, p->buffers[i].buffer); - } - bid->id = b->id; - - if (bid->id != len) { - pinos_log_warn ("unexpected id %u found, expected %u", bid->id, len); - impl->in_order = false; - } - pinos_log_debug ("add buffer %d %d %zd", mid->id, bid->id, p->buffers[i].offset); - - for (j = 0; j < b->n_metas; j++) { - SpaMeta *m = &b->metas[j]; - m->data = SPA_MEMBER (bid->buf_ptr, offset, void); - offset += m->size; - } - - for (j = 0; j < b->n_datas; j++) { - SpaData *d = &b->datas[j]; - - d->chunk = SPA_MEMBER (bid->buf_ptr, offset + sizeof (SpaChunk) * j, SpaChunk); - - switch (d->type) { - case SPA_DATA_TYPE_ID: - { - MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data)); - d->type = SPA_DATA_TYPE_MEMFD; - d->data = NULL; - d->fd = bmid->fd; - pinos_log_debug (" data %d %u -> fd %d", j, bmid->id, bmid->fd); - break; - } - case SPA_DATA_TYPE_MEMPTR: - { - d->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (d->data), void); - d->fd = -1; - pinos_log_debug (" data %d %u -> mem %p", j, bid->id, d->data); - break; - } - default: - pinos_log_warn ("unknown buffer data type %d", d->type); - break; - } - } - pinos_signal_emit (&stream->add_buffer, stream, bid->id); - } - - if (p->n_buffers) { - add_state_change (stream, SPA_NODE_STATE_PAUSED, false); - } else { - clear_mems (stream); - add_state_change (stream, SPA_NODE_STATE_READY, false); - } - add_async_complete (stream, p->seq, SPA_RESULT_OK, true); - - if (p->n_buffers) - stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL); - else - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); - break; - } - case PINOS_MESSAGE_NODE_EVENT: - { - PinosMessageNodeEvent *p = message; - handle_node_event (stream, p->event); - break; - } - case PINOS_MESSAGE_NODE_COMMAND: - { - PinosMessageNodeCommand *p = message; - handle_node_command (stream, p->seq, p->command); - break; - } - case PINOS_MESSAGE_PORT_COMMAND: - { - break; - } - case PINOS_MESSAGE_TRANSPORT_UPDATE: - { - PinosMessageTransportUpdate *p = message; - PinosTransportInfo info; - - info.memfd = p->memfd; - if (info.memfd == -1) - break; - info.offset = p->offset; - info.size = p->size; - - if (impl->trans) - pinos_transport_destroy (impl->trans); - impl->trans = pinos_transport_new_from_info (&info); - - pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans); - break; - } - default: - case PINOS_MESSAGE_INVALID: - pinos_log_warn ("unhandled message %d", type); - break; - } - return SPA_RESULT_OK; + pinos_log_warn ("create client node done"); + handle_socket (stream, datafd); + do_node_init (stream); } +static void +client_node_event (void *object, + SpaNodeEvent *event) +{ + PinosProxy *proxy = object; + PinosStream *stream = proxy->user_data; + handle_node_event (stream, event); +} + +static void +client_node_add_port (void *object, + uint32_t seq, + SpaDirection direction, + uint32_t port_id) +{ + pinos_log_warn ("add port not supported"); +} + +static void +client_node_remove_port (void *object, + uint32_t seq, + SpaDirection direction, + uint32_t port_id) +{ + pinos_log_warn ("remove port not supported"); +} + +static void +client_node_set_format (void *object, + uint32_t seq, + SpaDirection direction, + uint32_t port_id, + SpaPortFormatFlags flags, + const SpaFormat *format) +{ + PinosProxy *proxy = object; + PinosStream *stream = proxy->user_data; + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + void *mem; + + if (impl->format) + free (impl->format); + mem = malloc (pinos_serialize_format_get_size (format)); + impl->format = pinos_serialize_format_copy_into (mem, format); + + impl->pending_seq = seq; + + pinos_signal_emit (&stream->format_changed, stream, impl->format); + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); +} + +static void +client_node_set_property (void *object, + uint32_t seq, + uint32_t id, + size_t size, + void *value) +{ + pinos_log_warn ("set property not implemented"); +} + +static void +client_node_add_mem (void *object, + SpaDirection direction, + uint32_t port_id, + uint32_t mem_id, + SpaDataType type, + int memfd, + uint32_t flags, + off_t offset, + size_t size) +{ + PinosProxy *proxy = object; + PinosStream *stream = proxy->user_data; + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + MemId *m; + + m = find_mem (stream, mem_id); + if (m) { + pinos_log_debug ("update mem %u, fd %d, flags %d, off %zd, size %zd", + mem_id, memfd, flags, offset, size); + clear_memid (m); + } else { + m = pinos_array_add (&impl->mem_ids, sizeof (MemId)); + pinos_log_debug ("add mem %u, fd %d, flags %d, off %zd, size %zd", + mem_id, memfd, flags, offset, size); + } + m->id = mem_id; + m->fd = memfd; + m->flags = flags; + m->ptr = NULL; + m->offset = offset; + m->size = size; +} + +static void +client_node_use_buffers (void *object, + uint32_t seq, + SpaDirection direction, + uint32_t port_id, + unsigned int n_buffers, + PinosClientNodeBuffer *buffers) +{ + PinosProxy *proxy = object; + PinosStream *stream = proxy->user_data; + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + BufferId *bid; + unsigned int i, j, len; + SpaBuffer *b; + + /* clear previous buffers */ + clear_buffers (stream); + + for (i = 0; i < n_buffers; i++) { + off_t offset = 0; + + MemId *mid = find_mem (stream, buffers[i].mem_id); + if (mid == NULL) { + pinos_log_warn ("unknown memory id %u", buffers[i].mem_id); + continue; + } + + if (mid->ptr == NULL) { + mid->ptr = mmap (NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0); + if (mid->ptr == MAP_FAILED) { + mid->ptr = NULL; + pinos_log_warn ("Failed to mmap memory %zd %p: %s", mid->size, mid, strerror (errno)); + continue; + } + mid->ptr = SPA_MEMBER (mid->ptr, mid->offset, void); + } + len = pinos_array_get_len (&impl->buffer_ids, BufferId); + bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId)); + bid->used = false; + + b = buffers[i].buffer; + + bid->buf_ptr = SPA_MEMBER (mid->ptr, buffers[i].offset, void); + { + size_t size; + + size = pinos_serialize_buffer_get_size (buffers[i].buffer); + b = bid->buf = malloc (size); + pinos_serialize_buffer_copy_into (b, buffers[i].buffer); + } + bid->id = b->id; + + if (bid->id != len) { + pinos_log_warn ("unexpected id %u found, expected %u", bid->id, len); + impl->in_order = false; + } + pinos_log_debug ("add buffer %d %d %zd", mid->id, bid->id, buffers[i].offset); + + for (j = 0; j < b->n_metas; j++) { + SpaMeta *m = &b->metas[j]; + m->data = SPA_MEMBER (bid->buf_ptr, offset, void); + offset += m->size; + } + + for (j = 0; j < b->n_datas; j++) { + SpaData *d = &b->datas[j]; + + d->chunk = SPA_MEMBER (bid->buf_ptr, offset + sizeof (SpaChunk) * j, SpaChunk); + + switch (d->type) { + case SPA_DATA_TYPE_ID: + { + MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data)); + d->type = SPA_DATA_TYPE_MEMFD; + d->data = NULL; + d->fd = bmid->fd; + pinos_log_debug (" data %d %u -> fd %d", j, bmid->id, bmid->fd); + break; + } + case SPA_DATA_TYPE_MEMPTR: + { + d->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (d->data), void); + d->fd = -1; + pinos_log_debug (" data %d %u -> mem %p", j, bid->id, d->data); + break; + } + default: + pinos_log_warn ("unknown buffer data type %d", d->type); + break; + } + } + pinos_signal_emit (&stream->add_buffer, stream, bid->id); + } + + if (n_buffers) { + add_state_change (stream, SPA_NODE_STATE_PAUSED, false); + } else { + clear_mems (stream); + add_state_change (stream, SPA_NODE_STATE_READY, false); + } + add_async_complete (stream, seq, SPA_RESULT_OK, true); + + if (n_buffers) + stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL); + else + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); +} + +static void +client_node_node_command (void *object, + uint32_t seq, + SpaNodeCommand *command) +{ + PinosProxy *proxy = object; + PinosStream *stream = proxy->user_data; + handle_node_command (stream, seq, command); +} + +static void +client_node_port_command (void *object, + uint32_t port_id, + SpaNodeCommand *command) +{ + pinos_log_warn ("port command not supported"); +} + +static void +client_node_transport (void *object, + int memfd, + off_t offset, + size_t size) +{ + PinosProxy *proxy = object; + PinosStream *stream = proxy->user_data; + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + PinosTransportInfo info; + + info.memfd = memfd; + if (info.memfd == -1) + return; + info.offset = offset; + info.size = size; + + if (impl->trans) + pinos_transport_destroy (impl->trans); + impl->trans = pinos_transport_new_from_info (&info); + + pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans); +} + +static const PinosClientNodeEvent client_node_events = { + &client_node_done, + &client_node_event, + &client_node_add_port, + &client_node_remove_port, + &client_node_set_format, + &client_node_set_property, + &client_node_add_mem, + &client_node_use_buffers, + &client_node_node_command, + &client_node_port_command, + &client_node_transport +}; + +typedef void (*MarshallFunc) (void *object, void *data, size_t size); + +extern const PinosClientNodeInterface client_node_interface; +extern const MarshallFunc client_node_marshall[]; + static void on_node_proxy_destroy (PinosListener *listener, PinosProxy *proxy) @@ -932,7 +973,6 @@ pinos_stream_connect (PinosStream *stream, SpaFormat **possible_formats) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - PinosMessageCreateClientNode ccn; impl->direction = direction == PINOS_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT; impl->port_id = 0; @@ -961,19 +1001,16 @@ pinos_stream_connect (PinosStream *stream, &impl->node_proxy_destroy, on_node_proxy_destroy); - pinos_proxy_set_dispatch (impl->node_proxy, - stream_dispatch_func, - impl); + impl->node_proxy->user_data = stream; + impl->node_proxy->interface = &client_node_interface; + impl->node_proxy->event = &client_node_events; + impl->node_proxy->marshall = &client_node_marshall; - ccn.seq = ++impl->seq; - ccn.name = "client-node"; - ccn.props = &stream->properties->dict; - ccn.new_id = impl->node_proxy->id; - - pinos_proxy_send_message (stream->context->core_proxy, - PINOS_MESSAGE_CREATE_CLIENT_NODE, - &ccn, - true); + pinos_core_do_create_client_node (stream->context->core_proxy, + ++impl->seq, + "client-node", + &stream->properties->dict, + impl->node_proxy->id); return true; } @@ -1066,17 +1103,13 @@ bool pinos_stream_disconnect (PinosStream *stream) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - PinosMessageDestroy msg; impl->disconnecting = true; unhandle_socket (stream); - msg.seq = ++impl->seq; - pinos_proxy_send_message (impl->node_proxy, - PINOS_MESSAGE_DESTROY, - &msg, - true); + pinos_client_node_do_destroy (impl->node_proxy, + ++impl->seq); return true; } diff --git a/pinos/modules/module-protocol-native.c b/pinos/modules/module-protocol-native.c index f0afc57cf..47ae4d911 100644 --- a/pinos/modules/module-protocol-native.c +++ b/pinos/modules/module-protocol-native.c @@ -103,9 +103,8 @@ core_event_info (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageCoreInfo m; + PinosMessageCoreInfo m = { info }; - m.info = info; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_CORE_INFO, @@ -119,9 +118,8 @@ core_event_done (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageNotifyDone m; + PinosMessageNotifyDone m = { seq }; - m.seq = seq; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_NOTIFY_DONE, @@ -137,17 +135,14 @@ core_event_error (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageError m; char buffer[128]; + PinosMessageError m = { id, res, buffer }; va_list ap; va_start (ap, error); vsnprintf (buffer, sizeof (buffer), error, ap); va_end (ap); - m.id = id; - m.res = res; - m.error = buffer; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_ERROR, @@ -161,9 +156,8 @@ core_event_remove_id (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageRemoveId m; + PinosMessageRemoveId m = { id }; - m.id = id; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_REMOVE_ID, @@ -172,67 +166,62 @@ core_event_remove_id (void *object, } static void -core_client_update (void *object, +core_marshall_client_update (void *object, + void *data, + size_t size) +{ + PinosResource *resource = object; + PinosMessageClientUpdate *m = data; + pinos_core_do_client_update (resource, m->props); +} + +static void +core_marshall_sync (void *object, void *data, size_t size) -{ - PinosResource *resource = object; - PinosMessageClientUpdate *m = data; - - ((PinosCoreInterface*)resource->interface)->client_update (resource, m->props); -} - -static void -core_sync (void *object, - void *data, - size_t size) { PinosResource *resource = object; PinosMessageSync *m = data; - - ((PinosCoreInterface*)resource->interface)->sync (resource, m->seq); + pinos_core_do_sync (resource, m->seq); } static void -core_get_registry (void *object, - void *data, - size_t size) +core_marshall_get_registry (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageGetRegistry *m = data; - - ((PinosCoreInterface*)resource->interface)->get_registry (resource, m->seq, m->new_id); + pinos_core_do_get_registry (resource, m->seq, m->new_id); } static void -core_create_node (void *object, - void *data, - size_t size) +core_marshall_create_node (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageCreateNode *m = data; - - ((PinosCoreInterface*)resource->interface)->create_node (resource, - m->seq, - m->factory_name, - m->name, - m->props, - m->new_id); + pinos_core_do_create_node (resource, + m->seq, + m->factory_name, + m->name, + m->props, + m->new_id); } static void -core_create_client_node (void *object, - void *data, - size_t size) +core_marshall_create_client_node (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageCreateClientNode *m = data; - - ((PinosCoreInterface*)resource->interface)->create_client_node (resource, - m->seq, - m->name, - m->props, - m->new_id); + pinos_core_do_create_client_node (resource, + m->seq, + m->name, + m->props, + m->new_id); } static void @@ -242,10 +231,8 @@ registry_event_global (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageNotifyGlobal m; + PinosMessageNotifyGlobal m = { id, type }; - m.id = id; - m.type = type; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_NOTIFY_GLOBAL, @@ -259,9 +246,8 @@ registry_event_global_remove (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageNotifyGlobalRemove m; + PinosMessageNotifyGlobalRemove m = { id }; - m.id = id; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_NOTIFY_GLOBAL_REMOVE, @@ -270,16 +256,15 @@ registry_event_global_remove (void *object, } static void -registry_bind (void *object, - void *data, - size_t size) +registry_marshall_bind (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageBind *m = data; - - ((PinosRegistryInterface*)resource->interface)->bind (resource, - m->id, - m->new_id); + pinos_registry_do_bind (resource, + m->id, + m->new_id); } static void @@ -288,9 +273,8 @@ module_event_info (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageModuleInfo m; + PinosMessageModuleInfo m = { info }; - m.info = info; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_MODULE_INFO, @@ -304,9 +288,8 @@ node_event_done (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageCreateNodeDone m; + PinosMessageCreateNodeDone m = { seq }; - m.seq = seq; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_CREATE_NODE_DONE, @@ -320,9 +303,8 @@ node_event_info (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageNodeInfo m; + PinosMessageNodeInfo m = { info }; - m.info = info; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_NODE_INFO, @@ -336,9 +318,8 @@ client_event_info (void *object, { PinosResource *resource = object; PinosProtocolNativeClient *client = resource->client->protocol_private; - PinosMessageClientInfo m; + PinosMessageClientInfo m = { info }; - m.info = info; pinos_connection_add_message (client->connection, resource->id, PINOS_MESSAGE_CLIENT_INFO, @@ -533,68 +514,65 @@ client_node_event_transport (void *object, } static void -client_node_update (void *object, - void *data, - size_t size) +client_node_marshall_update (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageNodeUpdate *m = data; - ((PinosClientNodeInterface*)resource->interface)->update (object, - m->change_mask, - m->max_input_ports, - m->max_output_ports, - m->props); + pinos_client_node_do_update (resource, + m->change_mask, + m->max_input_ports, + m->max_output_ports, + m->props); } static void -client_node_port_update (void *object, - void *data, - size_t size) +client_node_marshall_port_update (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessagePortUpdate *m = data; - ((PinosClientNodeInterface*)resource->interface)->port_update (object, - m->direction, - m->port_id, - m->change_mask, - m->n_possible_formats, - m->possible_formats, - m->format, - m->props, - m->info); + pinos_client_node_do_port_update (resource, + m->direction, + m->port_id, + m->change_mask, + m->n_possible_formats, + m->possible_formats, + m->format, + m->props, + m->info); } static void -client_node_state_change (void *object, - void *data, - size_t size) +client_node_marshall_state_change (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageNodeStateChange *m = data; - ((PinosClientNodeInterface*)resource->interface)->state_change (object, - m->state); + pinos_client_node_do_state_change (resource, m->state); } static void -client_node_event (void *object, - void *data, - size_t size) +client_node_marshall_event (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageNodeEvent *m = data; - ((PinosClientNodeInterface*)resource->interface)->event (object, - m->event); + pinos_client_node_do_event (resource, m->event); } static void -client_node_destroy (void *object, - void *data, - size_t size) +client_node_marshall_destroy (void *object, + void *data, + size_t size) { PinosResource *resource = object; PinosMessageDestroy *m = data; - ((PinosClientNodeInterface*)resource->interface)->destroy (object, - m->seq); + pinos_client_node_do_destroy (resource, m->seq); } static void @@ -626,11 +604,11 @@ on_resource_added (PinosListener *listener, &core_event_remove_id }; static const MarshallFunc core_marshall[] = { - [PINOS_MESSAGE_CLIENT_UPDATE] = &core_client_update, - [PINOS_MESSAGE_SYNC] = &core_sync, - [PINOS_MESSAGE_GET_REGISTRY] = &core_get_registry, - [PINOS_MESSAGE_CREATE_NODE] = &core_create_node, - [PINOS_MESSAGE_CREATE_CLIENT_NODE] = &core_create_client_node + [PINOS_MESSAGE_CLIENT_UPDATE] = &core_marshall_client_update, + [PINOS_MESSAGE_SYNC] = &core_marshall_sync, + [PINOS_MESSAGE_GET_REGISTRY] = &core_marshall_get_registry, + [PINOS_MESSAGE_CREATE_NODE] = &core_marshall_create_node, + [PINOS_MESSAGE_CREATE_CLIENT_NODE] = &core_marshall_create_client_node }; resource->event = &core_event; resource->marshall = &core_marshall; @@ -641,7 +619,7 @@ on_resource_added (PinosListener *listener, ®istry_event_global_remove, }; static const MarshallFunc registry_marshall[] = { - [PINOS_MESSAGE_BIND] = ®istry_bind, + [PINOS_MESSAGE_BIND] = ®istry_marshall_bind, }; resource->event = ®istry_event; resource->marshall = ®istry_marshall; @@ -683,11 +661,11 @@ on_resource_added (PinosListener *listener, &client_node_event_transport, }; static const MarshallFunc client_node_marshall[] = { - [PINOS_MESSAGE_NODE_UPDATE] = &client_node_update, - [PINOS_MESSAGE_PORT_UPDATE] = &client_node_port_update, - [PINOS_MESSAGE_NODE_STATE_CHANGE] = &client_node_state_change, - [PINOS_MESSAGE_NODE_EVENT] = &client_node_event, - [PINOS_MESSAGE_DESTROY] = &client_node_destroy, + [PINOS_MESSAGE_NODE_UPDATE] = &client_node_marshall_update, + [PINOS_MESSAGE_PORT_UPDATE] = &client_node_marshall_port_update, + [PINOS_MESSAGE_NODE_STATE_CHANGE] = &client_node_marshall_state_change, + [PINOS_MESSAGE_NODE_EVENT] = &client_node_marshall_event, + [PINOS_MESSAGE_DESTROY] = &client_node_marshall_destroy, }; resource->event = &client_node_events; resource->marshall = &client_node_marshall; diff --git a/pinos/server/client.h b/pinos/server/client.h index 25df7fdaa..40097b6e3 100644 --- a/pinos/server/client.h +++ b/pinos/server/client.h @@ -35,13 +35,6 @@ typedef struct _PinosClient PinosClient; #include #include -typedef SpaResult (*PinosSendFunc) (void *object, - uint32_t id, - uint32_t opcode, - void *message, - bool flush, - void *data); - /** * PinosClient: * diff --git a/pinos/server/resource.c b/pinos/server/resource.c index 6bf06b292..8e38ce121 100644 --- a/pinos/server/resource.c +++ b/pinos/server/resource.c @@ -24,9 +24,6 @@ typedef struct { PinosResource this; - - PinosDispatchFunc dispatch_func; - void *dispatch_data; } PinosResourceImpl; PinosResource *