diff --git a/spa/include/spa/port.h b/spa/include/spa/port.h index a9ec20160..78d02f362 100644 --- a/spa/include/spa/port.h +++ b/spa/include/spa/port.h @@ -81,6 +81,8 @@ typedef struct { * @SPA_PORT_INFO_FLAG_IN_PLACE: the port can process data in-place and will need * a writable input buffer * @SPA_PORT_INFO_FLAG_NO_REF: the port does not keep a ref on the buffer + * @SPA_PORT_INFO_FLAG_LIVE: output buffers from this port are timestamped against + * a live clock. */ typedef enum { SPA_PORT_INFO_FLAG_NONE = 0, @@ -90,6 +92,7 @@ typedef enum { SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS = 1 << 3, SPA_PORT_INFO_FLAG_IN_PLACE = 1 << 4, SPA_PORT_INFO_FLAG_NO_REF = 1 << 5, + SPA_PORT_INFO_FLAG_LIVE = 1 << 6, } SpaPortInfoFlags; /** diff --git a/spa/include/spa/queue.h b/spa/include/spa/queue.h new file mode 100644 index 000000000..14aadbe0a --- /dev/null +++ b/spa/include/spa/queue.h @@ -0,0 +1,66 @@ +/* Simple Plugin API + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_QUEUE_H__ +#define __SPA_QUEUE_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _SpaQueue SpaQueue; + +#include + +struct _SpaQueue { + void *head, *tail; + unsigned int length; +}; + +#define SPA_QUEUE_INIT(q) \ + do { \ + (q)->head = (q)->tail = NULL; \ + (q)->length = 0; \ + } while (0); + +#define SPA_QUEUE_PUSH_TAIL(q,t,i) \ + do { \ + if ((q)->tail) \ + ((t*)(q)->tail)->next = (i); \ + (q)->tail = (i); \ + if ((q)->head == NULL) \ + (q)->head = (i); \ + (q)->length++; \ + } while (0); + +#define SPA_QUEUE_POP_HEAD(q,t,i) \ + do { \ + if (((i) = (t*)((q)->head)) == NULL) \ + break; \ + (q)->head = (i)->next; \ + if ((q)->head == NULL) \ + (q)->tail = NULL; \ + (q)->length--; \ + } while (0); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_QUEUE_H__ */ diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 50ce7838f..7d3901939 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -315,7 +315,7 @@ spa_alsa_sink_node_port_set_format (SpaNode *node, SpaALSASink *this; SpaResult res; - if (node == NULL || node->handle == NULL || format == NULL) + if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; this = (SpaALSASink *) node->handle; @@ -325,6 +325,7 @@ spa_alsa_sink_node_port_set_format (SpaNode *node, if (format == NULL) { this->have_format = false; + this->have_buffers = false; update_state (this, SPA_NODE_STATE_CONFIGURE); return SPA_RESULT_OK; } @@ -501,6 +502,7 @@ spa_alsa_sink_node_port_alloc_buffers (SpaNode *node, buffers[i] = &b->buffer; } *n_buffers = n_bufs; + this->have_buffers = true; update_state (this, SPA_NODE_STATE_PAUSED); @@ -563,13 +565,12 @@ spa_alsa_sink_node_port_push_input (SpaNode *node, continue; } - if (this->ready_head != NULL) { + if (this->ready.length != 0) { info[i].status = SPA_RESULT_HAVE_ENOUGH_INPUT; have_enough = true; continue; } - - this->ready_head = &this->alloc_buffers[info[i].buffer_id]; + SPA_QUEUE_PUSH_TAIL (&this->ready, SpaALSABuffer, &this->alloc_buffers[info[i].buffer_id]); } info[i].status = SPA_RESULT_OK; } diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index 43974a7c7..69a712898 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -301,13 +301,10 @@ mmap_read (SpaALSAState *state) snd_pcm_status_get_htstamp (status, &htstamp); now = (int64_t)htstamp.tv_sec * 1000000000ll + (int64_t)htstamp.tv_nsec; - b = state->free_head; - if (b == NULL) + SPA_QUEUE_POP_HEAD (&state->free, SpaALSABuffer, b); + if (b == NULL) { fprintf (stderr, "no more buffers\n"); - else { - state->free_head = b->next; - if (state->free_head == NULL) - state->free_tail = NULL; + } else { b->next = NULL; dest = b->ptr; } @@ -349,12 +346,7 @@ mmap_read (SpaALSAState *state) d = SPA_BUFFER_DATAS (b->outbuf); d[0].mem.size = avail * state->frame_size; - if (state->ready_tail) - state->ready_tail->next = b; - state->ready_tail = b; - if (state->ready_head == NULL) - state->ready_head = b; - state->ready_count++; + SPA_QUEUE_PUSH_TAIL (&state->ready, SpaALSABuffer, b); event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; event.size = sizeof (ho); diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 23ced7678..071d485fd 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -18,10 +18,14 @@ */ #include +#include #include #include +#include +#include #include +#include #include typedef struct _SpaAudioTestSrc SpaAudioTestSrc; @@ -31,6 +35,7 @@ typedef struct { uint32_t wave; double freq; double volume; + bool live; } SpaAudioTestSrcProps; typedef struct _ATSBuffer ATSBuffer; @@ -43,6 +48,7 @@ struct _ATSBuffer { SpaBuffer *outbuf; bool outstanding; ATSBuffer *next; + SpaMetaHeader *h; void *ptr; size_t size; }; @@ -57,6 +63,9 @@ struct _SpaAudioTestSrc { void *user_data; SpaPollItem idle; + SpaPollItem timer; + SpaPollFd fds[1]; + struct itimerspec timerspec; SpaPortInfo info; SpaAllocParam *params[1]; @@ -66,22 +75,29 @@ struct _SpaAudioTestSrc { bool have_format; SpaFormatAudio query_format; SpaFormatAudio current_format; + size_t bpf; bool have_buffers; SpaMemory *alloc_mem; ATSBuffer *alloc_buffers; unsigned int n_buffers; - ATSBuffer *empty; - ATSBuffer *ready; - unsigned int ready_count; + bool started; + uint64_t start_time; + uint64_t elapsed_time; + + uint64_t sample_count; + SpaQueue empty; + SpaQueue ready; }; -static const uint32_t default_wave = 1.0; -static const double default_volume = 1.0; +#define DEFAULT_WAVE 0 +#define DEFAULT_VOLUME 1.0 +#define DEFAULT_FREQ 440.0 +#define DEFAULT_LIVE true + static const double min_volume = 0.0; static const double max_volume = 10.0; -static const double default_freq = 440.0; static const double min_freq = 0.0; static const double max_freq = 50000000.0; @@ -107,6 +123,7 @@ enum { PROP_ID_WAVE, PROP_ID_FREQ, PROP_ID_VOLUME, + PROP_ID_LIVE, PROP_ID_LAST, }; @@ -130,14 +147,23 @@ static const SpaPropInfo prop_info[] = SPA_PROP_TYPE_DOUBLE, sizeof (double), SPA_PROP_RANGE_TYPE_MIN_MAX, 2, volume_range, NULL }, + { PROP_ID_LIVE, offsetof (SpaAudioTestSrcProps, live), + "live", "Timestamp against the clock", + SPA_PROP_FLAG_READWRITE, + SPA_PROP_TYPE_BOOL, sizeof (bool), + SPA_PROP_RANGE_TYPE_NONE, 0, NULL, + NULL }, }; -static void +static SpaResult reset_audiotestsrc_props (SpaAudioTestSrcProps *props) { - props->wave = default_wave; - props->freq = default_freq; - props->volume = default_volume; + props->wave = DEFAULT_WAVE; + props->freq = DEFAULT_FREQ; + props->volume = DEFAULT_VOLUME; + props->live = DEFAULT_LIVE; + + return SPA_RESULT_OK; } static SpaResult @@ -172,10 +198,15 @@ spa_audiotestsrc_node_set_props (SpaNode *node, p = &this->props[1]; if (props == NULL) { - reset_audiotestsrc_props (p); - return SPA_RESULT_OK; + res = reset_audiotestsrc_props (p); + } else { + res = spa_props_copy (props, &p->props); } - res = spa_props_copy (props, &p->props); + + if (this->props[1].live) + this->info.flags |= SPA_PORT_INFO_FLAG_LIVE; + else + this->info.flags &= ~SPA_PORT_INFO_FLAG_LIVE; return res; } @@ -218,15 +249,31 @@ update_state (SpaAudioTestSrc *this, SpaNodeState state) } static SpaResult -update_idle_enabled (SpaAudioTestSrc *this, bool enabled) +update_poll_enabled (SpaAudioTestSrc *this, bool enabled) { SpaNodeEvent event; if (this->event_cb) { - this->idle.enabled = enabled; event.type = SPA_NODE_EVENT_TYPE_UPDATE_POLL; - event.data = &this->idle; - event.size = sizeof (this->idle); + if (this->props[1].live) { + this->timer.enabled = enabled; + if (enabled) { + uint64_t next_time = this->start_time + this->elapsed_time; + this->timerspec.it_value.tv_sec = next_time / 1000000000; + this->timerspec.it_value.tv_nsec = next_time % 1000000000; + } + else { + this->timerspec.it_value.tv_sec = 0; + this->timerspec.it_value.tv_nsec = 0; + } + timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); + event.data = &this->timer; + event.size = sizeof (this->timer); + } else { + this->idle.enabled = enabled; + event.data = &this->idle; + event.size = sizeof (this->idle); + } this->event_cb (&this->node, &event, this->user_data); } return SPA_RESULT_OK; @@ -249,14 +296,23 @@ spa_audiotestsrc_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_START: { + struct timespec now; + if (!this->have_format) return SPA_RESULT_NO_FORMAT; if (!this->have_buffers) return SPA_RESULT_NO_BUFFERS; - update_idle_enabled (this, true); + if (this->started) + return SPA_RESULT_OK; + clock_gettime (CLOCK_MONOTONIC, &now); + this->start_time = now.tv_sec * 1000000000ll + now.tv_nsec; + this->elapsed_time = 0; + + this->started = true; + update_poll_enabled (this, true); update_state (this, SPA_NODE_STATE_STREAMING); break; } @@ -268,8 +324,11 @@ spa_audiotestsrc_node_send_command (SpaNode *node, if (!this->have_buffers) return SPA_RESULT_NO_BUFFERS; - update_idle_enabled (this, false); + if (!this->started) + return SPA_RESULT_OK; + this->started = false; + update_poll_enabled (this, false); update_state (this, SPA_NODE_STATE_PAUSED); break; } @@ -297,7 +356,10 @@ spa_audiotestsrc_node_set_event_callback (SpaNode *node, if (event_cb == NULL && this->event_cb) { event.type = SPA_NODE_EVENT_TYPE_REMOVE_POLL; - event.data = &this->idle; + if (this->props[1].live) + event.data = &this->timer; + else + event.data = &this->idle; event.size = sizeof (this->idle); this->event_cb (&this->node, &event, this->user_data); } @@ -307,7 +369,10 @@ spa_audiotestsrc_node_set_event_callback (SpaNode *node, if (this->event_cb) { event.type = SPA_NODE_EVENT_TYPE_ADD_POLL; - event.data = &this->idle; + if (this->props[1].live) + event.data = &this->timer; + else + event.data = &this->idle; event.size = sizeof (this->idle); this->event_cb (&this->node, &event, this->user_data); } @@ -412,6 +477,7 @@ clear_buffers (SpaAudioTestSrc *this) if (this->alloc_mem) spa_memory_unref (&this->alloc_mem->mem); this->alloc_mem = NULL; + this->alloc_buffers = NULL; this->n_buffers = 0; this->have_buffers = false; } @@ -446,10 +512,11 @@ spa_audiotestsrc_node_port_set_format (SpaNode *node, } if (this->have_format) { - this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; this->info.maxbuffering = -1; this->info.latency = -1; + this->bpf = (2 * this->current_format.info.raw.channels); + this->info.n_params = 1; this->info.params = this->params; this->params[0] = &this->param_buffers.param; @@ -563,11 +630,6 @@ spa_audiotestsrc_node_port_use_buffers (SpaNode *node, SpaData *d = SPA_BUFFER_DATAS (buffers[i]); b = &this->alloc_buffers[i]; - if (i + 1 == n_buffers) - b->next = NULL; - else - b->next = &this->alloc_buffers[i + 1]; - b->buffer.mem.mem = this->alloc_mem->mem; b->buffer.mem.offset = sizeof (ATSBuffer) * i; b->buffer.mem.size = sizeof (ATSBuffer); @@ -582,10 +644,12 @@ spa_audiotestsrc_node_port_use_buffers (SpaNode *node, } b->ptr = SPA_MEMBER (spa_memory_ensure_ptr (mem), d[0].mem.offset, void); b->size = d[0].mem.size; + + b->next = NULL; + SPA_QUEUE_PUSH_TAIL (&this->empty, ATSBuffer, b); } this->n_buffers = n_buffers; this->have_buffers = true; - this->empty = this->alloc_buffers; } if (this->have_buffers) { @@ -651,11 +715,11 @@ spa_audiotestsrc_node_port_reuse_buffer (SpaNode *node, return SPA_RESULT_OK; b->outstanding = false; - b->next = this->empty; - this->empty = b; + b->next = NULL; + SPA_QUEUE_PUSH_TAIL (&this->empty, ATSBuffer, b); - if (b->next == NULL) - update_idle_enabled (this, true); + if (this->empty.length == 1) + update_poll_enabled (this, true); return SPA_RESULT_OK; } @@ -702,6 +766,9 @@ fill_buffer (SpaAudioTestSrc *this, ATSBuffer *b) p[i] = rand(); } + this->sample_count += b->size / this->bpf; + this->elapsed_time = this->sample_count * 1000000000ll / this->current_format.info.raw.rate; + return SPA_RESULT_OK; } @@ -711,18 +778,44 @@ audiotestsrc_idle (SpaPollNotifyData *data) SpaAudioTestSrc *this = data->user_data; ATSBuffer *empty; - empty = this->empty; + SPA_QUEUE_POP_HEAD (&this->empty, ATSBuffer, empty); if (!empty) { - update_idle_enabled (this, false); + update_poll_enabled (this, false); return 0; } fill_buffer (this, empty); - this->empty = empty->next; - empty->next = this->ready; - this->ready = empty; - this->ready_count++; + empty->next = NULL; + SPA_QUEUE_PUSH_TAIL (&this->ready, ATSBuffer, empty); + send_have_output (this); + + return 0; +} + +static int +audiotestsrc_on_timer (SpaPollNotifyData *data) +{ + SpaAudioTestSrc *this = data->user_data; + ATSBuffer *empty; + uint64_t expirations, next_time; + + if (read (this->fds[0].fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t)) + perror ("read timerfd"); + + SPA_QUEUE_POP_HEAD (&this->empty, ATSBuffer, empty); + if (empty == NULL) + return 0; + + fill_buffer (this, empty); + + next_time = this->start_time + this->elapsed_time; + this->timerspec.it_value.tv_sec = next_time / 1000000000; + this->timerspec.it_value.tv_nsec = next_time % 1000000000; + timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); + + empty->next = NULL; + SPA_QUEUE_PUSH_TAIL (&this->ready, ATSBuffer, empty); send_have_output (this); return 0; @@ -756,15 +849,13 @@ spa_audiotestsrc_node_port_pull_output (SpaNode *node, have_error = true; continue; } - if (this->ready_count == 0) { + + SPA_QUEUE_POP_HEAD (&this->ready, ATSBuffer, b); + if (b == NULL) { info[i].status = SPA_RESULT_UNEXPECTED; have_error = true; continue; } - - b = this->ready; - this->ready = b->next; - this->ready_count--; b->outstanding = true; info[i].buffer_id = b->outbuf->id; @@ -860,6 +951,9 @@ audiotestsrc_init (const SpaHandleFactory *factory, this->props[1].props.prop_info = prop_info; reset_audiotestsrc_props (&this->props[1]); + SPA_QUEUE_INIT (&this->empty); + SPA_QUEUE_INIT (&this->ready); + this->idle.id = 0; this->idle.enabled = false; this->idle.fds = NULL; @@ -869,8 +963,28 @@ audiotestsrc_init (const SpaHandleFactory *factory, this->idle.after_cb = NULL; this->idle.user_data = this; + this->fds[0].fd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + this->fds[0].events = POLLIN | POLLPRI | POLLERR; + this->fds[0].revents = 0; + this->timerspec.it_value.tv_sec = 0; + this->timerspec.it_value.tv_nsec = 0; + this->timerspec.it_interval.tv_sec = 0; + this->timerspec.it_interval.tv_nsec = 0; + + this->timer.id = 0; + this->timer.enabled = false; + this->timer.fds = this->fds; + this->timer.n_fds = 1; + this->timer.idle_cb = NULL; + this->timer.before_cb = NULL; + this->timer.after_cb = audiotestsrc_on_timer; + this->timer.user_data = this; + this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | SPA_PORT_INFO_FLAG_NO_REF; + if (this->props[1].live) + this->info.flags |= SPA_PORT_INFO_FLAG_LIVE; + this->status.flags = SPA_PORT_STATUS_FLAG_NONE; this->node.state = SPA_NODE_STATE_CONFIGURE;