More work on wire protocol

Make separate payload for the header.
Make release-fd payloads
capture_buffer -> peek_buffer to avoid a copy
remove release-buffer, we really need to release each fd in the buffer
separately.
provide_buffer -> send_buffer so that we can also use this to send the
release-fd messages.
in pinossrc, send back release-fd messages when the fd is no longer in
use.
This commit is contained in:
Wim Taymans 2015-08-31 16:47:32 +02:00
parent c2cf359076
commit 8d1ad2ea63
10 changed files with 351 additions and 155 deletions

View file

@ -43,6 +43,7 @@
#include <gst/allocators/gstfdmemory.h>
static GQuark fdpayload_data_quark;
GST_DEBUG_CATEGORY_STATIC (pinos_src_debug);
#define GST_CAT_DEFAULT pinos_src_debug
@ -190,6 +191,8 @@ gst_pinos_src_class_init (GstPinosSrcClass * klass)
GST_DEBUG_CATEGORY_INIT (pinos_src_debug, "pinossrc", 0,
"Pinos Source");
fdpayload_data_quark = g_quark_from_static_string ("GstPinosSrcFDPayloadQuark");
}
static void
@ -246,55 +249,100 @@ gst_pinos_src_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
return caps;
}
typedef struct {
GstPinosSrc *src;
PinosPacketFDPayload p;
} FDPayloadData;
static void
fdpayload_data_destroy (gpointer user_data)
{
FDPayloadData *data = user_data;
GstPinosSrc *pinossrc = data->src;
PinosBufferBuilder b;
PinosPacketReleaseFDPayload r;
PinosBuffer pbuf;
r.id = data->p.id;
GST_DEBUG_OBJECT (pinossrc, "destroy %d", r.id);
pinos_buffer_builder_init (&b);
pinos_buffer_builder_add_release_fd_payload (&b, &r);
pinos_buffer_builder_end (&b, &pbuf);
GST_OBJECT_LOCK (pinossrc);
if (pinossrc->stream)
pinos_stream_send_buffer (pinossrc->stream, &pbuf);
GST_OBJECT_UNLOCK (pinossrc);
pinos_buffer_clear (&pbuf);
gst_object_unref (pinossrc);
g_slice_free (FDPayloadData, data);
}
static void
on_new_buffer (GObject *gobject,
gpointer user_data)
{
GstPinosSrc *pinossrc = user_data;
PinosBuffer pbuf;
const PinosBufferHeader *hdr;
PinosBuffer *pbuf;
PinosBufferIter it;
GstBuffer *buf;
GError *error = NULL;
GST_LOG_OBJECT (pinossrc, "got new buffer");
if (!pinos_stream_capture_buffer (pinossrc->stream, &pbuf)) {
if (!pinos_stream_peek_buffer (pinossrc->stream, &pbuf)) {
g_warning ("failed to capture buffer");
return;
}
buf = gst_buffer_new ();
hdr = pinos_buffer_get_header (&pbuf, NULL);
if (GST_CLOCK_TIME_IS_VALID (hdr->pts)) {
if (hdr->pts > GST_ELEMENT_CAST (pinossrc)->base_time)
GST_BUFFER_PTS (buf) = hdr->pts - GST_ELEMENT_CAST (pinossrc)->base_time;
if (GST_BUFFER_PTS (buf) + hdr->dts_offset > 0)
GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr->dts_offset;
}
GST_BUFFER_OFFSET (buf) = hdr->seq;
pinos_buffer_iter_init (&it, &pbuf);
pinos_buffer_iter_init (&it, pbuf);
while (pinos_buffer_iter_next (&it)) {
switch (pinos_buffer_iter_get_type (&it)) {
case PINOS_PACKET_TYPE_HEADER:
{
PinosPacketHeader hdr;
if (!pinos_buffer_iter_parse_header (&it, &hdr))
goto no_fds;
if (GST_CLOCK_TIME_IS_VALID (hdr.pts)) {
if (hdr.pts > GST_ELEMENT_CAST (pinossrc)->base_time)
GST_BUFFER_PTS (buf) = hdr.pts - GST_ELEMENT_CAST (pinossrc)->base_time;
if (GST_BUFFER_PTS (buf) + hdr.dts_offset > 0)
GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr.dts_offset;
}
GST_BUFFER_OFFSET (buf) = hdr.seq;
break;
}
case PINOS_PACKET_TYPE_FD_PAYLOAD:
{
GstMemory *fdmem = NULL;
PinosPacketFDPayload p;
FDPayloadData data;
int fd;
GST_DEBUG ("got fd payload");
pinos_buffer_iter_parse_fd_payload (&it, &p);
fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &error);
if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p))
goto no_fds;
fd = pinos_buffer_get_fd (pbuf, data.p.fd_index, &error);
if (fd == -1)
goto no_fds;
fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, fd,
p.offset + p.size, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (fdmem, p.offset, p.size);
data.p.offset + data.p.size, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (fdmem, data.p.offset, data.p.size);
gst_buffer_append_memory (buf, fdmem);
data.src = gst_object_ref (pinossrc);
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (fdmem),
fdpayload_data_quark,
g_slice_dup (FDPayloadData, &data),
fdpayload_data_destroy);
break;
}
default:
@ -305,8 +353,6 @@ on_new_buffer (GObject *gobject,
gst_buffer_unref (pinossrc->current);
pinossrc->current = buf;
pinos_stream_release_buffer (pinossrc->stream, &pbuf);
pinos_main_loop_signal (pinossrc->loop, FALSE);
return;
@ -678,7 +724,9 @@ gst_pinos_src_close (GstPinosSrc * pinossrc)
g_clear_object (&pinossrc->loop);
g_clear_object (&pinossrc->ctx);
g_main_context_unref (pinossrc->context);
GST_OBJECT_LOCK (pinossrc);
g_clear_object (&pinossrc->stream);
GST_OBJECT_UNLOCK (pinossrc);
if (pinossrc->current)
gst_buffer_unref (pinossrc->current);