From 6864ea98304e849b04cee551099b3110b11122fa Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 2 Dec 2015 21:03:53 +0100 Subject: [PATCH] pinospay: keep buffers around until released If we are using the original buffer fd without copying it into a temp file we need to make sure the memory stays alive and is not reused until all clients are done with it. To do this, track what fd ids are in the outgoing buffers and ref the original buffer. We also need to know when a message is sent to a client and when that client does a release-fd on the fd index. This is done with some new events on the multisocketsink. Every time a message is sent to a client we hash the fd index in it and ref the buffer in a per-client hash table (on the socket). Every time we receive release-fd we remove the fd index from the hash and unref the buffer again. Clients that are killed get their socket removed, which also cleans up the hashtable and frees the memory again. --- src/gst/gstpinospay.c | 152 ++++++++++++++++++++++++++++++++++++++++-- src/gst/gstpinospay.h | 1 + 2 files changed, 147 insertions(+), 6 deletions(-) diff --git a/src/gst/gstpinospay.c b/src/gst/gstpinospay.c index c84e74290..8727e7bbe 100644 --- a/src/gst/gstpinospay.c +++ b/src/gst/gstpinospay.c @@ -57,6 +57,9 @@ GST_DEBUG_CATEGORY_STATIC (gst_pinos_pay_debug_category); #define GST_CAT_DEFAULT gst_pinos_pay_debug_category +static GQuark fdids_quark; +static GQuark orig_buffer_quark; + /* prototypes */ /* pad templates */ @@ -124,15 +127,129 @@ gst_pinos_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) return res; } +static void +client_buffer_sent (GstPinosPay *pay, GstBuffer *buffer, + GObject *obj) +{ + GArray *fdids; + GHashTable *hash; + guint i; + + fdids = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), fdids_quark); + if (fdids == NULL) + return; + + /* we keep a hashtable of fd ids on the sender object (usually GSocket) itself, + * when the object is destroyed, we automatically also release the refcounts */ + hash = g_object_get_qdata (obj, fdids_quark); + if (hash == NULL) { + hash = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + (GDestroyNotify) gst_buffer_unref); + g_object_set_qdata_full (obj, fdids_quark, hash, + (GDestroyNotify) g_hash_table_unref); + } + + for (i = 0; i < fdids->len; i++) { + gint id = g_array_index (fdids, guint32, i); + GST_LOG ("%p: fd index %d, increment refcount of buffer %p", hash, id, buffer); + g_hash_table_insert (hash, GINT_TO_POINTER (id), gst_buffer_ref (buffer)); + } +} + +static void +client_buffer_received (GstPinosPay *pay, GstBuffer *buffer, + GObject *obj) +{ + PinosBuffer pbuf; + PinosBufferIter it; + GstMapInfo info; + GHashTable *hash; + + hash = g_object_get_qdata (obj, fdids_quark); + if (hash == NULL) + return; + + gst_buffer_map (buffer, &info, GST_MAP_READ); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL); + pinos_buffer_iter_init (&it, &pbuf); + while (pinos_buffer_iter_next (&it)) { + switch (pinos_buffer_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: + { + PinosPacketReleaseFDPayload p; + gint id; + + if (!pinos_buffer_iter_parse_release_fd_payload (&it, &p)) + continue; + + id = p.id; + + GST_LOG ("%p: fd index %d is released", hash, id); + g_assert (g_hash_table_remove (hash, GINT_TO_POINTER (id))); + break; + } + default: + break; + } + } + gst_buffer_unmap (buffer, &info); + pinos_buffer_clear (&pbuf); +} + +static gboolean +gst_pinos_pay_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstPinosPay *pay = GST_PINOS_PAY (parent); + gboolean res = FALSE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + if (gst_event_has_name (event, "GstNetworkMessageDispatched")) { + const GstStructure *str = gst_event_get_structure (event); + GstBuffer *buf; + GObject *obj; + + gst_structure_get (str, "object", G_TYPE_OBJECT, &obj, + "buffer", GST_TYPE_BUFFER, &buf, NULL); + + client_buffer_sent (pay, buf, obj); + gst_buffer_unref (buf); + g_object_unref (obj); + } + else if (gst_event_has_name (event, "GstNetworkMessageReceived")) { + const GstStructure *str = gst_event_get_structure (event); + GstBuffer *buf; + GObject *obj; + + gst_structure_get (str, "object", G_TYPE_OBJECT, &obj, + "buffer", GST_TYPE_BUFFER, &buf, NULL); + + client_buffer_received (pay, buf, obj); + gst_buffer_unref (buf); + g_object_unref (obj); + } + gst_event_unref (event); + res = TRUE; + break; + } + default: + res = gst_pad_event_default (pad, parent, event); + break; + } + return res; +} + static GstMemory * -gst_pinos_pay_get_fd_memory (GstPinosPay * tmpfilepay, GstBuffer * buffer) +gst_pinos_pay_get_fd_memory (GstPinosPay * tmpfilepay, GstBuffer * buffer, gboolean *tmpfile) { GstMemory *mem = NULL; if (gst_buffer_n_memory (buffer) == 1 - && gst_is_fd_memory (gst_buffer_peek_memory (buffer, 0))) + && gst_is_fd_memory (gst_buffer_peek_memory (buffer, 0))) { mem = gst_buffer_get_memory (buffer, 0); - else { + *tmpfile = FALSE; + } else { GstMapInfo info; GstAllocationParams params = {0, 0, 0, 0, { NULL, }}; gsize size = gst_buffer_get_size (buffer); @@ -142,6 +259,7 @@ gst_pinos_pay_get_fd_memory (GstPinosPay * tmpfilepay, GstBuffer * buffer) return NULL; gst_buffer_extract (buffer, 0, info.data, size); gst_memory_unmap (mem, &info); + *tmpfile = TRUE; } return mem; } @@ -160,6 +278,7 @@ gst_pinos_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) gsize size; gpointer data; GSocketControlMessage *msg; + gboolean tmpfile = TRUE; hdr.flags = 0; hdr.seq = GST_BUFFER_OFFSET (buffer); @@ -169,11 +288,11 @@ gst_pinos_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) pinos_buffer_builder_init (&builder); pinos_buffer_builder_add_header (&builder, &hdr); - fdmem = gst_pinos_pay_get_fd_memory (pay, buffer); + fdmem = gst_pinos_pay_get_fd_memory (pay, buffer, &tmpfile); p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem), &err); if (p.fd_index == -1) goto add_fd_failed; - p.id = 0; + p.id = pay->id_counter++; p.offset = fdmem->offset; p.size = fdmem->size; pinos_buffer_builder_add_fd_payload (&builder, &p); @@ -190,7 +309,24 @@ gst_pinos_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_DURATION (outbuf) = GST_BUFFER_DURATION (buffer); GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer); GST_BUFFER_OFFSET_END (outbuf) = GST_BUFFER_OFFSET_END (buffer); - gst_buffer_unref (buffer); + + if (!tmpfile) { + GArray *fdids; + /* we are using the original buffer fd in the control message, we need + * to make sure it is not reused before everyone is finished with it. + * We tag the output buffer with the array of fds in it and the original + * buffer (to keep it alive). All clients that receive the fd will + * increment outbuf refcount, all clients that do release-fd on the fd + * will decrease the refcount again. */ + fdids = g_array_new (FALSE, FALSE, sizeof (guint32)); + g_array_append_val (fdids, p.id); + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (outbuf), + fdids_quark, fdids, (GDestroyNotify) g_array_unref); + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (outbuf), + orig_buffer_quark, buffer, (GDestroyNotify) gst_buffer_unref); + } else { + gst_buffer_unref (buffer); + } gst_buffer_add_net_control_message_meta (outbuf, msg); g_object_unref (msg); @@ -223,6 +359,7 @@ static void gst_pinos_pay_init (GstPinosPay * pay) { pay->srcpad = gst_pad_new_from_static_template (&gst_pinos_pay_src_template, "src"); + gst_pad_set_event_function (pay->srcpad, gst_pinos_pay_src_event); gst_element_add_pad (GST_ELEMENT (pay), pay->srcpad); pay->sinkpad = gst_pad_new_from_static_template (&gst_pinos_pay_sink_template, "sink"); @@ -251,4 +388,7 @@ gst_pinos_pay_class_init (GstPinosPayClass * klass) "Wim Taymans "); gobject_class->finalize = gst_pinos_pay_finalize; + + fdids_quark = g_quark_from_static_string ("GstPinosPayFDIds"); + orig_buffer_quark = g_quark_from_static_string ("GstPinosPayOrigBuffer"); } diff --git a/src/gst/gstpinospay.h b/src/gst/gstpinospay.h index e1c27c9f4..737972ad9 100644 --- a/src/gst/gstpinospay.h +++ b/src/gst/gstpinospay.h @@ -43,6 +43,7 @@ struct _GstPinosPay GstAllocator *allocator; + guint32 id_counter; }; struct _GstPinosPayClass