diff --git a/pinos/Makefile.am b/pinos/Makefile.am index 42c17055c..d06a18eb3 100644 --- a/pinos/Makefile.am +++ b/pinos/Makefile.am @@ -239,7 +239,8 @@ libgstpinos_la_SOURCES = \ gst/gstpinosformat.c \ gst/gstpinosdeviceprovider.c \ gst/gstpinossrc.c \ - gst/gstpinossink.c + gst/gstpinossink.c \ + gst/gstpinospool.c #gst/gstpinosdepay.c #gst/gstpinosportsrc.c #gst/gstpinosportsink.c diff --git a/pinos/client/properties.c b/pinos/client/properties.c index fc5677f1c..6bcdcbe25 100644 --- a/pinos/client/properties.c +++ b/pinos/client/properties.c @@ -131,7 +131,8 @@ pinos_properties_free (PinosProperties *properties) * @value: a value * * Set the property in @properties with @key to @value. Any previous value - * of @key will be overwritten. + * of @key will be overwritten. When @value is %NULL, the key will be + * removed. */ void pinos_properties_set (PinosProperties *properties, @@ -140,9 +141,11 @@ pinos_properties_set (PinosProperties *properties, { g_return_if_fail (properties != NULL); g_return_if_fail (key != NULL); - g_return_if_fail (value != NULL); - g_hash_table_replace (properties->hashtable, g_strdup (key), g_strdup (value)); + if (value == NULL) + g_hash_table_remove (properties->hashtable, key); + else + g_hash_table_replace (properties->hashtable, g_strdup (key), g_strdup (value)); } /** @@ -193,23 +196,6 @@ pinos_properties_get (PinosProperties *properties, return g_hash_table_lookup (properties->hashtable, key); } -/** - * pinos_properties_remove: - * @properties: a #PinosProperties - * @key: a key - * - * Remove the property in @properties with @key. - */ -void -pinos_properties_remove (PinosProperties *properties, - const gchar *key) -{ - g_return_if_fail (properties != NULL); - g_return_if_fail (key != NULL); - - g_hash_table_remove (properties->hashtable, key); -} - /** * pinos_properties_iterate: * @properties: a #PinosProperties diff --git a/pinos/client/properties.h b/pinos/client/properties.h index 545c65573..fdb2614e9 100644 --- a/pinos/client/properties.h +++ b/pinos/client/properties.h @@ -44,8 +44,6 @@ void pinos_properties_setf (PinosProperties *properties, ...) G_GNUC_PRINTF (3, 4); const gchar * pinos_properties_get (PinosProperties *properties, const gchar *key); -void pinos_properties_remove (PinosProperties *properties, - const gchar *key); const gchar * pinos_properties_iterate (PinosProperties *properties, gpointer *state); diff --git a/pinos/client/ringbuffer.c b/pinos/client/ringbuffer.c index 318d39ce5..fc965836a 100644 --- a/pinos/client/ringbuffer.c +++ b/pinos/client/ringbuffer.c @@ -71,8 +71,6 @@ enum LAST_SIGNAL }; -static guint signals[LAST_SIGNAL] = { 0 }; - static void pinos_ringbuffer_get_property (GObject *_object, guint prop_id, @@ -367,7 +365,8 @@ pinos_ringbuffer_read_advance (PinosRingbuffer *rbuf, if (priv->mode == PINOS_RINGBUFFER_MODE_READ) { val = 1; - write (priv->semaphore, &val, 8); + if (write (priv->semaphore, &val, 8) != 8) + g_warning ("error writing semaphore"); } return TRUE; @@ -387,7 +386,8 @@ pinos_ringbuffer_write_advance (PinosRingbuffer *rbuf, if (priv->mode == PINOS_RINGBUFFER_MODE_WRITE) { val = 1; - write (priv->semaphore, &val, 8); + if (write (priv->semaphore, &val, 8) != 8) + g_warning ("error writing semaphore"); } return TRUE; } diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 07303554f..ab4ae097a 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -37,7 +37,7 @@ #include "pinos/client/format.h" #include "pinos/client/private.h" -#define MAX_BUFFER_SIZE 1024 +#define MAX_BUFFER_SIZE 4096 #define MAX_FDS 16 typedef struct { @@ -46,6 +46,7 @@ typedef struct { int fd; off_t offset; size_t size; + bool used; SpaBuffer *buf; } BufferId; @@ -75,6 +76,9 @@ struct _PinosStreamPrivate GPtrArray *possible_formats; SpaFormat *format; SpaPortInfo port_info; + SpaAllocParam *port_params[2]; + SpaAllocParamMetaEnable param_meta_enable; + SpaAllocParamBuffers param_buffers; PinosStreamFlags flags; @@ -86,8 +90,6 @@ struct _PinosStreamPrivate GSource *socket_source; int fd; - SpaBuffer *buffer; - SpaControl *control; SpaControl recv_control; guint8 recv_data[MAX_BUFFER_SIZE]; @@ -97,6 +99,7 @@ struct _PinosStreamPrivate int send_fds[MAX_FDS]; GArray *buffer_ids; + gboolean in_order; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -117,6 +120,8 @@ enum enum { + SIGNAL_ADD_BUFFER, + SIGNAL_REMOVE_BUFFER, SIGNAL_NEW_BUFFER, LAST_SIGNAL }; @@ -394,10 +399,41 @@ pinos_stream_class_init (PinosStreamClass * klass) G_PARAM_STATIC_STRINGS)); /** * PinosStream:new-buffer + * @id: the buffer id * - * When doing pinos_stream_start() with #PINOS_STREAM_MODE_BUFFER, this signal - * will be fired whenever a new buffer can be obtained with - * pinos_stream_capture_buffer(). + * this signal will be fired whenever a buffer is added to the pool of buffers. + */ + signals[SIGNAL_ADD_BUFFER] = g_signal_new ("add-buffer", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 1, + G_TYPE_UINT); + /** + * PinosStream:remove-buffer + * @id: the buffer id + * + * this signal will be fired whenever a buffer is removed from the pool of buffers. + */ + signals[SIGNAL_REMOVE_BUFFER] = g_signal_new ("remove-buffer", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 1, + G_TYPE_UINT); + /** + * PinosStream:new-buffer + * @id: the buffer id + * + * this signal will be fired whenever a buffer is ready to be processed. */ signals[SIGNAL_NEW_BUFFER] = g_signal_new ("new-buffer", G_TYPE_FROM_CLASS (klass), @@ -407,8 +443,8 @@ pinos_stream_class_init (PinosStreamClass * klass) NULL, g_cclosure_marshal_generic, G_TYPE_NONE, - 0, - G_TYPE_NONE); + 1, + G_TYPE_UINT); } static void @@ -421,6 +457,7 @@ pinos_stream_init (PinosStream * stream) priv->state = PINOS_STREAM_STATE_UNCONNECTED; priv->buffer_ids = g_array_sized_new (FALSE, FALSE, sizeof (BufferId), 64); g_array_set_clear_func (priv->buffer_ids, (GDestroyNotify) clear_buffer_id); + priv->in_order = TRUE; } /** @@ -524,24 +561,110 @@ control_builder_init (PinosStream *stream, SpaControlBuilder *builder) } static void -send_need_input (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) +add_node_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t change_mask) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlCmdNodeUpdate nu = { 0, }; + + nu.change_mask = change_mask; + if (change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS) + nu.max_input_ports = priv->direction == PINOS_DIRECTION_INPUT ? 1 : 0; + if (change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS) + nu.max_output_ports = priv->direction == PINOS_DIRECTION_OUTPUT ? 1 : 0; + nu.props = NULL; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_UPDATE, &nu); +} + +static void +add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t change_mask) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlCmdPortUpdate pu = { 0, };; + + pu.port_id = 0; + pu.change_mask = change_mask; + if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_DIRECTION) + pu.direction = priv->direction; + if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS) { + pu.n_possible_formats = priv->possible_formats->len; + pu.possible_formats = (SpaFormat **)priv->possible_formats->pdata; + } + pu.props = NULL; + if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_INFO) + pu.info = &priv->port_info; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_PORT_UPDATE, &pu); +} + +static void +add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) +{ + SpaControlCmdStateChange sc; + + sc.state = state; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_STATE_CHANGE, &sc); +} + +static void +add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_id) +{ + SpaControlCmdNeedInput ni; + + ni.port_id = port_id; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NEED_INPUT, &ni); +} + +static void +send_need_input (PinosStream *stream, uint32_t port_id) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlBuilder builder; + SpaControl control; + + control_builder_init (stream, &builder); + add_need_input (stream, &builder, port_id); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); +} + +static void +send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) { PinosStreamPrivate *priv = stream->priv; SpaControlBuilder builder; SpaControl control; - SpaControlCmdNeedInput ni; SpaControlCmdReuseBuffer rb; control_builder_init (stream, &builder); - if (buffer_id != SPA_ID_INVALID) { - rb.port_id = port_id; - rb.buffer_id = buffer_id; - rb.offset = 0; - rb.size = -1; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &rb); - } - ni.port_id = port_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NEED_INPUT, &ni); + rb.port_id = port_id; + rb.buffer_id = buffer_id; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &rb); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); +} + +static void +send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlBuilder builder; + SpaControl control; + SpaControlCmdProcessBuffer pb; + SpaControlCmdHaveOutput ho; + + control_builder_init (stream, &builder); + pb.port_id = port_id; + pb.buffer_id = buffer_id; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb); + ho.port_id = port_id; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_HAVE_OUTPUT, &ho); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -555,10 +678,15 @@ find_buffer (PinosStream *stream, uint32_t id) { PinosStreamPrivate *priv = stream->priv; guint i; - for (i = 0; i < priv->buffer_ids->len; i++) { - BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i); - if (bid->id == id) - return bid; + + if (priv->in_order && id < priv->buffer_ids->len) { + return &g_array_index (priv->buffer_ids, BufferId, id); + } else { + for (i = 0; i < priv->buffer_ids->len; i++) { + BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i); + if (bid->id == id) + return bid; + } } return NULL; } @@ -593,9 +721,6 @@ parse_control (PinosStream *stream, case SPA_CONTROL_CMD_SET_FORMAT: { SpaControlCmdSetFormat p; - SpaControlBuilder builder; - SpaControl control; - SpaControlCmdStateChange sc; if (spa_control_iter_parse_cmd (&it, &p) < 0) break; @@ -607,19 +732,19 @@ parse_control (PinosStream *stream, spa_debug_format (p.format); g_object_notify (G_OBJECT (stream), "format"); - control_builder_init (stream, &builder); + if (priv->port_info.n_params != 0) { + SpaControlBuilder builder; + SpaControl control; - /* FIXME send update port status */ + control_builder_init (stream, &builder); + add_state_change (stream, &builder, SPA_NODE_STATE_READY); + spa_control_builder_end (&builder, &control); - /* send state-change */ - sc.state = SPA_NODE_STATE_READY; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_STATE_CHANGE, &sc); - spa_control_builder_end (&builder, &control); + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + spa_control_clear (&control); + } break; } case SPA_CONTROL_CMD_SET_PROPERTY: @@ -628,21 +753,44 @@ parse_control (PinosStream *stream, case SPA_CONTROL_CMD_START: { + SpaControlBuilder builder; + SpaControl control; + g_debug ("stream %p: start", stream); + control_builder_init (stream, &builder); if (priv->direction == PINOS_DIRECTION_INPUT) - send_need_input (stream, 0, SPA_ID_INVALID); + add_need_input (stream, &builder, 0); + add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); break; } case SPA_CONTROL_CMD_STOP: { + SpaControlBuilder builder; + SpaControl control; + g_debug ("stream %p: stop", stream); + + control_builder_init (stream, &builder); + add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); break; } - case SPA_CONTROL_CMD_ADD_MEM: { SpaControlCmdAddMem p; @@ -697,7 +845,12 @@ parse_control (PinosStream *stream, bid.size = p.mem.size; bid.buf = SPA_MEMBER (spa_memory_ensure_ptr (mem), p.mem.offset, SpaBuffer); + if (bid.id != priv->buffer_ids->len) { + g_warning ("unexpected id %u found, expected %u", bid.id, priv->buffer_ids->len); + priv->in_order = FALSE; + } g_array_append_val (priv->buffer_ids, bid); + g_signal_emit (stream, signals[SIGNAL_ADD_BUFFER], 0, p.buffer_id); break; } case SPA_CONTROL_CMD_REMOVE_BUFFER: @@ -709,30 +862,44 @@ parse_control (PinosStream *stream, break; g_debug ("remove buffer %d", p.buffer_id); - if ((bid = find_buffer (stream, p.buffer_id))) - bid->cleanup = true; + if ((bid = find_buffer (stream, p.buffer_id))) { + bid->cleanup = TRUE; + bid->used = TRUE; + g_signal_emit (stream, signals[SIGNAL_REMOVE_BUFFER], 0, p.buffer_id); + } break; } case SPA_CONTROL_CMD_PROCESS_BUFFER: { SpaControlCmdProcessBuffer p; - BufferId *bid; + + if (priv->direction != PINOS_DIRECTION_INPUT) + break; if (spa_control_iter_parse_cmd (&it, &p) < 0) break; - if ((bid = find_buffer (stream, p.buffer_id))) - priv->buffer = bid->buf; + g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id); + + send_need_input (stream, 0); break; } case SPA_CONTROL_CMD_REUSE_BUFFER: { SpaControlCmdReuseBuffer p; + BufferId *bid; + + if (priv->direction != PINOS_DIRECTION_OUTPUT) + break; if (spa_control_iter_parse_cmd (&it, &p) < 0) break; g_debug ("reuse buffer %d", p.buffer_id); + if ((bid = find_buffer (stream, p.buffer_id))) { + bid->used = FALSE; + g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id); + } break; } @@ -772,18 +939,17 @@ on_socket_condition (GSocket *socket, parse_control (stream, control); - if (priv->buffer) { - g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); - send_need_input (stream, 0, priv->buffer->id); - priv->buffer = NULL; - } for (i = 0; i < priv->buffer_ids->len; i++) { BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i); if (bid->cleanup) { g_array_remove_index_fast (priv->buffer_ids, i); i--; + priv->in_order = FALSE; } } + if (!priv->in_order && priv->buffer_ids->len == 0) + priv->in_order = TRUE; + spa_control_clear (control); break; } @@ -835,45 +1001,6 @@ unhandle_socket (PinosStream *stream) } } -static void -do_node_init (PinosStream *stream) -{ - PinosStreamPrivate *priv = stream->priv; - SpaControlCmdNodeUpdate nu; - SpaControlCmdPortUpdate pu; - SpaControlBuilder builder; - SpaControl control; - - control_builder_init (stream, &builder); - nu.change_mask = SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | - SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS; - nu.max_input_ports = priv->direction == PINOS_DIRECTION_INPUT ? 1 : 0; - nu.max_output_ports = priv->direction == PINOS_DIRECTION_OUTPUT ? 1 : 0; - nu.props = NULL; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_UPDATE, &nu); - - pu.port_id = 0; - pu.change_mask = SPA_CONTROL_CMD_PORT_UPDATE_DIRECTION | - SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | - SPA_CONTROL_CMD_PORT_UPDATE_INFO; - - pu.direction = priv->direction; - pu.n_possible_formats = priv->possible_formats->len; - pu.possible_formats = (SpaFormat **)priv->possible_formats->pdata; - pu.props = NULL; - pu.info = &priv->port_info; - priv->port_info.flags = SPA_PORT_INFO_FLAG_NONE | - SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PORT_UPDATE, &pu); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); -} - - static void on_node_proxy (GObject *source_object, GAsyncResult *res, @@ -882,6 +1009,9 @@ on_node_proxy (GObject *source_object, PinosStream *stream = user_data; PinosStreamPrivate *priv = stream->priv; PinosContext *context = priv->context; + SpaControlBuilder builder; + SpaControl control; + GError *error = NULL; priv->node = pinos_subscribe_get_proxy_finish (context->priv->subscribe, @@ -890,7 +1020,23 @@ on_node_proxy (GObject *source_object, if (priv->node == NULL) goto node_failed; - do_node_init (stream); + control_builder_init (stream, &builder); + add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | + SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS); + + priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_DIRECTION | + SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | + SPA_CONTROL_CMD_PORT_UPDATE_INFO); + + add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); + + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); g_object_unref (stream); @@ -978,8 +1124,9 @@ do_connect (PinosStream *stream) if (priv->properties == NULL) priv->properties = pinos_properties_new (NULL, NULL); - pinos_properties_set (priv->properties, - "pinos.target.node", priv->path); + if (priv->path) + pinos_properties_set (priv->properties, + "pinos.target.node", priv->path); g_dbus_proxy_call (context->priv->daemon, "CreateClientNode", @@ -1048,17 +1195,68 @@ pinos_stream_connect (PinosStream *stream, return TRUE; } +/** + * pinos_stream_start_allocation: + * @stream: a #PinosStream + * @props: a #PinosProperties + * + * Returns: %TRUE on success + */ +gboolean +pinos_stream_start_allocation (PinosStream *stream, + PinosProperties *props) +{ + PinosStreamPrivate *priv; + PinosContext *context; + SpaControlBuilder builder; + SpaControl control; + + g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); + priv = stream->priv; + context = priv->context; + + g_return_val_if_fail (pinos_context_get_state (context) == PINOS_CONTEXT_STATE_CONNECTED, FALSE); + + control_builder_init (stream, &builder); + + priv->port_info.params = priv->port_params; + priv->port_info.n_params = 1; + + priv->port_params[0] = &priv->param_buffers.param; + priv->param_buffers.param.type = SPA_ALLOC_PARAM_TYPE_BUFFERS; + priv->param_buffers.param.size = sizeof (SpaAllocParamBuffers); + priv->param_buffers.minsize = 115200; + priv->param_buffers.stride = 640; + priv->param_buffers.min_buffers = 0; + priv->param_buffers.max_buffers = 0; + priv->param_buffers.align = 16; + + /* send update port status */ + add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO); + + /* send state-change */ + if (priv->format) + add_state_change (stream, &builder, SPA_NODE_STATE_READY); + + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); + + return TRUE; +} + static gboolean do_start (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; SpaControlBuilder builder; - SpaControlCmdStateChange sc; SpaControl control; control_builder_init (stream, &builder); - sc.state = SPA_NODE_STATE_CONFIGURE; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_STATE_CHANGE, &sc); + add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -1223,46 +1421,111 @@ pinos_stream_disconnect (PinosStream *stream) } /** - * pinos_stream_peek_buffer: + * pinos_stream_get_empty_buffer: * @stream: a #PinosStream * - * Get the current buffer from @stream. This function should be called from + * Get the id of an empty buffer that can be filled + * + * Returns: the id of an empty buffer or #SPA_ID_INVALID when no buffer is + * available. + */ +guint +pinos_stream_get_empty_buffer (PinosStream *stream) +{ + PinosStreamPrivate *priv; + guint i; + + g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); + priv = stream->priv; + g_return_val_if_fail (priv->direction == PINOS_DIRECTION_OUTPUT, FALSE); + + for (i = 0; i < priv->buffer_ids->len; i++) { + BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i); + if (!bid->used) + return bid->id; + } + return SPA_ID_INVALID; +} + +/** + * pinos_stream_recycle_buffer: + * @stream: a #PinosStream + * @id: a buffer id + * + * Recycle the buffer with @id. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_stream_recycle_buffer (PinosStream *stream, + guint id) +{ + PinosStreamPrivate *priv; + + g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); + g_return_val_if_fail (id != SPA_ID_INVALID, FALSE); + priv = stream->priv; + g_return_val_if_fail (priv->direction == PINOS_DIRECTION_INPUT, FALSE); + + send_reuse_buffer (stream, 0, id); + + return TRUE; +} + +/** + * pinos_stream_peek_buffer: + * @stream: a #PinosStream + * @id: the buffer id + * + * Get the buffer with @id from @stream. This function should be called from * the new-buffer signal callback. * * Returns: a #SpaBuffer or %NULL when there is no buffer. */ SpaBuffer * -pinos_stream_peek_buffer (PinosStream *stream) +pinos_stream_peek_buffer (PinosStream *stream, guint id) { - PinosStreamPrivate *priv; + BufferId *bid; g_return_val_if_fail (PINOS_IS_STREAM (stream), NULL); - priv = stream->priv; - return priv->buffer; + if ((bid = find_buffer (stream, id))) + return bid->buf; + + return NULL; } /** * pinos_stream_send_buffer: * @stream: a #PinosStream - * @buffer: a #SpaBuffer + * @id: a buffer id + * @offset: the offset in the buffer + * @size: the size in the buffer * - * Send a buffer to @stream. + * Send a buffer with @id to @stream. * * For provider streams, this function should be called whenever there is a new frame * available. * - * For capture streams, this functions should be called for each fd-payload that - * should be released. - * - * Returns: %TRUE when @buffer was handled + * Returns: %TRUE when @id was handled */ gboolean -pinos_stream_send_buffer (PinosStream *stream, - SpaBuffer *buffer) +pinos_stream_send_buffer (PinosStream *stream, + guint id) { - g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); - g_return_val_if_fail (buffer != NULL, FALSE); + PinosStreamPrivate *priv; + BufferId *bid; - return TRUE; + g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); + g_return_val_if_fail (id != SPA_ID_INVALID, FALSE); + priv = stream->priv; + g_return_val_if_fail (priv->direction == PINOS_DIRECTION_OUTPUT, FALSE); + + if ((bid = find_buffer (stream, id))) { + bid->used = TRUE; + send_process_buffer (stream, 0, id); + return TRUE; + } else { + return FALSE; + } } diff --git a/pinos/client/stream.h b/pinos/client/stream.h index d9a1eae9d..62f63eaef 100644 --- a/pinos/client/stream.h +++ b/pinos/client/stream.h @@ -103,12 +103,19 @@ gboolean pinos_stream_connect (PinosStream *stream, GPtrArray *possible_formats); gboolean pinos_stream_disconnect (PinosStream *stream); +gboolean pinos_stream_start_allocation (PinosStream *stream, + PinosProperties *props); + gboolean pinos_stream_start (PinosStream *stream); gboolean pinos_stream_stop (PinosStream *stream); -SpaBuffer * pinos_stream_peek_buffer (PinosStream *stream); +guint pinos_stream_get_empty_buffer (PinosStream *stream); +gboolean pinos_stream_recycle_buffer (PinosStream *stream, + guint id); +SpaBuffer * pinos_stream_peek_buffer (PinosStream *stream, + guint id); gboolean pinos_stream_send_buffer (PinosStream *stream, - SpaBuffer *buffer); + guint id); G_END_DECLS #endif /* __PINOS_STREAM_H__ */ diff --git a/pinos/dbus/org.pinos.xml b/pinos/dbus/org.pinos.xml index 4ac68e5ce..afb7a9548 100644 --- a/pinos/dbus/org.pinos.xml +++ b/pinos/dbus/org.pinos.xml @@ -141,12 +141,5 @@ - - - - - - - diff --git a/pinos/gst/gstpinospool.c b/pinos/gst/gstpinospool.c new file mode 100644 index 000000000..756de789d --- /dev/null +++ b/pinos/gst/gstpinospool.c @@ -0,0 +1,149 @@ +/* GStreamer + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include + +#include "gstpinospool.h" + +GST_DEBUG_CATEGORY_STATIC (gst_pinos_pool_debug_category); +#define GST_CAT_DEFAULT gst_pinos_pool_debug_category + +G_DEFINE_TYPE (GstPinosPool, gst_pinos_pool, GST_TYPE_BUFFER_POOL); + +GstPinosPool * +gst_pinos_pool_new (void) +{ + GstPinosPool *pool; + + pool = g_object_new (GST_TYPE_PINOS_POOL, NULL); + + return pool; +} + +gboolean +gst_pinos_pool_add_buffer (GstPinosPool *pool, GstBuffer *buffer) +{ + g_return_val_if_fail (GST_IS_PINOS_POOL (pool), FALSE); + g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE); + + GST_OBJECT_LOCK (pool); + g_queue_push_tail (&pool->available, buffer); + g_cond_signal (&pool->cond); + GST_OBJECT_UNLOCK (pool); + + return TRUE; +} + +gboolean +gst_pinos_pool_remove_buffer (GstPinosPool *pool, GstBuffer *buffer) +{ + g_return_val_if_fail (GST_IS_PINOS_POOL (pool), FALSE); + g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE); + + GST_OBJECT_LOCK (pool); + g_queue_remove (&pool->available, buffer); + GST_OBJECT_UNLOCK (pool); + + return TRUE; +} + +static GstFlowReturn +acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, + GstBufferPoolAcquireParams * params) +{ + GstPinosPool *p = GST_PINOS_POOL (pool); + + GST_OBJECT_LOCK (pool); + while (p->available.length == 0) { + g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool)); + } + *buffer = g_queue_pop_head (&p->available); + GST_OBJECT_UNLOCK (pool); + GST_DEBUG ("acquire buffer %p", *buffer); + + return GST_FLOW_OK; +} + +static void +release_buffer (GstBufferPool * pool, GstBuffer *buffer) +{ + GstPinosPool *p = GST_PINOS_POOL (pool); + + GST_DEBUG ("release buffer %p", buffer); + GST_OBJECT_LOCK (pool); + g_queue_push_tail (&p->available, buffer); + GST_OBJECT_UNLOCK (pool); +} + +static gboolean +do_start (GstBufferPool * pool) +{ + GstPinosPool *p = GST_PINOS_POOL (pool); + PinosProperties *props = NULL; + GstStructure *config; + GstCaps *caps; + guint size; + guint min_buffers; + guint max_buffers; + + config = gst_buffer_pool_get_config (pool); + gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + + + pinos_stream_start_allocation (p->stream, props); + + return TRUE; +} + +static void +gst_pinos_pool_finalize (GObject * object) +{ + GstPinosPool *pool = GST_PINOS_POOL (object); + + GST_DEBUG_OBJECT (pool, "finalize"); + + G_OBJECT_CLASS (gst_pinos_pool_parent_class)->finalize (object); +} + +static void +gst_pinos_pool_class_init (GstPinosPoolClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBufferPoolClass *bufferpool_class = GST_BUFFER_POOL_CLASS (klass); + + gobject_class->finalize = gst_pinos_pool_finalize; + + bufferpool_class->start = do_start; + bufferpool_class->acquire_buffer = acquire_buffer; + bufferpool_class->release_buffer = release_buffer; + + GST_DEBUG_CATEGORY_INIT (gst_pinos_pool_debug_category, "pinospool", 0, + "debug category for pinospool object"); +} + +static void +gst_pinos_pool_init (GstPinosPool * pool) +{ + g_cond_init (&pool->cond); + g_queue_init (&pool->available); +} diff --git a/pinos/gst/gstpinospool.h b/pinos/gst/gstpinospool.h new file mode 100644 index 000000000..5a42b2dca --- /dev/null +++ b/pinos/gst/gstpinospool.h @@ -0,0 +1,66 @@ +/* GStreamer + * Copyright (C) <2016> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_PINOS_POOL_H__ +#define __GST_PINOS_POOL_H__ + +#include + +#include + +G_BEGIN_DECLS + +#define GST_TYPE_PINOS_POOL \ + (gst_pinos_pool_get_type()) +#define GST_PINOS_POOL(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PINOS_POOL,GstPinosPool)) +#define GST_PINOS_POOL_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PINOS_POOL,GstPinosPoolClass)) +#define GST_IS_PINOS_POOL(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PINOS_POOL)) +#define GST_IS_PINOS_POOL_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PINOS_POOL)) +#define GST_PINOS_POOL_GET_CLASS(klass) \ + (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_PINOS_POOL, GstPinosPoolClass)) + +typedef struct _GstPinosPool GstPinosPool; +typedef struct _GstPinosPoolClass GstPinosPoolClass; + +struct _GstPinosPool { + GstBufferPool parent; + + PinosStream *stream; + GQueue available; + GCond cond; +}; + +struct _GstPinosPoolClass { + GstBufferPoolClass parent_class; +}; + +GType gst_pinos_pool_get_type (void); + +GstPinosPool * gst_pinos_pool_new (void); + +gboolean gst_pinos_pool_add_buffer (GstPinosPool *pool, GstBuffer *buffer); +gboolean gst_pinos_pool_remove_buffer (GstPinosPool *pool, GstBuffer *buffer); + +G_END_DECLS + +#endif /* __GST_PINOS_POOL_H__ */ diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index 2b464b719..d1029a930 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -48,6 +48,7 @@ #include "gsttmpfileallocator.h" #include "gstpinosformat.h" +static GQuark process_mem_data_quark; GST_DEBUG_CATEGORY_STATIC (pinos_sink_debug); #define GST_CAT_DEFAULT pinos_sink_debug @@ -120,8 +121,9 @@ gst_pinos_sink_finalize (GObject * object) if (pinossink->properties) gst_structure_free (pinossink->properties); - g_hash_table_unref (pinossink->mem_ids); g_object_unref (pinossink->allocator); + g_object_unref (pinossink->pool); + g_hash_table_unref (pinossink->buf_ids); g_free (pinossink->path); g_free (pinossink->client_name); @@ -133,7 +135,7 @@ gst_pinos_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query) { GstPinosSink *pinossink = GST_PINOS_SINK (bsink); - gst_query_add_allocation_param (query, pinossink->allocator, NULL); + gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pinossink->pool), 0, 0, 0); return TRUE; } @@ -207,18 +209,22 @@ gst_pinos_sink_class_init (GstPinosSinkClass * klass) GST_DEBUG_CATEGORY_INIT (pinos_sink_debug, "pinossink", 0, "Pinos Sink"); + + process_mem_data_quark = g_quark_from_static_string ("GstPinosSinkProcessMemQuark"); } static void gst_pinos_sink_init (GstPinosSink * sink) { sink->allocator = gst_tmpfile_allocator_new (); - sink->fdmanager = pinos_fd_manager_get (PINOS_FD_MANAGER_DEFAULT); + sink->pool = gst_pinos_pool_new (); sink->client_name = pinos_client_name(); sink->mode = DEFAULT_PROP_MODE; + g_queue_init (&sink->empty); + g_queue_init (&sink->filled); - sink->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, - (GDestroyNotify) gst_memory_unref); + sink->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + (GDestroyNotify) gst_buffer_unref); } static GstCaps * @@ -327,58 +333,128 @@ gst_pinos_sink_get_property (GObject * object, guint prop_id, } } +typedef struct { + GstPinosSink *sink; + guint id; + SpaMetaHeader *header; + guint flags; +} ProcessMemData; + static void -on_new_buffer (GObject *gobject, +process_mem_data_destroy (gpointer user_data) +{ + ProcessMemData *data = user_data; + + gst_object_unref (data->sink); + g_slice_free (ProcessMemData, data); +} + +static void +on_add_buffer (GObject *gobject, + guint id, gpointer user_data) { GstPinosSink *pinossink = user_data; SpaBuffer *b; + GstBuffer *buf; + unsigned int i; + ProcessMemData data; + + GST_LOG_OBJECT (pinossink, "add buffer"); + + if (!(b = pinos_stream_peek_buffer (pinossink->stream, id))) { + g_warning ("failed to peek buffer"); + return; + } + + buf = gst_buffer_new (); + + data.sink = gst_object_ref (pinossink); + data.id = id; + data.header = NULL; + + for (i = 0; i < b->n_metas; i++) { + SpaMeta *m = &SPA_BUFFER_METAS(b)[i]; + + switch (m->type) { + case SPA_META_TYPE_HEADER: + data.header = SPA_MEMBER (b, m->offset, SpaMetaHeader); + break; + default: + break; + } + } + for (i = 0; i < b->n_datas; i++) { + SpaData *d = &SPA_BUFFER_DATAS (b)[i]; + SpaMemory *mem; + + mem = spa_memory_find (&d->mem.mem); + + if (mem->fd) { + GstMemory *fdmem = NULL; + + fdmem = gst_fd_allocator_alloc (pinossink->allocator, dup (mem->fd), + d->mem.offset + d->mem.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, d->mem.offset, d->mem.size); + gst_buffer_append_memory (buf, fdmem); + } else { + gst_buffer_append_memory (buf, + gst_memory_new_wrapped (0, mem->ptr, mem->size, d->mem.offset, + d->mem.size, NULL, NULL)); + } + } + data.flags = GST_BUFFER_FLAGS (buf); + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), + process_mem_data_quark, + g_slice_dup (ProcessMemData, &data), + process_mem_data_destroy); + + gst_pinos_pool_add_buffer (GST_PINOS_POOL (pinossink->pool), buf); + g_hash_table_insert (pinossink->buf_ids, GINT_TO_POINTER (id), buf); + + g_queue_push_tail (&pinossink->empty, buf); + pinos_main_loop_signal (pinossink->loop, FALSE); +} + +static void +on_remove_buffer (GObject *gobject, + guint id, + gpointer user_data) +{ + GstPinosSink *pinossink = user_data; + GstBuffer *buf; + + GST_LOG_OBJECT (pinossink, "remove buffer"); + buf = g_hash_table_lookup (pinossink->buf_ids, GINT_TO_POINTER (id)); + GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + + gst_pinos_pool_remove_buffer (GST_PINOS_POOL (pinossink->pool), buf); + g_queue_remove (&pinossink->empty, buf); + g_queue_remove (&pinossink->filled, buf); + g_hash_table_remove (pinossink->buf_ids, GINT_TO_POINTER (id)); +} + +static void +on_new_buffer (GObject *gobject, + guint id, + gpointer user_data) +{ + GstPinosSink *pinossink = user_data; + GstBuffer *buf; GST_LOG_OBJECT (pinossink, "got new buffer"); if (pinossink->stream == NULL) { GST_LOG_OBJECT (pinossink, "no stream"); return; } + buf = g_hash_table_lookup (pinossink->buf_ids, GINT_TO_POINTER (id)); - if (!(b = pinos_stream_peek_buffer (pinossink->stream))) { - g_warning ("failed to capture buffer"); - return; + g_debug ("recycle buffer %d %p", id, buf); + if (buf) { + g_queue_remove (&pinossink->filled, buf); + g_queue_push_tail (&pinossink->empty, buf); + pinos_main_loop_signal (pinossink->loop, FALSE); } - -#if 0 - pinos_buffer_iter_init (&it, pbuf); - while (pinos_buffer_iter_next (&it)) { - switch (pinos_buffer_iter_get_type (&it)) { - case PINOS_PACKET_TYPE_REUSE_MEM: - { - PinosPacketReuseMem p; - - if (!pinos_buffer_iter_parse_reuse_mem (&it, &p)) - continue; - - GST_LOG ("mem index %d is reused", p.id); - g_hash_table_remove (pinossink->mem_ids, GINT_TO_POINTER (p.id)); - break; - } - case PINOS_PACKET_TYPE_REFRESH_REQUEST: - { - PinosPacketRefreshRequest p; - - if (!pinos_buffer_iter_parse_refresh_request (&it, &p)) - continue; - - GST_LOG ("refresh request"); - gst_pad_push_event (GST_BASE_SINK_PAD (pinossink), - gst_video_event_new_upstream_force_key_unit (p.pts, - p.request_type == 1, 0)); - break; - } - default: - break; - } - } - pinos_buffer_iter_end (&it); -#endif } static void @@ -409,6 +485,19 @@ on_stream_notify (GObject *gobject, pinos_main_loop_signal (pinossink->loop, FALSE); } +static void +on_format_notify (GObject *gobject, + GParamSpec *pspec, + gpointer user_data) +{ + GstPinosSink *pinossink = user_data; + SpaFormat *format; + PinosProperties *props = NULL; + + g_object_get (gobject, "format", &format, NULL); + +} + static gboolean gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) { @@ -452,7 +541,9 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) pinos_main_loop_wait (pinossink->loop); } } + res = TRUE; +#if 0 if (state != PINOS_STREAM_STATE_STREAMING) { res = pinos_stream_start (pinossink->stream); @@ -468,6 +559,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) pinos_main_loop_wait (pinossink->loop); } } +#endif pinos_main_loop_unlock (pinossink->loop); pinossink->negotiated = res; @@ -483,115 +575,54 @@ start_error: } } -typedef struct { - SpaBuffer buffer; - SpaMeta metas[1]; - SpaMetaHeader header; - SpaData datas[1]; - GstMemory *mem; - GstPinosSink *pinossink; - int fd; -} SinkBuffer; - static GstFlowReturn gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstPinosSink *pinossink; - SinkBuffer *b; - GstMemory *mem = NULL; - GstClockTime pts, dts, base; - gsize size; gboolean res; + ProcessMemData *data; pinossink = GST_PINOS_SINK (bsink); if (!pinossink->negotiated) goto not_negotiated; - base = GST_ELEMENT_CAST (bsink)->base_time; - - size = gst_buffer_get_size (buffer); - - b = g_slice_new (SinkBuffer); - b->buffer.id = pinos_fd_manager_get_id (pinossink->fdmanager); - b->buffer.mem.mem.pool_id = SPA_ID_INVALID; - b->buffer.mem.mem.id = SPA_ID_INVALID; - b->buffer.mem.offset = 0; - b->buffer.mem.size = sizeof (SinkBuffer); - b->buffer.n_metas = 1; - b->buffer.metas = offsetof (SinkBuffer, metas); - b->buffer.n_datas = 1; - b->buffer.datas = offsetof (SinkBuffer, datas); - - pts = GST_BUFFER_PTS (buffer); - dts = GST_BUFFER_DTS (buffer); - if (!GST_CLOCK_TIME_IS_VALID (pts)) - pts = dts; - else if (!GST_CLOCK_TIME_IS_VALID (dts)) - dts = pts; - - b->header.flags = 0; - b->header.seq = GST_BUFFER_OFFSET (buffer); - b->header.pts = GST_CLOCK_TIME_IS_VALID (pts) ? pts + base : base; - b->header.dts_offset = GST_CLOCK_TIME_IS_VALID (dts) && GST_CLOCK_TIME_IS_VALID (pts) ? pts - dts : 0; - b->metas[0].type = SPA_META_TYPE_HEADER; - b->metas[0].offset = offsetof (SinkBuffer, header); - b->metas[0].size = sizeof (b->header); - - if (gst_buffer_n_memory (buffer) == 1 - && gst_is_fd_memory (gst_buffer_peek_memory (buffer, 0))) { - mem = gst_buffer_get_memory (buffer, 0); - } else { - GstMapInfo minfo; - GstAllocationParams params = {0, 0, 0, 0, { NULL, }}; - - GST_INFO_OBJECT (bsink, "Buffer cannot be payloaded without copying"); - - mem = gst_allocator_alloc (pinossink->allocator, size, ¶ms); - if (!gst_memory_map (mem, &minfo, GST_MAP_WRITE)) - goto map_error; - gst_buffer_extract (buffer, 0, minfo.data, size); - gst_memory_unmap (mem, &minfo); - } - pinos_main_loop_lock (pinossink->loop); if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING) goto streaming_error; - b->mem = mem; - b->fd = gst_fd_memory_get_fd (mem); + if (buffer->pool != GST_BUFFER_POOL_CAST (pinossink->pool)) { + GstBuffer *b = NULL; - b->datas[0].mem.mem.pool_id = SPA_ID_INVALID; - b->datas[0].mem.mem.id = SPA_ID_INVALID; - b->datas[0].mem.offset = mem->offset; - b->datas[0].mem.size = mem->size; - b->datas[0].stride = 0; + while (TRUE) { + b = g_queue_peek_head (&pinossink->empty); + if (b) + break; - if (!(res = pinos_stream_send_buffer (pinossink->stream, &b->buffer))) + pinos_main_loop_wait (pinossink->loop); + } + g_queue_push_tail (&pinossink->filled, b); + + buffer = b; + } + + data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), + process_mem_data_quark); + + if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) g_warning ("can't send buffer"); pinos_main_loop_unlock (pinossink->loop); - /* keep the memory around until we get the reuse mem message */ - g_hash_table_insert (pinossink->mem_ids, GINT_TO_POINTER (b->buffer.id), b); - return GST_FLOW_OK; not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } -map_error: - { - GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, - ("failed to map buffer"), (NULL)); - gst_memory_unref (mem); - return GST_FLOW_ERROR; - } streaming_error: { pinos_main_loop_unlock (pinossink->loop); - gst_memory_unref (mem); return GST_FLOW_ERROR; } } @@ -627,7 +658,11 @@ gst_pinos_sink_start (GstBaseSink * basesink) pinos_main_loop_lock (pinossink->loop); pinossink->stream = pinos_stream_new (pinossink->ctx, pinossink->client_name, props); + pinossink->pool->stream = pinossink->stream; g_signal_connect (pinossink->stream, "notify::state", (GCallback) on_stream_notify, pinossink); + g_signal_connect (pinossink->stream, "notify::format", (GCallback) on_format_notify, pinossink); + g_signal_connect (pinossink->stream, "add-buffer", (GCallback) on_add_buffer, pinossink); + g_signal_connect (pinossink->stream, "remove-buffer", (GCallback) on_remove_buffer, pinossink); g_signal_connect (pinossink->stream, "new-buffer", (GCallback) on_new_buffer, pinossink); pinos_main_loop_unlock (pinossink->loop); @@ -644,6 +679,7 @@ gst_pinos_sink_stop (GstBaseSink * basesink) pinos_stream_stop (pinossink->stream); pinos_stream_disconnect (pinossink->stream); g_clear_object (&pinossink->stream); + pinossink->pool->stream = NULL; } pinos_main_loop_unlock (pinossink->loop); @@ -787,10 +823,10 @@ gst_pinos_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: - g_hash_table_remove_all (this->mem_ids); + g_hash_table_remove_all (this->buf_ids); break; case GST_STATE_CHANGE_READY_TO_NULL: - g_hash_table_remove_all (this->mem_ids); + g_hash_table_remove_all (this->buf_ids); gst_pinos_sink_close (this); break; default: diff --git a/pinos/gst/gstpinossink.h b/pinos/gst/gstpinossink.h index d0708b6b6..757c90c92 100644 --- a/pinos/gst/gstpinossink.h +++ b/pinos/gst/gstpinossink.h @@ -24,6 +24,7 @@ #include #include +#include G_BEGIN_DECLS @@ -84,8 +85,11 @@ struct _GstPinosSink { GstStructure *properties; GstPinosSinkMode mode; - PinosFdManager *fdmanager; - GHashTable *mem_ids; + GstPinosPool *pool; + GHashTable *buf_ids; + + GQueue empty; + GQueue filled; }; struct _GstPinosSinkClass { diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index ed5d8c2ef..3302559d9 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -187,7 +187,7 @@ gst_pinos_src_finalize (GObject * object) gst_object_unref (pinossrc->clock); g_free (pinossrc->path); g_free (pinossrc->client_name); - g_hash_table_unref (pinossrc->mem_ids); + g_hash_table_unref (pinossrc->buf_ids); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -274,7 +274,7 @@ gst_pinos_src_init (GstPinosSrc * src) src->fd_allocator = gst_fd_allocator_new (); src->client_name = pinos_client_name (); - src->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_memory_unref); + src->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref); } static GstCaps * @@ -326,7 +326,9 @@ gst_pinos_src_src_fixate (GstBaseSrc * bsrc, GstCaps * caps) typedef struct { GstPinosSrc *src; - SpaBuffer *buffer; + guint id; + SpaMetaHeader *header; + guint flags; } ProcessMemData; static void @@ -338,8 +340,24 @@ process_mem_data_destroy (gpointer user_data) g_slice_free (ProcessMemData, data); } +static gboolean +buffer_recycle (GstMiniObject *obj) +{ + ProcessMemData *data; + + gst_mini_object_ref (obj); + data = gst_mini_object_get_qdata (obj, + process_mem_data_quark); + GST_BUFFER_FLAGS (obj) = data->flags; + + pinos_stream_recycle_buffer (data->src->stream, data->id); + + return FALSE; +} + static void -on_new_buffer (GObject *gobject, +on_add_buffer (GObject *gobject, + guint id, gpointer user_data) { GstPinosSrc *pinossrc = user_data; @@ -348,39 +366,27 @@ on_new_buffer (GObject *gobject, unsigned int i; ProcessMemData data; - GST_LOG_OBJECT (pinossrc, "got new buffer"); - if (!(b = pinos_stream_peek_buffer (pinossrc->stream))) { - g_warning ("failed to capture buffer"); + GST_LOG_OBJECT (pinossrc, "add buffer"); + + if (!(b = pinos_stream_peek_buffer (pinossrc->stream, id))) { + g_warning ("failed to peek buffer"); return; } buf = gst_buffer_new (); + GST_MINI_OBJECT_CAST (buf)->dispose = buffer_recycle; data.src = gst_object_ref (pinossrc); - data.buffer = b; - gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark, - g_slice_dup (ProcessMemData, &data), - process_mem_data_destroy); + data.id = id; + data.header = NULL; for (i = 0; i < b->n_metas; i++) { SpaMeta *m = &SPA_BUFFER_METAS(b)[i]; switch (m->type) { case SPA_META_TYPE_HEADER: - { - SpaMetaHeader *h = SPA_MEMBER (b, m->offset, SpaMetaHeader); - - GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset); - - if (GST_CLOCK_TIME_IS_VALID (h->pts)) { - GST_BUFFER_PTS (buf) = h->pts; - if (GST_BUFFER_PTS (buf) + h->dts_offset > 0) - GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + h->dts_offset; - } - GST_BUFFER_OFFSET (buf) = h->seq; + data.header = SPA_MEMBER (b, m->offset, SpaMetaHeader); break; - } default: break; } @@ -404,8 +410,59 @@ on_new_buffer (GObject *gobject, d->mem.size, NULL, NULL)); } } + data.flags = GST_BUFFER_FLAGS (buf); + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), + process_mem_data_quark, + g_slice_dup (ProcessMemData, &data), + process_mem_data_destroy); + + g_hash_table_insert (pinossrc->buf_ids, GINT_TO_POINTER (id), buf); +} + +static void +on_remove_buffer (GObject *gobject, + guint id, + gpointer user_data) +{ + GstPinosSrc *pinossrc = user_data; + GstBuffer *buf; + + GST_LOG_OBJECT (pinossrc, "remove buffer"); + buf = g_hash_table_lookup (pinossrc->buf_ids, GINT_TO_POINTER (id)); + GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + + g_hash_table_remove (pinossrc->buf_ids, GINT_TO_POINTER (id)); +} + +static void +on_new_buffer (GObject *gobject, + guint id, + gpointer user_data) +{ + GstPinosSrc *pinossrc = user_data; + GstBuffer *buf; + + GST_LOG_OBJECT (pinossrc, "got new buffer"); + buf = g_hash_table_lookup (pinossrc->buf_ids, GINT_TO_POINTER (id)); if (buf) { + ProcessMemData *data; + SpaMetaHeader *h; + + data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), + process_mem_data_quark); + h = data->header; + if (h) { + GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset); + + if (GST_CLOCK_TIME_IS_VALID (h->pts)) { + GST_BUFFER_PTS (buf) = h->pts; + if (GST_BUFFER_PTS (buf) + h->dts_offset > 0) + GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + h->dts_offset; + } + GST_BUFFER_OFFSET (buf) = h->seq; + } + g_queue_push_tail (&pinossrc->queue, buf); pinos_main_loop_signal (pinossrc->loop, FALSE); @@ -486,7 +543,6 @@ parse_stream_properties (GstPinosSrc *pinossrc, PinosProperties *props) static gboolean gst_pinos_src_stream_start (GstPinosSrc *pinossrc) { - SpaFormat *format; gboolean res; PinosProperties *props; @@ -505,17 +561,8 @@ gst_pinos_src_stream_start (GstPinosSrc *pinossrc) } g_object_get (pinossrc->stream, "properties", &props, NULL); - g_object_get (pinossrc->stream, "format", &format, NULL); - pinos_main_loop_unlock (pinossrc->loop); - if (format) { - GstCaps *caps = gst_caps_from_format (format); - gst_base_src_set_caps (GST_BASE_SRC (pinossrc), caps); - gst_caps_unref (caps); - spa_format_unref (format); - } - parse_stream_properties (pinossrc, props); pinos_properties_free (props); @@ -675,6 +722,11 @@ on_format_notify (GObject *gobject, caps = gst_caps_from_format (format); gst_base_src_set_caps (GST_BASE_SRC (pinossrc), caps); gst_caps_unref (caps); + + + + + pinos_stream_start_allocation (pinossrc->stream, NULL); } static gboolean @@ -944,6 +996,8 @@ gst_pinos_src_open (GstPinosSrc * pinossrc) pinossrc->stream = pinos_stream_new (pinossrc->ctx, pinossrc->client_name, props); g_signal_connect (pinossrc->stream, "notify::state", (GCallback) on_stream_notify, pinossrc); g_signal_connect (pinossrc->stream, "notify::format", (GCallback) on_format_notify, pinossrc); + g_signal_connect (pinossrc->stream, "add-buffer", (GCallback) on_add_buffer, pinossrc); + g_signal_connect (pinossrc->stream, "remove-buffer", (GCallback) on_remove_buffer, pinossrc); g_signal_connect (pinossrc->stream, "new-buffer", (GCallback) on_new_buffer, pinossrc); pinos_main_loop_unlock (pinossrc->loop); diff --git a/pinos/gst/gstpinossrc.h b/pinos/gst/gstpinossrc.h index a7cf62a32..74ee95499 100644 --- a/pinos/gst/gstpinossrc.h +++ b/pinos/gst/gstpinossrc.h @@ -71,7 +71,7 @@ struct _GstPinosSrc { GstAllocator *fd_allocator; GstStructure *properties; - GHashTable *mem_ids; + GHashTable *buf_ids; GQueue queue; GstClock *clock; }; diff --git a/pinos/modules/gst/gst-sink.c b/pinos/modules/gst/gst-sink.c index 045574ef4..5ba926b74 100644 --- a/pinos/modules/gst/gst-sink.c +++ b/pinos/modules/gst/gst-sink.c @@ -110,7 +110,7 @@ bus_handler (GstBus *bus, GST_INFO ("clock lost %s", GST_OBJECT_NAME (clock)); g_object_get (node, "properties", &props, NULL); - pinos_properties_remove (props, "gst.pipeline.clock"); + pinos_properties_set (props, "gst.pipeline.clock", NULL); g_object_set (node, "properties", props, NULL); pinos_properties_free (props); diff --git a/pinos/modules/gst/gst-source.c b/pinos/modules/gst/gst-source.c index 6ca515631..b043f9d1e 100644 --- a/pinos/modules/gst/gst-source.c +++ b/pinos/modules/gst/gst-source.c @@ -110,7 +110,7 @@ bus_handler (GstBus *bus, GST_INFO ("clock lost %s", GST_OBJECT_NAME (clock)); g_object_get (node, "properties", &props, NULL); - pinos_properties_remove (props, "gst.pipeline.clock"); + pinos_properties_set (props, "gst.pipeline.clock", NULL); g_object_set (node, "properties", props, NULL); pinos_properties_free (props); diff --git a/pinos/modules/spa/spa-alsa-sink.c b/pinos/modules/spa/spa-alsa-sink.c index 91bb66381..0b7887517 100644 --- a/pinos/modules/spa/spa-alsa-sink.c +++ b/pinos/modules/spa/spa-alsa-sink.c @@ -159,20 +159,15 @@ on_sink_event (SpaNode *node, SpaEvent *event, void *user_data) PinosSpaAlsaSinkPrivate *priv = this->priv; switch (event->type) { - case SPA_EVENT_TYPE_PULL_INPUT: + case SPA_EVENT_TYPE_NEED_INPUT: { SpaInputInfo iinfo; SpaResult res; PinosRingbufferArea areas[2]; uint8_t *data; size_t size, towrite, total; - SpaEventPullInput *pi; - pi = event->data; - - g_debug ("pull ringbuffer %zd", pi->size); - - size = pi->size; + size = 0; data = NULL; pinos_ringbuffer_get_read_areas (priv->ringbuffer, areas); @@ -194,8 +189,6 @@ on_sink_event (SpaNode *node, SpaEvent *event, void *user_data) iinfo.port_id = event->port_id; iinfo.flags = SPA_INPUT_FLAG_NONE; iinfo.buffer_id = 0; - iinfo.offset = 0; - iinfo.size = total; g_debug ("push sink %d", iinfo.buffer_id); if ((res = spa_node_port_push_input (node, 1, &iinfo)) < 0) @@ -223,7 +216,13 @@ on_sink_event (SpaNode *node, SpaEvent *event, void *user_data) } break; } + case SPA_EVENT_TYPE_STATE_CHANGE: + { + SpaEventStateChange *sc = event->data; + pinos_node_update_node_state (PINOS_NODE (this), sc->state); + break; + } default: g_debug ("got event %d", event->type); break; @@ -244,7 +243,7 @@ setup_node (PinosSpaAlsaSink *this) g_debug ("got get_props error %d", res); value.type = SPA_PROP_TYPE_STRING; - value.value = "hw:0"; + value.value = "hw:1"; value.size = strlen (value.value)+1; spa_props_set_prop (props, spa_props_index_for_name (props, "device"), &value); @@ -376,7 +375,7 @@ on_received_buffer (PinosPort *port, PinosSpaAlsaSink *this = user_data; PinosSpaAlsaSinkPrivate *priv = this->priv; unsigned int i; - SpaBuffer *buffer = NULL; + SpaBuffer *buffer = port->buffers[buffer_id]; for (i = 0; i < buffer->n_datas; i++) { SpaData *d = SPA_BUFFER_DATAS (buffer); diff --git a/pinos/modules/spa/spa-v4l2-source.c b/pinos/modules/spa/spa-v4l2-source.c index 1d248f657..08e3fc1de 100644 --- a/pinos/modules/spa/spa-v4l2-source.c +++ b/pinos/modules/spa/spa-v4l2-source.c @@ -139,7 +139,7 @@ on_source_event (SpaNode *node, SpaEvent *event, void *user_data) PinosSpaV4l2SourcePrivate *priv = this->priv; switch (event->type) { - case SPA_EVENT_TYPE_CAN_PULL_OUTPUT: + case SPA_EVENT_TYPE_HAVE_OUTPUT: { SpaOutputInfo info[1] = { 0, }; SpaResult res; @@ -188,6 +188,13 @@ on_source_event (SpaNode *node, SpaEvent *event, void *user_data) } break; } + case SPA_EVENT_TYPE_STATE_CHANGE: + { + SpaEventStateChange *sc = event->data; + + pinos_node_update_node_state (PINOS_NODE (this), sc->state); + break; + } default: g_debug ("got event %d", event->type); break; @@ -208,7 +215,7 @@ setup_node (PinosSpaV4l2Source *this) g_debug ("got get_props error %d", res); value.type = SPA_PROP_TYPE_STRING; - value.value = "/dev/video0"; + value.value = "/dev/video1"; value.size = strlen (value.value)+1; spa_props_set_prop (props, spa_props_index_for_name (props, "device"), &value); @@ -363,9 +370,7 @@ on_received_event (PinosPort *port, SpaEvent *event, GError **error, gpointer us if ((res = spa_node_port_reuse_buffer (node->node, event->port_id, - rb->buffer_id, - rb->offset, - rb->size)) < 0) + rb->buffer_id)) < 0) g_warning ("client-node %p: error reuse buffer: %d", node, res); break; } diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 2a465f830..93b9e6da1 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -180,8 +180,6 @@ on_received_buffer (PinosPort *port, uint32_t buffer_id, GError **error, gpointe info[0].port_id = port->id; info[0].buffer_id = buffer_id; info[0].flags = SPA_INPUT_FLAG_NONE; - info[0].offset = 0; - info[0].size = -1; if ((res = spa_node_port_push_input (node->node, 1, info)) < 0) g_warning ("client-node %p: error pushing buffer: %d, %d", node, res, info[0].status); @@ -296,6 +294,8 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data) { SpaEventStateChange *sc = event->data; + pinos_node_update_node_state (PINOS_NODE (this), sc->state); + switch (sc->state) { case SPA_NODE_STATE_CONFIGURE: { @@ -329,6 +329,24 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data) stop_thread (this); break; } + case SPA_EVENT_TYPE_HAVE_OUTPUT: + { + PinosPort *port; + SpaOutputInfo info[1] = { 0, }; + SpaResult res; + GError *error = NULL; + + if ((res = spa_node_port_pull_output (node, 1, info)) < 0) + g_debug ("client-node %p: got pull error %d, %d", this, res, info[0].status); + + port = pinos_node_find_port (PINOS_NODE (this), info[0].port_id); + + if (!pinos_port_send_buffer (port, info[0].buffer_id, &error)) { + g_debug ("send failed: %s", error->message); + g_clear_error (&error); + } + break; + } case SPA_EVENT_TYPE_REUSE_BUFFER: { PinosPort *port; diff --git a/pinos/server/link.c b/pinos/server/link.c index 47d499905..2f8bdc9a8 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -56,8 +56,13 @@ struct _PinosLinkPrivate SpaNode *input_node; uint32_t input_port; - SpaBuffer *buffers[16]; - unsigned int n_buffers; + SpaNodeState input_state; + SpaNodeState output_state; + + SpaBuffer *in_buffers[16]; + unsigned int n_in_buffers; + SpaBuffer *out_buffers[16]; + unsigned int n_out_buffers; }; G_DEFINE_TYPE (PinosLink, pinos_link, G_TYPE_OBJECT); @@ -268,8 +273,9 @@ do_allocation (PinosLink *this) PinosLinkPrivate *priv = this->priv; SpaResult res; const SpaPortInfo *iinfo, *oinfo; + SpaPortInfoFlags in_flags, out_flags; - g_debug ("link %p: doing alloc buffers", this); + g_debug ("link %p: doing alloc buffers %p %p", this, priv->output_node, priv->input_node); /* find out what's possible */ if ((res = spa_node_port_get_info (priv->output_node, priv->output_port, &oinfo)) < 0) { g_warning ("error get port info: %d", res); @@ -280,18 +286,79 @@ do_allocation (PinosLink *this) return res; } - priv->n_buffers = 16; - if ((res = spa_node_port_alloc_buffers (priv->output_node, priv->output_port, - iinfo->params, iinfo->n_params, - priv->buffers, &priv->n_buffers)) < 0) { - g_warning ("error alloc buffers: %d", res); - return res; + spa_debug_port_info (oinfo); + spa_debug_port_info (iinfo); + + priv->n_in_buffers = 16; + priv->n_out_buffers = 16; + + if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) && + (iinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) { + out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + } else if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) && + (iinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) { + out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + } else if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) && + (iinfo->flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) { + out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + + if ((res = spa_buffer_alloc (oinfo->params, oinfo->n_params, + priv->in_buffers, + &priv->n_in_buffers)) < 0) { + g_warning ("error alloc buffers: %d", res); + return res; + } + memcpy (priv->out_buffers, priv->in_buffers, priv->n_in_buffers * sizeof (SpaBuffer*)); + priv->n_out_buffers = priv->n_in_buffers; + } else if ((oinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) && + (iinfo->flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) { + out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + } else { + g_warning ("error no common allocation found"); + return SPA_RESULT_ERROR; } - if ((res = spa_node_port_use_buffers (priv->input_node, priv->input_port, - priv->buffers, priv->n_buffers)) < 0) { - g_warning ("error alloc buffers: %d", res); - return res; + if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { + if ((res = spa_node_port_alloc_buffers (priv->input_node, priv->input_port, + oinfo->params, oinfo->n_params, + priv->in_buffers, &priv->n_in_buffers)) < 0) { + g_warning ("error alloc buffers: %d", res); + return res; + } + priv->input->n_buffers = priv->n_in_buffers; + priv->input->buffers = priv->in_buffers; + } + if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { + if ((res = spa_node_port_alloc_buffers (priv->output_node, priv->output_port, + iinfo->params, iinfo->n_params, + priv->out_buffers, &priv->n_out_buffers)) < 0) { + g_warning ("error alloc buffers: %d", res); + return res; + } + priv->output->n_buffers = priv->n_out_buffers; + priv->output->buffers = priv->out_buffers; + } + if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { + if ((res = spa_node_port_use_buffers (priv->input_node, priv->input_port, + priv->out_buffers, priv->n_out_buffers)) < 0) { + g_warning ("error use buffers: %d", res); + return res; + } + priv->input->n_buffers = priv->n_out_buffers; + priv->input->buffers = priv->out_buffers; + } + if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { + if ((res = spa_node_port_use_buffers (priv->output_node, priv->output_port, + priv->in_buffers, priv->n_in_buffers)) < 0) { + g_warning ("error use buffers: %d", res); + return res; + } + priv->output->n_buffers = priv->n_in_buffers; + priv->output->buffers = priv->in_buffers; } priv->allocated = TRUE; @@ -299,13 +366,86 @@ do_allocation (PinosLink *this) return SPA_RESULT_OK; } +static SpaResult +do_start (PinosLink *this) +{ + PinosLinkPrivate *priv = this->priv; + SpaCommand cmd; + SpaResult res; + + cmd.type = SPA_COMMAND_START; + if ((res = spa_node_send_command (priv->input_node, &cmd)) < 0) + g_warning ("got error %d", res); + if ((res = spa_node_send_command (priv->output_node, &cmd)) < 0) + g_warning ("got error %d", res); + + return res; +} + +static SpaResult +do_stop (PinosLink *this) +{ + PinosLinkPrivate *priv = this->priv; + SpaCommand cmd; + SpaResult res; + + cmd.type = SPA_COMMAND_STOP; + if ((res = spa_node_send_command (priv->input_node, &cmd)) < 0) + g_warning ("got error %d", res); + if ((res = spa_node_send_command (priv->output_node, &cmd)) < 0) + g_warning ("got error %d", res); + + return res; +} + +static SpaResult +check_states (PinosLink *this) +{ + PinosLinkPrivate *priv = this->priv; + SpaResult res; + + g_debug ("link %p: input %d, output %d", this, priv->input_state, priv->output_state); + + if (priv->input_state == SPA_NODE_STATE_CONFIGURE && + priv->output_state == SPA_NODE_STATE_CONFIGURE && + !priv->negotiated) { + if ((res = do_negotiate (this)) < 0) + return res; + } + if (priv->input_state == SPA_NODE_STATE_READY && + priv->output_state == SPA_NODE_STATE_READY && + !priv->allocated) { + if ((res = do_allocation (this)) < 0) + return res; + + if ((res = do_start (this)) < 0) + return res; + } + return SPA_RESULT_OK; +} + +static void +on_node_state_notify (GObject *obj, + GParamSpec *pspec, + gpointer user_data) +{ + PinosLink *this = user_data; + PinosLinkPrivate *priv = this->priv; + + g_debug ("link %p: node %p state change", this, obj); + if (obj == G_OBJECT (priv->input->node)) + priv->input_state = priv->input->node->node_state; + else + priv->output_state = priv->output->node->node_state; + + check_states (this); +} + static gboolean on_activate (PinosPort *port, gpointer user_data) { PinosLink *this = user_data; PinosLinkPrivate *priv = this->priv; - SpaCommand cmd; - SpaResult res; if (priv->active) return TRUE; @@ -316,18 +456,7 @@ on_activate (PinosPort *port, gpointer user_data) else pinos_port_activate (priv->input); - if (!priv->negotiated) - do_negotiate (this); - - /* negotiate allocation */ - if (!priv->allocated) - do_allocation (this); - - cmd.type = SPA_COMMAND_START; - if ((res = spa_node_send_command (priv->input_node, &cmd)) < 0) - g_warning ("got error %d", res); - if ((res = spa_node_send_command (priv->output_node, &cmd)) < 0) - g_warning ("got error %d", res); + check_states (this); return TRUE; } @@ -335,10 +464,8 @@ on_activate (PinosPort *port, gpointer user_data) static gboolean on_deactivate (PinosPort *port, gpointer user_data) { - PinosLink *link = user_data; - PinosLinkPrivate *priv = link->priv; - SpaCommand cmd; - SpaResult res; + PinosLink *this = user_data; + PinosLinkPrivate *priv = this->priv; if (!priv->active) return TRUE; @@ -349,11 +476,7 @@ on_deactivate (PinosPort *port, gpointer user_data) else pinos_port_deactivate (priv->input); - cmd.type = SPA_COMMAND_STOP; - if ((res = spa_node_send_command (priv->input_node, &cmd)) < 0) - g_warning ("got error %d", res); - if ((res = spa_node_send_command (priv->output_node, &cmd)) < 0) - g_warning ("got error %d", res); + do_stop (this); return TRUE; } @@ -363,8 +486,8 @@ on_property_notify (GObject *obj, GParamSpec *pspec, gpointer user_data) { - PinosLink *link = user_data; - PinosLinkPrivate *priv = link->priv; + PinosLink *this = user_data; + PinosLinkPrivate *priv = this->priv; if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "output") == 0) { gchar *port = g_strdup_printf ("%s:%d", pinos_node_get_object_path (priv->output->node), @@ -384,32 +507,38 @@ on_property_notify (GObject *obj, static void pinos_link_constructed (GObject * object) { - PinosLink *link = PINOS_LINK (object); - PinosLinkPrivate *priv = link->priv; + PinosLink *this = PINOS_LINK (object); + PinosLinkPrivate *priv = this->priv; priv->output_id = pinos_port_add_send_cb (priv->output, on_output_buffer, on_output_event, - link, + this, NULL); priv->input_id = pinos_port_add_send_cb (priv->input, on_input_buffer, on_input_event, - link, + this, NULL); - g_signal_connect (priv->input, "activate", (GCallback) on_activate, link); - g_signal_connect (priv->input, "deactivate", (GCallback) on_deactivate, link); - g_signal_connect (priv->output, "activate", (GCallback) on_activate, link); - g_signal_connect (priv->output, "deactivate", (GCallback) on_deactivate, link); + priv->input_state = priv->input->node->node_state; + priv->output_state = priv->output->node->node_state; - g_signal_connect (link, "notify", (GCallback) on_property_notify, link); + g_signal_connect (priv->input->node, "notify::node-state", (GCallback) on_node_state_notify, this); + g_signal_connect (priv->output->node, "notify::node-state", (GCallback) on_node_state_notify, this); + + g_signal_connect (priv->input, "activate", (GCallback) on_activate, this); + g_signal_connect (priv->input, "deactivate", (GCallback) on_deactivate, this); + g_signal_connect (priv->output, "activate", (GCallback) on_activate, this); + g_signal_connect (priv->output, "deactivate", (GCallback) on_deactivate, this); + + g_signal_connect (this, "notify", (GCallback) on_property_notify, this); G_OBJECT_CLASS (pinos_link_parent_class)->constructed (object); - on_property_notify (G_OBJECT (link), NULL, link); - g_debug ("link %p: constructed", link); - link_register_object (link); + on_property_notify (G_OBJECT (this), NULL, this); + g_debug ("link %p: constructed", this); + link_register_object (this); } static void @@ -427,8 +556,8 @@ pinos_link_dispose (GObject * object) pinos_port_deactivate (priv->input); pinos_port_deactivate (priv->output); } - g_clear_object (&priv->input); - g_clear_object (&priv->output); + priv->input = NULL; + priv->output = NULL; link_unregister_object (link); G_OBJECT_CLASS (pinos_link_parent_class)->dispose (object); diff --git a/pinos/server/node.c b/pinos/server/node.c index 168952795..40ed28327 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -64,6 +64,7 @@ enum PROP_STATE, PROP_PROPERTIES, PROP_NODE, + PROP_NODE_STATE, }; enum @@ -266,6 +267,10 @@ pinos_node_get_property (GObject *_object, g_value_set_pointer (value, node->node); break; + case PROP_NODE_STATE: + g_value_set_uint (value, node->node_state); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (node, prop_id, pspec); break; @@ -496,6 +501,17 @@ pinos_node_class_init (PinosNodeClass * klass) G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_NODE_STATE, + g_param_spec_uint ("node-state", + "Node State", + "The state of the SPA node", + 0, + G_MAXUINT, + SPA_NODE_STATE_INIT, + G_PARAM_READABLE | + G_PARAM_STATIC_STRINGS)); + signals[SIGNAL_REMOVE] = g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, @@ -954,3 +970,24 @@ pinos_node_report_busy (PinosNode *node) g_debug ("node %p: report busy", node); pinos_node_set_state (node, PINOS_NODE_STATE_RUNNING); } + +/** + * pinos_node_update_node_state: + * @node: a #PinosNode + * @state: a #SpaNodeState + * + * Update the state of a SPA node. This method is used from + * inside @node itself. + */ +void +pinos_node_update_node_state (PinosNode *node, + SpaNodeState state) +{ + g_return_if_fail (PINOS_IS_NODE (node)); + + if (node->node_state != state) { + g_debug ("node %p: update SPA state to %d", node, state); + node->node_state = state; + g_object_notify (G_OBJECT (node), "node-state"); + } +} diff --git a/pinos/server/node.h b/pinos/server/node.h index 51c0a11e3..4f310d2e6 100644 --- a/pinos/server/node.h +++ b/pinos/server/node.h @@ -52,6 +52,7 @@ struct _PinosNode { GObject object; SpaNode *node; + SpaNodeState node_state; PinosNodePrivate *priv; }; @@ -111,6 +112,8 @@ void pinos_node_report_error (PinosNode *node, GError void pinos_node_report_idle (PinosNode *node); void pinos_node_report_busy (PinosNode *node); +void pinos_node_update_node_state (PinosNode *node, SpaNodeState state); + G_END_DECLS #endif /* __PINOS_NODE_H__ */ diff --git a/pinos/server/port.h b/pinos/server/port.h index 4c981c70b..6d7258c09 100644 --- a/pinos/server/port.h +++ b/pinos/server/port.h @@ -55,6 +55,9 @@ struct _PinosPort { uint32_t id; PinosNode *node; + SpaBuffer **buffers; + guint n_buffers; + PinosPortPrivate *priv; }; diff --git a/spa/include/spa/buffer.h b/spa/include/spa/buffer.h index 36292359e..7020a08d2 100644 --- a/spa/include/spa/buffer.h +++ b/spa/include/spa/buffer.h @@ -27,9 +27,6 @@ extern "C" { typedef struct _SpaBuffer SpaBuffer; typedef struct _SpaBufferGroup SpaBufferGroup; -#include -#include - /** * SpaMetaType: * @SPA_META_TYPE_INVALID: invalid metadata, should be ignored @@ -40,8 +37,13 @@ typedef enum { SPA_META_TYPE_HEADER, SPA_META_TYPE_POINTER, SPA_META_TYPE_VIDEO_CROP, + SPA_META_TYPE_RINGBUFFER, } SpaMetaType; +#include +#include +#include + /** * SpaBufferFlags: * @SPA_BUFFER_FLAG_NONE: no flag @@ -87,6 +89,20 @@ typedef struct { int width, height; } SpaMetaVideoCrop; +/** + * SpaMetaRingbuffer: + * @readindex: + * @writeindex: + * @size: + * @size_mask: + */ +typedef struct { + volatile int readindex; + volatile int writeindex; + int size; + int size_mask; +} SpaMetaRingbuffer; + /** * SpaMeta: * @type: metadata type @@ -133,6 +149,11 @@ struct _SpaBuffer { #define spa_buffer_ref(b) spa_memory_ref (&(b)->mem) #define spa_buffer_unref(b) spa_memory_unref (&(b)->mem) +SpaResult spa_buffer_alloc (SpaAllocParam **params, + unsigned int n_params, + SpaBuffer **buffers, + unsigned int *n_buffers); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/spa/include/spa/control.h b/spa/include/spa/control.h index 39deea2eb..8c8b90bdc 100644 --- a/spa/include/spa/control.h +++ b/spa/include/spa/control.h @@ -201,16 +201,12 @@ typedef struct { typedef struct { uint32_t port_id; uint32_t buffer_id; - off_t offset; - size_t size; } SpaControlCmdProcessBuffer; /* SPA_CONTROL_CMD_REUSE_BUFFER */ typedef struct { uint32_t port_id; uint32_t buffer_id; - off_t offset; - size_t size; } SpaControlCmdReuseBuffer; diff --git a/spa/include/spa/event.h b/spa/include/spa/event.h index bc3083073..1c04a26d4 100644 --- a/spa/include/spa/event.h +++ b/spa/include/spa/event.h @@ -33,11 +33,11 @@ typedef struct _SpaEvent SpaEvent; /** * SpaEventType: * @SPA_EVENT_TYPE_INVALID: invalid event, should be ignored + * @SPA_EVENT_TYPE_PORT_ADDED: a new port is added + * @SPA_EVENT_TYPE_PORT_REMOVED: a port is removed * @SPA_EVENT_TYPE_STATE_CHANGE: emited when the state changes - * @SPA_EVENT_TYPE_CAN_PULL_OUTPUT: emited when an async node has output that can be pulled - * @SPA_EVENT_TYPE_CAN_PUSH_INPUT: emited when more data can be pushed to an async node - * @SPA_EVENT_TYPE_PULL_INPUT: emited when data needs to be provided on an input. data points to - * SpaEventPullInput + * @SPA_EVENT_TYPE_HAVE_OUTPUT: emited when an async node has output that can be pulled + * @SPA_EVENT_TYPE_NEED_INPUT: emited when more data can be pushed to an async node * @SPA_EVENT_TYPE_REUSE_BUFFER: emited when a buffer can be reused * @SPA_EVENT_TYPE_ADD_POLL: emited when a pollfd should be added. data points to #SpaPollItem * @SPA_EVENT_TYPE_REMOVE_POLL: emited when a pollfd should be removed. data points to #SpaPollItem @@ -52,9 +52,8 @@ typedef enum { SPA_EVENT_TYPE_PORT_ADDED, SPA_EVENT_TYPE_PORT_REMOVED, SPA_EVENT_TYPE_STATE_CHANGE, - SPA_EVENT_TYPE_CAN_PULL_OUTPUT, - SPA_EVENT_TYPE_CAN_PUSH_INPUT, - SPA_EVENT_TYPE_PULL_INPUT, + SPA_EVENT_TYPE_HAVE_OUTPUT, + SPA_EVENT_TYPE_NEED_INPUT, SPA_EVENT_TYPE_REUSE_BUFFER, SPA_EVENT_TYPE_ADD_POLL, SPA_EVENT_TYPE_REMOVE_POLL, @@ -76,20 +75,12 @@ typedef struct { SpaDirection direction; } SpaEventPortAdded; -typedef struct { - uint32_t buffer_id; - off_t offset; - size_t size; -} SpaEventPullInput; - typedef struct { SpaNodeState state; } SpaEventStateChange; typedef struct { - uint32_t buffer_id; - off_t offset; - size_t size; + uint32_t buffer_id; } SpaEventReuseBuffer; #ifdef __cplusplus diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index 9800441cc..1b3904d9c 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -31,6 +31,7 @@ typedef struct _SpaNode SpaNode; * @SPA_NODE_STATE_INIT: the node is initializing * @SPA_NODE_STATE_CONFIGURE: the node needs at least one port format * @SPA_NODE_STATE_READY: the node is ready for memory allocation + * @SPA_NODE_STATE_PAUSED: the node is paused * @SPA_NODE_STATE_STREAMING: the node is streaming * @SPA_NODE_STATE_ERROR: the node is in error */ @@ -81,8 +82,6 @@ typedef enum { * @port_id: the port id * @flags: extra flags * @buffer_id: a buffer id - * @offset: offset of data in @id - * @size: size of data in @id * @status: status * * Input information for a node. @@ -91,8 +90,6 @@ typedef struct { uint32_t port_id; SpaInputFlags flags; uint32_t buffer_id; - off_t offset; - size_t size; SpaResult status; } SpaInputInfo; @@ -116,8 +113,6 @@ typedef enum { * @port_id: the port id * @flags: extra flags * @buffer_id: a buffer id will be set - * @offset: offset to get - * @size: size to get * @event: output event * @status: a status * @@ -127,8 +122,6 @@ typedef struct { uint32_t port_id; SpaOutputFlags flags; uint32_t buffer_id; - off_t offset; - size_t size; SpaEvent *event; SpaResult status; } SpaOutputInfo; @@ -416,9 +409,7 @@ struct _SpaNode { SpaResult (*port_reuse_buffer) (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size); + uint32_t buffer_id); SpaResult (*port_get_status) (SpaNode *node, uint32_t port_id, diff --git a/spa/include/spa/port.h b/spa/include/spa/port.h index 9bc04c7d3..a9ec20160 100644 --- a/spa/include/spa/port.h +++ b/spa/include/spa/port.h @@ -24,6 +24,8 @@ extern "C" { #endif +typedef struct _SpaAllocParam SpaAllocParam; + #include #include @@ -41,10 +43,10 @@ typedef enum { SPA_ALLOC_PARAM_TYPE_VIDEO_PADDING, } SpaAllocParamType; -typedef struct { +struct _SpaAllocParam { uint32_t type; size_t size; -} SpaAllocParam; +}; typedef struct { SpaAllocParam param; diff --git a/spa/lib/buffer.c b/spa/lib/buffer.c new file mode 100644 index 000000000..ea4f3db7c --- /dev/null +++ b/spa/lib/buffer.c @@ -0,0 +1,135 @@ +/* Simple Plugin API + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include + +#include +#include + +typedef struct { + SpaBuffer buffer; + SpaMeta metas[3]; + SpaMetaHeader header; + SpaMetaRingbuffer ringbuffer; + SpaMetaVideoCrop crop; + SpaData datas[4]; +} Buffer; + +SpaResult +spa_buffer_alloc (SpaAllocParam **params, + unsigned int n_params, + SpaBuffer **buffers, + unsigned int *n_buffers) +{ + unsigned int i, nbufs; + size_t size = 0, stride = 0; + SpaMemory *bmem, *dmem; + Buffer *bufs; + Buffer *b; + bool add_header = false; + int n_metas = 0; + + nbufs = *n_buffers; + if (nbufs == 0) + return SPA_RESULT_ERROR; + + for (i = 0; i < n_params; i++) { + SpaAllocParam *p = params[i]; + + switch (p->type) { + case SPA_ALLOC_PARAM_TYPE_BUFFERS: + { + SpaAllocParamBuffers *b = (SpaAllocParamBuffers *) p; + + size = SPA_MAX (size, b->minsize); + break; + } + case SPA_ALLOC_PARAM_TYPE_META_ENABLE: + { + SpaAllocParamMetaEnable *b = (SpaAllocParamMetaEnable *) p; + + switch (b->type) { + case SPA_META_TYPE_HEADER: + if (!add_header) + n_metas++; + add_header = true; + break; + case SPA_META_TYPE_POINTER: + break; + case SPA_META_TYPE_VIDEO_CROP: + break; + case SPA_META_TYPE_RINGBUFFER: + break; + default: + break; + } + break; + } + default: + break; + } + } + + *n_buffers = nbufs; + + bmem = spa_memory_alloc_with_fd (SPA_MEMORY_POOL_SHARED, + NULL, sizeof (Buffer) * nbufs); + dmem = spa_memory_alloc_with_fd (SPA_MEMORY_POOL_SHARED, + NULL, size * nbufs); + + bufs = spa_memory_ensure_ptr (bmem); + + for (i = 0; i < nbufs; i++) { + int mi = 0; + + b = &bufs[i]; + b->buffer.id = i; + b->buffer.mem.mem = bmem->mem; + b->buffer.mem.offset = sizeof (Buffer) * i; + b->buffer.mem.size = sizeof (Buffer); + + buffers[i] = &b->buffer; + + b->buffer.n_metas = n_metas; + b->buffer.metas = offsetof (Buffer, metas); + b->buffer.n_datas = 1; + b->buffer.datas = offsetof (Buffer, datas); + + if (add_header) { + b->header.flags = 0; + b->header.seq = 0; + b->header.pts = 0; + b->header.dts_offset = 0; + + b->metas[mi].type = SPA_META_TYPE_HEADER; + b->metas[mi].offset = offsetof (Buffer, header); + b->metas[mi].size = sizeof (b->header); + mi++; + } + + b->datas[0].mem.mem = dmem->mem; + b->datas[0].mem.offset = size * i; + b->datas[0].mem.size = size; + b->datas[0].stride = stride; + } + return SPA_RESULT_OK; +} diff --git a/spa/lib/control.c b/spa/lib/control.c index c3ad82efb..f2339923e 100644 --- a/spa/lib/control.c +++ b/spa/lib/control.c @@ -431,8 +431,17 @@ iter_parse_port_update (struct stack_iter *si, SpaControlCmdPortUpdate *pu) if (pu->props) pu->props = parse_props (mem, p, SPA_PTR_TO_INT (pu->props)); - if (pu->info) - pu->info = SPA_MEMBER (p, SPA_PTR_TO_INT (pu->info), SpaPortInfo); + if (pu->info) { + SpaPortInfo *pi; + + pu->info = p = SPA_MEMBER (p, SPA_PTR_TO_INT (pu->info), SpaPortInfo); + + pi = (SpaPortInfo *) pu->info; + pi->params = SPA_MEMBER (p, SPA_PTR_TO_INT (pi->params), SpaAllocParam *); + for (i = 0; i < pi->n_params; i++) { + pi->params[i] = SPA_MEMBER (p, SPA_PTR_TO_INT (pi->params[i]), SpaAllocParam); + } + } } static void @@ -945,25 +954,55 @@ write_format (void *p, const SpaFormat *format) return write_props (p, &format->props, sizeof (SpaFormat)); } +static size_t +write_port_info (void *p, const SpaPortInfo *info) +{ + SpaPortInfo *tp; + SpaAllocParam **ap; + int i; + size_t len; + + tp = p; + memcpy (tp, info, sizeof (SpaPortInfo)); + + p = SPA_MEMBER (tp, sizeof (SpaPortInfo), SpaAllocParam *); + ap = p; + if (info->n_params) + tp->params = SPA_INT_TO_PTR (SPA_PTRDIFF (p, tp)); + else + tp->params = 0; + tp->features = 0; + + p = SPA_MEMBER (p, sizeof (SpaAllocParam*) * info->n_params, void); + + for (i = 0; i < info->n_params; i++) { + len = info->params[i]->size; + memcpy (p, info->params[i], len); + ap[i] = SPA_INT_TO_PTR (SPA_PTRDIFF (p, tp)); + p = SPA_MEMBER (p, len, void); + } + return SPA_PTRDIFF (p, tp); +} + static void builder_add_node_update (struct stack_builder *sb, SpaControlCmdNodeUpdate *nu) { size_t len; - void *p, *base; + void *p; SpaControlCmdNodeUpdate *d; /* calc len */ len = sizeof (SpaControlCmdNodeUpdate); len += calc_props_len (nu->props); - base = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_UPDATE, len); - memcpy (base, nu, sizeof (SpaControlCmdNodeUpdate)); - d = base; + p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_UPDATE, len); + memcpy (p, nu, sizeof (SpaControlCmdNodeUpdate)); + d = p; p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeUpdate), void); if (nu->props) { len = write_props (p, nu->props, sizeof (SpaProps)); - d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, base)); + d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); } else { d->props = 0; } @@ -973,7 +1012,7 @@ static void builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) { size_t len; - void *p, *base; + void *p; int i; SpaFormat **bfa; SpaControlCmdPortUpdate *d; @@ -984,15 +1023,21 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) for (i = 0; i < pu->n_possible_formats; i++) len += calc_format_len (pu->possible_formats[i]); len += calc_props_len (pu->props); + if (pu->info) { + len += sizeof (SpaPortInfo); + len += pu->info->n_params * sizeof (SpaAllocParam *); + for (i = 0; i < pu->info->n_params; i++) + len += pu->info->params[i]->size; + } - base = builder_add_cmd (sb, SPA_CONTROL_CMD_PORT_UPDATE, len); - memcpy (base, pu, sizeof (SpaControlCmdPortUpdate)); - d = base; + p = builder_add_cmd (sb, SPA_CONTROL_CMD_PORT_UPDATE, len); + memcpy (p, pu, sizeof (SpaControlCmdPortUpdate)); + d = p; p = SPA_MEMBER (d, sizeof (SpaControlCmdPortUpdate), void); bfa = p; if (pu->n_possible_formats) - d->possible_formats = SPA_INT_TO_PTR (SPA_PTRDIFF (p, base)); + d->possible_formats = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); else d->possible_formats = 0; @@ -1000,31 +1045,36 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) for (i = 0; i < pu->n_possible_formats; i++) { len = write_format (p, pu->possible_formats[i]); - bfa[i] = SPA_INT_TO_PTR (SPA_PTRDIFF (p, base)); + bfa[i] = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); p = SPA_MEMBER (p, len, void); } if (pu->props) { len = write_props (p, pu->props, sizeof (SpaProps)); - d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, base)); + d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); p = SPA_MEMBER (p, len, void); } else { d->props = 0; } - - /* FIXME add more things */ + if (pu->info) { + len = write_port_info (p, pu->info); + d->info = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + p = SPA_MEMBER (p, len, void); + } else { + d->info = 0; + } } static void builder_add_set_format (struct stack_builder *sb, SpaControlCmdSetFormat *sf) { size_t len; - void *p, *base; + void *p; /* calculate length */ /* port_id + format + mask */ len = sizeof (SpaControlCmdSetFormat) + calc_format_len (sf->format); - base = builder_add_cmd (sb, SPA_CONTROL_CMD_SET_FORMAT, len); - memcpy (base, sf, sizeof (SpaControlCmdSetFormat)); - sf = base; + p = builder_add_cmd (sb, SPA_CONTROL_CMD_SET_FORMAT, len); + memcpy (p, sf, sizeof (SpaControlCmdSetFormat)); + sf = p; p = SPA_MEMBER (sf, sizeof (SpaControlCmdSetFormat), void); len = write_format (p, sf->format); diff --git a/spa/lib/debug.c b/spa/lib/debug.c index 0af1ecb9b..a0b582b55 100644 --- a/spa/lib/debug.c +++ b/spa/lib/debug.c @@ -213,6 +213,7 @@ print_value (const SpaPropInfo *info, int size, const void *value) { SpaPropType type = info->type; bool enum_string = false; + const void *enum_value; if (info->range_type == SPA_PROP_RANGE_TYPE_ENUM) { int i; @@ -220,8 +221,7 @@ print_value (const SpaPropInfo *info, int size, const void *value) for (i = 0; i < info->n_range_values; i++) { if (memcmp (info->range_values[i].value, value, size) == 0) { if (info->range_values[i].name) { - type = SPA_PROP_TYPE_STRING; - value = info->range_values[i].name; + enum_value = info->range_values[i].name; enum_string = true; } } @@ -272,10 +272,7 @@ print_value (const SpaPropInfo *info, int size, const void *value) fprintf (stderr, "%g", *(double *)value); break; case SPA_PROP_TYPE_STRING: - if (enum_string) - fprintf (stderr, "%s", (char *)value); - else - fprintf (stderr, "\"%s\"", (char *)value); + fprintf (stderr, "\"%s\"", (char *)value); break; case SPA_PROP_TYPE_RECTANGLE: { @@ -297,6 +294,8 @@ print_value (const SpaPropInfo *info, int size, const void *value) default: break; } + if (enum_string) + fprintf (stderr, " (%s)", (char *)enum_value); } SpaResult diff --git a/spa/lib/memory.c b/spa/lib/memory.c index 371e20220..93e24cc5e 100644 --- a/spa/lib/memory.c +++ b/spa/lib/memory.c @@ -182,7 +182,7 @@ spa_memory_alloc_with_fd (uint32_t pool_id, void *data, size_t size) seals = F_SEAL_GROW | F_SEAL_SHRINK | F_SEAL_SEAL; if (fcntl (mem->fd, F_ADD_SEALS, seals) == -1) { - fprintf (stderr, "Failed to write data: %s\n", strerror (errno)); + fprintf (stderr, "Failed to add seals: %s\n", strerror (errno)); close (mem->fd); return NULL; } @@ -320,7 +320,7 @@ spa_memory_ensure_ptr (SpaMemory *mem) mem->ptr = mmap (NULL, mem->size, prot, MAP_SHARED, mem->fd, 0); if (mem->ptr == MAP_FAILED) { mem->ptr = NULL; - fprintf (stderr, "Failed to mmap memory %p: %s\n", mem, strerror (errno)); + fprintf (stderr, "Failed to mmap memory %zd %p: %s\n", mem->size, mem, strerror (errno)); } return mem->ptr; } diff --git a/spa/lib/meson.build b/spa/lib/meson.build index cdf797472..35c3ed46c 100644 --- a/spa/lib/meson.build +++ b/spa/lib/meson.build @@ -1,4 +1,5 @@ spalib_sources = ['audio-raw.c', + 'buffer.c', 'control.c', 'debug.c', 'memory.c', diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index a47072934..f4162eb20 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -67,9 +67,10 @@ typedef struct _ALSABuffer ALSABuffer; struct _ALSABuffer { SpaBuffer buffer; - SpaMeta meta[1]; + SpaMeta metas[2]; SpaMetaHeader header; - SpaData data[1]; + SpaMetaRingbuffer ringbuffer; + SpaData datas[1]; }; struct _SpaALSASink { @@ -88,12 +89,15 @@ struct _SpaALSASink { SpaALSAState state; SpaPortInfo info; + SpaAllocParam *params[1]; + SpaAllocParamBuffers param_buffers; SpaPortStatus status; SpaBuffer *buffers; unsigned int n_buffers; uint32_t input_buffer; + SpaMemory *mem; ALSABuffer buffer; }; @@ -395,6 +399,20 @@ spa_alsa_sink_node_port_set_format (SpaNode *node, if (alsa_set_format (this, &this->current_format, false) < 0) return SPA_RESULT_ERROR; + this->info.flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + this->info.maxbuffering = -1; + this->info.latency = -1; + this->info.n_params = 1; + this->params[0] = &this->param_buffers.param; + this->param_buffers.param.type = SPA_ALLOC_PARAM_TYPE_BUFFERS; + this->param_buffers.param.size = sizeof (this->param_buffers); + this->param_buffers.minsize = 1; + this->param_buffers.stride = 0; + this->param_buffers.min_buffers = 1; + this->param_buffers.max_buffers = 8; + this->param_buffers.align = 16; + this->info.features = NULL; + this->have_format = true; return SPA_RESULT_OK; @@ -476,15 +494,71 @@ spa_alsa_sink_node_port_alloc_buffers (SpaNode *node, SpaBuffer **buffers, uint32_t *n_buffers) { - return SPA_RESULT_NOT_IMPLEMENTED; + SpaALSASink *this; + ALSABuffer *b; + SpaALSAState *state; + + if (node == NULL || node->handle == NULL || buffers == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + if (port_id != 0) + return SPA_RESULT_INVALID_PORT; + + this = (SpaALSASink *) node->handle; + + if (!this->have_format) + return SPA_RESULT_NO_FORMAT; + + state = &this->state; + + if (!this->mem) + this->mem = spa_memory_alloc_with_fd (SPA_MEMORY_POOL_SHARED, NULL, state->buffer_size); + + b = &this->buffer; + b->buffer.id = 0; + b->buffer.mem.mem.pool_id = -1; + b->buffer.mem.mem.id = -1; + b->buffer.mem.offset = 0; + b->buffer.mem.size = sizeof (ALSABuffer); + + b->buffer.n_metas = 2; + b->buffer.metas = offsetof (ALSABuffer, metas); + b->buffer.n_datas = 1; + b->buffer.datas = offsetof (ALSABuffer, datas); + + b->header.flags = 0; + b->header.seq = 0; + b->header.pts = 0; + b->header.dts_offset = 0; + + b->metas[0].type = SPA_META_TYPE_HEADER; + b->metas[0].offset = offsetof (ALSABuffer, header); + b->metas[0].size = sizeof (b->header); + + b->ringbuffer.readindex = 0; + b->ringbuffer.writeindex = 0; + b->ringbuffer.size = 0; + b->ringbuffer.size_mask = 0; + + b->metas[1].type = SPA_META_TYPE_RINGBUFFER; + b->metas[1].offset = offsetof (ALSABuffer, ringbuffer); + b->metas[1].size = sizeof (b->ringbuffer); + + b->datas[0].mem.mem = this->mem->mem; + b->datas[0].mem.offset = 0; + b->datas[0].mem.size = state->buffer_size; + b->datas[0].stride = 0; + + buffers[0] = &b->buffer; + *n_buffers = 1; + + return SPA_RESULT_OK; } static SpaResult spa_alsa_sink_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { return SPA_RESULT_NOT_IMPLEMENTED; } @@ -627,7 +701,6 @@ alsa_sink_init (const SpaHandleFactory *factory, this->props[1].props.prop_info = prop_info; reset_alsa_sink_props (&this->props[1]); - this->info.flags = SPA_PORT_INFO_FLAG_NONE; this->status.flags = SPA_PORT_STATUS_FLAG_NEED_INPUT; return SPA_RESULT_OK; diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index b6793713b..6b73ad46a 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -225,17 +225,11 @@ static void pull_input (SpaALSASink *this, void *data, snd_pcm_uframes_t frames) { SpaEvent event; - SpaEventPullInput pi; - event.type = SPA_EVENT_TYPE_PULL_INPUT; + event.type = SPA_EVENT_TYPE_NEED_INPUT; event.port_id = 0; - event.size = frames * sizeof (uint16_t) * 2; - event.data = π - - pi.buffer_id = this->buffer.buffer.id; - pi.offset = 0; - pi.size = frames * sizeof (uint16_t) * 2; - + event.size = 0; + event.data = NULL; this->event_cb (&this->node, &event, this->user_data); } @@ -373,6 +367,9 @@ spa_alsa_stop (SpaALSASink *this) SpaALSAState *state = &this->state; SpaEvent event; + if (!state->opened) + return 0; + snd_pcm_drop (state->handle); event.type = SPA_EVENT_TYPE_REMOVE_POLL; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index e8e2f0811..f5d89f288 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -469,9 +469,7 @@ spa_audiomixer_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_audiomixer_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { return SPA_RESULT_NOT_IMPLEMENTED; } @@ -571,15 +569,11 @@ static void pull_port (SpaAudioMixer *this, uint32_t port_id, SpaOutputInfo *info, size_t pull_size) { SpaEvent event; - MixerBuffer *buffer = &this->ports[port_id].mix; - SpaEventPullInput pi; - event.type = SPA_EVENT_TYPE_PULL_INPUT; + event.type = SPA_EVENT_TYPE_NEED_INPUT; event.port_id = port_id; - event.data = π - pi.buffer_id = buffer->buffer.id; - pi.offset = 0; - pi.size = pull_size; + event.size = 0; + event.data = NULL; this->event_cb (&this->node, &event, this->user_data); } @@ -645,7 +639,7 @@ mix_data (SpaAudioMixer *this, SpaOutputInfo *info) if (info->port_id != 0) return SPA_RESULT_INVALID_PORT; - pull_size = info->size; + pull_size = 0; min_size = 0; min_port = 0; diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index dab2edd54..b228a67c1 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -424,9 +424,7 @@ spa_audiotestsrc_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_audiotestsrc_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { return SPA_RESULT_NOT_IMPLEMENTED; } @@ -492,7 +490,7 @@ spa_audiotestsrc_node_port_pull_output (SpaNode *node, continue; } - size = info[i].size; + size = 0; for (j = 0; j < size; j++) ptr[j] = rand(); diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index 94cfe0d9f..d46b21e77 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -428,9 +428,7 @@ spa_ffmpeg_dec_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_ffmpeg_dec_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offse, - size_t size) + uint32_t buffer_id) { if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index 9721209f1..a06dbe24f 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -428,9 +428,7 @@ spa_ffmpeg_enc_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_ffmpeg_enc_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; diff --git a/spa/plugins/remote/proxy.c b/spa/plugins/remote/proxy.c index 81551ce8f..326f81c87 100644 --- a/spa/plugins/remote/proxy.c +++ b/spa/plugins/remote/proxy.c @@ -58,6 +58,7 @@ typedef struct { SpaPortStatus status; unsigned int n_buffers; SpaBuffer **buffers; + uint32_t buffer_id; } SpaProxyPort; struct _SpaProxy { @@ -335,35 +336,46 @@ spa_proxy_node_get_port_ids (SpaNode *node, } static void -do_init_port (SpaProxy *this, - uint32_t port_id, - SpaDirection direction, - unsigned int n_formats, - SpaFormat **formats) +do_update_port (SpaProxy *this, + SpaControlCmdPortUpdate *pu) { SpaEvent event; SpaProxyPort *port; SpaEventPortAdded pa; - fprintf (stderr, "%p: adding port %d, %d\n", this, port_id, direction); - port = &this->ports[port_id]; - port->direction = direction; - port->valid = true; - port->format = NULL; - port->n_formats = n_formats; - port->formats = formats; + port = &this->ports[pu->port_id]; - if (direction == SPA_DIRECTION_INPUT) - this->n_inputs++; - else - this->n_outputs++; - event.type = SPA_EVENT_TYPE_PORT_ADDED; - event.port_id = port_id; - event.data = &pa; - event.size = sizeof (pa); - pa.direction = direction; - this->event_cb (&this->node, &event, this->user_data); + if (pu->change_mask & SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS) { + port->n_formats = pu->n_possible_formats; + port->formats = pu->possible_formats; + } + + if (pu->change_mask & SPA_CONTROL_CMD_PORT_UPDATE_PROPS) { + } + + if (pu->change_mask & SPA_CONTROL_CMD_PORT_UPDATE_INFO) { + port->info = *pu->info; + } + + if (!port->valid) { + fprintf (stderr, "%p: adding port %d, %d\n", this, pu->port_id, pu->direction); + port->direction = pu->direction; + port->format = NULL; + port->valid = true; + + if (pu->direction == SPA_DIRECTION_INPUT) + this->n_inputs++; + else + this->n_outputs++; + + event.type = SPA_EVENT_TYPE_PORT_ADDED; + event.port_id = pu->port_id; + event.data = &pa; + event.size = sizeof (pa); + pa.direction = pu->direction; + this->event_cb (&this->node, &event, this->user_data); + } } static void @@ -400,6 +412,7 @@ spa_proxy_node_add_port (SpaNode *node, uint32_t port_id) { SpaProxy *this; + SpaControlCmdPortUpdate pu; if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -409,7 +422,17 @@ spa_proxy_node_add_port (SpaNode *node, if (!CHECK_FREE_PORT_ID (this, port_id)) return SPA_RESULT_INVALID_PORT; - do_init_port (this, port_id, direction, 0, NULL); + pu.change_mask = SPA_CONTROL_CMD_PORT_UPDATE_DIRECTION | + SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | + SPA_CONTROL_CMD_PORT_UPDATE_PROPS | + SPA_CONTROL_CMD_PORT_UPDATE_INFO; + pu.port_id = port_id; + pu.direction = direction; + pu.n_possible_formats = 0; + pu.possible_formats = NULL; + pu.props = NULL; + pu.info = NULL; + do_update_port (this, &pu); return SPA_RESULT_OK; } @@ -617,7 +640,7 @@ add_buffer (SpaProxy *this, uint32_t port_id, SpaBuffer *buffer) spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, sizeof (fds)); if (buffer->mem.mem.id == SPA_ID_INVALID) { - fprintf (stderr, "proxy %p: alloc buffer space\n", this); + fprintf (stderr, "proxy %p: alloc buffer space %zd\n", this, buffer->mem.size); bmem = spa_memory_alloc_with_fd (SPA_MEMORY_POOL_SHARED, buffer, buffer->mem.size); b = spa_memory_ensure_ptr (bmem); b->mem.mem = bmem->mem; @@ -776,9 +799,7 @@ spa_proxy_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_proxy_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { return SPA_RESULT_NOT_IMPLEMENTED; } @@ -879,6 +900,9 @@ spa_proxy_node_port_pull_output (SpaNode *node, have_error = true; continue; } + + info[i].buffer_id = port->buffer_id; + info[i].status = SPA_RESULT_OK; } if (have_error) return SPA_RESULT_ERROR; @@ -893,6 +917,39 @@ spa_proxy_node_port_push_event (SpaNode *node, uint32_t port_id, SpaEvent *event) { + SpaProxy *this; + SpaResult res; + + if (node == NULL || node->handle == NULL || event == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + this = (SpaProxy *) node->handle; + + switch (event->type) { + case SPA_EVENT_TYPE_REUSE_BUFFER: + { + SpaEventReuseBuffer *rb = event->data; + SpaControlCmdReuseBuffer crb; + SpaControlBuilder builder; + SpaControl control; + uint8_t buf[128]; + + /* send start */ + spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); + crb.port_id = event->port_id; + crb.buffer_id = rb->buffer_id; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REUSE_BUFFER, &crb); + spa_control_builder_end (&builder, &control); + + if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) + fprintf (stderr, "proxy %p: error writing control %d\n", this, res); + + spa_control_clear (&control); + return SPA_RESULT_OK; + } + default: + break; + } return SPA_RESULT_NOT_IMPLEMENTED; } @@ -936,7 +993,7 @@ parse_control (SpaProxy *this, case SPA_CONTROL_CMD_PORT_UPDATE: { SpaControlCmdPortUpdate pu; - SpaProxyPort *port; + bool remove; fprintf (stderr, "proxy %p: got port update %d\n", this, cmd); if (spa_control_iter_parse_cmd (&it, &pu) < 0) @@ -945,16 +1002,13 @@ parse_control (SpaProxy *this, if (pu.port_id >= MAX_PORTS) break; - port = &this->ports[pu.port_id]; + remove = (pu.change_mask & SPA_CONTROL_CMD_PORT_UPDATE_DIRECTION) && + (pu.direction == SPA_DIRECTION_INVALID); - if (!port->valid && pu.direction != SPA_DIRECTION_INVALID) { - do_init_port (this, - pu.port_id, - pu.direction, - pu.n_possible_formats, - pu.possible_formats); - } else { + if (remove) { do_uninit_port (this, pu.port_id); + } else { + do_update_port (this, &pu); } break; } @@ -981,6 +1035,17 @@ parse_control (SpaProxy *this, } case SPA_CONTROL_CMD_HAVE_OUTPUT: { + SpaEvent event; + SpaControlCmdHaveOutput cmd; + + if (spa_control_iter_parse_cmd (&it, &cmd) < 0) + break; + + event.type = SPA_EVENT_TYPE_HAVE_OUTPUT; + event.port_id = cmd.port_id; + event.data = NULL; + event.size = 0; + this->event_cb (&this->node, &event, this->user_data); break; } @@ -995,6 +1060,14 @@ parse_control (SpaProxy *this, case SPA_CONTROL_CMD_PROCESS_BUFFER: { + SpaControlCmdProcessBuffer cmd; + SpaProxyPort *port; + + if (spa_control_iter_parse_cmd (&it, &cmd) < 0) + break; + + port = &this->ports[cmd.port_id]; + port->buffer_id = cmd.buffer_id; break; } case SPA_CONTROL_CMD_REUSE_BUFFER: @@ -1011,8 +1084,6 @@ parse_control (SpaProxy *this, event.data = &rb; event.size = sizeof (rb); rb.buffer_id = crb.buffer_id; - rb.offset = crb.offset; - rb.size = crb.size; this->event_cb (&this->node, &event, this->user_data); break; diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index efb2c81b4..c702eb2da 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -203,6 +203,20 @@ spa_v4l2_source_node_set_props (SpaNode *node, return res; } +static void +send_state_change (SpaV4l2Source *this, SpaNodeState state) +{ + SpaEvent event; + SpaEventStateChange sc; + + event.type = SPA_EVENT_TYPE_STATE_CHANGE; + event.port_id = -1; + event.data = ≻ + event.size = sizeof (sc); + sc.state = state; + this->event_cb (&this->node, &event, this->user_data); +} + static SpaResult spa_v4l2_source_node_send_command (SpaNode *node, SpaCommand *command) @@ -221,34 +235,12 @@ spa_v4l2_source_node_send_command (SpaNode *node, case SPA_COMMAND_START: spa_v4l2_start (this); - if (this->event_cb) { - SpaEvent event; - SpaEventStateChange sc; - - event.type = SPA_EVENT_TYPE_STATE_CHANGE; - event.port_id = -1; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + send_state_change (this, SPA_NODE_STATE_STREAMING); break; case SPA_COMMAND_STOP: spa_v4l2_stop (this); - if (this->event_cb) { - SpaEvent event; - SpaEventStateChange sc; - - event.type = SPA_EVENT_TYPE_STATE_CHANGE; - event.port_id = -1; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + send_state_change (this, SPA_NODE_STATE_PAUSED); break; case SPA_COMMAND_FLUSH: @@ -274,6 +266,8 @@ spa_v4l2_source_node_set_event_callback (SpaNode *node, this->event_cb = event; this->user_data = user_data; + send_state_change (this, SPA_NODE_STATE_CONFIGURE); + return SPA_RESULT_OK; } @@ -427,6 +421,8 @@ spa_v4l2_source_node_port_set_format (SpaNode *node, if (!(flags & SPA_PORT_FORMAT_FLAG_TEST_ONLY)) { memcpy (tf, f, fs); state->current_format = tf; + + send_state_change (this, SPA_NODE_STATE_READY); } return SPA_RESULT_OK; @@ -525,7 +521,7 @@ spa_v4l2_source_node_port_alloc_buffers (SpaNode *node, { SpaV4l2Source *this; - if (node == NULL || node->handle == NULL) + if (node == NULL || node->handle == NULL || buffers == NULL) return SPA_RESULT_INVALID_ARGUMENTS; this = (SpaV4l2Source *) node->handle; @@ -541,9 +537,7 @@ spa_v4l2_source_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_v4l2_source_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { SpaV4l2Source *this; diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index fa0af2431..95baa21af 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -477,7 +477,7 @@ v4l2_on_fd_events (SpaPollNotifyData *data) if (mmap_read (this) < 0) return 0; - event.type = SPA_EVENT_TYPE_CAN_PULL_OUTPUT; + event.type = SPA_EVENT_TYPE_HAVE_OUTPUT; event.port_id = 0; event.size = 0; event.data = NULL; diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 47e98f32c..742a18d6a 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -418,9 +418,7 @@ spa_volume_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_volume_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { return SPA_RESULT_NOT_IMPLEMENTED; } @@ -530,8 +528,6 @@ release_buffer (SpaVolume *this, SpaBuffer *buffer) event.data = &rb; event.size = sizeof (rb); rb.buffer_id = buffer->id; - rb.offset = 0; - rb.size = -1; this->event_cb (&this->node, &event, this->user_data); } diff --git a/spa/plugins/xv/xv-sink.c b/spa/plugins/xv/xv-sink.c index 2c44135ab..47988d792 100644 --- a/spa/plugins/xv/xv-sink.c +++ b/spa/plugins/xv/xv-sink.c @@ -452,9 +452,7 @@ spa_xv_sink_node_port_alloc_buffers (SpaNode *node, static SpaResult spa_xv_sink_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, - uint32_t buffer_id, - off_t offset, - size_t size) + uint32_t buffer_id) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index dec44a2e5..0cac28853 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -94,19 +94,14 @@ on_mix_event (SpaNode *node, SpaEvent *event, void *user_data) AppData *data = user_data; switch (event->type) { - case SPA_EVENT_TYPE_PULL_INPUT: + case SPA_EVENT_TYPE_NEED_INPUT: { SpaInputInfo iinfo; SpaOutputInfo oinfo; SpaResult res; - SpaEventPullInput *pi; - - pi = event->data; oinfo.port_id = 0; oinfo.flags = SPA_OUTPUT_FLAG_NONE; - oinfo.size = pi->size; - oinfo.offset = pi->offset; if (event->port_id == data->mix_ports[0]) { if ((res = spa_node_port_pull_output (data->source1, 1, &oinfo)) < 0) @@ -136,19 +131,14 @@ on_sink_event (SpaNode *node, SpaEvent *event, void *user_data) AppData *data = user_data; switch (event->type) { - case SPA_EVENT_TYPE_PULL_INPUT: + case SPA_EVENT_TYPE_NEED_INPUT: { SpaInputInfo iinfo; SpaOutputInfo oinfo; SpaResult res; - SpaEventPullInput *pi; - - pi = event->data; oinfo.port_id = 0; oinfo.flags = SPA_OUTPUT_FLAG_PULL; - oinfo.offset = pi->offset; - oinfo.size = pi->size; if ((res = spa_node_port_pull_output (data->mix, 1, &oinfo)) < 0) printf ("got error %d\n", res); diff --git a/spa/tests/test-v4l2.c b/spa/tests/test-v4l2.c index dc9d71d2f..42bf9ed5a 100644 --- a/spa/tests/test-v4l2.c +++ b/spa/tests/test-v4l2.c @@ -115,7 +115,7 @@ on_source_event (SpaNode *node, SpaEvent *event, void *user_data) AppData *data = user_data; switch (event->type) { - case SPA_EVENT_TYPE_CAN_PULL_OUTPUT: + case SPA_EVENT_TYPE_HAVE_OUTPUT: { SpaOutputInfo info[1] = { 0, }; SpaResult res; @@ -178,7 +178,7 @@ on_source_event (SpaNode *node, SpaEvent *event, void *user_data) SDL_RenderCopy (data->renderer, data->texture, NULL, NULL); SDL_RenderPresent (data->renderer); } - spa_node_port_reuse_buffer (data->source, 0, info->buffer_id, info->offset, info->size); + spa_node_port_reuse_buffer (data->source, 0, info->buffer_id); break; } case SPA_EVENT_TYPE_ADD_POLL: