Add port direction again

simplify port numbering again by using 0->max_ports for bot input ports
and output ports. This means we need to tall what direction the port is.
Add port_info serialize functions
Copy metadata and data when we are not sharing buffers.
Make pinossink work again.
This commit is contained in:
Wim Taymans 2016-10-03 19:43:42 +02:00
parent b208e8b690
commit d828073bb8
26 changed files with 1104 additions and 648 deletions

View file

@ -54,7 +54,9 @@ typedef struct {
typedef struct {
uint32_t id;
bool used;
void *buf_ptr;
SpaBuffer *buf;
SpaData *datas;
} BufferId;
struct _PinosStreamPrivate
@ -68,13 +70,13 @@ struct _PinosStreamPrivate
PinosStreamState state;
GError *error;
PinosDirection direction;
gchar *path;
SpaNodeState node_state;
GPtrArray *possible_formats;
SpaFormat *format;
SpaPortInfo port_info;
SpaDirection direction;
uint32_t port_id;
uint32_t pending_seq;
@ -628,9 +630,9 @@ add_node_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang
nu.change_mask = change_mask;
if (change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS)
nu.max_input_ports = priv->direction == PINOS_DIRECTION_INPUT ? 1 : 0;
nu.max_input_ports = priv->direction == SPA_DIRECTION_INPUT ? 1 : 0;
if (change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS)
nu.max_output_ports = priv->direction == PINOS_DIRECTION_OUTPUT ? 1 : 0;
nu.max_output_ports = priv->direction == SPA_DIRECTION_OUTPUT ? 1 : 0;
nu.props = NULL;
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_UPDATE, &nu);
}
@ -653,6 +655,7 @@ add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang
PinosStreamPrivate *priv = stream->priv;
SpaControlCmdPortUpdate pu = { 0, };;
pu.direction = priv->direction;
pu.port_id = priv->port_id;
pu.change_mask = change_mask;
if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS) {
@ -663,8 +666,10 @@ add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang
pu.format = priv->format;
}
pu.props = NULL;
if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_INFO)
if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_INFO) {
pu.info = &priv->port_info;
spa_debug_port_info (pu.info);
}
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_PORT_UPDATE, &pu);
}
@ -775,6 +780,7 @@ send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id)
SpaNodeEventHaveOutput ho;
control_builder_init (stream, &builder);
pb.direction = priv->direction;
pb.port_id = port_id;
pb.buffer_id = buffer_id;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb);
@ -871,7 +877,7 @@ handle_node_event (PinosStream *stream,
if (p->port_id != priv->port_id)
break;
if (priv->direction != PINOS_DIRECTION_OUTPUT)
if (priv->direction != SPA_DIRECTION_OUTPUT)
break;
if ((bid = find_buffer (stream, p->buffer_id))) {
@ -932,7 +938,7 @@ handle_node_command (PinosStream *stream,
g_debug ("stream %p: start", stream);
control_builder_init (stream, &builder);
if (priv->direction == PINOS_DIRECTION_INPUT)
if (priv->direction == SPA_DIRECTION_INPUT)
add_need_input (stream, &builder, priv->port_id);
add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING);
add_async_complete (stream, &builder, seq, SPA_RESULT_OK);
@ -1072,6 +1078,7 @@ parse_control (PinosStream *stream,
unsigned int i, j;
SpaControlBuilder builder;
SpaControl control;
SpaBuffer *b;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
@ -1095,16 +1102,62 @@ parse_control (PinosStream *stream,
}
}
bid.buf = spa_buffer_deserialize (mid->ptr, p.buffers[i].offset);
bid.id = bid.buf->id;
bid.buf_ptr = SPA_MEMBER (mid->ptr, p.buffers[i].offset, void);
{
size_t size;
unsigned int i;
SpaMeta *m;
b = bid.buf_ptr;
size = sizeof (SpaBuffer);
m = SPA_MEMBER (b, SPA_PTR_TO_INT (b->metas), SpaMeta);
for (i = 0; i < b->n_metas; i++)
size += sizeof (SpaMeta) + m[i].size;
for (i = 0; i < b->n_datas; i++)
size += sizeof (SpaData);
b = bid.buf = g_memdup (bid.buf_ptr, size);
if (b->metas)
b->metas = SPA_MEMBER (b, SPA_PTR_TO_INT (b->metas), SpaMeta);
if (b->datas) {
bid.datas = SPA_MEMBER (bid.buf_ptr, SPA_PTR_TO_INT (b->datas), SpaData);
b->datas = SPA_MEMBER (b, SPA_PTR_TO_INT (b->datas), SpaData);
}
}
bid.id = b->id;
g_debug ("add buffer %d %d %zd", mid->id, bid.id, p.buffers[i].offset);
for (j = 0; j < bid.buf->n_datas; j++) {
SpaData *d = &bid.buf->datas[j];
MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data));
d->data = SPA_INT_TO_PTR (bmid->fd);
g_debug (" data %d %u -> %d", j, bmid->id, bmid->fd);
for (j = 0; j < b->n_metas; j++) {
SpaMeta *m = &b->metas[j];
if (m->data)
m->data = SPA_MEMBER (bid.buf_ptr, SPA_PTR_TO_INT (m->data), void);
}
for (j = 0; j < b->n_datas; j++) {
SpaData *d = &b->datas[j];
switch (d->type) {
case SPA_DATA_TYPE_ID:
{
MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data));
d->type = SPA_DATA_TYPE_FD;
d->data = SPA_INT_TO_PTR (bmid->fd);
g_debug (" data %d %u -> %d", j, bmid->id, bmid->fd);
break;
}
case SPA_DATA_TYPE_MEMPTR:
{
d->data = SPA_MEMBER (bid.buf_ptr, SPA_PTR_TO_INT (d->data), void);
g_debug (" data %d %u -> %p", j, bid.id, d->data);
break;
}
default:
g_warning ("unknown buffer data type %d", d->type);
break;
}
}
if (bid.id != priv->buffer_ids->len) {
@ -1133,16 +1186,23 @@ parse_control (PinosStream *stream,
case SPA_CONTROL_CMD_PROCESS_BUFFER:
{
SpaControlCmdProcessBuffer p;
guint i;
BufferId *bid;
if (priv->direction != PINOS_DIRECTION_INPUT)
if (priv->direction != SPA_DIRECTION_INPUT)
break;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id);
if ((bid = find_buffer (stream, p.buffer_id))) {
for (i = 0; i < bid->buf->n_datas; i++) {
bid->buf->datas[i].size = bid->datas[i].size;
}
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id);
send_need_input (stream, priv->port_id);
send_need_input (stream, priv->port_id);
}
break;
}
case SPA_CONTROL_CMD_NODE_EVENT:
@ -1434,8 +1494,8 @@ pinos_stream_connect (PinosStream *stream,
g_return_val_if_fail (pinos_context_get_state (context) == PINOS_CONTEXT_STATE_CONNECTED, FALSE);
g_return_val_if_fail (pinos_stream_get_state (stream) == PINOS_STREAM_STATE_UNCONNECTED, FALSE);
priv->direction = direction;
priv->port_id = direction == PINOS_DIRECTION_INPUT ? 0 : MAX_INPUTS;
priv->direction = direction == PINOS_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
priv->port_id = 0;
priv->mode = mode;
g_free (priv->path);
priv->path = g_strdup (port_path);
@ -1502,6 +1562,9 @@ pinos_stream_finish_format (PinosStream *stream,
add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE);
}
}
priv->port_info.params = NULL;
priv->port_info.n_params = 0;
add_async_complete (stream, &builder, priv->pending_seq, res);
spa_control_builder_end (&builder, &control);
@ -1705,7 +1768,7 @@ pinos_stream_get_empty_buffer (PinosStream *stream)
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->direction == PINOS_DIRECTION_OUTPUT, FALSE);
g_return_val_if_fail (priv->direction == SPA_DIRECTION_OUTPUT, FALSE);
for (i = 0; i < priv->buffer_ids->len; i++) {
BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i);
@ -1733,7 +1796,7 @@ pinos_stream_recycle_buffer (PinosStream *stream,
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (id != SPA_ID_INVALID, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->direction == PINOS_DIRECTION_INPUT, FALSE);
g_return_val_if_fail (priv->direction == SPA_DIRECTION_INPUT, FALSE);
send_reuse_buffer (stream, priv->port_id, id);
@ -1783,14 +1846,18 @@ pinos_stream_send_buffer (PinosStream *stream,
{
PinosStreamPrivate *priv;
BufferId *bid;
guint i;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (id != SPA_ID_INVALID, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->direction == PINOS_DIRECTION_OUTPUT, FALSE);
g_return_val_if_fail (priv->direction == SPA_DIRECTION_OUTPUT, FALSE);
if ((bid = find_buffer (stream, id))) {
bid->used = TRUE;
for (i = 0; i < bid->buf->n_datas; i++) {
bid->datas[i].size = bid->buf->datas[i].size;
}
send_process_buffer (stream, priv->port_id, id);
return TRUE;
} else {

View file

@ -333,6 +333,7 @@ gst_pinos_sink_get_property (GObject * object, guint prop_id,
typedef struct {
GstPinosSink *sink;
guint id;
SpaBuffer *buf;
SpaMetaHeader *header;
guint flags;
} ProcessMemData;
@ -368,6 +369,7 @@ on_add_buffer (GObject *gobject,
data.sink = gst_object_ref (pinossink);
data.id = id;
data.buf = b;
data.header = NULL;
for (i = 0; i < b->n_metas; i++) {
@ -591,6 +593,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
GstPinosSink *pinossink;
gboolean res;
ProcessMemData *data;
guint i;
pinossink = GST_PINOS_SINK (bsink);
@ -599,7 +602,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
pinos_main_loop_lock (pinossink->loop);
if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
goto streaming_error;
goto done;
// goto streaming_error;
if (buffer->pool != GST_BUFFER_POOL_CAST (pinossink->pool)) {
GstBuffer *b = NULL;
@ -613,9 +617,22 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer),
process_mem_data_quark);
if (data->header) {
data->header->seq = GST_BUFFER_OFFSET (buffer);
data->header->pts = GST_BUFFER_PTS (buffer);
data->header->dts_offset = GST_BUFFER_DTS (buffer);
}
for (i = 0; i < data->buf->n_datas; i++) {
SpaData *d = &data->buf->datas[i];
GstMemory *mem = gst_buffer_get_memory (buffer, i);
d->offset = mem->offset;
d->size = mem->size;
}
if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id)))
g_warning ("can't send buffer");
done:
pinos_main_loop_unlock (pinossink->loop);
return GST_FLOW_OK;
@ -624,11 +641,11 @@ not_negotiated:
{
return GST_FLOW_NOT_NEGOTIATED;
}
streaming_error:
{
pinos_main_loop_unlock (pinossink->loop);
return GST_FLOW_ERROR;
}
//streaming_error:
// {
// pinos_main_loop_unlock (pinossink->loop);
// return GST_FLOW_ERROR;
// }
}
static gboolean

View file

@ -237,6 +237,7 @@ do_negotiate (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
g_debug ("link %p: doing negotiate format", this);
again:
if ((res = spa_node_port_enum_formats (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
&filter,
NULL,
@ -253,6 +254,7 @@ again:
spa_debug_format (filter);
if ((res = spa_node_port_enum_formats (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
&format,
filter,
@ -273,6 +275,7 @@ again:
} else if (in_state == SPA_NODE_STATE_CONFIGURE && out_state > SPA_NODE_STATE_CONFIGURE) {
/* only input needs format */
if ((res = spa_node_port_get_format (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
(const SpaFormat **)&format)) < 0) {
g_set_error (&error,
@ -284,6 +287,7 @@ again:
} else if (out_state == SPA_NODE_STATE_CONFIGURE && in_state > SPA_NODE_STATE_CONFIGURE) {
/* only output needs format */
if ((res = spa_node_port_get_format (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
(const SpaFormat **)&format)) < 0) {
g_set_error (&error,
@ -301,6 +305,7 @@ again:
if (out_state == SPA_NODE_STATE_CONFIGURE) {
g_debug ("link %p: doing set format on output", this);
if ((res = spa_node_port_set_format (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
SPA_PORT_FORMAT_FLAG_NEAREST,
format)) < 0) {
@ -313,6 +318,7 @@ again:
} else if (in_state == SPA_NODE_STATE_CONFIGURE) {
g_debug ("link %p: doing set format on input", this);
if ((res = spa_node_port_set_format (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
SPA_PORT_FORMAT_FLAG_NEAREST,
format)) < 0) {
@ -360,14 +366,20 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
g_debug ("link %p: doing alloc buffers %p %p", this, this->output->node, this->input->node);
/* find out what's possible */
if ((res = spa_node_port_get_info (this->output->node->node, this->output->port, &oinfo)) < 0) {
if ((res = spa_node_port_get_info (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
&oinfo)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"error get output port info: %d", res);
goto error;
}
if ((res = spa_node_port_get_info (this->input->node->node, this->input->port, &iinfo)) < 0) {
if ((res = spa_node_port_get_info (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
&iinfo)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
@ -421,16 +433,20 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
if (priv->buffers == NULL) {
SpaAllocParamBuffers *in_alloc, *out_alloc;
guint max_buffers = MAX_BUFFERS;
guint max_buffers = MAX_BUFFERS, min_size = 4096;
gboolean alloc_data = TRUE;
max_buffers = MAX_BUFFERS;
in_alloc = find_param (iinfo, SPA_ALLOC_PARAM_TYPE_BUFFERS);
if (in_alloc)
if (in_alloc) {
max_buffers = in_alloc->max_buffers == 0 ? max_buffers : SPA_MIN (in_alloc->max_buffers, max_buffers);
min_size = SPA_MAX (min_size, in_alloc->minsize);
}
out_alloc = find_param (oinfo, SPA_ALLOC_PARAM_TYPE_BUFFERS);
if (out_alloc)
if (out_alloc) {
max_buffers = out_alloc->max_buffers == 0 ? max_buffers : SPA_MIN (out_alloc->max_buffers, max_buffers);
min_size = SPA_MAX (min_size, out_alloc->minsize);
}
if ((in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) ||
(out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS))
@ -445,23 +461,29 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
g_debug ("reusing %d output buffers %p", priv->n_buffers, priv->buffers);
} else {
guint i;
size_t hdr_size;
size_t hdr_size, data_size, buf_size;
void *p;
spa_alloc_params_get_header_size (oinfo->params, oinfo->n_params, 1, &hdr_size);
if (alloc_data)
data_size = min_size;
else
data_size = 0;
buf_size = hdr_size + data_size;
priv->n_buffers = max_buffers;
pinos_memblock_alloc (PINOS_MEMBLOCK_FLAG_WITH_FD |
PINOS_MEMBLOCK_FLAG_MAP_READWRITE |
PINOS_MEMBLOCK_FLAG_SEAL,
priv->n_buffers * (sizeof (SpaBuffer*) + hdr_size),
priv->n_buffers * (sizeof (SpaBuffer*) + buf_size),
&priv->buffer_mem);
priv->buffers = p = priv->buffer_mem.ptr;
p = SPA_MEMBER (p, priv->n_buffers * sizeof (SpaBuffer*), void);
for (i = 0; i < priv->n_buffers; i++)
priv->buffers[i] = SPA_MEMBER (p, hdr_size * i, SpaBuffer);
priv->buffers[i] = SPA_MEMBER (p, buf_size * i, SpaBuffer);
if ((res = spa_buffer_init_headers (oinfo->params,
oinfo->n_params,
@ -474,12 +496,25 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
"error buffer alloc: %d", res);
goto error;
}
for (i = 0; i < priv->n_buffers; i++) {
SpaData *d = &priv->buffers[i]->datas[0];
g_debug (" buffers %p %d", priv->buffers[i], priv->buffers[i]->id);
d->type = SPA_DATA_TYPE_MEMPTR;
d->data = SPA_MEMBER (priv->buffers[i], hdr_size, void);
d->size = data_size;
d->maxsize = data_size;
d->offset = 0;
d->stride = 0;
}
g_debug ("allocated %d buffers %p", priv->n_buffers, priv->buffers);
priv->allocated = TRUE;
}
if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = spa_node_port_alloc_buffers (this->input->node->node, this->input->port,
if ((res = spa_node_port_alloc_buffers (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
oinfo->params, oinfo->n_params,
priv->buffers, &priv->n_buffers)) < 0) {
g_set_error (&error,
@ -496,7 +531,9 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
g_debug ("allocated %d buffers %p from input port", priv->n_buffers, priv->buffers);
}
else if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = spa_node_port_alloc_buffers (this->output->node->node, this->output->port,
if ((res = spa_node_port_alloc_buffers (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
iinfo->params, iinfo->n_params,
priv->buffers, &priv->n_buffers)) < 0) {
g_set_error (&error,
@ -516,8 +553,11 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
g_debug ("using %d buffers %p on input port", priv->n_buffers, priv->buffers);
if ((res = spa_node_port_use_buffers (this->input->node->node, this->input->port,
priv->buffers, priv->n_buffers)) < 0) {
if ((res = spa_node_port_use_buffers (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
priv->buffers,
priv->n_buffers)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
@ -530,8 +570,11 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
}
else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
g_debug ("using %d buffers %p on output port", priv->n_buffers, priv->buffers);
if ((res = spa_node_port_use_buffers (this->output->node->node, this->output->port,
priv->buffers, priv->n_buffers)) < 0) {
if ((res = spa_node_port_use_buffers (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
priv->buffers,
priv->n_buffers)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,

View file

@ -281,7 +281,7 @@ suspend_node (PinosNode *this)
for (walk = priv->input_ports; walk; walk = g_list_next (walk)) {
NodePort *p = walk->data;
if ((res = spa_node_port_set_format (this->node, p->port.port, 0, NULL)) < 0)
if ((res = spa_node_port_set_format (this->node, SPA_DIRECTION_INPUT, p->port.port, 0, NULL)) < 0)
g_warning ("error unset format output: %d", res);
p->port.buffers = NULL;
p->port.n_buffers = 0;
@ -291,7 +291,7 @@ suspend_node (PinosNode *this)
}
for (walk = priv->output_ports; walk; walk = g_list_next (walk)) {
NodePort *p = walk->data;
if ((res = spa_node_port_set_format (this->node, p->port.port, 0, NULL)) < 0)
if ((res = spa_node_port_set_format (this->node, SPA_DIRECTION_OUTPUT, p->port.port, 0, NULL)) < 0)
g_warning ("error unset format output: %d", res);
p->port.buffers = NULL;
p->port.n_buffers = 0;
@ -1132,15 +1132,14 @@ pinos_node_get_free_port (PinosNode *node,
max_ports = priv->max_input_ports;
n_ports = priv->n_input_ports;
ports = priv->input_ports;
free_port = 0;
} else {
max_ports = priv->max_output_ports;
n_ports = priv->n_output_ports;
ports = priv->output_ports;
free_port = priv->max_input_ports;
}
free_port = 0;
g_debug ("node %p: direction %d max %u, n %u", node, direction, max_ports, n_ports);
g_debug ("node %p: direction %d max %u, n %u, free_port %u", node, direction, max_ports, n_ports, free_port);
for (walk = ports; walk; walk = g_list_next (walk)) {
PinosPort *p = walk->data;