From 4c7b56020a28b60c9c2f6bcc2675cb5615b6a041 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 12 Apr 2017 10:40:17 +0200 Subject: [PATCH] audiomixer: improve mixing Remove PortIO flags, we can use the status Move PortIO to ports Move transport to client-node Improve scheduling --- pinos/client/stream.c | 5 +- pinos/client/transport.c | 6 +- pinos/gst/gstpinossink.c | 5 +- pinos/server/client-node.c | 109 +++++++++---- pinos/server/node.c | 196 ++++++++++------------- pinos/server/node.h | 28 ++-- pinos/server/port.c | 2 + pinos/server/port.h | 2 +- spa/include/spa/node.h | 8 +- spa/plugins/alsa/alsa-utils.c | 5 +- spa/plugins/audiomixer/audiomixer.c | 202 ++++++++++++++++-------- spa/plugins/audiotestsrc/audiotestsrc.c | 22 ++- spa/plugins/videotestsrc/videotestsrc.c | 3 +- spa/tests/test-mixer.c | 105 ++++++------ 14 files changed, 398 insertions(+), 300 deletions(-) diff --git a/pinos/client/stream.c b/pinos/client/stream.c index aa6080c08..ecc55e021 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -474,6 +474,7 @@ handle_rtnode_event (PinosStream *stream, for (i = 0; i < impl->trans->area->n_inputs; i++) { SpaPortIO *input = &impl->trans->inputs[i]; + pinos_log_trace ("stream %p: have output %d %d", stream, input->status, input->buffer_id); if (input->buffer_id == SPA_ID_INVALID) continue; @@ -488,6 +489,7 @@ handle_rtnode_event (PinosStream *stream, for (i = 0; i < impl->trans->area->n_outputs; i++) { SpaPortIO *output = &impl->trans->outputs[i]; + pinos_log_trace ("stream %p: buffer %d %u", stream, output->status, output->buffer_id); if (output->buffer_id == SPA_ID_INVALID) continue; @@ -1189,7 +1191,8 @@ pinos_stream_send_buffer (PinosStream *stream, if ((bid = find_buffer (stream, id)) && !bid->used) { bid->used = true; impl->trans->outputs[0].buffer_id = id; - impl->trans->outputs[0].status = SPA_RESULT_OK; + impl->trans->outputs[0].status = SPA_RESULT_HAVE_OUTPUT; + pinos_log_trace ("stream %p: send buffer %d", stream, id); send_have_output (stream); } else { pinos_log_debug ("stream %p: output %u was used", stream, id); diff --git a/pinos/client/transport.c b/pinos/client/transport.c index fa308993c..5d054e84e 100644 --- a/pinos/client/transport.c +++ b/pinos/client/transport.c @@ -85,14 +85,12 @@ transport_setup_area (void *p, PinosTransport *trans, bool reset) if (reset) { for (i = 0; i < a->max_inputs; i++) { - trans->inputs[i].flags = 0; - trans->inputs[i].buffer_id = SPA_ID_INVALID; trans->inputs[i].status = SPA_RESULT_OK; + trans->inputs[i].buffer_id = SPA_ID_INVALID; } for (i = 0; i < a->max_outputs; i++) { - trans->outputs[i].flags = 0; - trans->outputs[i].buffer_id = SPA_ID_INVALID; trans->outputs[i].status = SPA_RESULT_OK; + trans->outputs[i].buffer_id = SPA_ID_INVALID; } spa_ringbuffer_init (trans->input_buffer, INPUT_BUFFER_SIZE); spa_ringbuffer_init (trans->output_buffer, OUTPUT_BUFFER_SIZE); diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index efe05eeb4..cd2687b01 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -480,8 +480,10 @@ do_send_buffer (GstPinosSink *pinossink) guint i; buffer = g_queue_pop_head (&pinossink->queue); - if (buffer == NULL) + if (buffer == NULL) { + GST_WARNING ("out of buffers"); return; + } data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), process_mem_data_quark); @@ -702,6 +704,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) } else gst_buffer_ref (buffer); + GST_DEBUG ("push buffer in queue"); g_queue_push_tail (&pinossink->queue, buffer); // if (pinossink->need_ready) diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 6b234de00..56f29cc27 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -58,6 +58,7 @@ typedef struct _SpaProxy SpaProxy; typedef struct _ProxyBuffer ProxyBuffer; +typedef struct _PinosClientNodeImpl PinosClientNodeImpl; struct _ProxyBuffer { SpaBuffer *outbuf; @@ -88,7 +89,7 @@ struct _SpaProxy { SpaNode node; - PinosNode *pnode; + PinosClientNodeImpl *impl; SpaTypeMap *map; SpaLog *log; @@ -113,7 +114,7 @@ struct _SpaProxy uint32_t seq; }; -typedef struct +struct _PinosClientNodeImpl { PinosClientNode this; @@ -121,13 +122,15 @@ typedef struct SpaProxy proxy; + PinosTransport *transport; + PinosListener node_free; - PinosListener transport_changed; + PinosListener initialized; PinosListener loop_changed; PinosListener global_added; int data_fd; -} PinosClientNodeImpl; +}; static SpaResult clear_buffers (SpaProxy *this, SpaProxyPort *port) @@ -159,22 +162,22 @@ spa_proxy_node_set_props (SpaNode *node, static void send_need_input (SpaProxy *this) { - PinosNode *pnode = this->pnode; - SpaEvent event = SPA_EVENT_INIT (pnode->core->type.event_node.NeedInput); + PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy); + SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_node.NeedInput); uint64_t cmd = 1; - pinos_transport_add_event (pnode->transport, &event); + pinos_transport_add_event (impl->transport, &event); write (this->data_source.fd, &cmd, 8); } static void send_have_output (SpaProxy *this) { - PinosNode *pnode = this->pnode; - SpaEvent event = SPA_EVENT_INIT (pnode->core->type.event_node.HaveOutput); + PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy); + SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_node.HaveOutput); uint64_t cmd = 1; - pinos_transport_add_event (pnode->transport, &event); + pinos_transport_add_event (impl->transport, &event); write (this->data_source.fd, &cmd, 8); } @@ -194,7 +197,7 @@ spa_proxy_node_send_command (SpaNode *node, if (this->resource == NULL) return SPA_RESULT_OK; - core = this->pnode->core; + core = this->impl->core; if (SPA_COMMAND_TYPE (command) == core->type.command_node.ClockUpdate) { pinos_client_node_notify_node_command (this->resource, @@ -748,22 +751,22 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, uint32_t buffer_id) { SpaProxy *this; - PinosNode *pnode; + PinosClientNodeImpl *impl; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; this = SPA_CONTAINER_OF (node, SpaProxy, node); - pnode = this->pnode; + impl = this->impl; if (!CHECK_OUT_PORT (this, SPA_DIRECTION_OUTPUT, port_id)) return SPA_RESULT_INVALID_PORT; spa_log_trace (this->log, "reuse buffer %d", buffer_id); { - SpaEventNodeReuseBuffer rb = SPA_EVENT_NODE_REUSE_BUFFER_INIT (pnode->core->type.event_node.ReuseBuffer, + SpaEventNodeReuseBuffer rb = SPA_EVENT_NODE_REUSE_BUFFER_INIT (impl->core->type.event_node.ReuseBuffer, port_id, buffer_id); - pinos_transport_add_event (pnode->transport, (SpaEvent *)&rb); + pinos_transport_add_event (impl->transport, (SpaEvent *)&rb); } return SPA_RESULT_OK; @@ -789,13 +792,25 @@ spa_proxy_node_port_send_command (SpaNode *node, static SpaResult spa_proxy_node_process_input (SpaNode *node) { + PinosClientNodeImpl *impl; SpaProxy *this; + int i; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; this = SPA_CONTAINER_OF (node, SpaProxy, node); + impl = this->impl; + for (i = 0; i < MAX_INPUTS; i++) { + SpaPortIO *io = this->in_ports[i].io; + + if (!io) + continue; + + impl->transport->inputs[i] = *io; + io->status = SPA_RESULT_OK; + } send_have_output (this); return SPA_RESULT_OK; @@ -805,21 +820,44 @@ static SpaResult spa_proxy_node_process_output (SpaNode *node) { SpaProxy *this; + PinosClientNodeImpl *impl; int i; + bool send_need = false; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; this = SPA_CONTAINER_OF (node, SpaProxy, node); + impl = this->impl; for (i = 0; i < MAX_OUTPUTS; i++) { - SpaPortIO *io = this->out_ports[i].io; - if (io && io->buffer_id != SPA_ID_INVALID) { - spa_proxy_node_port_reuse_buffer (node, i, io->buffer_id); + SpaPortIO *io = this->out_ports[i].io, tmp; + + if (!io) + continue; + + if (io->buffer_id != SPA_ID_INVALID) { + SpaEventNodeReuseBuffer rb = + SPA_EVENT_NODE_REUSE_BUFFER_INIT (impl->core->type.event_node.ReuseBuffer, i, io->buffer_id); + + spa_log_trace (this->log, "reuse buffer %d", io->buffer_id); + + pinos_transport_add_event (impl->transport, (SpaEvent *)&rb); io->buffer_id = SPA_ID_INVALID; } + + tmp = impl->transport->outputs[i]; + impl->transport->outputs[i] = *io; + + pinos_log_trace ("%d %d %d %d", io->status, io->buffer_id, tmp.status, tmp.buffer_id); + + if (io->status == SPA_RESULT_NEED_INPUT) + send_need = true; + + *io = tmp; } - send_need_input (this); + if (send_need) + send_need_input (this); return SPA_RESULT_HAVE_OUTPUT; } @@ -922,7 +960,7 @@ static void proxy_on_data_fd_events (SpaSource *source) { SpaProxy *this = source->data; - PinosNode *pnode = this->pnode; + PinosClientNodeImpl *impl = this->impl; if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) { spa_log_warn (this->log, "proxy %p: got error", this); @@ -935,9 +973,9 @@ proxy_on_data_fd_events (SpaSource *source) read (this->data_source.fd, &cmd, 8); - while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) { + while (pinos_transport_next_event (impl->transport, &event) == SPA_RESULT_OK) { SpaEvent *ev = alloca (SPA_POD_SIZE (&event)); - pinos_transport_parse_event (pnode->transport, ev); + pinos_transport_parse_event (impl->transport, ev); this->event_cb (&this->node, ev, this->user_data); } } @@ -1004,17 +1042,22 @@ proxy_init (SpaProxy *this, } static void -on_transport_changed (PinosListener *listener, - PinosNode *node) +on_initialized (PinosListener *listener, + PinosNode *node) { - PinosClientNodeImpl *impl = SPA_CONTAINER_OF (listener, PinosClientNodeImpl, transport_changed); + PinosClientNodeImpl *impl = SPA_CONTAINER_OF (listener, PinosClientNodeImpl, initialized); PinosClientNode *this = &impl->this; PinosTransportInfo info; if (this->resource == NULL) return; - pinos_transport_get_info (node->transport, &info); + impl->transport = pinos_transport_new (node->max_input_ports, + node->max_output_ports); + impl->transport->area->n_inputs = node->n_input_ports; + impl->transport->area->n_outputs = node->n_output_ports; + + pinos_transport_get_info (impl->transport, &info); pinos_client_node_notify_transport (this->resource, info.memfd, info.offset, @@ -1071,13 +1114,12 @@ client_node_resource_destroy (PinosResource *resource) pinos_signal_remove (&impl->global_added); pinos_signal_remove (&impl->loop_changed); - pinos_signal_remove (&impl->transport_changed); + pinos_signal_remove (&impl->initialized); if (proxy->data_source.fd != -1) { spa_loop_remove_source (proxy->data_loop, &proxy->data_source); close (proxy->data_source.fd); } - pinos_node_destroy (this->node); } @@ -1092,6 +1134,9 @@ on_node_free (PinosListener *listener, pinos_signal_remove (&impl->node_free); + if (impl->transport) + pinos_transport_destroy (impl->transport); + if (impl->data_fd != -1) close (impl->data_fd); free (impl); @@ -1140,7 +1185,7 @@ pinos_client_node_new (PinosClient *client, if (this->node == NULL) goto error_no_node; - impl->proxy.pnode = this->node; + impl->proxy.impl = impl; this->resource = pinos_resource_new (client, id, @@ -1156,9 +1201,9 @@ pinos_client_node_new (PinosClient *client, &impl->node_free, on_node_free); - pinos_signal_add (&this->node->transport_changed, - &impl->transport_changed, - on_transport_changed); + pinos_signal_add (&this->node->initialized, + &impl->initialized, + on_initialized); pinos_signal_add (&this->node->loop_changed, &impl->loop_changed, diff --git a/pinos/server/node.c b/pinos/server/node.c index 6cf3e5cee..07b3a2f82 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -62,15 +62,14 @@ update_port_ids (PinosNode *node) &n_output_ports, &max_output_ports); - node->transport = pinos_transport_new (max_input_ports, - max_output_ports); + node->n_input_ports = n_input_ports; + node->max_input_ports = max_input_ports; + node->n_output_ports = n_output_ports; + node->max_output_ports = max_output_ports; node->input_port_map = calloc (max_input_ports, sizeof (PinosPort *)); node->output_port_map = calloc (max_output_ports, sizeof (PinosPort *)); - node->transport->area->n_inputs = n_input_ports; - node->transport->area->n_outputs = n_output_ports; - input_port_ids = alloca (sizeof (uint32_t) * n_input_ports); output_port_ids = alloca (sizeof (uint32_t) * n_output_ports); @@ -98,8 +97,7 @@ update_port_ids (PinosNode *node) pinos_log_debug ("node %p: input port added %d", node, input_port_ids[i]); np = pinos_port_new (node, PINOS_DIRECTION_INPUT, input_port_ids[i]); - np->io = &node->transport->inputs[np->port_id]; - if ((res = spa_node_port_set_io (node->node, SPA_DIRECTION_INPUT, np->port_id, np->io)) < 0) + if ((res = spa_node_port_set_io (node->node, SPA_DIRECTION_INPUT, np->port_id, &np->io)) < 0) pinos_log_warn ("node %p: can't set input IO %d", node, res); spa_list_insert (ports, &np->link); @@ -137,8 +135,7 @@ update_port_ids (PinosNode *node) pinos_log_debug ("node %p: output port added %d", node, output_port_ids[i]); np = pinos_port_new (node, PINOS_DIRECTION_OUTPUT, output_port_ids[i]); - np->io = &node->transport->outputs[np->port_id]; - if ((res = spa_node_port_set_io (node->node, SPA_DIRECTION_OUTPUT, np->port_id, np->io)) < 0) + if ((res = spa_node_port_set_io (node->node, SPA_DIRECTION_OUTPUT, np->port_id, &np->io)) < 0) pinos_log_warn ("node %p: can't set output IO %d", node, res); spa_list_insert (ports, &np->link); @@ -160,8 +157,7 @@ update_port_ids (PinosNode *node) break; } } - - pinos_signal_emit (&node->transport_changed, node); + pinos_signal_emit (&node->initialized, node); } static SpaResult @@ -257,6 +253,61 @@ send_clock_update (PinosNode *this) pinos_log_debug ("got error %d", res); } +static SpaResult +do_pull (PinosNode *this) +{ + SpaResult res = SPA_RESULT_OK; + PinosPort *inport; + + spa_list_for_each (inport, &this->input_ports, link) { + PinosLink *link; + PinosPort *outport; + SpaPortIO *pi; + SpaPortIO *po; + + pi = &inport->io; + pinos_log_trace ("node %p: need input port %d, %d %d", this, + inport->port_id, pi->buffer_id, pi->status); + + if (pi->status != SPA_RESULT_NEED_INPUT) + continue; + + spa_list_for_each (link, &inport->rt.links, rt.input_link) { + if (link->rt.input == NULL || link->rt.output == NULL) + continue; + + outport = link->rt.output; + po = &outport->io; + + /* pull */ + *po = *pi; + + pinos_log_trace ("node %p: process output %p %d", outport->node, po, po->buffer_id); + + res = spa_node_process_output (outport->node->node); + + if (res == SPA_RESULT_NEED_INPUT) { + res = do_pull (outport->node); + + *pi = *po; + pinos_log_trace ("node %p: pull return %d", outport->node, res); + } + else if (res == SPA_RESULT_HAVE_OUTPUT) { + *pi = *po; + } + else + pinos_log_warn ("node %p: got process output %d", outport->node, res); + } + if (pi->buffer_id != SPA_ID_INVALID) { + pinos_log_trace ("node %p: process input %d %d", this, pi->status, pi->buffer_id); + res = spa_node_process_input (this->node); + if (res == SPA_RESULT_HAVE_OUTPUT) + break; + } + } + return res; +} + static void on_node_event (SpaNode *node, SpaEvent *event, void *user_data) { @@ -272,99 +323,29 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data) } } else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.NeedInput) { - SpaResult res; - int i; - - this->sched_state = STATE_IN; - - for (i = 0; i < this->transport->area->n_inputs; i++) { - PinosLink *link; - PinosPort *inport, *outport; - SpaPortIO *pi; - SpaPortIO *po; - - pi = &this->transport->inputs[i]; - pinos_log_trace ("node %p: need input port %d, %d", this, i, pi->buffer_id); - - inport = this->input_port_map[i]; - spa_list_for_each (link, &inport->rt.links, rt.input_link) { - if (link->rt.input == NULL || link->rt.output == NULL) - continue; - - outport = link->rt.output; - po = &outport->node->transport->outputs[outport->port_id]; - - if (outport->node->sched_state == STATE_IN) { - /* pull */ - *po = *pi; - pi->buffer_id = SPA_ID_INVALID; - - pinos_log_trace ("node %p: process output %p %d", outport->node, po, po->buffer_id); - - res = spa_node_process_output (outport->node->node); - - if (res == SPA_RESULT_NEED_INPUT) { - on_node_event (outport->node->node, event, outport->node); - if (outport->node->sched_state == STATE_OUT) - goto push; - } - else if (res == SPA_RESULT_HAVE_OUTPUT) { - outport->node->sched_state = STATE_OUT; - } - else - pinos_log_warn ("node %p: got process output %d", outport->node, res); - } else { - /* push */ -push: - *pi = *po; - - pinos_log_trace ("node %p: process output %p %d", outport->node, po, po->buffer_id); - - res = spa_node_process_output (outport->node->node); - - if (res == SPA_RESULT_HAVE_OUTPUT) - outport->node->sched_state = STATE_OUT; - else if (res == SPA_RESULT_NEED_INPUT) - outport->node->sched_state = STATE_IN; - else if (res < 0) - pinos_log_warn ("node %p: got process output %d", this, res); - } - } - if (pi->buffer_id != SPA_ID_INVALID) { - pinos_log_trace ("node %p: process input", this); - res = spa_node_process_input (this->node); - - if (res == SPA_RESULT_HAVE_OUTPUT) - this->sched_state = STATE_OUT; - else if (res == SPA_RESULT_NEED_INPUT) - this->sched_state = STATE_IN; - else if (res < 0) - pinos_log_warn ("node %p: got process input %d", this, res); - } - } + do_pull (this); } else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.HaveOutput) { SpaResult res; - int i; + PinosPort *outport; - for (i = 0; i < this->transport->area->n_outputs; i++) { + spa_list_for_each (outport, &this->output_ports, link) { PinosLink *link; - PinosPort *inport, *outport; + PinosPort *inport; SpaPortIO *po; - po = &this->transport->outputs[i]; + po = &outport->io; if (po->buffer_id == SPA_ID_INVALID) continue; pinos_log_trace ("node %p: have output %d", this, po->buffer_id); - outport = this->output_port_map[i]; spa_list_for_each (link, &outport->rt.links, rt.output_link) { if (link->rt.input == NULL || link->rt.output == NULL) continue; inport = link->rt.input; - inport->node->transport->inputs[inport->port_id] = *po; + inport->io = *po; if ((res = spa_node_process_input (inport->node->node)) < 0) pinos_log_warn ("node %p: got process input %d", inport->node, res); @@ -374,31 +355,25 @@ push: pinos_log_warn ("node %p: got process output %d", this, res); } -#if 0 else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.ReuseBuffer) { SpaEventNodeReuseBuffer *rb = (SpaEventNodeReuseBuffer *) event; - int i; + PinosPort *inport; pinos_log_trace ("node %p: reuse buffer %u", this, rb->body.buffer_id.value); - for (i = 0; i < this->transport->area->n_inputs; i++) { + spa_list_for_each (inport, &this->input_ports, link) { PinosLink *link; - PinosPort *inport, *outport; - SpaPortIO *po; + PinosPort *outport; - inport = this->input_port_map[i]; spa_list_for_each (link, &inport->rt.links, rt.input_link) { if (link->rt.input == NULL || link->rt.output == NULL) continue; outport = link->rt.output; - po = &outport->node->transport->outputs[outport->port_id]; - - po->buffer_id = rb->body.buffer_id.value; + outport->io.buffer_id = rb->body.buffer_id.value; } } } -#endif else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.RequestClockUpdate) { send_clock_update (this); } @@ -437,8 +412,8 @@ node_bind_func (PinosGlobal *global, info.id = global->id; info.change_mask = ~0; info.name = this->name; - info.max_inputs = this->transport->area->max_inputs; - info.n_inputs = this->transport->area->n_inputs; + info.max_inputs = this->max_input_ports; + info.n_inputs = this->n_input_ports; info.input_formats = NULL; for (info.n_input_formats = 0; ; info.n_input_formats++) { SpaFormat *fmt; @@ -454,8 +429,8 @@ node_bind_func (PinosGlobal *global, info.input_formats = realloc (info.input_formats, sizeof (SpaFormat*) * (info.n_input_formats + 1)); info.input_formats[info.n_input_formats] = spa_format_copy (fmt); } - info.max_outputs = this->transport->area->max_outputs; - info.n_outputs = this->transport->area->n_outputs; + info.max_outputs = this->max_output_ports; + info.n_outputs = this->n_output_ports; info.output_formats = NULL; for (info.n_output_formats = 0; ; info.n_output_formats++) { SpaFormat *fmt; @@ -568,7 +543,7 @@ pinos_node_new (PinosCore *core, pinos_signal_init (&this->state_changed); pinos_signal_init (&this->free_signal); pinos_signal_init (&this->async_complete); - pinos_signal_init (&this->transport_changed); + pinos_signal_init (&this->initialized); pinos_signal_init (&this->loop_changed); this->state = PINOS_NODE_STATE_CREATING; @@ -634,8 +609,6 @@ do_node_remove_done (SpaLoop *loop, pinos_work_queue_destroy (impl->work); - if (this->transport) - pinos_transport_destroy (this->transport); if (this->input_port_map) free (this->input_port_map); if (this->output_port_map) @@ -737,22 +710,19 @@ pinos_node_get_free_port (PinosNode *node, uint32_t *n_ports, max_ports; SpaList *ports; PinosPort *port = NULL, *p, **portmap; - SpaPortIO *io; SpaResult res; int i; if (direction == PINOS_DIRECTION_INPUT) { - max_ports = node->transport->area->max_inputs; - n_ports = &node->transport->area->n_inputs; + max_ports = node->max_input_ports; + n_ports = &node->n_input_ports; ports = &node->input_ports; portmap = node->input_port_map; - io = node->transport->inputs; } else { - max_ports = node->transport->area->max_outputs; - n_ports = &node->transport->area->n_outputs; + max_ports = node->max_output_ports; + n_ports = &node->n_output_ports; ports = &node->output_ports; portmap = node->output_port_map; - io = node->transport->outputs; } pinos_log_debug ("node %p: direction %d max %u, n %u", node, direction, max_ports, *n_ports); @@ -771,13 +741,13 @@ pinos_node_get_free_port (PinosNode *node, if (portmap[i] == NULL) { pinos_log_debug ("node %p: creating port direction %d %u", node, direction, i); port = portmap[i] = pinos_port_new (node, direction, i); - port->io = &io[i]; + spa_list_insert (ports, &port->link); (*n_ports)++; if ((res = spa_node_add_port (node->node, direction, i)) < 0) { pinos_log_error ("node %p: could not add port %d", node, i); } else { - spa_node_port_set_io (node->node, direction, i, port->io); + spa_node_port_set_io (node->node, direction, i, &port->io); } } } diff --git a/pinos/server/node.h b/pinos/server/node.h index 10e331d33..52ae888a5 100644 --- a/pinos/server/node.h +++ b/pinos/server/node.h @@ -67,12 +67,24 @@ struct _PinosNode { SpaNode *node; bool live; SpaClock *clock; - int sched_state; SpaList resource_list; - SpaList input_ports; - SpaList output_ports; + PINOS_SIGNAL (initialized, (PinosListener *listener, + PinosNode *object)); + + uint32_t max_input_ports; + uint32_t n_input_ports; + SpaList input_ports; + PinosPort **input_port_map; + uint32_t n_used_input_links; + + uint32_t max_output_ports; + uint32_t n_output_ports; + SpaList output_ports; + PinosPort **output_port_map; + uint32_t n_used_output_links; + PINOS_SIGNAL (port_added, (PinosListener *listener, PinosNode *node, PinosPort *port)); @@ -80,16 +92,6 @@ struct _PinosNode { PinosNode *node, PinosPort *port)); - PinosPort **input_port_map; - PinosPort **output_port_map; - - uint32_t n_used_output_links; - uint32_t n_used_input_links; - - PinosTransport *transport; - PINOS_SIGNAL (transport_changed, (PinosListener *listener, - PinosNode *object)); - PINOS_SIGNAL (destroy_signal, (PinosListener *listener, PinosNode *object)); PINOS_SIGNAL (free_signal, (PinosListener *listener, diff --git a/pinos/server/port.c b/pinos/server/port.c index 2000cfb96..e16a69b51 100644 --- a/pinos/server/port.c +++ b/pinos/server/port.c @@ -49,6 +49,8 @@ pinos_port_new (PinosNode *node, this->direction = direction; this->port_id = port_id; this->state = SPA_PORT_STATE_CONFIGURE; + this->io.status = SPA_RESULT_OK; + this->io.buffer_id = SPA_ID_INVALID; spa_list_init (&this->links); spa_list_init (&this->rt.links); diff --git a/pinos/server/port.h b/pinos/server/port.h index 2a19eccd7..2bbfa515a 100644 --- a/pinos/server/port.h +++ b/pinos/server/port.h @@ -46,7 +46,7 @@ struct _PinosPort { PinosDirection direction; uint32_t port_id; uint32_t state; - SpaPortIO *io; + SpaPortIO io; bool allocated; PinosMemblock buffer_mem; diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index c7f240f2e..ef32389a4 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -61,10 +61,8 @@ typedef struct { /** * SpaPortIO: - * @state: the port state - * @flags: extra flags - * @buffer_id: a buffer id * @status: the status + * @buffer_id: a buffer id * @range: requested range * @event: event * @@ -72,10 +70,8 @@ typedef struct { * by the host and configured on all ports for which IO is requested. */ typedef struct { -#define SPA_PORT_IO_FLAG_RANGE (1 << 0) /* a range is present */ - uint32_t flags; - uint32_t buffer_id; uint32_t status; + uint32_t buffer_id; SpaRange range; } SpaPortIO; diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index 418845040..2f62617d9 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -337,8 +337,7 @@ pull_frames_queue (SpaALSAState *state, if (spa_list_is_empty (&state->ready)) { SpaEvent event = SPA_EVENT_INIT (state->type.event_node.NeedInput); - io->flags = SPA_PORT_IO_FLAG_RANGE; - io->status = SPA_RESULT_OK; + io->status = SPA_RESULT_NEED_INPUT; io->range.offset = state->sample_count * state->frame_size; io->range.min_size = state->threshold * state->frame_size; io->range.max_size = frames * state->frame_size; @@ -370,7 +369,7 @@ pull_frames_queue (SpaALSAState *state, spa_list_remove (&b->link); b->outstanding = true; - + state->io->buffer_id = b->outbuf->id; spa_log_trace (state->log, "alsa-util %p: reuse buffer %u", state, b->outbuf->id); state->event_cb (&state->node, (SpaEvent *)&rb, state->user_data); state->ready_offset = 0; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index ba77e6d38..9be4ecd9e 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -327,7 +327,7 @@ static SpaResult clear_buffers (SpaAudioMixer *this, SpaAudioMixerPort *port) { if (port->n_buffers > 0) { - spa_log_info (this->log, "audio-mixer %p: clear buffers", this); + spa_log_info (this->log, "audio-mixer %p: clear buffers %p", this, port); port->n_buffers = 0; spa_list_init (&port->queue); } @@ -593,11 +593,15 @@ add_port_data (SpaAudioMixer *this, MixerBuffer *out, SpaAudioMixerPort *port, i int i; int16_t *op, *ip; size_t os, is, chunk; - MixerBuffer *b = spa_list_first (&port->queue, MixerBuffer, link); + MixerBuffer *b; op = SPA_MEMBER (out->outbuf->datas[0].data, out->outbuf->datas[0].chunk->offset, void); os = out->outbuf->datas[0].chunk->size; - ip = SPA_MEMBER (b->outbuf->datas[0].data, port->queued_offset + b->outbuf->datas[0].chunk->offset, void); + + b = spa_list_first (&port->queue, MixerBuffer, link); + + ip = SPA_MEMBER (b->outbuf->datas[0].data, + port->queued_offset + b->outbuf->datas[0].chunk->offset, void); is = b->outbuf->datas[0].chunk->size - port->queued_offset; chunk = SPA_MIN (os, is); @@ -611,36 +615,98 @@ add_port_data (SpaAudioMixer *this, MixerBuffer *out, SpaAudioMixerPort *port, i op[i] = SPA_CLAMP (op[i] + ip[i], INT16_MIN, INT16_MAX); } + op += chunk / 2; + os -= chunk; + port->queued_offset += chunk; port->queued_bytes -= chunk; if (chunk == is) { - spa_log_trace (this->log, "audiomixer %p: return buffer %d on port %p", this, b->outbuf->id, port); + spa_log_trace (this->log, "audiomixer %p: return buffer %d on port %p %zd", + this, b->outbuf->id, port, chunk); port->io->buffer_id = b->outbuf->id; spa_list_remove (&b->link); b->outstanding = true; port->queued_offset = 0; + } else { + spa_log_trace (this->log, "audiomixer %p: keeping buffer %d on port %p %zd %zd", + this, b->outbuf->id, port, port->queued_bytes, chunk); } } +static SpaResult +mix_output (SpaAudioMixer *this, size_t n_bytes) +{ + MixerBuffer *outbuf; + int i, layer; + SpaAudioMixerPort *outport; + SpaPortIO *output; + + outport = &this->out_ports[0]; + output = outport->io; + + if (spa_list_is_empty (&outport->queue)) + return SPA_RESULT_OUT_OF_BUFFERS; + + outbuf = spa_list_first (&outport->queue, MixerBuffer, link); + spa_list_remove (&outbuf->link); + + n_bytes = SPA_MIN (n_bytes, outbuf->outbuf->datas[0].maxsize); + + spa_log_trace (this->log, "audiomixer %p: dequeue output buffer %d %zd", this, outbuf->outbuf->id, n_bytes); + + outbuf->outstanding = true; + outbuf->outbuf->datas[0].chunk->offset = 0; + outbuf->outbuf->datas[0].chunk->size = n_bytes; + outbuf->outbuf->datas[0].chunk->stride = 0; + + for (layer = 0, i = 0; i < MAX_PORTS; i++) { + SpaAudioMixerPort *port = &this->in_ports[i]; + + if (port->io == NULL || port->n_buffers == 0) + continue; + + if (spa_list_is_empty (&port->queue)) { + spa_log_warn (this->log, "audiomixer %p: underrun stream %d", this, i); + port->queued_bytes = 0; + port->queued_offset = 0; + continue; + } + add_port_data (this, outbuf, port, layer++); + } + if (layer == 0) { + this->state = STATE_IN; + return SPA_RESULT_NEED_INPUT; + } + + output->buffer_id = outbuf->outbuf->id; + output->status = SPA_RESULT_HAVE_OUTPUT; + this->state = STATE_OUT; + + return SPA_RESULT_HAVE_OUTPUT; +} + static SpaResult spa_audiomixer_node_process_input (SpaNode *node) { + SpaResult res; SpaAudioMixer *this; uint32_t i; SpaAudioMixerPort *outport; - size_t min_queued = -1; + size_t min_queued = SIZE_MAX; + SpaPortIO *output; spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS); this = SPA_CONTAINER_OF (node, SpaAudioMixer, node); + outport = &this->out_ports[0]; + output = outport->io; + spa_return_val_if_fail (output != NULL, SPA_RESULT_ERROR); + if (this->state == STATE_OUT) return SPA_RESULT_HAVE_OUTPUT; - outport = &this->out_ports[0]; - spa_return_val_if_fail (outport->io != NULL, SPA_RESULT_ERROR); - for (i = 0; i < MAX_PORTS; i++) { SpaAudioMixerPort *port = &this->in_ports[i]; SpaPortIO *input; @@ -648,7 +714,9 @@ spa_audiomixer_node_process_input (SpaNode *node) if ((input = port->io) == NULL || port->n_buffers == 0) continue; - if (input->buffer_id != SPA_ID_INVALID) { + if (port->queued_bytes == 0 && + input->status == SPA_RESULT_HAVE_OUTPUT && + input->buffer_id != SPA_ID_INVALID) { MixerBuffer *b = &port->buffers[input->buffer_id]; if (!b->outstanding) { @@ -656,66 +724,31 @@ spa_audiomixer_node_process_input (SpaNode *node) continue; } - if (spa_list_is_empty (&port->queue)) { - port->queued_bytes = 0; - port->queued_offset = 0; - } - spa_list_insert (port->queue.prev, &b->link); b->outstanding = false; - port->queued_bytes += b->outbuf->datas[0].chunk->size; - spa_log_trace (this->log, "audiomixer %p: queue buffer %d on port %p %zd %zd", - this, b->outbuf->id, port, port->queued_bytes, min_queued); input->buffer_id = SPA_ID_INVALID; + + spa_list_insert (port->queue.prev, &b->link); + port->queued_bytes += b->outbuf->datas[0].chunk->size; + + spa_log_trace (this->log, "audiomixer %p: queue buffer %d on port %d %zd %zd", + this, b->outbuf->id, i, port->queued_bytes, min_queued); } - if (min_queued == -1 || (port->queued_bytes > 0 && port->queued_bytes < min_queued)) + if (min_queued == SIZE_MAX || port->queued_bytes < min_queued) min_queued = port->queued_bytes; - - input->status = SPA_RESULT_OK; } - if (min_queued > 0) { - MixerBuffer *outbuf; - SpaPortIO *output; - int layer; - - if (spa_list_is_empty (&outport->queue)) - return SPA_RESULT_OUT_OF_BUFFERS; - - outbuf = spa_list_first (&outport->queue, MixerBuffer, link); - spa_list_remove (&outbuf->link); - spa_log_trace (this->log, "audiomixer %p: dequeue output buffer %d %zd", this, outbuf->outbuf->id, min_queued); - outbuf->outstanding = true; - outbuf->outbuf->datas[0].chunk->offset = 0; - outbuf->outbuf->datas[0].chunk->size = min_queued; - outbuf->outbuf->datas[0].chunk->stride = 0; - - for (layer = 0, i = 0; i < MAX_PORTS; i++) { - SpaAudioMixerPort *port = &this->in_ports[i]; - - if (port->io == NULL) - continue; - - if (spa_list_is_empty (&port->queue)) { - spa_log_warn (this->log, "audiomixer %p: underrun stream %d", this, i); - continue; - } - - add_port_data (this, outbuf, port, layer++); - } - if (layer == 0) - clear_buffer (this, outbuf); - - output = outport->io; - output->buffer_id = outbuf->outbuf->id; - output->status = SPA_RESULT_OK; - this->state = STATE_OUT; + if (min_queued != SIZE_MAX && min_queued > 0) { + res = mix_output (this, min_queued); + } else { + res = SPA_RESULT_NEED_INPUT; } - return this->state == STATE_IN ? SPA_RESULT_NEED_INPUT : SPA_RESULT_HAVE_OUTPUT; + return res; } static SpaResult spa_audiomixer_node_process_output (SpaNode *node) { + SpaResult res; SpaAudioMixer *this; SpaAudioMixerPort *port; SpaPortIO *output; @@ -729,24 +762,54 @@ spa_audiomixer_node_process_output (SpaNode *node) spa_return_val_if_fail (port->io != NULL, SPA_RESULT_ERROR); output = port->io; - spa_return_val_if_fail (port->have_format, SPA_RESULT_NO_FORMAT); + spa_return_val_if_fail (port->n_buffers > 0, SPA_RESULT_NO_BUFFERS); - for (i = 0; i < MAX_PORTS; i++) { - SpaPortIO *input; - - if ((input = this->in_ports[i].io) == NULL) - continue; - - input->flags = output->flags; - input->range = output->range; - } + /* recycle */ if (output->buffer_id != SPA_ID_INVALID) { recycle_buffer (this, output->buffer_id); output->buffer_id = SPA_ID_INVALID; } - this->state = STATE_IN; + res = SPA_RESULT_NEED_INPUT; + /* produce more output if possible */ + if (this->state == STATE_OUT) { + size_t min_queued = -1; - return SPA_RESULT_NEED_INPUT; + for (i = 0; i < MAX_PORTS; i++) { + SpaAudioMixerPort *port = &this->in_ports[i]; + + if (port->io == NULL || port->n_buffers == 0) + continue; + + if (min_queued == -1 || port->queued_bytes < min_queued) + min_queued = port->queued_bytes; + } + if (min_queued != -1 && min_queued > 0) { + res = mix_output (this, min_queued); + } else { + this->state = STATE_IN; + } + } + /* take requested output range and apply to input */ + if (this->state == STATE_IN) { + for (i = 0; i < MAX_PORTS; i++) { + SpaAudioMixerPort *port = &this->in_ports[i]; + SpaPortIO *input; + + if ((input = port->io) == NULL || port->n_buffers == 0) + continue; + + if (port->queued_bytes == 0) { + input->range = output->range; + input->status = SPA_RESULT_NEED_INPUT; + } + else { + input->status = SPA_RESULT_OK; + } + spa_log_trace (this->log, "audiomixer %p: port %d %d queued %zd, res %d", this, + i, output->range.min_size, port->queued_bytes, input->status); + } + } + return res; } static const SpaNode audiomixer_node = { @@ -832,6 +895,7 @@ spa_audiomixer_init (const SpaHandleFactory *factory, init_type (&this->type, this->map); this->node = audiomixer_node; + this->state = STATE_IN; this->out_ports[0].io = NULL; this->out_ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index d10be06df..f9d434f1a 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -295,10 +295,13 @@ audiotestsrc_make_buffer (SpaAudioTestSrc *this) b->outstanding = true; n_bytes = b->outbuf->datas[0].maxsize; - if (io->flags & SPA_PORT_IO_FLAG_RANGE) - n_bytes = SPA_CLAMP (n_bytes, io->range.min_size, io->range.max_size); + if (io->range.min_size != 0) { + if (io->range.max_size < n_bytes) + n_bytes = io->range.max_size; + } - spa_log_trace (this->log, "audiotestsrc %p: dequeue buffer %d %d", this, b->outbuf->id, n_bytes); + spa_log_trace (this->log, "audiotestsrc %p: dequeue buffer %d %d %d", this, b->outbuf->id, + b->outbuf->datas[0].maxsize, n_bytes); n_samples = n_bytes / this->bpf; this->render_func (this, b->outbuf->datas[0].data, n_samples); @@ -317,9 +320,8 @@ audiotestsrc_make_buffer (SpaAudioTestSrc *this) this->elapsed_time = SAMPLES_TO_TIME (this, this->sample_count); set_timer (this, true); - io->flags = 0; io->buffer_id = b->outbuf->id; - io->status = SPA_RESULT_OK; + io->status = SPA_RESULT_HAVE_OUTPUT; return SPA_RESULT_HAVE_OUTPUT; } @@ -817,17 +819,23 @@ static SpaResult spa_audiotestsrc_node_process_output (SpaNode *node) { SpaAudioTestSrc *this; + SpaPortIO *io; spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS); this = SPA_CONTAINER_OF (node, SpaAudioTestSrc, node); + io = this->io; + spa_return_val_if_fail (io != NULL, SPA_RESULT_WRONG_STATE); - if (this->io && this->io->buffer_id != SPA_ID_INVALID) { + if (io->status == SPA_RESULT_HAVE_OUTPUT) + return SPA_RESULT_HAVE_OUTPUT; + + if (io->buffer_id != SPA_ID_INVALID) { reuse_buffer (this, this->io->buffer_id); this->io->buffer_id = SPA_ID_INVALID; } - if (!this->async) + if (!this->async && (io->status == SPA_RESULT_NEED_INPUT)) return audiotestsrc_make_buffer (this); else return SPA_RESULT_OK; diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index 63486b668..977ebde85 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -305,9 +305,8 @@ videotestsrc_make_buffer (SpaVideoTestSrc *this) this->elapsed_time = FRAMES_TO_TIME (this, this->frame_count); set_timer (this, true); - io->flags = 0; io->buffer_id = b->outbuf->id; - io->status = SPA_RESULT_OK; + io->status = SPA_RESULT_HAVE_OUTPUT; return SPA_RESULT_HAVE_OUTPUT; } diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 3e0fdab18..745ffd754 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -100,13 +100,13 @@ typedef struct { SpaNode *source1; SpaPortIO source1_mix_io[1]; - SpaBuffer *source1_buffers[1]; - Buffer source1_buffer[1]; + SpaBuffer *source1_buffers[2]; + Buffer source1_buffer[2]; SpaNode *source2; SpaPortIO source2_mix_io[1]; - SpaBuffer *source2_buffers[1]; - Buffer source2_buffer[1]; + SpaBuffer *source2_buffers[2]; + Buffer source2_buffer[2]; bool running; pthread_t thread; @@ -119,35 +119,43 @@ typedef struct { unsigned int n_fds; } AppData; -#define BUFFER_SIZE 4096 +#define BUFFER_SIZE1 4092 +#define BUFFER_SIZE2 4096 static void -init_buffer (AppData *data, Buffer *b, void *ptr, size_t size) +init_buffer (AppData *data, SpaBuffer **bufs, Buffer *ba, int n_buffers, size_t size) { - b->buffer.id = 0; - b->buffer.n_metas = 1; - b->buffer.metas = b->metas; - b->buffer.n_datas = 1; - b->buffer.datas = b->datas; + int i; - b->header.flags = 0; - b->header.seq = 0; - b->header.pts = 0; - b->header.dts_offset = 0; - b->metas[0].type = SPA_META_TYPE_HEADER; - b->metas[0].data = &b->header; - b->metas[0].size = sizeof (b->header); + for (i = 0; i < n_buffers; i++) { + Buffer *b = &ba[i]; + bufs[i] = &b->buffer; - b->datas[0].type = SPA_DATA_TYPE_MEMPTR; - b->datas[0].flags = 0; - b->datas[0].fd = -1; - b->datas[0].mapoffset = 0; - b->datas[0].maxsize = size; - b->datas[0].data = ptr; - b->datas[0].chunk = &b->chunks[0]; - b->datas[0].chunk->offset = 0; - b->datas[0].chunk->size = size; - b->datas[0].chunk->stride = 0; + b->buffer.id = i; + b->buffer.n_metas = 1; + b->buffer.metas = b->metas; + b->buffer.n_datas = 1; + b->buffer.datas = b->datas; + + b->header.flags = 0; + b->header.seq = 0; + b->header.pts = 0; + b->header.dts_offset = 0; + b->metas[0].type = SPA_META_TYPE_HEADER; + b->metas[0].data = &b->header; + b->metas[0].size = sizeof (b->header); + + b->datas[0].type = SPA_DATA_TYPE_MEMPTR; + b->datas[0].flags = 0; + b->datas[0].fd = -1; + b->datas[0].mapoffset = 0; + b->datas[0].maxsize = size; + b->datas[0].data = malloc (size); + b->datas[0].chunk = &b->chunks[0]; + b->datas[0].chunk->offset = 0; + b->datas[0].chunk->size = size; + b->datas[0].chunk->stride = 0; + } } static SpaResult @@ -213,15 +221,17 @@ on_sink_event (SpaNode *node, SpaEvent *event, void *user_data) if (res == SPA_RESULT_NEED_INPUT) { - res = spa_node_process_output (data->source1); + if (data->source1_mix_io[0].status == SPA_RESULT_NEED_INPUT) { + res = spa_node_process_output (data->source1); + if (res != SPA_RESULT_HAVE_OUTPUT) + printf ("got process_output error from source1 %d\n", res); + } - if (res != SPA_RESULT_HAVE_OUTPUT) - printf ("got process_output error from source1 %d\n", res); - - res = spa_node_process_output (data->source2); - - if (res != SPA_RESULT_HAVE_OUTPUT) - printf ("got process_output error from source2 %d\n", res); + if (data->source2_mix_io[0].status == SPA_RESULT_NEED_INPUT) { + res = spa_node_process_output (data->source2); + if (res != SPA_RESULT_HAVE_OUTPUT) + printf ("got process_output error from source2 %d\n", res); + } res = spa_node_process_input (data->mix); if (res == SPA_RESULT_HAVE_OUTPUT) @@ -301,7 +311,7 @@ make_nodes (AppData *data) spa_pod_builder_init (&b, buffer, sizeof (buffer)); spa_pod_builder_props (&b, &f[0], data->type.props, - SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, "hw:1"), + SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, "hw:0"), SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, 256), SPA_POD_PROP (&f[1], data->type.props_live, 0, SPA_POD_TYPE_BOOL, 1, false)); props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps); @@ -392,8 +402,7 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) return res; - init_buffer (data, &data->mix_buffer[0], malloc (BUFFER_SIZE), BUFFER_SIZE); - data->mix_buffers[0] = &data->mix_buffer[0].buffer; + init_buffer (data, data->mix_buffers, data->mix_buffer, 1, BUFFER_SIZE2); if ((res = spa_node_port_use_buffers (data->sink, SPA_DIRECTION_INPUT, 0, data->mix_buffers, 1)) < 0) return res; if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_OUTPUT, 0, data->mix_buffers, 1)) < 0) @@ -412,11 +421,10 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_set_format (data->source1, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) return res; - init_buffer (data, &data->source1_buffer[0], malloc (BUFFER_SIZE), BUFFER_SIZE); - data->source1_buffers[0] = &data->source1_buffer[0].buffer; - if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[0], data->source1_buffers, 1)) < 0) + init_buffer (data, data->source1_buffers, data->source1_buffer, 2, BUFFER_SIZE1); + if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[0], data->source1_buffers, 2)) < 0) return res; - if ((res = spa_node_port_use_buffers (data->source1, SPA_DIRECTION_OUTPUT, 0, data->source1_buffers, 1)) < 0) + if ((res = spa_node_port_use_buffers (data->source1, SPA_DIRECTION_OUTPUT, 0, data->source1_buffers, 2)) < 0) return res; data->mix_ports[1] = 1; @@ -432,11 +440,10 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_set_format (data->source2, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) return res; - init_buffer (data, &data->source2_buffer[0], malloc (BUFFER_SIZE), BUFFER_SIZE); - data->source2_buffers[0] = &data->source2_buffer[0].buffer; - if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[1], data->source2_buffers, 1)) < 0) + init_buffer (data, data->source2_buffers, data->source2_buffer, 2, BUFFER_SIZE2); + if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[1], data->source2_buffers, 2)) < 0) return res; - if ((res = spa_node_port_use_buffers (data->source2, SPA_DIRECTION_OUTPUT, 0, data->source2_buffers, 1)) < 0) + if ((res = spa_node_port_use_buffers (data->source2, SPA_DIRECTION_OUTPUT, 0, data->source2_buffers, 2)) < 0) return res; return SPA_RESULT_OK; @@ -547,6 +554,7 @@ main (int argc, char *argv[]) { AppData data = { NULL }; SpaResult res; + const char *str; data.map = spa_type_map_get_default(); data.log = spa_log_get_default(); @@ -556,7 +564,8 @@ main (int argc, char *argv[]) data.data_loop.remove_source = do_remove_source; data.data_loop.invoke = do_invoke; -// data.log->level = SPA_LOG_LEVEL_TRACE; + if ((str = getenv ("PINOS_DEBUG"))) + data.log->level = atoi (str); data.support[0].type = SPA_TYPE__TypeMap; data.support[0].data = data.map;