diff --git a/pinos/client/stream.c b/pinos/client/stream.c index b0c917657..cc1d67204 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -82,6 +82,8 @@ struct _PinosStreamPrivate GSource *socket_source; int fd; + GSource *timeout_source; + SpaControl *control; SpaControl recv_control; guint8 recv_data[MAX_BUFFER_SIZE]; @@ -92,6 +94,9 @@ struct _PinosStreamPrivate GArray *buffer_ids; gboolean in_order; + + gint64 last_timestamp; + gint64 last_monotonic; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -588,19 +593,31 @@ add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang static void add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) { - SpaControlCmdStateChange sc; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventStateChange sc; + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; + ne.data = ≻ + ne.size = sizeof (sc); sc.state = state; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_STATE_CHANGE, &sc); + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); } static void add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_id) { - SpaControlCmdNeedInput ni; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventNeedInput ni; + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; + ne.data = ∋ + ne.size = sizeof (ni); ni.port_id = port_id; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NEED_INPUT, &ni); + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); } static void @@ -626,12 +643,18 @@ send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) PinosStreamPrivate *priv = stream->priv; SpaControlBuilder builder; SpaControl control; - SpaControlCmdReuseBuffer rb; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventReuseBuffer rb; control_builder_init (stream, &builder); + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; + ne.data = &rb; + ne.size = sizeof (rb); rb.port_id = port_id; rb.buffer_id = buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &rb); + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -647,14 +670,22 @@ send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) SpaControlBuilder builder; SpaControl control; SpaControlCmdProcessBuffer pb; - SpaControlCmdHaveOutput ho; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventHaveOutput ho; control_builder_init (stream, &builder); pb.port_id = port_id; pb.buffer_id = buffer_id; spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb); + + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; + ne.data = &ho; + ne.size = sizeof (ho); ho.port_id = port_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_HAVE_OUTPUT, &ho); + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -681,6 +712,119 @@ find_buffer (PinosStream *stream, uint32_t id) return NULL; } +static gboolean +handle_node_event (PinosStream *stream, + SpaNodeEvent *event) +{ + PinosStreamPrivate *priv = stream->priv; + + switch (event->type) { + case SPA_NODE_EVENT_TYPE_INVALID: + case SPA_NODE_EVENT_TYPE_PORT_ADDED: + case SPA_NODE_EVENT_TYPE_PORT_REMOVED: + case SPA_NODE_EVENT_TYPE_STATE_CHANGE: + case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: + case SPA_NODE_EVENT_TYPE_NEED_INPUT: + g_warning ("unhandled node event %d", event->type); + break; + + case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: + { + SpaNodeEventReuseBuffer *p = event->data; + BufferId *bid; + + if (p->port_id != 0) + break; + if (priv->direction != PINOS_DIRECTION_OUTPUT) + break; + + if ((bid = find_buffer (stream, p->buffer_id))) { + bid->used = FALSE; + g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p->buffer_id); + } + break; + } + case SPA_NODE_EVENT_TYPE_ADD_POLL: + case SPA_NODE_EVENT_TYPE_UPDATE_POLL: + case SPA_NODE_EVENT_TYPE_REMOVE_POLL: + case SPA_NODE_EVENT_TYPE_DRAINED: + case SPA_NODE_EVENT_TYPE_MARKER: + case SPA_NODE_EVENT_TYPE_ERROR: + case SPA_NODE_EVENT_TYPE_BUFFERING: + case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: + case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: + g_warning ("unhandled node event %d", event->type); + break; + } + return TRUE; +} + +static gboolean +handle_node_command (PinosStream *stream, + SpaNodeCommand *command) +{ + PinosStreamPrivate *priv = stream->priv; + + switch (command->type) { + case SPA_NODE_COMMAND_INVALID: + break; + case SPA_NODE_COMMAND_PAUSE: + { + SpaControlBuilder builder; + SpaControl control; + + g_debug ("stream %p: stop", stream); + + control_builder_init (stream, &builder); + add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + break; + } + case SPA_NODE_COMMAND_START: + { + SpaControlBuilder builder; + SpaControl control; + + g_debug ("stream %p: start", stream); + control_builder_init (stream, &builder); + if (priv->direction == PINOS_DIRECTION_INPUT) + add_need_input (stream, &builder, 0); + add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + + stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); + break; + } + case SPA_NODE_COMMAND_FLUSH: + case SPA_NODE_COMMAND_DRAIN: + case SPA_NODE_COMMAND_MARKER: + g_warning ("unhandled node command %d", command->type); + break; + + case SPA_NODE_COMMAND_CLOCK_UPDATE: + { + SpaNodeCommandClockUpdate *cu = command->data; + g_debug ("got clock update %"PRId64", %"PRId64, cu->timestamp, cu->monotonic_time); + priv->last_timestamp = cu->timestamp; + priv->last_monotonic = cu->monotonic_time; + break; + } + } + return TRUE; +} + static gboolean parse_control (PinosStream *stream, SpaControl *ctrl) @@ -696,10 +840,7 @@ parse_control (PinosStream *stream, case SPA_CONTROL_CMD_NODE_UPDATE: case SPA_CONTROL_CMD_PORT_UPDATE: case SPA_CONTROL_CMD_PORT_REMOVED: - case SPA_CONTROL_CMD_STATE_CHANGE: case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: - case SPA_CONTROL_CMD_NEED_INPUT: - case SPA_CONTROL_CMD_HAVE_OUTPUT: g_warning ("got unexpected control %d", cmd); break; @@ -741,46 +882,6 @@ parse_control (PinosStream *stream, g_warning ("set property not implemented"); break; - case SPA_CONTROL_CMD_START: - { - SpaControlBuilder builder; - SpaControl control; - - g_debug ("stream %p: start", stream); - - control_builder_init (stream, &builder); - if (priv->direction == PINOS_DIRECTION_INPUT) - add_need_input (stream, &builder, 0); - add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); - - stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); - break; - } - case SPA_CONTROL_CMD_PAUSE: - { - SpaControlBuilder builder; - SpaControl control; - - g_debug ("stream %p: stop", stream); - - control_builder_init (stream, &builder); - add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); - - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); - break; - } case SPA_CONTROL_CMD_ADD_MEM: { SpaControlCmdAddMem p; @@ -872,21 +973,24 @@ parse_control (PinosStream *stream, send_need_input (stream, 0); break; } - case SPA_CONTROL_CMD_REUSE_BUFFER: + case SPA_CONTROL_CMD_NODE_EVENT: { - SpaControlCmdReuseBuffer p; - BufferId *bid; - - if (priv->direction != PINOS_DIRECTION_OUTPUT) - break; + SpaControlCmdNodeEvent p; if (spa_control_iter_parse_cmd (&it, &p) < 0) break; - if ((bid = find_buffer (stream, p.buffer_id))) { - bid->used = FALSE; - g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id); - } + handle_node_event (stream, p.event); + break; + } + case SPA_CONTROL_CMD_NODE_COMMAND: + { + SpaControlCmdNodeCommand p; + + if (spa_control_iter_parse_cmd (&it, &p) < 0) + break; + + handle_node_command (stream, p.command); break; } @@ -939,6 +1043,37 @@ on_socket_condition (GSocket *socket, return TRUE; } +static gboolean +on_timeout (gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + SpaControlBuilder builder; + SpaControl control; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventRequestClockUpdate rcu; + + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE; + ne.data = &rcu; + ne.size = sizeof (rcu); + rcu.update_mask = SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME; + rcu.timestamp = 0; + rcu.offset = 0; + + control_builder_init (stream, &builder); + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + + return G_SOURCE_CONTINUE; +} + static void handle_socket (PinosStream *stream, gint fd) { @@ -954,6 +1089,10 @@ handle_socket (PinosStream *stream, gint fd) g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL); g_source_attach (priv->socket_source, priv->context->priv->context); + priv->timeout_source = g_timeout_source_new (100); + g_source_set_callback (priv->timeout_source, (GSourceFunc) on_timeout, stream, NULL); + g_source_attach (priv->timeout_source, priv->context->priv->context); + return; /* ERRORS */ @@ -1266,10 +1405,25 @@ pinos_stream_start (PinosStream *stream) static gboolean do_stop (PinosStream *stream) { + PinosStreamPrivate *priv = stream->priv; SpaControlBuilder builder; + SpaControl control; + SpaControlCmdNodeCommand cnc; + SpaNodeCommand nc; control_builder_init (stream, &builder); - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PAUSE, NULL); + cnc.command = &nc; + nc.type = SPA_NODE_COMMAND_PAUSE; + nc.data = NULL; + nc.size = 0; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: failed to write control", stream); + + spa_control_clear (&control); + g_object_unref (stream); return FALSE; diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 6c377b751..e78e2d2ea 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -327,6 +327,7 @@ gst_pinos_src_src_fixate (GstBaseSrc * bsrc, GstCaps * caps) typedef struct { GstPinosSrc *src; guint id; + SpaBuffer *buf; SpaMetaHeader *header; guint flags; } ProcessMemData; @@ -345,12 +346,15 @@ buffer_recycle (GstMiniObject *obj) { ProcessMemData *data; + GST_LOG_OBJECT (obj, "recycle buffer"); + gst_mini_object_ref (obj); data = gst_mini_object_get_qdata (obj, process_mem_data_quark); GST_BUFFER_FLAGS (obj) = data->flags; pinos_stream_recycle_buffer (data->src->stream, data->id); + GST_LOG_OBJECT (obj, "recycle buffer"); return FALSE; } @@ -378,6 +382,7 @@ on_add_buffer (GObject *gobject, data.src = gst_object_ref (pinossrc); data.id = id; + data.buf = b; data.header = NULL; for (i = 0; i < b->n_metas; i++) { @@ -392,23 +397,21 @@ on_add_buffer (GObject *gobject, } } for (i = 0; i < b->n_datas; i++) { - SpaData *d = &SPA_BUFFER_DATAS (b)[i]; + SpaData *d = &SPA_BUFFER_DATAS(b)[i]; SpaMemory *mem; + GstMemory *gmem; mem = spa_memory_find (&d->mem.mem); if (mem->fd) { - GstMemory *fdmem = NULL; - - fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (mem->fd), + gmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (mem->fd), d->mem.offset + d->mem.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, d->mem.offset, d->mem.size); - gst_buffer_append_memory (buf, fdmem); + gst_memory_resize (gmem, d->mem.offset, d->mem.size); } else { - gst_buffer_append_memory (buf, - gst_memory_new_wrapped (0, mem->ptr, mem->size, d->mem.offset, - d->mem.size, NULL, NULL)); + gmem = gst_memory_new_wrapped (0, mem->ptr, mem->size, d->mem.offset, + d->mem.size, NULL, NULL); } + gst_buffer_append_memory (buf, gmem); } data.flags = GST_BUFFER_FLAGS (buf); gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), @@ -448,6 +451,7 @@ on_new_buffer (GObject *gobject, if (buf) { ProcessMemData *data; SpaMetaHeader *h; + guint i; data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), process_mem_data_quark); @@ -462,7 +466,11 @@ on_new_buffer (GObject *gobject, } GST_BUFFER_OFFSET (buf) = h->seq; } - + for (i = 0; i < data->buf->n_datas; i++) { + SpaData *d = &SPA_BUFFER_DATAS(data->buf)[i]; + GstMemory *mem = gst_buffer_get_memory (buf, i); + gst_memory_resize (mem, 0, d->mem.size); + } g_queue_push_tail (&pinossrc->queue, buf); pinos_main_loop_signal (pinossrc->loop, FALSE); diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index ccf11ae5e..bf9827840 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -29,8 +29,6 @@ #include #include -#include - #include "pinos/client/pinos.h" #include "pinos/client/enumtypes.h" #include "pinos/client/private.h" diff --git a/pinos/server/node.c b/pinos/server/node.c index 96b21665f..5c99902b1 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -70,6 +70,8 @@ struct _PinosNodePrivate pthread_t thread; GHashTable *links; + + SpaClock *clock; }; G_DEFINE_ABSTRACT_TYPE (PinosNode, pinos_node, G_TYPE_OBJECT); @@ -436,6 +438,10 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } break; } + case SPA_NODE_EVENT_TYPE_NEED_INPUT: + { + break; + } case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: { PinosLink *link; @@ -475,6 +481,27 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } break; } + case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: + { + SpaResult res; + + if (priv->clock) { + SpaNodeCommand cmd; + SpaNodeCommandClockUpdate cu; + + cmd.type = SPA_NODE_COMMAND_CLOCK_UPDATE; + cmd.data = &cu; + cmd.size = sizeof (cu); + cu.change_mask = SPA_NODE_COMMAND_CLOCK_UPDATE_TIME; + res = spa_clock_get_time (priv->clock, &cu.timestamp, &cu.monotonic_time); + cu.scale = (1 << 16) | 1; + cu.state = SPA_CLOCK_STATE_RUNNING; + + if ((res = spa_node_send_command (this->node, &cmd)) < 0) + g_debug ("got error %d", res); + } + break; + } default: g_debug ("node %p: got event %d", this, event->type); break; @@ -573,9 +600,13 @@ pinos_node_set_property (GObject *_object, break; case PROP_NODE: + { + void *iface; node->node = g_value_get_pointer (value); + if (node->node->handle->get_interface (node->node->handle, SPA_INTERFACE_ID_CLOCK, &iface) >= 0) + priv->clock = iface; break; - + } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (node, prop_id, pspec); break; @@ -1115,6 +1146,8 @@ pinos_node_link (PinosNode *output_node, link->input_port = input_port; g_object_ref (link); } else { + if (output_node->priv->clock) + input_node->priv->clock = output_node->priv->clock; link = g_object_new (PINOS_TYPE_LINK, "daemon", priv->daemon, diff --git a/spa/include/spa/clock.h b/spa/include/spa/clock.h new file mode 100644 index 000000000..880449c0a --- /dev/null +++ b/spa/include/spa/clock.h @@ -0,0 +1,122 @@ +/* Simple Plugin API + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_CLOCK_H__ +#define __SPA_CLOCK_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _SpaClock SpaClock; + +/** + * SpaClockState: + * @SPA_CLOCK_STATE_STOPPED: the clock is stopped + * @SPA_CLOCK_STATE_PAUSED: the clock is paused + * @SPA_CLOCK_STATE_RUNNING: the clock is running + */ +typedef enum { + SPA_CLOCK_STATE_STOPPED, + SPA_CLOCK_STATE_PAUSED, + SPA_CLOCK_STATE_RUNNING, +} SpaClockState; + +#include +#include +#include + +#define SPA_INTERFACE_ID_CLOCK 1 +#define SPA_INTERFACE_ID_CLOCK_NAME "Clock interface" +#define SPA_INTERFACE_ID_CLOCK_DESCRIPTION "Clock interface" + +/** + * SpaClock: + * + * The main processing clocks. + */ +struct _SpaClock { + /* pointer to the handle owning this interface */ + SpaHandle *handle; + /* the total size of this clock. This can be used to expand this + * structure in the future */ + size_t size; + /** + * SpaClock::state: + * + * The current state of the clock + */ + SpaClockState state; + /** + * SpaClock::get_props: + * @clock: a #SpaClock + * @props: a location for a #SpaProps pointer + * + * Get the configurable properties of @clock. + * + * The returned @props is a snapshot of the current configuration and + * can be modified. The modifications will take effect after a call + * to SpaClock::set_props. + * + * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_INVALID_ARGUMENTS when clock or props are %NULL + * #SPA_RESULT_NOT_IMPLEMENTED when there are no properties + * implemented on @clock + */ + SpaResult (*get_props) (SpaClock *clock, + SpaProps **props); + /** + * SpaClock::set_props: + * @clock: a #SpaClock + * @props: a #SpaProps + * + * Set the configurable properties in @clock. + * + * Usually, @props will be obtained from SpaClock::get_props and then + * modified but it is also possible to set another #SpaProps object + * as long as its keys and types match those of SpaProps::get_props. + * + * Properties with keys that are not known are ignored. + * + * If @props is NULL, all the properties are reset to their defaults. + * + * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_INVALID_ARGUMENTS when clock is %NULL + * #SPA_RESULT_NOT_IMPLEMENTED when no properties can be + * modified on @clock. + * #SPA_RESULT_WRONG_PROPERTY_TYPE when a property has the wrong + * type. + */ + SpaResult (*set_props) (SpaClock *clock, + const SpaProps *props); + + SpaResult (*get_time) (SpaClock *clock, + int64_t *clock_time, + int64_t *monotonic_time); +}; + +#define spa_clock_get_props(n,...) (n)->get_props((n),__VA_ARGS__) +#define spa_clock_set_props(n,...) (n)->set_props((n),__VA_ARGS__) +#define spa_clock_get_time(n,...) (n)->get_time((n),__VA_ARGS__) + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_CLOCK_H__ */ diff --git a/spa/include/spa/control.h b/spa/include/spa/control.h index d6b422bcd..664702992 100644 --- a/spa/include/spa/control.h +++ b/spa/include/spa/control.h @@ -60,12 +60,7 @@ typedef enum { SPA_CONTROL_CMD_PORT_UPDATE = 2, SPA_CONTROL_CMD_PORT_REMOVED = 3, - SPA_CONTROL_CMD_STATE_CHANGE = 4, - - SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 5, - - SPA_CONTROL_CMD_NEED_INPUT = 6, - SPA_CONTROL_CMD_HAVE_OUTPUT = 7, + SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 4, /* server to client */ SPA_CONTROL_CMD_ADD_PORT = 32, @@ -74,8 +69,7 @@ typedef enum { SPA_CONTROL_CMD_SET_FORMAT = 34, SPA_CONTROL_CMD_SET_PROPERTY = 35, - SPA_CONTROL_CMD_START = 36, - SPA_CONTROL_CMD_PAUSE = 37, + SPA_CONTROL_CMD_NODE_COMMAND = 36, /* both */ SPA_CONTROL_CMD_ADD_MEM = 64, @@ -83,25 +77,28 @@ typedef enum { SPA_CONTROL_CMD_USE_BUFFERS = 66, SPA_CONTROL_CMD_PROCESS_BUFFER = 67, - SPA_CONTROL_CMD_REUSE_BUFFER = 68, + SPA_CONTROL_CMD_NODE_EVENT = 68, } SpaControlCmd; /* SPA_CONTROL_CMD_NODE_UPDATE */ typedef struct { +#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS (1 << 0) +#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS (1 << 1) +#define SPA_CONTROL_CMD_NODE_UPDATE_PROPS (1 << 2) uint32_t change_mask; unsigned int max_input_ports; unsigned int max_output_ports; const SpaProps *props; } SpaControlCmdNodeUpdate; -#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS (1 << 0) -#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS (1 << 1) -#define SPA_CONTROL_CMD_NODE_UPDATE_PROPS (1 << 2) /* SPA_CONTROL_CMD_PORT_UPDATE */ typedef struct { uint32_t port_id; +#define SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS (1 << 0) +#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 1) +#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 2) uint32_t change_mask; unsigned int n_possible_formats; SpaFormat **possible_formats; @@ -109,33 +106,13 @@ typedef struct { const SpaPortInfo *info; } SpaControlCmdPortUpdate; -#define SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS (1 << 0) -#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 1) -#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 2) - /* SPA_CONTROL_CMD_PORT_REMOVED */ typedef struct { uint32_t port_id; } SpaControlCmdPortRemoved; -/* SPA_CONTROL_CMD_STATE_CHANGE */ -typedef struct { - SpaNodeState state; -} SpaControlCmdStateChange; - /* SPA_CONTROL_CMD_PORT_STATUS_CHANGE */ -/* SPA_CONTROL_CMD_NEED_INPUT */ -typedef struct { - uint32_t port_id; -} SpaControlCmdNeedInput; - -/* SPA_CONTROL_CMD_HAVE_OUTPUT */ -typedef struct { - uint32_t port_id; -} SpaControlCmdHaveOutput; - - /* SPA_CONTROL_CMD_ADD_PORT */ typedef struct { uint32_t port_id; @@ -161,8 +138,10 @@ typedef struct { void *value; } SpaControlCmdSetProperty; -/* SPA_CONTROL_CMD_PAUSE */ -/* SPA_CONTROL_CMD_START */ +/* SPA_CONTROL_CMD_NODE_COMMAND */ +typedef struct { + SpaNodeCommand *command; +} SpaControlCmdNodeCommand; /* SPA_CONTROL_CMD_ADD_MEM */ typedef struct { @@ -193,12 +172,10 @@ typedef struct { uint32_t buffer_id; } SpaControlCmdProcessBuffer; -/* SPA_CONTROL_CMD_REUSE_BUFFER */ +/* SPA_CONTROL_CMD_NODE_EVENT */ typedef struct { - uint32_t port_id; - uint32_t buffer_id; -} SpaControlCmdReuseBuffer; - + SpaNodeEvent *event; +} SpaControlCmdNodeEvent; struct _SpaControlIter { /*< private >*/ diff --git a/spa/include/spa/defs.h b/spa/include/spa/defs.h index d99601d05..c9c7d2be9 100644 --- a/spa/include/spa/defs.h +++ b/spa/include/spa/defs.h @@ -76,6 +76,7 @@ typedef void (*SpaNotify) (void *data); #define SPA_PTR_TO_UINT32(p) ((uint32_t) ((uintptr_t) (p))) #define SPA_UINT32_TO_PTR(u) ((void*) ((uintptr_t) (u))) +#define SPA_TIME_INVALID ((uint64_t)-1) #define SPA_IDX_INVALID ((unsigned int)-1) #define SPA_ID_INVALID ((uint32_t)0xffffffff) diff --git a/spa/include/spa/node-command.h b/spa/include/spa/node-command.h index 6678744b2..16396c836 100644 --- a/spa/include/spa/node-command.h +++ b/spa/include/spa/node-command.h @@ -27,6 +27,7 @@ extern "C" { typedef struct _SpaNodeCommand SpaNodeCommand; #include +#include typedef enum { SPA_NODE_COMMAND_INVALID = 0, @@ -35,15 +36,38 @@ typedef enum { SPA_NODE_COMMAND_FLUSH, SPA_NODE_COMMAND_DRAIN, SPA_NODE_COMMAND_MARKER, + SPA_NODE_COMMAND_CLOCK_UPDATE } SpaNodeCommandType; struct _SpaNodeCommand { SpaNodeCommandType type; - uint32_t port_id; void *data; size_t size; }; +/** + * SpaNodeCommandClockUpdate: + * @change_mask: marks which fields are updated + * @timestamp: the new timestamp, when @change_mask = 1<<0 + * @monotonic_time: the new monotonic time associated with @timestamp, when + * @change_mask = 1<<0 + * @offset: the difference between the time when this update was generated + * and @monotonic_time + * @scale: update to the speed stored as Q16.16, @change_mask = 1<<1 + * @state: the new clock state, when @change_mask = 1<<2 + */ +typedef struct { +#define SPA_NODE_COMMAND_CLOCK_UPDATE_TIME (1 << 0) +#define SPA_NODE_COMMAND_CLOCK_UPDATE_SCALE (1 << 1) +#define SPA_NODE_COMMAND_CLOCK_UPDATE_STATE (1 << 2) + uint32_t change_mask; + int64_t timestamp; + int64_t monotonic_time; + int64_t offset; + int32_t scale; + SpaClockState state; +} SpaNodeCommandClockUpdate; + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 474266e9a..486c64d70 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -64,6 +64,7 @@ typedef enum { SPA_NODE_EVENT_TYPE_ERROR, SPA_NODE_EVENT_TYPE_BUFFERING, SPA_NODE_EVENT_TYPE_REQUEST_REFRESH, + SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE, } SpaNodeEventType; struct _SpaNodeEvent { @@ -97,6 +98,15 @@ typedef struct { uint32_t buffer_id; } SpaNodeEventReuseBuffer; +typedef struct { +#define SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME (1 << 0) +#define SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_SCALE (1 << 1) +#define SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_STATE (1 << 2) + uint32_t update_mask; + int64_t timestamp; + int64_t offset; +} SpaNodeEventRequestClockUpdate; + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/spa/lib/control.c b/spa/lib/control.c index 488b9781a..03d6cef33 100644 --- a/spa/lib/control.c +++ b/spa/lib/control.c @@ -477,6 +477,28 @@ iter_parse_use_buffers (struct stack_iter *si, SpaControlCmdUseBuffers *cmd) } } +static void +iter_parse_node_event (struct stack_iter *si, SpaControlCmdNodeEvent *cmd) +{ + void *p = si->data; + memcpy (cmd, p, sizeof (SpaControlCmdNodeEvent)); + if (cmd->event) + cmd->event = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event), SpaNodeEvent); + if (cmd->event->data) + cmd->event->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event->data), void); +} + +static void +iter_parse_node_command (struct stack_iter *si, SpaControlCmdNodeCommand *cmd) +{ + void *p = si->data; + memcpy (cmd, p, sizeof (SpaControlCmdNodeCommand)); + if (cmd->command) + cmd->command = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command), SpaNodeCommand); + if (cmd->command->data) + cmd->command->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command->data), void); +} + SpaResult spa_control_iter_parse_cmd (SpaControlIter *iter, void *command) @@ -503,28 +525,10 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, memcpy (command, si->data, sizeof (SpaControlCmdPortRemoved)); break; - case SPA_CONTROL_CMD_STATE_CHANGE: - if (si->size < sizeof (SpaControlCmdStateChange)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdStateChange)); - break; - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: fprintf (stderr, "implement iter of %d\n", si->cmd); break; - case SPA_CONTROL_CMD_NEED_INPUT: - if (si->size < sizeof (SpaControlCmdNeedInput)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdNeedInput)); - break; - - case SPA_CONTROL_CMD_HAVE_OUTPUT: - if (si->size < sizeof (SpaControlCmdHaveOutput)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdHaveOutput)); - break; - /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: if (si->size < sizeof (SpaControlCmdAddPort)) @@ -546,10 +550,6 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, fprintf (stderr, "implement iter of %d\n", si->cmd); break; - case SPA_CONTROL_CMD_START: - case SPA_CONTROL_CMD_PAUSE: - break; - /* bidirectional */ case SPA_CONTROL_CMD_ADD_MEM: if (si->size < sizeof (SpaControlCmdAddMem)) @@ -573,10 +573,12 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, memcpy (command, si->data, sizeof (SpaControlCmdProcessBuffer)); break; - case SPA_CONTROL_CMD_REUSE_BUFFER: - if (si->size < sizeof (SpaControlCmdReuseBuffer)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdReuseBuffer)); + case SPA_CONTROL_CMD_NODE_EVENT: + iter_parse_node_event (si, command); + break; + + case SPA_CONTROL_CMD_NODE_COMMAND: + iter_parse_node_command (si, command); break; case SPA_CONTROL_CMD_INVALID: @@ -1135,6 +1137,61 @@ builder_add_use_buffers (struct stack_builder *sb, SpaControlCmdUseBuffers *ub) } } +static void +builder_add_node_event (struct stack_builder *sb, SpaControlCmdNodeEvent *ev) +{ + size_t len; + void *p; + SpaControlCmdNodeEvent *d; + SpaNodeEvent *ne; + + /* calculate length */ + len = sizeof (SpaControlCmdNodeEvent); + len += sizeof (SpaNodeEvent); + len += ev->event->size; + + p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_EVENT, len); + memcpy (p, ev, sizeof (SpaControlCmdNodeEvent)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeEvent), void); + d->event = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + + ne = p; + memcpy (p, ev->event, sizeof (SpaNodeEvent)); + p = SPA_MEMBER (p, sizeof (SpaNodeEvent), void); + ne->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + memcpy (p, ev->event->data, ev->event->size); +} + + +static void +builder_add_node_command (struct stack_builder *sb, SpaControlCmdNodeCommand *cm) +{ + size_t len; + void *p; + SpaControlCmdNodeCommand *d; + SpaNodeCommand *nc; + + /* calculate length */ + len = sizeof (SpaControlCmdNodeCommand); + len += sizeof (SpaNodeCommand); + len += cm->command->size; + + p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_COMMAND, len); + memcpy (p, cm, sizeof (SpaControlCmdNodeCommand)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeCommand), void); + d->command = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + + nc = p; + memcpy (p, cm->command, sizeof (SpaNodeCommand)); + p = SPA_MEMBER (p, sizeof (SpaNodeCommand), void); + nc->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + memcpy (p, cm->command->data, cm->command->size); +} + /** * spa_control_builder_add_cmd: * @builder: a #SpaControlBuilder @@ -1172,25 +1229,10 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, memcpy (p, command, sizeof (SpaControlCmdPortRemoved)); break; - case SPA_CONTROL_CMD_STATE_CHANGE: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdStateChange)); - memcpy (p, command, sizeof (SpaControlCmdStateChange)); - break; - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: p = builder_add_cmd (sb, cmd, 0); break; - case SPA_CONTROL_CMD_NEED_INPUT: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdNeedInput)); - memcpy (p, command, sizeof (SpaControlCmdNeedInput)); - break; - - case SPA_CONTROL_CMD_HAVE_OUTPUT: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdHaveOutput)); - memcpy (p, command, sizeof (SpaControlCmdHaveOutput)); - break; - /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddPort)); @@ -1210,11 +1252,6 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, fprintf (stderr, "implement builder of %d\n", cmd); break; - case SPA_CONTROL_CMD_START: - case SPA_CONTROL_CMD_PAUSE: - p = builder_add_cmd (sb, cmd, 0); - break; - /* bidirectional */ case SPA_CONTROL_CMD_ADD_MEM: p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddMem)); @@ -1235,9 +1272,12 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, memcpy (p, command, sizeof (SpaControlCmdProcessBuffer)); break; - case SPA_CONTROL_CMD_REUSE_BUFFER: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdReuseBuffer)); - memcpy (p, command, sizeof (SpaControlCmdReuseBuffer)); + case SPA_CONTROL_CMD_NODE_EVENT: + builder_add_node_event (sb, command); + break; + + case SPA_CONTROL_CMD_NODE_COMMAND: + builder_add_node_command (sb, command); break; default: diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index c6226bcb3..60bd8d770 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -250,6 +250,7 @@ spa_alsa_sink_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index ff80a1615..a34858500 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -177,6 +177,7 @@ spa_audiomixer_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 0062cc3ae..a517f13f3 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -278,6 +278,7 @@ spa_audiotestsrc_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index be8eb7ced..584ee937e 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -172,6 +172,7 @@ spa_ffmpeg_dec_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index de73585b0..8250e2b81 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -172,6 +172,7 @@ spa_ffmpeg_enc_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/remote/proxy.c b/spa/plugins/remote/proxy.c index a490f1ecd..069d14a1d 100644 --- a/spa/plugins/remote/proxy.c +++ b/spa/plugins/remote/proxy.c @@ -126,24 +126,6 @@ update_poll (SpaProxy *this, int socketfd) } } -static SpaResult -update_state (SpaProxy *this, SpaNodeState state) -{ - if (this->node.state != state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); - } - return SPA_RESULT_OK; -} - static SpaResult spa_proxy_node_get_props (SpaNode *node, SpaProps **props) @@ -216,10 +198,12 @@ spa_proxy_node_send_command (SpaNode *node, SpaControlBuilder builder; SpaControl control; uint8_t buf[128]; + SpaControlCmdNodeCommand cnc; /* send start */ spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_START, NULL); + cnc.command = command; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); spa_control_builder_end (&builder, &control); if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) @@ -234,10 +218,12 @@ spa_proxy_node_send_command (SpaNode *node, SpaControlBuilder builder; SpaControl control; uint8_t buf[128]; + SpaControlCmdNodeCommand cnc; /* send start */ spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PAUSE, NULL); + cnc.command = command; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); spa_control_builder_end (&builder, &control); if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) @@ -251,6 +237,27 @@ spa_proxy_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: return SPA_RESULT_NOT_IMPLEMENTED; + + case SPA_NODE_COMMAND_CLOCK_UPDATE: + { + SpaControlBuilder builder; + SpaControl control; + uint8_t buf[128]; + SpaControlCmdNodeCommand cnc; + + /* send start */ + spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); + cnc.command = command; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); + spa_control_builder_end (&builder, &control); + + if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) + fprintf (stderr, "proxy %p: error writing control %d\n", this, res); + + spa_control_clear (&control); + break; + break; + } } return SPA_RESULT_OK; } @@ -756,11 +763,13 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, uint32_t buffer_id) { SpaProxy *this; - SpaControlCmdReuseBuffer crb; SpaControlBuilder builder; SpaControl control; uint8_t buf[128]; SpaResult res; + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventReuseBuffer rb; if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -772,9 +781,13 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, /* send start */ spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - crb.port_id = port_id; - crb.buffer_id = buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &crb); + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; + ne.data = &rb; + ne.size = sizeof (rb); + rb.port_id = port_id; + rb.buffer_id = buffer_id; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); spa_control_builder_end (&builder, &control); if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) @@ -885,6 +898,8 @@ spa_proxy_node_port_pull_output (SpaNode *node, info[i].buffer_id = port->buffer_id; info[i].status = SPA_RESULT_OK; + + port->buffer_id = SPA_ID_INVALID; } if (have_error) return SPA_RESULT_ERROR; @@ -899,42 +914,59 @@ spa_proxy_node_port_push_event (SpaNode *node, uint32_t port_id, SpaNodeEvent *event) { - SpaProxy *this; - SpaResult res; - if (node == NULL || node->handle == NULL || event == NULL) return SPA_RESULT_INVALID_ARGUMENTS; - this = (SpaProxy *) node->handle; - switch (event->type) { - case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: - { - SpaNodeEventReuseBuffer *rb = event->data; - SpaControlCmdReuseBuffer crb; - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; - - /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - crb.port_id = rb->port_id; - crb.buffer_id = rb->buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &crb); - spa_control_builder_end (&builder, &control); - - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - fprintf (stderr, "proxy %p: error writing control %d\n", this, res); - - spa_control_clear (&control); - return SPA_RESULT_OK; - } default: + fprintf (stderr, "unhandled event %d\n", event->type); break; } return SPA_RESULT_NOT_IMPLEMENTED; } +static SpaResult +handle_node_event (SpaProxy *this, + SpaNodeEvent *event) +{ + switch (event->type) { + case SPA_NODE_EVENT_TYPE_INVALID: + case SPA_NODE_EVENT_TYPE_PORT_ADDED: + case SPA_NODE_EVENT_TYPE_PORT_REMOVED: + this->event_cb (&this->node, event, this->user_data); + break; + + case SPA_NODE_EVENT_TYPE_STATE_CHANGE: + { + SpaNodeEventStateChange *sc = event->data; + + fprintf (stderr, "proxy %p: got state-change to %d\n", this, sc->state); + if (this->node.state != sc->state) { + this->node.state = sc->state; + this->event_cb (&this->node, event, this->user_data); + } + break; + } + + case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: + case SPA_NODE_EVENT_TYPE_NEED_INPUT: + case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: + case SPA_NODE_EVENT_TYPE_ADD_POLL: + case SPA_NODE_EVENT_TYPE_UPDATE_POLL: + case SPA_NODE_EVENT_TYPE_REMOVE_POLL: + case SPA_NODE_EVENT_TYPE_DRAINED: + case SPA_NODE_EVENT_TYPE_MARKER: + case SPA_NODE_EVENT_TYPE_ERROR: + case SPA_NODE_EVENT_TYPE_BUFFERING: + case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: + case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: + this->event_cb (&this->node, event, this->user_data); + break; + } + + return SPA_RESULT_OK; +} + static SpaResult parse_control (SpaProxy *this, SpaControl *ctrl) @@ -950,8 +982,7 @@ parse_control (SpaProxy *this, case SPA_CONTROL_CMD_REMOVE_PORT: case SPA_CONTROL_CMD_SET_FORMAT: case SPA_CONTROL_CMD_SET_PROPERTY: - case SPA_CONTROL_CMD_START: - case SPA_CONTROL_CMD_PAUSE: + case SPA_CONTROL_CMD_NODE_COMMAND: fprintf (stderr, "proxy %p: got unexpected control %d\n", this, cmd); break; @@ -995,43 +1026,11 @@ parse_control (SpaProxy *this, break; } - case SPA_CONTROL_CMD_STATE_CHANGE: - { - SpaControlCmdStateChange sc; - - if (spa_control_iter_parse_cmd (&it, &sc) < 0) - break; - - fprintf (stderr, "proxy %p: got state-change to %d\n", this, sc.state); - update_state (this, sc.state); - break; - } case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: { fprintf (stderr, "proxy %p: command not implemented %d\n", this, cmd); break; } - case SPA_CONTROL_CMD_NEED_INPUT: - { - break; - } - case SPA_CONTROL_CMD_HAVE_OUTPUT: - { - SpaNodeEvent event; - SpaNodeEventHaveOutput hu; - SpaControlCmdHaveOutput cmd; - - if (spa_control_iter_parse_cmd (&it, &cmd) < 0) - break; - - event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; - event.data = &hu; - event.size = sizeof (hu); - hu.port_id = cmd.port_id; - this->event_cb (&this->node, &event, this->user_data); - break; - } - case SPA_CONTROL_CMD_ADD_MEM: break; case SPA_CONTROL_CMD_REMOVE_MEM: @@ -1048,25 +1047,20 @@ parse_control (SpaProxy *this, break; port = &this->ports[cmd.port_id]; + + if (port->buffer_id != SPA_ID_INVALID) + fprintf (stderr, "proxy %p: unprocessed buffer: %d\n", this, port->buffer_id); port->buffer_id = cmd.buffer_id; break; } - case SPA_CONTROL_CMD_REUSE_BUFFER: + case SPA_CONTROL_CMD_NODE_EVENT: { - SpaNodeEvent event; - SpaNodeEventReuseBuffer rb; - SpaControlCmdReuseBuffer crb; + SpaControlCmdNodeEvent cne; - if (spa_control_iter_parse_cmd (&it, &crb) < 0) + if (spa_control_iter_parse_cmd (&it, &cne) < 0) break; - event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; - event.data = &rb; - event.size = sizeof (rb); - rb.port_id = crb.port_id; - rb.buffer_id = crb.buffer_id; - this->event_cb (&this->node, &event, this->user_data); - + handle_node_event (this, cne.event); break; } default: diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 58dbc0902..290ecdaa8 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -115,11 +115,14 @@ typedef struct { SpaAllocParamBuffers param_buffers; SpaPortStatus status; + int64_t last_timestamp; + int64_t last_monotonic; } SpaV4l2State; struct _SpaV4l2Source { SpaHandle handle; SpaNode node; + SpaClock clock; SpaV4l2SourceProps props[2]; @@ -269,6 +272,7 @@ spa_v4l2_source_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; @@ -739,6 +743,51 @@ static const SpaNode v4l2source_node = { spa_v4l2_source_node_port_push_event, }; +static SpaResult +spa_v4l2_source_clock_get_props (SpaClock *clock, + SpaProps **props) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static SpaResult +spa_v4l2_source_clock_set_props (SpaClock *clock, + const SpaProps *props) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static SpaResult +spa_v4l2_source_clock_get_time (SpaClock *clock, + int64_t *clock_time, + int64_t *monotonic_time) +{ + SpaV4l2Source *this; + SpaV4l2State *state; + + if (clock == NULL || clock->handle == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + this = (SpaV4l2Source *) clock->handle; + state = &this->state[0]; + + if (clock_time) + *clock_time = state->last_timestamp; + if (monotonic_time) + *monotonic_time = state->last_monotonic; + + return SPA_RESULT_OK; +} + +static const SpaClock v4l2source_clock = { + NULL, + sizeof (SpaClock), + SPA_CLOCK_STATE_STOPPED, + spa_v4l2_source_clock_get_props, + spa_v4l2_source_clock_set_props, + spa_v4l2_source_clock_get_time, +}; + static SpaResult spa_v4l2_source_get_interface (SpaHandle *handle, uint32_t interface_id, @@ -755,6 +804,9 @@ spa_v4l2_source_get_interface (SpaHandle *handle, case SPA_INTERFACE_ID_NODE: *interface = &this->node; break; + case SPA_INTERFACE_ID_CLOCK: + *interface = &this->clock; + break; default: return SPA_RESULT_UNKNOWN_INTERFACE; } @@ -782,6 +834,8 @@ v4l2_source_init (const SpaHandleFactory *factory, this = (SpaV4l2Source *) handle; this->node = v4l2source_node; this->node.handle = handle; + this->clock = v4l2source_clock; + this->clock.handle = handle; this->props[1].props.n_prop_info = PROP_ID_LAST; this->props[1].props.prop_info = prop_info; reset_v4l2_source_props (&this->props[1]); @@ -800,6 +854,10 @@ static const SpaInterfaceInfo v4l2_source_interfaces[] = SPA_INTERFACE_ID_NODE_NAME, SPA_INTERFACE_ID_NODE_DESCRIPTION, }, + { SPA_INTERFACE_ID_CLOCK, + SPA_INTERFACE_ID_CLOCK_NAME, + SPA_INTERFACE_ID_CLOCK_DESCRIPTION, + }, }; static SpaResult @@ -814,13 +872,10 @@ v4l2_source_enum_interface_info (const SpaHandleFactory *factory, index = (*state == NULL ? 0 : *(int*)state); - switch (index) { - case 0: - *info = &v4l2_source_interfaces[index]; - break; - default: - return SPA_RESULT_ENUM_END; - } + if (index < 0 || index >= SPA_N_ELEMENTS (v4l2_source_interfaces)) + return SPA_RESULT_ENUM_END; + + *info = &v4l2_source_interfaces[index]; *(int*)state = ++index; return SPA_RESULT_OK; } diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index a59ab3d12..de303c6b9 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -811,6 +811,7 @@ mmap_read (SpaV4l2Source *this) SpaV4l2State *state = &this->state[0]; struct v4l2_buffer buf; V4l2Buffer *b; + SpaData *d; CLEAR(buf); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; @@ -829,8 +830,22 @@ mmap_read (SpaV4l2Source *this) } b = &state->alloc_buffers[buf.index]; + b->header.flags = SPA_BUFFER_FLAG_NONE; + if (buf.flags & V4L2_BUF_FLAG_ERROR) + b->header.flags |= SPA_BUFFER_FLAG_CORRUPTED; + b->header.seq = buf.sequence; b->header.pts = (uint64_t)buf.timestamp.tv_sec * 1000000000lu + (uint64_t)buf.timestamp.tv_usec * 1000lu; + state->last_timestamp = b->header.pts; + + if (buf.flags & V4L2_BUF_FLAG_TIMESTAMP_MONOTONIC) + state->last_monotonic = state->last_timestamp; + else + state->last_monotonic = SPA_TIME_INVALID; + + d = SPA_BUFFER_DATAS (b->outbuf); + d[0].mem.size = buf.bytesused; + b->next = state->ready; state->ready = b; state->ready_count++; @@ -893,7 +908,7 @@ spa_v4l2_use_buffers (SpaV4l2Source *this, SpaBuffer **buffers, uint32_t n_buffe V4l2Buffer *b; SpaMemoryRef *mem_ref; SpaMemory *mem; - SpaData *d = SPA_BUFFER_DATAS (buffers[i]); + SpaData *d; b = &state->alloc_buffers[i]; b->buffer.mem.mem = state->alloc_mem->mem; @@ -911,6 +926,12 @@ spa_v4l2_use_buffers (SpaV4l2Source *this, SpaBuffer **buffers, uint32_t n_buffe continue; } + if (buffers[i]->n_datas < 1) { + fprintf (stderr, "invalid memory on buffer %p\n", buffers[i]); + continue; + } + d = SPA_BUFFER_DATAS (buffers[i]); + CLEAR (b->v4l2_buffer); b->v4l2_buffer.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; b->v4l2_buffer.memory = state->memtype; diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index ff91dd38a..278cf7c19 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -182,6 +182,7 @@ spa_volume_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK; diff --git a/spa/plugins/xv/xv-sink.c b/spa/plugins/xv/xv-sink.c index 052bed829..76ece4ff2 100644 --- a/spa/plugins/xv/xv-sink.c +++ b/spa/plugins/xv/xv-sink.c @@ -207,6 +207,7 @@ spa_xv_sink_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + case SPA_NODE_COMMAND_CLOCK_UPDATE: return SPA_RESULT_NOT_IMPLEMENTED; } return SPA_RESULT_OK;