From c67d3d7f04ef686c5cd4fe88ed7c52a89bdd50c6 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 9 May 2016 18:48:18 +0200 Subject: [PATCH] buffer: allow building into custom memory Make it possible to pass custom memory to the builder so that it can construct packets directly into specified memory and avoid allocs. Remove GError and GSocketControlMessage in the buffer API to make it possible to use other (rt-safe) API later. --- pinos/client/buffer.c | 202 ++++++++++++++++++--------------- pinos/client/buffer.h | 25 ++-- pinos/client/private.h | 10 +- pinos/client/stream.c | 70 ++++++++---- pinos/gst/gstpinosdepay.c | 14 +-- pinos/gst/gstpinospay.c | 37 +++--- pinos/gst/gstpinossink.c | 18 +-- pinos/gst/gstpinossocketsink.c | 32 ++++-- pinos/gst/gstpinossrc.c | 7 +- 9 files changed, 240 insertions(+), 175 deletions(-) diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c index 56f6c2955..7d1943d91 100644 --- a/pinos/client/buffer.c +++ b/pinos/client/buffer.c @@ -17,11 +17,9 @@ * Boston, MA 02110-1301, USA. */ -#include #include #include -#include #include "pinos/client/properties.h" #include "pinos/client/context.h" @@ -35,26 +33,32 @@ G_STATIC_ASSERT (sizeof (PinosStackBuffer) <= sizeof (PinosBuffer)); * @buffer: a #PinosBuffer * @data: data * @size: size of @data - * @message: (transfer full): a #GSocketControlMessage + * @fds: file descriptors + * @n_fds: number of file descriptors * - * Initialize @buffer with @data and @size and @message. @data and size - * must not be modified until pinos_buffer_clear() is called. - * - * Ownership is taken of @message. + * Initialize @buffer with @data and @size and @fds and @n_fds. + * The memory pointer to by @data and @fds becomes property of @buffer + * and should not be freed or modified until pinos_buffer_clear() is + * called. */ void pinos_buffer_init_data (PinosBuffer *buffer, gpointer data, gsize size, - GSocketControlMessage *message) + gint *fds, + gint n_fds) { PinosStackBuffer *sb = PSB (buffer); sb->magic = PSB_MAGIC; sb->data = data; sb->size = size; - sb->allocated_size = 0; - sb->message = message; + sb->max_size = size; + sb->free_data = NULL; + sb->fds = fds; + sb->n_fds = n_fds; + sb->max_fds = n_fds; + sb->free_fds = NULL; } void @@ -65,11 +69,8 @@ pinos_buffer_clear (PinosBuffer *buffer) g_return_if_fail (is_valid_buffer (buffer)); sb->magic = 0; - if (sb->allocated_size) - g_free (sb->data); - sb->size = 0; - sb->allocated_size = 0; - g_clear_object (&sb->message); + g_free (sb->free_data); + g_free (sb->free_fds); } /** @@ -97,83 +98,89 @@ pinos_buffer_get_version (PinosBuffer *buffer) * pinos_buffer_get_fd: * @buffer: a #PinosBuffer * @index: an index - * @error: a #GError or %NULL * * Get the file descriptor at @index in @buffer. * - * Returns: a file descriptor ar @index in @buffer. The file descriptor is - * duplicated using dup() and set as close-on-exec before being returned. - * You must call close() on it when you are done. -1 is returned on error and - * @error is set. + * Returns: a file descriptor at @index in @buffer. The file descriptor + * is not duplicated in any way. -1 is returned on error. */ int -pinos_buffer_get_fd (PinosBuffer *buffer, gint index, GError **error) +pinos_buffer_get_fd (PinosBuffer *buffer, gint index) { PinosStackBuffer *sb = PSB (buffer); - GUnixFDList *fds; g_return_val_if_fail (is_valid_buffer (buffer), -1); - g_return_val_if_fail (sb->message != NULL, -1); - if (g_socket_control_message_get_msg_type (sb->message) != SCM_RIGHTS) - goto not_found; - - fds = g_unix_fd_message_get_fd_list (G_UNIX_FD_MESSAGE (sb->message)); - if (fds == NULL) - goto not_found; - - if (g_unix_fd_list_get_length (fds) <= index) - goto not_found; - - return g_unix_fd_list_get (fds, index, error); - - /* ERRORS */ -not_found: - { - if (error) - *error = g_error_new (G_IO_ERROR, - G_IO_ERROR_NOT_FOUND, - "Buffer does not have any fd at index %d", index); + if (sb->fds == NULL || sb->n_fds < index) return -1; - } + + return sb->fds[index]; } /** - * pinos_buffer_steal: + * pinos_buffer_steal_data: * @buffer: a #PinosBuffer - * @size: output size - * @message: output #GSocketControlMessage + * @size: output size or %NULL to ignore * - * Take the data and control message from @buffer. + * Take the data from @buffer. * * Returns: the data of @buffer. */ gpointer -pinos_buffer_steal (PinosBuffer *buffer, - gsize *size, - GSocketControlMessage **message) +pinos_buffer_steal_data (PinosBuffer *buffer, + gsize *size) { PinosStackBuffer *sb = PSB (buffer); gpointer data; g_return_val_if_fail (is_valid_buffer (buffer), 0); + data = sb->data; if (size) *size = sb->size; - if (message) - *message = sb->message; - data = sb->data; - - sb->magic = 0; + if (sb->data != sb->free_data) + g_free (sb->free_data); sb->data = NULL; + sb->free_data = NULL; sb->size = 0; - sb->allocated_size = 0; - sb->message = NULL; + sb->max_size = 0; return data; } +/** + * pinos_buffer_steal_fds: + * @buffer: a #PinosBuffer + * @n_fds: number of fds + * + * Take the fds from @buffer. + * + * Returns: the fds of @buffer. + */ +gint * +pinos_buffer_steal_fds (PinosBuffer *buffer, + gint *n_fds) +{ + PinosStackBuffer *sb = PSB (buffer); + gint *fds; + + g_return_val_if_fail (is_valid_buffer (buffer), 0); + + fds = sb->fds; + if (n_fds) + *n_fds = sb->n_fds; + + if (sb->fds != sb->free_fds) + g_free (sb->free_fds); + sb->fds = NULL; + sb->free_fds = NULL; + sb->n_fds = 0; + sb->max_fds = 0; + + return fds; +} + /** * PinosBufferIter: * @@ -336,8 +343,6 @@ struct stack_builder { PinosPacketType type; gsize offset; - - guint n_fds; }; G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosBufferBuilder)); @@ -352,12 +357,20 @@ G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosBufferBuilder)); * pinos_buffer_builder_init_full: * @builder: a #PinosBufferBuilder * @version: a version + * @data: data to build into or %NULL to allocate + * @max_data: allocated size of @data + * @fds: memory for fds + * @max_fds: maximum number of fds in @fds * * Initialize a stack allocated @builder and set the @version. */ void pinos_buffer_builder_init_full (PinosBufferBuilder *builder, - guint32 version) + guint32 version, + gpointer data, + gsize max_data, + gint *fds, + gint max_fds) { struct stack_builder *sb = PPSB (builder); PinosStackHeader *sh; @@ -365,10 +378,22 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, g_return_if_fail (builder != NULL); sb->magic = PPSB_MAGIC; - sb->buf.allocated_size = sizeof (PinosStackHeader) + 128; - sb->buf.data = g_malloc (sb->buf.allocated_size); + + if (max_data < sizeof (PinosStackHeader) || data == NULL) { + sb->buf.max_size = sizeof (PinosStackHeader) + 128; + sb->buf.data = g_malloc (sb->buf.max_size); + sb->buf.free_data = sb->buf.data; + } else { + sb->buf.max_size = max_data; + sb->buf.data = data; + sb->buf.free_data = NULL; + } sb->buf.size = sizeof (PinosStackHeader); - sb->buf.message = NULL; + + sb->buf.fds = fds; + sb->buf.max_fds = max_fds; + sb->buf.n_fds = 0; + sb->buf.free_fds = NULL; sh = sb->sh = sb->buf.data; sh->version = version; @@ -376,7 +401,6 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, sb->type = 0; sb->offset = 0; - sb->n_fds = 0; } /** @@ -397,7 +421,8 @@ pinos_buffer_builder_clear (PinosBufferBuilder *builder) g_return_if_fail (is_valid_builder (builder)); sb->magic = 0; - g_free (sb->buf.data); + g_free (sb->buf.free_data); + g_free (sb->buf.free_fds); } /** @@ -421,60 +446,59 @@ pinos_buffer_builder_end (PinosBufferBuilder *builder, g_return_if_fail (is_valid_builder (builder)); g_return_if_fail (buffer != NULL); + sb->magic = 0; sb->sh->length = sb->buf.size - sizeof (PinosStackHeader); sbuf->magic = PSB_MAGIC; sbuf->data = sb->buf.data; sbuf->size = sb->buf.size; - sbuf->allocated_size = sb->buf.allocated_size; - sbuf->message = sb->buf.message; + sbuf->max_size = sb->buf.max_size; + sbuf->free_data = sb->buf.free_data; - sb->buf.data = NULL; - sb->buf.size = 0; - sb->buf.allocated_size = 0; - sb->buf.message = NULL; - sb->magic = 0; + sbuf->fds = sb->buf.fds; + sbuf->n_fds = sb->buf.n_fds; + sbuf->max_fds = sb->buf.max_fds; + sbuf->free_fds = sb->buf.free_fds; } /** * pinos_buffer_builder_add_fd: * @builder: a #PinosBufferBuilder * @fd: a valid fd - * @error: a #GError or %NULL * * Add the file descriptor @fd to @builder. * - * Returns: the index of the file descriptor in @builder. The file descriptor - * is duplicated using dup(). You keep your copy of the descriptor and the copy - * contained in @buffer will be closed when @buffer is freed. - * -1 is returned on error and @error is set. + * Returns: the index of the file descriptor in @builder. */ gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, - int fd, - GError **error) + int fd) { struct stack_builder *sb = PPSB (builder); + gint index; g_return_val_if_fail (is_valid_builder (builder), -1); g_return_val_if_fail (fd > 0, -1); - if (sb->buf.message == NULL) - sb->buf.message = g_unix_fd_message_new (); + if (sb->buf.n_fds >= sb->buf.max_fds) { + sb->buf.max_fds += 8; + sb->buf.free_fds = g_realloc (sb->buf.free_fds, sb->buf.max_fds * sizeof (int)); + sb->buf.fds = sb->buf.free_fds; + } + index = sb->buf.n_fds; + sb->buf.fds[index] = fd; + sb->buf.n_fds++; - if (!g_unix_fd_message_append_fd ((GUnixFDMessage*)sb->buf.message, fd, error)) - return -1; - - return sb->n_fds++; + return index; } static gpointer builder_ensure_size (struct stack_builder *sb, gsize size) { - if (sb->buf.size + size > sb->buf.allocated_size) { - sb->buf.allocated_size = sb->buf.size + MAX (size, 1024); - sb->buf.data = g_realloc (sb->buf.data, sb->buf.allocated_size); - sb->sh = sb->buf.data; + if (sb->buf.size + size > sb->buf.max_size) { + sb->buf.max_size = sb->buf.size + MAX (size, 1024); + sb->buf.free_data = g_realloc (sb->buf.free_data, sb->buf.max_size); + sb->sh = sb->buf.data = sb->buf.free_data; } return (guint8 *) sb->buf.data + sb->buf.size; } diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h index 8e4dffaa9..b828ad029 100644 --- a/pinos/client/buffer.h +++ b/pinos/client/buffer.h @@ -39,18 +39,19 @@ struct _PinosBuffer { void pinos_buffer_init_data (PinosBuffer *buffer, gpointer data, gsize size, - GSocketControlMessage *message); + gint *fds, + gint n_fds); void pinos_buffer_clear (PinosBuffer *buffer); guint32 pinos_buffer_get_version (PinosBuffer *buffer); int pinos_buffer_get_fd (PinosBuffer *buffer, - gint index, - GError **error); + gint index); -gpointer pinos_buffer_steal (PinosBuffer *buffer, - gsize *size, - GSocketControlMessage **message); +gpointer pinos_buffer_steal_data (PinosBuffer *buffer, + gsize *size); +gint * pinos_buffer_steal_fds (PinosBuffer *buffer, + gint *n_fds); /** @@ -107,16 +108,20 @@ struct _PinosBufferBuilder { }; void pinos_buffer_builder_init_full (PinosBufferBuilder *builder, - guint32 version); -#define pinos_buffer_builder_init(b) pinos_buffer_builder_init_full(b, PINOS_BUFFER_VERSION); + guint32 version, + gpointer data, + gsize max_data, + gint *fds, + gint max_fds); +#define pinos_buffer_builder_init_into(b,d,md,f,mf) pinos_buffer_builder_init_full(b, PINOS_BUFFER_VERSION,d,md,f,mf); +#define pinos_buffer_builder_init(b) pinos_buffer_builder_init_into(b, NULL, 0, NULL, 0); void pinos_buffer_builder_clear (PinosBufferBuilder *builder); void pinos_buffer_builder_end (PinosBufferBuilder *builder, PinosBuffer *buffer); gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, - int fd, - GError **error); + int fd); /* header packets */ /** * PinosPacketHeader diff --git a/pinos/client/private.h b/pinos/client/private.h index f0956c4f7..78e83432e 100644 --- a/pinos/client/private.h +++ b/pinos/client/private.h @@ -63,10 +63,14 @@ typedef struct { } PinosStackHeader; typedef struct { - gsize allocated_size; - gsize size; gpointer data; - GSocketControlMessage *message; + gsize size; + gsize max_size; + gpointer free_data; + gint *fds; + gint n_fds; + gint max_fds; + gpointer free_fds; gsize magic; } PinosStackBuffer; diff --git a/pinos/client/stream.c b/pinos/client/stream.c index fbcfae19e..3935925ce 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -17,8 +17,11 @@ * Boston, MA 02110-1301, USA. */ +#include #include + #include +#include #include "pinos/server/daemon.h" #include "pinos/client/pinos.h" @@ -253,9 +256,8 @@ pinos_stream_finalize (GObject * object) g_clear_object (&priv->context); g_free (priv->name); - g_free (priv->buffer.data); - if (priv->buffer.message) - g_object_unref (priv->buffer.message); + g_free (priv->buffer.free_data); + g_free (priv->buffer.free_fds); G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object); } @@ -830,9 +832,9 @@ on_socket_condition (GSocket *socket, need = sizeof (PinosStackHeader); - if (priv->buffer.allocated_size < need) { - priv->buffer.allocated_size = need; - priv->buffer.data = g_realloc (priv->buffer.data, need); + if (priv->buffer.max_size < need) { + priv->buffer.max_size = need; + priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need); } hdr = priv->buffer.data; @@ -856,9 +858,9 @@ on_socket_condition (GSocket *socket, /* now we know the total length */ need += hdr->length; - if (priv->buffer.allocated_size < need) { - priv->buffer.allocated_size = need; - hdr = priv->buffer.data = g_realloc (priv->buffer.data, need); + if (priv->buffer.max_size < need) { + priv->buffer.max_size = need; + hdr = priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need); } priv->buffer.size = need; @@ -872,17 +874,25 @@ on_socket_condition (GSocket *socket, g_assert (len == hdr->length); } + if (priv->buffer.max_fds < num_messages) { + priv->buffer.max_fds = num_messages; + priv->buffer.fds = priv->buffer.free_fds = g_realloc (priv->buffer.free_fds, + num_messages * sizeof (int)); + } + /* handle control messages */ for (i = 0; i < num_messages; i++) { - if (i == 0) { - if (priv->buffer.message) - g_object_unref (priv->buffer.message); - priv->buffer.message = messages[0]; - } - else { - g_warning ("discarding control message %d", i); - g_object_unref (messages[i]); - } + GSocketControlMessage *msg = messages[i]; + gint *fds, n_fds, j; + + if (g_socket_control_message_get_msg_type (msg) != SCM_RIGHTS) + continue; + + fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds); + for (j = 0; j < n_fds; j++) + priv->buffer.fds[i] = fds[i]; + g_free (fds); + g_object_unref (msg); } g_free (messages); @@ -892,7 +902,7 @@ on_socket_condition (GSocket *socket, priv->buffer.magic = 0; priv->buffer.size = 0; - g_clear_object (&priv->buffer.message); + priv->buffer.n_fds = 0; break; } case G_IO_OUT: @@ -1223,9 +1233,10 @@ pinos_stream_send_buffer (PinosStream *stream, gssize len; PinosStackBuffer *sb = (PinosStackBuffer *) buffer; GOutputVector ovec[1]; + GSocketControlMessage *msg = NULL; gint flags = 0; GError *error = NULL; - gint n_msg; + gint i, n_msg; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); g_return_val_if_fail (buffer != NULL, FALSE); @@ -1236,16 +1247,22 @@ pinos_stream_send_buffer (PinosStream *stream, ovec[0].buffer = sb->data; ovec[0].size = sb->size; - if (sb->message) + if (sb->n_fds) { n_msg = 1; - else + msg = g_unix_fd_message_new (); + for (i = 0; i < sb->n_fds; i++) + if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), sb->fds[i], &error)) + goto append_failed; + } + else { n_msg = 0; + } len = g_socket_send_message (priv->socket, NULL, ovec, 1, - &sb->message, + &msg, n_msg, flags, NULL, @@ -1257,6 +1274,13 @@ pinos_stream_send_buffer (PinosStream *stream, return TRUE; +append_failed: + { + g_warning ("failed to append fd: %s", error->message); + g_object_unref (msg); + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + return FALSE; + } send_error: { g_warning ("failed to send_message: %s", error->message); diff --git a/pinos/gst/gstpinosdepay.c b/pinos/gst/gstpinosdepay.c index 2cc1a764b..3d8de37aa 100644 --- a/pinos/gst/gstpinosdepay.c +++ b/pinos/gst/gstpinosdepay.c @@ -142,7 +142,7 @@ release_fds (GstPinosDepay *this, GstBuffer *buffer) pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -164,20 +164,20 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) PinosBuffer pbuf; PinosBufferIter it; GstNetControlMessageMeta * meta; - GSocketControlMessage *msg = NULL; GError *err = NULL; GArray *fdids = NULL; + GUnixFDList *fds = NULL; meta = ((GstNetControlMessageMeta*) gst_buffer_get_meta ( buffer, GST_NET_CONTROL_MESSAGE_META_API_TYPE)); if (meta) { - msg = g_object_ref (meta->message); - gst_buffer_remove_meta (buffer, (GstMeta *) meta); - meta = NULL; + if (G_IS_UNIX_FD_MESSAGE (meta->message)) { + fds = g_unix_fd_message_get_fd_list (G_UNIX_FD_MESSAGE (meta->message)); + } } gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, msg); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { @@ -212,7 +212,7 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) if (!pinos_buffer_iter_parse_fd_payload (&it, &p)) goto error; - fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &err); + fd = g_unix_fd_list_get (fds, p.fd_index, &err); if (fd == -1) goto error; diff --git a/pinos/gst/gstpinospay.c b/pinos/gst/gstpinospay.c index 33785a246..149c89ef4 100644 --- a/pinos/gst/gstpinospay.c +++ b/pinos/gst/gstpinospay.c @@ -214,7 +214,7 @@ client_buffer_received (GstPinosPay *pay, GstBuffer *buffer, } gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, NULL); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -266,7 +266,7 @@ client_buffer_received (GstPinosPay *pay, GstBuffer *buffer, if (have_out) { pinos_buffer_builder_end (&b, &pbuf); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -379,7 +379,7 @@ release_fds (GstPinosPay *pay, GstBuffer *buffer) pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -401,7 +401,7 @@ gst_pinos_pay_chain_pinos (GstPinosPay *pay, GstBuffer * buffer) GArray *fdids = NULL; gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, NULL); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -467,6 +467,7 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) gpointer data; GSocketControlMessage *msg; gboolean tmpfile = TRUE; + gint *fds, n_fds, i; hdr.flags = 0; hdr.seq = GST_BUFFER_OFFSET (buffer); @@ -476,20 +477,19 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) pinos_buffer_builder_init (&builder); pinos_buffer_builder_add_header (&builder, &hdr); + msg = g_unix_fd_message_new (); + fdmem = gst_pinos_pay_get_fd_memory (pay, buffer, &tmpfile); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem), &err); - if (p.fd_index == -1) - goto add_fd_failed; + p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); p.id = pinos_fd_manager_get_id (pay->fdmanager); p.offset = fdmem->offset; p.size = fdmem->size; pinos_buffer_builder_add_fd_payload (&builder, &p); pinos_buffer_builder_end (&builder, &pbuf); - gst_memory_unref(fdmem); - fdmem = NULL; - data = pinos_buffer_steal (&pbuf, &size, &msg); + data = pinos_buffer_steal_data (&pbuf, &size); + fds = pinos_buffer_steal_fds (&pbuf, &n_fds); outbuf = gst_buffer_new_wrapped (data, size); GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (buffer); @@ -498,6 +498,18 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer); GST_BUFFER_OFFSET_END (outbuf) = GST_BUFFER_OFFSET_END (buffer); + msg = g_unix_fd_message_new (); + for (i = 0; i < n_fds; i++) { + if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), fds[i], &err)) + goto add_fd_failed; + } + gst_buffer_add_net_control_message_meta (outbuf, msg); + g_object_unref (msg); + g_free (fds); + + gst_memory_unref(fdmem); + fdmem = NULL; + if (!tmpfile) { GArray *fdids; /* we are using the original buffer fd in the control message, we need @@ -516,16 +528,13 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) gst_buffer_unref (buffer); } - gst_buffer_add_net_control_message_meta (outbuf, msg); - g_object_unref (msg); - return gst_pad_push (pay->srcpad, outbuf); /* ERRORS */ add_fd_failed: { GST_WARNING_OBJECT (pay, "Adding fd failed: %s", err->message); - gst_memory_unref(fdmem); + gst_object_unref(msg); g_clear_error (&err); return GST_FLOW_ERROR; diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index d4b3dce09..e06409eea 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -507,7 +507,6 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) PinosPacketHeader hdr; PinosPacketFDPayload p; gsize size; - GError *err = NULL; gboolean tmpfile, res; pinossink = GST_PINOS_SINK (bsink); @@ -552,17 +551,13 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) pinos_buffer_builder_init (&builder); pinos_buffer_builder_add_header (&builder, &hdr); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem), &err); - if (p.fd_index == -1) - goto add_fd_failed; + p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem)); p.id = pinossink->id_counter++; p.offset = 0; p.size = size; pinos_buffer_builder_add_fd_payload (&builder, &p); pinos_buffer_builder_end (&builder, &pbuf); - gst_memory_unref (mem); - GST_LOG ("sending fd index %d", p.id); pinos_main_loop_lock (pinossink->loop); @@ -572,6 +567,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) pinos_buffer_clear (&pbuf); pinos_main_loop_unlock (pinossink->loop); + gst_memory_unref (mem); + if (res && !tmpfile) { /* keep the buffer around until we get the release fd message */ g_hash_table_insert (pinossink->fdids, GINT_TO_POINTER (p.id), gst_buffer_ref (buffer)); @@ -586,18 +583,13 @@ map_error: { GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, ("failed to map buffer"), (NULL)); - return GST_FLOW_ERROR; - } -add_fd_failed: - { - GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, - ("failed to add fd: %s", err->message), (NULL)); - pinos_buffer_builder_clear (&builder); + gst_memory_unref (mem); return GST_FLOW_ERROR; } streaming_error: { pinos_main_loop_unlock (pinossink->loop); + gst_memory_unref (mem); return GST_FLOW_ERROR; } } diff --git a/pinos/gst/gstpinossocketsink.c b/pinos/gst/gstpinossocketsink.c index 0e8cb53a6..6a70cd173 100644 --- a/pinos/gst/gstpinossocketsink.c +++ b/pinos/gst/gstpinossocketsink.c @@ -201,7 +201,7 @@ release_fds (GstPinosSocketSink *this, GstBuffer *buffer) pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -223,7 +223,7 @@ gst_pinos_socket_sink_render_pinos (GstPinosSocketSink * this, GstBuffer * buffe GArray *fdids = NULL; gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, NULL); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -314,6 +314,7 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe gpointer data; GSocketControlMessage *msg; gboolean tmpfile = TRUE; + gint *fds, n_fds, i; hdr.flags = 0; hdr.seq = GST_BUFFER_OFFSET (buffer); @@ -324,9 +325,7 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe pinos_buffer_builder_add_header (&builder, &hdr); fdmem = gst_pinos_socket_sink_get_fd_memory (this, buffer, &tmpfile); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem), &err); - if (p.fd_index == -1) - goto add_fd_failed; + p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); p.id = pinos_fd_manager_get_id (this->fdmanager); p.offset = fdmem->offset; p.size = fdmem->size; @@ -336,10 +335,9 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe p.id, hdr.pts, GST_BUFFER_PTS (buffer), GST_ELEMENT_CAST (this)->base_time); pinos_buffer_builder_end (&builder, &pbuf); - gst_memory_unref(fdmem); - fdmem = NULL; - data = pinos_buffer_steal (&pbuf, &size, &msg); + data = pinos_buffer_steal_data (&pbuf, &size); + fds = pinos_buffer_steal_fds (&pbuf, &n_fds); outbuf = gst_buffer_new_wrapped (data, size); GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (buffer); @@ -348,6 +346,18 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer); GST_BUFFER_OFFSET_END (outbuf) = GST_BUFFER_OFFSET_END (buffer); + msg = g_unix_fd_message_new (); + for (i = 0; i < n_fds; i++) { + if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), fds[i], &err)) + goto add_fd_failed; + } + gst_buffer_add_net_control_message_meta (outbuf, msg); + g_object_unref (msg); + g_free (fds); + + gst_memory_unref(fdmem); + fdmem = NULL; + if (!tmpfile) { GArray *fdids; /* we are using the original buffer fd in the control message, we need @@ -363,8 +373,6 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (outbuf), orig_buffer_quark, gst_buffer_ref (buffer), (GDestroyNotify) gst_buffer_unref); } - gst_buffer_add_net_control_message_meta (outbuf, msg); - g_object_unref (msg); gst_burst_cache_queue_buffer (this->cache, outbuf); @@ -534,7 +542,7 @@ myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader) pinos_buffer_builder_init (&b); } - pinos_buffer_init_data (&pbuf, mem, maxmem, NULL); + pinos_buffer_init_data (&pbuf, mem, maxmem, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -585,7 +593,7 @@ myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader) if (have_out) { pinos_buffer_builder_end (&b, &pbuf); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 7d3d38d82..144cd4e0e 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -358,7 +358,6 @@ on_new_buffer (GObject *gobject, PinosBuffer *pbuf; PinosBufferIter it; GstBuffer *buf = NULL; - GError *error = NULL; GST_LOG_OBJECT (pinossrc, "got new buffer"); if (!pinos_stream_peek_buffer (pinossrc->stream, &pbuf)) { @@ -397,8 +396,9 @@ on_new_buffer (GObject *gobject, if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p)) goto parse_failed; + GST_DEBUG ("got fd payload id %d", data.p.id); - fd = pinos_buffer_get_fd (pbuf, data.p.fd_index, &error); + fd = pinos_buffer_get_fd (pbuf, data.p.fd_index); if (fd == -1) goto no_fds; @@ -453,8 +453,7 @@ parse_failed: no_fds: { gst_buffer_unref (buf); - GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, - ("buffer error: %s", error->message), (NULL)); + GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, ("fd not found in buffer"), (NULL)); pinos_main_loop_signal (pinossrc->loop, FALSE); return; }