From af3de36416d01db436293913c261ffe1d28d91cd Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 21 Jul 2016 18:38:24 +0200 Subject: [PATCH] work on stream negotiation and start Add more buffer types to add and remove memory shared memory between the server and client. We would like to send buffers only once and then simply reference them by index. Do format negotiation and stream start with a START message. --- pinos/Makefile.am | 1 + pinos/client/buffer.c | 220 ++++++++++++++---- pinos/client/buffer.h | 129 ++++++++--- pinos/client/fdmanager.c | 37 +++ pinos/client/fdmanager.h | 3 + pinos/client/pinos.h | 1 + pinos/client/stream.c | 141 ++++++++++-- pinos/dbus/org.pinos.xml | 18 +- pinos/gst/gstpinosdepay.c | 52 ++++- pinos/gst/gstpinosdepay.h | 1 + pinos/gst/gstpinospay.c | 36 +-- pinos/gst/gstpinosportsink.c | 38 ++-- pinos/gst/gstpinosportsrc.c | 67 ++++-- pinos/gst/gstpinosportsrc.h | 1 + pinos/gst/gstpinossink.c | 55 +++-- pinos/gst/gstpinossink.h | 4 +- pinos/gst/gstpinossocketsink.c | 46 ++-- pinos/gst/gstpinossrc.c | 80 ++++--- pinos/gst/gstpinossrc.h | 1 + pinos/server/channel.c | 186 +++++++-------- pinos/server/link.c | 401 +++++++++++++++++++++++++++++++++ pinos/server/link.h | 75 ++++++ pinos/server/port.c | 2 +- 23 files changed, 1246 insertions(+), 349 deletions(-) create mode 100644 pinos/server/link.c create mode 100644 pinos/server/link.h diff --git a/pinos/Makefile.am b/pinos/Makefile.am index b1f1200e2..6c9eca56f 100644 --- a/pinos/Makefile.am +++ b/pinos/Makefile.am @@ -208,6 +208,7 @@ libpinoscore_@PINOS_MAJORMINOR@_la_SOURCES = \ server/channel.c server/channel.h \ server/client.c server/client.h \ server/daemon.c server/daemon.h \ + server/link.c server/link.h \ server/node.c server/node.h \ server/port.c server/port.h \ server/node-factory.c server/node-factory.h \ diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c index 63f54f79d..ec7f088f8 100644 --- a/pinos/client/buffer.c +++ b/pinos/client/buffer.c @@ -459,8 +459,8 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, sb->buf.max_size = sizeof (PinosStackHeader) + 128; sb->buf.data = g_malloc (sb->buf.max_size); sb->buf.free_data = sb->buf.data; -// g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, -// builder, max_data, sb->buf.max_size); + g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, + builder, max_data, sb->buf.max_size); } else { sb->buf.max_size = max_data; sb->buf.data = data; @@ -581,8 +581,8 @@ pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, if (sb->buf.n_fds >= sb->buf.max_fds) { gint new_size = sb->buf.max_fds + 8; -// g_warning ("builder %p: realloc buffer fds %d -> %d", -// builder, sb->buf.max_fds, new_size); + g_warning ("builder %p: realloc buffer fds %d -> %d", + builder, sb->buf.max_fds, new_size); sb->buf.max_fds = new_size; sb->buf.free_fds = g_realloc (sb->buf.free_fds, new_size * sizeof (int)); sb->buf.fds = sb->buf.free_fds; @@ -599,8 +599,8 @@ builder_ensure_size (struct stack_builder *sb, gsize size) { if (sb->buf.size + size > sb->buf.max_size) { gsize new_size = sb->buf.size + MAX (size, 1024); -// g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, -// sb, sb->buf.max_size, new_size); + g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, + sb, sb->buf.max_size, new_size); sb->buf.max_size = new_size; sb->buf.free_data = g_realloc (sb->buf.free_data, new_size); sb->sh = sb->buf.data = sb->buf.free_data; @@ -634,20 +634,144 @@ builder_add_packet (struct stack_builder *sb, PinosPacketType type, gsize size) return p; } +/** + * pinos_buffer_builder_add_empty: + * @builder: a #PinosBufferBuilder + * @type: a #PinosPacketType + * + * Add an empty packet of @type. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_buffer_builder_add_empty (PinosBufferBuilder *builder, + PinosPacketType type) +{ + struct stack_builder *sb = PPSB (builder); + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + builder_add_packet (sb, type, 0); + + return TRUE; +} + +/** + * pinos_buffer_iter_parse_add_mem: + * @iter: a #PinosBufferIter + * @payload: a #PinosPacketAddMem + * + * Get the #PinosPacketAddMem. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_ADD_MEM + * + * Returns: %TRUE if @payload contains valid data. + */ +gboolean +pinos_buffer_iter_parse_add_mem (PinosBufferIter *iter, + PinosPacketAddMem *payload) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_ADD_MEM, FALSE); + + if (si->size < sizeof (PinosPacketAddMem)) + return FALSE; + + memcpy (payload, si->data, sizeof (*payload)); + + return TRUE; +} + +/** + * pinos_buffer_builder_add_add_mem: + * @builder: a #PinosBufferBuilder + * @payload: a #PinosPacketAddMem + * + * Add a #PINOS_PACKET_TYPE_ADD_MEM to @builder with data from @payload. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_buffer_builder_add_add_mem (PinosBufferBuilder *builder, + PinosPacketAddMem *payload) +{ + struct stack_builder *sb = PPSB (builder); + PinosPacketAddMem *p; + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + p = builder_add_packet (sb, PINOS_PACKET_TYPE_ADD_MEM, sizeof (PinosPacketAddMem)); + memcpy (p, payload, sizeof (*payload)); + + return TRUE; +} + +/** + * pinos_buffer_iter_parse_remove_mem: + * @iter: a #PinosBufferIter + * @payload: a #PinosPacketRemoveMem + * + * Get the #PinosPacketRemoveMem. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_REMOVE_MEM + * + * Returns: %TRUE if @payload contains valid data. + */ +gboolean +pinos_buffer_iter_parse_remove_mem (PinosBufferIter *iter, + PinosPacketRemoveMem *payload) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_REMOVE_MEM, FALSE); + + if (si->size < sizeof (PinosPacketRemoveMem)) + return FALSE; + + memcpy (payload, si->data, sizeof (*payload)); + + return TRUE; +} + +/** + * pinos_buffer_builder_add_remove_mem: + * @builder: a #PinosBufferBuilder + * @payload: a #PinosPacketRemoveMem + * + * Add a #PINOS_PACKET_TYPE_REMOVE_MEM to @builder with data from @payload. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_buffer_builder_add_remove_mem (PinosBufferBuilder *builder, + PinosPacketRemoveMem *payload) +{ + struct stack_builder *sb = PPSB (builder); + PinosPacketRemoveMem *p; + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + p = builder_add_packet (sb, PINOS_PACKET_TYPE_REMOVE_MEM, sizeof (PinosPacketRemoveMem)); + memcpy (p, payload, sizeof (*payload)); + + return TRUE; +} + /* header packets */ /** - * pinos_buffer_iter_get_header: + * pinos_buffer_iter_parse_header: * @iter: a #PinosBufferIter - * @header: a #PinosPacketHeader + * @payload: a #PinosPacketHeader * * Get the #PinosPacketHeader. @iter must be positioned on a packet of * type #PINOS_PACKET_TYPE_HEADER * - * Returns: %TRUE if @header contains valid data. + * Returns: %TRUE if @payload contains valid data. */ gboolean pinos_buffer_iter_parse_header (PinosBufferIter *iter, - PinosPacketHeader *header) + PinosPacketHeader *payload) { struct stack_iter *si = PPSI (iter); @@ -657,7 +781,7 @@ pinos_buffer_iter_parse_header (PinosBufferIter *iter, if (si->size < sizeof (PinosPacketHeader)) return FALSE; - memcpy (header, si->data, sizeof (*header)); + memcpy (payload, si->data, sizeof (*payload)); return TRUE; } @@ -665,48 +789,48 @@ pinos_buffer_iter_parse_header (PinosBufferIter *iter, /** * pinos_buffer_builder_add_header: * @builder: a #PinosBufferBuilder - * @header: a #PinosPacketHeader + * @payload: a #PinosPacketHeader * - * Add a #PINOS_PACKET_TYPE_HEADER to @builder with data from @header. + * Add a #PINOS_PACKET_TYPE_HEADER to @builder with data from @payload. * * Returns: %TRUE on success. */ gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *builder, - PinosPacketHeader *header) + PinosPacketHeader *payload) { struct stack_builder *sb = PPSB (builder); - PinosPacketHeader *h; + PinosPacketHeader *p; g_return_val_if_fail (is_valid_builder (builder), FALSE); - h = builder_add_packet (sb, PINOS_PACKET_TYPE_HEADER, sizeof (PinosPacketHeader)); - memcpy (h, header, sizeof (*header)); + p = builder_add_packet (sb, PINOS_PACKET_TYPE_HEADER, sizeof (PinosPacketHeader)); + memcpy (p, payload, sizeof (*payload)); return TRUE; } -/* fd-payload packets */ +/* process-mem packets */ /** - * pinos_buffer_iter_get_fd_payload: + * pinos_buffer_iter_parse_process_mem: * @iter: a #PinosBufferIter - * @payload: a #PinosPacketFDPayload + * @payload: a #PinosPacketProcessMem * - * Get the #PinosPacketFDPayload. @iter must be positioned on a packet of - * type #PINOS_PACKET_TYPE_FD_PAYLOAD + * Get the #PinosPacketProcessMem. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_PROCESS_MEM * * Returns: %TRUE if @payload contains valid data. */ gboolean -pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, - PinosPacketFDPayload *payload) +pinos_buffer_iter_parse_process_mem (PinosBufferIter *iter, + PinosPacketProcessMem *payload) { struct stack_iter *si = PPSI (iter); g_return_val_if_fail (is_valid_iter (iter), FALSE); - g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_FD_PAYLOAD, FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_PROCESS_MEM, FALSE); - if (si->size < sizeof (PinosPacketFDPayload)) + if (si->size < sizeof (PinosPacketProcessMem)) return FALSE; memcpy (payload, si->data, sizeof (*payload)); @@ -715,49 +839,49 @@ pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, } /** - * pinos_buffer_builder_add_fd_payload: + * pinos_buffer_builder_add_process_mem: * @builder: a #PinosBufferBuilder - * @payload: a #PinosPacketFDPayload + * @payload: a #PinosPacketProcessMem * - * Add a #PINOS_PACKET_TYPE_FD_PAYLOAD to @builder with data from @payload. + * Add a #PINOS_PACKET_TYPE_PROCESS_MEM to @builder with data from @payload. * * Returns: %TRUE on success. */ gboolean -pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder, - PinosPacketFDPayload *payload) +pinos_buffer_builder_add_process_mem (PinosBufferBuilder *builder, + PinosPacketProcessMem *payload) { struct stack_builder *sb = PPSB (builder); - PinosPacketFDPayload *p; + PinosPacketProcessMem *p; g_return_val_if_fail (is_valid_builder (builder), FALSE); g_return_val_if_fail (payload->size > 0, FALSE); - p = builder_add_packet (sb, PINOS_PACKET_TYPE_FD_PAYLOAD, sizeof (PinosPacketFDPayload)); + p = builder_add_packet (sb, PINOS_PACKET_TYPE_PROCESS_MEM, sizeof (PinosPacketProcessMem)); memcpy (p, payload, sizeof (*payload)); return TRUE; } /** - * pinos_buffer_iter_parse_release_fd_payload: + * pinos_buffer_iter_parse_reuse_mem: * @iter: a #PinosBufferIter - * @payload: a #PinosPacketReleaseFDPayload + * @payload: a #PinosPacketReuseMem * - * Parse a #PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD packet from @iter into @payload. + * Parse a #PINOS_PACKET_TYPE_REUSE_MEM packet from @iter into @payload. * * Returns: %TRUE on success */ gboolean -pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, - PinosPacketReleaseFDPayload *payload) +pinos_buffer_iter_parse_reuse_mem (PinosBufferIter *iter, + PinosPacketReuseMem *payload) { struct stack_iter *si = PPSI (iter); g_return_val_if_fail (is_valid_iter (iter), FALSE); - g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD, FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_REUSE_MEM, FALSE); - if (si->size < sizeof (PinosPacketReleaseFDPayload)) + if (si->size < sizeof (PinosPacketReuseMem)) return FALSE; memcpy (payload, si->data, sizeof (*payload)); @@ -766,26 +890,26 @@ pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, } /** - * pinos_buffer_builder_add_release_fd_payload: + * pinos_buffer_builder_add_reuse_mem: * @builder: a #PinosBufferBuilder - * @payload: a #PinosPacketReleaseFDPayload + * @payload: a #PinosPacketReuseMem * - * Add a #PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD payload in @payload to @builder. + * Add a #PINOS_PACKET_TYPE_REUSE_MEM payload in @payload to @builder. * * Returns: %TRUE on success */ gboolean -pinos_buffer_builder_add_release_fd_payload (PinosBufferBuilder *builder, - PinosPacketReleaseFDPayload *payload) +pinos_buffer_builder_add_reuse_mem (PinosBufferBuilder *builder, + PinosPacketReuseMem *payload) { struct stack_builder *sb = PPSB (builder); - PinosPacketReleaseFDPayload *p; + PinosPacketReuseMem *p; g_return_val_if_fail (is_valid_builder (builder), FALSE); p = builder_add_packet (sb, - PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD, - sizeof (PinosPacketReleaseFDPayload)); + PINOS_PACKET_TYPE_REUSE_MEM, + sizeof (PinosPacketReuseMem)); memcpy (p, payload, sizeof (*payload)); return TRUE; diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h index 3750c780e..b035d305f 100644 --- a/pinos/client/buffer.h +++ b/pinos/client/buffer.h @@ -75,11 +75,16 @@ gint * pinos_buffer_steal_fds (PinosBuffer *buffer, * @PINOS_PACKET_TYPE_INVALID: invalid packet type, ignore * @PINOS_PACKET_TYPE_CONTINUATION: continuation packet, used internally to send * commands using a shared memory region. + * @PINOS_PACKET_TYPE_REGISTER_MEM: register memory region + * @PINOS_PACKET_TYPE_RELEASE_MEM: release memory region + * @PINOS_PACKET_TYPE_START: start transfer + * @PINOS_PACKET_TYPE_STOP: stop transfer + * * @PINOS_PACKET_TYPE_HEADER: common packet header - * @PINOS_PACKET_TYPE_FD_PAYLOAD: packet contains fd-payload. An fd-payload contains - * the media data as a file descriptor - * @PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: packet contains release fd-payload. Notifies - * that a previously received fd-payload is no longer in use. + * @PINOS_PACKET_TYPE_PROCESS_MEM: packet contains mem-payload. An mem-payload contains + * the media data as the index of a shared memory region + * @PINOS_PACKET_TYPE_REUSE_MEM: when a memory region has been consumed and is ready to + * be reused. * @PINOS_PACKET_TYPE_FORMAT_CHANGE: a format change. * @PINOS_PACKET_TYPE_PROPERTY_CHANGE: one or more property changes. * @PINOS_PACKET_TYPE_REFRESH_REQUEST: ask for a new keyframe @@ -89,13 +94,21 @@ gint * pinos_buffer_steal_fds (PinosBuffer *buffer, typedef enum { PINOS_PACKET_TYPE_INVALID = 0, - PINOS_PACKET_TYPE_CONTINUATION = 1, - PINOS_PACKET_TYPE_HEADER = 2, - PINOS_PACKET_TYPE_FD_PAYLOAD = 3, - PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD = 4, - PINOS_PACKET_TYPE_FORMAT_CHANGE = 5, - PINOS_PACKET_TYPE_PROPERTY_CHANGE = 6, - PINOS_PACKET_TYPE_REFRESH_REQUEST = 7, + PINOS_PACKET_TYPE_CONTINUATION, + PINOS_PACKET_TYPE_ADD_MEM, + PINOS_PACKET_TYPE_REMOVE_MEM, + PINOS_PACKET_TYPE_START, + PINOS_PACKET_TYPE_STREAMING, + PINOS_PACKET_TYPE_STOP, + PINOS_PACKET_TYPE_STOPPED, + PINOS_PACKET_TYPE_DRAIN, + PINOS_PACKET_TYPE_DRAINED, + PINOS_PACKET_TYPE_HEADER, + PINOS_PACKET_TYPE_PROCESS_MEM, + PINOS_PACKET_TYPE_REUSE_MEM, + PINOS_PACKET_TYPE_FORMAT_CHANGE, + PINOS_PACKET_TYPE_PROPERTY_CHANGE, + PINOS_PACKET_TYPE_REFRESH_REQUEST, } PinosPacketType; @@ -139,8 +152,51 @@ void pinos_buffer_builder_clear (PinosBufferBuilder *builder) void pinos_buffer_builder_end (PinosBufferBuilder *builder, PinosBuffer *buffer); +gboolean pinos_buffer_builder_add_empty (PinosBufferBuilder *builder, + PinosPacketType type); + gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, int fd); +/* add-mem packets */ +/** + * PinosPacketAddMem: + * @id: the unique id of this memory block + * @type: the memory block type + * @fd_index: the index of the fd with the data + * @offset: the offset of the data + * @size: the size of the data + * + * A Packet that contains a memory block used for data transfer. + */ +typedef struct { + guint32 id; + guint32 type; + gint32 fd_index; + guint64 offset; + guint64 size; +} PinosPacketAddMem; + +gboolean pinos_buffer_iter_parse_add_mem (PinosBufferIter *iter, + PinosPacketAddMem *payload); +gboolean pinos_buffer_builder_add_add_mem (PinosBufferBuilder *builder, + PinosPacketAddMem *payload); + +/* remove-mem packets */ +/** + * PinosPacketRemoveMem: + * @id: the unique id of the memory block + * + * Remove a memory block. + */ +typedef struct { + guint32 id; +} PinosPacketRemoveMem; + +gboolean pinos_buffer_iter_parse_remove_mem (PinosBufferIter *iter, + PinosPacketRemoveMem *payload); +gboolean pinos_buffer_builder_add_remove_mem (PinosBufferBuilder *builder, + PinosPacketRemoveMem *payload); + /* header packets */ /** * PinosPacketHeader @@ -159,15 +215,15 @@ typedef struct { } PinosPacketHeader; gboolean pinos_buffer_iter_parse_header (PinosBufferIter *iter, - PinosPacketHeader *header); + PinosPacketHeader *payload); gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *builder, - PinosPacketHeader *header); + PinosPacketHeader *payload); -/* fd-payload packets */ + +/* process-mem packets */ /** - * PinosPacketFDPayload: - * @id: the unique id of this payload - * @fd_index: the index of the fd with the data + * PinosPacketProcessMem: + * @id: the mem index to process * @offset: the offset of the data * @size: the size of the data * @@ -176,31 +232,32 @@ gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *buil */ typedef struct { guint32 id; - gint32 fd_index; guint64 offset; guint64 size; -} PinosPacketFDPayload; +} PinosPacketProcessMem; -gboolean pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, - PinosPacketFDPayload *payload); -gboolean pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder, - PinosPacketFDPayload *payload); +gboolean pinos_buffer_iter_parse_process_mem (PinosBufferIter *iter, + PinosPacketProcessMem *payload); +gboolean pinos_buffer_builder_add_process_mem (PinosBufferBuilder *builder, + PinosPacketProcessMem *payload); -/* release fd-payload packets */ +/* reuse-mem packets */ /** - * PinosPacketReleaseFDPayload: - * @id: the unique id of the fd-payload to release + * PinosPacketReuseMem: + * @id: the unique id of the memory block to reuse * * Release the payload with @id */ typedef struct { guint32 id; -} PinosPacketReleaseFDPayload; + guint64 offset; + guint64 size; +} PinosPacketReuseMem; -gboolean pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, - PinosPacketReleaseFDPayload *payload); -gboolean pinos_buffer_builder_add_release_fd_payload (PinosBufferBuilder *builder, - PinosPacketReleaseFDPayload *payload); +gboolean pinos_buffer_iter_parse_reuse_mem (PinosBufferIter *iter, + PinosPacketReuseMem *payload); +gboolean pinos_buffer_builder_add_reuse_mem (PinosBufferBuilder *builder, + PinosPacketReuseMem *payload); /* format change packets */ @@ -216,9 +273,9 @@ typedef struct { const gchar *format; } PinosPacketFormatChange; -gboolean pinos_buffer_iter_parse_format_change (PinosBufferIter *iter, +gboolean pinos_buffer_iter_parse_format_change (PinosBufferIter *iter, PinosPacketFormatChange *payload); -gboolean pinos_buffer_builder_add_format_change (PinosBufferBuilder *builder, +gboolean pinos_buffer_builder_add_format_change (PinosBufferBuilder *builder, PinosPacketFormatChange *payload); @@ -235,10 +292,10 @@ typedef struct { const gchar *value; } PinosPacketPropertyChange; -gboolean pinos_buffer_iter_parse_property_change (PinosBufferIter *iter, - guint idx, +gboolean pinos_buffer_iter_parse_property_change (PinosBufferIter *iter, + guint idx, PinosPacketPropertyChange *payload); -gboolean pinos_buffer_builder_add_property_change (PinosBufferBuilder *builder, +gboolean pinos_buffer_builder_add_property_change (PinosBufferBuilder *builder, PinosPacketPropertyChange *payload); /* refresh request packets */ diff --git a/pinos/client/fdmanager.c b/pinos/client/fdmanager.c index 3175d780c..b7d67afa7 100644 --- a/pinos/client/fdmanager.c +++ b/pinos/client/fdmanager.c @@ -202,6 +202,43 @@ wrong_object: return FALSE; } } +/** + * pinos_fd_manager_find: + * @manager: a #PinosFdManager + * @client: a client id + * @id: an id + * + * find the object associated with the id and client from @manager. + * + * Returns: the object or %NULL + */ +gpointer +pinos_fd_manager_find (PinosFdManager *manager, + const gchar *client, guint32 id) +{ + PinosFdManagerPrivate *priv; + ObjectId *oid; + ClientIds *cids; + + g_return_val_if_fail (PINOS_IS_FD_MANAGER (manager), FALSE); + g_return_val_if_fail (client != NULL, FALSE); + + priv = manager->priv; + + g_mutex_lock (&priv->lock); + oid = g_hash_table_lookup (priv->object_ids, GINT_TO_POINTER (id)); + if (oid) { + cids = g_hash_table_lookup (priv->client_ids, client); + if (cids) { + GList *find = g_list_find (cids->ids, oid); + if (find) + return oid->obj; + } + } + g_mutex_unlock (&priv->lock); + + return NULL; +} /** * pinos_fd_manager_remove: diff --git a/pinos/client/fdmanager.h b/pinos/client/fdmanager.h index 25730701b..5332bfece 100644 --- a/pinos/client/fdmanager.h +++ b/pinos/client/fdmanager.h @@ -67,6 +67,9 @@ gboolean pinos_fd_manager_add (PinosFdManager *manager, gboolean pinos_fd_manager_remove (PinosFdManager *manager, const gchar *client, guint32 id); +gpointer pinos_fd_manager_find (PinosFdManager *manager, + const gchar *client, + guint32 id); gboolean pinos_fd_manager_remove_all (PinosFdManager *manager, const gchar *client); diff --git a/pinos/client/pinos.h b/pinos/client/pinos.h index f0a3c52cd..00ed54b38 100644 --- a/pinos/client/pinos.h +++ b/pinos/client/pinos.h @@ -35,6 +35,7 @@ #define PINOS_DBUS_OBJECT_SERVER PINOS_DBUS_OBJECT_PREFIX "/server" #define PINOS_DBUS_OBJECT_CLIENT PINOS_DBUS_OBJECT_PREFIX "/client" #define PINOS_DBUS_OBJECT_NODE PINOS_DBUS_OBJECT_PREFIX "/node" +#define PINOS_DBUS_OBJECT_LINK PINOS_DBUS_OBJECT_PREFIX "/link" void pinos_init (int *argc, char **argv[]); diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 3031a827e..7dc130636 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -65,6 +65,11 @@ struct _PinosStreamPrivate PinosBuffer recv_buffer; guint8 recv_data[MAX_BUFFER_SIZE]; int recv_fds[MAX_FDS]; + + guint8 send_data[MAX_BUFFER_SIZE]; + int send_fds[MAX_FDS]; + + GHashTable *mem_ids; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -91,6 +96,8 @@ enum static guint signals[LAST_SIGNAL] = { 0 }; +static void unhandle_socket (PinosStream *stream); + static void pinos_stream_get_property (GObject *_object, guint prop_id, @@ -546,6 +553,89 @@ channel_failed: } } +static gboolean +parse_buffer (PinosStream *stream, + PinosBuffer *pbuf) +{ + PinosBufferIter it; + PinosStreamPrivate *priv = stream->priv; + + pinos_buffer_iter_init (&it, pbuf); + while (pinos_buffer_iter_next (&it)) { + PinosPacketType type = pinos_buffer_iter_get_type (&it); + + switch (type) { + case PINOS_PACKET_TYPE_ADD_MEM: + { + PinosPacketAddMem p; + int fd; + + if (!pinos_buffer_iter_parse_add_mem (&it, &p)) + break; + + fd = pinos_buffer_get_fd (pbuf, p.fd_index); + if (fd == -1) + break; + +// g_hash_table_insert (priv->mem_ids, GINT_TO_POINTER (p.id), NULL); + break; + } + case PINOS_PACKET_TYPE_REMOVE_MEM: + { + PinosPacketRemoveMem p; + + if (!pinos_buffer_iter_parse_remove_mem (&it, &p)) + break; + +// g_hash_table_remove (priv->mem_ids, GINT_TO_POINTER (p.id)); + break; + } + case PINOS_PACKET_TYPE_FORMAT_CHANGE: + { + PinosPacketFormatChange p; + + if (!pinos_buffer_iter_parse_format_change (&it, &p)) + break; + + if (priv->format) + g_bytes_unref (priv->format); + priv->format = g_bytes_new (p.format, strlen (p.format) + 1); + g_object_notify (G_OBJECT (stream), "format"); + break; + } + case PINOS_PACKET_TYPE_STREAMING: + { + stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); + break; + } + case PINOS_PACKET_TYPE_STOPPED: + { + unhandle_socket (stream); + + g_clear_pointer (&priv->format, g_bytes_unref); + g_object_notify (G_OBJECT (stream), "format"); + + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + break; + } + case PINOS_PACKET_TYPE_HEADER: + { + break; + } + case PINOS_PACKET_TYPE_PROCESS_MEM: + { + break; + } + default: + g_warning ("unhandled packet %d", type); + break; + } + } + pinos_buffer_iter_end (&it); + + return TRUE; +} + static gboolean on_socket_condition (GSocket *socket, GIOCondition condition, @@ -572,6 +662,8 @@ on_socket_condition (GSocket *socket, return TRUE; } + parse_buffer (stream, buffer); + priv->buffer = buffer; g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); priv->buffer = NULL; @@ -841,18 +933,25 @@ static gboolean do_start (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; + PinosBufferBuilder builder; + PinosPacketFormatChange fc; + PinosBuffer pbuf; + GError *error = NULL; handle_socket (stream, priv->fd); - g_dbus_proxy_call (priv->channel, - "Start", - g_variant_new ("(s)", - priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY"), - G_DBUS_CALL_FLAGS_NONE, - -1, - NULL, /* GCancellable *cancellable */ - on_stream_started, - stream); + pinos_stream_buffer_builder_init (stream, &builder); + fc.id = 0; + fc.format = priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY"; + pinos_buffer_builder_add_format_change (&builder, &fc); + pinos_buffer_builder_add_empty (&builder, PINOS_PACKET_TYPE_START); + pinos_buffer_builder_end (&builder, &pbuf); + + if (!pinos_io_write_buffer (priv->fd, &pbuf, &error)) { + g_warning ("stream %p: failed to read buffer: %s", stream, error->message); + g_clear_error (&error); + } + g_object_unref (stream); return FALSE; } @@ -936,16 +1035,11 @@ call_failed: static gboolean do_stop (PinosStream *stream) { - PinosStreamPrivate *priv = stream->priv; + PinosBufferBuilder builder; - g_dbus_proxy_call (priv->channel, - "Stop", - g_variant_new ("()"), - G_DBUS_CALL_FLAGS_NONE, - -1, - NULL, /* GCancellable *cancellable */ - on_stream_stopped, - stream); + pinos_stream_buffer_builder_init (stream, &builder); + pinos_buffer_builder_add_empty (&builder, PINOS_PACKET_TYPE_STOP); + g_object_unref (stream); return FALSE; } @@ -1091,9 +1185,16 @@ pinos_stream_peek_buffer (PinosStream *stream) void pinos_stream_buffer_builder_init (PinosStream *stream, PinosBufferBuilder *builder) { - g_return_if_fail (PINOS_IS_STREAM (stream)); + PinosStreamPrivate *priv; - pinos_buffer_builder_init (builder); + g_return_if_fail (PINOS_IS_STREAM (stream)); + priv = stream->priv; + + pinos_buffer_builder_init_into (builder, + priv->send_data, + MAX_BUFFER_SIZE, + priv->send_fds, + MAX_FDS); } /** diff --git a/pinos/dbus/org.pinos.xml b/pinos/dbus/org.pinos.xml index b1ab9b9d5..0d9633b8a 100644 --- a/pinos/dbus/org.pinos.xml +++ b/pinos/dbus/org.pinos.xml @@ -62,6 +62,7 @@ + @@ -114,23 +115,6 @@ - - - - - - - -