From 1c16fd5533cd4585e45beac35541e7bbbaf61e55 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 27 Apr 2016 16:05:02 +0200 Subject: [PATCH] socketsink: track buffers with the fdmanager --- pinos/gst/gstpinossocketsink.c | 229 ++++++++++++++++++++++----------- 1 file changed, 156 insertions(+), 73 deletions(-) diff --git a/pinos/gst/gstpinossocketsink.c b/pinos/gst/gstpinossocketsink.c index 5f20cfb2a..57544a545 100644 --- a/pinos/gst/gstpinossocketsink.c +++ b/pinos/gst/gstpinossocketsink.c @@ -35,6 +35,7 @@ #include #include +#include #include "gstpinossocketsink.h" #include "gsttmpfileallocator.h" @@ -497,6 +498,107 @@ open_failed: } } +static void +myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader) +{ + MySource *mysource = myreader->source; + gssize navail, nread, maxmem; + GstEvent *ev; + gchar *mem; + PinosBuffer pbuf; + PinosBufferIter it; + PinosBufferBuilder b; + const gchar *client_path; + gboolean have_out = FALSE; + + navail = g_socket_get_available_bytes (myreader->socket); + maxmem = MAX (navail, 1); + mem = g_malloc (maxmem); + nread = g_socket_receive (myreader->socket, mem, maxmem, NULL, NULL); + + if (nread <= 0) { + GST_DEBUG ("client closed"); + mysource->condition &= ~G_IO_IN; + g_source_modify_unix_fd ((GSource *)mysource, mysource->tag, mysource->condition); + g_free (mem); + return; + } + + client_path = g_object_get_data (G_OBJECT (myreader->socket), "pinos-client-path"); + if (client_path == NULL) + return; + + if (this->pinos_input) { + pinos_buffer_builder_init (&b); + } + + pinos_buffer_init_data (&pbuf, mem, maxmem, NULL); + pinos_buffer_iter_init (&it, &pbuf); + while (pinos_buffer_iter_next (&it)) { + switch (pinos_buffer_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: + { + PinosPacketReleaseFDPayload p; + gint id; + + if (!pinos_buffer_iter_parse_release_fd_payload (&it, &p)) + continue; + + id = p.id; + + GST_LOG ("fd index %d for client %s is released", id, client_path); + pinos_fd_manager_remove (this->fdmanager, client_path, id); + break; + } + case PINOS_PACKET_TYPE_REFRESH_REQUEST: + { + PinosPacketRefreshRequest p; + + if (!pinos_buffer_iter_parse_refresh_request (&it, &p)) + continue; + + GST_LOG ("refresh request"); + if (!this->pinos_input) { + gst_pad_push_event (GST_BASE_SINK_PAD (this), + gst_video_event_new_upstream_force_key_unit (p.pts, + p.request_type == 1, 0)); + } else { + pinos_buffer_builder_add_refresh_request (&b, &p); + have_out = TRUE; + } + break; + } + default: + break; + } + } + pinos_buffer_clear (&pbuf); + g_free (mem); + + if (this->pinos_input) { + GstBuffer *outbuf; + gsize size; + gpointer data; + + if (have_out) { + pinos_buffer_builder_end (&b, &pbuf); + + data = pinos_buffer_steal (&pbuf, &size, NULL); + + outbuf = gst_buffer_new_wrapped (data, size); + ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstNetworkMessage", + "object", G_TYPE_OBJECT, this, + "buffer", GST_TYPE_BUFFER, outbuf, NULL)); + gst_buffer_unref (outbuf); + + gst_pad_push_event (GST_BASE_SINK_PAD (this), ev); + } else { + pinos_buffer_builder_clear (&b); + } + } +} + static void myreader_callback (GstBurstCache *cache, GstBurstCacheReader *reader, @@ -510,56 +612,73 @@ myreader_callback (GstBurstCache *cache, g_source_modify_unix_fd ((GSource *)mysource, mysource->tag, mysource->condition); } -static int -map_n_memory_output_vector (GstBuffer * buf, GOutputVector * vectors, - GstMapInfo * mapinfo, guint num_vectors) +#define VEC_MAX 8 +#define CMSG_MAX 255 + +static void +myreader_send_buffer (GstPinosSocketSink *this, MyReader *myreader, GstBuffer *buf) { - guint mem_len; - guint i; + GstMapInfo maps[VEC_MAX]; + GOutputVector vec[VEC_MAX]; + GSocketControlMessage *cmsgs[CMSG_MAX]; + guint i, mem_len; + gpointer iter_state = NULL; + GstMeta *meta; + gsize msg_count = 0; + gssize wrote; - mem_len = gst_buffer_n_memory (buf); + mem_len = MIN (gst_buffer_n_memory (buf), VEC_MAX); - for (i = 0; i < mem_len && i < num_vectors; i++) { + for (i = 0; i < mem_len; i++) { GstMapInfo map = { 0 }; GstMemory *mem = gst_buffer_peek_memory (buf, i); if (!gst_memory_map (mem, &map, GST_MAP_READ)) g_error ("Unable to map memory %p. This should never happen.", mem); - vectors[i].buffer = map.data; - vectors[i].size = map.size; + vec[i].buffer = map.data; + vec[i].size = map.size; - mapinfo[i] = map; + maps[i] = map; } - return i; -} - -static void -unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings) -{ - gint i; - - for (i = 0; i < num_mappings; i++) - gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]); -} - -static gsize -gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs, - gsize msg_space) -{ - gpointer iter_state = NULL; - GstMeta *meta; - gsize msg_count = 0; - while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL - && msg_count < msg_space) { + && msg_count < CMSG_MAX) { if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE) - msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message; + cmsgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message; } - return msg_count; -} -#define CMSG_MAX 255 + wrote = g_socket_send_message (myreader->socket, NULL, vec, mem_len, cmsgs, msg_count, 0, + NULL, NULL); + + for (i = 0; i < mem_len; i++) + gst_memory_unmap (maps[i].memory, &maps[i]); + + if (wrote < 0) { + GST_DEBUG_OBJECT (this, "error sending to reader"); + } else { + GArray *fdids; + const gchar *client_path; + + fdids = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), fdids_quark); + if (fdids == NULL) + return; + + /* get the client path of this socket */ + client_path = g_object_get_data (G_OBJECT (myreader->socket), "pinos-client-path"); + if (client_path == NULL) + return; + + for (i = 0; i < fdids->len; i++) { + gint id = g_array_index (fdids, guint32, i); + /* now store the id/client-path/buffer in the fdmanager */ + GST_LOG ("fd index %d, client %s increment refcount of buffer %p", id, client_path, buf); + pinos_fd_manager_add (this->fdmanager, + client_path, id, + gst_buffer_ref (buf), + (GDestroyNotify) gst_buffer_unref); + } + } +} static gboolean myreader_source_func (GstBurstCacheReader *reader, GIOCondition condition, gpointer user_data) @@ -575,31 +694,7 @@ myreader_source_func (GstBurstCacheReader *reader, GIOCondition condition, gpoin return FALSE; } if (condition & G_IO_IN) { - gssize navail, nread, maxmem; - GstBuffer *buf; - GstEvent *ev; - gchar *mem; - - navail = g_socket_get_available_bytes (myreader->socket); - maxmem = MAX (navail, 1); - mem = g_malloc (maxmem); - nread = g_socket_receive (myreader->socket, mem, maxmem, NULL, NULL); - - if (nread > 0) { - buf = gst_buffer_new_wrapped (mem, navail); - ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, - gst_structure_new ("GstNetworkMessage", - "object", G_TYPE_OBJECT, myreader->socket, - "buffer", GST_TYPE_BUFFER, buf, NULL)); - gst_buffer_unref (buf); - - gst_pad_push_event (GST_BASE_SINK_PAD (this), ev); - } else { - GST_DEBUG ("client closed"); - mysource->condition &= ~G_IO_IN; - g_source_modify_unix_fd ((GSource *)mysource, mysource->tag, mysource->condition); - g_free (mem); - } + myreader_receive_buffer (this, myreader); } if (condition & G_IO_OUT) { GstBuffer *buf = NULL; @@ -621,19 +716,7 @@ myreader_source_func (GstBurstCacheReader *reader, GIOCondition condition, gpoin break; } if (buf) { - GstMapInfo maps[8]; - GOutputVector vec[8]; - guint mems_mapped; - GSocketControlMessage *cmsgs[CMSG_MAX]; - gsize msg_count; - - mems_mapped = map_n_memory_output_vector (buf, vec, maps, 8); - msg_count = gst_buffer_get_cmsg_list (buf, cmsgs, CMSG_MAX); - - g_socket_send_message (myreader->socket, NULL, vec, mems_mapped, cmsgs, msg_count, 0, - NULL, NULL); - - unmap_n_memorys (maps, mems_mapped); + myreader_send_buffer (this, myreader, buf); gst_buffer_unref (buf); } }