diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 86efb38c3..bc275bcda 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -39,6 +39,8 @@ #define MAX_BUFFER_SIZE 4096 #define MAX_FDS 32 +#define MAX_INPUTS 64 +#define MAX_OUTPUTS 64 typedef struct { bool cleanup; @@ -47,13 +49,6 @@ typedef struct { SpaBuffer *buf; } BufferId; -static void -clear_buffer_id (BufferId *id) -{ - spa_memory_unref (&id->buf->mem.mem); - id->buf = NULL; -} - struct _PinosStreamPrivate { PinosContext *context; @@ -68,9 +63,12 @@ struct _PinosStreamPrivate PinosDirection direction; gchar *path; + SpaNodeState node_state; GPtrArray *possible_formats; SpaFormat *format; SpaPortInfo port_info; + uint32_t port_id; + uint32_t pending_seq; PinosStreamFlags flags; @@ -126,6 +124,23 @@ enum static guint signals[LAST_SIGNAL] = { 0 }; +static void +clear_buffers (PinosStream *stream) +{ + PinosStreamPrivate *priv = stream->priv; + guint i; + + for (i = 0; i < priv->buffer_ids->len; i++) { + BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i); + + g_signal_emit (stream, signals[SIGNAL_REMOVE_BUFFER], 0, bid->id); + spa_memory_unref (&bid->buf->mem.mem); + bid->buf = NULL; + } + g_array_set_size (priv->buffer_ids, 0); + priv->in_order = TRUE; +} + static void pinos_stream_get_property (GObject *_object, guint prop_id, @@ -491,9 +506,9 @@ pinos_stream_init (PinosStream * stream) g_debug ("new stream %p", stream); priv->state = PINOS_STREAM_STATE_UNCONNECTED; + priv->node_state = SPA_NODE_STATE_INIT; priv->buffer_ids = g_array_sized_new (FALSE, FALSE, sizeof (BufferId), 64); - g_array_set_clear_func (priv->buffer_ids, (GDestroyNotify) clear_buffer_id); - priv->in_order = TRUE; + priv->pending_seq = SPA_ID_INVALID; } /** @@ -611,39 +626,37 @@ add_node_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_UPDATE, &nu); } +static void +add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlCmdNodeStateChange sc; + + sc.state = priv->node_state = state; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_STATE_CHANGE, &sc); +} + static void add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t change_mask) { PinosStreamPrivate *priv = stream->priv; SpaControlCmdPortUpdate pu = { 0, };; - pu.port_id = 0; + pu.port_id = priv->port_id; pu.change_mask = change_mask; if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS) { pu.n_possible_formats = priv->possible_formats->len; pu.possible_formats = (SpaFormat **)priv->possible_formats->pdata; } + if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_FORMAT) { + pu.format = priv->format; + } pu.props = NULL; if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_INFO) pu.info = &priv->port_info; spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_PORT_UPDATE, &pu); } -static void -add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) -{ - SpaControlCmdNodeEvent cne; - SpaNodeEvent ne; - SpaNodeEventStateChange sc; - - cne.event = ≠ - ne.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - ne.data = ≻ - ne.size = sizeof (sc); - sc.state = state; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); -} - static void add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_id) { @@ -693,6 +706,25 @@ add_request_clock_update (PinosStream *stream, SpaControlBuilder *builder) spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); } +static void +add_async_complete (PinosStream *stream, + SpaControlBuilder *builder, + uint32_t seq, + SpaResult res) +{ + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventAsyncComplete ac; + + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; + ne.data = ∾ + ne.size = sizeof (ac); + ac.seq = seq; + ac.res = res; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); +} + static void send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) { @@ -751,6 +783,30 @@ send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) spa_control_clear (&control); } +static void +do_node_init (PinosStream *stream) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlBuilder builder; + SpaControl control; + + control_builder_init (stream, &builder); + add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | + SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS); + + priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | + SPA_CONTROL_CMD_PORT_UPDATE_INFO); + + add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); +} + static BufferId * find_buffer (PinosStream *stream, uint32_t id) { @@ -779,9 +835,9 @@ handle_node_event (PinosStream *stream, case SPA_NODE_EVENT_TYPE_INVALID: case SPA_NODE_EVENT_TYPE_PORT_ADDED: case SPA_NODE_EVENT_TYPE_PORT_REMOVED: - case SPA_NODE_EVENT_TYPE_STATE_CHANGE: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT: + case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: g_warning ("unhandled node event %d", event->type); break; @@ -790,7 +846,7 @@ handle_node_event (PinosStream *stream, SpaNodeEventReuseBuffer *p = event->data; BufferId *bid; - if (p->port_id != 0) + if (p->port_id != priv->port_id) break; if (priv->direction != PINOS_DIRECTION_OUTPUT) break; @@ -818,6 +874,7 @@ handle_node_event (PinosStream *stream, static gboolean handle_node_command (PinosStream *stream, + uint32_t seq, SpaNodeCommand *command) { PinosStreamPrivate *priv = stream->priv; @@ -834,6 +891,7 @@ handle_node_command (PinosStream *stream, control_builder_init (stream, &builder); add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); + add_async_complete (stream, &builder, seq, SPA_RESULT_OK); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -852,8 +910,9 @@ handle_node_command (PinosStream *stream, g_debug ("stream %p: start", stream); control_builder_init (stream, &builder); if (priv->direction == PINOS_DIRECTION_INPUT) - add_need_input (stream, &builder, 0); + add_need_input (stream, &builder, priv->port_id); add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); + add_async_complete (stream, &builder, seq, SPA_RESULT_OK); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -867,8 +926,21 @@ handle_node_command (PinosStream *stream, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + { + SpaControlBuilder builder; + SpaControl control; + g_warning ("unhandled node command %d", command->type); + control_builder_init (stream, &builder); + add_async_complete (stream, &builder, seq, SPA_RESULT_NOT_IMPLEMENTED); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); break; + } case SPA_NODE_COMMAND_CLOCK_UPDATE: { @@ -904,6 +976,7 @@ parse_control (PinosStream *stream, case SPA_CONTROL_CMD_PORT_UPDATE: case SPA_CONTROL_CMD_PORT_REMOVED: case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: g_warning ("got unexpected control %d", cmd); break; @@ -924,21 +997,8 @@ parse_control (PinosStream *stream, priv->format = p.format; spa_debug_format (p.format); + priv->pending_seq = p.seq; g_object_notify (G_OBJECT (stream), "format"); - - if (priv->port_info.n_params != 0) { - SpaControlBuilder builder; - SpaControl control; - - control_builder_init (stream, &builder); - add_state_change (stream, &builder, SPA_NODE_STATE_READY); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); - } break; } case SPA_CONTROL_CMD_SET_PROPERTY: @@ -960,7 +1020,7 @@ parse_control (PinosStream *stream, mem = spa_memory_import (&p.mem); if (mem->fd == -1) { - g_debug ("add mem %d,%d, %d, %d", p.mem.pool_id, p.mem.id, fd, p.flags); + g_debug ("add mem %u:%u, fd %d, flags %d", p.mem.pool_id, p.mem.id, fd, p.flags); mem->flags = p.flags; mem->fd = fd; mem->ptr = NULL; @@ -991,13 +1051,15 @@ parse_control (PinosStream *stream, break; /* clear previous buffers */ - g_array_set_size (priv->buffer_ids, 0); + clear_buffers (stream); for (i = 0; i < p.n_buffers; i++) { bid.buf = p.buffers[i]; bid.cleanup = false; bid.id = bid.buf->id; - g_debug ("add buffer %d, %d, %zd, %zd", bid.id, bid.buf->mem.mem.id, bid.buf->mem.offset, bid.buf->mem.size); + g_debug ("add buffer %d: %u:%u, %zd-%zd", bid.id, + bid.buf->mem.mem.pool_id, bid.buf->mem.mem.id, + bid.buf->mem.offset, bid.buf->mem.size); if (bid.id != priv->buffer_ids->len) { g_warning ("unexpected id %u found, expected %u", bid.id, priv->buffer_ids->len); @@ -1013,6 +1075,7 @@ parse_control (PinosStream *stream, } else { add_state_change (stream, &builder, SPA_NODE_STATE_READY); } + add_async_complete (stream, &builder, p.seq, SPA_RESULT_OK); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -1033,7 +1096,7 @@ parse_control (PinosStream *stream, g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id); - send_need_input (stream, 0); + send_need_input (stream, priv->port_id); break; } case SPA_CONTROL_CMD_NODE_EVENT: @@ -1053,7 +1116,7 @@ parse_control (PinosStream *stream, if (spa_control_iter_parse_cmd (&it, &p) < 0) break; - handle_node_command (stream, p.command); + handle_node_command (stream, p.seq, p.command); break; } @@ -1175,8 +1238,6 @@ on_node_proxy (GObject *source_object, PinosStream *stream = user_data; PinosStreamPrivate *priv = stream->priv; PinosContext *context = priv->context; - SpaControlBuilder builder; - SpaControl control; GError *error = NULL; @@ -1186,22 +1247,7 @@ on_node_proxy (GObject *source_object, if (priv->node == NULL) goto node_failed; - control_builder_init (stream, &builder); - add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | - SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS); - - priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | - SPA_CONTROL_CMD_PORT_UPDATE_INFO); - - add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); - - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + do_node_init (stream); stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); g_object_unref (stream); @@ -1343,6 +1389,7 @@ pinos_stream_connect (PinosStream *stream, g_return_val_if_fail (pinos_stream_get_state (stream) == PINOS_STREAM_STATE_UNCONNECTED, FALSE); priv->direction = direction; + priv->port_id = direction == PINOS_DIRECTION_INPUT ? 0 : MAX_INPUTS; priv->mode = mode; g_free (priv->path); priv->path = g_strdup (port_path); @@ -1361,16 +1408,26 @@ pinos_stream_connect (PinosStream *stream, } /** - * pinos_stream_start_allocation: + * pinos_stream_finish_format: * @stream: a #PinosStream - * @props: a #PinosProperties + * @res: a #SpaResult + * @params: an array of pointers to #SpaAllocParam + * @n_params: number of elements in @params + * + * Complete the negotiation process with result code @res. + * + * This function should be called after notification of the format. + + * When @res indicates success, @params contain the parameters for the + * allocation state. * * Returns: %TRUE on success */ gboolean -pinos_stream_start_allocation (PinosStream *stream, - SpaAllocParam **params, - unsigned int n_params) +pinos_stream_finish_format (PinosStream *stream, + SpaResult res, + SpaAllocParam **params, + unsigned int n_params) { PinosStreamPrivate *priv; PinosContext *context; @@ -1379,24 +1436,31 @@ pinos_stream_start_allocation (PinosStream *stream, g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); priv = stream->priv; + g_return_val_if_fail (priv->pending_seq != SPA_ID_INVALID, FALSE); context = priv->context; g_return_val_if_fail (pinos_context_get_state (context) == PINOS_CONTEXT_STATE_CONNECTED, FALSE); - control_builder_init (stream, &builder); - priv->port_info.params = params; priv->port_info.n_params = n_params; - /* send update port status */ - add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO); - - /* send state-change */ - if (priv->format) - add_state_change (stream, &builder, SPA_NODE_STATE_READY); + control_builder_init (stream, &builder); + if (SPA_RESULT_IS_OK (res)) { + add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO | + SPA_CONTROL_CMD_PORT_UPDATE_FORMAT); + if (priv->format) { + add_state_change (stream, &builder, SPA_NODE_STATE_READY); + } else { + clear_buffers (stream); + add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); + } + } + add_async_complete (stream, &builder, priv->pending_seq, res); spa_control_builder_end (&builder, &control); + priv->pending_seq = SPA_ID_INVALID; + if (spa_control_write (&control, priv->fd) < 0) g_warning ("stream %p: error writing control", stream); @@ -1659,7 +1723,7 @@ pinos_stream_recycle_buffer (PinosStream *stream, priv = stream->priv; g_return_val_if_fail (priv->direction == PINOS_DIRECTION_INPUT, FALSE); - send_reuse_buffer (stream, 0, id); + send_reuse_buffer (stream, priv->port_id, id); return TRUE; } @@ -1715,7 +1779,7 @@ pinos_stream_send_buffer (PinosStream *stream, if ((bid = find_buffer (stream, id))) { bid->used = TRUE; - send_process_buffer (stream, 0, id); + send_process_buffer (stream, priv->port_id, id); return TRUE; } else { return FALSE; diff --git a/pinos/client/stream.h b/pinos/client/stream.h index 3e2481b3a..1ac07ff29 100644 --- a/pinos/client/stream.h +++ b/pinos/client/stream.h @@ -108,7 +108,8 @@ gboolean pinos_stream_connect (PinosStream *stream, GPtrArray *possible_formats); gboolean pinos_stream_disconnect (PinosStream *stream); -gboolean pinos_stream_start_allocation (PinosStream *stream, +gboolean pinos_stream_finish_format (PinosStream *stream, + SpaResult res, SpaAllocParam **params, unsigned int n_params); diff --git a/pinos/gst/gstpinospool.c b/pinos/gst/gstpinospool.c index aa0aaa1a1..2019bb11d 100644 --- a/pinos/gst/gstpinospool.c +++ b/pinos/gst/gstpinospool.c @@ -124,7 +124,7 @@ do_start (GstBufferPool * pool) param_meta_enable.param.size = sizeof (SpaAllocParamMetaEnable); param_meta_enable.type = SPA_META_TYPE_HEADER; - pinos_stream_start_allocation (p->stream, port_params, 2); + pinos_stream_finish_format (p->stream, SPA_RESULT_OK, port_params, 2); return TRUE; } diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index 17b8babad..683a89591 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -481,6 +481,33 @@ on_format_notify (GObject *gobject, GParamSpec *pspec, gpointer user_data) { + GstPinosSink *pinossink = user_data; + GstStructure *config; + GstCaps *caps; + guint size; + guint min_buffers; + guint max_buffers; + SpaAllocParam *port_params[2]; + SpaAllocParamMetaEnable param_meta_enable; + SpaAllocParamBuffers param_buffers; + + config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pinossink->pool)); + gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + + port_params[0] = ¶m_buffers.param; + param_buffers.param.type = SPA_ALLOC_PARAM_TYPE_BUFFERS; + param_buffers.param.size = sizeof (SpaAllocParamBuffers); + param_buffers.minsize = size; + param_buffers.stride = 0; + param_buffers.min_buffers = min_buffers; + param_buffers.max_buffers = max_buffers; + param_buffers.align = 16; + port_params[1] = ¶m_meta_enable.param; + param_meta_enable.param.type = SPA_ALLOC_PARAM_TYPE_META_ENABLE; + param_meta_enable.param.size = sizeof (SpaAllocParamMetaEnable); + param_meta_enable.type = SPA_META_TYPE_HEADER; + + pinos_stream_finish_format (pinossink->stream, SPA_RESULT_OK, port_params, 2); } static gboolean diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 6f8bd1daf..c828b6b5c 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -718,14 +718,19 @@ on_format_notify (GObject *gobject, GstPinosSrc *pinossrc = user_data; SpaFormat *format; GstCaps *caps; + gboolean res; g_object_get (gobject, "format", &format, NULL); caps = gst_caps_from_format (format); - gst_base_src_set_caps (GST_BASE_SRC (pinossrc), caps); + res = gst_base_src_set_caps (GST_BASE_SRC (pinossrc), caps); gst_caps_unref (caps); - pinos_stream_start_allocation (pinossrc->stream, NULL, 0); + if (res) { + pinos_stream_finish_format (pinossrc->stream, SPA_RESULT_OK, NULL, 0); + } else { + pinos_stream_finish_format (pinossrc->stream, SPA_RESULT_INVALID_MEDIA_TYPE, NULL, 0); + } } static gboolean diff --git a/pinos/server/link.c b/pinos/server/link.c index 9309d895d..3ead648e1 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -34,7 +34,7 @@ #define PINOS_LINK_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), PINOS_TYPE_LINK, PinosLinkPrivate)) -#define MAX_BUFFERS 64 +#define MAX_BUFFERS 16 struct _PinosLinkPrivate { @@ -498,7 +498,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) } g_debug ("allocated out_buffers %p from output port", priv->out_buffers); } - else if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { + if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { g_debug ("using out_buffers %p on input port", priv->out_buffers); if ((res = spa_node_port_use_buffers (this->input_node->node, this->input_port, priv->out_buffers, priv->n_out_buffers)) < 0) { @@ -563,6 +563,7 @@ check_states (PinosLink *this) SpaResult res; SpaNodeState in_state, out_state; +again: in_state = this->input_node->node->state; out_state = this->output_node->node->state; @@ -577,9 +578,21 @@ check_states (PinosLink *this) if ((res = do_start (this, in_state, out_state)) < 0) return res; + if (this->input_node->node->state != in_state) + goto again; + if (this->output_node->node->state != out_state) + goto again; + return SPA_RESULT_OK; } +static gboolean +do_check_states (PinosLink *this) +{ + check_states (this); + return G_SOURCE_REMOVE; +} + static void on_node_state_notify (GObject *obj, GParamSpec *pspec, @@ -588,7 +601,7 @@ on_node_state_notify (GObject *obj, PinosLink *this = user_data; g_debug ("link %p: node %p state change", this, obj); - check_states (this); + g_idle_add ((GSourceFunc) do_check_states, this); } static void diff --git a/pinos/server/node.c b/pinos/server/node.c index 044059202..f07dcd45d 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -311,7 +311,6 @@ suspend_node (PinosNode *this) if ((res = spa_node_port_set_format (this->node, 0, 0, NULL)) < 0) g_warning ("error unset format output: %d", res); - } static void @@ -405,6 +404,23 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) PinosNodePrivate *priv = this->priv; switch (event->type) { + case SPA_NODE_EVENT_TYPE_INVALID: + case SPA_NODE_EVENT_TYPE_DRAINED: + case SPA_NODE_EVENT_TYPE_MARKER: + case SPA_NODE_EVENT_TYPE_ERROR: + case SPA_NODE_EVENT_TYPE_BUFFERING: + case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: + break; + + case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: + { + SpaNodeEventAsyncComplete *ac = event->data; + g_debug ("async complete %u %d", ac->seq, ac->res); + if (SPA_RESULT_IS_OK (ac->res)) + g_object_notify (G_OBJECT (this), "node-state"); + break; + } + case SPA_NODE_EVENT_TYPE_PORT_ADDED: { SpaNodeEventPortAdded *pa = event->data; @@ -432,17 +448,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) g_main_context_invoke (NULL, (GSourceFunc) do_signal_port_removed, data); break; } - case SPA_NODE_EVENT_TYPE_STATE_CHANGE: - { - SpaNodeEventStateChange *sc = event->data; - - g_debug ("node %p: update SPA state to %d", this, sc->state); - if (sc->state == SPA_NODE_STATE_CONFIGURE) { - update_port_ids (this, FALSE); - } - g_object_notify (G_OBJECT (this), "node-state"); - break; - } case SPA_NODE_EVENT_TYPE_ADD_POLL: { SpaPollItem *poll = event->data; @@ -499,10 +504,13 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: { + SpaNodeEventHaveOutput *ho = event->data; SpaPortOutputInfo oinfo[1] = { 0, }; SpaResult res; guint i; + oinfo[0].port_id = ho->port_id; + if ((res = spa_node_port_pull_output (node, 1, oinfo)) < 0) { g_warning ("node %p: got pull error %d, %d", this, res, oinfo[0].status); break; @@ -554,10 +562,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: send_clock_update (this); break; - - default: - g_debug ("node %p: got event %d", this, event->type); - break; } } diff --git a/spa/include/spa/control.h b/spa/include/spa/control.h index 664702992..f56850162 100644 --- a/spa/include/spa/control.h +++ b/spa/include/spa/control.h @@ -35,6 +35,7 @@ typedef struct _SpaControlBuilder SpaControlBuilder; #include #include #include +#include struct _SpaControl { size_t x[16]; @@ -59,8 +60,9 @@ typedef enum { SPA_CONTROL_CMD_NODE_UPDATE = 1, SPA_CONTROL_CMD_PORT_UPDATE = 2, SPA_CONTROL_CMD_PORT_REMOVED = 3, + SPA_CONTROL_CMD_NODE_STATE_CHANGE = 4, - SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 4, + SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 5, /* server to client */ SPA_CONTROL_CMD_ADD_PORT = 32, @@ -92,16 +94,17 @@ typedef struct { const SpaProps *props; } SpaControlCmdNodeUpdate; - /* SPA_CONTROL_CMD_PORT_UPDATE */ typedef struct { uint32_t port_id; #define SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS (1 << 0) -#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 1) -#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 2) +#define SPA_CONTROL_CMD_PORT_UPDATE_FORMAT (1 << 1) +#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 2) +#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 3) uint32_t change_mask; unsigned int n_possible_formats; SpaFormat **possible_formats; + SpaFormat *format; const SpaProps *props; const SpaPortInfo *info; } SpaControlCmdPortUpdate; @@ -113,25 +116,34 @@ typedef struct { /* SPA_CONTROL_CMD_PORT_STATUS_CHANGE */ +/* SPA_CONTROL_CMD_NODE_STATE_CHANGE */ +typedef struct { + SpaNodeState state; +} SpaControlCmdNodeStateChange; + /* SPA_CONTROL_CMD_ADD_PORT */ typedef struct { - uint32_t port_id; + uint32_t seq; + uint32_t port_id; } SpaControlCmdAddPort; /* SPA_CONTROL_CMD_REMOVE_PORT */ typedef struct { + uint32_t seq; uint32_t port_id; } SpaControlCmdRemovePort; /* SPA_CONTROL_CMD_SET_FORMAT */ typedef struct { + uint32_t seq; uint32_t port_id; SpaFormat *format; } SpaControlCmdSetFormat; /* SPA_CONTROL_CMD_SET_PROPERTY */ typedef struct { + uint32_t seq; uint32_t port_id; uint32_t id; size_t size; @@ -140,11 +152,13 @@ typedef struct { /* SPA_CONTROL_CMD_NODE_COMMAND */ typedef struct { + uint32_t seq; SpaNodeCommand *command; } SpaControlCmdNodeCommand; /* SPA_CONTROL_CMD_ADD_MEM */ typedef struct { + uint32_t seq; uint32_t port_id; SpaMemoryRef mem; uint32_t mem_type; @@ -155,12 +169,14 @@ typedef struct { /* SPA_CONTROL_CMD_REMOVE_MEM */ typedef struct { + uint32_t seq; uint32_t port_id; SpaMemoryRef mem; } SpaControlCmdRemoveMem; /* SPA_CONTROL_CMD_USE_BUFFERS */ typedef struct { + uint32_t seq; uint32_t port_id; unsigned int n_buffers; SpaBuffer **buffers; @@ -192,7 +208,10 @@ SpaResult spa_control_iter_end (SpaControlIter *iter); SpaControlCmd spa_control_iter_get_cmd (SpaControlIter *iter); void * spa_control_iter_get_data (SpaControlIter *iter, - size_t *size); + size_t *size); +SpaResult spa_control_iter_set_data (SpaControlIter *iter, + void *data, + size_t size); SpaResult spa_control_iter_parse_cmd (SpaControlIter *iter, void *command); diff --git a/spa/include/spa/defs.h b/spa/include/spa/defs.h index beb1bc16f..5c635c62b 100644 --- a/spa/include/spa/defs.h +++ b/spa/include/spa/defs.h @@ -30,6 +30,8 @@ extern "C" { #include typedef enum { + SPA_RESULT_ASYNC = (1 << 30), + SPA_RESULT_MODIFIED = 1, SPA_RESULT_OK = 0, SPA_RESULT_ERROR = -1, SPA_RESULT_INACTIVE = -2, @@ -59,8 +61,17 @@ typedef enum { SPA_RESULT_NO_BUFFERS = -27, SPA_RESULT_INVALID_BUFFER_ID = -28, SPA_RESULT_WRONG_STATE = -29, + SPA_RESULT_ASYNC_BUSY = -30, } SpaResult; +#define SPA_RESULT_IS_OK(res) ((res) >= 0) +#define SPA_RESULT_IS_ERROR(res) ((res) < 0) +#define SPA_RESULT_IS_ASYNC(res) (((res) & SPA_RESULT_ASYNC) == SPA_RESULT_ASYNC) + +#define SPA_ASYNC_SEQ_MASK (SPA_RESULT_ASYNC - 1) +#define SPA_RESULT_ASYNC_SEQ(res) ((res) & SPA_ASYNC_SEQ_MASK) +#define SPA_RESULT_RETURN_ASYNC(seq) (SPA_RESULT_ASYNC | ((seq) & SPA_ASYNC_SEQ_MASK)) + typedef void (*SpaNotify) (void *data); #define SPA_N_ELEMENTS(arr) (sizeof (arr) / sizeof ((arr)[0])) diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 486c64d70..6d1762123 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -33,9 +33,9 @@ typedef struct _SpaNodeEvent SpaNodeEvent; /** * SpaEventType: * @SPA_NODE_EVENT_TYPE_INVALID: invalid event, should be ignored + * @SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: an async operation completed * @SPA_NODE_EVENT_TYPE_PORT_ADDED: a new port is added * @SPA_NODE_EVENT_TYPE_PORT_REMOVED: a port is removed - * @SPA_NODE_EVENT_TYPE_STATE_CHANGE: emited when the state changes * @SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: emited when an async node has output that can be pulled * @SPA_NODE_EVENT_TYPE_NEED_INPUT: emited when more data can be pushed to an async node * @SPA_NODE_EVENT_TYPE_REUSE_BUFFER: emited when a buffer can be reused @@ -50,9 +50,9 @@ typedef struct _SpaNodeEvent SpaNodeEvent; */ typedef enum { SPA_NODE_EVENT_TYPE_INVALID = 0, + SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE, SPA_NODE_EVENT_TYPE_PORT_ADDED, SPA_NODE_EVENT_TYPE_PORT_REMOVED, - SPA_NODE_EVENT_TYPE_STATE_CHANGE, SPA_NODE_EVENT_TYPE_HAVE_OUTPUT, SPA_NODE_EVENT_TYPE_NEED_INPUT, SPA_NODE_EVENT_TYPE_REUSE_BUFFER, @@ -73,6 +73,11 @@ struct _SpaNodeEvent { size_t size; }; +typedef struct { + uint32_t seq; + SpaResult res; +} SpaNodeEventAsyncComplete; + typedef struct { uint32_t port_id; } SpaNodeEventPortAdded; diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index 332d93105..c44d2dc79 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -215,10 +215,13 @@ struct _SpaNode { * * Send a command to @node. * + * Upon completion, a command might change the state of a node. + * * Returns: #SPA_RESULT_OK on success * #SPA_RESULT_INVALID_ARGUMENTS when node or command is %NULL * #SPA_RESULT_NOT_IMPLEMENTED when this node can't process commands * #SPA_RESULT_INVALID_COMMAND @command is an invalid command + * #SPA_RESULT_ASYNC @command is executed asynchronously */ SpaResult (*send_command) (SpaNode *node, SpaNodeCommand *command); @@ -339,14 +342,20 @@ struct _SpaNode { * * This function takes a copy of the format. * + * Upon completion, this function might change the state of a node to + * the READY state or to CONFIGURE when @format is NULL. + * * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_OK_RECHECK on success * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_PORT when port_id is not valid * #SPA_RESULT_INVALID_MEDIA_TYPE when the media type is not valid * #SPA_RESULT_INVALID_FORMAT_PROPERTIES when one of the mandatory format - * properties is not specified. + * properties is not specified and #SPA_PORT_FORMAT_FLAG_FIXATE was + * not set in @flags. * #SPA_RESULT_WRONG_PROPERTY_TYPE when the type or size of a property * is not correct. + * #SPA_RESULT_ASYNC the function is executed asynchronously */ SpaResult (*port_set_format) (SpaNode *node, uint32_t port_id, @@ -401,7 +410,12 @@ struct _SpaNode { * Passing %NULL as @buffers will remove the reference that the port has * on the buffers. * + * Upon completion, this function might change the state of the + * node to PAUSED, when the node has enough buffers, or READY when + * @buffers are %NULL. + * * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_ASYNC the function is executed asynchronously */ SpaResult (*port_use_buffers) (SpaNode *node, uint32_t port_id, diff --git a/spa/lib/control.c b/spa/lib/control.c index 81aa463fc..594754dc9 100644 --- a/spa/lib/control.c +++ b/spa/lib/control.c @@ -418,6 +418,8 @@ iter_parse_port_update (struct stack_iter *si, SpaControlCmdPortUpdate *pu) pu->possible_formats[i] = parse_format (p, si->size, SPA_PTR_TO_INT (pu->possible_formats[i])); } + if (pu->format) + pu->format = parse_format (p, si->size, SPA_PTR_TO_INT (pu->format)); if (pu->props) pu->props = parse_props (p, SPA_PTR_TO_INT (pu->props)); @@ -484,6 +486,26 @@ iter_parse_node_command (struct stack_iter *si, SpaControlCmdNodeCommand *cmd) cmd->command->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command->data), void); } +SpaResult +spa_control_iter_set_data (SpaControlIter *iter, + void *data, + size_t size) +{ + struct stack_iter *si = SCSI (iter); + SpaResult res = SPA_RESULT_OK; + + if (!is_valid_iter (iter)) + return SPA_RESULT_INVALID_ARGUMENTS; + + if (si->size > size) + return SPA_RESULT_INVALID_ARGUMENTS; + + si->size = size; + si->data = data; + + return SPA_RESULT_OK; +} + SpaResult spa_control_iter_parse_cmd (SpaControlIter *iter, void *command) @@ -514,6 +536,12 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, fprintf (stderr, "implement iter of %d\n", si->cmd); break; + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + if (si->size < sizeof (SpaControlCmdNodeStateChange)) + return SPA_RESULT_ERROR; + memcpy (command, si->data, sizeof (SpaControlCmdNodeStateChange)); + break; + /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: if (si->size < sizeof (SpaControlCmdAddPort)) @@ -572,7 +600,6 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, return res; } - struct stack_builder { size_t magic; @@ -945,8 +972,8 @@ write_format (void *p, const SpaFormat *format) tf = p; tf->media_type = format->media_type; tf->media_subtype = format->media_subtype; - tf->mem.mem.pool_id = SPA_ID_INVALID; - tf->mem.mem.id = SPA_ID_INVALID; + tf->mem.mem.pool_id = 0; + tf->mem.mem.id = 0; tf->mem.offset = 0; tf->mem.size = 0; @@ -1025,6 +1052,7 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) len += pu->n_possible_formats * sizeof (SpaFormat *); for (i = 0; i < pu->n_possible_formats; i++) len += calc_format_len (pu->possible_formats[i]); + len += calc_format_len (pu->format); len += calc_props_len (pu->props); if (pu->info) { len += sizeof (SpaPortInfo); @@ -1051,6 +1079,13 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) bfa[i] = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); p = SPA_MEMBER (p, len, void); } + if (pu->format) { + len = write_format (p, pu->format); + d->format = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + p = SPA_MEMBER (p, len, void); + } else { + d->format = 0; + } if (pu->props) { len = write_props (p, pu->props, sizeof (SpaProps)); d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); @@ -1066,6 +1101,7 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) d->info = 0; } } + static void builder_add_set_format (struct stack_builder *sb, SpaControlCmdSetFormat *sf) { @@ -1218,6 +1254,11 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, p = builder_add_cmd (sb, cmd, 0); break; + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdNodeStateChange)); + memcpy (p, command, sizeof (SpaControlCmdNodeStateChange)); + break; + /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddPort)); @@ -1265,7 +1306,6 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, builder_add_node_command (sb, command); break; - default: case SPA_CONTROL_CMD_INVALID: return SPA_RESULT_INVALID_ARGUMENTS; } diff --git a/spa/lib/format.c b/spa/lib/format.c index 0b572b775..4223c1181 100644 --- a/spa/lib/format.c +++ b/spa/lib/format.c @@ -36,50 +36,10 @@ spa_format_to_string (const SpaFormat *format, char **result) SpaResult spa_format_fixate (SpaFormat *format) { -#if 0 - unsigned int i, j; - SpaProps *props; - uint32_t mask; -#endif - if (format == NULL) return SPA_RESULT_INVALID_ARGUMENTS; format->props.unset_mask = 0; -#if 0 - props = &format->props; - mask = props->unset_mask; - - for (i = 0; i < props->n_prop_info; i++) { - if (mask & 1) { - const SpaPropInfo *pi = &props->prop_info[i]; - - switch (pi->range_type) { - case SPA_PROP_RANGE_TYPE_NONE: - break; - case SPA_PROP_RANGE_TYPE_MIN_MAX: - break; - case SPA_PROP_RANGE_TYPE_STEP: - break; - case SPA_PROP_RANGE_TYPE_ENUM: - { - for (j = 0; j < pi->n_range_values; j++) { - const SpaPropRangeInfo *ri = &pi->range_values[j]; - memcpy (SPA_MEMBER (props, pi->offset, void), ri->value, ri->size); - SPA_PROPS_INDEX_SET (props, i); - break; - } - break; - } - case SPA_PROP_RANGE_TYPE_FLAGS: - break; - default: - break; - } - } - mask >>= 1; - } -#endif return SPA_RESULT_OK; } diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 9cbe1b16e..d799c8c46 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -45,19 +45,7 @@ reset_alsa_sink_props (SpaALSAProps *props) static void update_state (SpaALSASink *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); } static const uint32_t min_uint32 = 1; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 89383b573..5bd83f6bb 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -32,19 +32,7 @@ typedef struct _SpaALSAState SpaALSASource; static void update_state (SpaALSASource *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); } static const char default_device[] = "hw:0"; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index a905e6f1d..f53408305 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -131,6 +131,12 @@ spa_audiomixer_node_set_props (SpaNode *node, return res; } +static void +update_state (SpaAudioMixer *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_audiomixer_node_send_command (SpaNode *node, SpaNodeCommand *command) @@ -147,31 +153,11 @@ spa_audiomixer_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 1e1ec8e75..b53f1448a 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -296,21 +296,7 @@ audiotestsrc_on_output (SpaPollNotifyData *data) static void update_state (SpaAudioTestSrc *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - if (this->event_cb) { - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); - } } static SpaResult diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index 7012fefce..5f0c0f6a6 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -127,6 +127,12 @@ spa_ffmpeg_dec_node_set_props (SpaNode *node, return res; } +static void +update_state (SpaFFMpegDec *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_ffmpeg_dec_node_send_command (SpaNode *node, SpaNodeCommand *command) @@ -143,30 +149,11 @@ spa_ffmpeg_dec_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; + case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index 2d97eb943..f683e55e2 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -86,6 +86,12 @@ static const SpaPropInfo prop_info[] = { 0, }, }; +static void +update_state (SpaFFMpegEnc *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_ffmpeg_enc_node_get_props (SpaNode *node, SpaProps **props) @@ -143,30 +149,11 @@ spa_ffmpeg_enc_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; + case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/remote/proxy.c b/spa/plugins/remote/proxy.c index ae4b26cf7..577cfd9c0 100644 --- a/spa/plugins/remote/proxy.c +++ b/spa/plugins/remote/proxy.c @@ -78,6 +78,8 @@ struct _SpaProxy { unsigned int max_outputs; unsigned int n_outputs; SpaProxyPort ports[MAX_PORTS]; + + uint32_t seq; }; enum { @@ -101,11 +103,12 @@ reset_proxy_props (SpaProxyProps *props) props->socketfd = -1; } -static void +static SpaResult update_poll (SpaProxy *this, int socketfd) { SpaNodeEvent event; SpaProxyProps *p; + SpaResult res = SPA_RESULT_OK; p = &this->props[1]; @@ -124,6 +127,21 @@ update_poll (SpaProxy *this, int socketfd) event.size = sizeof (this->poll); this->event_cb (&this->node, &event, this->user_data); } + return res; +} + +static void +send_async_complete (SpaProxy *this, uint32_t seq, SpaResult res) +{ + SpaNodeEvent event; + SpaNodeEventAsyncComplete ac; + + event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; + event.data = ∾ + event.size = sizeof (ac); + ac.seq = seq; + ac.res = res; + this->event_cb (&this->node, &event, this->user_data); } static SpaResult @@ -169,7 +187,7 @@ spa_proxy_node_set_props (SpaNode *node, /* compare changes */ if (op->socketfd != np->socketfd) - update_poll (this, np->socketfd); + res = update_poll (this, np->socketfd); /* commit changes */ memcpy (op, np, sizeof (*np)); @@ -182,7 +200,7 @@ spa_proxy_node_send_command (SpaNode *node, SpaNodeCommand *command) { SpaProxy *this; - SpaResult res; + SpaResult res = SPA_RESULT_OK; if (node == NULL || node->handle == NULL || command == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -194,49 +212,31 @@ spa_proxy_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - { - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; - SpaControlCmdNodeCommand cnc; - - /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - cnc.command = command; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); - spa_control_builder_end (&builder, &control); - - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - fprintf (stderr, "proxy %p: error writing control %d\n", this, res); - - spa_control_clear (&control); - break; - } - case SPA_NODE_COMMAND_PAUSE: - { - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; - SpaControlCmdNodeCommand cnc; - - /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - cnc.command = command; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); - spa_control_builder_end (&builder, &control); - - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - fprintf (stderr, "proxy %p: error writing control %d\n", this, res); - - spa_control_clear (&control); - break; - } - case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: - return SPA_RESULT_NOT_IMPLEMENTED; + { + SpaControlBuilder builder; + SpaControl control; + uint8_t buf[128]; + SpaControlCmdNodeCommand cnc; + + /* send start */ + spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); + cnc.seq = this->seq++; + cnc.command = command; + spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); + spa_control_builder_end (&builder, &control); + + if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) + fprintf (stderr, "proxy %p: error writing control %d\n", this, res); + + spa_control_clear (&control); + + res = SPA_RESULT_RETURN_ASYNC (cnc.seq); + break; + } case SPA_NODE_COMMAND_CLOCK_UPDATE: { @@ -258,7 +258,7 @@ spa_proxy_node_send_command (SpaNode *node, break; } } - return SPA_RESULT_OK; + return res; } static SpaResult @@ -502,7 +502,7 @@ spa_proxy_node_port_set_format (SpaNode *node, if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; - this = (SpaProxy *) node->handle; + this = (SpaProxy *) node->handle; if (!CHECK_PORT_ID (this, port_id)) return SPA_RESULT_INVALID_PORT; @@ -510,6 +510,7 @@ spa_proxy_node_port_set_format (SpaNode *node, port = &this->ports[port_id]; spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); + sf.seq = this->seq++; sf.port_id = port_id; sf.format = (SpaFormat *) format; spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_SET_FORMAT, &sf); @@ -520,13 +521,9 @@ spa_proxy_node_port_set_format (SpaNode *node, spa_control_clear (&control); - if (port->format) - spa_format_unref (port->format); - if (format) - spa_format_ref ((SpaFormat *) format); - port->format = (SpaFormat *)format; + port->format = format; - return SPA_RESULT_OK; + return SPA_RESULT_RETURN_ASYNC (sf.seq); } static SpaResult @@ -705,7 +702,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node, if (port->n_buffers == n_buffers && port->buffers == buffers) return SPA_RESULT_OK; - spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, sizeof (fds)); + spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, SPA_N_ELEMENTS (fds)); if (buffers == NULL || n_buffers == 0) { port->buffers = NULL; @@ -717,6 +714,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node, for (i = 0; i < port->n_buffers; i++) add_buffer_mem (this, &builder, port_id, port->buffers[i]); + ub.seq = this->seq++; ub.port_id = port_id; ub.n_buffers = port->n_buffers; ub.buffers = port->buffers; @@ -729,7 +727,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node, spa_control_clear (&control); - return SPA_RESULT_OK; + return SPA_RESULT_RETURN_ASYNC (ub.seq); } static SpaResult @@ -885,6 +883,7 @@ spa_proxy_node_port_pull_output (SpaNode *node, for (i = 0; i < n_info; i++) { if (!CHECK_PORT_ID_OUT (this, info[i].port_id)) { + fprintf (stderr, "invalid port %u\n", info[i].port_id); info[i].status = SPA_RESULT_INVALID_PORT; have_error = true; continue; @@ -933,23 +932,11 @@ handle_node_event (SpaProxy *this, { switch (event->type) { case SPA_NODE_EVENT_TYPE_INVALID: + break; + case SPA_NODE_EVENT_TYPE_PORT_ADDED: case SPA_NODE_EVENT_TYPE_PORT_REMOVED: - this->event_cb (&this->node, event, this->user_data); - break; - - case SPA_NODE_EVENT_TYPE_STATE_CHANGE: - { - SpaNodeEventStateChange *sc = event->data; - - fprintf (stderr, "proxy %p: got state-change to %d\n", this, sc->state); - if (this->node.state != sc->state) { - this->node.state = sc->state; - this->event_cb (&this->node, event, this->user_data); - } - break; - } - + case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT: case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: @@ -980,6 +967,7 @@ parse_control (SpaProxy *this, SpaControlCmd cmd = spa_control_iter_get_cmd (&it); switch (cmd) { + case SPA_CONTROL_CMD_INVALID: case SPA_CONTROL_CMD_ADD_PORT: case SPA_CONTROL_CMD_REMOVE_PORT: case SPA_CONTROL_CMD_SET_FORMAT: @@ -999,10 +987,14 @@ parse_control (SpaProxy *this, if (spa_control_iter_parse_cmd (&it, &nu) < 0) break; - this->max_inputs = nu.max_input_ports; - this->max_outputs = nu.max_output_ports; - fprintf (stderr, "proxy %p: got node update %d, %u, %u\n", this, cmd, + if (nu.change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS) + this->max_inputs = nu.max_input_ports; + if (nu.change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS) + this->max_outputs = nu.max_output_ports; + + fprintf (stderr, "proxy %p: got node update %d, max_in %u, max_out %u\n", this, cmd, this->max_inputs, this->max_outputs); + break; } @@ -1010,13 +1002,24 @@ parse_control (SpaProxy *this, { SpaControlCmdPortUpdate pu; bool remove; + SpaMemory *mem; + void *data; + size_t size; + + data = spa_control_iter_get_data (&it, &size); + mem = spa_memory_alloc_size (SPA_MEMORY_POOL_LOCAL, data, size); + spa_control_iter_set_data (&it, spa_memory_ensure_ptr (mem), size); fprintf (stderr, "proxy %p: got port update %d\n", this, cmd); - if (spa_control_iter_parse_cmd (&it, &pu) < 0) + if (spa_control_iter_parse_cmd (&it, &pu) < 0) { + spa_memory_unref (&mem->mem); break; + } - if (pu.port_id >= MAX_PORTS) + if (pu.port_id >= MAX_PORTS) { + spa_memory_unref (&mem->mem); break; + } remove = (pu.change_mask == 0); @@ -1033,6 +1036,24 @@ parse_control (SpaProxy *this, fprintf (stderr, "proxy %p: command not implemented %d\n", this, cmd); break; } + + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + { + SpaControlCmdNodeStateChange sc; + SpaNodeState old = this->node.state; + + if (spa_control_iter_parse_cmd (&it, &sc) < 0) + break; + + this->node.state = sc.state; + if (old == SPA_NODE_STATE_INIT) + send_async_complete (this, 0, SPA_RESULT_OK); + + fprintf (stderr, "proxy %p: got node state change %d\n", this, this->node.state); + + break; + } + case SPA_CONTROL_CMD_ADD_MEM: break; case SPA_CONTROL_CMD_REMOVE_MEM: @@ -1065,9 +1086,6 @@ parse_control (SpaProxy *this, handle_node_event (this, cne.event); break; } - default: - fprintf (stderr, "proxy %p: command unhandled %d\n", this, cmd); - break; } } spa_control_iter_end (&it); @@ -1184,7 +1202,7 @@ proxy_init (const SpaHandleFactory *factory, this->poll.after_cb = proxy_on_fd_events; this->poll.user_data = this; - return SPA_RESULT_OK; + return SPA_RESULT_RETURN_ASYNC (this->seq++); } static const SpaInterfaceInfo proxy_interfaces[] = diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 061fc6352..ea7d61765 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -137,19 +137,7 @@ struct _SpaV4l2Source { static void update_state (SpaV4l2Source *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); } #include "v4l2-utils.c" diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 457e2cdc0..efdff82db 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -111,6 +111,7 @@ spa_v4l2_clear_buffers (SpaV4l2Source *this) if (state->alloc_mem) spa_memory_unref (&state->alloc_mem->mem); + state->alloc_mem = NULL; state->have_buffers = false; return SPA_RESULT_OK; diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index fbeb4ef9f..cae68705e 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -244,21 +244,7 @@ videotestsrc_on_output (SpaPollNotifyData *data) static void update_state (SpaVideoTestSrc *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - if (this->event_cb) { - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); - } } static SpaResult diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 7bbe3d271..f18844b58 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -96,6 +96,12 @@ reset_volume_props (SpaVolumeProps *props) props->mute = default_mute; } +static void +update_state (SpaVolume *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_volume_node_get_props (SpaNode *node, SpaProps **props) @@ -152,31 +158,11 @@ spa_volume_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/xv/xv-sink.c b/spa/plugins/xv/xv-sink.c index fe30dc130..4db0e44bc 100644 --- a/spa/plugins/xv/xv-sink.c +++ b/spa/plugins/xv/xv-sink.c @@ -117,6 +117,12 @@ static const SpaPropInfo prop_info[] = NULL }, }; +static void +update_state (SpaXvSink *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_xv_sink_node_get_props (SpaNode *node, SpaProps **props) @@ -176,32 +182,12 @@ spa_xv_sink_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_START: spa_xv_start (this); - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; case SPA_NODE_COMMAND_PAUSE: spa_xv_stop (this); - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: