From b795fb851fac19dfe85c8fa4071bbaaedb1a2557 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 25 Jul 2016 15:55:56 +0200 Subject: [PATCH] stream: work on stream upload --- pinos/client/buffer.c | 33 +++++---- pinos/client/buffer.h | 2 +- pinos/client/stream.c | 3 +- pinos/gst/gstpinossink.c | 3 +- pinos/modules/spa/spa-alsa-sink.c | 115 ++++++++++++++++++++++++++---- pinos/server/client-node.c | 82 ++++++++++++++++++++- pinos/server/daemon.c | 4 +- pinos/server/link.c | 17 ++++- pinos/server/node.h | 1 - 9 files changed, 224 insertions(+), 36 deletions(-) diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c index ec7f088f8..441cf836a 100644 --- a/pinos/client/buffer.c +++ b/pinos/client/buffer.c @@ -929,18 +929,19 @@ pinos_buffer_iter_parse_format_change (PinosBufferIter *iter, PinosPacketFormatChange *payload) { struct stack_iter *si = PPSI (iter); - char *p; + PinosPacketFormatChange *fc; g_return_val_if_fail (is_valid_iter (iter), FALSE); g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_FORMAT_CHANGE, FALSE); - if (si->size < 2) + if (si->size < 9) return FALSE; - p = si->data; + fc = si->data; - payload->id = *p++; - payload->format = p; + payload->port = fc->port; + payload->id = fc->id; + payload->format = (gchar *) &fc->format; return TRUE; } @@ -959,18 +960,22 @@ pinos_buffer_builder_add_format_change (PinosBufferBuilder *builder, PinosPacketFormatChange *payload) { struct stack_builder *sb = PPSB (builder); - gsize len; - char *p; + gsize len, slen; + PinosPacketFormatChange *fc; g_return_val_if_fail (is_valid_builder (builder), FALSE); - /* id + format len + zero byte */ - len = 1 + strlen (payload->format) + 1; - p = builder_add_packet (sb, - PINOS_PACKET_TYPE_FORMAT_CHANGE, - len); - *p++ = payload->id; - strcpy (p, payload->format); + /* port + id + format len + zero byte */ + slen = strlen (payload->format) + 1; + len = 8 + slen; + fc = builder_add_packet (sb, + PINOS_PACKET_TYPE_FORMAT_CHANGE, + len); + fc->port = payload->port; + fc->id = payload->id; + memcpy ((gchar*)&fc->format, payload->format, slen); + + //strcpy ((gchar*)&fc->format, payload->format); sb->sh->flags |= PINOS_BUFFER_FLAG_CONTROL; return TRUE; diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h index eda81a111..313862dde 100644 --- a/pinos/client/buffer.h +++ b/pinos/client/buffer.h @@ -277,7 +277,7 @@ gboolean pinos_buffer_builder_add_reuse_mem (PinosBufferBuilder *bui */ typedef struct { guint32 port; - guint8 id; + guint32 id; const gchar *format; } PinosPacketFormatChange; diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 5941c0530..ef458dee3 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -863,6 +863,7 @@ do_start (PinosStream *stream) handle_socket (stream, priv->fd); pinos_stream_buffer_builder_init (stream, &builder); + fc.port = 0; fc.id = 0; fc.format = priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY"; pinos_buffer_builder_add_format_change (&builder, &fc); @@ -1112,7 +1113,7 @@ pinos_stream_send_buffer (PinosStream *stream, priv = stream->priv; if (!pinos_io_write_buffer (priv->fd, buffer, &error)) { - g_warning ("stream %p: failed to read buffer: %s", stream, error->message); + g_warning ("stream %p: failed to write buffer: %s", stream, error->message); g_clear_error (&error); return FALSE; } diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index ffe24fb0e..2d1e63e8c 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -547,7 +547,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) am.id = pinos_fd_manager_get_id (pinossink->fdmanager); am.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem)); am.offset = 0; - am.size = mem->size; + am.size = mem->size + mem->offset; + p.port = 0; p.id = am.id; p.offset = mem->offset; p.size = mem->size; diff --git a/pinos/modules/spa/spa-alsa-sink.c b/pinos/modules/spa/spa-alsa-sink.c index 634dee40c..180bcbe1b 100644 --- a/pinos/modules/spa/spa-alsa-sink.c +++ b/pinos/modules/spa/spa-alsa-sink.c @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include @@ -50,17 +52,22 @@ typedef struct { struct _PinosSpaAlsaSinkPrivate { - PinosPort *input; + SpaHandle *sink; + const SpaNode *sink_node; PinosProperties *props; PinosRingbuffer *ringbuffer; + SpaPollFd fds[16]; + unsigned int n_fds; + SpaPollItem poll; + + gboolean running; + pthread_t thread; + GHashTable *mem_ids; GList *ports; - - SpaHandle *sink; - const SpaNode *sink_node; }; enum { @@ -145,7 +152,7 @@ on_sink_event (SpaHandle *handle, SpaEvent *event, void *user_data) pinos_ringbuffer_get_read_areas (priv->ringbuffer, areas); total = MIN (size, areas[0].len + areas[1].len); - g_debug ("total read %zd %zd", total, areas[0].len + areas[1].len); + g_debug ("total read %zd %zd %zd", total, size, areas[0].len + areas[1].len); if (total < size) { g_warning ("underrun"); } @@ -170,6 +177,19 @@ on_sink_event (SpaHandle *handle, SpaEvent *event, void *user_data) g_debug ("got error %d", res); break; } + + case SPA_EVENT_TYPE_ADD_POLL: + { + SpaPollItem *poll = event->data; + + g_debug ("add poll"); + priv->poll = *poll; + priv->fds[0] = poll->fds[0]; + priv->n_fds = 1; + priv->poll.fds = priv->fds; + break; + } + default: g_debug ("got event %d", event->type); break; @@ -194,26 +214,66 @@ create_pipeline (PinosSpaAlsaSink *this) g_debug ("got get_props error %d", res); value.type = SPA_PROP_TYPE_STRING; - value.size = strlen ("hw:1")+1; - value.value = "hw:1"; + value.value = "hw:0"; + value.size = strlen (value.value)+1; props->set_prop (props, spa_props_index_for_name (props, "device"), &value); if ((res = priv->sink_node->set_props (priv->sink, props)) < 0) g_debug ("got set_props error %d", res); } +static void * +loop (void *user_data) +{ + PinosSpaAlsaSink *this = user_data; + PinosSpaAlsaSinkPrivate *priv = this->priv; + int r; + + g_debug ("spa-alsa-sink %p: enter thread", this); + while (priv->running) { + SpaPollNotifyData ndata; + + r = poll ((struct pollfd *) priv->fds, priv->n_fds, -1); + if (r < 0) { + if (errno == EINTR) + continue; + break; + } + if (r == 0) { + g_debug ("spa-alsa-sink %p: select timeout", this); + break; + } + if (priv->poll.after_cb) { + ndata.fds = priv->poll.fds; + ndata.n_fds = priv->poll.n_fds; + ndata.user_data = priv->poll.user_data; + priv->poll.after_cb (&ndata); + } + } + g_debug ("spa-alsa-sink %p: leave thread", this); + + return NULL; +} + static void start_pipeline (PinosSpaAlsaSink *sink) { PinosSpaAlsaSinkPrivate *priv = sink->priv; SpaResult res; SpaCommand cmd; + int err; g_debug ("spa-alsa-sink %p: starting pipeline", sink); cmd.type = SPA_COMMAND_START; if ((res = priv->sink_node->send_command (priv->sink, &cmd)) < 0) g_debug ("got error %d", res); + + priv->running = true; + if ((err = pthread_create (&priv->thread, NULL, loop, sink)) != 0) { + g_debug ("spa-v4l2-source %p: can't create thread", strerror (err)); + priv->running = false; + } } static void @@ -225,6 +285,11 @@ stop_pipeline (PinosSpaAlsaSink *sink) g_debug ("spa-alsa-sink %p: stopping pipeline", sink); + if (priv->running) { + priv->running = false; + pthread_join (priv->thread, NULL); + } + cmd.type = SPA_COMMAND_STOP; if ((res = priv->sink_node->send_command (priv->sink, &cmd)) < 0) g_debug ("got error %d", res); @@ -256,7 +321,7 @@ set_state (PinosNode *node, break; case PINOS_NODE_STATE_RUNNING: - //start_pipeline (this); + start_pipeline (this); break; case PINOS_NODE_STATE_ERROR: @@ -295,7 +360,8 @@ set_property (GObject *object, static void on_activate (PinosPort *port, gpointer user_data) { - PinosNode *node = user_data; + SinkPortData *data = user_data; + PinosNode *node = PINOS_NODE (data->sink); g_debug ("port %p: activate", port); @@ -305,7 +371,8 @@ on_activate (PinosPort *port, gpointer user_data) static void on_deactivate (PinosPort *port, gpointer user_data) { - PinosNode *node = user_data; + SinkPortData *data = user_data; + PinosNode *node = PINOS_NODE (data->sink); g_debug ("port %p: deactivate", port); pinos_node_report_idle (node); @@ -355,8 +422,6 @@ negotiate_formats (PinosSpaAlsaSink *this) priv->ringbuffer = pinos_ringbuffer_new (PINOS_RINGBUFFER_MODE_READ, 64 * 1024); - g_object_set (priv->input, "ringbuffer", priv->ringbuffer, NULL); - return SPA_RESULT_OK; } @@ -466,8 +531,6 @@ on_received_buffer (PinosPort *port, break; g_debug ("got format change %d %s", change.id, change.format); - negotiate_formats (this); - start_pipeline (this); break; } default: @@ -479,6 +542,24 @@ on_received_buffer (PinosPort *port, return TRUE; } +static void +on_format_change (GObject *obj, + GParamSpec *pspec, + gpointer user_data) +{ + SinkPortData *data = user_data; + PinosNode *node = PINOS_NODE (data->sink); + PinosSpaAlsaSink *sink = PINOS_SPA_ALSA_SINK (node); + PinosSpaAlsaSinkPrivate *priv = sink->priv; + GBytes *formats; + + g_object_get (obj, "format", &formats, NULL); + if (formats) { + g_debug ("port %p: format change %s", obj, g_bytes_get_data (formats, NULL)); + negotiate_formats (sink); + } +} + static PinosPort * add_port (PinosNode *node, PinosDirection direction, @@ -488,6 +569,7 @@ add_port (PinosNode *node, PinosSpaAlsaSink *sink = PINOS_SPA_ALSA_SINK (node); PinosSpaAlsaSinkPrivate *priv = sink->priv; SinkPortData *data; + GBytes *formats; data = g_slice_new0 (SinkPortData); data->sink = sink; @@ -497,10 +579,15 @@ add_port (PinosNode *node, pinos_port_set_received_buffer_cb (data->port, on_received_buffer, sink, NULL); + formats = g_bytes_new ("ANY", strlen ("ANY") + 1); + g_object_set (data->port, "possible-formats", formats, NULL); + g_debug ("connecting signals"); g_signal_connect (data->port, "activate", (GCallback) on_activate, data); g_signal_connect (data->port, "deactivate", (GCallback) on_deactivate, data); + g_signal_connect (data->port, "notify::format", (GCallback) on_format_change, data); + priv->ports = g_list_append (priv->ports, data); return data->port; diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 0be3954c7..6f778571e 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -37,6 +38,15 @@ #define MAX_BUFFER_SIZE 1024 #define MAX_FDS 16 +typedef struct { + guint32 id; + guint32 type; + int fd; + guint64 offset; + guint64 size; + void *data; +} MemBlock; + struct _PinosClientNodePrivate { int fd; @@ -50,6 +60,8 @@ struct _PinosClientNodePrivate guint8 send_data[MAX_BUFFER_SIZE]; int send_fds[MAX_FDS]; + + GHashTable *mem_ids; }; #define PINOS_CLIENT_NODE_GET_PRIVATE(obj) \ @@ -100,6 +112,13 @@ pinos_client_node_set_property (GObject *_object, } } +static void +free_mem_block (MemBlock *b) +{ + munmap (b->data, b->size); + g_slice_free (MemBlock, b); +} + static gboolean parse_buffer (PinosClientNode *cnode, PinosBuffer *pbuf) @@ -122,6 +141,8 @@ parse_buffer (PinosClientNode *cnode, if (!pinos_buffer_iter_parse_format_change (&it, &p)) break; + g_debug ("format change port %d", p.port); + if (!(port = pinos_node_find_port (node, p.port))) break; @@ -197,6 +218,62 @@ parse_buffer (PinosClientNode *cnode, } break; } + case PINOS_PACKET_TYPE_ADD_MEM: + { + PinosPacketAddMem p; + MemBlock *b; + int fd; + + if (!pinos_buffer_iter_parse_add_mem (&it, &p)) + break; + + fd = pinos_buffer_get_fd (pbuf, p.fd_index); + if (fd == -1) + break; + + b = g_slice_new0 (MemBlock); + b->id = p.id; + b->type = p.type; + b->fd = fd; + b->data = mmap (NULL, p.size, PROT_READ, MAP_PRIVATE, fd, p.offset); + b->offset = p.offset; + b->size = p.size; + + g_hash_table_insert (priv->mem_ids, GINT_TO_POINTER (p.id), b); + break; + } + + case PINOS_PACKET_TYPE_REMOVE_MEM: + { + PinosPacketRemoveMem p; + + if (!pinos_buffer_iter_parse_remove_mem (&it, &p)) + break; + + g_hash_table_remove (priv->mem_ids, GINT_TO_POINTER (p.id)); + break; + } + case PINOS_PACKET_TYPE_PROCESS_MEM: + { + PinosPacketProcessMem p; + GError *error = NULL; + + if (!pinos_buffer_iter_parse_process_mem (&it, &p)) + break; + + if (!(port = pinos_node_find_port (node, p.port))) + break; + + if (!pinos_port_send_buffer (port, pbuf, &error)) { + g_warning ("client-node %p: port %p failed to receive buffer: %s", node, port, error->message); + g_clear_error (&error); + } + break; + } + case PINOS_PACKET_TYPE_HEADER: + { + break; + } case PINOS_PACKET_TYPE_REUSE_MEM: { break; @@ -390,8 +467,10 @@ static void pinos_client_node_finalize (GObject * object) { PinosClientNode *node = PINOS_CLIENT_NODE (object); + PinosClientNodePrivate *priv = node->priv; g_debug ("client-node %p: finalize", node); + g_hash_table_unref (priv->mem_ids); G_OBJECT_CLASS (pinos_client_node_parent_class)->finalize (object); } @@ -427,7 +506,8 @@ pinos_client_node_class_init (PinosClientNodeClass * klass) static void pinos_client_node_init (PinosClientNode * node) { - node->priv = PINOS_CLIENT_NODE_GET_PRIVATE (node); + PinosClientNodePrivate *priv = node->priv = PINOS_CLIENT_NODE_GET_PRIVATE (node); g_debug ("client-node %p: new", node); + priv->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) free_mem_block); } diff --git a/pinos/server/daemon.c b/pinos/server/daemon.c index d5c961a55..b152e1615 100644 --- a/pinos/server/daemon.c +++ b/pinos/server/daemon.c @@ -546,13 +546,13 @@ pinos_daemon_find_port (PinosDaemon *daemon, have_name = name ? strlen (name) > 0 : FALSE; - g_debug ("name %s, format %s, %d", name, (gchar*)g_bytes_get_data (format_filter, NULL), have_name); + g_debug ("name \"%s\", format %s, %d", name, (gchar*)g_bytes_get_data (format_filter, NULL), have_name); for (nodes = priv->nodes; nodes; nodes = g_list_next (nodes)) { PinosNode *n = nodes->data; gboolean node_found = FALSE; - g_debug ("node path %s", pinos_node_get_object_path (n)); + g_debug ("node path \"%s\"", pinos_node_get_object_path (n)); /* we found the node */ if (have_name) { diff --git a/pinos/server/link.c b/pinos/server/link.c index 7f58addce..818a775a0 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -235,6 +235,20 @@ on_deactivate (PinosPort *port, gpointer user_data) return TRUE; } +static void +on_format_change (GObject *obj, + GParamSpec *pspec, + gpointer user_data) +{ + PinosLink *link = user_data; + PinosLinkPrivate *priv = link->priv; + GBytes *formats; + + g_object_get (priv->output, "format", &formats, NULL); + g_debug ("port %p: format change %s", priv->output, g_bytes_get_data (formats, NULL)); + g_object_set (priv->input, "format", formats, NULL); +} + static void pinos_link_constructed (GObject * object) { @@ -246,7 +260,6 @@ pinos_link_constructed (GObject * object) on_output_send, link, NULL); - priv->input_id = pinos_port_add_send_buffer_cb (priv->input, on_input_send, link, @@ -257,6 +270,8 @@ pinos_link_constructed (GObject * object) g_object_get (priv->output, "format", &formats, NULL); g_object_set (priv->input, "format", formats, NULL); + g_signal_connect (priv->output, "notify::format", (GCallback) on_format_change, link); + g_signal_connect (priv->input, "activate", (GCallback) on_activate, link); g_signal_connect (priv->input, "deactivate", (GCallback) on_deactivate, link); g_signal_connect (priv->output, "activate", (GCallback) on_activate, link); diff --git a/pinos/server/node.h b/pinos/server/node.h index de19deb65..6b030db91 100644 --- a/pinos/server/node.h +++ b/pinos/server/node.h @@ -81,7 +81,6 @@ PinosNode * pinos_node_new (PinosDaemon *daemon, PinosProperties *properties); void pinos_node_remove (PinosNode *node); - const gchar * pinos_node_get_name (PinosNode *node); PinosProperties * pinos_node_get_properties (PinosNode *node);