improve node io

Unify input and output io areas.
Add support for ranges in the io area.
Automatically recycle buffers in the output areas in process_output
Improve the mixer, add use_buffer support, use a queue of input buffers,
fix mixing, add support for ranges.
Fix mixer and v4l2 tests
This commit is contained in:
Wim Taymans 2017-04-03 14:56:04 +02:00
parent 29fbf2e841
commit 01c13adab5
28 changed files with 983 additions and 747 deletions

View file

@ -418,6 +418,8 @@ on_context_data (SpaSource *source,
PinosProxy *proxy;
const PinosDemarshalFunc *demarshal;
pinos_log_trace ("context %p: got message %d from %u", this, opcode, id);
proxy = pinos_map_lookup (&this->objects, id);
if (proxy == NULL) {
pinos_log_error ("context %p: could not find proxy %u", this, id);
@ -428,8 +430,6 @@ on_context_data (SpaSource *source,
continue;
}
pinos_log_debug ("context %p: object demarshal %u, %u", this, id, opcode);
demarshal = proxy->iface->events;
if (demarshal[opcode]) {
if (!demarshal[opcode] (proxy, message, size))

View file

@ -54,12 +54,12 @@ pinos_proxy_new (PinosContext *context,
spa_list_insert (&this->context->proxy_list, &this->link);
pinos_log_debug ("proxy %p: new %u", this, this->id);
pinos_log_trace ("proxy %p: new %u", this, this->id);
return this;
in_use:
pinos_log_debug ("proxy %p: id %u in use for context %p", this, id, context);
pinos_log_error ("proxy %p: id %u in use for context %p", this, id, context);
free (impl);
return NULL;
}
@ -69,12 +69,11 @@ pinos_proxy_destroy (PinosProxy *proxy)
{
PinosProxyImpl *impl = SPA_CONTAINER_OF (proxy, PinosProxyImpl, this);
pinos_log_debug ("proxy %p: destroy %u", proxy, proxy->id);
pinos_log_trace ("proxy %p: destroy %u", proxy, proxy->id);
pinos_signal_emit (&proxy->destroy_signal, proxy);
pinos_map_remove (&proxy->context->objects, proxy->id);
spa_list_remove (&proxy->link);
pinos_log_debug ("proxy %p: free", proxy);
free (impl);
}

View file

@ -463,6 +463,18 @@ find_buffer (PinosStream *stream, uint32_t id)
return NULL;
}
static inline void
reuse_buffer (PinosStream *stream, uint32_t id)
{
BufferId *bid;
if ((bid = find_buffer (stream, id)) && bid->used) {
pinos_log_trace ("stream %p: reuse buffer %u", stream, id);
bid->used = false;
pinos_signal_emit (&stream->new_buffer, stream, id);
}
}
static void
handle_rtnode_event (PinosStream *stream,
SpaEvent *event)
@ -473,10 +485,10 @@ handle_rtnode_event (PinosStream *stream,
if (SPA_EVENT_TYPE (event) == context->type.event_node.HaveOutput) {
int i;
//pinos_log_debug ("stream %p: have output", stream);
pinos_log_trace ("stream %p: have output", stream);
for (i = 0; i < impl->trans->area->n_inputs; i++) {
SpaPortInput *input = &impl->trans->inputs[i];
SpaPortIO *input = &impl->trans->inputs[i];
if (input->buffer_id == SPA_ID_INVALID)
continue;
@ -487,7 +499,19 @@ handle_rtnode_event (PinosStream *stream,
send_need_input (stream);
}
else if (SPA_EVENT_TYPE (event) == context->type.event_node.NeedInput) {
//pinos_log_debug ("stream %p: need input", stream);
int i;
BufferId *bid;
for (i = 0; i < impl->trans->area->n_outputs; i++) {
SpaPortIO *output = &impl->trans->outputs[i];
if (output->buffer_id == SPA_ID_INVALID)
continue;
reuse_buffer (stream, output->buffer_id);
output->buffer_id = SPA_ID_INVALID;
}
pinos_log_trace ("stream %p: need input", stream);
pinos_signal_emit (&stream->need_buffer, stream);
}
else if (SPA_EVENT_TYPE (event) == context->type.event_node.ReuseBuffer) {
@ -499,10 +523,7 @@ handle_rtnode_event (PinosStream *stream,
if (impl->direction != SPA_DIRECTION_OUTPUT)
return;
if ((bid = find_buffer (stream, p->body.buffer_id.value)) && bid->used) {
bid->used = false;
pinos_signal_emit (&stream->new_buffer, stream, p->body.buffer_id.value);
}
reuse_buffer (stream, p->body.buffer_id.value);
}
else {
pinos_log_warn ("unexpected node event %d", SPA_EVENT_TYPE (event));
@ -547,7 +568,7 @@ handle_socket (PinosStream *stream, int rtfd)
impl->rtfd = rtfd;
impl->rtsocket_source = pinos_loop_add_io (stream->context->loop,
impl->rtfd,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP,
SPA_IO_ERR | SPA_IO_HUP,
true,
on_rtsocket_condition,
stream);
@ -583,17 +604,27 @@ handle_node_command (PinosStream *stream,
if (SPA_COMMAND_TYPE (command) == context->type.command_node.Pause) {
pinos_log_debug ("stream %p: pause %d", stream, seq);
pinos_loop_update_io (stream->context->loop,
impl->rtsocket_source,
SPA_IO_ERR | SPA_IO_HUP);
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
add_async_complete (stream, seq, SPA_RESULT_OK, true);
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
}
else if (SPA_COMMAND_TYPE (command) == context->type.command_node.Start) {
pinos_log_debug ("stream %p: start %d", stream, seq);
pinos_log_debug ("stream %p: start %d %d", stream, seq, impl->direction);
add_state_change (stream, SPA_NODE_STATE_STREAMING, false);
add_async_complete (stream, seq, SPA_RESULT_OK, true);
pinos_loop_update_io (stream->context->loop,
impl->rtsocket_source,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
if (impl->direction == SPA_DIRECTION_INPUT)
send_need_input (stream);
else
pinos_signal_emit (&stream->need_buffer, stream);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
}

View file

@ -47,8 +47,8 @@ transport_area_get_size (PinosTransportArea *area)
{
size_t size;
size = sizeof (PinosTransportArea);
size += area->max_inputs * sizeof (SpaPortInput);
size += area->max_outputs * sizeof (SpaPortOutput);
size += area->max_inputs * sizeof (SpaPortIO);
size += area->max_outputs * sizeof (SpaPortIO);
size += sizeof (SpaRingbuffer);
size += INPUT_BUFFER_SIZE;
size += sizeof (SpaRingbuffer);
@ -63,25 +63,25 @@ transport_setup_area (void *p, PinosTransport *trans)
int i;
trans->area = a = p;
p = SPA_MEMBER (p, sizeof (PinosTransportArea), SpaPortInput);
p = SPA_MEMBER (p, sizeof (PinosTransportArea), SpaPortIO);
trans->inputs = p;
for (i = 0; i < a->max_inputs; i++) {
trans->inputs[i].state = SPA_PORT_STATE_FLAG_NONE;
trans->inputs[i].flags = SPA_PORT_INPUT_FLAG_NONE;
trans->inputs[i].flags = SPA_PORT_IO_FLAG_NONE;
trans->inputs[i].buffer_id = SPA_ID_INVALID;
trans->inputs[i].status = SPA_RESULT_OK;
}
p = SPA_MEMBER (p, a->max_inputs * sizeof (SpaPortInput), void);
p = SPA_MEMBER (p, a->max_inputs * sizeof (SpaPortIO), void);
trans->outputs = p;
for (i = 0; i < a->max_outputs; i++) {
trans->outputs[i].state = SPA_PORT_STATE_FLAG_NONE;
trans->outputs[i].flags = SPA_PORT_OUTPUT_FLAG_NONE;
trans->outputs[i].flags = SPA_PORT_IO_FLAG_NONE;
trans->outputs[i].buffer_id = SPA_ID_INVALID;
trans->outputs[i].status = SPA_RESULT_OK;
}
p = SPA_MEMBER (p, a->max_outputs * sizeof (SpaPortOutput), void);
p = SPA_MEMBER (p, a->max_outputs * sizeof (SpaPortIO), void);
trans->input_buffer = p;
spa_ringbuffer_init (trans->input_buffer, INPUT_BUFFER_SIZE);

View file

@ -58,8 +58,8 @@ struct _PinosTransport {
PinosTransport *trans));
PinosTransportArea *area;
SpaPortInput *inputs;
SpaPortOutput *outputs;
SpaPortIO *inputs;
SpaPortIO *outputs;
void *input_data;
SpaRingbuffer *input_buffer;
void *output_data;

View file

@ -565,41 +565,24 @@ spa_proxy_node_port_set_props (SpaNode *node,
}
static SpaResult
spa_proxy_node_port_set_input (SpaNode *node,
uint32_t port_id,
SpaPortInput *input)
spa_proxy_node_port_set_io (SpaNode *node,
SpaDirection direction,
uint32_t port_id,
SpaPortIO *io)
{
SpaProxy *this;
SpaProxyPort *port;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
if (!CHECK_PORT (this, SPA_DIRECTION_INPUT, port_id))
if (!CHECK_PORT (this, direction, port_id))
return SPA_RESULT_INVALID_PORT;
this->in_ports[port_id].io = input;
return SPA_RESULT_OK;
}
static SpaResult
spa_proxy_node_port_set_output (SpaNode *node,
uint32_t port_id,
SpaPortOutput *output)
{
SpaProxy *this;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
if (!CHECK_PORT (this, SPA_DIRECTION_OUTPUT, port_id))
return SPA_RESULT_INVALID_PORT;
this->out_ports[port_id].io = output;
port = direction == SPA_DIRECTION_INPUT ? &this->in_ports[port_id] : &this->out_ports[port_id];
port->io = io;
return SPA_RESULT_OK;
}
@ -815,11 +798,14 @@ static SpaResult
spa_proxy_node_process_output (SpaNode *node)
{
SpaProxy *this;
PinosNode *pnode;
int i;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
pnode = this->pnode;
send_need_input (this);
@ -982,8 +968,7 @@ static const SpaNode proxy_node = {
spa_proxy_node_port_set_props,
spa_proxy_node_port_use_buffers,
spa_proxy_node_port_alloc_buffers,
spa_proxy_node_port_set_input,
spa_proxy_node_port_set_output,
spa_proxy_node_port_set_io,
spa_proxy_node_port_reuse_buffer,
spa_proxy_node_port_send_command,
spa_proxy_node_process_input,

View file

@ -150,9 +150,9 @@ update_port_ids (PinosNode *node, bool create)
node->transport->area->n_outputs = n_output_ports;
for (i = 0; i < max_input_ports; i++)
spa_node_port_set_input (node->node, i, &node->transport->inputs[i]);
spa_node_port_set_io (node->node, SPA_DIRECTION_INPUT, i, &node->transport->inputs[i]);
for (i = 0; i < max_output_ports; i++)
spa_node_port_set_output (node->node, i, &node->transport->outputs[i]);
spa_node_port_set_io (node->node, SPA_DIRECTION_OUTPUT, i, &node->transport->outputs[i]);
pinos_signal_emit (&node->transport_changed, node);
}
@ -272,8 +272,8 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
for (i = 0; i < this->transport->area->n_inputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
SpaPortIO *pi;
SpaPortIO *po;
pi = &this->transport->inputs[i];
if (pi->buffer_id != SPA_ID_INVALID)
@ -289,16 +289,8 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
if (po->buffer_id != SPA_ID_INVALID) {
processed = true;
pi->buffer_id = po->buffer_id;
if ((res = spa_node_port_reuse_buffer (outport->node->node,
outport->port_id,
po->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", outport->node, res);
po->buffer_id = SPA_ID_INVALID;
*pi = *po;
}
if ((res = spa_node_process_output (outport->node->node)) < 0)
pinos_log_warn ("node %p: got process output %d", outport->node, res);
}
@ -316,8 +308,7 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
for (i = 0; i < this->transport->area->n_outputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
SpaPortIO *po;
po = &this->transport->outputs[i];
if (po->buffer_id == SPA_ID_INVALID)
@ -331,22 +322,12 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
continue;
inport = link->rt.input;
pi = &inport->node->transport->inputs[inport->port_id];
pi->buffer_id = po->buffer_id;
inport->node->transport->inputs[inport->port_id] = *po;
if ((res = spa_node_process_input (inport->node->node)) < 0)
pinos_log_warn ("node %p: got process input %d", inport->node, res);
}
if ((res = spa_node_port_reuse_buffer (this->node,
outport->port_id,
po->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", this, res);
processed = true;
po->buffer_id = SPA_ID_INVALID;
}
if (processed) {
if ((res = spa_node_process_output (this->node)) < 0)

View file

@ -72,7 +72,7 @@ pinos_resource_destroy (PinosResource *resource)
{
PinosClient *client = resource->client;
pinos_log_debug ("resource %p: destroy %u", resource, resource->id);
pinos_log_trace ("resource %p: destroy %u", resource, resource->id);
pinos_signal_emit (&resource->destroy_signal, resource);
pinos_map_insert_at (&client->objects, resource->id, NULL);
@ -84,6 +84,5 @@ pinos_resource_destroy (PinosResource *resource)
if (client->core_resource)
pinos_core_notify_remove_id (client->core_resource, resource->id);
pinos_log_debug ("resource %p: free", resource);
free (resource);
}