diff --git a/pinos/Makefile.am b/pinos/Makefile.am index db25d8e32..b3c0424ab 100644 --- a/pinos/Makefile.am +++ b/pinos/Makefile.am @@ -167,7 +167,6 @@ pinosgstsource = gst/gstpinospay.h gst/gstpinospay.c \ pinosinclude_HEADERS = \ client/buffer.h \ - client/client-node.h \ client/client-port.h \ client/context.h \ client/enumtypes.h \ diff --git a/pinos/client/stream.c b/pinos/client/stream.c index fa25f445c..5941c0530 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -851,38 +851,6 @@ pinos_stream_connect (PinosStream *stream, return TRUE; } -static void -on_ringbuffer (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - PinosStream *stream = user_data; - PinosStreamPrivate *priv = stream->priv; - GError *error = NULL; - - g_assert (priv->port == PINOS_PORT (source_object)); - - priv->ringbuffer = pinos_port_get_ringbuffer_finish (priv->port, - res, - &error); - if (priv->ringbuffer == NULL) - goto no_ringbuffer; - - stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); - g_object_unref (stream); - - return; - - /* ERRORS */ -no_ringbuffer: - { - g_warning ("failed to get ringbuffer: %s", error->message); - stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); - g_object_unref (stream); - return; - } -} - static gboolean do_start (PinosStream *stream) { diff --git a/pinos/gst/gstpinosportsink.c b/pinos/gst/gstpinosportsink.c index f87b03803..94d7fb7a3 100644 --- a/pinos/gst/gstpinosportsink.c +++ b/pinos/gst/gstpinosportsink.c @@ -316,7 +316,7 @@ gst_pinos_port_sink_render_other (GstPinosPortSink * this, GstBuffer * buffer) am.id = pinos_fd_manager_get_id (this->fdmanager); am.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); am.offset = 0; - am.size = fdmem->size; + am.size = fdmem->size + fdmem->offset; p.id = am.id; p.offset = fdmem->offset; p.size = fdmem->size; diff --git a/pinos/modules/gst/gst-sink.c b/pinos/modules/gst/gst-sink.c index faafe6cbf..045574ef4 100644 --- a/pinos/modules/gst/gst-sink.c +++ b/pinos/modules/gst/gst-sink.c @@ -311,14 +311,14 @@ set_property (GObject *object, } } -static gboolean -on_linked (PinosPort *port, PinosPort *peer, gpointer user_data) +static void +on_activate (PinosPort *port, gpointer user_data) { SinkPortData *data = user_data; PinosGstSink *sink = data->sink; PinosGstSinkPrivate *priv = sink->priv; - g_debug ("port %p: linked", port); + g_debug ("port %p: activate", port); if (priv->mixer) { data->peerpad = gst_element_get_request_pad (priv->mixer, "sink_%u"); @@ -327,7 +327,7 @@ on_linked (PinosPort *port, PinosPort *peer, gpointer user_data) } if (gst_pad_link (data->srcpad, data->peerpad) != GST_PAD_LINK_OK) { g_clear_object (&data->peerpad); - return FALSE; + return; } pinos_node_report_busy (PINOS_NODE (sink)); @@ -337,17 +337,17 @@ on_linked (PinosPort *port, PinosPort *peer, gpointer user_data) } gst_element_set_state (data->src, GST_STATE_PLAYING); - return TRUE; + return; } static void -on_unlinked (PinosPort *port, PinosPort *peer, gpointer user_data) +on_deactivate (PinosPort *port, gpointer user_data) { SinkPortData *data = user_data; PinosGstSink *sink = data->sink; PinosGstSinkPrivate *priv = sink->priv; - g_debug ("port %p: unlinked", port); + g_debug ("port %p: deactivate", port); if (data->convert) { gst_element_set_state (data->convert, GST_STATE_NULL); @@ -399,8 +399,8 @@ add_port (PinosNode *node, ->add_port (node, direction, id, error); g_debug ("connecting signals"); - g_signal_connect (data->port, "linked", (GCallback) on_linked, data); - g_signal_connect (data->port, "unlinked", (GCallback) on_unlinked, data); + g_signal_connect (data->port, "activate", (GCallback) on_activate, data); + g_signal_connect (data->port, "deactivate", (GCallback) on_deactivate, data); data->src = gst_element_factory_make ("pinosportsrc", NULL); g_object_set (data->src, "port", data->port, NULL); diff --git a/pinos/modules/spa/spa-alsa-sink.c b/pinos/modules/spa/spa-alsa-sink.c index 43cff9973..634dee40c 100644 --- a/pinos/modules/spa/spa-alsa-sink.c +++ b/pinos/modules/spa/spa-alsa-sink.c @@ -32,6 +32,22 @@ #define PINOS_SPA_ALSA_SINK_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), PINOS_TYPE_SPA_ALSA_SINK, PinosSpaAlsaSinkPrivate)) +typedef struct { + PinosSpaAlsaSink *sink; + + guint id; + PinosPort *port; +} SinkPortData; + +typedef struct { + guint32 id; + guint32 type; + int fd; + guint64 offset; + guint64 size; + void *data; +} MemBlock; + struct _PinosSpaAlsaSinkPrivate { PinosPort *input; @@ -39,6 +55,10 @@ struct _PinosSpaAlsaSinkPrivate PinosProperties *props; PinosRingbuffer *ringbuffer; + GHashTable *mem_ids; + + GList *ports; + SpaHandle *sink; const SpaNode *sink_node; }; @@ -48,7 +68,7 @@ enum { PROP_POSSIBLE_FORMATS }; -G_DEFINE_TYPE (PinosSpaAlsaSink, pinos_spa_alsa_sink, PINOS_TYPE_SERVER_NODE); +G_DEFINE_TYPE (PinosSpaAlsaSink, pinos_spa_alsa_sink, PINOS_TYPE_NODE); static SpaResult make_node (SpaHandle **handle, const SpaNode **node, const char *lib, const char *name) @@ -273,28 +293,28 @@ set_property (GObject *object, } static void -on_linked (PinosPort *port, PinosPort *peer, gpointer user_data) +on_activate (PinosPort *port, gpointer user_data) { PinosNode *node = user_data; - guint n_links; - g_debug ("port %p: linked", port); + g_debug ("port %p: activate", port); - pinos_port_get_links (port, &n_links); - if (n_links == 1) - pinos_node_report_busy (node); + pinos_node_report_busy (node); } static void -on_unlinked (PinosPort *port, PinosPort *peer, gpointer user_data) +on_deactivate (PinosPort *port, gpointer user_data) { PinosNode *node = user_data; - guint n_links; - g_debug ("port %p: unlinked", port); - pinos_port_get_links (port, &n_links); - if (n_links == 1) - pinos_node_report_idle (node); + g_debug ("port %p: deactivate", port); + pinos_node_report_idle (node); +} + +static void +free_sink_port_data (SinkPortData *data) +{ + g_slice_free (SinkPortData, data); } static SpaResult @@ -341,16 +361,23 @@ negotiate_formats (PinosSpaAlsaSink *this) } static void -on_received_buffer (PinosPort *port, - gpointer user_data) +free_mem_block (MemBlock *b) +{ + munmap (b->data, b->size); + g_slice_free (MemBlock, b); +} + +static gboolean +on_received_buffer (PinosPort *port, + PinosBuffer *buffer, + GError **error, + gpointer user_data) { PinosSpaAlsaSink *this = user_data; PinosSpaAlsaSinkPrivate *priv = this->priv; - PinosBuffer *pbuf; + PinosBuffer *pbuf = buffer; PinosBufferIter it; - pbuf = pinos_port_peek_buffer (port); - pinos_buffer_iter_init (&it, pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -363,25 +390,58 @@ on_received_buffer (PinosPort *port, break; } - - case PINOS_PACKET_TYPE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_ADD_MEM: { - PinosPacketFDPayload p; + PinosPacketAddMem p; + MemBlock *b; int fd; - PinosRingbufferArea areas[2]; - uint8_t *data, *d; - size_t size, towrite, total; - if (!pinos_buffer_iter_parse_fd_payload (&it, &p)) + if (!pinos_buffer_iter_parse_add_mem (&it, &p)) break; - g_debug ("got fd payload id %d", p.id); fd = pinos_buffer_get_fd (pbuf, p.fd_index); if (fd == -1) break; - d = data = mmap (NULL, p.size, PROT_READ, MAP_PRIVATE, fd, p.offset); + b = g_slice_new0 (MemBlock); + b->id = p.id; + b->type = p.type; + b->fd = fd; + b->data = mmap (NULL, p.size, PROT_READ, MAP_PRIVATE, fd, p.offset); + b->offset = p.offset; + b->size = p.size; + + g_hash_table_insert (priv->mem_ids, GINT_TO_POINTER (p.id), b); + break; + } + + case PINOS_PACKET_TYPE_REMOVE_MEM: + { + PinosPacketRemoveMem p; + + if (!pinos_buffer_iter_parse_remove_mem (&it, &p)) + break; + + g_hash_table_remove (priv->mem_ids, GINT_TO_POINTER (p.id)); + break; + } + + case PINOS_PACKET_TYPE_PROCESS_MEM: + { + PinosPacketProcessMem p; + MemBlock *b; + PinosRingbufferArea areas[2]; + uint8_t *data; + size_t size, towrite, total; + + if (!pinos_buffer_iter_parse_process_mem (&it, &p)) + break; + + if (!(b = g_hash_table_lookup (priv->mem_ids, GINT_TO_POINTER (p.id)))) + break; + size = p.size; + data = (guint8*)b->data + p.offset; pinos_ringbuffer_get_write_areas (priv->ringbuffer, areas); @@ -396,7 +456,6 @@ on_received_buffer (PinosPort *port, pinos_ringbuffer_write_advance (priv->ringbuffer, total); - munmap (d, p.size); break; } case PINOS_PACKET_TYPE_FORMAT_CHANGE: @@ -416,54 +475,79 @@ on_received_buffer (PinosPort *port, } } pinos_buffer_iter_end (&it); + + return TRUE; } -static void -on_input_port_created (GObject *source_object, - GAsyncResult *res, - gpointer user_data) +static PinosPort * +add_port (PinosNode *node, + PinosDirection direction, + guint id, + GError **error) { - PinosNode *node = PINOS_NODE (source_object); PinosSpaAlsaSink *sink = PINOS_SPA_ALSA_SINK (node); PinosSpaAlsaSinkPrivate *priv = sink->priv; + SinkPortData *data; - priv->input = pinos_node_create_port_finish (node, res, NULL); + data = g_slice_new0 (SinkPortData); + data->sink = sink; + data->id = id; + data->port = PINOS_NODE_CLASS (pinos_spa_alsa_sink_parent_class) + ->add_port (node, direction, id, error); - pinos_port_set_received_buffer_cb (priv->input, on_received_buffer, sink, NULL); + pinos_port_set_received_buffer_cb (data->port, on_received_buffer, sink, NULL); - g_signal_connect (priv->input, "linked", (GCallback) on_linked, node); - g_signal_connect (priv->input, "unlinked", (GCallback) on_unlinked, node); + g_debug ("connecting signals"); + g_signal_connect (data->port, "activate", (GCallback) on_activate, data); + g_signal_connect (data->port, "deactivate", (GCallback) on_deactivate, data); - create_pipeline (sink); + priv->ports = g_list_append (priv->ports, data); + + return data->port; +} + +static gboolean +remove_port (PinosNode *node, + guint id) +{ + PinosSpaAlsaSink *sink = PINOS_SPA_ALSA_SINK (node); + PinosSpaAlsaSinkPrivate *priv = sink->priv; + GList *walk; + + for (walk = priv->ports; walk; walk = g_list_next (walk)) { + SinkPortData *data = walk->data; + + if (data->id == id) { + free_sink_port_data (data); + priv->ports = g_list_delete_link (priv->ports, walk); + break; + } + } + if (priv->ports == NULL) + pinos_node_report_idle (node); + + return TRUE; } static void sink_constructed (GObject * object) { - PinosServerNode *node = PINOS_SERVER_NODE (object); + PinosSpaAlsaSink *sink = PINOS_SPA_ALSA_SINK (object); + + create_pipeline (sink); G_OBJECT_CLASS (pinos_spa_alsa_sink_parent_class)->constructed (object); - - pinos_node_create_port (PINOS_NODE (node), - PINOS_DIRECTION_INPUT, - "input", - NULL, - NULL, - NULL, - on_input_port_created, - node); } static void sink_finalize (GObject * object) { - PinosServerNode *node = PINOS_SERVER_NODE (object); PinosSpaAlsaSink *sink = PINOS_SPA_ALSA_SINK (object); PinosSpaAlsaSinkPrivate *priv = sink->priv; destroy_pipeline (sink); - pinos_node_remove_port (PINOS_NODE (node), priv->input); pinos_properties_free (priv->props); + g_hash_table_unref (priv->mem_ids); G_OBJECT_CLASS (pinos_spa_alsa_sink_parent_class)->finalize (object); } @@ -482,6 +566,8 @@ pinos_spa_alsa_sink_class_init (PinosSpaAlsaSinkClass * klass) gobject_class->set_property = set_property; node_class->set_state = set_state; + node_class->add_port = add_port; + node_class->remove_port = remove_port; } static void @@ -491,14 +577,15 @@ pinos_spa_alsa_sink_init (PinosSpaAlsaSink * sink) priv = sink->priv = PINOS_SPA_ALSA_SINK_GET_PRIVATE (sink); priv->props = pinos_properties_new (NULL, NULL); + priv->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) free_mem_block); } -PinosServerNode * +PinosNode * pinos_spa_alsa_sink_new (PinosDaemon *daemon, const gchar *name, PinosProperties *properties) { - PinosServerNode *node; + PinosNode *node; node = g_object_new (PINOS_TYPE_SPA_ALSA_SINK, "daemon", daemon, diff --git a/pinos/modules/spa/spa-alsa-sink.h b/pinos/modules/spa/spa-alsa-sink.h index 9f67c3840..de18ea195 100644 --- a/pinos/modules/spa/spa-alsa-sink.h +++ b/pinos/modules/spa/spa-alsa-sink.h @@ -23,7 +23,7 @@ #include #include -#include +#include G_BEGIN_DECLS @@ -41,18 +41,18 @@ typedef struct _PinosSpaAlsaSinkClass PinosSpaAlsaSinkClass; typedef struct _PinosSpaAlsaSinkPrivate PinosSpaAlsaSinkPrivate; struct _PinosSpaAlsaSink { - PinosServerNode object; + PinosNode object; PinosSpaAlsaSinkPrivate *priv; }; struct _PinosSpaAlsaSinkClass { - PinosServerNodeClass parent_class; + PinosNodeClass parent_class; }; GType pinos_spa_alsa_sink_get_type (void); -PinosServerNode * pinos_spa_alsa_sink_new (PinosDaemon *daemon, +PinosNode * pinos_spa_alsa_sink_new (PinosDaemon *daemon, const gchar *name, PinosProperties *properties); diff --git a/pinos/modules/spa/spa-v4l2-source.c b/pinos/modules/spa/spa-v4l2-source.c index 6ecb8f523..8d6507bf9 100644 --- a/pinos/modules/spa/spa-v4l2-source.c +++ b/pinos/modules/spa/spa-v4l2-source.c @@ -41,8 +41,9 @@ typedef struct { PinosSpaV4l2Source *source; + guint id; gboolean have_format; - PinosServerPort *port; + PinosPort *port; } SourcePortData; struct _PinosSpaV4l2SourcePrivate @@ -67,7 +68,7 @@ enum { PROP_0, }; -G_DEFINE_TYPE (PinosSpaV4l2Source, pinos_spa_v4l2_source, PINOS_TYPE_SERVER_NODE); +G_DEFINE_TYPE (PinosSpaV4l2Source, pinos_spa_v4l2_source, PINOS_TYPE_NODE); static SpaResult make_node (SpaHandle **handle, const SpaNode **node, const char *lib, const char *name) @@ -171,7 +172,9 @@ on_source_event (SpaHandle *handle, SpaEvent *event, void *user_data) PinosBuffer pbuf; PinosBufferBuilder builder; PinosPacketHeader hdr; - PinosPacketFDPayload p; + PinosPacketAddMem am; + PinosPacketProcessMem p; + PinosPacketRemoveMem rm; GList *walk; gint fd; guint8 buf[1024]; @@ -197,11 +200,17 @@ on_source_event (SpaHandle *handle, SpaEvent *event, void *user_data) fd = tmpfile_create (source, b->datas[0].ptr, b->size); do_close = TRUE; } - p.fd_index = pinos_buffer_builder_add_fd (&builder, fd); - p.id = pinos_fd_manager_get_id (priv->fdmanager); + am.fd_index = pinos_buffer_builder_add_fd (&builder, fd); + am.id = pinos_fd_manager_get_id (priv->fdmanager); + am.offset = 0; + am.size = b->datas[0].size + b->datas[0].offset; + p.id = am.id; p.offset = b->datas[0].offset; p.size = b->datas[0].size; - pinos_buffer_builder_add_fd_payload (&builder, &p); + rm.id = am.id; + pinos_buffer_builder_add_add_mem (&builder, &am); + pinos_buffer_builder_add_process_mem (&builder, &p); + pinos_buffer_builder_add_remove_mem (&builder, &rm); pinos_buffer_builder_end (&builder, &pbuf); for (walk = priv->ports; walk; walk = g_list_next (walk)) { @@ -459,42 +468,33 @@ set_property (GObject *object, } static gboolean -on_linked (PinosPort *port, PinosPort *peer, gpointer user_data) +on_activate (PinosPort *port, gpointer user_data) { SourcePortData *data = user_data; PinosSpaV4l2Source *source = data->source; - guint n_links; - pinos_port_get_links (port, &n_links); - g_debug ("port %p: linked, now %d", port, n_links); - if (n_links == 0) - pinos_node_report_busy (PINOS_NODE (source)); + pinos_node_report_busy (PINOS_NODE (source)); return TRUE; } static void -on_unlinked (PinosPort *port, PinosPort *peer, gpointer user_data) +on_deactivate (PinosPort *port, gpointer user_data) { SourcePortData *data = user_data; PinosSpaV4l2Source *source = data->source; - guint n_links; - pinos_port_get_links (port, &n_links); - g_debug ("port %p: unlinked, now %d", port, n_links); - if (n_links == 1) - pinos_node_report_busy (PINOS_NODE (source)); + pinos_node_report_idle (PINOS_NODE (source)); } -static void -on_received_buffer (PinosPort *port, +static gboolean +on_received_buffer (PinosPort *port, + PinosBuffer *pbuf, + GError **error, gpointer user_data) { - PinosBuffer *pbuf; PinosBufferIter it; - pbuf = pinos_port_peek_buffer (port); - pinos_buffer_iter_init (&it, pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { @@ -503,6 +503,8 @@ on_received_buffer (PinosPort *port, } } pinos_buffer_iter_end (&it); + + return TRUE; } static void @@ -511,9 +513,9 @@ free_source_port_data (SourcePortData *data) g_slice_free (SourcePortData, data); } -static void +static gboolean remove_port (PinosNode *node, - PinosPort *port) + guint id) { PinosSpaV4l2Source *source = PINOS_SPA_V4L2_SOURCE (node); PinosSpaV4l2SourcePrivate *priv = source->priv; @@ -522,7 +524,7 @@ remove_port (PinosNode *node, for (walk = priv->ports; walk; walk = g_list_next (walk)) { SourcePortData *data = walk->data; - if (data->port == PINOS_SERVER_PORT_CAST (port)) { + if (data->id == id) { free_source_port_data (data); priv->ports = g_list_delete_link (priv->ports, walk); break; @@ -530,6 +532,8 @@ remove_port (PinosNode *node, } if (priv->ports == NULL) pinos_node_report_idle (node); + + return TRUE; } static void @@ -553,12 +557,11 @@ source_finalize (GObject * object) G_OBJECT_CLASS (pinos_spa_v4l2_source_parent_class)->finalize (object); } -static PinosServerPort * -create_port_sync (PinosServerNode *node, - PinosDirection direction, - const gchar *name, - GBytes *possible_formats, - PinosProperties *props) +static PinosPort * +add_port (PinosNode *node, + PinosDirection direction, + guint id, + GError **error) { PinosSpaV4l2Source *source = PINOS_SPA_V4L2_SOURCE (node); PinosSpaV4l2SourcePrivate *priv = source->priv; @@ -566,20 +569,17 @@ create_port_sync (PinosServerNode *node, data = g_slice_new0 (SourcePortData); data->source = source; + data->id = id; data->have_format = FALSE; - data->port = PINOS_SERVER_NODE_CLASS (pinos_spa_v4l2_source_parent_class) - ->create_port_sync (node, - direction, - name, - possible_formats, - props); + data->port = PINOS_NODE_CLASS (pinos_spa_v4l2_source_parent_class) + ->add_port (node, direction, id, error); - pinos_port_set_received_buffer_cb (PINOS_PORT (data->port), on_received_buffer, source, NULL); + pinos_port_set_received_buffer_cb (data->port, on_received_buffer, source, NULL); g_debug ("connecting signals"); - g_signal_connect (data->port, "linked", (GCallback) on_linked, data); - g_signal_connect (data->port, "unlinked", (GCallback) on_unlinked, data); + g_signal_connect (data->port, "activate", (GCallback) on_activate, data); + g_signal_connect (data->port, "deactivate", (GCallback) on_deactivate, data); priv->ports = g_list_append (priv->ports, data); @@ -591,7 +591,6 @@ pinos_spa_v4l2_source_class_init (PinosSpaV4l2SourceClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); PinosNodeClass *node_class = PINOS_NODE_CLASS (klass); - PinosServerNodeClass *server_node_class = PINOS_SERVER_NODE_CLASS (klass); g_type_class_add_private (klass, sizeof (PinosSpaV4l2SourcePrivate)); @@ -601,9 +600,8 @@ pinos_spa_v4l2_source_class_init (PinosSpaV4l2SourceClass * klass) gobject_class->set_property = set_property; node_class->set_state = set_state; + node_class->add_port = add_port; node_class->remove_port = remove_port; - - server_node_class->create_port_sync = create_port_sync; } static void @@ -615,12 +613,12 @@ pinos_spa_v4l2_source_init (PinosSpaV4l2Source * source) } -PinosServerNode * +PinosNode * pinos_spa_v4l2_source_new (PinosDaemon *daemon, const gchar *name, PinosProperties *properties) { - PinosServerNode *node; + PinosNode *node; node = g_object_new (PINOS_TYPE_SPA_V4L2_SOURCE, "daemon", daemon, diff --git a/pinos/modules/spa/spa-v4l2-source.h b/pinos/modules/spa/spa-v4l2-source.h index 05da8c68b..efc5df962 100644 --- a/pinos/modules/spa/spa-v4l2-source.h +++ b/pinos/modules/spa/spa-v4l2-source.h @@ -23,7 +23,7 @@ #include #include -#include +#include G_BEGIN_DECLS @@ -41,18 +41,18 @@ typedef struct _PinosSpaV4l2SourceClass PinosSpaV4l2SourceClass; typedef struct _PinosSpaV4l2SourcePrivate PinosSpaV4l2SourcePrivate; struct _PinosSpaV4l2Source { - PinosServerNode object; + PinosNode object; PinosSpaV4l2SourcePrivate *priv; }; struct _PinosSpaV4l2SourceClass { - PinosServerNodeClass parent_class; + PinosNodeClass parent_class; }; GType pinos_spa_v4l2_source_get_type (void); -PinosServerNode * pinos_spa_v4l2_source_new (PinosDaemon *daemon, +PinosNode * pinos_spa_v4l2_source_new (PinosDaemon *daemon, const gchar *name, PinosProperties *properties);