diff --git a/pinos/client/stream.c b/pinos/client/stream.c index fc854249c..50395026b 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -386,10 +386,10 @@ static inline void send_have_output (PinosStream *stream) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - SpaEvent ho = SPA_EVENT_INIT (stream->context->type.event_transport.HaveOutput); uint64_t cmd = 1; - pinos_transport_add_event (impl->trans, &ho); + pinos_transport_add_event (impl->trans, + &SPA_EVENT_INIT (stream->context->type.event_transport.HaveOutput)); write (impl->rtwritefd, &cmd, 8); } @@ -397,11 +397,10 @@ static void add_request_clock_update (PinosStream *stream) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - SpaEventNodeRequestClockUpdate rcu = - SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT (stream->context->type.event_node.RequestClockUpdate, - SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME, 0, 0); - pinos_client_node_do_event (impl->node_proxy, (SpaEvent*)&rcu); + pinos_client_node_do_event (impl->node_proxy, (SpaEvent*) + &SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT (stream->context->type.event_node.RequestClockUpdate, + SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME, 0, 0)); } static void @@ -410,10 +409,11 @@ add_async_complete (PinosStream *stream, SpaResult res) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - SpaEventNodeAsyncComplete ac = - SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (stream->context->type.event_node.AsyncComplete, - seq, res); - pinos_client_node_do_event (impl->node_proxy, (SpaEvent*)&ac); + + pinos_client_node_do_event (impl->node_proxy, (SpaEvent*) + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (stream->context->type.event_node.AsyncComplete, + seq, res)); + } static void diff --git a/pinos/client/transport.h b/pinos/client/transport.h index 4d462b04b..5c0d94060 100644 --- a/pinos/client/transport.h +++ b/pinos/client/transport.h @@ -118,7 +118,8 @@ typedef struct { } PinosEventTransportReuseBuffer; #define PINOS_EVENT_TRANSPORT_REUSE_BUFFER_INIT(type,port_id,buffer_id) \ - SPA_EVENT_INIT_COMPLEX (sizeof (PinosEventTransportReuseBufferBody), type, \ + SPA_EVENT_INIT_COMPLEX (PinosEventTransportReuseBuffer, \ + sizeof (PinosEventTransportReuseBufferBody), type, \ SPA_POD_INT_INIT (port_id), \ SPA_POD_INT_INIT (buffer_id)) diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 373f79782..7789662db 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -172,9 +172,9 @@ static inline void send_need_input (SpaProxy *this) { PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy); - SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_transport.NeedInput); - pinos_transport_add_event (impl->transport, &event); + pinos_transport_add_event (impl->transport, + &SPA_EVENT_INIT (impl->core->type.event_transport.NeedInput)); do_flush (this); } @@ -182,9 +182,9 @@ static inline void send_have_output (SpaProxy *this) { PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy); - SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_transport.HaveOutput); - pinos_transport_add_event (impl->transport, &event); + pinos_transport_add_event (impl->transport, + &SPA_EVENT_INIT (impl->core->type.event_transport.HaveOutput)); do_flush (this); } diff --git a/pinos/server/node.c b/pinos/server/node.c index 8d9af3918..af38977c7 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -166,11 +166,10 @@ pause_node (PinosNode *this) return SPA_RESULT_OK; pinos_log_debug ("node %p: pause node", this); - { - SpaCommand cmd = SPA_COMMAND_INIT (this->core->type.command_node.Pause); - if ((res = spa_node_send_command (this->node, &cmd)) < 0) - pinos_log_debug ("got error %d", res); - } + + if ((res = spa_node_send_command (this->node, + &SPA_COMMAND_INIT (this->core->type.command_node.Pause))) < 0) + pinos_log_debug ("got error %d", res); return res; } @@ -181,11 +180,11 @@ start_node (PinosNode *this) SpaResult res; pinos_log_debug ("node %p: start node", this); - { - SpaCommand cmd = SPA_COMMAND_INIT (this->core->type.command_node.Start); - if ((res = spa_node_send_command (this->node, &cmd)) < 0) - pinos_log_debug ("got error %d", res); - } + + if ((res = spa_node_send_command (this->node, + &SPA_COMMAND_INIT (this->core->type.command_node.Start))) < 0) + pinos_log_debug ("got error %d", res); + return res; } diff --git a/pinos/server/port.c b/pinos/server/port.c index 4749bb313..74f695519 100644 --- a/pinos/server/port.c +++ b/pinos/server/port.c @@ -202,7 +202,6 @@ no_mem: SpaResult pinos_port_pause_rt (PinosPort *port) { - SpaCommand cmd = SPA_COMMAND_INIT (port->node->core->type.command_node.Pause); SpaResult res; if (port->state <= PINOS_PORT_STATE_PAUSED) @@ -211,7 +210,7 @@ pinos_port_pause_rt (PinosPort *port) res = spa_node_port_send_command (port->node->node, port->direction, port->port_id, - &cmd); + &SPA_COMMAND_INIT (port->node->core->type.command_node.Pause)); port->state = PINOS_PORT_STATE_PAUSED; pinos_log_debug ("port %p: state PAUSED", port); return res; diff --git a/spa/include/spa/command-node.h b/spa/include/spa/command-node.h index 9b3aafeb7..d60212e09 100644 --- a/spa/include/spa/command-node.h +++ b/spa/include/spa/command-node.h @@ -95,7 +95,8 @@ typedef struct { } SpaCommandNodeClockUpdate; #define SPA_COMMAND_NODE_CLOCK_UPDATE_INIT(type,change_mask,rate,ticks,monotonic_time,offset,scale,state,flags,latency) \ - SPA_COMMAND_INIT_COMPLEX (sizeof (SpaCommandNodeClockUpdateBody), type, \ + SPA_COMMAND_INIT_COMPLEX (SpaCommandNodeClockUpdate, \ + sizeof (SpaCommandNodeClockUpdateBody), type, \ SPA_POD_INT_INIT (change_mask), \ SPA_POD_INT_INIT (rate), \ SPA_POD_LONG_INIT (ticks), \ diff --git a/spa/include/spa/command.h b/spa/include/spa/command.h index 59cad9fed..aca3caaaa 100644 --- a/spa/include/spa/command.h +++ b/spa/include/spa/command.h @@ -43,11 +43,11 @@ struct _SpaCommand { #define SPA_COMMAND_TYPE(cmd) ((cmd)->body.body.type) -#define SPA_COMMAND_INIT(type) \ +#define SPA_COMMAND_INIT(type) (SpaCommand) \ { { sizeof (SpaCommandBody), SPA_POD_TYPE_OBJECT }, \ { { 0, type } } } \ -#define SPA_COMMAND_INIT_COMPLEX(size,type,...) \ +#define SPA_COMMAND_INIT_COMPLEX(t,size,type,...) (t) \ { { size, SPA_POD_TYPE_OBJECT }, \ { { 0, type }, __VA_ARGS__ } } \ diff --git a/spa/include/spa/event-node.h b/spa/include/spa/event-node.h index e92eab23d..67b145187 100644 --- a/spa/include/spa/event-node.h +++ b/spa/include/spa/event-node.h @@ -70,7 +70,8 @@ typedef struct { } SpaEventNodeAsyncComplete; #define SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(type,seq,res) \ - SPA_EVENT_INIT_COMPLEX (sizeof (SpaEventNodeAsyncCompleteBody), type, \ + SPA_EVENT_INIT_COMPLEX (SpaEventNodeAsyncComplete, \ + sizeof (SpaEventNodeAsyncCompleteBody), type, \ SPA_POD_INT_INIT (seq), \ SPA_POD_INT_INIT (res)) @@ -90,7 +91,8 @@ typedef struct { } SpaEventNodeRequestClockUpdate; #define SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(type,update_mask,timestamp,offset) \ - SPA_EVENT_INIT_COMPLEX (sizeof (SpaEventNodeRequestClockUpdateBody), type, \ + SPA_EVENT_INIT_COMPLEX (SpaEventNodeRequestClockUpdate, \ + sizeof (SpaEventNodeRequestClockUpdateBody), type, \ SPA_POD_INT_INIT (update_mask), \ SPA_POD_LONG_INIT (timestamp), \ SPA_POD_LONG_INIT (offset)) diff --git a/spa/include/spa/event.h b/spa/include/spa/event.h index 987b4875e..f3e1d9d65 100644 --- a/spa/include/spa/event.h +++ b/spa/include/spa/event.h @@ -43,11 +43,11 @@ struct _SpaEvent { #define SPA_EVENT_TYPE(ev) ((ev)->body.body.type) -#define SPA_EVENT_INIT(type) \ +#define SPA_EVENT_INIT(type) (SpaEvent) \ { { sizeof (SpaEventBody), SPA_POD_TYPE_OBJECT }, \ { { 0, type } } } \ -#define SPA_EVENT_INIT_COMPLEX(size,type,...) \ +#define SPA_EVENT_INIT_COMPLEX(t,size,type,...) (t) \ { { size, SPA_POD_TYPE_OBJECT }, \ { { 0, type }, __VA_ARGS__ } } \ diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h new file mode 100644 index 000000000..d598e82fb --- /dev/null +++ b/spa/include/spa/graph.h @@ -0,0 +1,221 @@ +/* Simple Plugin API + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_GRAPH_H__ +#define __SPA_GRAPH_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +typedef struct SpaGraph SpaGraph; +typedef struct SpaGraphNode SpaGraphNode; +typedef struct SpaGraphPort SpaGraphPort; + +struct SpaGraph { + SpaList nodes; + SpaList ready; +}; + +#define PROCESS_CHECK 0 +#define PROCESS_IN 1 +#define PROCESS_OUT 2 + +typedef SpaResult (*SpaGraphNodeFunc) (SpaGraphNode *node); + +struct SpaGraphNode { + SpaList link; + SpaList ready_link; + SpaList ports[2]; +#define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0) + uint32_t flags; + SpaResult state; + uint32_t action; + SpaGraphNodeFunc schedule; + void *user_data; + uint32_t max_in; + uint32_t required_in; + uint32_t ready_in; +}; + +struct SpaGraphPort { + SpaList link; + SpaGraphNode *node; + SpaDirection direction; + uint32_t port_id; + uint32_t flags; + SpaPortIO *io; + SpaGraphPort *peer; +}; + +static inline void +spa_graph_init (SpaGraph *graph) +{ + spa_list_init (&graph->nodes); + spa_list_init (&graph->ready); +} + +static inline SpaResult +spa_graph_node_schedule_default (SpaGraphNode *node) +{ + SpaNode *n = node->user_data; + if (node->action == PROCESS_IN) + return spa_node_process_input (n); + else if (node->action == PROCESS_OUT) + return spa_node_process_output (n); + else + return SPA_RESULT_ERROR; +} + +static inline void +spa_graph_node_add (SpaGraph *graph, SpaGraphNode *node, SpaGraphNodeFunc schedule, void *user_data) +{ + spa_list_init (&node->ports[SPA_DIRECTION_INPUT]); + spa_list_init (&node->ports[SPA_DIRECTION_OUTPUT]); + node->flags = 0; + node->state = SPA_RESULT_OK; + node->action = PROCESS_OUT; + node->schedule = schedule; + node->user_data = user_data; + spa_list_insert (graph->nodes.prev, &node->link); + node->max_in = node->required_in = node->ready_in = 0; +} + +static inline void +spa_graph_port_check (SpaGraph *graph, + SpaGraphPort *port) +{ + SpaGraphNode *node = port->node; + + if (port->io->status == SPA_RESULT_HAVE_BUFFER) + node->ready_in++; + + if (node->required_in > 0 && node->ready_in == node->required_in) { + node->action = PROCESS_IN; + if (node->ready_link.next == NULL) + spa_list_insert (graph->ready.prev, &node->ready_link); + } else if (node->ready_link.next) { + spa_list_remove (&node->ready_link); + node->ready_link.next = NULL; + } +} + +static inline void +spa_graph_port_add (SpaGraph *graph, + SpaGraphNode *node, + SpaGraphPort *port, + SpaDirection direction, + uint32_t port_id, + uint32_t flags, + SpaPortIO *io) +{ + port->node = node; + port->direction = direction; + port->port_id = port_id; + port->flags = flags; + port->io = io; + port->peer = NULL; + spa_list_insert (node->ports[port->direction].prev, &port->link); + node->max_in++; + if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT) + node->required_in++; + spa_graph_port_check (graph, port); +} + +static inline void +spa_graph_node_remove (SpaGraph *graph, SpaGraphNode *node) +{ + spa_list_remove (&node->link); +} + +static inline void +spa_graph_port_remove (SpaGraph *graph, SpaGraphPort *port) +{ + spa_list_remove (&port->link); +} + +static inline void +spa_graph_port_link (SpaGraph *graph, SpaGraphPort *out, SpaGraphPort *in) +{ + out->peer = in; + in->peer = out; +} + +static inline void +spa_graph_port_unlink (SpaGraph *graph, SpaGraphPort *out, SpaGraphPort *in) +{ + out->peer = NULL; + in->peer = NULL; +} + +static inline void +spa_graph_node_schedule (SpaGraph *graph, SpaGraphNode *node) +{ + SpaGraphPort *p; + + if (node->ready_link.next == NULL) + spa_list_insert (graph->ready.prev, &node->ready_link); + + while (!spa_list_is_empty (&graph->ready)) { + SpaGraphNode *n = spa_list_first (&graph->ready, SpaGraphNode, ready_link); + + spa_list_remove (&n->ready_link); + n->ready_link.next = NULL; + + switch (n->action) { + case PROCESS_IN: + case PROCESS_OUT: + n->state = n->schedule (n); + n->action = PROCESS_CHECK; + spa_list_insert (graph->ready.prev, &n->ready_link); + break; + + case PROCESS_CHECK: + if (n->state == SPA_RESULT_NEED_BUFFER) { + n->ready_in = 0; + spa_list_for_each (p, &n->ports[SPA_DIRECTION_INPUT], link) { + SpaGraphNode *pn = p->peer->node; + if (p->io->status == SPA_RESULT_NEED_BUFFER) { + pn->action = PROCESS_OUT; + spa_list_insert (graph->ready.prev, &pn->ready_link); + } + else if (p->io->status == SPA_RESULT_OK) + n->ready_in++; + } + } + else if (n->state == SPA_RESULT_HAVE_BUFFER) { + spa_list_for_each (p, &n->ports[SPA_DIRECTION_OUTPUT], link) + spa_graph_port_check (graph, p->peer); + } + break; + + default: + break; + } + } +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_GRAPH_H__ */ diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 1b7a8eb95..c1ad02130 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -127,13 +127,12 @@ do_command (SpaLoop *loop, res = SPA_RESULT_NOT_IMPLEMENTED; if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index d8a1ab43d..41235f98b 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -120,13 +120,12 @@ do_start (SpaLoop *loop, res = spa_alsa_start (this, false); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; @@ -146,13 +145,12 @@ do_pause (SpaLoop *loop, res = spa_alsa_pause (this, false); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index dfa1ac397..cba37cfe2 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -532,7 +532,6 @@ static void recycle_buffer (SpaAudioMixer *this, uint32_t id) { SpaAudioMixerPort *port = &this->out_ports[0]; - MixerBuffer *b = &port->buffers[id]; if (!b->outstanding) { diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 4ac1c0d8c..a2f92bdff 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -267,13 +267,12 @@ do_pause (SpaLoop *loop, cmd); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->state[0].main_loop, do_pause_done, seq, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } return res; @@ -313,13 +312,12 @@ do_start (SpaLoop *loop, cmd); if (async) { - SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, - seq, res); spa_loop_invoke (this->state[0].main_loop, do_start_done, seq, - sizeof (ac), - &ac, + sizeof (SpaEventNodeAsyncComplete), + &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete, + seq, res), this); } diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 73609cca1..75f7f3c75 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -567,13 +567,28 @@ spa_volume_node_port_set_io (SpaNode *node, return SPA_RESULT_OK; } +static void +recycle_buffer (SpaVolume *this, uint32_t id) +{ + SpaVolumePort *port = &this->out_ports[0]; + SpaVolumeBuffer *b = &port->buffers[id]; + + if (!b->outstanding) { + spa_log_warn (this->log, "volume %p: buffer %d not outstanding", this, id); + return; + } + + spa_list_insert (port->empty.prev, &b->link); + b->outstanding = false; + spa_log_trace (this->log, "volume %p: recycle buffer %d", this, id); +} + static SpaResult spa_volume_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, uint32_t buffer_id) { SpaVolume *this; - SpaVolumeBuffer *b; SpaVolumePort *port; spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS); @@ -590,12 +605,7 @@ spa_volume_node_port_reuse_buffer (SpaNode *node, if (buffer_id >= port->n_buffers) return SPA_RESULT_INVALID_BUFFER_ID; - b = &port->buffers[buffer_id]; - if (!b->outstanding) - return SPA_RESULT_OK; - - b->outstanding = false; - spa_list_insert (port->empty.prev, &b->link); + recycle_buffer (this, buffer_id); return SPA_RESULT_OK; } @@ -627,7 +637,8 @@ find_free_buffer (SpaVolume *this, SpaVolumePort *port) static inline void release_buffer (SpaVolume *this, SpaBuffer *buffer) { - this->callbacks.reuse_buffer (&this->node, 0, buffer->id, this->user_data); + if (this->callbacks.reuse_buffer) + this->callbacks.reuse_buffer (&this->node, 0, buffer->id, this->user_data); } static void @@ -686,43 +697,28 @@ spa_volume_node_process_input (SpaNode *node) this = SPA_CONTAINER_OF (node, SpaVolume, node); - in_port = &this->in_ports[0]; out_port = &this->out_ports[0]; + output = out_port->io; + spa_return_val_if_fail (output != NULL, SPA_RESULT_ERROR); - if ((input = in_port->io) == NULL) - return SPA_RESULT_ERROR; - if ((output = out_port->io) == NULL) - return SPA_RESULT_ERROR; + if (output->status == SPA_RESULT_HAVE_BUFFER) + return SPA_RESULT_HAVE_BUFFER; - if (!in_port->have_format) { - input->status = SPA_RESULT_NO_FORMAT; - return SPA_RESULT_ERROR; - } - if (input->buffer_id >= in_port->n_buffers) { - input->status = SPA_RESULT_INVALID_BUFFER_ID; - return SPA_RESULT_ERROR; - } + in_port = &this->in_ports[0]; + input = in_port->io; + spa_return_val_if_fail (input != NULL, SPA_RESULT_ERROR); - if (output->buffer_id >= out_port->n_buffers) { - dbuf = find_free_buffer (this, out_port); - } else { - dbuf = out_port->buffers[output->buffer_id].outbuf; - } - if (dbuf == NULL) + if ((dbuf = find_free_buffer (this, out_port)) == NULL) return SPA_RESULT_OUT_OF_BUFFERS; sbuf = in_port->buffers[input->buffer_id].outbuf; - input->buffer_id = SPA_ID_INVALID; - input->status = SPA_RESULT_OK; + input->status = SPA_RESULT_NEED_BUFFER; do_volume (this, sbuf, dbuf); output->buffer_id = dbuf->id; - output->status = SPA_RESULT_OK; - - if (sbuf != dbuf) - release_buffer (this, sbuf); + output->status = SPA_RESULT_HAVE_BUFFER; return SPA_RESULT_HAVE_BUFFER; } @@ -730,6 +726,34 @@ spa_volume_node_process_input (SpaNode *node) static SpaResult spa_volume_node_process_output (SpaNode *node) { + SpaVolume *this; + SpaVolumePort *in_port, *out_port; + SpaPortIO *input, *output; + + spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS); + + this = SPA_CONTAINER_OF (node, SpaVolume, node); + + out_port = &this->out_ports[0]; + output = out_port->io; + spa_return_val_if_fail (output != NULL, SPA_RESULT_ERROR); + + if (output->status == SPA_RESULT_HAVE_BUFFER) + return SPA_RESULT_HAVE_BUFFER; + + /* recycle */ + if (output->buffer_id != SPA_ID_INVALID) { + recycle_buffer (this, output->buffer_id); + output->buffer_id = SPA_ID_INVALID; + } + + in_port = &this->in_ports[0]; + input = in_port->io; + spa_return_val_if_fail (input != NULL, SPA_RESULT_ERROR); + + input->range = output->range; + input->status = SPA_RESULT_NEED_BUFFER; + return SPA_RESULT_NEED_BUFFER; } diff --git a/spa/tests/meson.build b/spa/tests/meson.build index 3d86ea65a..65a6a728d 100644 --- a/spa/tests/meson.build +++ b/spa/tests/meson.build @@ -8,6 +8,11 @@ executable('test-ringbuffer', 'test-ringbuffer.c', dependencies : [dl_lib, pthread_lib], link_with : spalib, install : false) +executable('test-graph', 'test-graph.c', + include_directories : [spa_inc, spa_libinc ], + dependencies : [dl_lib, pthread_lib], + link_with : spalib, + install : false) executable('stress-ringbuffer', 'stress-ringbuffer.c', include_directories : [spa_inc, spa_libinc ], dependencies : [dl_lib, pthread_lib], diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c new file mode 100644 index 000000000..e2d6c0eb5 --- /dev/null +++ b/spa/tests/test-graph.c @@ -0,0 +1,562 @@ +/* Spa + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +typedef struct { + uint32_t node; + uint32_t props; + uint32_t format; + uint32_t props_device; + uint32_t props_freq; + uint32_t props_volume; + uint32_t props_min_latency; + uint32_t props_live; + SpaTypeMeta meta; + SpaTypeData data; + SpaTypeMediaType media_type; + SpaTypeMediaSubtype media_subtype; + SpaTypeFormatAudio format_audio; + SpaTypeAudioFormat audio_format; + SpaTypeEventNode event_node; + SpaTypeCommandNode command_node; +} Type; + +static inline void +init_type (Type *type, SpaTypeMap *map) +{ + type->node = spa_type_map_get_id (map, SPA_TYPE__Node); + type->props = spa_type_map_get_id (map, SPA_TYPE__Props); + type->format = spa_type_map_get_id (map, SPA_TYPE__Format); + type->props_device = spa_type_map_get_id (map, SPA_TYPE_PROPS__device); + type->props_freq = spa_type_map_get_id (map, SPA_TYPE_PROPS__frequency); + type->props_volume = spa_type_map_get_id (map, SPA_TYPE_PROPS__volume); + type->props_min_latency = spa_type_map_get_id (map, SPA_TYPE_PROPS__minLatency); + type->props_live = spa_type_map_get_id (map, SPA_TYPE_PROPS__live); + spa_type_meta_map (map, &type->meta); + spa_type_data_map (map, &type->data); + spa_type_media_type_map (map, &type->media_type); + spa_type_media_subtype_map (map, &type->media_subtype); + spa_type_format_audio_map (map, &type->format_audio); + spa_type_audio_format_map (map, &type->audio_format); + spa_type_event_node_map (map, &type->event_node); + spa_type_command_node_map (map, &type->command_node); +} + +typedef struct { + SpaBuffer buffer; + SpaMeta metas[1]; + SpaMetaHeader header; + SpaData datas[1]; + SpaChunk chunks[1]; +} Buffer; + +typedef struct { + SpaTypeMap *map; + SpaLog *log; + SpaLoop data_loop; + Type type; + + SpaSupport support[4]; + uint32_t n_support; + + SpaGraph graph; + SpaGraphNode source_node; + SpaGraphPort source_out; + SpaGraphPort volume_in; + SpaGraphNode volume_node; + SpaGraphPort volume_out; + SpaGraphPort sink_in; + SpaGraphNode sink_node; + + SpaNode *sink; + SpaPortIO volume_sink_io[1]; + + SpaNode *volume; + SpaBuffer *volume_buffers[1]; + Buffer volume_buffer[1]; + + SpaNode *source; + SpaPortIO source_volume_io[1]; + SpaBuffer *source_buffers[1]; + Buffer source_buffer[1]; + + bool running; + pthread_t thread; + + SpaSource sources[16]; + unsigned int n_sources; + + bool rebuild_fds; + struct pollfd fds[16]; + unsigned int n_fds; +} AppData; + +#define MIN_LATENCY 64 + +#define BUFFER_SIZE MIN_LATENCY + +static void +init_buffer (AppData *data, SpaBuffer **bufs, Buffer *ba, int n_buffers, size_t size) +{ + int i; + + for (i = 0; i < n_buffers; i++) { + Buffer *b = &ba[i]; + bufs[i] = &b->buffer; + + b->buffer.id = i; + b->buffer.n_metas = 1; + b->buffer.metas = b->metas; + b->buffer.n_datas = 1; + b->buffer.datas = b->datas; + + b->header.flags = 0; + b->header.seq = 0; + b->header.pts = 0; + b->header.dts_offset = 0; + b->metas[0].type = data->type.meta.Header; + b->metas[0].data = &b->header; + b->metas[0].size = sizeof (b->header); + + b->datas[0].type = data->type.data.MemPtr; + b->datas[0].flags = 0; + b->datas[0].fd = -1; + b->datas[0].mapoffset = 0; + b->datas[0].maxsize = size; + b->datas[0].data = malloc (size); + b->datas[0].chunk = &b->chunks[0]; + b->datas[0].chunk->offset = 0; + b->datas[0].chunk->size = size; + b->datas[0].chunk->stride = 0; + } +} + +static SpaResult +make_node (AppData *data, SpaNode **node, const char *lib, const char *name) +{ + SpaHandle *handle; + SpaResult res; + void *hnd; + SpaEnumHandleFactoryFunc enum_func; + unsigned int i; + uint32_t state = 0; + + if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) { + printf ("can't load %s: %s\n", lib, dlerror()); + return SPA_RESULT_ERROR; + } + if ((enum_func = dlsym (hnd, "spa_enum_handle_factory")) == NULL) { + printf ("can't find enum function\n"); + return SPA_RESULT_ERROR; + } + + for (i = 0; ;i++) { + const SpaHandleFactory *factory; + void *iface; + + if ((res = enum_func (&factory, state++)) < 0) { + if (res != SPA_RESULT_ENUM_END) + printf ("can't enumerate factories: %d\n", res); + break; + } + if (strcmp (factory->name, name)) + continue; + + handle = calloc (1, factory->size); + if ((res = spa_handle_factory_init (factory, handle, NULL, data->support, data->n_support)) < 0) { + printf ("can't make factory instance: %d\n", res); + return res; + } + if ((res = spa_handle_get_interface (handle, data->type.node, &iface)) < 0) { + printf ("can't get interface %d\n", res); + return res; + } + *node = iface; + return SPA_RESULT_OK; + } + return SPA_RESULT_ERROR; +} + +static void +on_sink_event (SpaNode *node, SpaEvent *event, void *user_data) +{ + printf ("got event %d\n", SPA_EVENT_TYPE (event)); +} + +static void +on_sink_need_input (SpaNode *node, void *user_data) +{ + AppData *data = user_data; + + data->sink_node.action = PROCESS_CHECK; + data->sink_node.state = SPA_RESULT_NEED_BUFFER; + + spa_graph_node_schedule (&data->graph, &data->sink_node); +} + +static void +on_sink_reuse_buffer (SpaNode *node, uint32_t port_id, uint32_t buffer_id, void *user_data) +{ + AppData *data = user_data; + + data->volume_sink_io[0].buffer_id = buffer_id; +} + +static const SpaNodeCallbacks sink_callbacks = +{ + &on_sink_event, + &on_sink_need_input, + NULL, + &on_sink_reuse_buffer +}; + +static SpaResult +do_add_source (SpaLoop *loop, + SpaSource *source) +{ + AppData *data = SPA_CONTAINER_OF (loop, AppData, data_loop); + + data->sources[data->n_sources] = *source; + data->n_sources++; + data->rebuild_fds = true; + + return SPA_RESULT_OK; +} + +static SpaResult +do_update_source (SpaSource *source) +{ + return SPA_RESULT_OK; +} + +static void +do_remove_source (SpaSource *source) +{ +} + +static SpaResult +do_invoke (SpaLoop *loop, + SpaInvokeFunc func, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + return func (loop, false, seq, size, data, user_data); +} + +static SpaResult +make_nodes (AppData *data, const char *device) +{ + SpaResult res; + SpaProps *props; + SpaPODBuilder b = { 0 }; + SpaPODFrame f[2]; + uint8_t buffer[128]; + + if ((res = make_node (data, &data->sink, + "build/spa/plugins/alsa/libspa-alsa.so", + "alsa-sink")) < 0) { + printf ("can't create alsa-sink: %d\n", res); + return res; + } + spa_node_set_callbacks (data->sink, &sink_callbacks, sizeof (sink_callbacks), data); + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_props (&b, &f[0], data->type.props, + SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, device ? device : "hw:0"), + SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, MIN_LATENCY)); + props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps); + + if ((res = spa_node_set_props (data->sink, props)) < 0) + printf ("got set_props error %d\n", res); + + if ((res = make_node (data, &data->volume, + "build/spa/plugins/volume/libspa-volume.so", + "volume")) < 0) { + printf ("can't create volume: %d\n", res); + return res; + } + + if ((res = make_node (data, &data->source, + "build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so", + "audiotestsrc")) < 0) { + printf ("can't create audiotestsrc: %d\n", res); + return res; + } + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_props (&b, &f[0], data->type.props, + SPA_POD_PROP (&f[1], data->type.props_freq, 0, SPA_POD_TYPE_DOUBLE, 1, 600.0), + SPA_POD_PROP (&f[1], data->type.props_volume, 0, SPA_POD_TYPE_DOUBLE, 1, 0.5), + SPA_POD_PROP (&f[1], data->type.props_live, 0, SPA_POD_TYPE_BOOL, 1, false)); + props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps); + + if ((res = spa_node_set_props (data->source, props)) < 0) + printf ("got set_props error %d\n", res); + + spa_node_port_set_io (data->source, SPA_DIRECTION_OUTPUT, 0, &data->source_volume_io[0]); + spa_node_port_set_io (data->volume, SPA_DIRECTION_INPUT, 0, &data->source_volume_io[0]); + spa_node_port_set_io (data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]); + spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]); + + spa_graph_node_add (&data->graph, &data->source_node, spa_graph_node_schedule_default, data->source); + spa_graph_port_add (&data->graph, &data->source_node, + &data->source_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->source_volume_io[0]); + + spa_graph_node_add (&data->graph, &data->volume_node, spa_graph_node_schedule_default, data->volume); + spa_graph_port_add (&data->graph, &data->volume_node, + &data->volume_in, SPA_DIRECTION_INPUT, 0, + 0, &data->source_volume_io[0]); + + spa_graph_port_link (&data->graph, &data->source_out, &data->volume_in); + + spa_graph_port_add (&data->graph, &data->volume_node, + &data->volume_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->volume_sink_io[0]); + + spa_graph_node_add (&data->graph, &data->sink_node, spa_graph_node_schedule_default, data->sink); + spa_graph_port_add (&data->graph, &data->sink_node, + &data->sink_in, SPA_DIRECTION_INPUT, 0, + 0, &data->volume_sink_io[0]); + + spa_graph_port_link (&data->graph, &data->volume_out, &data->sink_in); + + return res; +} + +static SpaResult +negotiate_formats (AppData *data) +{ + SpaResult res; + SpaFormat *format, *filter; + uint32_t state = 0; + SpaPODBuilder b = { 0 }; + SpaPODFrame f[2]; + uint8_t buffer[256]; + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_format (&b, &f[0], data->type.format, + data->type.media_type.audio, data->type.media_subtype.raw, + SPA_POD_PROP (&f[1], data->type.format_audio.format, 0, + SPA_POD_TYPE_ID, 1, + data->type.audio_format.S16), + SPA_POD_PROP (&f[1], data->type.format_audio.layout, 0, + SPA_POD_TYPE_INT, 1, + SPA_AUDIO_LAYOUT_INTERLEAVED), + SPA_POD_PROP (&f[1], data->type.format_audio.rate, 0, + SPA_POD_TYPE_INT, 1, + 44100), + SPA_POD_PROP (&f[1], data->type.format_audio.channels, 0, + SPA_POD_TYPE_INT, 1, + 2)); + filter = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaFormat); + + if ((res = spa_node_port_enum_formats (data->sink, SPA_DIRECTION_INPUT, 0, &format, filter, state)) < 0) + return res; + + + if ((res = spa_node_port_set_format (data->sink, SPA_DIRECTION_INPUT, 0, 0, format)) < 0) + return res; + + if ((res = spa_node_port_set_format (data->volume, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) + return res; + + init_buffer (data, data->volume_buffers, data->volume_buffer, 1, BUFFER_SIZE); + if ((res = spa_node_port_use_buffers (data->sink, SPA_DIRECTION_INPUT, 0, data->volume_buffers, 1)) < 0) + return res; + if ((res = spa_node_port_use_buffers (data->volume, SPA_DIRECTION_OUTPUT, 0, data->volume_buffers, 1)) < 0) + return res; + + if ((res = spa_node_port_set_format (data->volume, SPA_DIRECTION_INPUT, 0, 0, format)) < 0) + return res; + if ((res = spa_node_port_set_format (data->source, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) + return res; + + init_buffer (data, data->source_buffers, data->source_buffer, 1, BUFFER_SIZE); + if ((res = spa_node_port_use_buffers (data->volume, SPA_DIRECTION_INPUT, 0, data->source_buffers, 1)) < 0) + return res; + if ((res = spa_node_port_use_buffers (data->source, SPA_DIRECTION_OUTPUT, 0, data->source_buffers, 1)) < 0) + return res; + + return SPA_RESULT_OK; +} + +static void * +loop (void *user_data) +{ + AppData *data = user_data; + + printf ("enter thread %d\n", data->n_sources); + while (data->running) { + int i, r; + + /* rebuild */ + if (data->rebuild_fds) { + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + data->fds[i].fd = p->fd; + data->fds[i].events = p->mask; + } + data->n_fds = data->n_sources; + data->rebuild_fds = false; + } + + r = poll ((struct pollfd *) data->fds, data->n_fds, -1); + if (r < 0) { + if (errno == EINTR) + continue; + break; + } + if (r == 0) { + fprintf (stderr, "select timeout"); + break; + } + + /* after */ + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + p->rmask = 0; + if (data->fds[i].revents & POLLIN) + p->rmask |= SPA_IO_IN; + if (data->fds[i].revents & POLLOUT) + p->rmask |= SPA_IO_OUT; + if (data->fds[i].revents & POLLHUP) + p->rmask |= SPA_IO_HUP; + if (data->fds[i].revents & POLLERR) + p->rmask |= SPA_IO_ERR; + } + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + if (p->rmask) + p->func (p); + } + } + printf ("leave thread\n"); + + return NULL; +} + +static void +run_async_sink (AppData *data) +{ + SpaResult res; + int err; + + { + SpaCommand cmd = SPA_COMMAND_INIT (data->type.command_node.Start); + if ((res = spa_node_send_command (data->source, &cmd)) < 0) + printf ("got source error %d\n", res); + if ((res = spa_node_send_command (data->volume, &cmd)) < 0) + printf ("got volume error %d\n", res); + if ((res = spa_node_send_command (data->sink, &cmd)) < 0) + printf ("got sink error %d\n", res); + } + + data->running = true; + if ((err = pthread_create (&data->thread, NULL, loop, data)) != 0) { + printf ("can't create thread: %d %s", err, strerror (err)); + data->running = false; + } + + printf ("sleeping for 1000 seconds\n"); + sleep (1000); + + if (data->running) { + data->running = false; + pthread_join (data->thread, NULL); + } + + { + SpaCommand cmd = SPA_COMMAND_INIT (data->type.command_node.Pause); + if ((res = spa_node_send_command (data->sink, &cmd)) < 0) + printf ("got error %d\n", res); + if ((res = spa_node_send_command (data->volume, &cmd)) < 0) + printf ("got volume error %d\n", res); + if ((res = spa_node_send_command (data->source, &cmd)) < 0) + printf ("got source error %d\n", res); + } +} + +int +main (int argc, char *argv[]) +{ + AppData data = { NULL }; + SpaResult res; + const char *str; + + + spa_graph_init (&data.graph); + + data.map = spa_type_map_get_default(); + data.log = spa_log_get_default(); + data.data_loop.size = sizeof (SpaLoop); + data.data_loop.add_source = do_add_source; + data.data_loop.update_source = do_update_source; + data.data_loop.remove_source = do_remove_source; + data.data_loop.invoke = do_invoke; + + if ((str = getenv ("PINOS_DEBUG"))) + data.log->level = atoi (str); + + data.support[0].type = SPA_TYPE__TypeMap; + data.support[0].data = data.map; + data.support[1].type = SPA_TYPE__Log; + data.support[1].data = data.log; + data.support[2].type = SPA_TYPE_LOOP__DataLoop; + data.support[2].data = &data.data_loop; + data.support[3].type = SPA_TYPE_LOOP__MainLoop; + data.support[3].data = &data.data_loop; + data.n_support = 4; + + init_type (&data.type, data.map); + + if ((res = make_nodes (&data, argc > 1 ? argv[1] : NULL)) < 0) { + printf ("can't make nodes: %d\n", res); + return -1; + } + if ((res = negotiate_formats (&data)) < 0) { + printf ("can't negotiate nodes: %d\n", res); + return -1; + } + + run_async_sink (&data); +} diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index c4714bf5d..99d84d655 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,8 @@ #include #include +#undef USE_GRAPH + typedef struct { uint32_t node; uint32_t props; @@ -94,6 +97,17 @@ typedef struct { SpaSupport support[4]; uint32_t n_support; + SpaGraph graph; + SpaGraphNode source1_node; + SpaGraphPort source1_out; + SpaGraphNode source2_node; + SpaGraphPort source2_out; + SpaGraphPort mix_in[2]; + SpaGraphNode mix_node; + SpaGraphPort mix_out; + SpaGraphPort sink_in; + SpaGraphNode sink_node; + SpaNode *sink; SpaPortIO mix_sink_io[1]; @@ -220,6 +234,12 @@ static void on_sink_need_input (SpaNode *node, void *user_data) { AppData *data = user_data; +#ifdef USE_GRAPH + data->sink_node.action = PROCESS_CHECK; + data->sink_node.state = SPA_RESULT_NEED_BUFFER; + + spa_graph_node_schedule (&data->graph, &data->sink_node); +#else SpaResult res; res = spa_node_process_output (data->mix); @@ -249,6 +269,7 @@ push: } else { printf ("got process_output error from mixer %d\n", res); } +#endif } static void @@ -369,6 +390,55 @@ make_nodes (AppData *data, const char *device) if ((res = spa_node_set_props (data->source2, props)) < 0) printf ("got set_props error %d\n", res); + data->mix_ports[0] = 0; + if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 0)) < 0) + return res; + + data->mix_ports[1] = 1; + if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 1)) < 0) + return res; + + spa_node_port_set_io (data->source1, SPA_DIRECTION_OUTPUT, 0, &data->source1_mix_io[0]); + spa_node_port_set_io (data->source2, SPA_DIRECTION_OUTPUT, 0, &data->source2_mix_io[0]); + spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 0, &data->source1_mix_io[0]); + spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 1, &data->source2_mix_io[0]); + spa_node_port_set_io (data->mix, SPA_DIRECTION_OUTPUT, 0, &data->mix_sink_io[0]); + spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]); + +#ifdef USE_GRAPH + spa_graph_node_add (&data->graph, &data->source1_node, spa_graph_node_schedule_default, data->source1); + spa_graph_port_add (&data->graph, &data->source1_node, + &data->source1_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->source1_mix_io[0]); + + spa_graph_node_add (&data->graph, &data->source2_node, spa_graph_node_schedule_default, data->source2); + spa_graph_port_add (&data->graph, &data->source2_node, + &data->source2_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->source2_mix_io[0]); + + spa_graph_node_add (&data->graph, &data->mix_node, spa_graph_node_schedule_default, data->mix); + spa_graph_port_add (&data->graph, &data->mix_node, + &data->mix_in[0], SPA_DIRECTION_INPUT, data->mix_ports[0], + 0, &data->source1_mix_io[0]); + spa_graph_port_add (&data->graph, &data->mix_node, + &data->mix_in[1], SPA_DIRECTION_INPUT, data->mix_ports[1], + 0, &data->source2_mix_io[0]); + + spa_graph_port_link (&data->graph, &data->source1_out, &data->mix_in[0]); + spa_graph_port_link (&data->graph, &data->source2_out, &data->mix_in[1]); + + spa_graph_port_add (&data->graph, &data->mix_node, + &data->mix_out, SPA_DIRECTION_OUTPUT, 0, + 0, &data->mix_sink_io[0]); + + spa_graph_node_add (&data->graph, &data->sink_node, spa_graph_node_schedule_default, data->sink); + spa_graph_port_add (&data->graph, &data->sink_node, + &data->sink_in, SPA_DIRECTION_INPUT, 0, + 0, &data->mix_sink_io[0]); + + spa_graph_port_link (&data->graph, &data->mix_out, &data->sink_in); +#endif + return res; } @@ -402,13 +472,9 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_enum_formats (data->sink, SPA_DIRECTION_INPUT, 0, &format, filter, state)) < 0) return res; - if ((res = spa_node_port_set_format (data->sink, SPA_DIRECTION_INPUT, 0, 0, format)) < 0) return res; - spa_node_port_set_io (data->mix, SPA_DIRECTION_OUTPUT, 0, &data->mix_sink_io[0]); - spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]); - if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0) return res; @@ -418,13 +484,6 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_OUTPUT, 0, data->mix_buffers, 1)) < 0) return res; - data->mix_ports[0] = 0; - if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 0)) < 0) - return res; - - spa_node_port_set_io (data->source1, SPA_DIRECTION_OUTPUT, 0, &data->source1_mix_io[0]); - spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 0, &data->source1_mix_io[0]); - if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[0], 0, format)) < 0) return res; @@ -437,13 +496,6 @@ negotiate_formats (AppData *data) if ((res = spa_node_port_use_buffers (data->source1, SPA_DIRECTION_OUTPUT, 0, data->source1_buffers, 2)) < 0) return res; - data->mix_ports[1] = 1; - if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 1)) < 0) - return res; - - spa_node_port_set_io (data->source2, SPA_DIRECTION_OUTPUT, 0, &data->source2_mix_io[0]); - spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 1, &data->source2_mix_io[0]); - if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[1], 0, format)) < 0) return res; @@ -538,8 +590,8 @@ run_async_sink (AppData *data) data->running = false; } - printf ("sleeping for 1000 seconds\n"); - sleep (1000); + printf ("sleeping for 30 seconds\n"); + sleep (30); if (data->running) { data->running = false; @@ -574,6 +626,8 @@ main (int argc, char *argv[]) data.data_loop.remove_source = do_remove_source; data.data_loop.invoke = do_invoke; + spa_graph_init (&data.graph); + if ((str = getenv ("PINOS_DEBUG"))) data.log->level = atoi (str);