node: the callbacks determine async operation

This commit is contained in:
Wim Taymans 2017-05-11 16:11:08 +02:00
parent 361d2b6a87
commit 6cbf398ad4
7 changed files with 51 additions and 75 deletions

View file

@ -51,8 +51,6 @@ pinos_spa_node_load (PinosCore *core,
SpaEnumHandleFactoryFunc enum_func; SpaEnumHandleFactoryFunc enum_func;
const SpaHandleFactory *factory; const SpaHandleFactory *factory;
void *iface; void *iface;
SpaDictItem items[1];
SpaDict dict = SPA_DICT_INIT (1, items);
if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) { if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) {
pinos_log_error ("can't load %s: %s", lib, dlerror()); pinos_log_error ("can't load %s: %s", lib, dlerror());
@ -73,13 +71,10 @@ pinos_spa_node_load (PinosCore *core,
break; break;
} }
items[0].key = "asynchronous";
items[0].value = "1";
handle = calloc (1, factory->size); handle = calloc (1, factory->size);
if ((res = spa_handle_factory_init (factory, if ((res = spa_handle_factory_init (factory,
handle, handle,
&dict, NULL,
core->support, core->support,
core->n_support)) < 0) { core->n_support)) < 0) {
pinos_log_error ("can't make factory instance: %d", res); pinos_log_error ("can't make factory instance: %d", res);

View file

@ -99,7 +99,7 @@ typedef struct {
* @user_data: user data provided when registering the callbacks * @user_data: user data provided when registering the callbacks
* *
* This will be called when an out-of-bound event is notified * This will be called when an out-of-bound event is notified
* on @node. * on @node. the callback can be called from any thread.
*/ */
void (*event) (SpaNode *node, void (*event) (SpaNode *node,
SpaEvent *event, SpaEvent *event,
@ -109,7 +109,11 @@ typedef struct {
* @node: a #SpaNode * @node: a #SpaNode
* @user_data: user data provided when registering the callbacks * @user_data: user data provided when registering the callbacks
* *
* The node needs more input * The node needs more input. This callback is called from the
* data thread.
*
* When this function is NULL, synchronous operation is requested
* on the input ports.
*/ */
void (*need_input) (SpaNode *node, void (*need_input) (SpaNode *node,
void *user_data); void *user_data);
@ -118,11 +122,14 @@ typedef struct {
* @node: a #SpaNode * @node: a #SpaNode
* @user_data: user data provided when registering the callbacks * @user_data: user data provided when registering the callbacks
* *
* The node has output input * The node has output input. This callback is called from the
* data thread.
*
* When this function is NULL, synchronous operation is requested
* on the output ports.
*/ */
void (*have_output) (SpaNode *node, void (*have_output) (SpaNode *node,
void *user_data); void *user_data);
/** /**
* SpaNodeCallbacks::reuse_buffer: * SpaNodeCallbacks::reuse_buffer:
* @node: a #SpaNode * @node: a #SpaNode
@ -130,7 +137,11 @@ typedef struct {
* @buffer_id: the buffer id to be reused * @buffer_id: the buffer id to be reused
* @user_data: user data provided when registering the callbacks * @user_data: user data provided when registering the callbacks
* *
* The node has a buffer that can be reused. * The node has a buffer that can be reused. This callback is called
* from the data thread.
*
* When this function is NULL, the buffers to reuse will be set in
* the io area or the input ports.
*/ */
void (*reuse_buffer) (SpaNode *node, void (*reuse_buffer) (SpaNode *node,
uint32_t port_id, uint32_t port_id,
@ -228,11 +239,8 @@ struct _SpaNode {
* @callbacks_size: size of the callbacks structure * @callbacks_size: size of the callbacks structure
* @user_data: user data passed to the callback functions * @user_data: user data passed to the callback functions
* *
* Set a callback to receive events from @node. if @callback is %NULL, the * Set callbacks to receive events and scheduling callbacks from @node.
* current callback is removed. * if @callbacks is %NULL, the current callbacks are removed.
*
* The callback can be emited from any thread. The caller should take
* appropriate actions to node the event in other threads when needed.
* *
* This function must be called from the main thread. * This function must be called from the main thread.
* *

View file

@ -117,7 +117,6 @@ struct _SpaAudioTestSrc {
SpaTypeMap *map; SpaTypeMap *map;
SpaLog *log; SpaLog *log;
SpaLoop *data_loop; SpaLoop *data_loop;
bool async;
uint8_t props_buffer[512]; uint8_t props_buffer[512];
SpaAudioTestSrcProps props; SpaAudioTestSrcProps props;
@ -251,7 +250,7 @@ send_have_output (SpaAudioTestSrc *this)
static void static void
set_timer (SpaAudioTestSrc *this, bool enabled) set_timer (SpaAudioTestSrc *this, bool enabled)
{ {
if (this->async || this->props.live) { if (this->callbacks.have_output || this->props.live) {
if (enabled) { if (enabled) {
if (this->props.live) { if (this->props.live) {
uint64_t next_time = this->start_time + this->elapsed_time; uint64_t next_time = this->start_time + this->elapsed_time;
@ -274,7 +273,7 @@ read_timer (SpaAudioTestSrc *this)
{ {
uint64_t expirations; uint64_t expirations;
if (this->async || this->props.live) { if (this->callbacks.have_output || this->props.live) {
if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t)) if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd"); perror ("read timerfd");
} }
@ -428,6 +427,11 @@ spa_audiotestsrc_node_set_callbacks (SpaNode *node,
this = SPA_CONTAINER_OF (node, SpaAudioTestSrc, node); this = SPA_CONTAINER_OF (node, SpaAudioTestSrc, node);
if (this->data_loop == NULL && callbacks->have_output) {
spa_log_error (this->log, "a data_loop is needed for async operation");
return SPA_RESULT_ERROR;
}
this->callbacks = *callbacks; this->callbacks = *callbacks;
this->user_data = user_data; this->user_data = user_data;
@ -857,7 +861,7 @@ spa_audiotestsrc_node_process_output (SpaNode *node)
this->io->buffer_id = SPA_ID_INVALID; this->io->buffer_id = SPA_ID_INVALID;
} }
if (!this->async && (io->status == SPA_RESULT_NEED_BUFFER)) if (!this->callbacks.have_output && (io->status == SPA_RESULT_NEED_BUFFER))
return audiotestsrc_make_buffer (this); return audiotestsrc_make_buffer (this);
else else
return SPA_RESULT_OK; return SPA_RESULT_OK;
@ -984,7 +988,6 @@ audiotestsrc_init (const SpaHandleFactory *factory,
{ {
SpaAudioTestSrc *this; SpaAudioTestSrc *this;
uint32_t i; uint32_t i;
const char *str;
spa_return_val_if_fail (factory != NULL, SPA_RESULT_INVALID_ARGUMENTS); spa_return_val_if_fail (factory != NULL, SPA_RESULT_INVALID_ARGUMENTS);
spa_return_val_if_fail (handle != NULL, SPA_RESULT_INVALID_ARGUMENTS); spa_return_val_if_fail (handle != NULL, SPA_RESULT_INVALID_ARGUMENTS);
@ -994,11 +997,6 @@ audiotestsrc_init (const SpaHandleFactory *factory,
this = (SpaAudioTestSrc *) handle; this = (SpaAudioTestSrc *) handle;
if (info && (str = spa_dict_lookup (info, "asynchronous")))
this->async = atoi (str) == 1;
else
this->async = false;
for (i = 0; i < n_support; i++) { for (i = 0; i < n_support; i++) {
if (strcmp (support[i].type, SPA_TYPE__TypeMap) == 0) if (strcmp (support[i].type, SPA_TYPE__TypeMap) == 0)
this->map = support[i].data; this->map = support[i].data;
@ -1011,10 +1009,6 @@ audiotestsrc_init (const SpaHandleFactory *factory,
spa_log_error (this->log, "a type-map is needed"); spa_log_error (this->log, "a type-map is needed");
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
} }
if (this->data_loop == NULL && this->async) {
spa_log_error (this->log, "a data_loop is needed");
return SPA_RESULT_ERROR;
}
init_type (&this->type, this->map); init_type (&this->type, this->map);
this->node = audiotestsrc_node; this->node = audiotestsrc_node;
@ -1041,7 +1035,7 @@ audiotestsrc_init (const SpaHandleFactory *factory,
if (this->props.live) if (this->props.live)
this->info.flags |= SPA_PORT_INFO_FLAG_LIVE; this->info.flags |= SPA_PORT_INFO_FLAG_LIVE;
spa_log_info (this->log, "audiotestsrc %p: initialized, async=%d", this, this->async); spa_log_info (this->log, "audiotestsrc %p: initialized", this);
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }

View file

@ -109,7 +109,6 @@ struct _SpaVideoTestSrc {
SpaTypeMap *map; SpaTypeMap *map;
SpaLog *log; SpaLog *log;
SpaLoop *data_loop; SpaLoop *data_loop;
bool async;
uint8_t props_buffer[512]; uint8_t props_buffer[512];
SpaVideoTestSrcProps props; SpaVideoTestSrcProps props;
@ -221,14 +220,6 @@ spa_videotestsrc_node_set_props (SpaNode *node,
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
static inline SpaResult
send_have_output (SpaVideoTestSrc *this)
{
if (this->callbacks.have_output)
this->callbacks.have_output (&this->node, this->user_data);
return SPA_RESULT_OK;
}
#include "draw.c" #include "draw.c"
static SpaResult static SpaResult
@ -240,7 +231,7 @@ fill_buffer (SpaVideoTestSrc *this, VTSBuffer *b)
static void static void
set_timer (SpaVideoTestSrc *this, bool enabled) set_timer (SpaVideoTestSrc *this, bool enabled)
{ {
if (this->async || this->props.live) { if (this->callbacks.have_output || this->props.live) {
if (enabled) { if (enabled) {
if (this->props.live) { if (this->props.live) {
uint64_t next_time = this->start_time + this->elapsed_time; uint64_t next_time = this->start_time + this->elapsed_time;
@ -263,7 +254,7 @@ read_timer (SpaVideoTestSrc *this)
{ {
uint64_t expirations; uint64_t expirations;
if (this->async || this->props.live) { if (this->callbacks.have_output || this->props.live) {
if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t)) if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd"); perror ("read timerfd");
} }
@ -322,7 +313,7 @@ videotestsrc_on_output (SpaSource *source)
res = videotestsrc_make_buffer (this); res = videotestsrc_make_buffer (this);
if (res == SPA_RESULT_HAVE_BUFFER) if (res == SPA_RESULT_HAVE_BUFFER)
send_have_output (this); this->callbacks.have_output (&this->node, this->user_data);
} }
static SpaResult static SpaResult
@ -390,6 +381,10 @@ spa_videotestsrc_node_set_callbacks (SpaNode *node,
this = SPA_CONTAINER_OF (node, SpaVideoTestSrc, node); this = SPA_CONTAINER_OF (node, SpaVideoTestSrc, node);
if (this->data_loop == NULL && callbacks->have_output != NULL) {
spa_log_error (this->log, "a data_loop is needed for async operation");
return SPA_RESULT_ERROR;
}
this->callbacks = *callbacks; this->callbacks = *callbacks;
this->user_data = user_data; this->user_data = user_data;
@ -818,7 +813,7 @@ spa_videotestsrc_node_process_output (SpaNode *node)
this->io->buffer_id = SPA_ID_INVALID; this->io->buffer_id = SPA_ID_INVALID;
} }
if (!this->async && (io->status == SPA_RESULT_NEED_BUFFER)) if (!this->callbacks.have_output && (io->status == SPA_RESULT_NEED_BUFFER))
return videotestsrc_make_buffer (this); return videotestsrc_make_buffer (this);
else else
return SPA_RESULT_OK; return SPA_RESULT_OK;
@ -945,7 +940,6 @@ videotestsrc_init (const SpaHandleFactory *factory,
{ {
SpaVideoTestSrc *this; SpaVideoTestSrc *this;
uint32_t i; uint32_t i;
const char *str;
spa_return_val_if_fail (factory != NULL, SPA_RESULT_INVALID_ARGUMENTS); spa_return_val_if_fail (factory != NULL, SPA_RESULT_INVALID_ARGUMENTS);
spa_return_val_if_fail (handle != NULL, SPA_RESULT_INVALID_ARGUMENTS); spa_return_val_if_fail (handle != NULL, SPA_RESULT_INVALID_ARGUMENTS);
@ -955,11 +949,6 @@ videotestsrc_init (const SpaHandleFactory *factory,
this = (SpaVideoTestSrc *) handle; this = (SpaVideoTestSrc *) handle;
if (info && (str = spa_dict_lookup (info, "asynchronous")))
this->async = atoi (str) == 1;
else
this->async = false;
for (i = 0; i < n_support; i++) { for (i = 0; i < n_support; i++) {
if (strcmp (support[i].type, SPA_TYPE__TypeMap) == 0) if (strcmp (support[i].type, SPA_TYPE__TypeMap) == 0)
this->map = support[i].data; this->map = support[i].data;
@ -972,10 +961,6 @@ videotestsrc_init (const SpaHandleFactory *factory,
spa_log_error (this->log, "a type-map is needed"); spa_log_error (this->log, "a type-map is needed");
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
} }
if (this->data_loop == NULL && this->async) {
spa_log_error (this->log, "a data_loop is needed");
return SPA_RESULT_ERROR;
}
init_type (&this->type, this->map); init_type (&this->type, this->map);
this->node = videotestsrc_node; this->node = videotestsrc_node;
@ -1002,7 +987,7 @@ videotestsrc_init (const SpaHandleFactory *factory,
if (this->props.live) if (this->props.live)
this->info.flags |= SPA_PORT_INFO_FLAG_LIVE; this->info.flags |= SPA_PORT_INFO_FLAG_LIVE;
spa_log_info (this->log, "videotestsrc %p: initialized, async=%d", this, this->async); spa_log_info (this->log, "videotestsrc %p: initialized", this);
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }

View file

@ -165,7 +165,7 @@ init_buffer (AppData *data, SpaBuffer **bufs, Buffer *ba, int n_buffers, size_t
} }
static SpaResult static SpaResult
make_node (AppData *data, SpaNode **node, const char *lib, const char *name, bool async) make_node (AppData *data, SpaNode **node, const char *lib, const char *name)
{ {
SpaHandle *handle; SpaHandle *handle;
SpaResult res; SpaResult res;
@ -173,8 +173,6 @@ make_node (AppData *data, SpaNode **node, const char *lib, const char *name, boo
SpaEnumHandleFactoryFunc enum_func; SpaEnumHandleFactoryFunc enum_func;
unsigned int i; unsigned int i;
uint32_t state = 0; uint32_t state = 0;
SpaDictItem items[1];
SpaDict dict = SPA_DICT_INIT (1, items);
if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) { if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) {
printf ("can't load %s: %s\n", lib, dlerror()); printf ("can't load %s: %s\n", lib, dlerror());
@ -185,9 +183,6 @@ make_node (AppData *data, SpaNode **node, const char *lib, const char *name, boo
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
} }
items[0].key = "asynchronous";
items[0].value = async ? "1" : "0";
for (i = 0; ;i++) { for (i = 0; ;i++) {
const SpaHandleFactory *factory; const SpaHandleFactory *factory;
void *iface; void *iface;
@ -201,7 +196,7 @@ make_node (AppData *data, SpaNode **node, const char *lib, const char *name, boo
continue; continue;
handle = calloc (1, factory->size); handle = calloc (1, factory->size);
if ((res = spa_handle_factory_init (factory, handle, &dict, data->support, data->n_support)) < 0) { if ((res = spa_handle_factory_init (factory, handle, NULL, data->support, data->n_support)) < 0) {
printf ("can't make factory instance: %d\n", res); printf ("can't make factory instance: %d\n", res);
return res; return res;
} }
@ -318,7 +313,7 @@ make_nodes (AppData *data, const char *device)
if ((res = make_node (data, &data->sink, if ((res = make_node (data, &data->sink,
"build/spa/plugins/alsa/libspa-alsa.so", "build/spa/plugins/alsa/libspa-alsa.so",
"alsa-sink", true)) < 0) { "alsa-sink")) < 0) {
printf ("can't create alsa-sink: %d\n", res); printf ("can't create alsa-sink: %d\n", res);
return res; return res;
} }
@ -335,14 +330,14 @@ make_nodes (AppData *data, const char *device)
if ((res = make_node (data, &data->mix, if ((res = make_node (data, &data->mix,
"build/spa/plugins/audiomixer/libspa-audiomixer.so", "build/spa/plugins/audiomixer/libspa-audiomixer.so",
"audiomixer", false)) < 0) { "audiomixer")) < 0) {
printf ("can't create audiomixer: %d\n", res); printf ("can't create audiomixer: %d\n", res);
return res; return res;
} }
if ((res = make_node (data, &data->source1, if ((res = make_node (data, &data->source1,
"build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so", "build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so",
"audiotestsrc", false)) < 0) { "audiotestsrc")) < 0) {
printf ("can't create audiotestsrc: %d\n", res); printf ("can't create audiotestsrc: %d\n", res);
return res; return res;
} }
@ -359,7 +354,7 @@ make_nodes (AppData *data, const char *device)
if ((res = make_node (data, &data->source2, if ((res = make_node (data, &data->source2,
"build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so", "build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so",
"audiotestsrc", false)) < 0) { "audiotestsrc")) < 0) {
printf ("can't create audiotestsrc: %d\n", res); printf ("can't create audiotestsrc: %d\n", res);
return res; return res;
} }

View file

@ -157,7 +157,7 @@ init_buffer (AppData *data, SpaBuffer **bufs, Buffer *ba, int n_buffers, size_t
} }
static SpaResult static SpaResult
make_node (AppData *data, SpaNode **node, const char *lib, const char *name, bool async) make_node (AppData *data, SpaNode **node, const char *lib, const char *name)
{ {
SpaHandle *handle; SpaHandle *handle;
SpaResult res; SpaResult res;
@ -165,8 +165,6 @@ make_node (AppData *data, SpaNode **node, const char *lib, const char *name, boo
SpaEnumHandleFactoryFunc enum_func; SpaEnumHandleFactoryFunc enum_func;
unsigned int i; unsigned int i;
uint32_t state = 0; uint32_t state = 0;
SpaDictItem items[1];
SpaDict dict = SPA_DICT_INIT (1, items);
if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) { if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) {
printf ("can't load %s: %s\n", lib, dlerror()); printf ("can't load %s: %s\n", lib, dlerror());
@ -177,9 +175,6 @@ make_node (AppData *data, SpaNode **node, const char *lib, const char *name, boo
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
} }
items[0].key = "asynchronous";
items[0].value = async ? "1" : "0";
for (i = 0; ;i++) { for (i = 0; ;i++) {
const SpaHandleFactory *factory; const SpaHandleFactory *factory;
void *iface; void *iface;
@ -193,7 +188,7 @@ make_node (AppData *data, SpaNode **node, const char *lib, const char *name, boo
continue; continue;
handle = calloc (1, factory->size); handle = calloc (1, factory->size);
if ((res = spa_handle_factory_init (factory, handle, &dict, data->support, data->n_support)) < 0) { if ((res = spa_handle_factory_init (factory, handle, NULL, data->support, data->n_support)) < 0) {
printf ("can't make factory instance: %d\n", res); printf ("can't make factory instance: %d\n", res);
return res; return res;
} }
@ -287,7 +282,7 @@ make_nodes (AppData *data, const char *device)
if ((res = make_node (data, &data->sink, if ((res = make_node (data, &data->sink,
"build/spa/plugins/alsa/libspa-alsa.so", "build/spa/plugins/alsa/libspa-alsa.so",
"alsa-sink", true)) < 0) { "alsa-sink")) < 0) {
printf ("can't create alsa-sink: %d\n", res); printf ("can't create alsa-sink: %d\n", res);
return res; return res;
} }
@ -304,7 +299,7 @@ make_nodes (AppData *data, const char *device)
if ((res = make_node (data, &data->source, if ((res = make_node (data, &data->source,
"build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so", "build/spa/plugins/audiotestsrc/libspa-audiotestsrc.so",
"audiotestsrc", false)) < 0) { "audiotestsrc")) < 0) {
printf ("can't create audiotestsrc: %d\n", res); printf ("can't create audiotestsrc: %d\n", res);
return res; return res;
} }

View file

@ -544,12 +544,16 @@ main (int argc, char *argv[])
{ {
AppData data = { 0 }; AppData data = { 0 };
SpaResult res; SpaResult res;
const char *str;
data.use_buffer = true; data.use_buffer = true;
data.map = spa_type_map_get_default (); data.map = spa_type_map_get_default ();
data.log = spa_log_get_default (); data.log = spa_log_get_default ();
if ((str = getenv ("PINOS_DEBUG")))
data.log->level = atoi (str);
data.data_loop.size = sizeof (SpaLoop); data.data_loop.size = sizeof (SpaLoop);
data.data_loop.add_source = do_add_source; data.data_loop.add_source = do_add_source;
data.data_loop.update_source = do_update_source; data.data_loop.update_source = do_update_source;