pinospay: Also handle plain pinos input

Handle application/x-pinos specially; track all the fd indices in the
buffers and send a release-fd message when the buffer is freed.
We can then use the payloader also in the client-source to handle the
release-fd messages correctly.
This commit is contained in:
Wim Taymans 2015-12-03 15:55:45 +01:00
parent a3a0a45d0b
commit 4322ddaf0f
2 changed files with 102 additions and 6 deletions

View file

@ -113,11 +113,16 @@ gst_pinos_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
case GST_EVENT_CAPS:
{
GstCaps *caps;
GstStructure *str;
gst_event_parse_caps (event, &caps);
str = gst_caps_get_structure (caps, 0);
pay->pinos_input = gst_structure_has_name (str, "application/x-pinos");
gst_event_unref (event);
caps = gst_caps_new_empty_simple ("application/x-pinos");
res = gst_pad_push_event (pay->srcpad, gst_event_new_caps (caps));
gst_caps_unref (caps);
gst_event_unref (event);
break;
}
default:
@ -264,10 +269,90 @@ gst_pinos_pay_get_fd_memory (GstPinosPay * tmpfilepay, GstBuffer * buffer, gbool
return mem;
}
static GstFlowReturn
gst_pinos_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
static void
release_fds (GstPinosPay *pay, GstBuffer *buffer)
{
GArray *fdids;
guint i;
PinosBufferBuilder b;
PinosPacketReleaseFDPayload r;
PinosBuffer pbuf;
gsize size;
gpointer data;
GstBuffer *outbuf;
GstEvent *ev;
fdids = gst_mini_object_steal_qdata (GST_MINI_OBJECT_CAST (buffer),
fdids_quark);
if (fdids == NULL)
return;
pinos_buffer_builder_init (&b);
for (i = 0; i < fdids->len; i++) {
r.id = g_array_index (fdids, guint32, i);
GST_LOG ("release fd %d", r.id);
pinos_buffer_builder_add_release_fd_payload (&b, &r);
}
pinos_buffer_builder_end (&b, &pbuf);
g_array_unref (fdids);
data = pinos_buffer_steal (&pbuf, &size, NULL);
outbuf = gst_buffer_new_wrapped (data, size);
ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstNetworkMessageReceived",
"object", G_TYPE_OBJECT, pay,
"buffer", GST_TYPE_BUFFER, outbuf, NULL));
gst_buffer_unref (outbuf);
gst_pad_push_event (pay->sinkpad, ev);
}
static GstFlowReturn
gst_pinos_pay_chain_pinos (GstPinosPay *pay, GstBuffer * buffer)
{
GstMapInfo info;
PinosBuffer pbuf;
PinosBufferIter it;
GArray *fdids;
fdids = g_array_new (FALSE, FALSE, sizeof (guint32));
gst_buffer_map (buffer, &info, GST_MAP_READ);
pinos_buffer_init_data (&pbuf, info.data, info.size, NULL);
pinos_buffer_iter_init (&it, &pbuf);
while (pinos_buffer_iter_next (&it)) {
switch (pinos_buffer_iter_get_type (&it)) {
case PINOS_PACKET_TYPE_FD_PAYLOAD:
{
PinosPacketFDPayload p;
if (!pinos_buffer_iter_parse_fd_payload (&it, &p))
continue;
GST_LOG ("track fd index %d", p.id);
g_array_append_val (fdids, p.id);
break;
}
default:
break;
}
}
gst_buffer_unmap (buffer, &info);
pinos_buffer_clear (&pbuf);
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buffer),
fdids_quark, fdids, NULL);
gst_mini_object_weak_ref (GST_MINI_OBJECT_CAST (buffer),
(GstMiniObjectNotify) release_fds, pay);
return gst_pad_push (pay->srcpad, buffer);
}
static GstFlowReturn
gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer)
{
GstPinosPay *pay = GST_PINOS_PAY (parent);
GstMemory *fdmem = NULL;
GError *err = NULL;
GstBuffer *outbuf;
@ -343,6 +428,17 @@ add_fd_failed:
return GST_FLOW_ERROR;
}
}
static GstFlowReturn
gst_pinos_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstPinosPay *pay = GST_PINOS_PAY (parent);
if (pay->pinos_input)
return gst_pinos_pay_chain_pinos (pay, buffer);
else
return gst_pinos_pay_chain_other (pay, buffer);
}
static void
gst_pinos_pay_finalize (GObject * object)
{

View file

@ -38,7 +38,7 @@ struct _GstPinosPay
{
GstElement parent;
gboolean negotiated;
gboolean pinos_input;
GstPad *srcpad, *sinkpad;
GstAllocator *allocator;
@ -55,4 +55,4 @@ GType gst_pinos_pay_get_type (void);
G_END_DECLS
#endif
#endif /* _GST_PINOS_PAY_H_ */