buffer: don't use ringbuffer in chunk

We can't use a ringbuffer on the chunk because it implies the
consumer would write to it to update the read position, which we
can't do because the chunk is read-only and might even be shared.
Go back to offset/size pairs, which can sortof do the same thing
if we want later when we keep a non-shared read pointer in the
consumer.
Keep alsa timestamp around and filled state for future.
mmap the input port meta/data/chunk as read-only.
Only do clock update requests when asked.
This commit is contained in:
Wim Taymans 2017-11-21 12:30:15 +01:00
parent 2ad722b579
commit 4288a634f4
25 changed files with 165 additions and 126 deletions

View file

@ -316,12 +316,29 @@ static int set_swparams(struct state *state)
return 0;
}
static inline void try_pull(struct state *state, snd_pcm_uframes_t frames, bool do_pull)
static inline void calc_timeout(size_t target, size_t current,
size_t rate, snd_htimestamp_t *now,
struct timespec *ts)
{
ts->tv_sec = now->tv_sec;
ts->tv_nsec = now->tv_nsec;
if (target > current)
ts->tv_nsec += ((target - current) * SPA_NSEC_PER_SEC) / rate;
while (ts->tv_nsec >= SPA_NSEC_PER_SEC) {
ts->tv_sec++;
ts->tv_nsec -= SPA_NSEC_PER_SEC;
}
}
static inline void try_pull(struct state *state, snd_pcm_uframes_t frames,
snd_pcm_uframes_t written, bool do_pull)
{
struct spa_port_io *io = state->io;
if (spa_list_is_empty(&state->ready) && do_pull) {
spa_log_trace(state->log, "alsa-util %p: %d", state, io->status);
spa_log_trace(state->log, "alsa-util %p: %d %lu", state, io->status,
state->filled + written);
io->status = SPA_STATUS_NEED_BUFFER;
io->range.offset = state->sample_count * state->frame_size;
io->range.min_size = state->threshold * state->frame_size;
@ -340,47 +357,53 @@ pull_frames(struct state *state,
snd_pcm_uframes_t total_frames = 0, to_write = SPA_MIN(frames, state->props.max_latency);
bool underrun = false;
try_pull(state, frames, do_pull);
try_pull(state, frames, 0, do_pull);
while (!spa_list_is_empty(&state->ready) && to_write > 0) {
uint8_t *dst;
uint8_t *dst, *src;
size_t n_bytes, n_frames;
struct buffer *b;
struct spa_data *d;
struct spa_ringbuffer *ringbuffer;
uint32_t index;
int32_t avail;
uint32_t index, offs, avail, l0, l1;
b = spa_list_first(&state->ready, struct buffer, link);
d = b->outbuf->datas;
dst = SPA_MEMBER(my_areas[0].addr, offset * state->frame_size, uint8_t);
src = d[0].data;
ringbuffer = &d[0].chunk->area;
avail = spa_ringbuffer_get_read_index(ringbuffer, &index);
index = d[0].chunk->offset + state->ready_offset;
avail = d[0].chunk->size - state->ready_offset;
avail /= state->frame_size;
n_frames = SPA_MIN(avail, to_write);
n_bytes = n_frames * state->frame_size;
spa_ringbuffer_read_data(ringbuffer, d[0].data, d[0].maxsize,
index % d[0].maxsize, dst, n_bytes);
spa_ringbuffer_read_update(ringbuffer, index + n_bytes);
offs = index % d[0].maxsize;
l0 = SPA_MIN(n_bytes, d[0].maxsize - offs);
l1 = n_bytes - l0;
if (avail == n_frames || state->n_buffers == 1) {
memcpy(dst, src + offs, l0);
if (l1 > 0)
memcpy(dst + l0, src, l1);
state->ready_offset += n_bytes;
if (state->ready_offset >= d[0].chunk->size) {
spa_list_remove(&b->link);
b->outstanding = true;
spa_log_trace(state->log, "alsa-util %p: reuse buffer %u", state, b->outbuf->id);
state->callbacks->reuse_buffer(state->callbacks_data, 0, b->outbuf->id);
state->ready_offset = 0;
}
total_frames += n_frames;
to_write -= n_frames;
spa_log_trace(state->log, "alsa-util %p: %u written %lu frames, left %ld", state, index, total_frames, to_write);
spa_log_trace(state->log, "alsa-util %p: written %lu frames, left %ld",
state, total_frames, to_write);
}
try_pull(state, frames, do_pull);
try_pull(state, frames, total_frames, do_pull);
if (total_frames == 0 && do_pull) {
total_frames = SPA_MIN(frames, state->threshold);
@ -414,8 +437,7 @@ push_frames(struct state *state,
size_t n_bytes;
struct buffer *b;
struct spa_data *d;
uint32_t index, avail;
int32_t filled;
uint32_t index, offs, avail, l0, l1;
b = spa_list_first(&state->free, struct buffer, link);
spa_list_remove(&b->link);
@ -430,15 +452,21 @@ push_frames(struct state *state,
src = SPA_MEMBER(my_areas[0].addr, offset * state->frame_size, uint8_t);
filled = spa_ringbuffer_get_write_index(&d[0].chunk->area, &index);
avail = (d[0].maxsize - filled) / state->frame_size;
avail = d[0].maxsize / state->frame_size;
index = 0;
total_frames = SPA_MIN(avail, frames);
n_bytes = total_frames * state->frame_size;
spa_ringbuffer_write_data(&d[0].chunk->area, d[0].data, d[0].maxsize,
index % d[0].maxsize, src, n_bytes);
offs = index % d[0].maxsize;
l0 = SPA_MIN(n_bytes, d[0].maxsize - offs);
l1 = n_bytes - l0;
spa_ringbuffer_write_update(&d[0].chunk->area, index + n_bytes);
memcpy(src, d[0].data + offs, l0);
if (l1 > 0)
memcpy(src + l0, d[0].data, l1);
d[0].chunk->offset = index;
d[0].chunk->size = n_bytes;
d[0].chunk->stride = state->frame_size;
b->outstanding = true;
@ -464,21 +492,6 @@ static int alsa_try_resume(struct state *state)
return res;
}
static inline void calc_timeout(size_t target, size_t current,
size_t rate, snd_htimestamp_t *now,
struct timespec *ts)
{
ts->tv_sec = now->tv_sec;
ts->tv_nsec = now->tv_nsec;
if (target > current)
ts->tv_nsec += ((target - current) * SPA_NSEC_PER_SEC) / rate;
while (ts->tv_nsec >= SPA_NSEC_PER_SEC) {
ts->tv_sec++;
ts->tv_nsec -= SPA_NSEC_PER_SEC;
}
}
static void alsa_on_playback_timeout_event(struct spa_source *source)
{
uint64_t exp;
@ -487,10 +500,9 @@ static void alsa_on_playback_timeout_event(struct spa_source *source)
snd_pcm_t *hndl = state->hndl;
snd_pcm_sframes_t avail;
struct itimerspec ts;
snd_pcm_uframes_t total_written = 0, filled;
snd_pcm_uframes_t total_written = 0;
const snd_pcm_channel_area_t *my_areas;
snd_pcm_status_t *status;
snd_htimestamp_t htstamp;
if (state->started && read(state->timerfd, &exp, sizeof(uint64_t)) != sizeof(uint64_t))
spa_log_warn(state->log, "error reading timerfd: %s", strerror(errno));
@ -503,27 +515,27 @@ static void alsa_on_playback_timeout_event(struct spa_source *source)
}
avail = snd_pcm_status_get_avail(status);
snd_pcm_status_get_htstamp(status, &htstamp);
snd_pcm_status_get_htstamp(status, &state->now);
if (avail > state->buffer_frames)
avail = state->buffer_frames;
filled = state->buffer_frames - avail;
state->filled = state->buffer_frames - avail;
state->last_ticks = state->sample_count - filled;
state->last_monotonic = (int64_t) htstamp.tv_sec * SPA_NSEC_PER_SEC + (int64_t) htstamp.tv_nsec;
state->last_ticks = state->sample_count - state->filled;
state->last_monotonic = (int64_t) state->now.tv_sec * SPA_NSEC_PER_SEC + (int64_t) state->now.tv_nsec;
spa_log_trace(state->log, "timeout %ld %d %ld %ld %ld", filled, state->threshold,
state->sample_count, htstamp.tv_sec, htstamp.tv_nsec);
spa_log_trace(state->log, "timeout %ld %d %ld %ld %ld", state->filled, state->threshold,
state->sample_count, state->now.tv_sec, state->now.tv_nsec);
if (filled > state->threshold) {
if (state->filled > state->threshold) {
if (snd_pcm_state(hndl) == SND_PCM_STATE_SUSPENDED) {
spa_log_error(state->log, "suspended: try resume");
if ((res = alsa_try_resume(state)) < 0)
return;
}
} else {
snd_pcm_uframes_t to_write = state->buffer_frames - filled;
snd_pcm_uframes_t to_write = avail;
bool do_pull = true;
while (total_written < to_write) {
@ -547,9 +559,10 @@ static void alsa_on_playback_timeout_event(struct spa_source *source)
return;
}
total_written += written;
state->sample_count += written;
state->filled += written;
do_pull = false;
}
state->sample_count += total_written;
}
if (!state->alsa_started && total_written > 0) {
spa_log_debug(state->log, "snd_pcm_start");
@ -560,7 +573,7 @@ static void alsa_on_playback_timeout_event(struct spa_source *source)
state->alsa_started = true;
}
calc_timeout(total_written + filled, state->threshold, state->rate, &htstamp, &ts.it_value);
calc_timeout(state->filled, state->threshold, state->rate, &state->now, &ts.it_value);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;

View file

@ -32,7 +32,6 @@ extern "C" {
#include <spa/support/loop.h>
#include <spa/support/log.h>
#include <spa/utils/list.h>
#include <spa/utils/ringbuffer.h>
#include <spa/clock/clock.h>
#include <spa/node/node.h>
@ -150,13 +149,17 @@ struct state {
struct spa_list free;
struct spa_list ready;
size_t ready_offset;
bool started;
struct spa_source source;
int timerfd;
bool alsa_started;
int threshold;
snd_htimestamp_t now;
int64_t sample_count;
int64_t filled;
int64_t last_ticks;
int64_t last_monotonic;

View file

@ -688,20 +688,22 @@ add_port_data(struct impl *this, void *out, size_t outsize, size_t next, struct
{
size_t insize;
struct buffer *b;
uint32_t index = 0, offset, len1, len2, maxsize;
uint32_t index, offset, len1, len2, maxsize;
mix_func_t mix = layer == 0 ? this->copy : this->add;
struct spa_data *d;
void *data;
struct spa_ringbuffer *rb;
b = spa_list_first(&port->queue, struct buffer, link);
maxsize = b->outbuf->datas[0].maxsize;
data = b->outbuf->datas[0].data;
rb = &b->outbuf->datas[0].chunk->area,
d = b->outbuf->datas;
insize = spa_ringbuffer_get_read_index(rb, &index);
maxsize = d[0].maxsize;
data = d[0].data;
insize = SPA_MIN(d[0].chunk->size, maxsize);
outsize = SPA_MIN(outsize, insize);
index = d[0].chunk->offset + (insize - port->queued_bytes);
offset = index % maxsize;
len1 = SPA_MIN(outsize, maxsize - offset);
@ -709,17 +711,14 @@ add_port_data(struct impl *this, void *out, size_t outsize, size_t next, struct
if ((len2 = outsize - len1) > 0)
mix(out + len1, data, len2);
spa_ringbuffer_read_update(rb, index + outsize);
port->queued_bytes -= outsize;
if (outsize == insize || next == 0) {
if (port->queued_bytes == 0) {
spa_log_trace(this->log, NAME " %p: return buffer %d on port %p %zd",
this, b->outbuf->id, port, outsize);
port->io->buffer_id = b->outbuf->id;
spa_list_remove(&b->link);
b->outstanding = true;
port->queued_bytes = 0;
} else {
spa_log_trace(this->log, NAME " %p: keeping buffer %d on port %p %zd %zd",
this, b->outbuf->id, port, port->queued_bytes, outsize);
@ -733,9 +732,7 @@ static int mix_output(struct impl *this, size_t n_bytes)
struct port *outport;
struct spa_port_io *outio;
struct spa_data *od;
int32_t filled, avail, maxsize;
uint32_t index = 0, len1, len2, offset;
struct spa_ringbuffer *rb;
uint32_t avail, index, maxsize, len1, len2, offset;
outport = GET_OUT_PORT(this, 0);
outio = outport->io;
@ -752,10 +749,8 @@ static int mix_output(struct impl *this, size_t n_bytes)
od = outbuf->outbuf->datas;
maxsize = od[0].maxsize;
rb = &od[0].chunk->area;
filled = spa_ringbuffer_get_write_index(rb, &index);
avail = maxsize - filled;
avail = maxsize;
index = 0;
n_bytes = SPA_MIN(n_bytes, avail);
offset = index % maxsize;
@ -782,7 +777,9 @@ static int mix_output(struct impl *this, size_t n_bytes)
layer++;
}
spa_ringbuffer_write_update(rb, index + n_bytes);
od[0].chunk->offset = index;
od[0].chunk->size = n_bytes;
od[0].chunk->stride = 0;
outio->buffer_id = outbuf->outbuf->id;
outio->status = SPA_STATUS_HAVE_BUFFER;
@ -819,8 +816,7 @@ static int impl_node_process_input(struct spa_node *node)
if (inport->queued_bytes == 0 &&
inio->status == SPA_STATUS_HAVE_BUFFER && inio->buffer_id < inport->n_buffers) {
struct buffer *b = &inport->buffers[inio->buffer_id];
uint32_t index;
struct spa_ringbuffer *rb;
struct spa_data *d = b->outbuf->datas;
if (!b->outstanding) {
spa_log_warn(this->log, NAME " %p: buffer %u in use", this,
@ -835,8 +831,7 @@ static int impl_node_process_input(struct spa_node *node)
spa_list_append(&inport->queue, &b->link);
rb = &b->outbuf->datas[0].chunk->area;
inport->queued_bytes += spa_ringbuffer_get_read_index(rb, &index);
inport->queued_bytes = SPA_MIN(d[0].chunk->size, d[0].maxsize);
spa_log_trace(this->log, NAME " %p: queue buffer %d on port %d %zd %zd",
this, b->outbuf->id, i, inport->queued_bytes, min_queued);

View file

@ -299,7 +299,6 @@ static int make_buffer(struct impl *this)
int n_bytes, n_samples;
uint32_t maxsize;
void *data;
struct spa_ringbuffer *rb;
struct spa_data *d;
int32_t filled, avail;
uint32_t index, offset, l0, l1;
@ -329,9 +328,8 @@ static int make_buffer(struct impl *this)
spa_log_trace(this->log, NAME " %p: dequeue buffer %d %d %d", this, b->outbuf->id,
maxsize, n_bytes);
rb = &d[0].chunk->area;
filled = spa_ringbuffer_get_write_index(rb, &index);
filled = 0;
index = 0;
avail = maxsize - filled;
n_bytes = SPA_MIN(avail, n_bytes);
@ -346,7 +344,9 @@ static int make_buffer(struct impl *this)
if (l1 > 0)
this->render_func(this, data, l1);
spa_ringbuffer_write_update(rb, index + n_bytes);
d[0].chunk->offset = index;
d[0].chunk->size = n_bytes;
d[0].chunk->stride = this->bpf;
if (b->h) {
b->h->seq = this->sample_count;

View file

@ -267,7 +267,8 @@ static int consume_buffer(struct impl *this)
render_buffer(this, b);
spa_ringbuffer_set_avail(&b->outbuf->datas[0].chunk->area, n_bytes);
b->outbuf->datas[0].chunk->offset = 0;
b->outbuf->datas[0].chunk->size = n_bytes;
b->outbuf->datas[0].chunk->stride = n_bytes;
if (b->h) {

View file

@ -279,7 +279,8 @@ static int make_buffer(struct impl *this)
fill_buffer(this, b);
spa_ringbuffer_set_avail(&b->outbuf->datas[0].chunk->area, n_bytes);
b->outbuf->datas[0].chunk->offset = 0;
b->outbuf->datas[0].chunk->size = n_bytes;
b->outbuf->datas[0].chunk->stride = n_bytes;
if (b->h) {

View file

@ -930,7 +930,8 @@ static int mmap_read(struct impl *this)
}
d = b->outbuf->datas;
spa_ringbuffer_set_avail(&d[0].chunk->area, buf.bytesused);
d[0].chunk->offset = 0;
d[0].chunk->size = buf.bytesused;
d[0].chunk->stride = port->fmt.fmt.pix.bytesperline;
b->outstanding = true;
@ -1090,7 +1091,8 @@ mmap_init(struct impl *this,
d = buffers[i]->datas;
d[0].mapoffset = 0;
d[0].maxsize = b->v4l2_buffer.length;
spa_ringbuffer_set_avail(&d[0].chunk->area, 0);
d[0].chunk->offset = 0;
d[0].chunk->size = 0;
d[0].chunk->stride = state->fmt.fmt.pix.bytesperline;
if (state->export_buf) {

View file

@ -294,7 +294,8 @@ static int make_buffer(struct impl *this)
fill_buffer(this, b);
spa_ringbuffer_set_avail(&b->outbuf->datas[0].chunk->area, n_bytes);
b->outbuf->datas[0].chunk->offset = 0;
b->outbuf->datas[0].chunk->size = n_bytes;
b->outbuf->datas[0].chunk->stride = this->stride;
if (b->h) {

View file

@ -690,7 +690,7 @@ static void do_volume(struct impl *this, struct spa_buffer *dbuf, struct spa_buf
struct spa_data *sd, *dd;
int16_t *src, *dst;
double volume;
uint32_t towrite, savail, davail;
uint32_t written, towrite, savail, davail;
uint32_t sindex, dindex;
volume = this->props.volume;
@ -698,13 +698,16 @@ static void do_volume(struct impl *this, struct spa_buffer *dbuf, struct spa_buf
sd = sbuf->datas;
dd = dbuf->datas;
savail = spa_ringbuffer_get_read_index(&sd[0].chunk->area, &sindex);
davail = spa_ringbuffer_get_write_index(&dd[0].chunk->area, &dindex);
savail = SPA_MIN(sd[0].chunk->size, sd[0].maxsize);
sindex = sd[0].chunk->offset;
davail = 0;
dindex = 0;
davail = dd[0].maxsize - davail;
towrite = SPA_MIN(savail, davail);
written = 0;
while (towrite > 0) {
while (written < towrite) {
uint32_t soffset = sindex % sd[0].maxsize;
uint32_t doffset = dindex % dd[0].maxsize;
@ -720,10 +723,11 @@ static void do_volume(struct impl *this, struct spa_buffer *dbuf, struct spa_buf
sindex += n_bytes;
dindex += n_bytes;
towrite -= n_bytes;
written += n_bytes;
}
spa_ringbuffer_read_update(&sd[0].chunk->area, sindex);
spa_ringbuffer_write_update(&dd[0].chunk->area, dindex);
dd[0].chunk->offset = 0;
dd[0].chunk->size = written;
dd[0].chunk->stride = 0;
}
static int impl_node_process_input(struct spa_node *node)