stream: improve timing

Dequeue the input buffers so that we can calculate the available
bytes to read.
This commit is contained in:
Wim Taymans 2020-04-14 16:50:29 +02:00
parent 1d0ce27005
commit 30fac2b242
2 changed files with 111 additions and 88 deletions

View file

@ -432,7 +432,7 @@ static void update_timing_info(pa_stream *s)
struct pw_time pwt;
pa_timing_info *ti = &s->timing_info;
size_t stride = pa_frame_size(&s->sample_spec);
int64_t delay, index;
int64_t delay, pos;
pw_stream_get_time(s->stream, &pwt);
s->timing_info_valid = false;
@ -448,32 +448,35 @@ static void update_timing_info(pa_stream *s)
if (s->ticks_base == (uint64_t)-1)
s->ticks_base = pwt.ticks + pwt.delay;
if (pwt.ticks > s->ticks_base)
index = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
pos = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
else
index = 0;
pos = 0;
delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
s->have_time = true;
} else {
index = delay = 0;
pos = delay = 0;
s->have_time = false;
}
if (s->direction == PA_STREAM_PLAYBACK) {
ti->sink_usec = delay;
ti->configured_sink_usec = delay;
ti->read_index = index;
ti->read_index = pos;
} else {
ti->source_usec = delay;
ti->configured_source_usec = delay;
ti->write_index = index;
ti->write_index = pos;
}
ti->since_underrun = 0;
s->timing_info_valid = true;
pw_log_debug("stream %p: %"PRIu64" rate:%d delay:%"PRIi64 " read:%"PRIu64" write:%"PRIu64,
s, pwt.queued, pwt.rate.denom, delay, ti->read_index, ti->write_index);
pw_log_debug("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64
" write:%"PRIu64" diff:%"PRIi64,
s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, delay,
ti->read_index, ti->write_index,
ti->read_index - ti->write_index);
}
static void push_output(pa_stream *s)
static void queue_output(pa_stream *s)
{
struct pa_mem *m, *t, *old;
struct pw_buffer *buf;
@ -499,6 +502,47 @@ static void push_output(pa_stream *s)
}
}
struct pa_mem *alloc_mem(pa_stream *s, size_t len)
{
struct pa_mem *m;
if (spa_list_is_empty(&s->free)) {
if (len > s->maxblock)
len = s->maxblock;
m = calloc(1, sizeof(struct pa_mem) + len);
if (m == NULL)
return NULL;
m->data = SPA_MEMBER(m, sizeof(struct pa_mem), void);
m->maxsize = len;
} else {
m = spa_list_first(&s->free, struct pa_mem, link);
spa_list_remove(&m->link);
}
return m;
}
static void pull_input(pa_stream *s)
{
struct pw_buffer *buf;
struct pa_mem *m;
while ((buf = pw_stream_dequeue_buffer(s->stream)) != NULL) {
if ((m = alloc_mem(s, 0)) == NULL) {
pw_log_error("stream %p: Can't alloc mem: %m", s);
pw_stream_queue_buffer(s->stream, buf);
continue;
}
m->data = buf->buffer->datas[0].data;
m->maxsize = buf->buffer->datas[0].maxsize;
m->offset = buf->buffer->datas[0].chunk->offset;
m->size = buf->buffer->datas[0].chunk->size;
m->user_data = buf;
buf->user_data = m;
spa_list_append(&s->ready, &m->link);
s->ready_bytes += m->size;
}
}
static void stream_process(void *data)
{
pa_stream *s = data;
@ -507,8 +551,8 @@ static void stream_process(void *data)
update_timing_info(s);
if (s->direction == PA_STREAM_PLAYBACK) {
s->requested_bytes += s->buffer_attr.tlength;
if (s->ready_bytes < s->buffer_attr.tlength)
s->requested_bytes += s->maxblock;
if (s->ready_bytes < s->maxblock)
s->requested_bytes -= s->ready_bytes;
else
s->requested_bytes = 0;
@ -516,10 +560,13 @@ static void stream_process(void *data)
if (s->write_callback && s->requested_bytes > 0)
s->write_callback(s, s->requested_bytes, s->write_userdata);
push_output(s);
queue_output(s);
}
else {
if (s->read_callback)
pull_input(s);
s->requested_bytes = s->ready_bytes;
if (s->read_callback && s->requested_bytes > 0)
s->read_callback(s, s->requested_bytes, s->read_userdata);
}
}
@ -1091,49 +1138,12 @@ int pa_stream_disconnect(pa_stream *s)
return 0;
}
struct pa_mem *get_mem(pa_stream *s, size_t len)
{
struct pa_mem *m;
if (s->mem != NULL)
return s->mem;
if (spa_list_is_empty(&s->free)) {
if (len > s->maxblock)
len = s->maxblock;
m = calloc(1, sizeof(struct pa_mem) + len);
if (m == NULL)
return NULL;
m->data = SPA_MEMBER(m, sizeof(struct pa_mem), void);
m->maxsize = len;
spa_list_append(&s->free, &m->link);
}
m = spa_list_first(&s->free, struct pa_mem, link);
spa_list_remove(&m->link);
m->offset = 0;
m->size = 0;
return m;
}
int release_mem(pa_stream *s)
{
if (s->mem == NULL)
return -EINVAL;
spa_list_append(&s->ready, &s->mem->link);
s->ready_bytes += s->mem->size;
s->mem = NULL;
push_output(s);
return 0;
}
SPA_EXPORT
int pa_stream_begin_write(
pa_stream *s,
void **data,
size_t *nbytes)
{
size_t max;
spa_assert(s);
spa_assert(s->refcount >= 1);
@ -1143,15 +1153,16 @@ int pa_stream_begin_write(
PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
s->mem = get_mem(s, *nbytes);
if (s->mem == NULL)
s->mem = alloc_mem(s, *nbytes);
if (s->mem == NULL) {
*data = NULL;
*nbytes = 0;
return -errno;
}
max = s->mem->maxsize - s->mem->size;
*data = SPA_MEMBER(s->mem->data, s->mem->offset, void);
*nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max;
s->mem->offset = s->mem->size = 0;
*data = s->mem->data;
*nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, s->mem->maxsize) : s->mem->maxsize;
pw_log_trace("peek buffer %p %zd %p", *data, *nbytes, s->mem);
@ -1199,6 +1210,9 @@ int pa_stream_write_ext_free(pa_stream *s,
int64_t offset,
pa_seek_mode_t seek)
{
const void *src = data;
size_t towrite;
spa_assert(s);
spa_assert(s->refcount >= 1);
spa_assert(data);
@ -1220,38 +1234,33 @@ int pa_stream_write_ext_free(pa_stream *s,
pw_log_trace("stream %p: write %zd bytes", s, nbytes);
if (s->mem == NULL) {
void *dst;
const void *src = data;
size_t towrite = nbytes, dsize;
while (towrite > 0) {
dsize = towrite;
towrite = nbytes;
while (towrite > 0) {
size_t dsize = towrite;
if (s->mem == NULL) {
void *dst;
if (pa_stream_begin_write(s, &dst, &dsize) < 0 ||
dst == NULL || dsize == 0) {
pw_log_error("stream %p: out of buffers, wanted %zd bytes", s, nbytes);
break;
}
memcpy(dst, src, dsize);
s->mem->size += dsize;
if (s->mem->size >= s->mem->maxsize || towrite == dsize)
release_mem(s);
towrite -= dsize;
src = SPA_MEMBER(src, dsize, void);
} else {
s->mem->offset = SPA_PTRDIFF(src, s->mem->data);
}
towrite -= dsize;
s->mem->size = dsize;
if (s->mem->size >= s->mem->maxsize || towrite == 0) {
spa_list_append(&s->ready, &s->mem->link);
s->ready_bytes += s->mem->size;
s->mem = NULL;
queue_output(s);
}
if (free_cb)
free_cb(free_cb_data);
}
else {
s->mem->offset = SPA_PTRDIFF(data, s->mem->data);
s->mem->size = nbytes;
release_mem(s);
}
if (free_cb)
free_cb(free_cb_data);
s->timing_info.write_index += nbytes;
s->requested_bytes = s->requested_bytes > nbytes ? s->requested_bytes - nbytes : 0;
pw_log_trace("stream %p: written %zd bytes", s, nbytes);
@ -1272,18 +1281,16 @@ int pa_stream_peek(pa_stream *s,
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
if (s->buffer == NULL)
s->buffer = pw_stream_dequeue_buffer(s->stream);
if (s->buffer == NULL) {
if (spa_list_is_empty(&s->ready)) {
pw_log_error("stream %p: no buffer: %m", s);
*data = NULL;
*nbytes = 0;
return 0;
}
s->mem = spa_list_first(&s->ready, struct pa_mem, link);
*data = SPA_MEMBER(s->buffer->buffer->datas[0].data,
s->buffer->buffer->datas[0].chunk->offset, void);
*nbytes = s->buffer->buffer->datas[0].chunk->size;
*data = SPA_MEMBER(s->mem->data, s->mem->offset, void);
*nbytes = s->mem->size;
pw_log_trace("stream %p: %p %zd", s, *data, *nbytes);
@ -1293,16 +1300,31 @@ int pa_stream_peek(pa_stream *s,
SPA_EXPORT
int pa_stream_drop(pa_stream *s)
{
size_t nbytes;
struct pw_buffer *buf;
spa_assert(s);
spa_assert(s->refcount >= 1);
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->buffer, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->mem, PA_ERR_BADSTATE);
pw_log_trace("stream %p", s);
pw_stream_queue_buffer(s->stream, s->buffer);
s->buffer = NULL;
nbytes = s->mem->size;
pw_log_trace("stream %p %zd", s, nbytes);
spa_list_remove(&s->mem->link);
s->timing_info.read_index += nbytes;
s->ready_bytes -= nbytes;
s->requested_bytes = s->ready_bytes;
buf = s->mem->user_data;
pw_stream_queue_buffer(s->stream, buf);
buf->user_data = NULL;
spa_list_append(&s->free, &s->mem->link);
s->mem = NULL;
return 0;
}
@ -1722,7 +1744,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
if (r_usec)
*r_usec = res;
pw_log_debug("stream %p: now:%"PRIu64" diff:%"PRIi64
pw_log_trace("stream %p: now:%"PRIu64" diff:%"PRIi64
" write-index:%"PRIi64" read_index:%"PRIi64" res:%"PRIu64,
s, now, now - res, i->write_index, i->read_index, res);