alsa: work on ringbuffer data transport

Add ringbuffer test
This commit is contained in:
Wim Taymans 2017-04-20 19:25:14 +02:00
parent 0b508db9fc
commit f0aafb5b51
9 changed files with 589 additions and 113 deletions

View file

@ -266,7 +266,6 @@ spa_alsa_clear_buffers (SpaALSASink *this)
if (this->n_buffers > 0) {
spa_list_init (&this->ready);
this->n_buffers = 0;
this->ringbuffer = NULL;
}
return SPA_RESULT_OK;
}
@ -450,8 +449,6 @@ spa_alsa_sink_node_port_use_buffers (SpaNode *node,
b->h = spa_buffer_find_meta (b->outbuf, SPA_META_TYPE_HEADER);
b->rb = spa_buffer_find_meta (b->outbuf, SPA_META_TYPE_RINGBUFFER);
if (b->rb)
this->ringbuffer = b;
switch (buffers[i]->datas[0].type) {
case SPA_DATA_TYPE_MEMFD:
@ -583,13 +580,10 @@ spa_alsa_sink_node_process_input (SpaNode *node)
input->status = SPA_RESULT_INVALID_BUFFER_ID;
return SPA_RESULT_ERROR;
}
if (this->ringbuffer) {
this->ringbuffer->outstanding = true;
this->ringbuffer = b;
} else {
spa_list_insert (this->ready.prev, &b->link);
spa_log_trace (this->log, "alsa-sink %p: queue buffer %u", this, input->buffer_id);
}
spa_log_trace (this->log, "alsa-sink %p: queue buffer %u", this, input->buffer_id);
spa_list_insert (this->ready.prev, &b->link);
b->outstanding = false;
input->buffer_id = SPA_ID_INVALID;
input->status = SPA_RESULT_OK;

View file

@ -325,11 +325,11 @@ set_swparams (SpaALSAState *state)
return 0;
}
static snd_pcm_uframes_t
pull_frames_queue (SpaALSAState *state,
const snd_pcm_channel_area_t *my_areas,
snd_pcm_uframes_t offset,
snd_pcm_uframes_t frames)
static inline snd_pcm_uframes_t
pull_frames (SpaALSAState *state,
const snd_pcm_channel_area_t *my_areas,
snd_pcm_uframes_t offset,
snd_pcm_uframes_t frames)
{
snd_pcm_uframes_t total_frames = 0, to_write = frames;
SpaPortIO *io = state->io;
@ -348,22 +348,46 @@ pull_frames_queue (SpaALSAState *state,
size_t n_bytes, n_frames, size;
off_t offs;
SpaALSABuffer *b;
bool reuse = false;
SpaData *d;
b = spa_list_first (&state->ready, SpaALSABuffer, link);
d = b->outbuf->datas;
offs = SPA_MIN (b->outbuf->datas[0].chunk->offset, b->outbuf->datas[0].maxsize);
src = SPA_MEMBER (b->outbuf->datas[0].data, offs, uint8_t);
size = SPA_MIN (b->outbuf->datas[0].chunk->size, b->outbuf->datas[0].maxsize - offs);
src = SPA_MEMBER (src, state->ready_offset, uint8_t);
dst = SPA_MEMBER (my_areas[0].addr, offset * state->frame_size, uint8_t);
n_bytes = SPA_MIN (size - state->ready_offset, to_write * state->frame_size);
n_frames = SPA_MIN (to_write, n_bytes / state->frame_size);
memcpy (dst, src, n_bytes);
if (b->rb) {
SpaRingbuffer *ringbuffer = &b->rb->ringbuffer;
uint32_t index;
int32_t avail;
state->ready_offset += n_bytes;
if (state->ready_offset >= size) {
avail = spa_ringbuffer_get_read_index (ringbuffer, &index);
n_bytes = SPA_MIN (avail, to_write * state->frame_size);
n_frames = SPA_MIN (to_write, n_bytes / state->frame_size);
spa_ringbuffer_read_data (ringbuffer,
d[0].data,
index & ringbuffer->mask,
dst,
n_bytes);
spa_ringbuffer_read_advance (ringbuffer, n_bytes);
reuse = avail == n_bytes;
} else {
offs = SPA_MIN (d[0].chunk->offset + state->ready_offset, d[0].maxsize);
size = SPA_MIN (d[0].chunk->size, d[0].maxsize - offs);
src = SPA_MEMBER (d[0].data, offs, uint8_t);
n_bytes = SPA_MIN (size, to_write * state->frame_size);
n_frames = SPA_MIN (to_write, n_bytes / state->frame_size);
memcpy (dst, src, n_bytes);
state->ready_offset += n_bytes;
reuse = (state->ready_offset >= size);
}
if (reuse) {
SpaEventNodeReuseBuffer rb = SPA_EVENT_NODE_REUSE_BUFFER_INIT (state->type.event_node.ReuseBuffer,
0, b->outbuf->id);
@ -378,7 +402,7 @@ pull_frames_queue (SpaALSAState *state,
to_write -= n_frames;
}
if (total_frames == 0) {
total_frames = state->threshold;
total_frames = SPA_MIN (frames, state->threshold);
spa_log_warn (state->log, "underrun, want %zd frames", total_frames);
snd_pcm_areas_silence (my_areas, offset, state->channels, total_frames, state->format);
}
@ -386,58 +410,13 @@ pull_frames_queue (SpaALSAState *state,
}
static snd_pcm_uframes_t
pull_frames_ringbuffer (SpaALSAState *state,
const snd_pcm_channel_area_t *my_areas,
snd_pcm_uframes_t offset,
snd_pcm_uframes_t frames)
{
int32_t avail;
uint32_t index;
size_t size;
SpaALSABuffer *b;
uint8_t *src, *dst;
b = state->ringbuffer;
src = b->outbuf->datas[0].data;
dst = SPA_MEMBER (my_areas[0].addr, offset * state->frame_size, uint8_t);
avail = spa_ringbuffer_get_read_index (&b->rb->ringbuffer, &index);
size = SPA_MIN (avail, frames * state->frame_size);
spa_log_trace (state->log, "%u %d %zd %zd", index, avail, offset, size);
if (size > 0) {
spa_ringbuffer_read_data (&b->rb->ringbuffer,
src,
index & b->rb->ringbuffer.mask,
dst,
size);
spa_ringbuffer_read_advance (&b->rb->ringbuffer, size);
frames = size / state->frame_size;
} else {
spa_log_warn (state->log, "underrun");
snd_pcm_areas_silence (my_areas, offset, state->channels, frames, state->format);
}
b->outstanding = true;
{
SpaEventNodeReuseBuffer rb = SPA_EVENT_NODE_REUSE_BUFFER_INIT (state->type.event_node.ReuseBuffer,
0, b->outbuf->id);
state->event_cb (&state->node, (SpaEvent*)&rb, state->user_data);
}
return frames;
}
static snd_pcm_uframes_t
push_frames_queue (SpaALSAState *state,
const snd_pcm_channel_area_t *my_areas,
snd_pcm_uframes_t offset,
snd_pcm_uframes_t frames)
push_frames (SpaALSAState *state,
const snd_pcm_channel_area_t *my_areas,
snd_pcm_uframes_t offset,
snd_pcm_uframes_t frames)
{
snd_pcm_uframes_t total_frames = 0;
SpaPortIO *io = state->io;
if (spa_list_is_empty (&state->free)) {
spa_log_warn (state->log, "no more buffers");
@ -447,7 +426,6 @@ push_frames_queue (SpaALSAState *state,
size_t n_bytes;
SpaALSABuffer *b;
SpaData *d;
SpaPortIO *io;
b = spa_list_first (&state->free, SpaALSABuffer, link);
spa_list_remove (&b->link);
@ -470,7 +448,7 @@ push_frames_queue (SpaALSAState *state,
d[0].chunk->size = n_bytes;
d[0].chunk->stride = 0;
if ((io = state->io)) {
{
SpaEvent event = SPA_EVENT_INIT (state->type.event_node.HaveOutput);
b->outstanding = true;
@ -483,15 +461,6 @@ push_frames_queue (SpaALSAState *state,
return total_frames;
}
static snd_pcm_uframes_t
push_frames_ringbuffer (SpaALSAState *state,
const snd_pcm_channel_area_t *my_areas,
snd_pcm_uframes_t offset,
snd_pcm_uframes_t frames)
{
return frames;
}
static int
alsa_try_resume (SpaALSAState *state)
{
@ -518,9 +487,9 @@ calc_timeout (size_t target,
ts->tv_sec = now->tv_sec;
ts->tv_nsec = now->tv_nsec;
if (target > current)
ts->tv_nsec += (target - current) * SPA_NSEC_PER_SEC / rate;
ts->tv_nsec += ((target - current) * SPA_NSEC_PER_SEC) / rate;
while (ts->tv_nsec > SPA_NSEC_PER_SEC) {
while (ts->tv_nsec >= SPA_NSEC_PER_SEC) {
ts->tv_sec++;
ts->tv_nsec -= SPA_NSEC_PER_SEC;
}
@ -579,10 +548,7 @@ alsa_on_playback_timeout_event (SpaSource *source)
return;
}
if (state->ringbuffer)
written = pull_frames_ringbuffer (state, my_areas, offset, frames);
else
written = pull_frames_queue (state, my_areas, offset, frames);
written = pull_frames (state, my_areas, offset, frames);
if (written < frames)
to_write = 0;
@ -607,8 +573,9 @@ alsa_on_playback_timeout_event (SpaSource *source)
calc_timeout (total_written + filled, state->threshold, state->rate, &htstamp, &ts.it_value);
spa_log_trace (state->log, "timeout %ld %ld %ld %ld", total_written, filled,
ts.it_value.tv_sec, ts.it_value.tv_nsec);
spa_log_trace (state->log, "timeout %ld %ld %ld %ld %ld", total_written, filled,
ts.it_value.tv_sec, ts.it_value.tv_nsec,
ts.it_value.tv_nsec - htstamp.tv_nsec);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
@ -663,12 +630,8 @@ alsa_on_capture_timeout_event (SpaSource *source)
return;
}
if (state->ringbuffer)
read = push_frames_ringbuffer (state, my_areas, offset, frames);
else
read = push_frames_queue (state, my_areas, offset, frames);
if (read < to_read)
read = push_frames (state, my_areas, offset, frames);
if (read < frames)
to_read = 0;
if ((res = snd_pcm_mmap_commit (hndl, offset, read)) < 0) {

View file

@ -144,8 +144,6 @@ struct _SpaALSAState {
SpaALSABuffer buffers[MAX_BUFFERS];
unsigned int n_buffers;
bool use_ringbuffer;
SpaALSABuffer *ringbuffer;
SpaList free;
SpaList ready;