From 6691eb784590918bf8145febc617843d2840e489 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 18 May 2017 17:16:48 +0200 Subject: [PATCH] graph: add graph datastructure and scheduler Improve event and command init so that it can be used more easily as compound literals. Improve volume Make it possible to use graph scheduler in test-mixer --- pinos/client/stream.c | 20 +- pinos/client/transport.h | 3 +- pinos/server/client-node.c | 8 +- pinos/server/node.c | 19 +- pinos/server/port.c | 3 +- spa/include/spa/command-node.h | 3 +- spa/include/spa/command.h | 4 +- spa/include/spa/event-node.h | 6 +- spa/include/spa/event.h | 4 +- spa/include/spa/graph.h | 221 +++++++++++ spa/plugins/alsa/alsa-sink.c | 7 +- spa/plugins/alsa/alsa-source.c | 14 +- spa/plugins/audiomixer/audiomixer.c | 1 - spa/plugins/v4l2/v4l2-source.c | 14 +- spa/plugins/volume/volume.c | 90 +++-- spa/tests/meson.build | 5 + spa/tests/test-graph.c | 562 ++++++++++++++++++++++++++++ spa/tests/test-mixer.c | 94 ++++- 18 files changed, 970 insertions(+), 108 deletions(-) create mode 100644 spa/include/spa/graph.h create mode 100644 spa/tests/test-graph.c diff --git a/pinos/client/stream.c b/pinos/client/stream.c index fc854249c..50395026b 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -386,10 +386,10 @@ static inline void send_have_output (PinosStream *stream) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - SpaEvent ho = SPA_EVENT_INIT (stream->context->type.event_transport.HaveOutput); uint64_t cmd = 1; - pinos_transport_add_event (impl->trans, &ho); + pinos_transport_add_event (impl->trans, + &SPA_EVENT_INIT (stream->context->type.event_transport.HaveOutput)); write (impl->rtwritefd, &cmd, 8); } @@ -397,11 +397,10 @@ static void add_request_clock_update (PinosStream *stream) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - SpaEventNodeRequestClockUpdate rcu = - SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT (stream->context->type.event_node.RequestClockUpdate, - SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME, 0, 0); - pinos_client_node_do_event (impl->node_proxy, (SpaEvent*)&rcu); + pinos_client_node_do_event (impl->node_proxy, (SpaEvent*) + &SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT (stream->context->type.event_node.RequestClockUpdate, + SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME, 0, 0)); } static void @@ -410,10 +409,11 @@ add_async_complete (PinosStream *stream, SpaResult res) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - SpaEventNodeAsyncComplete ac = - SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (stream->context->type.event_node.AsyncComplete, - seq, res); - pinos_client_node_do_event (impl->node_proxy, (SpaEvent*)&ac); + + pinos_client_node_do_event (impl->node_proxy, (SpaEvent*) + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (stream->context->type.event_node.AsyncComplete, + seq, res)); + } static void diff --git a/pinos/client/transport.h b/pinos/client/transport.h index 4d462b04b..5c0d94060 100644 --- a/pinos/client/transport.h +++ b/pinos/client/transport.h @@ -118,7 +118,8 @@ typedef struct { } PinosEventTransportReuseBuffer; #define PINOS_EVENT_TRANSPORT_REUSE_BUFFER_INIT(type,port_id,buffer_id) \ - SPA_EVENT_INIT_COMPLEX (sizeof (PinosEventTransportReuseBufferBody), type, \ + SPA_EVENT_INIT_COMPLEX (PinosEventTransportReuseBuffer, \ + sizeof (PinosEventTransportReuseBufferBody), type, \ SPA_POD_INT_INIT (port_id), \ SPA_POD_INT_INIT (buffer_id)) diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 373f79782..7789662db 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -172,9 +172,9 @@ static inline void send_need_input (SpaProxy *this) { PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy); - SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_transport.NeedInput); - pinos_transport_add_event (impl->transport, &event); + pinos_transport_add_event (impl->transport, + &SPA_EVENT_INIT (impl->core->type.event_transport.NeedInput)); do_flush (this); } @@ -182,9 +182,9 @@ static inline void send_have_output (SpaProxy *this) { PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy); - SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_transport.HaveOutput); - pinos_transport_add_event (impl->transport, &event); + pinos_transport_add_event (impl->transport, + &SPA_EVENT_INIT (impl->core->type.event_transport.HaveOutput)); do_flush (this); } diff --git a/pinos/server/node.c b/pinos/server/node.c index 8d9af3918..af38977c7 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -166,11 +166,10 @@ pause_node (PinosNode *this) return SPA_RESULT_OK; pinos_log_debug ("node %p: pause node", this); - { - SpaCommand cmd = SPA_COMMAND_INIT (this->core->type.command_node.Pause); - if ((res = spa_node_send_command (this->node, &cmd)) < 0) - pinos_log_debug ("got error %d", res); - } + + if ((res = spa_node_send_command (this->node, + &SPA_COMMAND_INIT (this->core->type.command_node.Pause))) < 0) + pinos_log_debug ("got error %d", res); return res; } @@ -181,11 +180,11 @@ start_node (PinosNode *this) SpaResult res; pinos_log_debug ("node %p: start node", this); - { - SpaCommand cmd = SPA_COMMAND_INIT (this->core->type.command_node.Start); - if ((res = spa_node_send_command (this->node, &cmd)) < 0) - pinos_log_debug ("got error %d", res); - } + + if ((res = spa_node_send_command (this->node, + &SPA_COMMAND_INIT (this->core->type.command_node.Start))) < 0) + pinos_log_debug ("got error %d", res); + return res; } diff --git a/pinos/server/port.c b/pinos/server/port.c index 4749bb313..74f695519 100644 --- a/pinos/server/port.c +++ b/pinos/server/port.c @@ -202,7 +202,6 @@ no_mem: SpaResult pinos_port_pause_rt (PinosPort *port) { - SpaCommand cmd = SPA_COMMAND_INIT (port->node->core->type.command_node.Pause); SpaResult res; if (port->state <= PINOS_PORT_STATE_PAUSED) @@ -211,7 +210,7 @@ pinos_port_pause_rt (PinosPort *port) res = spa_node_port_send_command (port->node->node, port->direction, port->port_id, - &cmd); + &SPA_COMMAND_INIT (port->node->core->type.command_node.Pause)); port->state = PINOS_PORT_STATE_PAUSED; pinos_log_debug ("port %p: state PAUSED", port); return res; diff --git a/spa/include/spa/command-node.h b/spa/include/spa/command-node.h index 9b3aafeb7..d60212e09 100644 --- a/spa/include/spa/command-node.h +++ b/spa/include/spa/command-node.h @@ -95,7 +95,8 @@ typedef struct { } SpaCommandNodeClockUpdate; #define SPA_COMMAND_NODE_CLOCK_UPDATE_INIT(type,change_mask,rate,ticks,monotonic_time,offset,scale,state,flags,latency) \ - SPA_COMMAND_INIT_COMPLEX (sizeof (SpaCommandNodeClockUpdateBody), type, \ + SPA_COMMAND_INIT_COMPLEX (SpaCommandNodeClockUpdate, \ + sizeof (SpaCommandNodeClockUpdateBody), type, \ SPA_POD_INT_INIT (change_mask), \ SPA_POD_INT_INIT (rate), \ SPA_POD_LONG_INIT (ticks), \ diff --git a/spa/include/spa/command.h b/spa/include/spa/command.h index 59cad9fed..aca3caaaa 100644 --- a/spa/include/spa/command.h +++ b/spa/include/spa/command.h @@ -43,11 +43,11 @@ struct _SpaCommand { #define SPA_COMMAND_TYPE(cmd) ((cmd)->body.body.type) -#define SPA_COMMAND_INIT(type) \ +#define SPA_COMMAND_INIT(type) (SpaCommand) \ { { sizeof (SpaCommandBody), SPA_POD_TYPE_OBJECT }, \ { { 0, type } } } \ -#define SPA_COMMAND_INIT_COMPLEX(size,type,...) \ +#define SPA_COMMAND_INIT_COMPLEX(t,size,type,...) (t) \ { { size, SPA_POD_TYPE_OBJECT }, \ { { 0, type }, __VA_ARGS__ } } \ diff --git a/spa/include/spa/event-node.h b/spa/include/spa/event-node.h index e92eab23d..67b145187 100644 --- a/spa/include/spa/event-node.h +++ b/spa/include/spa/event-node.h @@ -70,7 +70,8 @@ typedef struct { } SpaEventNodeAsyncComplete; #define SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(type,seq,res) \ - SPA_EVENT_INIT_COMPLEX (sizeof (SpaEventNodeAsyncCompleteBody), type, \ + SPA_EVENT_INIT_COMPLEX (SpaEventNodeAsyncComplete, \ + sizeof (SpaEventNodeAsyncCompleteBody), type, \ SPA_POD_INT_INIT (seq), \ SPA_POD_INT_INIT (res)) @@ -90,7 +91,8 @@ typedef struct { } SpaEventNodeRequestClockUpdate; #define SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(type,update_mask,timestamp,offset) \ - SPA_EVENT_INIT_COMPLEX (sizeof (SpaEventNodeRequestClockUpdateBody), type, \ + SPA_EVENT_INIT_COMPLEX (SpaEventNodeRequestClockUpdate, \ + sizeof (SpaEventNodeRequestClockUpdateBody), type, \ SPA_POD_INT_INIT (update_mask), \ SPA_POD_LONG_INIT (timestamp), \ SPA_POD_LONG_INIT (offset)) diff --git a/spa/include/spa/event.h b/spa/include/spa/event.h index 987b4875e..f3e1d9d65 100644 --- a/spa/include/spa/event.h +++ b/spa/include/spa/event.h @@ -43,11 +43,11 @@ struct _SpaEvent { #define SPA_EVENT_TYPE(ev) ((ev)->body.body.type) -#define SPA_EVENT_INIT(type) \ +#define SPA_EVENT_INIT(type) (SpaEvent) \ { { sizeof (SpaEventBody), SPA_POD_TYPE_OBJECT }, \ { { 0, type } } } \ -#define SPA_EVENT_INIT_COMPLEX(size,type,...) \ +#define SPA_EVENT_INIT_COMPLEX(t,size,type,...) (t) \ { { size, SPA_POD_TYPE_OBJECT }, \ { { 0, type }, __VA_ARGS__ } } \ diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h new file mode 100644 index 000000000..d598e82fb --- /dev/null +++ b/spa/include/spa/graph.h @@ -0,0 +1,221 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_H__ +#define __SPA_GRAPH_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +typedef struct SpaGraph SpaGraph; +typedef struct SpaGraphNode SpaGraphNode; +typedef struct SpaGraphPort SpaGraphPort; + +struct SpaGraph { + SpaList nodes; + SpaList ready; +}; + +#define PROCESS_CHECK 0 +#define PROCESS_IN 1 +#define PROCESS_OUT 2 + +typedef SpaResult (*SpaGraphNodeFunc) (SpaGraphNode *node); + +struct SpaGraphNode { + SpaList link; + SpaList ready_link; + SpaList ports[2]; +#define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0) + uint32_t flags; + SpaResult state; + uint32_t action; + SpaGraphNodeFunc schedule; + void *user_data; + uint32_t max_in; + uint32_t required_in; + uint32_t ready_in; +}; + +struct SpaGraphPort { + SpaList link; + SpaGraphNode *node; + SpaDirection direction; + uint32_t port_id; + uint32_t flags; + SpaPortIO *io; + SpaGraphPort *peer; +}; + +static inline void +spa_graph_init (SpaGraph *graph) +{ + spa_list_init (&graph->nodes); + spa_list_init (&graph->ready); +} + +static inline SpaResult +spa_graph_node_schedule_default (SpaGraphNode *node) +{ + SpaNode *n = node->user_data; + if (node->action == PROCESS_IN) + return spa_node_process_input (n); + else if (node->action == PROCESS_OUT) + return spa_node_process_output (n); + else + return SPA_RESULT_ERROR; +} + +static inline void +spa_graph_node_add (SpaGraph *graph, SpaGraphNode *node, SpaGraphNodeFunc schedule, void *user_data) +{ + spa_list_init (&node->ports[SPA_DIRECTION_INPUT]); + spa_list_init (&node->ports[SPA_DIRECTION_OUTPUT]); + node->flags = 0; + node->state = SPA_RESULT_OK; + node->action = PROCESS_OUT; + node->schedule = schedule; + node->user_data = user_data; + spa_list_insert (graph->nodes.prev, &node->link); + node->max_in = node->required_in = node->ready_in = 0; +} + +static inline void +spa_graph_port_check (SpaGraph *graph, + SpaGraphPort *port) +{ + SpaGraphNode *node = port->node; + + if (port->io->status == SPA_RESULT_HAVE_BUFFER) + node->ready_in++; + + if (node->required_in > 0 && node->ready_in == node->required_in) { + node->action = PROCESS_IN; + if (node->ready_link.next == NULL) + spa_list_insert (graph->ready.prev, &node->ready_link); + } else if (node->ready_link.next) { + spa_list_remove (&node->ready_link); + node->ready_link.next = NULL; + } +} + +static inline void +spa_graph_port_add (SpaGraph *graph, + SpaGraphNode *node, + SpaGraphPort *port, + SpaDirection direction, + uint32_t port_id, + uint32_t flags, + SpaPortIO *io) +{ + port->node = node; + port->direction = direction; + port->port_id = port_id; + port->flags = flags; + port->io = io; + port->peer = NULL; + spa_list_insert (node->ports[port->direction].prev, &port->link); + node->max_in++; + if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT) + node->required_in++; + spa_graph_port_check (graph, port); +} + +static inline void +spa_graph_node_remove (SpaGraph *graph, SpaGraphNode *node) +{ + spa_list_remove (&node->link); +} + +static inline void +spa_graph_port_remove (SpaGraph *graph, SpaGraphPort *port) +{ + spa_list_remove (&port->link); +} + +static inline void +spa_graph_port_link (SpaGraph *graph, SpaGraphPort *out, SpaGraphPort *in) +{ + out->peer = in; + in->peer = out; +} + +static inline void +spa_graph_port_unlink (SpaGraph *graph, SpaGraphPort *out, SpaGraphPort *in) +{ + out->peer = NULL; + in->peer = NULL; +} + +static inline void +spa_graph_node_schedule (SpaGraph *graph, SpaGraphNode *node) +{ + SpaGraphPort *p; + + if (node->ready_link.next == NULL) + spa_list_insert (graph->ready.prev, &node->ready_link); + + while (!spa_list_is_empty (&graph->ready)) { + SpaGraphNode *n = spa_list_first (&graph->ready, SpaGraphNode, ready_link); + + spa_list_remove (&n->ready_link); + n->ready_link.next = NULL; + + switch (n->action) { + case PROCESS_IN: + case PROCESS_OUT: + n->state = n->schedule (n); + n->action = PROCESS_CHECK; + spa_list_insert (graph->ready.prev, &n->ready_link); + break; + + case PROCESS_CHECK: + if (n->state == SPA_RESULT_NEED_BUFFER) { + n->ready_in = 0; + spa_list_for_each (p, &n->ports[SPA_DIRECTION_INPUT], link) { + SpaGraphNode *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + pn->action = PROCESS_OUT; + spa_list_insert (graph->ready.prev, &pn->ready_link); + } + else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; + } + } + else if (n->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each (p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_graph_port_check (graph, p->peer); + } + break; + + default: + break; + } + } +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_H__ */ diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 1b7a8eb95..c1ad02130 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -127,13 +127,12 @@ do_command (SpaLoop *loop, res = SPA_RESULT_NOT_IMPLEMENTED; if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index d8a1ab43d..41235f98b 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -120,13 +120,12 @@ do_start (SpaLoop *loop, res = spa_alsa_start (this, false); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; @@ -146,13 +145,12 @@ do_pause (SpaLoop *loop, res = spa_alsa_pause (this, false); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index dfa1ac397..cba37cfe2 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -532,7 +532,6 @@ static void recycle_buffer (SpaAudioMixer *this, uint32_t id) { SpaAudioMixerPort *port = &this->out_ports[0]; - MixerBuffer *b = &port->buffers[id]; if (!b->outstanding) { diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 4ac1c0d8c..a2f92bdff 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -267,13 +267,12 @@ do_pause (SpaLoop *loop, cmd); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->state[0].main_loop, do_pause_done, seq, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; @@ -313,13 +312,12 @@ do_start (SpaLoop *loop, cmd); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->state[0].main_loop, do_start_done, seq, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 73609cca1..75f7f3c75 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -567,13 +567,28 @@ spa_volume_node_port_set_io (SpaNode *node, return SPA_RESULT_OK; } +static void +recycle_buffer (SpaVolume *this, uint32_t id) +{ + SpaVolumePort *port = &this->out_ports[0]; + SpaVolumeBuffer *b = &port->buffers[id]; + + if (!b->outstanding) { + spa_log_warn (this->log, "volume %p: buffer %d not outstanding", this, id); + return; + } + + spa_list_insert (port->empty.prev, &b->link); + b->outstanding = false; + spa_log_trace (this->log, "volume %p: recycle buffer %d", this, id); +} + static SpaResult spa_volume_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, uint32_t buffer_id) { SpaVolume *this; - SpaVolumeBuffer *b; SpaVolumePort *port; spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS); @@ -590,12 +605,7 @@ spa_volume_node_port_reuse_buffer (SpaNode *node, if (buffer_id >= port->n_buffers) return SPA_RESULT_INVALID_BUFFER_ID; - b = &port->buffers[buffer_id]; - if (!b->outstanding) - return SPA_RESULT_OK; - - b->outstanding = false; - spa_list_insert (port->empty.prev, &b->link); + recycle_buffer (this, buffer_id); return SPA_RESULT_OK; } @@ -627,7 +637,8 @@ find_free_buffer (SpaVolume *this, SpaVolumePort *port) static inline void release_buffer (SpaVolume *this, SpaBuffer *buffer) { - this->callbacks.reuse_buffer (&this->node, 0, buffer->id, this->user_data); + if (this->callbacks.reuse_buffer) + this->callbacks.reuse_buffer (&this->node, 0, buffer->id, this->user_data); } static void @@ -686,43 +697,28 @@ spa_volume_node_process_input (SpaNode *node) this = SPA_CONTAINER_OF (node, SpaVolume, node); - in_port = &this->in_ports[0]; out_port = &this->out_ports[0]; + output = out_port->io; + spa_return_val_if_fail (output != NULL, SPA_RESULT_ERROR); - if ((input = in_port->io) == NULL) - return SPA_RESULT_ERROR; - if ((output = out_port->io) == NULL) - return SPA_RESULT_ERROR; + if (output->status == SPA_RESULT_HAVE_BUFFER) + return SPA_RESULT_HAVE_BUFFER; - if (!in_port->have_format) { - input->status = SPA_RESULT_NO_FORMAT; - return SPA_RESULT_ERROR; - } - if (input->buffer_id >= in_port->n_buffers) { - input->status = SPA_RESULT_INVALID_BUFFER_ID; - return SPA_RESULT_ERROR; - } + in_port = &this->in_ports[0]; + input = in_port->io; + spa_return_val_if_fail (input != NULL, SPA_RESULT_ERROR); - if (output->buffer_id >= out_port->n_buffers) { - dbuf = find_free_buffer (this, out_port); - } else { - dbuf = out_port->buffers[output->buffer_id].outbuf; - } - if (dbuf == NULL) + if ((dbuf = find_free_buffer (this, out_port)) == NULL) return SPA_RESULT_OUT_OF_BUFFERS; sbuf = in_port->buffers[input->buffer_id].outbuf; - input->buffer_id = SPA_ID_INVALID; - input->status = SPA_RESULT_OK; + input->status = SPA_RESULT_NEED_BUFFER; do_volume (this, sbuf, dbuf); output->buffer_id = dbuf->id; - output->status = SPA_RESULT_OK; - - if (sbuf != dbuf) - release_buffer (this, sbuf); + output->status = SPA_RESULT_HAVE_BUFFER; return SPA_RESULT_HAVE_BUFFER; } @@ -730,6 +726,34 @@ spa_volume_node_process_input (SpaNode *node) static SpaResult spa_volume_node_process_output (SpaNode *node) { + SpaVolume *this; + SpaVolumePort *in_port, *out_port; + SpaPortIO *input, *output; + + spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS); + + this = SPA_CONTAINER_OF (node, SpaVolume, node); + + out_port = &this->out_ports[0]; + output = out_port->io; + spa_return_val_if_fail (output != NULL, SPA_RESULT_ERROR); + + if (output->status == SPA_RESULT_HAVE_BUFFER) + return SPA_RESULT_HAVE_BUFFER; + + /* recycle */ + if (output->buffer_id != SPA_ID_INVALID) { + recycle_buffer (this, output->buffer_id); + output->buffer_id = SPA_ID_INVALID; + } + + in_port = &this->in_ports[0]; + input = in_port->io; + spa_return_val_if_fail (input != NULL, SPA_RESULT_ERROR); + + input->range = output->range; + input->status = SPA_RESULT_NEED_BUFFER; + return SPA_RESULT_NEED_BUFFER; } diff --git a/spa/tests/meson.build b/spa/tests/meson.build index 3d86ea65a..65a6a728d 100644 --- a/spa/tests/meson.build +++ b/spa/tests/meson.build @@ -8,6 +8,11 @@ executable('test-ringbuffer', 'test-ringbuffer.c', dependencies : [dl_lib, pthread_lib], link_with : spalib, install : false) +executable('test-graph', 'test-graph.c', + include_directories : [spa_inc, spa_libinc ], + dependencies : [dl_lib, pthread_lib], + link_with : spalib, + install : false) executable('stress-ringbuffer', 'stress-ringbuffer.c', include_directories : [spa_inc, spa_libinc ], dependencies : [dl_lib, pthread_lib], diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c new file mode 100644 index 000000000..e2d6c0eb5 --- /dev/null +++ b/spa/tests/test-graph.c @@ -0,0 +1,562 @@ +/* Spa + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +typedef struct { + uint32_t node; + uint32_t props; + uint32_t format; + uint32_t props_device; + uint32_t props_freq; + uint32_t props_volume; + uint32_t props_min_latency; + uint32_t props_live; + SpaTypeMeta meta; + SpaTypeData data; + SpaTypeMediaType media_type; + SpaTypeMediaSubtype media_subtype; + SpaTypeFormatAudio format_audio; + SpaTypeAudioFormat audio_format; + SpaTypeEventNode event_node; + SpaTypeCommandNode command_node; +} Type; + +static inline void +init_type (Type *type, SpaTypeMap *map) +{ + type->node = spa_type_map_get_id (map, SPA_TYPE__Node); + type->props = spa_type_map_get_id (map, SPA_TYPE__Props); + type->format = spa_type_map_get_id (map, SPA_TYPE__Format); + type->props_device = spa_type_map_get_id (map, SPA_TYPE_PROPS__device); + type->props_freq = spa_type_map_get_id (map, SPA_TYPE_PROPS__frequency); + type->props_volume = spa_type_map_get_id (map, SPA_TYPE_PROPS__volume); + type->props_min_latency = spa_type_map_get_id (map, SPA_TYPE_PROPS__minLatency); + type->props_live = spa_type_map_get_id (map, SPA_TYPE_PROPS__live); + spa_type_meta_map (map, &type->meta); + spa_type_data_map (map, &type->data); + spa_type_media_type_map (map, &type->media_type); + spa_type_media_subtype_map (map, &type->media_subtype); + spa_type_format_audio_map (map, &type->format_audio); + spa_type_audio_format_map (map, &type->audio_format); + spa_type_event_node_map (map, &type->event_node); + spa_type_command_node_map (map, &type->command_node); +} + +typedef struct { + SpaBuffer buffer; + SpaMeta metas[1]; + SpaMetaHeader header; + SpaData datas[1]; + SpaChunk chunks[1]; +} Buffer; + +typedef struct { + SpaTypeMap *map; + SpaLog *log; + SpaLoop data_loop; + Type type; + + SpaSupport support[4]; + uint32_t n_support; + + SpaGraph graph; + SpaGraphNode source_node; + SpaGraphPort source_out; + SpaGraphPort volume_in; + SpaGraphNode volume_node; + SpaGraphPort volume_out; + SpaGraphPort sink_in; + SpaGraphNode sink_node; + + SpaNode *sink; + SpaPortIO volume_sink_io[1]; + + SpaNode *volume; + SpaBuffer *volume_buffers[1]; + Buffer volume_buffer[1]; + + SpaNode *source; + SpaPortIO source_volume_io[1]; + SpaBuffer *source_buffers[1]; + Buffer source_buffer[1]; + + bool running; + pthread_t thread; + + SpaSource sources[16]; + unsigned int n_sources; + + bool rebuild_fds; + struct pollfd fds[16]; + unsigned int n_fds; +} AppData; + +#define MIN_LATENCY 64 + +#define BUFFER_SIZE MIN_LATENCY + +static void +init_buffer (AppData *data, SpaBuffer **bufs, Buffer *ba, int n_buffers, size_t size) +{ + int i; + + for (i = 0; i < n_buffers; i++) { + Buffer *b = &ba[i]; + bufs[i] = &b->buffer; + + b->buffer.id = i; + b->buffer.n_metas = 1; + b->buffer.metas = b->metas; + b->buffer.n_datas = 1; + b->buffer.datas = b->datas; + + b->header.flags = 0; + b->header.seq = 0; + b->header.pts = 0; + b->header.dts_offset = 0; + b->metas[0].type = data->type.meta.Header; + b->metas[0].data = &b->header; + b->metas[0].size = sizeof (b->header); + + b->datas[0].type = data->type.data.MemPtr; + b->datas[0].flags = 0; + b->datas[0].fd = -1; + b->datas[0].mapoffset = 0; + b->datas[0].maxsize = size; + b->datas[0].data = malloc (size); + b->datas[0].chunk = &b->chunks[0]; + b->datas[0].chunk->offset = 0; + b->datas[0].chunk->size = size; + b->datas[0].chunk->stride = 0; + } +} + +static SpaResult +make_node (AppData *data, SpaNode **node, const char *lib, const char *name) +{ + SpaHandle *handle; + SpaResult res; + void *hnd; + SpaEnumHandleFactoryFunc enum_func; + unsigned int i; + uint32_t state = 0; + + if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) { + printf ("can't load %s: %s\n", lib, dlerror()); + return SPA_RESULT_ERROR; + } + if ((enum_func = dlsym (hnd, "spa_enum_handle_factory")) == NULL) { + printf ("can't find enum function\n"); + return SPA_RESULT_ERROR; + } + + for (i = 0; ;i++) { + const SpaHandleFactory *factory; + void *iface; + + if ((res = enum_func (&factory, state++)) < 0) { + if (res != SPA_RESULT_ENUM_END) + printf ("can't enumerate factories: %d\n", res); + break; + } + if (strcmp (factory->name, name)) + continue; + + handle = calloc (1, factory->size); + if ((res = spa_handle_factory_init (factory, handle, NULL, data->support, data->n_support)) < 0) { + printf ("can't make factory instance: %d\n", res); + return res; + } + if ((res = spa_handle_get_interface (handle, data->type.node, &iface)) < 0) { + printf ("can't get interface %d\n", res); + return res; + } + *node = iface; + return SPA_RESULT_OK; + } + return SPA_RESULT_ERROR; +} + +static void +on_sink_event (SpaNode *node, SpaEvent *event, void *user_data) +{ + printf ("got event %d\n", SPA_EVENT_TYPE (event)); +} + +static void +on_sink_need_input (SpaNode *node, void *user_data) +{ + AppData *data = user_data; + + data->sink_node.action = PROCESS_CHECK; + data->sink_node.state = SPA_RESULT_NEED_BUFFER; + + spa_graph_node_schedule (&data->graph, &data->sink_node); +} + +static void +on_sink_reuse_buffer (SpaNode *node, uint32_t port_id, uint32_t buffer_id, void *user_data) +{ + AppData *data = user_data; + + data->volume_sink_io[0].buffer_id = buffer_id; +} + +static const SpaNodeCallbacks sink_callbacks = +{ + &on_sink_event, + &on_sink_need_input, + NULL, + &on_sink_reuse_buffer +}; + +static SpaResult +do_add_source (SpaLoop *loop, + SpaSource *source) +{ + AppData *data = SPA_CONTAINER_OF (loop, AppData, data_loop); + + data->sources[data->n_sources] = *source; + data->n_sources++; + data->rebuild_fds = true; + + return SPA_RESULT_OK; +} + +static SpaResult +do_update_source (SpaSource *source) +{ + return SPA_RESULT_OK; +} + +static void +do_remove_source (SpaSource *source) +{ +} + +static SpaResult +do_invoke (SpaLoop *loop, + SpaInvokeFunc func, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + return func (loop, false, seq, size, data, user_data); +} + +static SpaResult +make_nodes (AppData *data, const char *device) +{ + SpaResult res; + SpaProps *props; + SpaPODBuilder b = { 0 }; + SpaPODFrame f[2]; + uint8_t buffer[128]; + + if ((res = make_node (data, &data->sink, + "build/spa/plugins/alsa/libspa-alsa.so", + "alsa-sink")) < 0) { + printf ("can't create alsa-sink: %d\n", res); + return res; + } + spa_node_set_callbacks (data->sink, &sink_callbacks, sizeof (sink_callbacks), data); + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_props (&b, &f[0], data->type.props, + SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, device ? device : "hw:0"), + SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, MIN_LATENCY)); + props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps); + + if ((res = spa_node_set_props (data->sink, props)) < 0) + printf ("got set_props error %d\n", res); + + if ((res = make_node (data, &data->volume, + "build/spa/plugins/volume/libspa-volume.so", + "volume")) < 0) { + printf ("can't create volume: %d\n", res); + return res; + } + + if ((res = make_node (data, &data->source, + "build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so", + "audiotestsrc")) < 0) { + printf ("can't create audiotestsrc: %d\n", res); + return res; + } + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_props (&b, &f[0], data->type.props, + SPA_POD_PROP (&f[1], data->type.props_freq, 0, SPA_POD_TYPE_DOUBLE, 1, 600.0), + SPA_POD_PROP (&f[1], data->type.props_volume, 0, SPA_POD_TYPE_DOUBLE, 1, 0.5), + SPA_POD_PROP (&f[1], data->type.props_live, 0, SPA_POD_TYPE_BOOL, 1, false)); + props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps); + + if ((res = spa_node_set_props (data->source, props)) < 0) + printf ("got set_props error %d\n", res); + + spa_node_port_set_io (data->source, SPA_DIRECTION_OUTPUT, 0, &data->source_volume_io[0]); + spa_node_port_set_io (data->volume, SPA_DIRECTION_INPUT, 0, &data->source_volume_io[0]); + spa_node_port_set_io (data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]); + spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]); + + spa_graph_node_add (&data->graph, &data->source_node, spa_graph_node_schedule_default, data->source); + spa_graph_port_add (&data->graph, &data->source_node, + &data->source_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->source_volume_io[0]); + + spa_graph_node_add (&data->graph, &data->volume_node, spa_graph_node_schedule_default, data->volume); + spa_graph_port_add (&data->graph, &data->volume_node, + &data->volume_in, SPA_DIRECTION_INPUT, 0, + 0, &data->source_volume_io[0]); + + spa_graph_port_link (&data->graph, &data->source_out, &data->volume_in); + + spa_graph_port_add (&data->graph, &data->volume_node, + &data->volume_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->volume_sink_io[0]); + + spa_graph_node_add (&data->graph, &data->sink_node, spa_graph_node_schedule_default, data->sink); + spa_graph_port_add (&data->graph, &data->sink_node, + &data->sink_in, SPA_DIRECTION_INPUT, 0, + 0, &data->volume_sink_io[0]); + + spa_graph_port_link (&data->graph, &data->volume_out, &data->sink_in); + + return res; +} + +static SpaResult +negotiate_formats (AppData *data) +{ + SpaResult res; + SpaFormat *format, *filter; + uint32_t state = 0; + SpaPODBuilder b = { 0 }; + SpaPODFrame f[2]; + uint8_t buffer[256]; + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_format (&b, &f[0], data->type.format, + data->type.media_type.audio, data->type.media_subtype.raw, + SPA_POD_PROP (&f[1], data->type.format_audio.format, 0, + SPA_POD_TYPE_ID, 1, + data->type.audio_format.S16), + SPA_POD_PROP (&f[1], data->type.format_audio.layout, 0, + SPA_POD_TYPE_INT, 1, + SPA_AUDIO_LAYOUT_INTERLEAVED), + SPA_POD_PROP (&f[1], data->type.format_audio.rate, 0, + SPA_POD_TYPE_INT, 1, + 44100), + SPA_POD_PROP (&f[1], data->type.format_audio.channels, 0, + SPA_POD_TYPE_INT, 1, + 2)); + filter = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaFormat); + + if ((res = spa_node_port_enum_formats (data->sink, SPA_DIRECTION_INPUT, 0, &format, filter, state)) < 0) + return res; + + + if ((res = spa_node_port_set_format (data->sink, SPA_DIRECTION_INPUT, 0, 0, format)) < 0) + return res; + + if ((res = spa_node_port_set_format (data->volume, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) + return res; + + init_buffer (data, data->volume_buffers, data->volume_buffer, 1, BUFFER_SIZE); + if ((res = spa_node_port_use_buffers (data->sink, SPA_DIRECTION_INPUT, 0, data->volume_buffers, 1)) < 0) + return res; + if ((res = spa_node_port_use_buffers (data->volume, SPA_DIRECTION_OUTPUT, 0, data->volume_buffers, 1)) < 0) + return res; + + if ((res = spa_node_port_set_format (data->volume, SPA_DIRECTION_INPUT, 0, 0, format)) < 0) + return res; + if ((res = spa_node_port_set_format (data->source, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) + return res; + + init_buffer (data, data->source_buffers, data->source_buffer, 1, BUFFER_SIZE); + if ((res = spa_node_port_use_buffers (data->volume, SPA_DIRECTION_INPUT, 0, data->source_buffers, 1)) < 0) + return res; + if ((res = spa_node_port_use_buffers (data->source, SPA_DIRECTION_OUTPUT, 0, data->source_buffers, 1)) < 0) + return res; + + return SPA_RESULT_OK; +} + +static void * +loop (void *user_data) +{ + AppData *data = user_data; + + printf ("enter thread %d\n", data->n_sources); + while (data->running) { + int i, r; + + /* rebuild */ + if (data->rebuild_fds) { + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + data->fds[i].fd = p->fd; + data->fds[i].events = p->mask; + } + data->n_fds = data->n_sources; + data->rebuild_fds = false; + } + + r = poll ((struct pollfd *) data->fds, data->n_fds, -1); + if (r < 0) { + if (errno == EINTR) + continue; + break; + } + if (r == 0) { + fprintf (stderr, "select timeout"); + break; + } + + /* after */ + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + p->rmask = 0; + if (data->fds[i].revents & POLLIN) + p->rmask |= SPA_IO_IN; + if (data->fds[i].revents & POLLOUT) + p->rmask |= SPA_IO_OUT; + if (data->fds[i].revents & POLLHUP) + p->rmask |= SPA_IO_HUP; + if (data->fds[i].revents & POLLERR) + p->rmask |= SPA_IO_ERR; + } + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + if (p->rmask) + p->func (p); + } + } + printf ("leave thread\n"); + + return NULL; +} + +static void +run_async_sink (AppData *data) +{ + SpaResult res; + int err; + + { + SpaCommand cmd = SPA_COMMAND_INIT (data->type.command_node.Start); + if ((res = spa_node_send_command (data->source, &cmd)) < 0) + printf ("got source error %d\n", res); + if ((res = spa_node_send_command (data->volume, &cmd)) < 0) + printf ("got volume error %d\n", res); + if ((res = spa_node_send_command (data->sink, &cmd)) < 0) + printf ("got sink error %d\n", res); + } + + data->running = true; + if ((err = pthread_create (&data->thread, NULL, loop, data)) != 0) { + printf ("can't create thread: %d %s", err, strerror (err)); + data->running = false; + } + + printf ("sleeping for 1000 seconds\n"); + sleep (1000); + + if (data->running) { + data->running = false; + pthread_join (data->thread, NULL); + } + + { + SpaCommand cmd = SPA_COMMAND_INIT (data->type.command_node.Pause); + if ((res = spa_node_send_command (data->sink, &cmd)) < 0) + printf ("got error %d\n", res); + if ((res = spa_node_send_command (data->volume, &cmd)) < 0) + printf ("got volume error %d\n", res); + if ((res = spa_node_send_command (data->source, &cmd)) < 0) + printf ("got source error %d\n", res); + } +} + +int +main (int argc, char *argv[]) +{ + AppData data = { NULL }; + SpaResult res; + const char *str; + + + spa_graph_init (&data.graph); + + data.map = spa_type_map_get_default(); + data.log = spa_log_get_default(); + data.data_loop.size = sizeof (SpaLoop); + data.data_loop.add_source = do_add_source; + data.data_loop.update_source = do_update_source; + data.data_loop.remove_source = do_remove_source; + data.data_loop.invoke = do_invoke; + + if ((str = getenv ("PINOS_DEBUG"))) + data.log->level = atoi (str); + + data.support[0].type = SPA_TYPE__TypeMap; + data.support[0].data = data.map; + data.support[1].type = SPA_TYPE__Log; + data.support[1].data = data.log; + data.support[2].type = SPA_TYPE_LOOP__DataLoop; + data.support[2].data = &data.data_loop; + data.support[3].type = SPA_TYPE_LOOP__MainLoop; + data.support[3].data = &data.data_loop; + data.n_support = 4; + + init_type (&data.type, data.map); + + if ((res = make_nodes (&data, argc > 1 ? argv[1] : NULL)) < 0) { + printf ("can't make nodes: %d\n", res); + return -1; + } + if ((res = negotiate_formats (&data)) < 0) { + printf ("can't negotiate nodes: %d\n", res); + return -1; + } + + run_async_sink (&data); +} diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index c4714bf5d..99d84d655 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,8 @@ #include #include +#undef USE_GRAPH + typedef struct { uint32_t node; uint32_t props; @@ -94,6 +97,17 @@ typedef struct { SpaSupport support[4]; uint32_t n_support; + SpaGraph graph; + SpaGraphNode source1_node; + SpaGraphPort source1_out; + SpaGraphNode source2_node; + SpaGraphPort source2_out; + SpaGraphPort mix_in[2]; + SpaGraphNode mix_node; + SpaGraphPort mix_out; + SpaGraphPort sink_in; + SpaGraphNode sink_node; + SpaNode *sink; SpaPortIO mix_sink_io[1]; @@ -220,6 +234,12 @@ static void on_sink_need_input (SpaNode *node, void *user_data) { AppData *data = user_data; +#ifdef USE_GRAPH + data->sink_node.action = PROCESS_CHECK; + data->sink_node.state = SPA_RESULT_NEED_BUFFER; + + spa_graph_node_schedule (&data->graph, &data->sink_node); +#else SpaResult res; res = spa_node_process_output (data->mix); @@ -249,6 +269,7 @@ push: } else { printf ("got process_output error from mixer %d\n", res); } +#endif } static void @@ -369,6 +390,55 @@ make_nodes (AppData *data, const char *device) if ((res = spa_node_set_props (data->source2, props)) < 0) printf ("got set_props error %d\n", res); + data->mix_ports[0] = 0; + if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 0)) < 0) + return res; + + data->mix_ports[1] = 1; + if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 1)) < 0) + return res; + + spa_node_port_set_io (data->source1, SPA_DIRECTION_OUTPUT, 0, &data->source1_mix_io[0]); + spa_node_port_set_io (data->source2, SPA_DIRECTION_OUTPUT, 0, &data->source2_mix_io[0]); + spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 0, &data->source1_mix_io[0]); + spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 1, &data->source2_mix_io[0]); + spa_node_port_set_io (data->mix, SPA_DIRECTION_OUTPUT, 0, &data->mix_sink_io[0]); + spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]); + +#ifdef USE_GRAPH + spa_graph_node_add (&data->graph, &data->source1_node, spa_graph_node_schedule_default, data->source1); + spa_graph_port_add (&data->graph, &data->source1_node, + &data->source1_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->source1_mix_io[0]); + + spa_graph_node_add (&data->graph, &data->source2_node, spa_graph_node_schedule_default, data->source2); + spa_graph_port_add (&data->graph, &data->source2_node, + &data->source2_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->source2_mix_io[0]); + + spa_graph_node_add (&data->graph, &data->mix_node, spa_graph_node_schedule_default, data->mix); + spa_graph_port_add (&data->graph, &data->mix_node, + &data->mix_in[0], SPA_DIRECTION_INPUT, data->mix_ports[0], + 0, &data->source1_mix_io[0]); + spa_graph_port_add (&data->graph, &data->mix_node, + &data->mix_in[1], SPA_DIRECTION_INPUT, data->mix_ports[1], + 0, &data->source2_mix_io[0]); + + spa_graph_port_link (&data->graph, &data->source1_out, &data->mix_in[0]); + spa_graph_port_link (&data->graph, &data->source2_out, &data->mix_in[1]); + + spa_graph_port_add (&data->graph, &data->mix_node, + &data->mix_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->mix_sink_io[0]); + + spa_graph_node_add (&data->graph, &data->sink_node, spa_graph_node_schedule_default, data->sink); + spa_graph_port_add (&data->graph, &data->sink_node, + &data->sink_in, SPA_DIRECTION_INPUT, 0, + 0, &data->mix_sink_io[0]); + + spa_graph_port_link (&data->graph, &data->mix_out, &data->sink_in); +#endif + return res; } @@ -402,13 +472,9 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_enum_formats (data->sink, SPA_DIRECTION_INPUT, 0, &format, filter, state)) < 0) return res; - if ((res = spa_node_port_set_format (data->sink, SPA_DIRECTION_INPUT, 0, 0, format)) < 0) return res; - spa_node_port_set_io (data->mix, SPA_DIRECTION_OUTPUT, 0, &data->mix_sink_io[0]); - spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]); - if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) return res; @@ -418,13 +484,6 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_OUTPUT, 0, data->mix_buffers, 1)) < 0) return res; - data->mix_ports[0] = 0; - if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 0)) < 0) - return res; - - spa_node_port_set_io (data->source1, SPA_DIRECTION_OUTPUT, 0, &data->source1_mix_io[0]); - spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 0, &data->source1_mix_io[0]); - if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[0], 0, format)) < 0) return res; @@ -437,13 +496,6 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_use_buffers (data->source1, SPA_DIRECTION_OUTPUT, 0, data->source1_buffers, 2)) < 0) return res; - data->mix_ports[1] = 1; - if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 1)) < 0) - return res; - - spa_node_port_set_io (data->source2, SPA_DIRECTION_OUTPUT, 0, &data->source2_mix_io[0]); - spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 1, &data->source2_mix_io[0]); - if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[1], 0, format)) < 0) return res; @@ -538,8 +590,8 @@ run_async_sink (AppData *data) data->running = false; } - printf ("sleeping for 1000 seconds\n"); - sleep (1000); + printf ("sleeping for 30 seconds\n"); + sleep (30); if (data->running) { data->running = false; @@ -574,6 +626,8 @@ main (int argc, char *argv[]) data.data_loop.remove_source = do_remove_source; data.data_loop.invoke = do_invoke; + spa_graph_init (&data.graph); + if ((str = getenv ("PINOS_DEBUG"))) data.log->level = atoi (str);