diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c index 56f6c2955..7d1943d91 100644 --- a/pinos/client/buffer.c +++ b/pinos/client/buffer.c @@ -17,11 +17,9 @@ * Boston, MA 02110-1301, USA. */ -#include #include #include -#include #include "pinos/client/properties.h" #include "pinos/client/context.h" @@ -35,26 +33,32 @@ G_STATIC_ASSERT (sizeof (PinosStackBuffer) <= sizeof (PinosBuffer)); * @buffer: a #PinosBuffer * @data: data * @size: size of @data - * @message: (transfer full): a #GSocketControlMessage + * @fds: file descriptors + * @n_fds: number of file descriptors * - * Initialize @buffer with @data and @size and @message. @data and size - * must not be modified until pinos_buffer_clear() is called. - * - * Ownership is taken of @message. + * Initialize @buffer with @data and @size and @fds and @n_fds. + * The memory pointer to by @data and @fds becomes property of @buffer + * and should not be freed or modified until pinos_buffer_clear() is + * called. */ void pinos_buffer_init_data (PinosBuffer *buffer, gpointer data, gsize size, - GSocketControlMessage *message) + gint *fds, + gint n_fds) { PinosStackBuffer *sb = PSB (buffer); sb->magic = PSB_MAGIC; sb->data = data; sb->size = size; - sb->allocated_size = 0; - sb->message = message; + sb->max_size = size; + sb->free_data = NULL; + sb->fds = fds; + sb->n_fds = n_fds; + sb->max_fds = n_fds; + sb->free_fds = NULL; } void @@ -65,11 +69,8 @@ pinos_buffer_clear (PinosBuffer *buffer) g_return_if_fail (is_valid_buffer (buffer)); sb->magic = 0; - if (sb->allocated_size) - g_free (sb->data); - sb->size = 0; - sb->allocated_size = 0; - g_clear_object (&sb->message); + g_free (sb->free_data); + g_free (sb->free_fds); } /** @@ -97,83 +98,89 @@ pinos_buffer_get_version (PinosBuffer *buffer) * pinos_buffer_get_fd: * @buffer: a #PinosBuffer * @index: an index - * @error: a #GError or %NULL * * Get the file descriptor at @index in @buffer. * - * Returns: a file descriptor ar @index in @buffer. The file descriptor is - * duplicated using dup() and set as close-on-exec before being returned. - * You must call close() on it when you are done. -1 is returned on error and - * @error is set. + * Returns: a file descriptor at @index in @buffer. The file descriptor + * is not duplicated in any way. -1 is returned on error. */ int -pinos_buffer_get_fd (PinosBuffer *buffer, gint index, GError **error) +pinos_buffer_get_fd (PinosBuffer *buffer, gint index) { PinosStackBuffer *sb = PSB (buffer); - GUnixFDList *fds; g_return_val_if_fail (is_valid_buffer (buffer), -1); - g_return_val_if_fail (sb->message != NULL, -1); - if (g_socket_control_message_get_msg_type (sb->message) != SCM_RIGHTS) - goto not_found; - - fds = g_unix_fd_message_get_fd_list (G_UNIX_FD_MESSAGE (sb->message)); - if (fds == NULL) - goto not_found; - - if (g_unix_fd_list_get_length (fds) <= index) - goto not_found; - - return g_unix_fd_list_get (fds, index, error); - - /* ERRORS */ -not_found: - { - if (error) - *error = g_error_new (G_IO_ERROR, - G_IO_ERROR_NOT_FOUND, - "Buffer does not have any fd at index %d", index); + if (sb->fds == NULL || sb->n_fds < index) return -1; - } + + return sb->fds[index]; } /** - * pinos_buffer_steal: + * pinos_buffer_steal_data: * @buffer: a #PinosBuffer - * @size: output size - * @message: output #GSocketControlMessage + * @size: output size or %NULL to ignore * - * Take the data and control message from @buffer. + * Take the data from @buffer. * * Returns: the data of @buffer. */ gpointer -pinos_buffer_steal (PinosBuffer *buffer, - gsize *size, - GSocketControlMessage **message) +pinos_buffer_steal_data (PinosBuffer *buffer, + gsize *size) { PinosStackBuffer *sb = PSB (buffer); gpointer data; g_return_val_if_fail (is_valid_buffer (buffer), 0); + data = sb->data; if (size) *size = sb->size; - if (message) - *message = sb->message; - data = sb->data; - - sb->magic = 0; + if (sb->data != sb->free_data) + g_free (sb->free_data); sb->data = NULL; + sb->free_data = NULL; sb->size = 0; - sb->allocated_size = 0; - sb->message = NULL; + sb->max_size = 0; return data; } +/** + * pinos_buffer_steal_fds: + * @buffer: a #PinosBuffer + * @n_fds: number of fds + * + * Take the fds from @buffer. + * + * Returns: the fds of @buffer. + */ +gint * +pinos_buffer_steal_fds (PinosBuffer *buffer, + gint *n_fds) +{ + PinosStackBuffer *sb = PSB (buffer); + gint *fds; + + g_return_val_if_fail (is_valid_buffer (buffer), 0); + + fds = sb->fds; + if (n_fds) + *n_fds = sb->n_fds; + + if (sb->fds != sb->free_fds) + g_free (sb->free_fds); + sb->fds = NULL; + sb->free_fds = NULL; + sb->n_fds = 0; + sb->max_fds = 0; + + return fds; +} + /** * PinosBufferIter: * @@ -336,8 +343,6 @@ struct stack_builder { PinosPacketType type; gsize offset; - - guint n_fds; }; G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosBufferBuilder)); @@ -352,12 +357,20 @@ G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosBufferBuilder)); * pinos_buffer_builder_init_full: * @builder: a #PinosBufferBuilder * @version: a version + * @data: data to build into or %NULL to allocate + * @max_data: allocated size of @data + * @fds: memory for fds + * @max_fds: maximum number of fds in @fds * * Initialize a stack allocated @builder and set the @version. */ void pinos_buffer_builder_init_full (PinosBufferBuilder *builder, - guint32 version) + guint32 version, + gpointer data, + gsize max_data, + gint *fds, + gint max_fds) { struct stack_builder *sb = PPSB (builder); PinosStackHeader *sh; @@ -365,10 +378,22 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, g_return_if_fail (builder != NULL); sb->magic = PPSB_MAGIC; - sb->buf.allocated_size = sizeof (PinosStackHeader) + 128; - sb->buf.data = g_malloc (sb->buf.allocated_size); + + if (max_data < sizeof (PinosStackHeader) || data == NULL) { + sb->buf.max_size = sizeof (PinosStackHeader) + 128; + sb->buf.data = g_malloc (sb->buf.max_size); + sb->buf.free_data = sb->buf.data; + } else { + sb->buf.max_size = max_data; + sb->buf.data = data; + sb->buf.free_data = NULL; + } sb->buf.size = sizeof (PinosStackHeader); - sb->buf.message = NULL; + + sb->buf.fds = fds; + sb->buf.max_fds = max_fds; + sb->buf.n_fds = 0; + sb->buf.free_fds = NULL; sh = sb->sh = sb->buf.data; sh->version = version; @@ -376,7 +401,6 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, sb->type = 0; sb->offset = 0; - sb->n_fds = 0; } /** @@ -397,7 +421,8 @@ pinos_buffer_builder_clear (PinosBufferBuilder *builder) g_return_if_fail (is_valid_builder (builder)); sb->magic = 0; - g_free (sb->buf.data); + g_free (sb->buf.free_data); + g_free (sb->buf.free_fds); } /** @@ -421,60 +446,59 @@ pinos_buffer_builder_end (PinosBufferBuilder *builder, g_return_if_fail (is_valid_builder (builder)); g_return_if_fail (buffer != NULL); + sb->magic = 0; sb->sh->length = sb->buf.size - sizeof (PinosStackHeader); sbuf->magic = PSB_MAGIC; sbuf->data = sb->buf.data; sbuf->size = sb->buf.size; - sbuf->allocated_size = sb->buf.allocated_size; - sbuf->message = sb->buf.message; + sbuf->max_size = sb->buf.max_size; + sbuf->free_data = sb->buf.free_data; - sb->buf.data = NULL; - sb->buf.size = 0; - sb->buf.allocated_size = 0; - sb->buf.message = NULL; - sb->magic = 0; + sbuf->fds = sb->buf.fds; + sbuf->n_fds = sb->buf.n_fds; + sbuf->max_fds = sb->buf.max_fds; + sbuf->free_fds = sb->buf.free_fds; } /** * pinos_buffer_builder_add_fd: * @builder: a #PinosBufferBuilder * @fd: a valid fd - * @error: a #GError or %NULL * * Add the file descriptor @fd to @builder. * - * Returns: the index of the file descriptor in @builder. The file descriptor - * is duplicated using dup(). You keep your copy of the descriptor and the copy - * contained in @buffer will be closed when @buffer is freed. - * -1 is returned on error and @error is set. + * Returns: the index of the file descriptor in @builder. */ gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, - int fd, - GError **error) + int fd) { struct stack_builder *sb = PPSB (builder); + gint index; g_return_val_if_fail (is_valid_builder (builder), -1); g_return_val_if_fail (fd > 0, -1); - if (sb->buf.message == NULL) - sb->buf.message = g_unix_fd_message_new (); + if (sb->buf.n_fds >= sb->buf.max_fds) { + sb->buf.max_fds += 8; + sb->buf.free_fds = g_realloc (sb->buf.free_fds, sb->buf.max_fds * sizeof (int)); + sb->buf.fds = sb->buf.free_fds; + } + index = sb->buf.n_fds; + sb->buf.fds[index] = fd; + sb->buf.n_fds++; - if (!g_unix_fd_message_append_fd ((GUnixFDMessage*)sb->buf.message, fd, error)) - return -1; - - return sb->n_fds++; + return index; } static gpointer builder_ensure_size (struct stack_builder *sb, gsize size) { - if (sb->buf.size + size > sb->buf.allocated_size) { - sb->buf.allocated_size = sb->buf.size + MAX (size, 1024); - sb->buf.data = g_realloc (sb->buf.data, sb->buf.allocated_size); - sb->sh = sb->buf.data; + if (sb->buf.size + size > sb->buf.max_size) { + sb->buf.max_size = sb->buf.size + MAX (size, 1024); + sb->buf.free_data = g_realloc (sb->buf.free_data, sb->buf.max_size); + sb->sh = sb->buf.data = sb->buf.free_data; } return (guint8 *) sb->buf.data + sb->buf.size; } diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h index 8e4dffaa9..b828ad029 100644 --- a/pinos/client/buffer.h +++ b/pinos/client/buffer.h @@ -39,18 +39,19 @@ struct _PinosBuffer { void pinos_buffer_init_data (PinosBuffer *buffer, gpointer data, gsize size, - GSocketControlMessage *message); + gint *fds, + gint n_fds); void pinos_buffer_clear (PinosBuffer *buffer); guint32 pinos_buffer_get_version (PinosBuffer *buffer); int pinos_buffer_get_fd (PinosBuffer *buffer, - gint index, - GError **error); + gint index); -gpointer pinos_buffer_steal (PinosBuffer *buffer, - gsize *size, - GSocketControlMessage **message); +gpointer pinos_buffer_steal_data (PinosBuffer *buffer, + gsize *size); +gint * pinos_buffer_steal_fds (PinosBuffer *buffer, + gint *n_fds); /** @@ -107,16 +108,20 @@ struct _PinosBufferBuilder { }; void pinos_buffer_builder_init_full (PinosBufferBuilder *builder, - guint32 version); -#define pinos_buffer_builder_init(b) pinos_buffer_builder_init_full(b, PINOS_BUFFER_VERSION); + guint32 version, + gpointer data, + gsize max_data, + gint *fds, + gint max_fds); +#define pinos_buffer_builder_init_into(b,d,md,f,mf) pinos_buffer_builder_init_full(b, PINOS_BUFFER_VERSION,d,md,f,mf); +#define pinos_buffer_builder_init(b) pinos_buffer_builder_init_into(b, NULL, 0, NULL, 0); void pinos_buffer_builder_clear (PinosBufferBuilder *builder); void pinos_buffer_builder_end (PinosBufferBuilder *builder, PinosBuffer *buffer); gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, - int fd, - GError **error); + int fd); /* header packets */ /** * PinosPacketHeader diff --git a/pinos/client/private.h b/pinos/client/private.h index f0956c4f7..78e83432e 100644 --- a/pinos/client/private.h +++ b/pinos/client/private.h @@ -63,10 +63,14 @@ typedef struct { } PinosStackHeader; typedef struct { - gsize allocated_size; - gsize size; gpointer data; - GSocketControlMessage *message; + gsize size; + gsize max_size; + gpointer free_data; + gint *fds; + gint n_fds; + gint max_fds; + gpointer free_fds; gsize magic; } PinosStackBuffer; diff --git a/pinos/client/stream.c b/pinos/client/stream.c index fbcfae19e..3935925ce 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -17,8 +17,11 @@ * Boston, MA 02110-1301, USA. */ +#include #include + #include +#include #include "pinos/server/daemon.h" #include "pinos/client/pinos.h" @@ -253,9 +256,8 @@ pinos_stream_finalize (GObject * object) g_clear_object (&priv->context); g_free (priv->name); - g_free (priv->buffer.data); - if (priv->buffer.message) - g_object_unref (priv->buffer.message); + g_free (priv->buffer.free_data); + g_free (priv->buffer.free_fds); G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object); } @@ -830,9 +832,9 @@ on_socket_condition (GSocket *socket, need = sizeof (PinosStackHeader); - if (priv->buffer.allocated_size < need) { - priv->buffer.allocated_size = need; - priv->buffer.data = g_realloc (priv->buffer.data, need); + if (priv->buffer.max_size < need) { + priv->buffer.max_size = need; + priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need); } hdr = priv->buffer.data; @@ -856,9 +858,9 @@ on_socket_condition (GSocket *socket, /* now we know the total length */ need += hdr->length; - if (priv->buffer.allocated_size < need) { - priv->buffer.allocated_size = need; - hdr = priv->buffer.data = g_realloc (priv->buffer.data, need); + if (priv->buffer.max_size < need) { + priv->buffer.max_size = need; + hdr = priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need); } priv->buffer.size = need; @@ -872,17 +874,25 @@ on_socket_condition (GSocket *socket, g_assert (len == hdr->length); } + if (priv->buffer.max_fds < num_messages) { + priv->buffer.max_fds = num_messages; + priv->buffer.fds = priv->buffer.free_fds = g_realloc (priv->buffer.free_fds, + num_messages * sizeof (int)); + } + /* handle control messages */ for (i = 0; i < num_messages; i++) { - if (i == 0) { - if (priv->buffer.message) - g_object_unref (priv->buffer.message); - priv->buffer.message = messages[0]; - } - else { - g_warning ("discarding control message %d", i); - g_object_unref (messages[i]); - } + GSocketControlMessage *msg = messages[i]; + gint *fds, n_fds, j; + + if (g_socket_control_message_get_msg_type (msg) != SCM_RIGHTS) + continue; + + fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds); + for (j = 0; j < n_fds; j++) + priv->buffer.fds[i] = fds[i]; + g_free (fds); + g_object_unref (msg); } g_free (messages); @@ -892,7 +902,7 @@ on_socket_condition (GSocket *socket, priv->buffer.magic = 0; priv->buffer.size = 0; - g_clear_object (&priv->buffer.message); + priv->buffer.n_fds = 0; break; } case G_IO_OUT: @@ -1223,9 +1233,10 @@ pinos_stream_send_buffer (PinosStream *stream, gssize len; PinosStackBuffer *sb = (PinosStackBuffer *) buffer; GOutputVector ovec[1]; + GSocketControlMessage *msg = NULL; gint flags = 0; GError *error = NULL; - gint n_msg; + gint i, n_msg; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); g_return_val_if_fail (buffer != NULL, FALSE); @@ -1236,16 +1247,22 @@ pinos_stream_send_buffer (PinosStream *stream, ovec[0].buffer = sb->data; ovec[0].size = sb->size; - if (sb->message) + if (sb->n_fds) { n_msg = 1; - else + msg = g_unix_fd_message_new (); + for (i = 0; i < sb->n_fds; i++) + if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), sb->fds[i], &error)) + goto append_failed; + } + else { n_msg = 0; + } len = g_socket_send_message (priv->socket, NULL, ovec, 1, - &sb->message, + &msg, n_msg, flags, NULL, @@ -1257,6 +1274,13 @@ pinos_stream_send_buffer (PinosStream *stream, return TRUE; +append_failed: + { + g_warning ("failed to append fd: %s", error->message); + g_object_unref (msg); + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + return FALSE; + } send_error: { g_warning ("failed to send_message: %s", error->message); diff --git a/pinos/gst/gstpinosdepay.c b/pinos/gst/gstpinosdepay.c index 2cc1a764b..3d8de37aa 100644 --- a/pinos/gst/gstpinosdepay.c +++ b/pinos/gst/gstpinosdepay.c @@ -142,7 +142,7 @@ release_fds (GstPinosDepay *this, GstBuffer *buffer) pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -164,20 +164,20 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) PinosBuffer pbuf; PinosBufferIter it; GstNetControlMessageMeta * meta; - GSocketControlMessage *msg = NULL; GError *err = NULL; GArray *fdids = NULL; + GUnixFDList *fds = NULL; meta = ((GstNetControlMessageMeta*) gst_buffer_get_meta ( buffer, GST_NET_CONTROL_MESSAGE_META_API_TYPE)); if (meta) { - msg = g_object_ref (meta->message); - gst_buffer_remove_meta (buffer, (GstMeta *) meta); - meta = NULL; + if (G_IS_UNIX_FD_MESSAGE (meta->message)) { + fds = g_unix_fd_message_get_fd_list (G_UNIX_FD_MESSAGE (meta->message)); + } } gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, msg); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { @@ -212,7 +212,7 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) if (!pinos_buffer_iter_parse_fd_payload (&it, &p)) goto error; - fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &err); + fd = g_unix_fd_list_get (fds, p.fd_index, &err); if (fd == -1) goto error; diff --git a/pinos/gst/gstpinospay.c b/pinos/gst/gstpinospay.c index 33785a246..149c89ef4 100644 --- a/pinos/gst/gstpinospay.c +++ b/pinos/gst/gstpinospay.c @@ -214,7 +214,7 @@ client_buffer_received (GstPinosPay *pay, GstBuffer *buffer, } gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, NULL); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -266,7 +266,7 @@ client_buffer_received (GstPinosPay *pay, GstBuffer *buffer, if (have_out) { pinos_buffer_builder_end (&b, &pbuf); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -379,7 +379,7 @@ release_fds (GstPinosPay *pay, GstBuffer *buffer) pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -401,7 +401,7 @@ gst_pinos_pay_chain_pinos (GstPinosPay *pay, GstBuffer * buffer) GArray *fdids = NULL; gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, NULL); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -467,6 +467,7 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) gpointer data; GSocketControlMessage *msg; gboolean tmpfile = TRUE; + gint *fds, n_fds, i; hdr.flags = 0; hdr.seq = GST_BUFFER_OFFSET (buffer); @@ -476,20 +477,19 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) pinos_buffer_builder_init (&builder); pinos_buffer_builder_add_header (&builder, &hdr); + msg = g_unix_fd_message_new (); + fdmem = gst_pinos_pay_get_fd_memory (pay, buffer, &tmpfile); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem), &err); - if (p.fd_index == -1) - goto add_fd_failed; + p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); p.id = pinos_fd_manager_get_id (pay->fdmanager); p.offset = fdmem->offset; p.size = fdmem->size; pinos_buffer_builder_add_fd_payload (&builder, &p); pinos_buffer_builder_end (&builder, &pbuf); - gst_memory_unref(fdmem); - fdmem = NULL; - data = pinos_buffer_steal (&pbuf, &size, &msg); + data = pinos_buffer_steal_data (&pbuf, &size); + fds = pinos_buffer_steal_fds (&pbuf, &n_fds); outbuf = gst_buffer_new_wrapped (data, size); GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (buffer); @@ -498,6 +498,18 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer); GST_BUFFER_OFFSET_END (outbuf) = GST_BUFFER_OFFSET_END (buffer); + msg = g_unix_fd_message_new (); + for (i = 0; i < n_fds; i++) { + if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), fds[i], &err)) + goto add_fd_failed; + } + gst_buffer_add_net_control_message_meta (outbuf, msg); + g_object_unref (msg); + g_free (fds); + + gst_memory_unref(fdmem); + fdmem = NULL; + if (!tmpfile) { GArray *fdids; /* we are using the original buffer fd in the control message, we need @@ -516,16 +528,13 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) gst_buffer_unref (buffer); } - gst_buffer_add_net_control_message_meta (outbuf, msg); - g_object_unref (msg); - return gst_pad_push (pay->srcpad, outbuf); /* ERRORS */ add_fd_failed: { GST_WARNING_OBJECT (pay, "Adding fd failed: %s", err->message); - gst_memory_unref(fdmem); + gst_object_unref(msg); g_clear_error (&err); return GST_FLOW_ERROR; diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index d4b3dce09..e06409eea 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -507,7 +507,6 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) PinosPacketHeader hdr; PinosPacketFDPayload p; gsize size; - GError *err = NULL; gboolean tmpfile, res; pinossink = GST_PINOS_SINK (bsink); @@ -552,17 +551,13 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) pinos_buffer_builder_init (&builder); pinos_buffer_builder_add_header (&builder, &hdr); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem), &err); - if (p.fd_index == -1) - goto add_fd_failed; + p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem)); p.id = pinossink->id_counter++; p.offset = 0; p.size = size; pinos_buffer_builder_add_fd_payload (&builder, &p); pinos_buffer_builder_end (&builder, &pbuf); - gst_memory_unref (mem); - GST_LOG ("sending fd index %d", p.id); pinos_main_loop_lock (pinossink->loop); @@ -572,6 +567,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) pinos_buffer_clear (&pbuf); pinos_main_loop_unlock (pinossink->loop); + gst_memory_unref (mem); + if (res && !tmpfile) { /* keep the buffer around until we get the release fd message */ g_hash_table_insert (pinossink->fdids, GINT_TO_POINTER (p.id), gst_buffer_ref (buffer)); @@ -586,18 +583,13 @@ map_error: { GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, ("failed to map buffer"), (NULL)); - return GST_FLOW_ERROR; - } -add_fd_failed: - { - GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, - ("failed to add fd: %s", err->message), (NULL)); - pinos_buffer_builder_clear (&builder); + gst_memory_unref (mem); return GST_FLOW_ERROR; } streaming_error: { pinos_main_loop_unlock (pinossink->loop); + gst_memory_unref (mem); return GST_FLOW_ERROR; } } diff --git a/pinos/gst/gstpinossocketsink.c b/pinos/gst/gstpinossocketsink.c index 0e8cb53a6..6a70cd173 100644 --- a/pinos/gst/gstpinossocketsink.c +++ b/pinos/gst/gstpinossocketsink.c @@ -201,7 +201,7 @@ release_fds (GstPinosSocketSink *this, GstBuffer *buffer) pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, @@ -223,7 +223,7 @@ gst_pinos_socket_sink_render_pinos (GstPinosSocketSink * this, GstBuffer * buffe GArray *fdids = NULL; gst_buffer_map (buffer, &info, GST_MAP_READ); - pinos_buffer_init_data (&pbuf, info.data, info.size, NULL); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -314,6 +314,7 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe gpointer data; GSocketControlMessage *msg; gboolean tmpfile = TRUE; + gint *fds, n_fds, i; hdr.flags = 0; hdr.seq = GST_BUFFER_OFFSET (buffer); @@ -324,9 +325,7 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe pinos_buffer_builder_add_header (&builder, &hdr); fdmem = gst_pinos_socket_sink_get_fd_memory (this, buffer, &tmpfile); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem), &err); - if (p.fd_index == -1) - goto add_fd_failed; + p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); p.id = pinos_fd_manager_get_id (this->fdmanager); p.offset = fdmem->offset; p.size = fdmem->size; @@ -336,10 +335,9 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe p.id, hdr.pts, GST_BUFFER_PTS (buffer), GST_ELEMENT_CAST (this)->base_time); pinos_buffer_builder_end (&builder, &pbuf); - gst_memory_unref(fdmem); - fdmem = NULL; - data = pinos_buffer_steal (&pbuf, &size, &msg); + data = pinos_buffer_steal_data (&pbuf, &size); + fds = pinos_buffer_steal_fds (&pbuf, &n_fds); outbuf = gst_buffer_new_wrapped (data, size); GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (buffer); @@ -348,6 +346,18 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer); GST_BUFFER_OFFSET_END (outbuf) = GST_BUFFER_OFFSET_END (buffer); + msg = g_unix_fd_message_new (); + for (i = 0; i < n_fds; i++) { + if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), fds[i], &err)) + goto add_fd_failed; + } + gst_buffer_add_net_control_message_meta (outbuf, msg); + g_object_unref (msg); + g_free (fds); + + gst_memory_unref(fdmem); + fdmem = NULL; + if (!tmpfile) { GArray *fdids; /* we are using the original buffer fd in the control message, we need @@ -363,8 +373,6 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (outbuf), orig_buffer_quark, gst_buffer_ref (buffer), (GDestroyNotify) gst_buffer_unref); } - gst_buffer_add_net_control_message_meta (outbuf, msg); - g_object_unref (msg); gst_burst_cache_queue_buffer (this->cache, outbuf); @@ -534,7 +542,7 @@ myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader) pinos_buffer_builder_init (&b); } - pinos_buffer_init_data (&pbuf, mem, maxmem, NULL); + pinos_buffer_init_data (&pbuf, mem, maxmem, NULL, 0); pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -585,7 +593,7 @@ myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader) if (have_out) { pinos_buffer_builder_end (&b, &pbuf); - data = pinos_buffer_steal (&pbuf, &size, NULL); + data = pinos_buffer_steal_data (&pbuf, &size); outbuf = gst_buffer_new_wrapped (data, size); ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 7d3d38d82..144cd4e0e 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -358,7 +358,6 @@ on_new_buffer (GObject *gobject, PinosBuffer *pbuf; PinosBufferIter it; GstBuffer *buf = NULL; - GError *error = NULL; GST_LOG_OBJECT (pinossrc, "got new buffer"); if (!pinos_stream_peek_buffer (pinossrc->stream, &pbuf)) { @@ -397,8 +396,9 @@ on_new_buffer (GObject *gobject, if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p)) goto parse_failed; + GST_DEBUG ("got fd payload id %d", data.p.id); - fd = pinos_buffer_get_fd (pbuf, data.p.fd_index, &error); + fd = pinos_buffer_get_fd (pbuf, data.p.fd_index); if (fd == -1) goto no_fds; @@ -453,8 +453,7 @@ parse_failed: no_fds: { gst_buffer_unref (buf); - GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, - ("buffer error: %s", error->message), (NULL)); + GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, ("fd not found in buffer"), (NULL)); pinos_main_loop_signal (pinossrc->loop, FALSE); return; }