From 0b380dd43e9ab6e8c888c1f0dd8d498b7eed8f2e Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 9 Sep 2016 16:05:58 +0200 Subject: [PATCH] Add suport for clocks Add a clock interface to get raw clocking information from an element Make events and commands to request and configure clock updates. Add support for passing events and commands in control Set the size of the buffers in pinossrc --- pinos/client/stream.c | 278 ++++++++++++++++++------ pinos/gst/gstpinossrc.c | 28 ++- pinos/server/client-node.c | 2 - pinos/server/node.c | 35 ++- spa/include/spa/clock.h | 122 +++++++++++ spa/include/spa/control.h | 55 ++--- spa/include/spa/defs.h | 1 + spa/include/spa/node-command.h | 26 ++- spa/include/spa/node-event.h | 10 + spa/lib/control.c | 138 +++++++----- spa/plugins/alsa/alsa-sink.c | 1 + spa/plugins/audiomixer/audiomixer.c | 1 + spa/plugins/audiotestsrc/audiotestsrc.c | 1 + spa/plugins/ffmpeg/ffmpeg-dec.c | 1 + spa/plugins/ffmpeg/ffmpeg-enc.c | 1 + spa/plugins/remote/proxy.c | 186 ++++++++-------- spa/plugins/v4l2/v4l2-source.c | 69 +++++- spa/plugins/v4l2/v4l2-utils.c | 23 +- spa/plugins/volume/volume.c | 1 + spa/plugins/xv/xv-sink.c | 1 + 20 files changed, 712 insertions(+), 268 deletions(-) create mode 100644 spa/include/spa/clock.h diff --git a/pinos/client/stream.c b/pinos/client/stream.c index b0c917657..cc1d67204 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -82,6 +82,8 @@ struct _PinosStreamPrivate GSource *socket_source; int fd; + GSource *timeout_source; + SpaControl *control; SpaControl recv_control; guint8 recv_data[MAX_BUFFER_SIZE]; @@ -92,6 +94,9 @@ struct _PinosStreamPrivate GArray *buffer_ids; gboolean in_order; + + gint64 last_timestamp; + gint64 last_monotonic; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -588,19 +593,31 @@ add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang static void add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) { - SpaControlCmdStateChange sc; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventStateChange sc; + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; + ne.data = ≻ + ne.size = sizeof (sc); sc.state = state; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_STATE_CHANGE, &sc); + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); } static void add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_id) { - SpaControlCmdNeedInput ni; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventNeedInput ni; + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; + ne.data = ∋ + ne.size = sizeof (ni); ni.port_id = port_id; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NEED_INPUT, &ni); + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); } static void @@ -626,12 +643,18 @@ send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) PinosStreamPrivate *priv = stream->priv; SpaControlBuilder builder; SpaControl control; - SpaControlCmdReuseBuffer rb; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventReuseBuffer rb; control_builder_init (stream, &builder); + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; + ne.data = &rb; + ne.size = sizeof (rb); rb.port_id = port_id; rb.buffer_id = buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &rb); + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -647,14 +670,22 @@ send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) SpaControlBuilder builder; SpaControl control; SpaControlCmdProcessBuffer pb; - SpaControlCmdHaveOutput ho; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventHaveOutput ho; control_builder_init (stream, &builder); pb.port_id = port_id; pb.buffer_id = buffer_id; spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb); + + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; + ne.data = &ho; + ne.size = sizeof (ho); ho.port_id = port_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_HAVE_OUTPUT, &ho); + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -681,6 +712,119 @@ find_buffer (PinosStream *stream, uint32_t id) return NULL; } +static gboolean +handle_node_event (PinosStream *stream, + SpaNodeEvent *event) +{ + PinosStreamPrivate *priv = stream->priv; + + switch (event->type) { + case SPA_NODE_EVENT_TYPE_INVALID: + case SPA_NODE_EVENT_TYPE_PORT_ADDED: + case SPA_NODE_EVENT_TYPE_PORT_REMOVED: + case SPA_NODE_EVENT_TYPE_STATE_CHANGE: + case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: + case SPA_NODE_EVENT_TYPE_NEED_INPUT: + g_warning ("unhandled node event %d", event->type); + break; + + case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: + { + SpaNodeEventReuseBuffer *p = event->data; + BufferId *bid; + + if (p->port_id != 0) + break; + if (priv->direction != PINOS_DIRECTION_OUTPUT) + break; + + if ((bid = find_buffer (stream, p->buffer_id))) { + bid->used = FALSE; + g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p->buffer_id); + } + break; + } + case SPA_NODE_EVENT_TYPE_ADD_POLL: + case SPA_NODE_EVENT_TYPE_UPDATE_POLL: + case SPA_NODE_EVENT_TYPE_REMOVE_POLL: + case SPA_NODE_EVENT_TYPE_DRAINED: + case SPA_NODE_EVENT_TYPE_MARKER: + case SPA_NODE_EVENT_TYPE_ERROR: + case SPA_NODE_EVENT_TYPE_BUFFERING: + case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: + case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: + g_warning ("unhandled node event %d", event->type); + break; + } + return TRUE; +} + +static gboolean +handle_node_command (PinosStream *stream, + SpaNodeCommand *command) +{ + PinosStreamPrivate *priv = stream->priv; + + switch (command->type) { + case SPA_NODE_COMMAND_INVALID: + break; + case SPA_NODE_COMMAND_PAUSE: + { + SpaControlBuilder builder; + SpaControl control; + + g_debug ("stream %p: stop", stream); + + control_builder_init (stream, &builder); + add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + break; + } + case SPA_NODE_COMMAND_START: + { + SpaControlBuilder builder; + SpaControl control; + + g_debug ("stream %p: start", stream); + control_builder_init (stream, &builder); + if (priv->direction == PINOS_DIRECTION_INPUT) + add_need_input (stream, &builder, 0); + add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + + stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); + break; + } + case SPA_NODE_COMMAND_FLUSH: + case SPA_NODE_COMMAND_DRAIN: + case SPA_NODE_COMMAND_MARKER: + g_warning ("unhandled node command %d", command->type); + break; + + case SPA_NODE_COMMAND_CLOCK_UPDATE: + { + SpaNodeCommandClockUpdate *cu = command->data; + g_debug ("got clock update %"PRId64", %"PRId64, cu->timestamp, cu->monotonic_time); + priv->last_timestamp = cu->timestamp; + priv->last_monotonic = cu->monotonic_time; + break; + } + } + return TRUE; +} + static gboolean parse_control (PinosStream *stream, SpaControl *ctrl) @@ -696,10 +840,7 @@ parse_control (PinosStream *stream, case SPA_CONTROL_CMD_NODE_UPDATE: case SPA_CONTROL_CMD_PORT_UPDATE: case SPA_CONTROL_CMD_PORT_REMOVED: - case SPA_CONTROL_CMD_STATE_CHANGE: case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: - case SPA_CONTROL_CMD_NEED_INPUT: - case SPA_CONTROL_CMD_HAVE_OUTPUT: g_warning ("got unexpected control %d", cmd); break; @@ -741,46 +882,6 @@ parse_control (PinosStream *stream, g_warning ("set property not implemented"); break; - case SPA_CONTROL_CMD_START: - { - SpaControlBuilder builder; - SpaControl control; - - g_debug ("stream %p: start", stream); - - control_builder_init (stream, &builder); - if (priv->direction == PINOS_DIRECTION_INPUT) - add_need_input (stream, &builder, 0); - add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); - - stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); - break; - } - case SPA_CONTROL_CMD_PAUSE: - { - SpaControlBuilder builder; - SpaControl control; - - g_debug ("stream %p: stop", stream); - - control_builder_init (stream, &builder); - add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); - - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); - break; - } case SPA_CONTROL_CMD_ADD_MEM: { SpaControlCmdAddMem p; @@ -872,21 +973,24 @@ parse_control (PinosStream *stream, send_need_input (stream, 0); break; } - case SPA_CONTROL_CMD_REUSE_BUFFER: + case SPA_CONTROL_CMD_NODE_EVENT: { - SpaControlCmdReuseBuffer p; - BufferId *bid; - - if (priv->direction != PINOS_DIRECTION_OUTPUT) - break; + SpaControlCmdNodeEvent p; if (spa_control_iter_parse_cmd (&it, &p) < 0) break; - if ((bid = find_buffer (stream, p.buffer_id))) { - bid->used = FALSE; - g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id); - } + handle_node_event (stream, p.event); + break; + } + case SPA_CONTROL_CMD_NODE_COMMAND: + { + SpaControlCmdNodeCommand p; + + if (spa_control_iter_parse_cmd (&it, &p) < 0) + break; + + handle_node_command (stream, p.command); break; } @@ -939,6 +1043,37 @@ on_socket_condition (GSocket *socket, return TRUE; } +static gboolean +on_timeout (gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + SpaControlBuilder builder; + SpaControl control; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventRequestClockUpdate rcu; + + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE; + ne.data = &rcu; + ne.size = sizeof (rcu); + rcu.update_mask = SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME; + rcu.timestamp = 0; + rcu.offset = 0; + + control_builder_init (stream, &builder); + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + + return G_SOURCE_CONTINUE; +} + static void handle_socket (PinosStream *stream, gint fd) { @@ -954,6 +1089,10 @@ handle_socket (PinosStream *stream, gint fd) g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL); g_source_attach (priv->socket_source, priv->context->priv->context); + priv->timeout_source = g_timeout_source_new (100); + g_source_set_callback (priv->timeout_source, (GSourceFunc) on_timeout, stream, NULL); + g_source_attach (priv->timeout_source, priv->context->priv->context); + return; /* ERRORS */ @@ -1266,10 +1405,25 @@ pinos_stream_start (PinosStream *stream) static gboolean do_stop (PinosStream *stream) { + PinosStreamPrivate *priv = stream->priv; SpaControlBuilder builder; + SpaControl control; + SpaControlCmdNodeCommand cnc; + SpaNodeCommand nc; control_builder_init (stream, &builder); - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PAUSE, NULL); + cnc.command = &nc; + nc.type = SPA_NODE_COMMAND_PAUSE; + nc.data = NULL; + nc.size = 0; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: failed to write control", stream); + + spa_control_clear (&control); + g_object_unref (stream); return FALSE; diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 6c377b751..e78e2d2ea 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -327,6 +327,7 @@ gst_pinos_src_src_fixate (GstBaseSrc * bsrc, GstCaps * caps) typedef struct { GstPinosSrc *src; guint id; + SpaBuffer *buf; SpaMetaHeader *header; guint flags; } ProcessMemData; @@ -345,12 +346,15 @@ buffer_recycle (GstMiniObject *obj) { ProcessMemData *data; + GST_LOG_OBJECT (obj, "recycle buffer"); + gst_mini_object_ref (obj); data = gst_mini_object_get_qdata (obj, process_mem_data_quark); GST_BUFFER_FLAGS (obj) = data->flags; pinos_stream_recycle_buffer (data->src->stream, data->id); + GST_LOG_OBJECT (obj, "recycle buffer"); return FALSE; } @@ -378,6 +382,7 @@ on_add_buffer (GObject *gobject, data.src = gst_object_ref (pinossrc); data.id = id; + data.buf = b; data.header = NULL; for (i = 0; i < b->n_metas; i++) { @@ -392,23 +397,21 @@ on_add_buffer (GObject *gobject, } } for (i = 0; i < b->n_datas; i++) { - SpaData *d = &SPA_BUFFER_DATAS (b)[i]; + SpaData *d = &SPA_BUFFER_DATAS(b)[i]; SpaMemory *mem; + GstMemory *gmem; mem = spa_memory_find (&d->mem.mem); if (mem->fd) { - GstMemory *fdmem = NULL; - - fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (mem->fd), + gmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (mem->fd), d->mem.offset + d->mem.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, d->mem.offset, d->mem.size); - gst_buffer_append_memory (buf, fdmem); + gst_memory_resize (gmem, d->mem.offset, d->mem.size); } else { - gst_buffer_append_memory (buf, - gst_memory_new_wrapped (0, mem->ptr, mem->size, d->mem.offset, - d->mem.size, NULL, NULL)); + gmem = gst_memory_new_wrapped (0, mem->ptr, mem->size, d->mem.offset, + d->mem.size, NULL, NULL); } + gst_buffer_append_memory (buf, gmem); } data.flags = GST_BUFFER_FLAGS (buf); gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), @@ -448,6 +451,7 @@ on_new_buffer (GObject *gobject, if (buf) { ProcessMemData *data; SpaMetaHeader *h; + guint i; data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), process_mem_data_quark); @@ -462,7 +466,11 @@ on_new_buffer (GObject *gobject, } GST_BUFFER_OFFSET (buf) = h->seq; } - + for (i = 0; i < data->buf->n_datas; i++) { + SpaData *d = &SPA_BUFFER_DATAS(data->buf)[i]; + GstMemory *mem = gst_buffer_get_memory (buf, i); + gst_memory_resize (mem, 0, d->mem.size); + } g_queue_push_tail (&pinossrc->queue, buf); pinos_main_loop_signal (pinossrc->loop, FALSE); diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index ccf11ae5e..bf9827840 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -29,8 +29,6 @@ #include #include -#include - #include "pinos/client/pinos.h" #include "pinos/client/enumtypes.h" #include "pinos/client/private.h" diff --git a/pinos/server/node.c b/pinos/server/node.c index 96b21665f..5c99902b1 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -70,6 +70,8 @@ struct _PinosNodePrivate pthread_t thread; GHashTable *links; + + SpaClock *clock; }; G_DEFINE_ABSTRACT_TYPE (PinosNode, pinos_node, G_TYPE_OBJECT); @@ -436,6 +438,10 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } break; } + case SPA_NODE_EVENT_TYPE_NEED_INPUT: + { + break; + } case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: { PinosLink *link; @@ -475,6 +481,27 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } break; } + case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: + { + SpaResult res; + + if (priv->clock) { + SpaNodeCommand cmd; + SpaNodeCommandClockUpdate cu; + + cmd.type = SPA_NODE_COMMAND_CLOCK_UPDATE; + cmd.data = &cu; + cmd.size = sizeof (cu); + cu.change_mask = SPA_NODE_COMMAND_CLOCK_UPDATE_TIME; + res = spa_clock_get_time (priv->clock, &cu.timestamp, &cu.monotonic_time); + cu.scale = (1 << 16) | 1; + cu.state = SPA_CLOCK_STATE_RUNNING; + + if ((res = spa_node_send_command (this->node, &cmd)) < 0) + g_debug ("got error %d", res); + } + break; + } default: g_debug ("node %p: got event %d", this, event->type); break; @@ -573,9 +600,13 @@ pinos_node_set_property (GObject *_object, break; case PROP_NODE: + { + void *iface; node->node = g_value_get_pointer (value); + if (node->node->handle->get_interface (node->node->handle, SPA_INTERFACE_ID_CLOCK, &iface) >= 0) + priv->clock = iface; break; - + } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (node, prop_id, pspec); break; @@ -1115,6 +1146,8 @@ pinos_node_link (PinosNode *output_node, link->input_port = input_port; g_object_ref (link); } else { + if (output_node->priv->clock) + input_node->priv->clock = output_node->priv->clock; link = g_object_new (PINOS_TYPE_LINK, "daemon", priv->daemon, diff --git a/spa/include/spa/clock.h b/spa/include/spa/clock.h new file mode 100644 index 000000000..880449c0a --- /dev/null +++ b/spa/include/spa/clock.h @@ -0,0 +1,122 @@ +/* Simple Plugin API + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_CLOCK_H__ +#define __SPA_CLOCK_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _SpaClock SpaClock; + +/** + * SpaClockState: + * @SPA_CLOCK_STATE_STOPPED: the clock is stopped + * @SPA_CLOCK_STATE_PAUSED: the clock is paused + * @SPA_CLOCK_STATE_RUNNING: the clock is running + */ +typedef enum { + SPA_CLOCK_STATE_STOPPED, + SPA_CLOCK_STATE_PAUSED, + SPA_CLOCK_STATE_RUNNING, +} SpaClockState; + +#include +#include +#include + +#define SPA_INTERFACE_ID_CLOCK 1 +#define SPA_INTERFACE_ID_CLOCK_NAME "Clock interface" +#define SPA_INTERFACE_ID_CLOCK_DESCRIPTION "Clock interface" + +/** + * SpaClock: + * + * The main processing clocks. + */ +struct _SpaClock { + /* pointer to the handle owning this interface */ + SpaHandle *handle; + /* the total size of this clock. This can be used to expand this + * structure in the future */ + size_t size; + /** + * SpaClock::state: + * + * The current state of the clock + */ + SpaClockState state; + /** + * SpaClock::get_props: + * @clock: a #SpaClock + * @props: a location for a #SpaProps pointer + * + * Get the configurable properties of @clock. + * + * The returned @props is a snapshot of the current configuration and + * can be modified. The modifications will take effect after a call + * to SpaClock::set_props. + * + * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_INVALID_ARGUMENTS when clock or props are %NULL + * #SPA_RESULT_NOT_IMPLEMENTED when there are no properties + * implemented on @clock + */ + SpaResult (*get_props) (SpaClock *clock, + SpaProps **props); + /** + * SpaClock::set_props: + * @clock: a #SpaClock + * @props: a #SpaProps + * + * Set the configurable properties in @clock. + * + * Usually, @props will be obtained from SpaClock::get_props and then + * modified but it is also possible to set another #SpaProps object + * as long as its keys and types match those of SpaProps::get_props. + * + * Properties with keys that are not known are ignored. + * + * If @props is NULL, all the properties are reset to their defaults. + * + * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_INVALID_ARGUMENTS when clock is %NULL + * #SPA_RESULT_NOT_IMPLEMENTED when no properties can be + * modified on @clock. + * #SPA_RESULT_WRONG_PROPERTY_TYPE when a property has the wrong + * type. + */ + SpaResult (*set_props) (SpaClock *clock, + const SpaProps *props); + + SpaResult (*get_time) (SpaClock *clock, + int64_t *clock_time, + int64_t *monotonic_time); +}; + +#define spa_clock_get_props(n,...) (n)->get_props((n),__VA_ARGS__) +#define spa_clock_set_props(n,...) (n)->set_props((n),__VA_ARGS__) +#define spa_clock_get_time(n,...) (n)->get_time((n),__VA_ARGS__) + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_CLOCK_H__ */ diff --git a/spa/include/spa/control.h b/spa/include/spa/control.h index d6b422bcd..664702992 100644 --- a/spa/include/spa/control.h +++ b/spa/include/spa/control.h @@ -60,12 +60,7 @@ typedef enum { SPA_CONTROL_CMD_PORT_UPDATE = 2, SPA_CONTROL_CMD_PORT_REMOVED = 3, - SPA_CONTROL_CMD_STATE_CHANGE = 4, - - SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 5, - - SPA_CONTROL_CMD_NEED_INPUT = 6, - SPA_CONTROL_CMD_HAVE_OUTPUT = 7, + SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 4, /* server to client */ SPA_CONTROL_CMD_ADD_PORT = 32, @@ -74,8 +69,7 @@ typedef enum { SPA_CONTROL_CMD_SET_FORMAT = 34, SPA_CONTROL_CMD_SET_PROPERTY = 35, - SPA_CONTROL_CMD_START = 36, - SPA_CONTROL_CMD_PAUSE = 37, + SPA_CONTROL_CMD_NODE_COMMAND = 36, /* both */ SPA_CONTROL_CMD_ADD_MEM = 64, @@ -83,25 +77,28 @@ typedef enum { SPA_CONTROL_CMD_USE_BUFFERS = 66, SPA_CONTROL_CMD_PROCESS_BUFFER = 67, - SPA_CONTROL_CMD_REUSE_BUFFER = 68, + SPA_CONTROL_CMD_NODE_EVENT = 68, } SpaControlCmd; /* SPA_CONTROL_CMD_NODE_UPDATE */ typedef struct { +#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS (1 << 0) +#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS (1 << 1) +#define SPA_CONTROL_CMD_NODE_UPDATE_PROPS (1 << 2) uint32_t change_mask; unsigned int max_input_ports; unsigned int max_output_ports; const SpaProps *props; } SpaControlCmdNodeUpdate; -#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS (1 << 0) -#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS (1 << 1) -#define SPA_CONTROL_CMD_NODE_UPDATE_PROPS (1 << 2) /* SPA_CONTROL_CMD_PORT_UPDATE */ typedef struct { uint32_t port_id; +#define SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS (1 << 0) +#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 1) +#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 2) uint32_t change_mask; unsigned int n_possible_formats; SpaFormat **possible_formats; @@ -109,33 +106,13 @@ typedef struct { const SpaPortInfo *info; } SpaControlCmdPortUpdate; -#define SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS (1 << 0) -#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 1) -#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 2) - /* SPA_CONTROL_CMD_PORT_REMOVED */ typedef struct { uint32_t port_id; } SpaControlCmdPortRemoved; -/* SPA_CONTROL_CMD_STATE_CHANGE */ -typedef struct { - SpaNodeState state; -} SpaControlCmdStateChange; - /* SPA_CONTROL_CMD_PORT_STATUS_CHANGE */ -/* SPA_CONTROL_CMD_NEED_INPUT */ -typedef struct { - uint32_t port_id; -} SpaControlCmdNeedInput; - -/* SPA_CONTROL_CMD_HAVE_OUTPUT */ -typedef struct { - uint32_t port_id; -} SpaControlCmdHaveOutput; - - /* SPA_CONTROL_CMD_ADD_PORT */ typedef struct { uint32_t port_id; @@ -161,8 +138,10 @@ typedef struct { void *value; } SpaControlCmdSetProperty; -/* SPA_CONTROL_CMD_PAUSE */ -/* SPA_CONTROL_CMD_START */ +/* SPA_CONTROL_CMD_NODE_COMMAND */ +typedef struct { + SpaNodeCommand *command; +} SpaControlCmdNodeCommand; /* SPA_CONTROL_CMD_ADD_MEM */ typedef struct { @@ -193,12 +172,10 @@ typedef struct { uint32_t buffer_id; } SpaControlCmdProcessBuffer; -/* SPA_CONTROL_CMD_REUSE_BUFFER */ +/* SPA_CONTROL_CMD_NODE_EVENT */ typedef struct { - uint32_t port_id; - uint32_t buffer_id; -} SpaControlCmdReuseBuffer; - + SpaNodeEvent *event; +} SpaControlCmdNodeEvent; struct _SpaControlIter { /*< private >*/ diff --git a/spa/include/spa/defs.h b/spa/include/spa/defs.h index d99601d05..c9c7d2be9 100644 --- a/spa/include/spa/defs.h +++ b/spa/include/spa/defs.h @@ -76,6 +76,7 @@ typedef void (*SpaNotify) (void *data); #define SPA_PTR_TO_UINT32(p) ((uint32_t) ((uintptr_t) (p))) #define SPA_UINT32_TO_PTR(u) ((void*) ((uintptr_t) (u))) +#define SPA_TIME_INVALID ((uint64_t)-1) #define SPA_IDX_INVALID ((unsigned int)-1) #define SPA_ID_INVALID ((uint32_t)0xffffffff) diff --git a/spa/include/spa/node-command.h b/spa/include/spa/node-command.h index 6678744b2..16396c836 100644 --- a/spa/include/spa/node-command.h +++ b/spa/include/spa/node-command.h @@ -27,6 +27,7 @@ extern "C" { typedef struct _SpaNodeCommand SpaNodeCommand; #include +#include typedef enum { SPA_NODE_COMMAND_INVALID = 0, @@ -35,15 +36,38 @@ typedef enum { SPA_NODE_COMMAND_FLUSH, SPA_NODE_COMMAND_DRAIN, SPA_NODE_COMMAND_MARKER, + SPA_NODE_COMMAND_CLOCK_UPDATE } SpaNodeCommandType; struct _SpaNodeCommand { SpaNodeCommandType type; - uint32_t port_id; void *data; size_t size; }; +/** + * SpaNodeCommandClockUpdate: + * @change_mask: marks which fields are updated + * @timestamp: the new timestamp, when @change_mask = 1<<0 + * @monotonic_time: the new monotonic time associated with @timestamp, when + * @change_mask = 1<<0 + * @offset: the difference between the time when this update was generated + * and @monotonic_time + * @scale: update to the speed stored as Q16.16, @change_mask = 1<<1 + * @state: the new clock state, when @change_mask = 1<<2 + */ +typedef struct { +#define SPA_NODE_COMMAND_CLOCK_UPDATE_TIME (1 << 0) +#define SPA_NODE_COMMAND_CLOCK_UPDATE_SCALE (1 << 1) +#define SPA_NODE_COMMAND_CLOCK_UPDATE_STATE (1 << 2) + uint32_t change_mask; + int64_t timestamp; + int64_t monotonic_time; + int64_t offset; + int32_t scale; + SpaClockState state; +} SpaNodeCommandClockUpdate; + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 474266e9a..486c64d70 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -64,6 +64,7 @@ typedef enum { SPA_NODE_EVENT_TYPE_ERROR, SPA_NODE_EVENT_TYPE_BUFFERING, SPA_NODE_EVENT_TYPE_REQUEST_REFRESH, + SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE, } SpaNodeEventType; struct _SpaNodeEvent { @@ -97,6 +98,15 @@ typedef struct { uint32_t buffer_id; } SpaNodeEventReuseBuffer; +typedef struct { +#define SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME (1 << 0) +#define SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_SCALE (1 << 1) +#define SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_STATE (1 << 2) + uint32_t update_mask; + int64_t timestamp; + int64_t offset; +} SpaNodeEventRequestClockUpdate; + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/spa/lib/control.c b/spa/lib/control.c index 488b9781a..03d6cef33 100644 --- a/spa/lib/control.c +++ b/spa/lib/control.c @@ -477,6 +477,28 @@ iter_parse_use_buffers (struct stack_iter *si, SpaControlCmdUseBuffers *cmd) } } +static void +iter_parse_node_event (struct stack_iter *si, SpaControlCmdNodeEvent *cmd) +{ + void *p = si->data; + memcpy (cmd, p, sizeof (SpaControlCmdNodeEvent)); + if (cmd->event) + cmd->event = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event), SpaNodeEvent); + if (cmd->event->data) + cmd->event->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event->data), void); +} + +static void +iter_parse_node_command (struct stack_iter *si, SpaControlCmdNodeCommand *cmd) +{ + void *p = si->data; + memcpy (cmd, p, sizeof (SpaControlCmdNodeCommand)); + if (cmd->command) + cmd->command = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command), SpaNodeCommand); + if (cmd->command->data) + cmd->command->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command->data), void); +} + SpaResult spa_control_iter_parse_cmd (SpaControlIter *iter, void *command) @@ -503,28 +525,10 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, memcpy (command, si->data, sizeof (SpaControlCmdPortRemoved)); break; - case SPA_CONTROL_CMD_STATE_CHANGE: - if (si->size < sizeof (SpaControlCmdStateChange)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdStateChange)); - break; - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: fprintf (stderr, "implement iter of %d\n", si->cmd); break; - case SPA_CONTROL_CMD_NEED_INPUT: - if (si->size < sizeof (SpaControlCmdNeedInput)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdNeedInput)); - break; - - case SPA_CONTROL_CMD_HAVE_OUTPUT: - if (si->size < sizeof (SpaControlCmdHaveOutput)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdHaveOutput)); - break; - /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: if (si->size < sizeof (SpaControlCmdAddPort)) @@ -546,10 +550,6 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, fprintf (stderr, "implement iter of %d\n", si->cmd); break; - case SPA_CONTROL_CMD_START: - case SPA_CONTROL_CMD_PAUSE: - break; - /* bidirectional */ case SPA_CONTROL_CMD_ADD_MEM: if (si->size < sizeof (SpaControlCmdAddMem)) @@ -573,10 +573,12 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, memcpy (command, si->data, sizeof (SpaControlCmdProcessBuffer)); break; - case SPA_CONTROL_CMD_REUSE_BUFFER: - if (si->size < sizeof (SpaControlCmdReuseBuffer)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdReuseBuffer)); + case SPA_CONTROL_CMD_NODE_EVENT: + iter_parse_node_event (si, command); + break; + + case SPA_CONTROL_CMD_NODE_COMMAND: + iter_parse_node_command (si, command); break; case SPA_CONTROL_CMD_INVALID: @@ -1135,6 +1137,61 @@ builder_add_use_buffers (struct stack_builder *sb, SpaControlCmdUseBuffers *ub) } } +static void +builder_add_node_event (struct stack_builder *sb, SpaControlCmdNodeEvent *ev) +{ + size_t len; + void *p; + SpaControlCmdNodeEvent *d; + SpaNodeEvent *ne; + + /* calculate length */ + len = sizeof (SpaControlCmdNodeEvent); + len += sizeof (SpaNodeEvent); + len += ev->event->size; + + p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_EVENT, len); + memcpy (p, ev, sizeof (SpaControlCmdNodeEvent)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeEvent), void); + d->event = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + + ne = p; + memcpy (p, ev->event, sizeof (SpaNodeEvent)); + p = SPA_MEMBER (p, sizeof (SpaNodeEvent), void); + ne->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + memcpy (p, ev->event->data, ev->event->size); +} + + +static void +builder_add_node_command (struct stack_builder *sb, SpaControlCmdNodeCommand *cm) +{ + size_t len; + void *p; + SpaControlCmdNodeCommand *d; + SpaNodeCommand *nc; + + /* calculate length */ + len = sizeof (SpaControlCmdNodeCommand); + len += sizeof (SpaNodeCommand); + len += cm->command->size; + + p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_COMMAND, len); + memcpy (p, cm, sizeof (SpaControlCmdNodeCommand)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeCommand), void); + d->command = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + + nc = p; + memcpy (p, cm->command, sizeof (SpaNodeCommand)); + p = SPA_MEMBER (p, sizeof (SpaNodeCommand), void); + nc->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + memcpy (p, cm->command->data, cm->command->size); +} + /** * spa_control_builder_add_cmd: * @builder: a #SpaControlBuilder @@ -1172,25 +1229,10 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, memcpy (p, command, sizeof (SpaControlCmdPortRemoved)); break; - case SPA_CONTROL_CMD_STATE_CHANGE: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdStateChange)); - memcpy (p, command, sizeof (SpaControlCmdStateChange)); - break; - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: p = builder_add_cmd (sb, cmd, 0); break; - case SPA_CONTROL_CMD_NEED_INPUT: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdNeedInput)); - memcpy (p, command, sizeof (SpaControlCmdNeedInput)); - break; - - case SPA_CONTROL_CMD_HAVE_OUTPUT: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdHaveOutput)); - memcpy (p, command, sizeof (SpaControlCmdHaveOutput)); - break; - /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddPort)); @@ -1210,11 +1252,6 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, fprintf (stderr, "implement builder of %d\n", cmd); break; - case SPA_CONTROL_CMD_START: - case SPA_CONTROL_CMD_PAUSE: - p = builder_add_cmd (sb, cmd, 0); - break; - /* bidirectional */ case SPA_CONTROL_CMD_ADD_MEM: p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddMem)); @@ -1235,9 +1272,12 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, memcpy (p, command, sizeof (SpaControlCmdProcessBuffer)); break; - case SPA_CONTROL_CMD_REUSE_BUFFER: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdReuseBuffer)); - memcpy (p, command, sizeof (SpaControlCmdReuseBuffer)); + case SPA_CONTROL_CMD_NODE_EVENT: + builder_add_node_event (sb, command); + break; + + case SPA_CONTROL_CMD_NODE_COMMAND: + builder_add_node_command (sb, command); break; default: diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index c6226bcb3..60bd8d770 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -250,6 +250,7 @@ spa_alsa_sink_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index ff80a1615..a34858500 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -177,6 +177,7 @@ spa_audiomixer_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 0062cc3ae..a517f13f3 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -278,6 +278,7 @@ spa_audiotestsrc_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index be8eb7ced..584ee937e 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -172,6 +172,7 @@ spa_ffmpeg_dec_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index de73585b0..8250e2b81 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -172,6 +172,7 @@ spa_ffmpeg_enc_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/remote/proxy.c b/spa/plugins/remote/proxy.c index a490f1ecd..069d14a1d 100644 --- a/spa/plugins/remote/proxy.c +++ b/spa/plugins/remote/proxy.c @@ -126,24 +126,6 @@ update_poll (SpaProxy *this, int socketfd) } } -static SpaResult -update_state (SpaProxy *this, SpaNodeState state) -{ - if (this->node.state != state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); - } - return SPA_RESULT_OK; -} - static SpaResult spa_proxy_node_get_props (SpaNode *node, SpaProps **props) @@ -216,10 +198,12 @@ spa_proxy_node_send_command (SpaNode *node, SpaControlBuilder builder; SpaControl control; uint8_t buf[128]; + SpaControlCmdNodeCommand cnc; /* send start */ spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_START, NULL); + cnc.command = command; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); spa_control_builder_end (&builder, &control); if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) @@ -234,10 +218,12 @@ spa_proxy_node_send_command (SpaNode *node, SpaControlBuilder builder; SpaControl control; uint8_t buf[128]; + SpaControlCmdNodeCommand cnc; /* send start */ spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PAUSE, NULL); + cnc.command = command; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); spa_control_builder_end (&builder, &control); if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) @@ -251,6 +237,27 @@ spa_proxy_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: return SPA_RESULT_NOT_IMPLEMENTED; + + case SPA_NODE_COMMAND_CLOCK_UPDATE: + { + SpaControlBuilder builder; + SpaControl control; + uint8_t buf[128]; + SpaControlCmdNodeCommand cnc; + + /* send start */ + spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); + cnc.command = command; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); + spa_control_builder_end (&builder, &control); + + if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) + fprintf (stderr, "proxy %p: error writing control %d\n", this, res); + + spa_control_clear (&control); + break; + break; + } } return SPA_RESULT_OK; } @@ -756,11 +763,13 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, uint32_t buffer_id) { SpaProxy *this; - SpaControlCmdReuseBuffer crb; SpaControlBuilder builder; SpaControl control; uint8_t buf[128]; SpaResult res; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventReuseBuffer rb; if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -772,9 +781,13 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, /* send start */ spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - crb.port_id = port_id; - crb.buffer_id = buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &crb); + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; + ne.data = &rb; + ne.size = sizeof (rb); + rb.port_id = port_id; + rb.buffer_id = buffer_id; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); spa_control_builder_end (&builder, &control); if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) @@ -885,6 +898,8 @@ spa_proxy_node_port_pull_output (SpaNode *node, info[i].buffer_id = port->buffer_id; info[i].status = SPA_RESULT_OK; + + port->buffer_id = SPA_ID_INVALID; } if (have_error) return SPA_RESULT_ERROR; @@ -899,42 +914,59 @@ spa_proxy_node_port_push_event (SpaNode *node, uint32_t port_id, SpaNodeEvent *event) { - SpaProxy *this; - SpaResult res; - if (node == NULL || node->handle == NULL || event == NULL) return SPA_RESULT_INVALID_ARGUMENTS; - this = (SpaProxy *) node->handle; - switch (event->type) { - case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: - { - SpaNodeEventReuseBuffer *rb = event->data; - SpaControlCmdReuseBuffer crb; - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; - - /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - crb.port_id = rb->port_id; - crb.buffer_id = rb->buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &crb); - spa_control_builder_end (&builder, &control); - - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - fprintf (stderr, "proxy %p: error writing control %d\n", this, res); - - spa_control_clear (&control); - return SPA_RESULT_OK; - } default: + fprintf (stderr, "unhandled event %d\n", event->type); break; } return SPA_RESULT_NOT_IMPLEMENTED; } +static SpaResult +handle_node_event (SpaProxy *this, + SpaNodeEvent *event) +{ + switch (event->type) { + case SPA_NODE_EVENT_TYPE_INVALID: + case SPA_NODE_EVENT_TYPE_PORT_ADDED: + case SPA_NODE_EVENT_TYPE_PORT_REMOVED: + this->event_cb (&this->node, event, this->user_data); + break; + + case SPA_NODE_EVENT_TYPE_STATE_CHANGE: + { + SpaNodeEventStateChange *sc = event->data; + + fprintf (stderr, "proxy %p: got state-change to %d\n", this, sc->state); + if (this->node.state != sc->state) { + this->node.state = sc->state; + this->event_cb (&this->node, event, this->user_data); + } + break; + } + + case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: + case SPA_NODE_EVENT_TYPE_NEED_INPUT: + case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: + case SPA_NODE_EVENT_TYPE_ADD_POLL: + case SPA_NODE_EVENT_TYPE_UPDATE_POLL: + case SPA_NODE_EVENT_TYPE_REMOVE_POLL: + case SPA_NODE_EVENT_TYPE_DRAINED: + case SPA_NODE_EVENT_TYPE_MARKER: + case SPA_NODE_EVENT_TYPE_ERROR: + case SPA_NODE_EVENT_TYPE_BUFFERING: + case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: + case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: + this->event_cb (&this->node, event, this->user_data); + break; + } + + return SPA_RESULT_OK; +} + static SpaResult parse_control (SpaProxy *this, SpaControl *ctrl) @@ -950,8 +982,7 @@ parse_control (SpaProxy *this, case SPA_CONTROL_CMD_REMOVE_PORT: case SPA_CONTROL_CMD_SET_FORMAT: case SPA_CONTROL_CMD_SET_PROPERTY: - case SPA_CONTROL_CMD_START: - case SPA_CONTROL_CMD_PAUSE: + case SPA_CONTROL_CMD_NODE_COMMAND: fprintf (stderr, "proxy %p: got unexpected control %d\n", this, cmd); break; @@ -995,43 +1026,11 @@ parse_control (SpaProxy *this, break; } - case SPA_CONTROL_CMD_STATE_CHANGE: - { - SpaControlCmdStateChange sc; - - if (spa_control_iter_parse_cmd (&it, &sc) < 0) - break; - - fprintf (stderr, "proxy %p: got state-change to %d\n", this, sc.state); - update_state (this, sc.state); - break; - } case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: { fprintf (stderr, "proxy %p: command not implemented %d\n", this, cmd); break; } - case SPA_CONTROL_CMD_NEED_INPUT: - { - break; - } - case SPA_CONTROL_CMD_HAVE_OUTPUT: - { - SpaNodeEvent event; - SpaNodeEventHaveOutput hu; - SpaControlCmdHaveOutput cmd; - - if (spa_control_iter_parse_cmd (&it, &cmd) < 0) - break; - - event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; - event.data = &hu; - event.size = sizeof (hu); - hu.port_id = cmd.port_id; - this->event_cb (&this->node, &event, this->user_data); - break; - } - case SPA_CONTROL_CMD_ADD_MEM: break; case SPA_CONTROL_CMD_REMOVE_MEM: @@ -1048,25 +1047,20 @@ parse_control (SpaProxy *this, break; port = &this->ports[cmd.port_id]; + + if (port->buffer_id != SPA_ID_INVALID) + fprintf (stderr, "proxy %p: unprocessed buffer: %d\n", this, port->buffer_id); port->buffer_id = cmd.buffer_id; break; } - case SPA_CONTROL_CMD_REUSE_BUFFER: + case SPA_CONTROL_CMD_NODE_EVENT: { - SpaNodeEvent event; - SpaNodeEventReuseBuffer rb; - SpaControlCmdReuseBuffer crb; + SpaControlCmdNodeEvent cne; - if (spa_control_iter_parse_cmd (&it, &crb) < 0) + if (spa_control_iter_parse_cmd (&it, &cne) < 0) break; - event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; - event.data = &rb; - event.size = sizeof (rb); - rb.port_id = crb.port_id; - rb.buffer_id = crb.buffer_id; - this->event_cb (&this->node, &event, this->user_data); - + handle_node_event (this, cne.event); break; } default: diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 58dbc0902..290ecdaa8 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -115,11 +115,14 @@ typedef struct { SpaAllocParamBuffers param_buffers; SpaPortStatus status; + int64_t last_timestamp; + int64_t last_monotonic; } SpaV4l2State; struct _SpaV4l2Source { SpaHandle handle; SpaNode node; + SpaClock clock; SpaV4l2SourceProps props[2]; @@ -269,6 +272,7 @@ spa_v4l2_source_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; @@ -739,6 +743,51 @@ static const SpaNode v4l2source_node = { spa_v4l2_source_node_port_push_event, }; +static SpaResult +spa_v4l2_source_clock_get_props (SpaClock *clock, + SpaProps **props) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static SpaResult +spa_v4l2_source_clock_set_props (SpaClock *clock, + const SpaProps *props) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static SpaResult +spa_v4l2_source_clock_get_time (SpaClock *clock, + int64_t *clock_time, + int64_t *monotonic_time) +{ + SpaV4l2Source *this; + SpaV4l2State *state; + + if (clock == NULL || clock->handle == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + this = (SpaV4l2Source *) clock->handle; + state = &this->state[0]; + + if (clock_time) + *clock_time = state->last_timestamp; + if (monotonic_time) + *monotonic_time = state->last_monotonic; + + return SPA_RESULT_OK; +} + +static const SpaClock v4l2source_clock = { + NULL, + sizeof (SpaClock), + SPA_CLOCK_STATE_STOPPED, + spa_v4l2_source_clock_get_props, + spa_v4l2_source_clock_set_props, + spa_v4l2_source_clock_get_time, +}; + static SpaResult spa_v4l2_source_get_interface (SpaHandle *handle, uint32_t interface_id, @@ -755,6 +804,9 @@ spa_v4l2_source_get_interface (SpaHandle *handle, case SPA_INTERFACE_ID_NODE: *interface = &this->node; break; + case SPA_INTERFACE_ID_CLOCK: + *interface = &this->clock; + break; default: return SPA_RESULT_UNKNOWN_INTERFACE; } @@ -782,6 +834,8 @@ v4l2_source_init (const SpaHandleFactory *factory, this = (SpaV4l2Source *) handle; this->node = v4l2source_node; this->node.handle = handle; + this->clock = v4l2source_clock; + this->clock.handle = handle; this->props[1].props.n_prop_info = PROP_ID_LAST; this->props[1].props.prop_info = prop_info; reset_v4l2_source_props (&this->props[1]); @@ -800,6 +854,10 @@ static const SpaInterfaceInfo v4l2_source_interfaces[] = SPA_INTERFACE_ID_NODE_NAME, SPA_INTERFACE_ID_NODE_DESCRIPTION, }, + { SPA_INTERFACE_ID_CLOCK, + SPA_INTERFACE_ID_CLOCK_NAME, + SPA_INTERFACE_ID_CLOCK_DESCRIPTION, + }, }; static SpaResult @@ -814,13 +872,10 @@ v4l2_source_enum_interface_info (const SpaHandleFactory *factory, index = (*state == NULL ? 0 : *(int*)state); - switch (index) { - case 0: - *info = &v4l2_source_interfaces[index]; - break; - default: - return SPA_RESULT_ENUM_END; - } + if (index < 0 || index >= SPA_N_ELEMENTS (v4l2_source_interfaces)) + return SPA_RESULT_ENUM_END; + + *info = &v4l2_source_interfaces[index]; *(int*)state = ++index; return SPA_RESULT_OK; } diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index a59ab3d12..de303c6b9 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -811,6 +811,7 @@ mmap_read (SpaV4l2Source *this) SpaV4l2State *state = &this->state[0]; struct v4l2_buffer buf; V4l2Buffer *b; + SpaData *d; CLEAR(buf); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; @@ -829,8 +830,22 @@ mmap_read (SpaV4l2Source *this) } b = &state->alloc_buffers[buf.index]; + b->header.flags = SPA_BUFFER_FLAG_NONE; + if (buf.flags & V4L2_BUF_FLAG_ERROR) + b->header.flags |= SPA_BUFFER_FLAG_CORRUPTED; + b->header.seq = buf.sequence; b->header.pts = (uint64_t)buf.timestamp.tv_sec * 1000000000lu + (uint64_t)buf.timestamp.tv_usec * 1000lu; + state->last_timestamp = b->header.pts; + + if (buf.flags & V4L2_BUF_FLAG_TIMESTAMP_MONOTONIC) + state->last_monotonic = state->last_timestamp; + else + state->last_monotonic = SPA_TIME_INVALID; + + d = SPA_BUFFER_DATAS (b->outbuf); + d[0].mem.size = buf.bytesused; + b->next = state->ready; state->ready = b; state->ready_count++; @@ -893,7 +908,7 @@ spa_v4l2_use_buffers (SpaV4l2Source *this, SpaBuffer **buffers, uint32_t n_buffe V4l2Buffer *b; SpaMemoryRef *mem_ref; SpaMemory *mem; - SpaData *d = SPA_BUFFER_DATAS (buffers[i]); + SpaData *d; b = &state->alloc_buffers[i]; b->buffer.mem.mem = state->alloc_mem->mem; @@ -911,6 +926,12 @@ spa_v4l2_use_buffers (SpaV4l2Source *this, SpaBuffer **buffers, uint32_t n_buffe continue; } + if (buffers[i]->n_datas < 1) { + fprintf (stderr, "invalid memory on buffer %p\n", buffers[i]); + continue; + } + d = SPA_BUFFER_DATAS (buffers[i]); + CLEAR (b->v4l2_buffer); b->v4l2_buffer.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; b->v4l2_buffer.memory = state->memtype; diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index ff91dd38a..278cf7c19 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -182,6 +182,7 @@ spa_volume_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/xv/xv-sink.c b/spa/plugins/xv/xv-sink.c index 052bed829..76ece4ff2 100644 --- a/spa/plugins/xv/xv-sink.c +++ b/spa/plugins/xv/xv-sink.c @@ -207,6 +207,7 @@ spa_xv_sink_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK;