videotestsrc: remove threads

Remove the threads from the element and use timerfd to schedule timeouts
Propagate live flag between node links
This commit is contained in:
Wim Taymans 2016-09-19 19:17:59 +02:00
parent 1e565a5f65
commit 4b83d6cfc8
9 changed files with 809 additions and 802 deletions

View file

@ -28,6 +28,11 @@
#include <spa/queue.h>
#include <spa/audio/format.h>
#define TIMESPEC_TO_TIME(ts) ((ts)->tv_sec * 1000000000ll + (ts)->tv_nsec)
#define SAMPLES_TO_TIME(this,s) ((s) * 1000000000ll / (this)->current_format.info.raw.rate)
#define BYTES_TO_SAMPLES(this,b) ((b)/(this)->bpf)
#define BYTES_TO_TIME(this,b) SAMPLES_TO_TIME(this, BYTES_TO_SAMPLES (this, b))
typedef struct _SpaAudioTestSrc SpaAudioTestSrc;
typedef struct {
@ -56,13 +61,13 @@ struct _ATSBuffer {
struct _SpaAudioTestSrc {
SpaHandle handle;
SpaNode node;
SpaClock clock;
SpaAudioTestSrcProps props[2];
SpaNodeEventCallback event_cb;
void *user_data;
SpaPollItem idle;
SpaPollItem timer;
SpaPollFd fds[1];
struct itimerspec timerspec;
@ -228,6 +233,95 @@ send_have_output (SpaAudioTestSrc *this)
return SPA_RESULT_OK;
}
static SpaResult
fill_buffer (SpaAudioTestSrc *this, ATSBuffer *b)
{
uint8_t *p = b->ptr;
size_t i;
for (i = 0; i < b->size; i++) {
p[i] = rand();
}
this->sample_count += b->size / this->bpf;
this->elapsed_time = SAMPLES_TO_TIME (this, this->sample_count);
return SPA_RESULT_OK;
}
static int audiotestsrc_on_output (SpaPollNotifyData *data);
static SpaResult
update_poll_enabled (SpaAudioTestSrc *this, bool enabled)
{
SpaNodeEvent event;
if (this->event_cb) {
event.type = SPA_NODE_EVENT_TYPE_UPDATE_POLL;
this->timer.enabled = enabled;
if (this->props[1].live) {
if (enabled) {
uint64_t next_time = this->start_time + this->elapsed_time;
fprintf (stderr, "%"PRIu64"\n", next_time);
this->timerspec.it_value.tv_sec = next_time / 1000000000;
this->timerspec.it_value.tv_nsec = next_time % 1000000000;
}
else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 0;
}
timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
this->timer.fds = this->fds;
this->timer.n_fds = 1;
this->timer.idle_cb = NULL;
this->timer.after_cb = audiotestsrc_on_output;
} else {
this->timer.fds = NULL;
this->timer.n_fds = 0;
this->timer.idle_cb = audiotestsrc_on_output;
this->timer.after_cb = NULL;
}
event.data = &this->timer;
event.size = sizeof (this->timer);
this->event_cb (&this->node, &event, this->user_data);
}
return SPA_RESULT_OK;
}
static int
audiotestsrc_on_output (SpaPollNotifyData *data)
{
SpaAudioTestSrc *this = data->user_data;
ATSBuffer *b;
SPA_QUEUE_POP_HEAD (&this->empty, ATSBuffer, next, b);
fprintf (stderr, "on_output %p\n", b);
if (b == NULL) {
update_poll_enabled (this, false);
return 0;
}
fill_buffer (this, b);
if (this->props[1].live) {
uint64_t expirations, next_time;
if (read (this->fds[0].fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd");
next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / 1000000000;
this->timerspec.it_value.tv_nsec = next_time % 1000000000;
timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
}
b->next = NULL;
SPA_QUEUE_PUSH_TAIL (&this->ready, ATSBuffer, next, b);
send_have_output (this);
return 0;
}
static void
update_state (SpaAudioTestSrc *this, SpaNodeState state)
{
@ -248,37 +342,6 @@ update_state (SpaAudioTestSrc *this, SpaNodeState state)
}
}
static SpaResult
update_poll_enabled (SpaAudioTestSrc *this, bool enabled)
{
SpaNodeEvent event;
if (this->event_cb) {
event.type = SPA_NODE_EVENT_TYPE_UPDATE_POLL;
if (this->props[1].live) {
this->timer.enabled = enabled;
if (enabled) {
uint64_t next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / 1000000000;
this->timerspec.it_value.tv_nsec = next_time % 1000000000;
}
else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 0;
}
timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
event.data = &this->timer;
event.size = sizeof (this->timer);
} else {
this->idle.enabled = enabled;
event.data = &this->idle;
event.size = sizeof (this->idle);
}
this->event_cb (&this->node, &event, this->user_data);
}
return SPA_RESULT_OK;
}
static SpaResult
spa_audiotestsrc_node_send_command (SpaNode *node,
SpaNodeCommand *command)
@ -308,7 +371,7 @@ spa_audiotestsrc_node_send_command (SpaNode *node,
return SPA_RESULT_OK;
clock_gettime (CLOCK_MONOTONIC, &now);
this->start_time = now.tv_sec * 1000000000ll + now.tv_nsec;
this->start_time = TIMESPEC_TO_TIME (&now);
this->elapsed_time = 0;
this->started = true;
@ -356,11 +419,8 @@ spa_audiotestsrc_node_set_event_callback (SpaNode *node,
if (event_cb == NULL && this->event_cb) {
event.type = SPA_NODE_EVENT_TYPE_REMOVE_POLL;
if (this->props[1].live)
event.data = &this->timer;
else
event.data = &this->idle;
event.size = sizeof (this->idle);
event.data = &this->timer;
event.size = sizeof (this->timer);
this->event_cb (&this->node, &event, this->user_data);
}
@ -369,11 +429,8 @@ spa_audiotestsrc_node_set_event_callback (SpaNode *node,
if (this->event_cb) {
event.type = SPA_NODE_EVENT_TYPE_ADD_POLL;
if (this->props[1].live)
event.data = &this->timer;
else
event.data = &this->idle;
event.size = sizeof (this->idle);
event.data = &this->timer;
event.size = sizeof (this->timer);
this->event_cb (&this->node, &event, this->user_data);
}
return SPA_RESULT_OK;
@ -508,14 +565,13 @@ spa_audiotestsrc_node_port_set_format (SpaNode *node,
if ((res = spa_format_audio_parse (format, &this->current_format)) < 0)
return res;
this->bpf = (2 * this->current_format.info.raw.channels);
this->have_format = true;
}
if (this->have_format) {
this->info.maxbuffering = -1;
this->info.latency = -1;
this->bpf = (2 * this->current_format.info.raw.channels);
this->info.latency = BYTES_TO_TIME (this, 1024);
this->info.n_params = 1;
this->info.params = this->params;
@ -646,7 +702,7 @@ spa_audiotestsrc_node_port_use_buffers (SpaNode *node,
b->size = d[0].mem.size;
b->next = NULL;
SPA_QUEUE_PUSH_TAIL (&this->empty, ATSBuffer, b);
SPA_QUEUE_PUSH_TAIL (&this->empty, ATSBuffer, next, b);
}
this->n_buffers = n_buffers;
this->have_buffers = true;
@ -716,9 +772,9 @@ spa_audiotestsrc_node_port_reuse_buffer (SpaNode *node,
b->outstanding = false;
b->next = NULL;
SPA_QUEUE_PUSH_TAIL (&this->empty, ATSBuffer, b);
SPA_QUEUE_PUSH_TAIL (&this->empty, ATSBuffer, next, b);
if (this->empty.length == 1)
if (this->empty.length == 1 && !this->props[1].live)
update_poll_enabled (this, true);
return SPA_RESULT_OK;
@ -755,72 +811,6 @@ spa_audiotestsrc_node_port_push_input (SpaNode *node,
{
return SPA_RESULT_INVALID_PORT;
}
static SpaResult
fill_buffer (SpaAudioTestSrc *this, ATSBuffer *b)
{
uint8_t *p = b->ptr;
size_t i;
for (i = 0; i < b->size; i++) {
p[i] = rand();
}
this->sample_count += b->size / this->bpf;
this->elapsed_time = this->sample_count * 1000000000ll / this->current_format.info.raw.rate;
return SPA_RESULT_OK;
}
static int
audiotestsrc_idle (SpaPollNotifyData *data)
{
SpaAudioTestSrc *this = data->user_data;
ATSBuffer *empty;
SPA_QUEUE_POP_HEAD (&this->empty, ATSBuffer, empty);
if (!empty) {
update_poll_enabled (this, false);
return 0;
}
fill_buffer (this, empty);
empty->next = NULL;
SPA_QUEUE_PUSH_TAIL (&this->ready, ATSBuffer, empty);
send_have_output (this);
return 0;
}
static int
audiotestsrc_on_timer (SpaPollNotifyData *data)
{
SpaAudioTestSrc *this = data->user_data;
ATSBuffer *empty;
uint64_t expirations, next_time;
if (read (this->fds[0].fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd");
SPA_QUEUE_POP_HEAD (&this->empty, ATSBuffer, empty);
if (empty == NULL)
return 0;
fill_buffer (this, empty);
next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / 1000000000;
this->timerspec.it_value.tv_nsec = next_time % 1000000000;
timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
empty->next = NULL;
SPA_QUEUE_PUSH_TAIL (&this->ready, ATSBuffer, empty);
send_have_output (this);
return 0;
}
static SpaResult
spa_audiotestsrc_node_port_pull_output (SpaNode *node,
unsigned int n_info,
@ -850,7 +840,7 @@ spa_audiotestsrc_node_port_pull_output (SpaNode *node,
continue;
}
SPA_QUEUE_POP_HEAD (&this->ready, ATSBuffer, b);
SPA_QUEUE_POP_HEAD (&this->ready, ATSBuffer, next, b);
if (b == NULL) {
info[i].status = SPA_RESULT_UNEXPECTED;
have_error = true;
@ -903,6 +893,56 @@ static const SpaNode audiotestsrc_node = {
spa_audiotestsrc_node_port_push_event,
};
static SpaResult
spa_audiotestsrc_clock_get_props (SpaClock *clock,
SpaProps **props)
{
return SPA_RESULT_NOT_IMPLEMENTED;
}
static SpaResult
spa_audiotestsrc_clock_set_props (SpaClock *clock,
const SpaProps *props)
{
return SPA_RESULT_NOT_IMPLEMENTED;
}
static SpaResult
spa_audiotestsrc_clock_get_time (SpaClock *clock,
int32_t *rate,
int64_t *ticks,
int64_t *monotonic_time)
{
struct timespec now;
uint64_t tnow;
if (clock == NULL || clock->handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
if (rate)
*rate = 1000000000;
clock_gettime (CLOCK_MONOTONIC, &now);
tnow = TIMESPEC_TO_TIME (&now);
if (ticks)
*ticks = tnow;
if (monotonic_time)
*monotonic_time = tnow;
return SPA_RESULT_OK;
}
static const SpaClock audiotestsrc_clock = {
NULL,
sizeof (SpaClock),
NULL,
SPA_CLOCK_STATE_STOPPED,
spa_audiotestsrc_clock_get_props,
spa_audiotestsrc_clock_set_props,
spa_audiotestsrc_clock_get_time,
};
static SpaResult
spa_audiotestsrc_get_interface (SpaHandle *handle,
uint32_t interface_id,
@ -919,6 +959,9 @@ spa_audiotestsrc_get_interface (SpaHandle *handle,
case SPA_INTERFACE_ID_NODE:
*interface = &this->node;
break;
case SPA_INTERFACE_ID_CLOCK:
*interface = &this->clock;
break;
default:
return SPA_RESULT_UNKNOWN_INTERFACE;
}
@ -928,6 +971,15 @@ spa_audiotestsrc_get_interface (SpaHandle *handle,
static SpaResult
audiotestsrc_clear (SpaHandle *handle)
{
SpaAudioTestSrc *this;
if (handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = (SpaAudioTestSrc *) handle;
close (this->fds[0].fd);
return SPA_RESULT_OK;
}
@ -947,6 +999,8 @@ audiotestsrc_init (const SpaHandleFactory *factory,
this = (SpaAudioTestSrc *) handle;
this->node = audiotestsrc_node;
this->node.handle = handle;
this->clock = audiotestsrc_clock;
this->clock.handle = handle;
this->props[1].props.n_prop_info = PROP_ID_LAST;
this->props[1].props.prop_info = prop_info;
reset_audiotestsrc_props (&this->props[1]);
@ -954,15 +1008,6 @@ audiotestsrc_init (const SpaHandleFactory *factory,
SPA_QUEUE_INIT (&this->empty);
SPA_QUEUE_INIT (&this->ready);
this->idle.id = 0;
this->idle.enabled = false;
this->idle.fds = NULL;
this->idle.n_fds = 0;
this->idle.idle_cb = audiotestsrc_idle;
this->idle.before_cb = NULL;
this->idle.after_cb = NULL;
this->idle.user_data = this;
this->fds[0].fd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
this->fds[0].events = POLLIN | POLLPRI | POLLERR;
this->fds[0].revents = 0;
@ -973,11 +1018,9 @@ audiotestsrc_init (const SpaHandleFactory *factory,
this->timer.id = 0;
this->timer.enabled = false;
this->timer.fds = this->fds;
this->timer.n_fds = 1;
this->timer.idle_cb = NULL;
this->timer.before_cb = NULL;
this->timer.after_cb = audiotestsrc_on_timer;
this->timer.after_cb = NULL;
this->timer.user_data = this;
this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
@ -998,6 +1041,10 @@ static const SpaInterfaceInfo audiotestsrc_interfaces[] =
SPA_INTERFACE_ID_NODE_NAME,
SPA_INTERFACE_ID_NODE_DESCRIPTION,
},
{ SPA_INTERFACE_ID_CLOCK,
SPA_INTERFACE_ID_CLOCK_NAME,
SPA_INTERFACE_ID_CLOCK_DESCRIPTION,
},
};
static SpaResult