Add support for async results

Add an async result code and an event to signal the completion.
Use async return values to signal completion of a method and potential
state change.
Add selected format to port update message.
Make it possible to parse into a custom copy of the command memory.
Remove state change events from the elements, we now just update the
state.
Implement async results in the proxy element
Add support for removing buffers in the client.
Fix up pinossink
Deal with async return in the links.
This commit is contained in:
Wim Taymans 2016-09-22 08:55:30 +02:00
parent 27acab7532
commit 68148188fa
25 changed files with 456 additions and 406 deletions

View file

@ -39,6 +39,8 @@
#define MAX_BUFFER_SIZE 4096
#define MAX_FDS 32
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64
typedef struct {
bool cleanup;
@ -47,13 +49,6 @@ typedef struct {
SpaBuffer *buf;
} BufferId;
static void
clear_buffer_id (BufferId *id)
{
spa_memory_unref (&id->buf->mem.mem);
id->buf = NULL;
}
struct _PinosStreamPrivate
{
PinosContext *context;
@ -68,9 +63,12 @@ struct _PinosStreamPrivate
PinosDirection direction;
gchar *path;
SpaNodeState node_state;
GPtrArray *possible_formats;
SpaFormat *format;
SpaPortInfo port_info;
uint32_t port_id;
uint32_t pending_seq;
PinosStreamFlags flags;
@ -126,6 +124,23 @@ enum
static guint signals[LAST_SIGNAL] = { 0 };
static void
clear_buffers (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
guint i;
for (i = 0; i < priv->buffer_ids->len; i++) {
BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i);
g_signal_emit (stream, signals[SIGNAL_REMOVE_BUFFER], 0, bid->id);
spa_memory_unref (&bid->buf->mem.mem);
bid->buf = NULL;
}
g_array_set_size (priv->buffer_ids, 0);
priv->in_order = TRUE;
}
static void
pinos_stream_get_property (GObject *_object,
guint prop_id,
@ -491,9 +506,9 @@ pinos_stream_init (PinosStream * stream)
g_debug ("new stream %p", stream);
priv->state = PINOS_STREAM_STATE_UNCONNECTED;
priv->node_state = SPA_NODE_STATE_INIT;
priv->buffer_ids = g_array_sized_new (FALSE, FALSE, sizeof (BufferId), 64);
g_array_set_clear_func (priv->buffer_ids, (GDestroyNotify) clear_buffer_id);
priv->in_order = TRUE;
priv->pending_seq = SPA_ID_INVALID;
}
/**
@ -611,39 +626,37 @@ add_node_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_UPDATE, &nu);
}
static void
add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state)
{
PinosStreamPrivate *priv = stream->priv;
SpaControlCmdNodeStateChange sc;
sc.state = priv->node_state = state;
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_STATE_CHANGE, &sc);
}
static void
add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t change_mask)
{
PinosStreamPrivate *priv = stream->priv;
SpaControlCmdPortUpdate pu = { 0, };;
pu.port_id = 0;
pu.port_id = priv->port_id;
pu.change_mask = change_mask;
if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS) {
pu.n_possible_formats = priv->possible_formats->len;
pu.possible_formats = (SpaFormat **)priv->possible_formats->pdata;
}
if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_FORMAT) {
pu.format = priv->format;
}
pu.props = NULL;
if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_INFO)
pu.info = &priv->port_info;
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_PORT_UPDATE, &pu);
}
static void
add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state)
{
SpaControlCmdNodeEvent cne;
SpaNodeEvent ne;
SpaNodeEventStateChange sc;
cne.event = &ne;
ne.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE;
ne.data = &sc;
ne.size = sizeof (sc);
sc.state = state;
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne);
}
static void
add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_id)
{
@ -693,6 +706,25 @@ add_request_clock_update (PinosStream *stream, SpaControlBuilder *builder)
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne);
}
static void
add_async_complete (PinosStream *stream,
SpaControlBuilder *builder,
uint32_t seq,
SpaResult res)
{
SpaControlCmdNodeEvent cne;
SpaNodeEvent ne;
SpaNodeEventAsyncComplete ac;
cne.event = &ne;
ne.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
ne.data = &ac;
ne.size = sizeof (ac);
ac.seq = seq;
ac.res = res;
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne);
}
static void
send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id)
{
@ -751,6 +783,30 @@ send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id)
spa_control_clear (&control);
}
static void
do_node_init (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
SpaControlBuilder builder;
SpaControl control;
control_builder_init (stream, &builder);
add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS |
SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS);
priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS |
SPA_CONTROL_CMD_PORT_UPDATE_INFO);
add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: error writing control", stream);
spa_control_clear (&control);
}
static BufferId *
find_buffer (PinosStream *stream, uint32_t id)
{
@ -779,9 +835,9 @@ handle_node_event (PinosStream *stream,
case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_PORT_ADDED:
case SPA_NODE_EVENT_TYPE_PORT_REMOVED:
case SPA_NODE_EVENT_TYPE_STATE_CHANGE:
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
g_warning ("unhandled node event %d", event->type);
break;
@ -790,7 +846,7 @@ handle_node_event (PinosStream *stream,
SpaNodeEventReuseBuffer *p = event->data;
BufferId *bid;
if (p->port_id != 0)
if (p->port_id != priv->port_id)
break;
if (priv->direction != PINOS_DIRECTION_OUTPUT)
break;
@ -818,6 +874,7 @@ handle_node_event (PinosStream *stream,
static gboolean
handle_node_command (PinosStream *stream,
uint32_t seq,
SpaNodeCommand *command)
{
PinosStreamPrivate *priv = stream->priv;
@ -834,6 +891,7 @@ handle_node_command (PinosStream *stream,
control_builder_init (stream, &builder);
add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED);
add_async_complete (stream, &builder, seq, SPA_RESULT_OK);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
@ -852,8 +910,9 @@ handle_node_command (PinosStream *stream,
g_debug ("stream %p: start", stream);
control_builder_init (stream, &builder);
if (priv->direction == PINOS_DIRECTION_INPUT)
add_need_input (stream, &builder, 0);
add_need_input (stream, &builder, priv->port_id);
add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING);
add_async_complete (stream, &builder, seq, SPA_RESULT_OK);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
@ -867,8 +926,21 @@ handle_node_command (PinosStream *stream,
case SPA_NODE_COMMAND_FLUSH:
case SPA_NODE_COMMAND_DRAIN:
case SPA_NODE_COMMAND_MARKER:
{
SpaControlBuilder builder;
SpaControl control;
g_warning ("unhandled node command %d", command->type);
control_builder_init (stream, &builder);
add_async_complete (stream, &builder, seq, SPA_RESULT_NOT_IMPLEMENTED);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: error writing control", stream);
spa_control_clear (&control);
break;
}
case SPA_NODE_COMMAND_CLOCK_UPDATE:
{
@ -904,6 +976,7 @@ parse_control (PinosStream *stream,
case SPA_CONTROL_CMD_PORT_UPDATE:
case SPA_CONTROL_CMD_PORT_REMOVED:
case SPA_CONTROL_CMD_PORT_STATUS_CHANGE:
case SPA_CONTROL_CMD_NODE_STATE_CHANGE:
g_warning ("got unexpected control %d", cmd);
break;
@ -924,21 +997,8 @@ parse_control (PinosStream *stream,
priv->format = p.format;
spa_debug_format (p.format);
priv->pending_seq = p.seq;
g_object_notify (G_OBJECT (stream), "format");
if (priv->port_info.n_params != 0) {
SpaControlBuilder builder;
SpaControl control;
control_builder_init (stream, &builder);
add_state_change (stream, &builder, SPA_NODE_STATE_READY);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: error writing control", stream);
spa_control_clear (&control);
}
break;
}
case SPA_CONTROL_CMD_SET_PROPERTY:
@ -960,7 +1020,7 @@ parse_control (PinosStream *stream,
mem = spa_memory_import (&p.mem);
if (mem->fd == -1) {
g_debug ("add mem %d,%d, %d, %d", p.mem.pool_id, p.mem.id, fd, p.flags);
g_debug ("add mem %u:%u, fd %d, flags %d", p.mem.pool_id, p.mem.id, fd, p.flags);
mem->flags = p.flags;
mem->fd = fd;
mem->ptr = NULL;
@ -991,13 +1051,15 @@ parse_control (PinosStream *stream,
break;
/* clear previous buffers */
g_array_set_size (priv->buffer_ids, 0);
clear_buffers (stream);
for (i = 0; i < p.n_buffers; i++) {
bid.buf = p.buffers[i];
bid.cleanup = false;
bid.id = bid.buf->id;
g_debug ("add buffer %d, %d, %zd, %zd", bid.id, bid.buf->mem.mem.id, bid.buf->mem.offset, bid.buf->mem.size);
g_debug ("add buffer %d: %u:%u, %zd-%zd", bid.id,
bid.buf->mem.mem.pool_id, bid.buf->mem.mem.id,
bid.buf->mem.offset, bid.buf->mem.size);
if (bid.id != priv->buffer_ids->len) {
g_warning ("unexpected id %u found, expected %u", bid.id, priv->buffer_ids->len);
@ -1013,6 +1075,7 @@ parse_control (PinosStream *stream,
} else {
add_state_change (stream, &builder, SPA_NODE_STATE_READY);
}
add_async_complete (stream, &builder, p.seq, SPA_RESULT_OK);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
@ -1033,7 +1096,7 @@ parse_control (PinosStream *stream,
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id);
send_need_input (stream, 0);
send_need_input (stream, priv->port_id);
break;
}
case SPA_CONTROL_CMD_NODE_EVENT:
@ -1053,7 +1116,7 @@ parse_control (PinosStream *stream,
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
handle_node_command (stream, p.command);
handle_node_command (stream, p.seq, p.command);
break;
}
@ -1175,8 +1238,6 @@ on_node_proxy (GObject *source_object,
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
PinosContext *context = priv->context;
SpaControlBuilder builder;
SpaControl control;
GError *error = NULL;
@ -1186,22 +1247,7 @@ on_node_proxy (GObject *source_object,
if (priv->node == NULL)
goto node_failed;
control_builder_init (stream, &builder);
add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS |
SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS);
priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS |
SPA_CONTROL_CMD_PORT_UPDATE_INFO);
add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: error writing control", stream);
spa_control_clear (&control);
do_node_init (stream);
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
g_object_unref (stream);
@ -1343,6 +1389,7 @@ pinos_stream_connect (PinosStream *stream,
g_return_val_if_fail (pinos_stream_get_state (stream) == PINOS_STREAM_STATE_UNCONNECTED, FALSE);
priv->direction = direction;
priv->port_id = direction == PINOS_DIRECTION_INPUT ? 0 : MAX_INPUTS;
priv->mode = mode;
g_free (priv->path);
priv->path = g_strdup (port_path);
@ -1361,16 +1408,26 @@ pinos_stream_connect (PinosStream *stream,
}
/**
* pinos_stream_start_allocation:
* pinos_stream_finish_format:
* @stream: a #PinosStream
* @props: a #PinosProperties
* @res: a #SpaResult
* @params: an array of pointers to #SpaAllocParam
* @n_params: number of elements in @params
*
* Complete the negotiation process with result code @res.
*
* This function should be called after notification of the format.
* When @res indicates success, @params contain the parameters for the
* allocation state.
*
* Returns: %TRUE on success
*/
gboolean
pinos_stream_start_allocation (PinosStream *stream,
SpaAllocParam **params,
unsigned int n_params)
pinos_stream_finish_format (PinosStream *stream,
SpaResult res,
SpaAllocParam **params,
unsigned int n_params)
{
PinosStreamPrivate *priv;
PinosContext *context;
@ -1379,24 +1436,31 @@ pinos_stream_start_allocation (PinosStream *stream,
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->pending_seq != SPA_ID_INVALID, FALSE);
context = priv->context;
g_return_val_if_fail (pinos_context_get_state (context) == PINOS_CONTEXT_STATE_CONNECTED, FALSE);
control_builder_init (stream, &builder);
priv->port_info.params = params;
priv->port_info.n_params = n_params;
/* send update port status */
add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO);
/* send state-change */
if (priv->format)
add_state_change (stream, &builder, SPA_NODE_STATE_READY);
control_builder_init (stream, &builder);
if (SPA_RESULT_IS_OK (res)) {
add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO |
SPA_CONTROL_CMD_PORT_UPDATE_FORMAT);
if (priv->format) {
add_state_change (stream, &builder, SPA_NODE_STATE_READY);
} else {
clear_buffers (stream);
add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE);
}
}
add_async_complete (stream, &builder, priv->pending_seq, res);
spa_control_builder_end (&builder, &control);
priv->pending_seq = SPA_ID_INVALID;
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: error writing control", stream);
@ -1659,7 +1723,7 @@ pinos_stream_recycle_buffer (PinosStream *stream,
priv = stream->priv;
g_return_val_if_fail (priv->direction == PINOS_DIRECTION_INPUT, FALSE);
send_reuse_buffer (stream, 0, id);
send_reuse_buffer (stream, priv->port_id, id);
return TRUE;
}
@ -1715,7 +1779,7 @@ pinos_stream_send_buffer (PinosStream *stream,
if ((bid = find_buffer (stream, id))) {
bid->used = TRUE;
send_process_buffer (stream, 0, id);
send_process_buffer (stream, priv->port_id, id);
return TRUE;
} else {
return FALSE;