diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c
index 3bef337ae..1efbf449c 100644
--- a/pinos/client/buffer.c
+++ b/pinos/client/buffer.c
@@ -38,8 +38,8 @@ G_STATIC_ASSERT (sizeof (PinosStackBuffer) <= sizeof (PinosBuffer));
*
* Initialize @buffer with @data and @size and @fds and @n_fds.
* The memory pointer to by @data and @fds becomes property of @buffer
- * and should not be freed or modified until pinos_buffer_clear() is
- * called.
+ * and should not be freed or modified until all referenced to the buffer
+ * are gone pinos_buffer_unref().
*/
void
pinos_buffer_init_data (PinosBuffer *buffer,
@@ -50,7 +50,10 @@ pinos_buffer_init_data (PinosBuffer *buffer,
{
PinosStackBuffer *sb = PSB (buffer);
+ g_debug ("buffer %p: init", buffer);
+
sb->magic = PSB_MAGIC;
+ sb->refcount = 1;
sb->data = data;
sb->size = size;
sb->max_size = size;
@@ -61,20 +64,44 @@ pinos_buffer_init_data (PinosBuffer *buffer,
sb->free_fds = NULL;
}
-void
-pinos_buffer_clear (PinosBuffer *buffer)
+PinosBuffer *
+pinos_buffer_ref (PinosBuffer *buffer)
{
PinosStackBuffer *sb = PSB (buffer);
+
+ g_return_val_if_fail (is_valid_buffer (buffer), NULL);
+
+ g_debug ("buffer %p: ref %d -> %d", buffer, sb->refcount, sb->refcount+1);
+
+ sb->refcount++;
+
+ return buffer;
+}
+
+gboolean
+pinos_buffer_unref (PinosBuffer *buffer)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+ gboolean res;
gint i;
- g_return_if_fail (is_valid_buffer (buffer));
+ g_return_val_if_fail (is_valid_buffer (buffer), FALSE);
- sb->magic = 0;
- g_free (sb->free_data);
- for (i = 0; i < sb->n_fds; i++)
- close (sb->fds[i]);
- g_free (sb->free_fds);
- sb->n_fds = 0;
+ g_debug ("buffer %p: unref %d -> %d", buffer, sb->refcount, sb->refcount-1);
+
+ sb->refcount--;
+ res = sb->refcount > 0;
+ if (!res) {
+ sb->magic = 0;
+ g_free (sb->free_data);
+ for (i = 0; i < sb->n_fds; i++) {
+ g_debug ("%p: close %d %d", buffer, i, sb->fds[i]);
+ close (sb->fds[i]);
+ }
+ g_free (sb->free_fds);
+ sb->n_fds = 0;
+ }
+ return res;
}
/**
@@ -158,7 +185,8 @@ pinos_buffer_steal_data (PinosBuffer *buffer,
PinosStackBuffer *sb = PSB (buffer);
gpointer data;
- g_return_val_if_fail (is_valid_buffer (buffer), 0);
+ g_return_val_if_fail (is_valid_buffer (buffer), NULL);
+ g_return_val_if_fail (sb->refcount == 1, NULL);
data = sb->data;
if (size)
@@ -190,7 +218,8 @@ pinos_buffer_steal_fds (PinosBuffer *buffer,
PinosStackBuffer *sb = PSB (buffer);
gint *fds;
- g_return_val_if_fail (is_valid_buffer (buffer), 0);
+ g_return_val_if_fail (is_valid_buffer (buffer), NULL);
+ g_return_val_if_fail (sb->refcount == 1, NULL);
fds = sb->fds;
if (n_fds)
@@ -332,6 +361,22 @@ pinos_buffer_iter_next (PinosBufferIter *iter)
return TRUE;
}
+/**
+ * pinos_buffer_iter_done:
+ * @iter: a #PinosBufferIter
+ *
+ * End iterations on @iter.
+ */
+void
+pinos_buffer_iter_end (PinosBufferIter *iter)
+{
+ struct stack_iter *si = PPSI (iter);
+
+ g_return_if_fail (is_valid_iter (iter));
+
+ si->magic = 0;
+}
+
PinosPacketType
pinos_buffer_iter_get_type (PinosBufferIter *iter)
{
@@ -408,6 +453,8 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder,
sb->buf.max_size = sizeof (PinosStackHeader) + 128;
sb->buf.data = g_malloc (sb->buf.max_size);
sb->buf.free_data = sb->buf.data;
+ g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT,
+ builder, max_data, sb->buf.max_size);
} else {
sb->buf.max_size = max_data;
sb->buf.data = data;
@@ -493,6 +540,7 @@ pinos_buffer_builder_end (PinosBufferBuilder *builder,
sb->sh->length = sb->buf.size - sizeof (PinosStackHeader);
sbuf->magic = PSB_MAGIC;
+ sbuf->refcount = 1;
sbuf->data = sb->buf.data;
sbuf->size = sb->buf.size;
sbuf->max_size = sb->buf.max_size;
@@ -502,6 +550,8 @@ pinos_buffer_builder_end (PinosBufferBuilder *builder,
sbuf->n_fds = sb->buf.n_fds;
sbuf->max_fds = sb->buf.max_fds;
sbuf->free_fds = sb->buf.free_fds;
+
+ g_debug ("builder %p: buffer %p init", builder, buffer);
}
/**
@@ -524,8 +574,11 @@ pinos_buffer_builder_add_fd (PinosBufferBuilder *builder,
g_return_val_if_fail (fd > 0, -1);
if (sb->buf.n_fds >= sb->buf.max_fds) {
- sb->buf.max_fds += 8;
- sb->buf.free_fds = g_realloc (sb->buf.free_fds, sb->buf.max_fds * sizeof (int));
+ gint new_size = sb->buf.max_fds + 8;
+ g_warning ("builder %p: realloc buffer fds %d -> %d",
+ builder, sb->buf.max_fds, new_size);
+ sb->buf.max_fds = new_size;
+ sb->buf.free_fds = g_realloc (sb->buf.free_fds, new_size * sizeof (int));
sb->buf.fds = sb->buf.free_fds;
}
index = sb->buf.n_fds;
@@ -539,8 +592,11 @@ static gpointer
builder_ensure_size (struct stack_builder *sb, gsize size)
{
if (sb->buf.size + size > sb->buf.max_size) {
- sb->buf.max_size = sb->buf.size + MAX (size, 1024);
- sb->buf.free_data = g_realloc (sb->buf.free_data, sb->buf.max_size);
+ gsize new_size = sb->buf.size + MAX (size, 1024);
+ g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT,
+ sb, sb->buf.max_size, new_size);
+ sb->buf.max_size = new_size;
+ sb->buf.free_data = g_realloc (sb->buf.free_data, new_size);
sb->sh = sb->buf.data = sb->buf.free_data;
}
return (guint8 *) sb->buf.data + sb->buf.size;
diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h
index abfcfec64..eb1656964 100644
--- a/pinos/client/buffer.h
+++ b/pinos/client/buffer.h
@@ -55,7 +55,8 @@ void pinos_buffer_init_data (PinosBuffer *buffer,
gint *fds,
gint n_fds);
-void pinos_buffer_clear (PinosBuffer *buffer);
+PinosBuffer * pinos_buffer_ref (PinosBuffer *buffer);
+gboolean pinos_buffer_unref (PinosBuffer *buffer);
guint32 pinos_buffer_get_version (PinosBuffer *buffer);
PinosBufferFlags pinos_buffer_get_flags (PinosBuffer *buffer);
@@ -109,6 +110,7 @@ void pinos_buffer_iter_init_full (PinosBufferIter *iter,
#define pinos_buffer_iter_init(i,b) pinos_buffer_iter_init_full(i,b, PINOS_BUFFER_VERSION);
gboolean pinos_buffer_iter_next (PinosBufferIter *iter);
+void pinos_buffer_iter_end (PinosBufferIter *iter);
PinosPacketType pinos_buffer_iter_get_type (PinosBufferIter *iter);
gpointer pinos_buffer_iter_get_data (PinosBufferIter *iter, gsize *size);
diff --git a/pinos/client/client-node.c b/pinos/client/client-node.c
index 9fe97f081..553eab527 100644
--- a/pinos/client/client-node.c
+++ b/pinos/client/client-node.c
@@ -159,6 +159,8 @@ create_failed:
g_warning ("failed to create port: %s", error->message);
g_task_return_error (task, error);
g_object_unref (task);
+ if (ret)
+ g_variant_unref (ret);
return;
}
}
@@ -221,9 +223,9 @@ client_node_remove_port (PinosNode *node,
static void
pinos_client_node_get_property (GObject *_object,
- guint prop_id,
- GValue *value,
- GParamSpec *pspec)
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
{
PinosClientNode *node = PINOS_CLIENT_NODE (_object);
PinosClientNodePrivate *priv = node->priv;
diff --git a/pinos/client/client-port.c b/pinos/client/client-port.c
index c9f5609c7..91c05a0e8 100644
--- a/pinos/client/client-port.c
+++ b/pinos/client/client-port.c
@@ -100,23 +100,21 @@ proxy_g_properties_changed (GDBusProxy *_proxy,
gpointer data;
GBytes *bytes;
- g_debug ("changed %s", key);
-
variant = g_dbus_proxy_get_cached_property (_proxy, key);
if (variant == NULL)
continue;
- if (strcmp (key, "Format") == 0) {
+ if (strcmp (key, "PossibleFormats") == 0) {
data = g_variant_dup_string (variant, &size);
bytes = g_bytes_new_take (data, size);
- g_object_set (port, "format", bytes, NULL);
+ g_object_set (port, "possible-formats", bytes, NULL);
+ g_bytes_unref (bytes);
}
g_variant_unref (variant);
}
g_variant_iter_free (iter);
}
-#if 0
static void
proxy_set_property_cb (GDBusProxy *proxy,
GAsyncResult *res,
@@ -144,23 +142,11 @@ on_property_notify (GObject *obj,
const gchar *prop_name = NULL;
GVariant *variant;
- g_debug ("update %s", pspec ? g_param_spec_get_name (pspec) : "NULL");
-
- if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "properties") == 0) {
- PinosProperties *props = pinos_port_get_properties (port);
- prop_name = "Properties";
- variant = props ? pinos_properties_to_variant (props) : NULL;
- }
if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "possible-formats") == 0) {
GBytes *bytes = pinos_port_get_possible_formats (port);
prop_name = "PossibleFormats";
variant = bytes ? g_variant_new_string (g_bytes_get_data (bytes, NULL)) : NULL;
}
- if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "format") == 0) {
- GBytes *bytes = pinos_port_get_format (port);
- prop_name = "Format";
- variant = bytes ? g_variant_new_string (g_bytes_get_data (bytes, NULL)) : NULL;
- }
if (prop_name) {
g_dbus_proxy_call (G_DBUS_PROXY (priv->proxy),
"org.freedesktop.DBus.Properties.Set",
@@ -173,11 +159,8 @@ on_property_notify (GObject *obj,
NULL,
(GAsyncReadyCallback) proxy_set_property_cb,
port);
- if (variant)
- g_variant_unref (variant);
}
}
-#endif
static void
pinos_client_port_constructed (GObject * object)
@@ -190,6 +173,7 @@ pinos_client_port_constructed (GObject * object)
"g-properties-changed",
(GCallback) proxy_g_properties_changed,
port);
+ g_signal_connect (port, "notify", (GCallback) on_property_notify, port);
G_OBJECT_CLASS (pinos_client_port_parent_class)->constructed (object);
}
diff --git a/pinos/client/port.c b/pinos/client/port.c
index 21d98bc08..e8d5e3da8 100644
--- a/pinos/client/port.c
+++ b/pinos/client/port.c
@@ -36,6 +36,15 @@
#define PINOS_PORT_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), PINOS_TYPE_PORT, PinosPortPrivate))
+#if 0
+#define PINOS_DEBUG_TRANSPORT(format,args...) g_debug(format,##args)
+#else
+#define PINOS_DEBUG_TRANSPORT(format,args...)
+#endif
+
+#define MAX_BUFFER_SIZE 1024
+#define MAX_FDS 16
+
struct _PinosPortPrivate
{
PinosNode *node;
@@ -49,7 +58,12 @@ struct _PinosPortPrivate
GSource *socket_source;
- PinosBuffer net_buffer;
+ PinosBuffer recv_buffer;
+ guint8 recv_data[MAX_BUFFER_SIZE];
+ gint recv_fds[MAX_FDS];
+
+ guint8 send_data[MAX_BUFFER_SIZE];
+ gint send_fds[MAX_FDS];
PinosBuffer *buffer;
PinosPort *peers[16];
@@ -367,7 +381,6 @@ pinos_port_link (PinosPort *source, PinosPort *destination)
source->priv->peers[source->priv->n_peers++] = destination;
destination->priv->peers[destination->priv->n_peers++] = source;
- g_object_set (destination, "format", source->priv->format, NULL);
g_debug ("port %p: linked to %p", source, destination);
g_signal_emit (source, signals[SIGNAL_LINKED], 0, destination);
@@ -444,27 +457,31 @@ pinos_port_get_n_links (PinosPort *port)
return port->priv->n_peers;
}
-static gboolean
-read_buffer (GSocket *socket,
- PinosBuffer *buffer,
+static PinosBuffer *
+read_buffer (PinosPort *port,
GError **error)
{
+ PinosPortPrivate *priv = port->priv;
gssize len;
GInputVector ivec;
PinosStackHeader *hdr;
GSocketControlMessage **messages = NULL;
- PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
+ PinosStackBuffer *sb = (PinosStackBuffer *) &priv->recv_buffer;
gint num_messages = 0;
gint flags = 0;
gsize need;
gint i;
- need = sizeof (PinosStackHeader);
+ g_assert (sb->refcount == 0);
- if (sb->max_size < need) {
- sb->max_size = need;
- sb->data = sb->free_data = g_realloc (sb->free_data, need);
- }
+ sb->data = priv->recv_data;
+ sb->max_size = MAX_BUFFER_SIZE;
+ sb->size = 0;
+ sb->free_data = NULL;
+ sb->fds = priv->recv_fds;
+ sb->max_fds = MAX_FDS;
+ sb->n_fds = 0;
+ sb->free_fds = NULL;
hdr = sb->data;
@@ -472,7 +489,7 @@ read_buffer (GSocket *socket,
ivec.buffer = hdr;
ivec.size = sizeof (PinosStackHeader);
- len = g_socket_receive_message (socket,
+ len = g_socket_receive_message (priv->sockets[0],
NULL,
&ivec,
1,
@@ -482,14 +499,15 @@ read_buffer (GSocket *socket,
NULL,
error);
if (len == -1)
- return FALSE;
+ return NULL;
g_assert (len == sizeof (PinosStackHeader));
/* now we know the total length */
- need += hdr->length;
+ need = sizeof (PinosStackHeader) + hdr->length;
if (sb->max_size < need) {
+ g_warning ("port %p: realloc receive memory %" G_GSIZE_FORMAT" -> %" G_GSIZE_FORMAT, port, sb->max_size, need);
sb->max_size = need;
hdr = sb->data = sb->free_data = g_realloc (sb->free_data, need);
}
@@ -497,18 +515,19 @@ read_buffer (GSocket *socket,
if (hdr->length > 0) {
/* read data */
- len = g_socket_receive (socket,
+ len = g_socket_receive (priv->sockets[0],
(gchar *)sb->data + sizeof (PinosStackHeader),
hdr->length,
NULL,
error);
if (len == -1)
- return FALSE;
+ return NULL;
g_assert (len == hdr->length);
}
if (sb->max_fds < num_messages) {
+ g_warning ("port %p: realloc receive fds %d -> %d", port, sb->max_fds, num_messages);
sb->max_fds = num_messages;
sb->fds = sb->free_fds = g_realloc (sb->free_fds, num_messages * sizeof (int));
}
@@ -524,14 +543,18 @@ read_buffer (GSocket *socket,
fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds);
for (j = 0; j < n_fds; j++)
sb->fds[i] = fds[i];
+ sb->n_fds = n_fds;
g_free (fds);
g_object_unref (msg);
}
g_free (messages);
+ sb->refcount = 1;
sb->magic = PSB_MAGIC;
- return TRUE;
+ g_debug ("port %p: buffer %p init", &priv->recv_buffer, sb);
+
+ return &priv->recv_buffer;
}
@@ -593,6 +616,73 @@ send_error:
}
}
+static void
+parse_control_buffer (PinosPort *port, PinosBuffer *buffer)
+{
+ PinosPortPrivate *priv = port->priv;
+ PinosBufferIter it;
+
+ pinos_buffer_iter_init (&it, buffer);
+ while (pinos_buffer_iter_next (&it)) {
+ switch (pinos_buffer_iter_get_type (&it)) {
+ case PINOS_PACKET_TYPE_FORMAT_CHANGE:
+ {
+ PinosPacketFormatChange change;
+
+ if (!pinos_buffer_iter_parse_format_change (&it, &change))
+ continue;
+
+ if (priv->format)
+ g_bytes_unref (priv->format);
+ priv->format = g_bytes_new (change.format, strlen (change.format) + 1);
+ g_object_notify (G_OBJECT (port), "format");
+ break;
+ }
+ default:
+ break;
+ }
+ }
+}
+
+static gboolean
+pinos_port_receive_buffer (PinosPort *port,
+ PinosBuffer *buffer,
+ GError **error)
+{
+ PinosPortPrivate *priv = port->priv;
+ gboolean res;
+
+ if (priv->buffer)
+ goto buffer_queued;
+
+ PINOS_DEBUG_TRANSPORT ("port %p: receive buffer %p", port, buffer);
+ if (pinos_buffer_get_flags (buffer) & PINOS_BUFFER_FLAG_CONTROL)
+ parse_control_buffer (port, buffer);
+
+ if (priv->sockets[0]) {
+ PINOS_DEBUG_TRANSPORT ("port %p: write buffer %p", port, buffer);
+ res = write_buffer (priv->sockets[0], buffer, error);
+ }
+ else {
+ res = TRUE;
+ priv->buffer = buffer;
+ if (priv->received_buffer_cb)
+ priv->received_buffer_cb (port, priv->received_buffer_data);
+ priv->buffer = NULL;
+ }
+
+ return res;
+
+ /* ERRORS */
+buffer_queued:
+ {
+ g_set_error (error,
+ G_IO_ERROR,
+ G_IO_ERROR_NOT_FOUND,
+ "buffer was already queued on port");
+ return FALSE;
+ }
+}
static gboolean
on_socket_condition (GSocket *socket,
@@ -605,27 +695,42 @@ on_socket_condition (GSocket *socket,
switch (condition) {
case G_IO_IN:
- if (!read_buffer (socket, &priv->net_buffer, &error))
- goto read_failed;
- g_debug ("port %p: received buffer", port);
+ {
+ gint i;
+ PinosBuffer *buffer;
- if (priv->direction == PINOS_DIRECTION_INPUT) {
- priv->buffer = &priv->net_buffer;
- if (priv->received_buffer_cb)
- priv->received_buffer_cb (port, priv->received_buffer_data);
- } else {
- gint i;
- for (i = 0; i < priv->n_peers; i++) {
- PinosPort *peer = priv->peers[i];
- if (peer == NULL)
- continue;
+ buffer = read_buffer (port, &error);
+ if (buffer == NULL) {
+ g_warning ("port %p: failed to read buffer: %s", port, error->message);
+ g_clear_error (&error);
+ return TRUE;
+ }
- g_debug ("port %p: send buffer %p to peer %p", port, &priv->net_buffer, peer);
- if (!pinos_port_receive_buffer (peer, &priv->net_buffer, &error))
- goto read_failed;
+ if (pinos_buffer_get_flags (buffer) & PINOS_BUFFER_FLAG_CONTROL)
+ parse_control_buffer (port, buffer);
+
+ PINOS_DEBUG_TRANSPORT ("port %p: read buffer %p", port, buffer);
+
+ if (priv->received_buffer_cb) {
+ PINOS_DEBUG_TRANSPORT ("port %p: notify buffer %p", port, buffer);
+ priv->buffer = buffer;
+ priv->received_buffer_cb (port, priv->received_buffer_data);
+ priv->buffer = NULL;
+ }
+ PINOS_DEBUG_TRANSPORT ("port %p: send to peer buffer %p", port, buffer);
+ for (i = 0; i < priv->n_peers; i++) {
+ PinosPort *peer = priv->peers[i];
+ if (peer == NULL)
+ continue;
+
+ if (!pinos_port_receive_buffer (peer, buffer, &error)) {
+ g_warning ("peer %p: failed to receive buffer: %s", peer, error->message);
+ g_clear_error (&error);
}
}
+ g_assert (pinos_buffer_unref (buffer) == FALSE);
break;
+ }
case G_IO_OUT:
g_warning ("can do IO OUT\n");
@@ -635,13 +740,6 @@ on_socket_condition (GSocket *socket,
break;
}
return TRUE;
-
-read_failed:
- {
- g_warning ("failed to read buffer: %s", error->message);
- g_clear_error (&error);
- return TRUE;
- }
}
@@ -669,62 +767,11 @@ unhandle_socket (PinosPort *port)
}
}
-/**
- * pinos_port_recieve_buffer:
- * @port: a #PinosPort
- * @buffer: a #PinosBuffer
- * @error: a #GError or %NULL
- *
- * Receive a buffer on @port.
- *
- * Returns: %TRUE if the buffer could be accepted. %FALSE if port
- * already has an unconsumed buffer.
- */
-gboolean
-pinos_port_receive_buffer (PinosPort *port,
- PinosBuffer *buffer,
- GError **error)
-{
- PinosPortPrivate *priv;
- gboolean res;
-
- g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
- priv = port->priv;
-
- if (priv->buffer)
- goto buffer_queued;
-
- if (priv->sockets[0]) {
- g_debug ("port %p: receive buffer %p write to socket", port, buffer);
- res = write_buffer (priv->sockets[0], buffer, error);
- }
- else {
- g_debug ("port %p: receive buffer %p signal", port, buffer);
- res = TRUE;
- priv->buffer = buffer;
- if (priv->received_buffer_cb)
- priv->received_buffer_cb (port, priv->received_buffer_data);
- }
-
- return res;
-
- /* ERRORS */
-buffer_queued:
- {
- g_set_error (error,
- G_IO_ERROR,
- G_IO_ERROR_NOT_FOUND,
- "buffer was already queued on port");
- return FALSE;
- }
-}
-
/**
* pinos_port_peek_buffer:
* @port: a #PinosPort
*
- * Check if there is a buffer on @port and peek it without dequeueing it
- * from the port.
+ * Peek the buffer on @port.
*
* Returns: a #PinosBuffer or %NULL when no buffer has arrived on the pad.
*/
@@ -739,27 +786,18 @@ pinos_port_peek_buffer (PinosPort *port)
return priv->buffer;
}
-/**
- * pinos_port_get_buffer:
- * @port: a #PinosPort
- *
- * Get the buffer on @port. The buffer will no longer be queued on the port.
- *
- * Returns: a #PinosBuffer or %NULL when no buffer has arrived on the pad.
- */
-PinosBuffer *
-pinos_port_get_buffer (PinosPort *port)
+void
+pinos_port_buffer_builder_init (PinosPort *port,
+ PinosBufferBuilder *builder)
{
PinosPortPrivate *priv;
- PinosBuffer *res;
- g_return_val_if_fail (PINOS_IS_PORT (port), NULL);
+ g_return_if_fail (PINOS_IS_PORT (port));
priv = port->priv;
- res = priv->buffer;
- priv->buffer = NULL;
-
- return res;
+ pinos_buffer_builder_init_into (builder,
+ priv->send_data, MAX_BUFFER_SIZE,
+ priv->send_fds, MAX_FDS);
}
/**
@@ -785,15 +823,18 @@ pinos_port_send_buffer (PinosPort *port,
g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
priv = port->priv;
- if (priv->direction == PINOS_DIRECTION_OUTPUT && priv->sockets[0]) {
- g_debug ("port %p: send buffer %p write to socket", port, buffer);
+ if (pinos_buffer_get_flags (buffer) & PINOS_BUFFER_FLAG_CONTROL)
+ parse_control_buffer (port, buffer);
+
+ PINOS_DEBUG_TRANSPORT ("port %p: send buffer %p", port, buffer);
+ if (priv->sockets[0]) {
+ PINOS_DEBUG_TRANSPORT ("port %p: write buffer %p", port, buffer);
res = write_buffer (priv->sockets[0], buffer, error);
}
for (i = 0; i < priv->n_peers; i++) {
peer = priv->peers[i];
if (peer == NULL)
continue;
- g_debug ("port %p: send buffer %p to peer %p", port, buffer, peer);
res = pinos_port_receive_buffer (peer, buffer, error);
}
return res;
@@ -853,80 +894,6 @@ create_failed:
}
}
-static void
-set_format (PinosPort *port, GBytes *format)
-{
- PinosPortPrivate *priv = port->priv;
-
- if (priv->format)
- g_bytes_unref (priv->format);
- priv->format = format;
-
- g_debug ("port %p: set format", port);
- if (priv->direction == PINOS_DIRECTION_OUTPUT) {
- gint i;
-
- for (i = 0; i < priv->n_peers; i++) {
- PinosPort *peer = priv->peers[i];
- if (peer == NULL)
- continue;
- set_format (peer, g_bytes_ref (format));
- g_object_notify (G_OBJECT (peer), "format");
- }
- }
-}
-
-/**
- * pinos_port_update_format:
- * @port: a #PinosPort
- * @format: a new format
- * @error: a #GError or %NULL
- *
- * Update the format on @port.
- *
- * Returns: %TRUE on succes, @error is set when %FALSE is returned.
- */
-gboolean
-pinos_port_update_format (PinosPort *port, GBytes *format, GError **error)
-{
- PinosPortPrivate *priv;
- gboolean res = TRUE;
-
- g_return_val_if_fail (PINOS_IS_PORT (port), FALSE);
- priv = port->priv;
-
- if (priv->format)
- g_bytes_unref (priv->format);
- priv->format = format;
-
- g_debug ("port %p: update format", port);
- if (priv->direction == PINOS_DIRECTION_INPUT && priv->sockets[0]) {
- PinosBufferBuilder builder;
- PinosBuffer pbuf;
- PinosPacketFormatChange fc;
-
- pinos_buffer_builder_init (&builder);
- fc.id = 0;
- fc.format = g_bytes_get_data (format, NULL);
- pinos_buffer_builder_add_format_change (&builder, &fc);
- pinos_buffer_builder_end (&builder, &pbuf);
-
- g_debug ("port %p: send format message %s", port, fc.format);
- res = write_buffer (priv->sockets[0], &pbuf, error);
- pinos_buffer_clear (&pbuf);
- } else if (priv->direction == PINOS_DIRECTION_OUTPUT) {
- gint i;
-
- for (i = 0; i < priv->n_peers; i++) {
- PinosPort *peer = priv->peers[i];
- if (peer == NULL)
- continue;
- res = pinos_port_update_format (peer, g_bytes_ref (format), error);
- }
- }
- return res;
-}
-
static void
pinos_port_get_property (GObject *_object,
guint prop_id,
@@ -983,7 +950,6 @@ pinos_port_set_property (GObject *_object,
switch (prop_id) {
case PROP_NODE:
priv->node = g_value_get_object (value);
- g_debug ("port %p: set node %p %d", port, priv->node, G_OBJECT (priv->node)->ref_count);
break;
case PROP_SOCKET:
@@ -1005,7 +971,9 @@ pinos_port_set_property (GObject *_object,
break;
case PROP_FORMAT:
- set_format (port, g_value_dup_boxed (value));
+ if (priv->format)
+ g_bytes_unref (priv->format);
+ priv->format = g_value_dup_boxed (value);
break;
case PROP_PROPERTIES:
@@ -1026,8 +994,8 @@ pinos_port_constructed (GObject * object)
PinosPort *port = PINOS_PORT (object);
PinosPortPrivate *priv = port->priv;
- g_debug ("port %p: %s port constructed, node %p %d",
- port, pinos_direction_as_string (priv->direction), priv->node, G_OBJECT (priv->node)->ref_count);
+ g_debug ("port %p: %s port constructed, node %p",
+ port, pinos_direction_as_string (priv->direction), priv->node);
if (priv->sockets[0])
handle_socket (port, priv->sockets[0]);
@@ -1141,7 +1109,7 @@ pinos_port_class_init (PinosPortClass * klass)
"The format of the port",
G_TYPE_BYTES,
G_PARAM_READWRITE |
- G_PARAM_CONSTRUCT |
+ G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
@@ -1151,7 +1119,7 @@ pinos_port_class_init (PinosPortClass * klass)
"The properties of the port",
PINOS_TYPE_PROPERTIES,
G_PARAM_READWRITE |
- G_PARAM_CONSTRUCT |
+ G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
diff --git a/pinos/client/port.h b/pinos/client/port.h
index 7d48e399b..aa7018fcb 100644
--- a/pinos/client/port.h
+++ b/pinos/client/port.h
@@ -84,9 +84,6 @@ PinosProperties * pinos_port_get_properties (PinosPort *port);
GBytes * pinos_port_filter_formats (PinosPort *port,
GBytes *filter,
GError **error);
-gboolean pinos_port_update_format (PinosPort *port,
- GBytes *format,
- GError **error);
GSocket * pinos_port_get_socket_pair (PinosPort *port,
GError **error);
@@ -97,11 +94,10 @@ gboolean pinos_port_unlink (PinosPort *source,
PinosPort *destination);
gint pinos_port_get_n_links (PinosPort *port);
-gboolean pinos_port_receive_buffer (PinosPort *port,
- PinosBuffer *buffer,
- GError **error);
PinosBuffer * pinos_port_peek_buffer (PinosPort *port);
-PinosBuffer * pinos_port_get_buffer (PinosPort *port);
+
+void pinos_port_buffer_builder_init (PinosPort *port,
+ PinosBufferBuilder *builder);
gboolean pinos_port_send_buffer (PinosPort *port,
PinosBuffer *buffer,
diff --git a/pinos/client/private.h b/pinos/client/private.h
index 011e9fced..4da5cc7a2 100644
--- a/pinos/client/private.h
+++ b/pinos/client/private.h
@@ -73,6 +73,7 @@ typedef struct {
gint max_fds;
gpointer free_fds;
gsize magic;
+ gint refcount;
} PinosStackBuffer;
#define PSB(b) ((PinosStackBuffer *) (b))
diff --git a/pinos/client/stream.c b/pinos/client/stream.c
index b491fe95f..1685e014d 100644
--- a/pinos/client/stream.c
+++ b/pinos/client/stream.c
@@ -148,7 +148,6 @@ pinos_stream_set_property (GObject *_object,
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_value_dup_boxed (value);
- g_object_set (priv->port, "format", priv->format, NULL);
break;
default:
@@ -479,7 +478,6 @@ on_received_buffer (PinosPort *port,
{
PinosStream *stream = user_data;
- g_debug ("buffer received");
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
}
@@ -824,30 +822,45 @@ pinos_stream_disconnect (PinosStream *stream)
}
/**
- * pinos_stream_get_buffer:
+ * pinos_stream_peek_buffer:
* @stream: a #PinosStream
- * @buffer: a #PinosBuffer
*
- * Get the next buffer from @stream. This function should be called from
+ * Get the current buffer from @stream. This function should be called from
* the new-buffer signal callback.
*
- * Returns: %TRUE when @buffer contains valid information
+ * Returns: a #PinosBuffer or %NULL when there is no buffer.
*/
-gboolean
-pinos_stream_get_buffer (PinosStream *stream,
- PinosBuffer **buffer)
+PinosBuffer *
+pinos_stream_peek_buffer (PinosStream *stream)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
- g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
//g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
- *buffer = pinos_port_get_buffer (priv->port);
+ return pinos_port_peek_buffer (priv->port);
+}
- return TRUE;
+/**
+ * pinos_stream_buffer_builder_init:
+ * @stream: a #PinosStream
+ * @builder: a #PinosBufferBuilder
+ *
+ * Get a #PinosBufferBuilder for @stream.
+ *
+ * Returns: a #PinosBuffer or %NULL when there is no buffer.
+ */
+void
+pinos_stream_buffer_builder_init (PinosStream *stream, PinosBufferBuilder *builder)
+{
+ PinosStreamPrivate *priv;
+
+ g_return_if_fail (PINOS_IS_STREAM (stream));
+ priv = stream->priv;
+
+ pinos_port_buffer_builder_init (priv->port, builder);
}
/**
diff --git a/pinos/client/stream.h b/pinos/client/stream.h
index 187b54b44..8d0cc6579 100644
--- a/pinos/client/stream.h
+++ b/pinos/client/stream.h
@@ -106,8 +106,9 @@ gboolean pinos_stream_start (PinosStream *stream,
PinosStreamMode mode);
gboolean pinos_stream_stop (PinosStream *stream);
-gboolean pinos_stream_get_buffer (PinosStream *stream,
- PinosBuffer **buffer);
+PinosBuffer * pinos_stream_peek_buffer (PinosStream *stream);
+void pinos_stream_buffer_builder_init (PinosStream *stream,
+ PinosBufferBuilder *builder);
gboolean pinos_stream_send_buffer (PinosStream *stream,
PinosBuffer *buffer);
diff --git a/pinos/dbus/org.pinos.xml b/pinos/dbus/org.pinos.xml
index 983882ea7..42c5c4c33 100644
--- a/pinos/dbus/org.pinos.xml
+++ b/pinos/dbus/org.pinos.xml
@@ -145,9 +145,9 @@
-
+
diff --git a/pinos/gst/gstpinosdepay.c b/pinos/gst/gstpinosdepay.c
index 3d8de37aa..cca5e421b 100644
--- a/pinos/gst/gstpinosdepay.c
+++ b/pinos/gst/gstpinosdepay.c
@@ -251,7 +251,8 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer)
break;
}
}
- pinos_buffer_clear (&pbuf);
+ pinos_buffer_iter_init (&it, &pbuf);
+ pinos_buffer_unref (&pbuf);
gst_buffer_unmap (buffer, &info);
gst_buffer_unref (buffer);
diff --git a/pinos/gst/gstpinospay.c b/pinos/gst/gstpinospay.c
index 149c89ef4..ff6a87d26 100644
--- a/pinos/gst/gstpinospay.c
+++ b/pinos/gst/gstpinospay.c
@@ -254,8 +254,9 @@ client_buffer_received (GstPinosPay *pay, GstBuffer *buffer,
break;
}
}
+ pinos_buffer_iter_end (&it);
+ pinos_buffer_unref (&pbuf);
gst_buffer_unmap (buffer, &info);
- pinos_buffer_clear (&pbuf);
if (pay->pinos_input) {
GstBuffer *outbuf;
@@ -440,8 +441,9 @@ gst_pinos_pay_chain_pinos (GstPinosPay *pay, GstBuffer * buffer)
break;
}
}
+ pinos_buffer_iter_end (&it);
+ pinos_buffer_unref (&pbuf);
gst_buffer_unmap (buffer, &info);
- pinos_buffer_clear (&pbuf);
if (fdids != NULL) {
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buffer),
@@ -490,6 +492,7 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer)
data = pinos_buffer_steal_data (&pbuf, &size);
fds = pinos_buffer_steal_fds (&pbuf, &n_fds);
+ pinos_buffer_unref (&pbuf);
outbuf = gst_buffer_new_wrapped (data, size);
GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (buffer);
diff --git a/pinos/gst/gstpinosportsink.c b/pinos/gst/gstpinosportsink.c
index 2f079b705..003d37d41 100644
--- a/pinos/gst/gstpinosportsink.c
+++ b/pinos/gst/gstpinosportsink.c
@@ -84,7 +84,7 @@ on_received_buffer (PinosPort *port, gpointer user_data)
PinosBufferBuilder b;
gboolean have_out = FALSE;
- pbuf = pinos_port_get_buffer (port);
+ pbuf = pinos_port_peek_buffer (port);
if (this->pinos_input) {
pinos_buffer_builder_init (&b);
@@ -93,7 +93,7 @@ on_received_buffer (PinosPort *port, gpointer user_data)
pinos_buffer_iter_init (&it, pbuf);
while (pinos_buffer_iter_next (&it)) {
switch (pinos_buffer_iter_get_type (&it)) {
- case PINOS_PACKET_TYPE_REFRESH_REQUEST:
+ case PINOS_PACKET_TYPE_REFRESH_REQUEST:
{
PinosPacketRefreshRequest p;
@@ -115,7 +115,7 @@ on_received_buffer (PinosPort *port, gpointer user_data)
break;
}
}
- pinos_buffer_clear (pbuf);
+ pinos_buffer_iter_end (&it);
if (this->pinos_input) {
GstBuffer *outbuf;
@@ -225,16 +225,23 @@ gst_pinos_port_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
str = gst_caps_get_structure (caps, 0);
this->pinos_input = gst_structure_has_name (str, "application/x-pinos");
if (!this->pinos_input) {
- GBytes *format;
GError *error = NULL;
+ PinosBufferBuilder builder;
+ PinosBuffer pbuf;
+ PinosPacketFormatChange fc;
- cstr = gst_caps_to_string (caps);
- format = g_bytes_new_take (cstr, strlen (cstr) + 1);
+ pinos_port_buffer_builder_init (this->port, &builder);
+ fc.id = 0;
+ fc.format = cstr = gst_caps_to_string (caps);
+ pinos_buffer_builder_add_format_change (&builder, &fc);
+ pinos_buffer_builder_end (&builder, &pbuf);
+ g_free (cstr);
- if (!pinos_port_update_format (this->port, format, &error)) {
- GST_WARNING ("update failed: %s", error->message);
+ if (!pinos_port_send_buffer (this->port, &pbuf, &error)) {
+ GST_WARNING ("format update failed: %s", error->message);
g_clear_error (&error);
}
+ pinos_buffer_unref (&pbuf);
}
return GST_BASE_SINK_CLASS (parent_class)->set_caps (bsink, caps);
@@ -249,12 +256,13 @@ gst_pinos_port_sink_render_pinos (GstPinosPortSink * this, GstBuffer * buffer)
gst_buffer_map (buffer, &info, GST_MAP_READ);
pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0);
- gst_buffer_unmap (buffer, &info);
if (!pinos_port_send_buffer (this->port, &pbuf, &error)) {
GST_WARNING ("send failed: %s", error->message);
g_clear_error (&error);
}
+ gst_buffer_unmap (buffer, &info);
+ pinos_buffer_unref (&pbuf);
return GST_FLOW_OK;
}
@@ -293,17 +301,19 @@ gst_pinos_port_sink_render_other (GstPinosPortSink * this, GstBuffer * buffer)
PinosPacketHeader hdr;
PinosPacketFDPayload p;
gboolean tmpfile = TRUE;
+ gint fd;
hdr.flags = 0;
hdr.seq = GST_BUFFER_OFFSET (buffer);
hdr.pts = GST_BUFFER_PTS (buffer) + GST_ELEMENT_CAST (this)->base_time;
hdr.dts_offset = 0;
- pinos_buffer_builder_init (&builder);
+ pinos_port_buffer_builder_init (this->port, &builder);
pinos_buffer_builder_add_header (&builder, &hdr);
fdmem = gst_pinos_port_sink_get_fd_memory (this, buffer, &tmpfile);
- p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem));
+ fd = gst_fd_memory_get_fd (fdmem);
+ p.fd_index = pinos_buffer_builder_add_fd (&builder, fd);
p.id = pinos_fd_manager_get_id (this->fdmanager);
p.offset = fdmem->offset;
p.size = fdmem->size;
@@ -318,6 +328,8 @@ gst_pinos_port_sink_render_other (GstPinosPortSink * this, GstBuffer * buffer)
GST_WARNING ("send failed: %s", error->message);
g_clear_error (&error);
}
+ pinos_buffer_steal_fds (&pbuf, NULL);
+ pinos_buffer_unref (&pbuf);
gst_memory_unref(fdmem);
diff --git a/pinos/gst/gstpinosportsrc.c b/pinos/gst/gstpinosportsrc.c
index d9c7415a9..5ce923bdb 100644
--- a/pinos/gst/gstpinosportsrc.c
+++ b/pinos/gst/gstpinosportsrc.c
@@ -99,12 +99,12 @@ fdpayload_data_destroy (gpointer user_data)
GST_DEBUG_OBJECT (this, "destroy %d", r.id);
- pinos_buffer_builder_init (&b);
+ pinos_port_buffer_builder_init (this->port, &b);
pinos_buffer_builder_add_release_fd_payload (&b, &r);
pinos_buffer_builder_end (&b, &pbuf);
pinos_port_send_buffer (this->port, &pbuf, NULL);
- pinos_buffer_clear (&pbuf);
+ pinos_buffer_unref (&pbuf);
gst_object_unref (this);
g_slice_free (FDPayloadData, data);
@@ -120,7 +120,7 @@ on_received_buffer (PinosPort *port,
GstBuffer *buf = NULL;
GST_LOG_OBJECT (this, "got new buffer");
- pbuf = pinos_port_get_buffer (port);
+ pbuf = pinos_port_peek_buffer (port);
pinos_buffer_iter_init (&it, pbuf);
while (pinos_buffer_iter_next (&it)) {
@@ -130,7 +130,7 @@ on_received_buffer (PinosPort *port,
PinosPacketHeader hdr;
if (!pinos_buffer_iter_parse_header (&it, &hdr))
- goto parse_failed;
+ break;
if (buf == NULL)
buf = gst_buffer_new ();
@@ -152,17 +152,17 @@ on_received_buffer (PinosPort *port,
int fd;
if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p))
- goto parse_failed;
+ break;
GST_DEBUG ("got fd payload id %d", data.p.id);
fd = pinos_buffer_get_fd (pbuf, data.p.fd_index);
if (fd == -1)
- goto no_fds;
+ break;
if (buf == NULL)
buf = gst_buffer_new ();
- fdmem = gst_fd_allocator_alloc (this->fd_allocator, fd,
+ fdmem = gst_fd_allocator_alloc (this->fd_allocator, dup (fd),
data.p.offset + data.p.size, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (fdmem, data.p.offset, data.p.size);
gst_buffer_append_memory (buf, fdmem);
@@ -180,7 +180,7 @@ on_received_buffer (PinosPort *port,
GstCaps *caps;
if (!pinos_buffer_iter_parse_format_change (&it, &change))
- goto parse_failed;
+ break;
GST_DEBUG ("got format change %d %s", change.id, change.format);
caps = gst_caps_from_string (change.format);
@@ -192,27 +192,14 @@ on_received_buffer (PinosPort *port,
break;
}
}
+ pinos_buffer_iter_end (&it);
+
if (buf) {
g_queue_push_tail (&this->queue, buf);
g_cond_signal (&this->cond);
}
return;
-
- /* ERRORS */
-parse_failed:
- {
- gst_buffer_unref (buf);
- GST_ELEMENT_ERROR (this, RESOURCE, FAILED, ("buffer parse failure"), (NULL));
- return;
- }
-no_fds:
- {
- gst_buffer_unref (buf);
- GST_ELEMENT_ERROR (this, RESOURCE, FAILED, ("fd not found in buffer"), (NULL));
- return;
- }
-
}
static void
@@ -443,7 +430,7 @@ gst_pinos_port_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
g_object_get (this->port, "format", &format, NULL);
if (format) {
- GST_DEBUG ("have format %s", g_bytes_get_data (format, NULL));
+ GST_DEBUG ("have format %s", (gchar *)g_bytes_get_data (format, NULL));
caps = gst_caps_from_string (g_bytes_get_data (format, NULL));
g_bytes_unref (format);
}
@@ -475,13 +462,13 @@ gst_pinos_port_src_event (GstBaseSrc * src, GstEvent * event)
refresh.request_type = all_headers ? 1 : 0;
refresh.pts = running_time;
- pinos_buffer_builder_init (&b);
+ pinos_port_buffer_builder_init (this->port, &b);
pinos_buffer_builder_add_refresh_request (&b, &refresh);
pinos_buffer_builder_end (&b, &pbuf);
GST_DEBUG_OBJECT (this, "send refresh request");
pinos_port_send_buffer (this->port, &pbuf, NULL);
- pinos_buffer_clear (&pbuf);
+ pinos_buffer_unref (&pbuf);
res = TRUE;
} else {
res = GST_BASE_SRC_CLASS (parent_class)->event (src, event);
diff --git a/pinos/gst/gstpinosprovide.c b/pinos/gst/gstpinosprovide.c
index 46901fa2b..89db03b2c 100644
--- a/pinos/gst/gstpinosprovide.c
+++ b/pinos/gst/gstpinosprovide.c
@@ -412,6 +412,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gsize size;
GError *err = NULL;
gboolean tmpfile, res;
+ int fd;
pinossink = GST_PINOS_SINK (bsink);
@@ -455,7 +456,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
pinos_buffer_builder_init (&builder);
pinos_buffer_builder_add_header (&builder, &hdr);
- p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem), &err);
+ fd = gst_fd_memory_get_fd (mem);
+ p.fd_index = pinos_buffer_builder_add_fd (&builder, fd, &err);
if (p.fd_index == -1)
goto add_fd_failed;
p.id = pinossink->id_counter++;
@@ -464,15 +466,17 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
pinos_buffer_builder_add_fd_payload (&builder, &p);
pinos_buffer_builder_end (&builder, &pbuf);
- gst_memory_unref (mem);
pinos_main_loop_lock (pinossink->loop);
if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
goto streaming_error;
res = pinos_stream_send_buffer (pinossink->stream, &pbuf);
+ pinos_buffer_steal_fds (&pbuf, NULL);
pinos_buffer_clear (&pbuf);
pinos_main_loop_unlock (pinossink->loop);
+ gst_memory_unref (mem);
+
if (res && !tmpfile) {
/* keep the buffer around until we get the release fd message */
g_hash_table_insert (pinossink->fdids, GINT_TO_POINTER (p.id), gst_buffer_ref (buffer));
diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c
index da9894947..ab56f6dc6 100644
--- a/pinos/gst/gstpinossink.c
+++ b/pinos/gst/gstpinossink.c
@@ -337,7 +337,7 @@ on_new_buffer (GObject *gobject,
return;
}
- if (!pinos_stream_get_buffer (pinossink->stream, &pbuf)) {
+ if (!(pbuf = pinos_stream_peek_buffer (pinossink->stream))) {
g_warning ("failed to capture buffer");
return;
}
@@ -373,6 +373,7 @@ on_new_buffer (GObject *gobject,
break;
}
}
+ pinos_buffer_iter_end (&it);
}
static void
@@ -471,7 +472,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
PinosPacketFormatChange change;
PinosBuffer pbuf;
- pinos_buffer_builder_init (&builder);
+ pinos_stream_buffer_builder_init (pinossink->stream, &builder);
change.id = 1;
change.format = g_bytes_get_data (format, NULL);
@@ -480,7 +481,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
g_debug ("sending format");
res = pinos_stream_send_buffer (pinossink->stream, &pbuf);
- pinos_buffer_clear (&pbuf);
+ pinos_buffer_unref (&pbuf);
}
pinos_main_loop_unlock (pinossink->loop);
@@ -552,10 +553,14 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
tmpfile = TRUE;
}
- pinos_buffer_builder_init (&builder);
+ pinos_main_loop_lock (pinossink->loop);
+ if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
+ goto streaming_error;
+
+ pinos_stream_buffer_builder_init (pinossink->stream, &builder);
pinos_buffer_builder_add_header (&builder, &hdr);
- fd = dup (gst_fd_memory_get_fd (mem));
+ fd = gst_fd_memory_get_fd (mem);
p.fd_index = pinos_buffer_builder_add_fd (&builder, fd);
p.id = pinossink->id_counter++;
p.offset = 0;
@@ -565,11 +570,9 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
GST_LOG ("sending fd index %d %d %d", p.id, p.fd_index, fd);
- pinos_main_loop_lock (pinossink->loop);
- if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
- goto streaming_error;
res = pinos_stream_send_buffer (pinossink->stream, &pbuf);
- pinos_buffer_clear (&pbuf);
+ pinos_buffer_steal_fds (&pbuf, NULL);
+ pinos_buffer_unref (&pbuf);
pinos_main_loop_unlock (pinossink->loop);
gst_memory_unref (mem);
diff --git a/pinos/gst/gstpinossocketsink.c b/pinos/gst/gstpinossocketsink.c
index 6a70cd173..95b910648 100644
--- a/pinos/gst/gstpinossocketsink.c
+++ b/pinos/gst/gstpinossocketsink.c
@@ -262,8 +262,9 @@ gst_pinos_socket_sink_render_pinos (GstPinosSocketSink * this, GstBuffer * buffe
break;
}
}
+ pinos_buffer_iter_end (&it);
+ pinos_buffer_unref (&pbuf);
gst_buffer_unmap (buffer, &info);
- pinos_buffer_clear (&pbuf);
if (fdids != NULL) {
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buffer),
@@ -338,6 +339,7 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe
data = pinos_buffer_steal_data (&pbuf, &size);
fds = pinos_buffer_steal_fds (&pbuf, &n_fds);
+ pinos_buffer_unref (&pbuf);
outbuf = gst_buffer_new_wrapped (data, size);
GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (buffer);
@@ -582,7 +584,8 @@ myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader)
break;
}
}
- pinos_buffer_clear (&pbuf);
+ pinos_buffer_iter_end (&it);
+ pinos_buffer_unref (&pbuf);
g_free (mem);
if (this->pinos_input) {
diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c
index 012b74371..8a2939086 100644
--- a/pinos/gst/gstpinossrc.c
+++ b/pinos/gst/gstpinossrc.c
@@ -332,19 +332,18 @@ fdpayload_data_destroy (gpointer user_data)
GST_DEBUG_OBJECT (pinossrc, "destroy %d", r.id);
- pinos_buffer_builder_init (&b);
- pinos_buffer_builder_add_release_fd_payload (&b, &r);
- pinos_buffer_builder_end (&b, &pbuf);
-
GST_OBJECT_LOCK (pinossrc);
if (pinossrc->stream_state == PINOS_STREAM_STATE_STREAMING) {
+ pinos_stream_buffer_builder_init (pinossrc->stream, &b);
+ pinos_buffer_builder_add_release_fd_payload (&b, &r);
+ pinos_buffer_builder_end (&b, &pbuf);
+
GST_DEBUG_OBJECT (pinossrc, "send release-fd for %d", r.id);
pinos_stream_send_buffer (pinossrc->stream, &pbuf);
+ pinos_buffer_unref (&pbuf);
}
GST_OBJECT_UNLOCK (pinossrc);
- pinos_buffer_clear (&pbuf);
-
gst_object_unref (pinossrc);
g_slice_free (FDPayloadData, data);
}
@@ -359,7 +358,7 @@ on_new_buffer (GObject *gobject,
GstBuffer *buf = NULL;
GST_LOG_OBJECT (pinossrc, "got new buffer");
- if (!pinos_stream_get_buffer (pinossrc->stream, &pbuf)) {
+ if (!(pbuf = pinos_stream_peek_buffer (pinossrc->stream))) {
g_warning ("failed to capture buffer");
return;
}
@@ -404,7 +403,7 @@ on_new_buffer (GObject *gobject,
if (buf == NULL)
buf = gst_buffer_new ();
- fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, fd,
+ fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (fd),
data.p.offset + data.p.size, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (fdmem, data.p.offset, data.p.size);
gst_buffer_append_memory (buf, fdmem);
@@ -434,6 +433,7 @@ on_new_buffer (GObject *gobject,
break;
}
}
+ pinos_buffer_iter_end (&it);
if (buf) {
g_queue_push_tail (&pinossrc->queue, buf);
@@ -445,6 +445,8 @@ on_new_buffer (GObject *gobject,
/* ERRORS */
parse_failed:
{
+ pinos_buffer_iter_end (&it);
+ pinos_buffer_unref (pbuf);
gst_buffer_unref (buf);
GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, ("buffer parse failure"), (NULL));
pinos_main_loop_signal (pinossrc->loop, FALSE);
@@ -452,6 +454,8 @@ parse_failed:
}
no_fds:
{
+ pinos_buffer_iter_end (&it);
+ pinos_buffer_unref (pbuf);
gst_buffer_unref (buf);
GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, ("fd not found in buffer"), (NULL));
pinos_main_loop_signal (pinossrc->loop, FALSE);
@@ -745,12 +749,13 @@ gst_pinos_src_event (GstBaseSrc * src, GstEvent * event)
gst_video_event_parse_upstream_force_key_unit (event,
&running_time, &all_headers, &count);
+ pinos_buffer_builder_init (&b);
+
refresh.last_id = 0;
refresh.request_type = all_headers ? 1 : 0;
refresh.pts = running_time;
-
- pinos_buffer_builder_init (&b);
pinos_buffer_builder_add_refresh_request (&b, &refresh);
+
pinos_buffer_builder_end (&b, &pbuf);
GST_OBJECT_LOCK (pinossrc);
@@ -760,7 +765,7 @@ gst_pinos_src_event (GstBaseSrc * src, GstEvent * event)
}
GST_OBJECT_UNLOCK (pinossrc);
- pinos_buffer_clear (&pbuf);
+ pinos_buffer_unref (&pbuf);
res = TRUE;
} else {
res = GST_BASE_SRC_CLASS (parent_class)->event (src, event);
diff --git a/pinos/server/daemon.c b/pinos/server/daemon.c
index c669ca27b..cd54633d3 100644
--- a/pinos/server/daemon.c
+++ b/pinos/server/daemon.c
@@ -391,9 +391,9 @@ pinos_daemon_remove_node (PinosDaemon *daemon,
*
* Find the best port in @daemon that matches the given parameters.
*
- * Returns: a #PinosServerPort or %NULL when no port could be found.
+ * Returns: a #PinosPort or %NULL when no port could be found.
*/
-PinosServerPort *
+PinosPort *
pinos_daemon_find_port (PinosDaemon *daemon,
PinosDirection direction,
const gchar *name,
@@ -417,7 +417,7 @@ pinos_daemon_find_port (PinosDaemon *daemon,
/* we found the node */
if (have_name && g_str_has_suffix (pinos_server_node_get_object_path (n), name)) {
- g_debug ("name \"%s\" matches node %s", name, pinos_server_node_get_object_path (n));
+ g_debug ("name \"%s\" matches node %p", name, n);
node_found = TRUE;
}
@@ -433,22 +433,15 @@ pinos_daemon_find_port (PinosDaemon *daemon,
if (have_name && !node_found) {
if (!g_str_has_suffix (pinos_server_port_get_object_path (p), name))
continue;
- g_debug ("name \"%s\" matches port %s", name, pinos_server_port_get_object_path (p));
+ g_debug ("name \"%s\" matches port %p", name, p);
best = p;
node_found = TRUE;
break;
}
- g_debug ("port %s with filter %s",
- pinos_server_port_get_object_path (p),
- format_filter ? (gchar*)g_bytes_get_data (format_filter, NULL) : "ANY");
-
format = pinos_port_filter_formats (PINOS_PORT (p), format_filter, NULL);
if (format != NULL) {
- g_debug ("port %s with format %s matches filter %s",
- pinos_server_port_get_object_path (p),
- (gchar*)g_bytes_get_data (format, NULL),
- format_filter ? (gchar*)g_bytes_get_data (format_filter, NULL) : "ANY");
+ g_debug ("port %p matches filter", p);
g_bytes_unref (format);
best = p;
node_found = TRUE;
@@ -464,7 +457,7 @@ pinos_daemon_find_port (PinosDaemon *daemon,
G_IO_ERROR_NOT_FOUND,
"No matching Port found");
}
- return best;
+ return PINOS_PORT (best);
}
diff --git a/pinos/server/daemon.h b/pinos/server/daemon.h
index f3561646c..a0d6fdc4c 100644
--- a/pinos/server/daemon.h
+++ b/pinos/server/daemon.h
@@ -75,7 +75,7 @@ void pinos_daemon_unexport (PinosDaemon *daemon, const gch
void pinos_daemon_add_node (PinosDaemon *daemon, PinosServerNode *node);
void pinos_daemon_remove_node (PinosDaemon *daemon, PinosServerNode *node);
-PinosServerPort * pinos_daemon_find_port (PinosDaemon *daemon,
+PinosPort * pinos_daemon_find_port (PinosDaemon *daemon,
PinosDirection direction,
const gchar *name,
PinosProperties *props,
diff --git a/pinos/server/server-port.c b/pinos/server/server-port.c
index d8c469f71..0f93a7ff0 100644
--- a/pinos/server/server-port.c
+++ b/pinos/server/server-port.c
@@ -164,7 +164,7 @@ on_property_notify (GObject *obj,
PinosPort *port = PINOS_PORT (obj);
PinosServerPortPrivate *priv = PINOS_SERVER_PORT (port)->priv;
- g_debug ("update %s", pspec ? g_param_spec_get_name (pspec) : "NULL");
+ g_debug ("port %p: update %s", port, pspec ? g_param_spec_get_name (pspec) : "NULL");
if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "node") == 0) {
PinosServerNode *node = PINOS_SERVER_NODE (pinos_port_get_node (port));