stream: improve timing

Use the new stream time information to get more accurate read and
write pointers.
This commit is contained in:
Wim Taymans 2018-07-19 13:35:46 +02:00
parent ffde111099
commit 6eb6e31beb

View file

@ -32,16 +32,7 @@
#define MIN_QUEUED 1
struct pending_data {
struct spa_list link;
const void *data;
size_t nbytes;
size_t offset;
pa_free_cb_t free_cb;
void *free_cb_data;
};
#define MAX_SIZE (4*1024*1024)
static const uint32_t audio_formats[] = {
[PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8),
@ -76,7 +67,7 @@ static inline pa_sample_format_t format_id2pa(pa_stream *s, uint32_t id)
return PA_SAMPLE_INVALID;
}
static int dequeue_buffer(pa_stream *s)
static inline int dequeue_buffer(pa_stream *s)
{
struct pw_buffer *buf;
uint32_t index;
@ -340,13 +331,46 @@ static void stream_remove_buffer(void *data, struct pw_buffer *buffer)
s->maxsize -= buffer->buffer->datas[0].maxsize;
}
static void update_timing_info(pa_stream *s)
{
struct pw_time pwt;
pa_timing_info *ti = &s->timing_info;
size_t stride = pa_frame_size(&s->sample_spec);
pw_stream_get_time(s->stream, &pwt);
s->timing_info_valid = false;
if (pwt.rate.num == 0)
return;
pa_timeval_store(&ti->timestamp, pwt.now / SPA_NSEC_PER_USEC);
ti->synchronized_clocks = true;
if (s->direction == PA_STREAM_PLAYBACK)
ti->sink_usec = (-pwt.delay * SPA_USEC_PER_SEC / pwt.rate.num);
else
ti->source_usec = (pwt.delay * SPA_USEC_PER_SEC / pwt.rate.num);
ti->transport_usec = 0;
ti->playing = 1;
ti->write_index_corrupt = false;
ti->write_index = pwt.queued + (pwt.ticks * s->sample_spec.rate / pwt.rate.num) * stride;
ti->read_index_corrupt = false;
ti->read_index = ((pwt.ticks + pwt.delay) * s->sample_spec.rate / pwt.rate.num) * stride;
ti->configured_sink_usec = 0;
ti->configured_source_usec = 0;
ti->since_underrun = 0;
s->timing_info_valid = true;
}
static void stream_process(void *data)
{
pa_stream *s = data;
s->timing_info_valid = true;
update_timing_info(s);
if (dequeue_buffer(s) < 0 && s->dequeued_size <= 0)
while (dequeue_buffer(s) == 0);
if (s->dequeued_size <= 0)
return;
if (s->direction == PA_STREAM_PLAYBACK) {
@ -826,6 +850,9 @@ int queue_buffer(pa_stream *s)
s->dequeued_size -= s->buffer->buffer->datas[0].chunk->size;
spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1);
s->buffer->size = s->buffer->buffer->datas[0].chunk->size;
pw_log_debug("%d %"PRIu64, s->buffer->buffer->id, s->buffer->size);
pw_stream_queue_buffer(s->stream, s->buffer);
s->buffer = NULL;
return 0;
@ -848,13 +875,15 @@ int pa_stream_begin_write(
PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
if ((res = peek_buffer(s)) < 0) {
pw_log_warn("stream %p: no buffer", s);
*data = NULL;
*nbytes = 0;
return 0;
}
else {
size_t max = s->buffer_size - s->buffer_offset;
*data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void);
*nbytes = s->buffer_size - s->buffer_offset;
*nbytes = *nbytes != -1 ? SPA_MIN(*nbytes, max) : max;
}
pw_log_debug("peek buffer %p %zd", *data, *nbytes);
return 0;
}
@ -868,46 +897,12 @@ int pa_stream_cancel_write(pa_stream *s)
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK ||
s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
pw_log_debug("cancel %p %p %d", s->buffer, s->buffer_data, s->buffer_size);
s->buffer = NULL;
return 0;
}
static void flush_pending(pa_stream *s)
{
struct pending_data *p;
void *data;
size_t nbytes;
bool flush;
while(!spa_list_is_empty(&s->pending)) {
p = spa_list_first(&s->pending, struct pending_data, link);
pa_stream_begin_write(s, &data, &nbytes);
if (data == NULL || nbytes == 0)
break;
nbytes = SPA_MIN(nbytes, p->nbytes - p->offset);
memcpy(data, p->data + p->offset, nbytes);
p->offset += nbytes;
s->buffer_offset += nbytes;
flush = p->offset >= p->nbytes;
if (flush) {
spa_list_remove(&p->link);
if (p->free_cb)
p->free_cb(p->free_cb_data);
pa_xfree(p);
}
if (flush || s->buffer_offset >= s->buffer_size) {
s->buffer->buffer->datas[0].chunk->size = s->buffer_offset;
queue_buffer(s);
}
}
}
int pa_stream_write(pa_stream *s,
const void *data,
size_t nbytes,
@ -946,17 +941,30 @@ int pa_stream_write_ext_free(pa_stream *s,
PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID);
if (s->buffer == NULL) {
struct pending_data *p;
void *dst;
const void *src = data;
size_t towrite = nbytes, dsize;
p = pa_xmalloc(sizeof(struct pending_data));
p->data = data;
p->nbytes = nbytes;
p->offset = 0;
p->free_cb = free_cb;
p->free_cb_data = free_cb_data;
spa_list_append(&s->pending, &p->link);
while (towrite > 0) {
dsize = towrite;
flush_pending(s);
if (pa_stream_begin_write(s, &dst, &dsize) < 0 ||
dst == NULL || dsize == 0) {
pw_log_debug("out of buffers");
break;
}
memcpy(dst, src, dsize);
s->buffer->buffer->datas[0].chunk->offset = 0;
s->buffer->buffer->datas[0].chunk->size = dsize;
queue_buffer(s);
towrite -= dsize;
src += dsize;
}
if (free_cb)
free_cb(free_cb_data);
}
else {
s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data;
@ -964,17 +972,8 @@ int pa_stream_write_ext_free(pa_stream *s,
queue_buffer(s);
}
/* Update the write index in the already available latency data */
if (s->timing_info_valid) {
if (seek == PA_SEEK_ABSOLUTE) {
s->timing_info.write_index_corrupt = false;
s->timing_info.write_index = offset + (int64_t) nbytes;
} else if (seek == PA_SEEK_RELATIVE) {
if (!s->timing_info.write_index_corrupt)
s->timing_info.write_index += offset + (int64_t) nbytes;
} else
s->timing_info.write_index_corrupt = true;
}
update_timing_info(s);
return 0;
}
@ -1081,8 +1080,9 @@ static void on_timing_success(pa_operation *o, void *userdata)
{
struct success_ack *d = userdata;
pa_stream *s = o->stream;
update_timing_info(s);
pa_operation_done(o);
s->timing_info_valid = true;
if (d->cb)
d->cb(s, s->timing_info_valid, d->userdata);
@ -1352,10 +1352,10 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
{
struct pw_time t;
pa_usec_t res;
struct timespec ts;
uint64_t now, delay;
uint64_t now, delay, read_time;
pa_timing_info *i;
spa_assert(s);
spa_assert(s->refcount >= 1);
@ -1364,28 +1364,49 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
pw_stream_get_time(s->stream, &t);
clock_gettime(CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_TIME(&ts);
delay = (now - t.now) / PA_NSEC_PER_USEC;
if (t.rate.num != 0)
res = delay + ((t.ticks * t.rate.denom * PA_USEC_PER_SEC) / t.rate.num);
else
res = 0;
i = &s->timing_info;
delay = (now - SPA_TIMEVAL_TO_TIME(&i->timestamp)) / SPA_NSEC_PER_USEC;
read_time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec);
res = delay + read_time;
if (r_usec)
*r_usec = res;
pw_log_debug("stream %p: %ld %ld %ld %ld %d/%d %ld",
s, now, t.now, delay, t.ticks, t.rate.num, t.rate.denom, res);
pw_log_debug("stream %p: %ld %ld %ld %ld %ld %ld %ld", s, now, delay, read_time,
i->write_index, i->read_index,
i->write_index - i->read_index,
res);
return 0;
}
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
pa_assert(s);
pa_assert(s->refcount >= 1);
if (negative)
*negative = 0;
if (a >= b)
return a-b;
else {
if (negative && s->direction == PA_STREAM_RECORD) {
*negative = 1;
return b-a;
} else
return 0;
}
}
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative)
{
pa_usec_t t, c;
int64_t cindex;
spa_assert(s);
spa_assert(s->refcount >= 1);
spa_assert(r_usec);
@ -1394,11 +1415,22 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative)
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
pw_log_warn("Not Implemented");
if (r_usec)
*r_usec = 0;
if (negative)
*negative = 0;
pa_stream_get_time(s, &t);
if (s->direction == PA_STREAM_PLAYBACK)
cindex = s->timing_info.write_index;
else
cindex = s->timing_info.read_index;
if (cindex < 0)
cindex = 0;
c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
if (s->direction == PA_STREAM_PLAYBACK)
*r_usec = time_counter_diff(s, c, t, negative);
else
*r_usec = time_counter_diff(s, t, c, negative);
return 0;
}
@ -1412,7 +1444,9 @@ const pa_timing_info* pa_stream_get_timing_info(pa_stream *s)
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
pw_log_warn("Not Implemented");
pw_log_debug("stream %p: %ld %ld %ld", s,
s->timing_info.write_index, s->timing_info.read_index,
(s->timing_info.write_index - s->timing_info.read_index));
return &s->timing_info;
}