graph: add graph datastructure and scheduler

Improve event and command init so that it can be used more easily
as compound literals.
Improve volume
Make it possible to use graph scheduler in test-mixer
This commit is contained in:
Wim Taymans 2017-05-18 17:16:48 +02:00
parent 53dd63eb3a
commit 6691eb7845
18 changed files with 970 additions and 108 deletions

View file

@ -95,7 +95,8 @@ typedef struct {
} SpaCommandNodeClockUpdate;
#define SPA_COMMAND_NODE_CLOCK_UPDATE_INIT(type,change_mask,rate,ticks,monotonic_time,offset,scale,state,flags,latency) \
SPA_COMMAND_INIT_COMPLEX (sizeof (SpaCommandNodeClockUpdateBody), type, \
SPA_COMMAND_INIT_COMPLEX (SpaCommandNodeClockUpdate, \
sizeof (SpaCommandNodeClockUpdateBody), type, \
SPA_POD_INT_INIT (change_mask), \
SPA_POD_INT_INIT (rate), \
SPA_POD_LONG_INIT (ticks), \

View file

@ -43,11 +43,11 @@ struct _SpaCommand {
#define SPA_COMMAND_TYPE(cmd) ((cmd)->body.body.type)
#define SPA_COMMAND_INIT(type) \
#define SPA_COMMAND_INIT(type) (SpaCommand) \
{ { sizeof (SpaCommandBody), SPA_POD_TYPE_OBJECT }, \
{ { 0, type } } } \
#define SPA_COMMAND_INIT_COMPLEX(size,type,...) \
#define SPA_COMMAND_INIT_COMPLEX(t,size,type,...) (t) \
{ { size, SPA_POD_TYPE_OBJECT }, \
{ { 0, type }, __VA_ARGS__ } } \

View file

@ -70,7 +70,8 @@ typedef struct {
} SpaEventNodeAsyncComplete;
#define SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(type,seq,res) \
SPA_EVENT_INIT_COMPLEX (sizeof (SpaEventNodeAsyncCompleteBody), type, \
SPA_EVENT_INIT_COMPLEX (SpaEventNodeAsyncComplete, \
sizeof (SpaEventNodeAsyncCompleteBody), type, \
SPA_POD_INT_INIT (seq), \
SPA_POD_INT_INIT (res))
@ -90,7 +91,8 @@ typedef struct {
} SpaEventNodeRequestClockUpdate;
#define SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(type,update_mask,timestamp,offset) \
SPA_EVENT_INIT_COMPLEX (sizeof (SpaEventNodeRequestClockUpdateBody), type, \
SPA_EVENT_INIT_COMPLEX (SpaEventNodeRequestClockUpdate, \
sizeof (SpaEventNodeRequestClockUpdateBody), type, \
SPA_POD_INT_INIT (update_mask), \
SPA_POD_LONG_INIT (timestamp), \
SPA_POD_LONG_INIT (offset))

View file

@ -43,11 +43,11 @@ struct _SpaEvent {
#define SPA_EVENT_TYPE(ev) ((ev)->body.body.type)
#define SPA_EVENT_INIT(type) \
#define SPA_EVENT_INIT(type) (SpaEvent) \
{ { sizeof (SpaEventBody), SPA_POD_TYPE_OBJECT }, \
{ { 0, type } } } \
#define SPA_EVENT_INIT_COMPLEX(size,type,...) \
#define SPA_EVENT_INIT_COMPLEX(t,size,type,...) (t) \
{ { size, SPA_POD_TYPE_OBJECT }, \
{ { 0, type }, __VA_ARGS__ } } \

221
spa/include/spa/graph.h Normal file
View file

@ -0,0 +1,221 @@
/* Simple Plugin API
* Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __SPA_GRAPH_H__
#define __SPA_GRAPH_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <spa/defs.h>
#include <spa/list.h>
typedef struct SpaGraph SpaGraph;
typedef struct SpaGraphNode SpaGraphNode;
typedef struct SpaGraphPort SpaGraphPort;
struct SpaGraph {
SpaList nodes;
SpaList ready;
};
#define PROCESS_CHECK 0
#define PROCESS_IN 1
#define PROCESS_OUT 2
typedef SpaResult (*SpaGraphNodeFunc) (SpaGraphNode *node);
struct SpaGraphNode {
SpaList link;
SpaList ready_link;
SpaList ports[2];
#define SPA_GRAPH_NODE_FLAG_ASYNC (1 << 0)
uint32_t flags;
SpaResult state;
uint32_t action;
SpaGraphNodeFunc schedule;
void *user_data;
uint32_t max_in;
uint32_t required_in;
uint32_t ready_in;
};
struct SpaGraphPort {
SpaList link;
SpaGraphNode *node;
SpaDirection direction;
uint32_t port_id;
uint32_t flags;
SpaPortIO *io;
SpaGraphPort *peer;
};
static inline void
spa_graph_init (SpaGraph *graph)
{
spa_list_init (&graph->nodes);
spa_list_init (&graph->ready);
}
static inline SpaResult
spa_graph_node_schedule_default (SpaGraphNode *node)
{
SpaNode *n = node->user_data;
if (node->action == PROCESS_IN)
return spa_node_process_input (n);
else if (node->action == PROCESS_OUT)
return spa_node_process_output (n);
else
return SPA_RESULT_ERROR;
}
static inline void
spa_graph_node_add (SpaGraph *graph, SpaGraphNode *node, SpaGraphNodeFunc schedule, void *user_data)
{
spa_list_init (&node->ports[SPA_DIRECTION_INPUT]);
spa_list_init (&node->ports[SPA_DIRECTION_OUTPUT]);
node->flags = 0;
node->state = SPA_RESULT_OK;
node->action = PROCESS_OUT;
node->schedule = schedule;
node->user_data = user_data;
spa_list_insert (graph->nodes.prev, &node->link);
node->max_in = node->required_in = node->ready_in = 0;
}
static inline void
spa_graph_port_check (SpaGraph *graph,
SpaGraphPort *port)
{
SpaGraphNode *node = port->node;
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready_in++;
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = PROCESS_IN;
if (node->ready_link.next == NULL)
spa_list_insert (graph->ready.prev, &node->ready_link);
} else if (node->ready_link.next) {
spa_list_remove (&node->ready_link);
node->ready_link.next = NULL;
}
}
static inline void
spa_graph_port_add (SpaGraph *graph,
SpaGraphNode *node,
SpaGraphPort *port,
SpaDirection direction,
uint32_t port_id,
uint32_t flags,
SpaPortIO *io)
{
port->node = node;
port->direction = direction;
port->port_id = port_id;
port->flags = flags;
port->io = io;
port->peer = NULL;
spa_list_insert (node->ports[port->direction].prev, &port->link);
node->max_in++;
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT)
node->required_in++;
spa_graph_port_check (graph, port);
}
static inline void
spa_graph_node_remove (SpaGraph *graph, SpaGraphNode *node)
{
spa_list_remove (&node->link);
}
static inline void
spa_graph_port_remove (SpaGraph *graph, SpaGraphPort *port)
{
spa_list_remove (&port->link);
}
static inline void
spa_graph_port_link (SpaGraph *graph, SpaGraphPort *out, SpaGraphPort *in)
{
out->peer = in;
in->peer = out;
}
static inline void
spa_graph_port_unlink (SpaGraph *graph, SpaGraphPort *out, SpaGraphPort *in)
{
out->peer = NULL;
in->peer = NULL;
}
static inline void
spa_graph_node_schedule (SpaGraph *graph, SpaGraphNode *node)
{
SpaGraphPort *p;
if (node->ready_link.next == NULL)
spa_list_insert (graph->ready.prev, &node->ready_link);
while (!spa_list_is_empty (&graph->ready)) {
SpaGraphNode *n = spa_list_first (&graph->ready, SpaGraphNode, ready_link);
spa_list_remove (&n->ready_link);
n->ready_link.next = NULL;
switch (n->action) {
case PROCESS_IN:
case PROCESS_OUT:
n->state = n->schedule (n);
n->action = PROCESS_CHECK;
spa_list_insert (graph->ready.prev, &n->ready_link);
break;
case PROCESS_CHECK:
if (n->state == SPA_RESULT_NEED_BUFFER) {
n->ready_in = 0;
spa_list_for_each (p, &n->ports[SPA_DIRECTION_INPUT], link) {
SpaGraphNode *pn = p->peer->node;
if (p->io->status == SPA_RESULT_NEED_BUFFER) {
pn->action = PROCESS_OUT;
spa_list_insert (graph->ready.prev, &pn->ready_link);
}
else if (p->io->status == SPA_RESULT_OK)
n->ready_in++;
}
}
else if (n->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each (p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check (graph, p->peer);
}
break;
default:
break;
}
}
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* __SPA_GRAPH_H__ */

View file

@ -127,13 +127,12 @@ do_command (SpaLoop *loop,
res = SPA_RESULT_NOT_IMPLEMENTED;
if (async) {
SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res);
spa_loop_invoke (this->main_loop,
do_send_event,
SPA_ID_INVALID,
sizeof (ac),
&ac,
sizeof (SpaEventNodeAsyncComplete),
&SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res),
this);
}
return res;

View file

@ -120,13 +120,12 @@ do_start (SpaLoop *loop,
res = spa_alsa_start (this, false);
if (async) {
SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res);
spa_loop_invoke (this->main_loop,
do_send_event,
SPA_ID_INVALID,
sizeof (ac),
&ac,
sizeof (SpaEventNodeAsyncComplete),
&SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res),
this);
}
return res;
@ -146,13 +145,12 @@ do_pause (SpaLoop *loop,
res = spa_alsa_pause (this, false);
if (async) {
SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res);
spa_loop_invoke (this->main_loop,
do_send_event,
SPA_ID_INVALID,
sizeof (ac),
&ac,
sizeof (SpaEventNodeAsyncComplete),
&SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res),
this);
}
return res;

View file

@ -532,7 +532,6 @@ static void
recycle_buffer (SpaAudioMixer *this, uint32_t id)
{
SpaAudioMixerPort *port = &this->out_ports[0];
MixerBuffer *b = &port->buffers[id];
if (!b->outstanding) {

View file

@ -267,13 +267,12 @@ do_pause (SpaLoop *loop,
cmd);
if (async) {
SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res);
spa_loop_invoke (this->state[0].main_loop,
do_pause_done,
seq,
sizeof (ac),
&ac,
sizeof (SpaEventNodeAsyncComplete),
&SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res),
this);
}
return res;
@ -313,13 +312,12 @@ do_start (SpaLoop *loop,
cmd);
if (async) {
SpaEventNodeAsyncComplete ac = SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res);
spa_loop_invoke (this->state[0].main_loop,
do_start_done,
seq,
sizeof (ac),
&ac,
sizeof (SpaEventNodeAsyncComplete),
&SPA_EVENT_NODE_ASYNC_COMPLETE_INIT (this->type.event_node.AsyncComplete,
seq, res),
this);
}

View file

@ -567,13 +567,28 @@ spa_volume_node_port_set_io (SpaNode *node,
return SPA_RESULT_OK;
}
static void
recycle_buffer (SpaVolume *this, uint32_t id)
{
SpaVolumePort *port = &this->out_ports[0];
SpaVolumeBuffer *b = &port->buffers[id];
if (!b->outstanding) {
spa_log_warn (this->log, "volume %p: buffer %d not outstanding", this, id);
return;
}
spa_list_insert (port->empty.prev, &b->link);
b->outstanding = false;
spa_log_trace (this->log, "volume %p: recycle buffer %d", this, id);
}
static SpaResult
spa_volume_node_port_reuse_buffer (SpaNode *node,
uint32_t port_id,
uint32_t buffer_id)
{
SpaVolume *this;
SpaVolumeBuffer *b;
SpaVolumePort *port;
spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS);
@ -590,12 +605,7 @@ spa_volume_node_port_reuse_buffer (SpaNode *node,
if (buffer_id >= port->n_buffers)
return SPA_RESULT_INVALID_BUFFER_ID;
b = &port->buffers[buffer_id];
if (!b->outstanding)
return SPA_RESULT_OK;
b->outstanding = false;
spa_list_insert (port->empty.prev, &b->link);
recycle_buffer (this, buffer_id);
return SPA_RESULT_OK;
}
@ -627,7 +637,8 @@ find_free_buffer (SpaVolume *this, SpaVolumePort *port)
static inline void
release_buffer (SpaVolume *this, SpaBuffer *buffer)
{
this->callbacks.reuse_buffer (&this->node, 0, buffer->id, this->user_data);
if (this->callbacks.reuse_buffer)
this->callbacks.reuse_buffer (&this->node, 0, buffer->id, this->user_data);
}
static void
@ -686,43 +697,28 @@ spa_volume_node_process_input (SpaNode *node)
this = SPA_CONTAINER_OF (node, SpaVolume, node);
in_port = &this->in_ports[0];
out_port = &this->out_ports[0];
output = out_port->io;
spa_return_val_if_fail (output != NULL, SPA_RESULT_ERROR);
if ((input = in_port->io) == NULL)
return SPA_RESULT_ERROR;
if ((output = out_port->io) == NULL)
return SPA_RESULT_ERROR;
if (output->status == SPA_RESULT_HAVE_BUFFER)
return SPA_RESULT_HAVE_BUFFER;
if (!in_port->have_format) {
input->status = SPA_RESULT_NO_FORMAT;
return SPA_RESULT_ERROR;
}
if (input->buffer_id >= in_port->n_buffers) {
input->status = SPA_RESULT_INVALID_BUFFER_ID;
return SPA_RESULT_ERROR;
}
in_port = &this->in_ports[0];
input = in_port->io;
spa_return_val_if_fail (input != NULL, SPA_RESULT_ERROR);
if (output->buffer_id >= out_port->n_buffers) {
dbuf = find_free_buffer (this, out_port);
} else {
dbuf = out_port->buffers[output->buffer_id].outbuf;
}
if (dbuf == NULL)
if ((dbuf = find_free_buffer (this, out_port)) == NULL)
return SPA_RESULT_OUT_OF_BUFFERS;
sbuf = in_port->buffers[input->buffer_id].outbuf;
input->buffer_id = SPA_ID_INVALID;
input->status = SPA_RESULT_OK;
input->status = SPA_RESULT_NEED_BUFFER;
do_volume (this, sbuf, dbuf);
output->buffer_id = dbuf->id;
output->status = SPA_RESULT_OK;
if (sbuf != dbuf)
release_buffer (this, sbuf);
output->status = SPA_RESULT_HAVE_BUFFER;
return SPA_RESULT_HAVE_BUFFER;
}
@ -730,6 +726,34 @@ spa_volume_node_process_input (SpaNode *node)
static SpaResult
spa_volume_node_process_output (SpaNode *node)
{
SpaVolume *this;
SpaVolumePort *in_port, *out_port;
SpaPortIO *input, *output;
spa_return_val_if_fail (node != NULL, SPA_RESULT_INVALID_ARGUMENTS);
this = SPA_CONTAINER_OF (node, SpaVolume, node);
out_port = &this->out_ports[0];
output = out_port->io;
spa_return_val_if_fail (output != NULL, SPA_RESULT_ERROR);
if (output->status == SPA_RESULT_HAVE_BUFFER)
return SPA_RESULT_HAVE_BUFFER;
/* recycle */
if (output->buffer_id != SPA_ID_INVALID) {
recycle_buffer (this, output->buffer_id);
output->buffer_id = SPA_ID_INVALID;
}
in_port = &this->in_ports[0];
input = in_port->io;
spa_return_val_if_fail (input != NULL, SPA_RESULT_ERROR);
input->range = output->range;
input->status = SPA_RESULT_NEED_BUFFER;
return SPA_RESULT_NEED_BUFFER;
}

View file

@ -8,6 +8,11 @@ executable('test-ringbuffer', 'test-ringbuffer.c',
dependencies : [dl_lib, pthread_lib],
link_with : spalib,
install : false)
executable('test-graph', 'test-graph.c',
include_directories : [spa_inc, spa_libinc ],
dependencies : [dl_lib, pthread_lib],
link_with : spalib,
install : false)
executable('stress-ringbuffer', 'stress-ringbuffer.c',
include_directories : [spa_inc, spa_libinc ],
dependencies : [dl_lib, pthread_lib],

562
spa/tests/test-graph.c Normal file
View file

@ -0,0 +1,562 @@
/* Spa
* Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <dlfcn.h>
#include <errno.h>
#include <pthread.h>
#include <poll.h>
#include <spa/node.h>
#include <spa/log.h>
#include <spa/loop.h>
#include <spa/type-map.h>
#include <spa/audio/format-utils.h>
#include <spa/format-utils.h>
#include <spa/format-builder.h>
#include <spa/graph.h>
#include <lib/mapper.h>
#include <lib/debug.h>
#include <lib/props.h>
typedef struct {
uint32_t node;
uint32_t props;
uint32_t format;
uint32_t props_device;
uint32_t props_freq;
uint32_t props_volume;
uint32_t props_min_latency;
uint32_t props_live;
SpaTypeMeta meta;
SpaTypeData data;
SpaTypeMediaType media_type;
SpaTypeMediaSubtype media_subtype;
SpaTypeFormatAudio format_audio;
SpaTypeAudioFormat audio_format;
SpaTypeEventNode event_node;
SpaTypeCommandNode command_node;
} Type;
static inline void
init_type (Type *type, SpaTypeMap *map)
{
type->node = spa_type_map_get_id (map, SPA_TYPE__Node);
type->props = spa_type_map_get_id (map, SPA_TYPE__Props);
type->format = spa_type_map_get_id (map, SPA_TYPE__Format);
type->props_device = spa_type_map_get_id (map, SPA_TYPE_PROPS__device);
type->props_freq = spa_type_map_get_id (map, SPA_TYPE_PROPS__frequency);
type->props_volume = spa_type_map_get_id (map, SPA_TYPE_PROPS__volume);
type->props_min_latency = spa_type_map_get_id (map, SPA_TYPE_PROPS__minLatency);
type->props_live = spa_type_map_get_id (map, SPA_TYPE_PROPS__live);
spa_type_meta_map (map, &type->meta);
spa_type_data_map (map, &type->data);
spa_type_media_type_map (map, &type->media_type);
spa_type_media_subtype_map (map, &type->media_subtype);
spa_type_format_audio_map (map, &type->format_audio);
spa_type_audio_format_map (map, &type->audio_format);
spa_type_event_node_map (map, &type->event_node);
spa_type_command_node_map (map, &type->command_node);
}
typedef struct {
SpaBuffer buffer;
SpaMeta metas[1];
SpaMetaHeader header;
SpaData datas[1];
SpaChunk chunks[1];
} Buffer;
typedef struct {
SpaTypeMap *map;
SpaLog *log;
SpaLoop data_loop;
Type type;
SpaSupport support[4];
uint32_t n_support;
SpaGraph graph;
SpaGraphNode source_node;
SpaGraphPort source_out;
SpaGraphPort volume_in;
SpaGraphNode volume_node;
SpaGraphPort volume_out;
SpaGraphPort sink_in;
SpaGraphNode sink_node;
SpaNode *sink;
SpaPortIO volume_sink_io[1];
SpaNode *volume;
SpaBuffer *volume_buffers[1];
Buffer volume_buffer[1];
SpaNode *source;
SpaPortIO source_volume_io[1];
SpaBuffer *source_buffers[1];
Buffer source_buffer[1];
bool running;
pthread_t thread;
SpaSource sources[16];
unsigned int n_sources;
bool rebuild_fds;
struct pollfd fds[16];
unsigned int n_fds;
} AppData;
#define MIN_LATENCY 64
#define BUFFER_SIZE MIN_LATENCY
static void
init_buffer (AppData *data, SpaBuffer **bufs, Buffer *ba, int n_buffers, size_t size)
{
int i;
for (i = 0; i < n_buffers; i++) {
Buffer *b = &ba[i];
bufs[i] = &b->buffer;
b->buffer.id = i;
b->buffer.n_metas = 1;
b->buffer.metas = b->metas;
b->buffer.n_datas = 1;
b->buffer.datas = b->datas;
b->header.flags = 0;
b->header.seq = 0;
b->header.pts = 0;
b->header.dts_offset = 0;
b->metas[0].type = data->type.meta.Header;
b->metas[0].data = &b->header;
b->metas[0].size = sizeof (b->header);
b->datas[0].type = data->type.data.MemPtr;
b->datas[0].flags = 0;
b->datas[0].fd = -1;
b->datas[0].mapoffset = 0;
b->datas[0].maxsize = size;
b->datas[0].data = malloc (size);
b->datas[0].chunk = &b->chunks[0];
b->datas[0].chunk->offset = 0;
b->datas[0].chunk->size = size;
b->datas[0].chunk->stride = 0;
}
}
static SpaResult
make_node (AppData *data, SpaNode **node, const char *lib, const char *name)
{
SpaHandle *handle;
SpaResult res;
void *hnd;
SpaEnumHandleFactoryFunc enum_func;
unsigned int i;
uint32_t state = 0;
if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) {
printf ("can't load %s: %s\n", lib, dlerror());
return SPA_RESULT_ERROR;
}
if ((enum_func = dlsym (hnd, "spa_enum_handle_factory")) == NULL) {
printf ("can't find enum function\n");
return SPA_RESULT_ERROR;
}
for (i = 0; ;i++) {
const SpaHandleFactory *factory;
void *iface;
if ((res = enum_func (&factory, state++)) < 0) {
if (res != SPA_RESULT_ENUM_END)
printf ("can't enumerate factories: %d\n", res);
break;
}
if (strcmp (factory->name, name))
continue;
handle = calloc (1, factory->size);
if ((res = spa_handle_factory_init (factory, handle, NULL, data->support, data->n_support)) < 0) {
printf ("can't make factory instance: %d\n", res);
return res;
}
if ((res = spa_handle_get_interface (handle, data->type.node, &iface)) < 0) {
printf ("can't get interface %d\n", res);
return res;
}
*node = iface;
return SPA_RESULT_OK;
}
return SPA_RESULT_ERROR;
}
static void
on_sink_event (SpaNode *node, SpaEvent *event, void *user_data)
{
printf ("got event %d\n", SPA_EVENT_TYPE (event));
}
static void
on_sink_need_input (SpaNode *node, void *user_data)
{
AppData *data = user_data;
data->sink_node.action = PROCESS_CHECK;
data->sink_node.state = SPA_RESULT_NEED_BUFFER;
spa_graph_node_schedule (&data->graph, &data->sink_node);
}
static void
on_sink_reuse_buffer (SpaNode *node, uint32_t port_id, uint32_t buffer_id, void *user_data)
{
AppData *data = user_data;
data->volume_sink_io[0].buffer_id = buffer_id;
}
static const SpaNodeCallbacks sink_callbacks =
{
&on_sink_event,
&on_sink_need_input,
NULL,
&on_sink_reuse_buffer
};
static SpaResult
do_add_source (SpaLoop *loop,
SpaSource *source)
{
AppData *data = SPA_CONTAINER_OF (loop, AppData, data_loop);
data->sources[data->n_sources] = *source;
data->n_sources++;
data->rebuild_fds = true;
return SPA_RESULT_OK;
}
static SpaResult
do_update_source (SpaSource *source)
{
return SPA_RESULT_OK;
}
static void
do_remove_source (SpaSource *source)
{
}
static SpaResult
do_invoke (SpaLoop *loop,
SpaInvokeFunc func,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
return func (loop, false, seq, size, data, user_data);
}
static SpaResult
make_nodes (AppData *data, const char *device)
{
SpaResult res;
SpaProps *props;
SpaPODBuilder b = { 0 };
SpaPODFrame f[2];
uint8_t buffer[128];
if ((res = make_node (data, &data->sink,
"build/spa/plugins/alsa/libspa-alsa.so",
"alsa-sink")) < 0) {
printf ("can't create alsa-sink: %d\n", res);
return res;
}
spa_node_set_callbacks (data->sink, &sink_callbacks, sizeof (sink_callbacks), data);
spa_pod_builder_init (&b, buffer, sizeof (buffer));
spa_pod_builder_props (&b, &f[0], data->type.props,
SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, device ? device : "hw:0"),
SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, MIN_LATENCY));
props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps);
if ((res = spa_node_set_props (data->sink, props)) < 0)
printf ("got set_props error %d\n", res);
if ((res = make_node (data, &data->volume,
"build/spa/plugins/volume/libspa-volume.so",
"volume")) < 0) {
printf ("can't create volume: %d\n", res);
return res;
}
if ((res = make_node (data, &data->source,
"build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so",
"audiotestsrc")) < 0) {
printf ("can't create audiotestsrc: %d\n", res);
return res;
}
spa_pod_builder_init (&b, buffer, sizeof (buffer));
spa_pod_builder_props (&b, &f[0], data->type.props,
SPA_POD_PROP (&f[1], data->type.props_freq, 0, SPA_POD_TYPE_DOUBLE, 1, 600.0),
SPA_POD_PROP (&f[1], data->type.props_volume, 0, SPA_POD_TYPE_DOUBLE, 1, 0.5),
SPA_POD_PROP (&f[1], data->type.props_live, 0, SPA_POD_TYPE_BOOL, 1, false));
props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps);
if ((res = spa_node_set_props (data->source, props)) < 0)
printf ("got set_props error %d\n", res);
spa_node_port_set_io (data->source, SPA_DIRECTION_OUTPUT, 0, &data->source_volume_io[0]);
spa_node_port_set_io (data->volume, SPA_DIRECTION_INPUT, 0, &data->source_volume_io[0]);
spa_node_port_set_io (data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]);
spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]);
spa_graph_node_add (&data->graph, &data->source_node, spa_graph_node_schedule_default, data->source);
spa_graph_port_add (&data->graph, &data->source_node,
&data->source_out, SPA_DIRECTION_OUTPUT, 0,
0, &data->source_volume_io[0]);
spa_graph_node_add (&data->graph, &data->volume_node, spa_graph_node_schedule_default, data->volume);
spa_graph_port_add (&data->graph, &data->volume_node,
&data->volume_in, SPA_DIRECTION_INPUT, 0,
0, &data->source_volume_io[0]);
spa_graph_port_link (&data->graph, &data->source_out, &data->volume_in);
spa_graph_port_add (&data->graph, &data->volume_node,
&data->volume_out, SPA_DIRECTION_OUTPUT, 0,
0, &data->volume_sink_io[0]);
spa_graph_node_add (&data->graph, &data->sink_node, spa_graph_node_schedule_default, data->sink);
spa_graph_port_add (&data->graph, &data->sink_node,
&data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->volume_sink_io[0]);
spa_graph_port_link (&data->graph, &data->volume_out, &data->sink_in);
return res;
}
static SpaResult
negotiate_formats (AppData *data)
{
SpaResult res;
SpaFormat *format, *filter;
uint32_t state = 0;
SpaPODBuilder b = { 0 };
SpaPODFrame f[2];
uint8_t buffer[256];
spa_pod_builder_init (&b, buffer, sizeof (buffer));
spa_pod_builder_format (&b, &f[0], data->type.format,
data->type.media_type.audio, data->type.media_subtype.raw,
SPA_POD_PROP (&f[1], data->type.format_audio.format, 0,
SPA_POD_TYPE_ID, 1,
data->type.audio_format.S16),
SPA_POD_PROP (&f[1], data->type.format_audio.layout, 0,
SPA_POD_TYPE_INT, 1,
SPA_AUDIO_LAYOUT_INTERLEAVED),
SPA_POD_PROP (&f[1], data->type.format_audio.rate, 0,
SPA_POD_TYPE_INT, 1,
44100),
SPA_POD_PROP (&f[1], data->type.format_audio.channels, 0,
SPA_POD_TYPE_INT, 1,
2));
filter = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaFormat);
if ((res = spa_node_port_enum_formats (data->sink, SPA_DIRECTION_INPUT, 0, &format, filter, state)) < 0)
return res;
if ((res = spa_node_port_set_format (data->sink, SPA_DIRECTION_INPUT, 0, 0, format)) < 0)
return res;
if ((res = spa_node_port_set_format (data->volume, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0)
return res;
init_buffer (data, data->volume_buffers, data->volume_buffer, 1, BUFFER_SIZE);
if ((res = spa_node_port_use_buffers (data->sink, SPA_DIRECTION_INPUT, 0, data->volume_buffers, 1)) < 0)
return res;
if ((res = spa_node_port_use_buffers (data->volume, SPA_DIRECTION_OUTPUT, 0, data->volume_buffers, 1)) < 0)
return res;
if ((res = spa_node_port_set_format (data->volume, SPA_DIRECTION_INPUT, 0, 0, format)) < 0)
return res;
if ((res = spa_node_port_set_format (data->source, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0)
return res;
init_buffer (data, data->source_buffers, data->source_buffer, 1, BUFFER_SIZE);
if ((res = spa_node_port_use_buffers (data->volume, SPA_DIRECTION_INPUT, 0, data->source_buffers, 1)) < 0)
return res;
if ((res = spa_node_port_use_buffers (data->source, SPA_DIRECTION_OUTPUT, 0, data->source_buffers, 1)) < 0)
return res;
return SPA_RESULT_OK;
}
static void *
loop (void *user_data)
{
AppData *data = user_data;
printf ("enter thread %d\n", data->n_sources);
while (data->running) {
int i, r;
/* rebuild */
if (data->rebuild_fds) {
for (i = 0; i < data->n_sources; i++) {
SpaSource *p = &data->sources[i];
data->fds[i].fd = p->fd;
data->fds[i].events = p->mask;
}
data->n_fds = data->n_sources;
data->rebuild_fds = false;
}
r = poll ((struct pollfd *) data->fds, data->n_fds, -1);
if (r < 0) {
if (errno == EINTR)
continue;
break;
}
if (r == 0) {
fprintf (stderr, "select timeout");
break;
}
/* after */
for (i = 0; i < data->n_sources; i++) {
SpaSource *p = &data->sources[i];
p->rmask = 0;
if (data->fds[i].revents & POLLIN)
p->rmask |= SPA_IO_IN;
if (data->fds[i].revents & POLLOUT)
p->rmask |= SPA_IO_OUT;
if (data->fds[i].revents & POLLHUP)
p->rmask |= SPA_IO_HUP;
if (data->fds[i].revents & POLLERR)
p->rmask |= SPA_IO_ERR;
}
for (i = 0; i < data->n_sources; i++) {
SpaSource *p = &data->sources[i];
if (p->rmask)
p->func (p);
}
}
printf ("leave thread\n");
return NULL;
}
static void
run_async_sink (AppData *data)
{
SpaResult res;
int err;
{
SpaCommand cmd = SPA_COMMAND_INIT (data->type.command_node.Start);
if ((res = spa_node_send_command (data->source, &cmd)) < 0)
printf ("got source error %d\n", res);
if ((res = spa_node_send_command (data->volume, &cmd)) < 0)
printf ("got volume error %d\n", res);
if ((res = spa_node_send_command (data->sink, &cmd)) < 0)
printf ("got sink error %d\n", res);
}
data->running = true;
if ((err = pthread_create (&data->thread, NULL, loop, data)) != 0) {
printf ("can't create thread: %d %s", err, strerror (err));
data->running = false;
}
printf ("sleeping for 1000 seconds\n");
sleep (1000);
if (data->running) {
data->running = false;
pthread_join (data->thread, NULL);
}
{
SpaCommand cmd = SPA_COMMAND_INIT (data->type.command_node.Pause);
if ((res = spa_node_send_command (data->sink, &cmd)) < 0)
printf ("got error %d\n", res);
if ((res = spa_node_send_command (data->volume, &cmd)) < 0)
printf ("got volume error %d\n", res);
if ((res = spa_node_send_command (data->source, &cmd)) < 0)
printf ("got source error %d\n", res);
}
}
int
main (int argc, char *argv[])
{
AppData data = { NULL };
SpaResult res;
const char *str;
spa_graph_init (&data.graph);
data.map = spa_type_map_get_default();
data.log = spa_log_get_default();
data.data_loop.size = sizeof (SpaLoop);
data.data_loop.add_source = do_add_source;
data.data_loop.update_source = do_update_source;
data.data_loop.remove_source = do_remove_source;
data.data_loop.invoke = do_invoke;
if ((str = getenv ("PINOS_DEBUG")))
data.log->level = atoi (str);
data.support[0].type = SPA_TYPE__TypeMap;
data.support[0].data = data.map;
data.support[1].type = SPA_TYPE__Log;
data.support[1].data = data.log;
data.support[2].type = SPA_TYPE_LOOP__DataLoop;
data.support[2].data = &data.data_loop;
data.support[3].type = SPA_TYPE_LOOP__MainLoop;
data.support[3].data = &data.data_loop;
data.n_support = 4;
init_type (&data.type, data.map);
if ((res = make_nodes (&data, argc > 1 ? argv[1] : NULL)) < 0) {
printf ("can't make nodes: %d\n", res);
return -1;
}
if ((res = negotiate_formats (&data)) < 0) {
printf ("can't negotiate nodes: %d\n", res);
return -1;
}
run_async_sink (&data);
}

View file

@ -29,6 +29,7 @@
#include <spa/node.h>
#include <spa/log.h>
#include <spa/loop.h>
#include <spa/graph.h>
#include <spa/type-map.h>
#include <spa/audio/format-utils.h>
#include <spa/format-utils.h>
@ -37,6 +38,8 @@
#include <lib/debug.h>
#include <lib/props.h>
#undef USE_GRAPH
typedef struct {
uint32_t node;
uint32_t props;
@ -94,6 +97,17 @@ typedef struct {
SpaSupport support[4];
uint32_t n_support;
SpaGraph graph;
SpaGraphNode source1_node;
SpaGraphPort source1_out;
SpaGraphNode source2_node;
SpaGraphPort source2_out;
SpaGraphPort mix_in[2];
SpaGraphNode mix_node;
SpaGraphPort mix_out;
SpaGraphPort sink_in;
SpaGraphNode sink_node;
SpaNode *sink;
SpaPortIO mix_sink_io[1];
@ -220,6 +234,12 @@ static void
on_sink_need_input (SpaNode *node, void *user_data)
{
AppData *data = user_data;
#ifdef USE_GRAPH
data->sink_node.action = PROCESS_CHECK;
data->sink_node.state = SPA_RESULT_NEED_BUFFER;
spa_graph_node_schedule (&data->graph, &data->sink_node);
#else
SpaResult res;
res = spa_node_process_output (data->mix);
@ -249,6 +269,7 @@ push:
} else {
printf ("got process_output error from mixer %d\n", res);
}
#endif
}
static void
@ -369,6 +390,55 @@ make_nodes (AppData *data, const char *device)
if ((res = spa_node_set_props (data->source2, props)) < 0)
printf ("got set_props error %d\n", res);
data->mix_ports[0] = 0;
if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 0)) < 0)
return res;
data->mix_ports[1] = 1;
if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 1)) < 0)
return res;
spa_node_port_set_io (data->source1, SPA_DIRECTION_OUTPUT, 0, &data->source1_mix_io[0]);
spa_node_port_set_io (data->source2, SPA_DIRECTION_OUTPUT, 0, &data->source2_mix_io[0]);
spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 0, &data->source1_mix_io[0]);
spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 1, &data->source2_mix_io[0]);
spa_node_port_set_io (data->mix, SPA_DIRECTION_OUTPUT, 0, &data->mix_sink_io[0]);
spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]);
#ifdef USE_GRAPH
spa_graph_node_add (&data->graph, &data->source1_node, spa_graph_node_schedule_default, data->source1);
spa_graph_port_add (&data->graph, &data->source1_node,
&data->source1_out, SPA_DIRECTION_OUTPUT, 0,
0, &data->source1_mix_io[0]);
spa_graph_node_add (&data->graph, &data->source2_node, spa_graph_node_schedule_default, data->source2);
spa_graph_port_add (&data->graph, &data->source2_node,
&data->source2_out, SPA_DIRECTION_OUTPUT, 0,
0, &data->source2_mix_io[0]);
spa_graph_node_add (&data->graph, &data->mix_node, spa_graph_node_schedule_default, data->mix);
spa_graph_port_add (&data->graph, &data->mix_node,
&data->mix_in[0], SPA_DIRECTION_INPUT, data->mix_ports[0],
0, &data->source1_mix_io[0]);
spa_graph_port_add (&data->graph, &data->mix_node,
&data->mix_in[1], SPA_DIRECTION_INPUT, data->mix_ports[1],
0, &data->source2_mix_io[0]);
spa_graph_port_link (&data->graph, &data->source1_out, &data->mix_in[0]);
spa_graph_port_link (&data->graph, &data->source2_out, &data->mix_in[1]);
spa_graph_port_add (&data->graph, &data->mix_node,
&data->mix_out, SPA_DIRECTION_OUTPUT, 0,
0, &data->mix_sink_io[0]);
spa_graph_node_add (&data->graph, &data->sink_node, spa_graph_node_schedule_default, data->sink);
spa_graph_port_add (&data->graph, &data->sink_node,
&data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->mix_sink_io[0]);
spa_graph_port_link (&data->graph, &data->mix_out, &data->sink_in);
#endif
return res;
}
@ -402,13 +472,9 @@ negotiate_formats (AppData *data)
if ((res = spa_node_port_enum_formats (data->sink, SPA_DIRECTION_INPUT, 0, &format, filter, state)) < 0)
return res;
if ((res = spa_node_port_set_format (data->sink, SPA_DIRECTION_INPUT, 0, 0, format)) < 0)
return res;
spa_node_port_set_io (data->mix, SPA_DIRECTION_OUTPUT, 0, &data->mix_sink_io[0]);
spa_node_port_set_io (data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]);
if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_OUTPUT, 0, 0, format)) < 0)
return res;
@ -418,13 +484,6 @@ negotiate_formats (AppData *data)
if ((res = spa_node_port_use_buffers (data->mix, SPA_DIRECTION_OUTPUT, 0, data->mix_buffers, 1)) < 0)
return res;
data->mix_ports[0] = 0;
if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 0)) < 0)
return res;
spa_node_port_set_io (data->source1, SPA_DIRECTION_OUTPUT, 0, &data->source1_mix_io[0]);
spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 0, &data->source1_mix_io[0]);
if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[0], 0, format)) < 0)
return res;
@ -437,13 +496,6 @@ negotiate_formats (AppData *data)
if ((res = spa_node_port_use_buffers (data->source1, SPA_DIRECTION_OUTPUT, 0, data->source1_buffers, 2)) < 0)
return res;
data->mix_ports[1] = 1;
if ((res = spa_node_add_port (data->mix, SPA_DIRECTION_INPUT, 1)) < 0)
return res;
spa_node_port_set_io (data->source2, SPA_DIRECTION_OUTPUT, 0, &data->source2_mix_io[0]);
spa_node_port_set_io (data->mix, SPA_DIRECTION_INPUT, 1, &data->source2_mix_io[0]);
if ((res = spa_node_port_set_format (data->mix, SPA_DIRECTION_INPUT, data->mix_ports[1], 0, format)) < 0)
return res;
@ -538,8 +590,8 @@ run_async_sink (AppData *data)
data->running = false;
}
printf ("sleeping for 1000 seconds\n");
sleep (1000);
printf ("sleeping for 30 seconds\n");
sleep (30);
if (data->running) {
data->running = false;
@ -574,6 +626,8 @@ main (int argc, char *argv[])
data.data_loop.remove_source = do_remove_source;
data.data_loop.invoke = do_invoke;
spa_graph_init (&data.graph);
if ((str = getenv ("PINOS_DEBUG")))
data.log->level = atoi (str);