pulse: improve timing

Keep track of written bytes and use this to calculate the latency.
This commit is contained in:
Wim Taymans 2020-04-12 20:47:33 +02:00
parent c446dfb1d6
commit b01c264c5c
2 changed files with 42 additions and 37 deletions

View file

@ -357,12 +357,14 @@ struct pa_stream {
char *device_name; char *device_name;
pa_timing_info timing_info; pa_timing_info timing_info;
int64_t ticks_base;
uint32_t direct_on_input; uint32_t direct_on_input;
bool suspended:1; unsigned int suspended:1;
bool corked:1; unsigned int corked:1;
bool timing_info_valid:1; unsigned int timing_info_valid:1;
unsigned int have_time:1;
pa_stream_notify_cb_t state_callback; pa_stream_notify_cb_t state_callback;
void *state_userdata; void *state_userdata;
@ -387,8 +389,6 @@ struct pa_stream {
pa_stream_notify_cb_t buffer_attr_callback; pa_stream_notify_cb_t buffer_attr_callback;
void *buffer_attr_userdata; void *buffer_attr_userdata;
int64_t offset;
struct pw_buffer *dequeued[MAX_BUFFERS]; struct pw_buffer *dequeued[MAX_BUFFERS];
struct spa_ringbuffer dequeued_ring; struct spa_ringbuffer dequeued_ring;
size_t dequeued_size; size_t dequeued_size;
@ -405,7 +405,6 @@ struct pa_stream {
float channel_volumes[SPA_AUDIO_MAX_CHANNELS]; float channel_volumes[SPA_AUDIO_MAX_CHANNELS];
bool mute; bool mute;
pa_operation *drain; pa_operation *drain;
uint64_t queued;
}; };
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st); void pa_stream_set_state(pa_stream *s, pa_stream_state_t st);

View file

@ -444,15 +444,12 @@ static void update_timing_info(pa_stream *s)
struct pw_time pwt; struct pw_time pwt;
pa_timing_info *ti = &s->timing_info; pa_timing_info *ti = &s->timing_info;
size_t stride = pa_frame_size(&s->sample_spec); size_t stride = pa_frame_size(&s->sample_spec);
int64_t delay, queued, ticks; int64_t delay, index;
pw_stream_get_time(s->stream, &pwt); pw_stream_get_time(s->stream, &pwt);
s->timing_info_valid = false; s->timing_info_valid = false;
s->queued = pwt.queued;
pw_log_debug("stream %p: %"PRIu64" rate:%d", s, s->queued, pwt.rate.denom);
if (pwt.rate.denom == 0) pw_log_debug("stream %p: %"PRIu64" rate:%d", s, pwt.queued, pwt.rate.denom);
return;
pa_timeval_store(&ti->timestamp, pwt.now / SPA_NSEC_PER_USEC); pa_timeval_store(&ti->timestamp, pwt.now / SPA_NSEC_PER_USEC);
ti->synchronized_clocks = true; ti->synchronized_clocks = true;
@ -461,21 +458,23 @@ static void update_timing_info(pa_stream *s)
ti->write_index_corrupt = false; ti->write_index_corrupt = false;
ti->read_index_corrupt = false; ti->read_index_corrupt = false;
queued = pwt.queued + (pwt.ticks * s->sample_spec.rate / pwt.rate.denom) * stride; if (pwt.rate.denom > 0) {
ticks = ((pwt.ticks + pwt.delay) * s->sample_spec.rate / pwt.rate.denom) * stride; if (s->ticks_base == -1)
s->ticks_base = pwt.ticks + pwt.delay;
index = ((pwt.ticks + pwt.delay - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
s->have_time = true;
} else {
index = delay = 0;
s->have_time = false;
}
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
ti->sink_usec = -delay; ti->sink_usec = -delay;
ti->write_index = queued; ti->read_index = index;
ti->read_index = ticks; } else {
}
else {
ti->source_usec = delay; ti->source_usec = delay;
ti->read_index = queued; ti->write_index = index;
ti->write_index = ticks;
} }
ti->configured_sink_usec = 0; ti->configured_sink_usec = 0;
ti->configured_source_usec = 0; ti->configured_source_usec = 0;
ti->since_underrun = 0; ti->since_underrun = 0;
@ -486,10 +485,9 @@ static void stream_process(void *data)
{ {
pa_stream *s = data; pa_stream *s = data;
update_timing_info(s);
while (dequeue_buffer(s) == 0); while (dequeue_buffer(s) == 0);
pw_log_trace("stream %p: %"PRIu64, s, s->dequeued_size);
if (s->dequeued_size <= 0) if (s->dequeued_size <= 0)
return; return;
@ -569,6 +567,7 @@ static pa_stream* stream_new(pa_context *c, const char *name,
s->direction = PA_STREAM_NODIRECTION; s->direction = PA_STREAM_NODIRECTION;
s->state = PA_STREAM_UNCONNECTED; s->state = PA_STREAM_UNCONNECTED;
s->flags = 0; s->flags = 0;
s->ticks_base = -1;
if (ss) if (ss)
s->sample_spec = *ss; s->sample_spec = *ss;
@ -1124,16 +1123,17 @@ int pa_stream_begin_write(
PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
pw_log_trace("peek buffer %p %zd %d %d", *data, *nbytes, s->buffer_size, s->buffer_offset);
if ((res = peek_buffer(s)) < 0) { if ((res = peek_buffer(s)) < 0) {
*data = NULL; *data = NULL;
*nbytes = 0; *nbytes = 0;
} } else {
else {
size_t max = s->buffer_size - s->buffer_offset; size_t max = s->buffer_size - s->buffer_offset;
*data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void);
*nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max; *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max;
} }
pw_log_trace("peek buffer %p %zd", *data, *nbytes); pw_log_trace("peek buffer %p %zd %p %d", *data, *nbytes, s->buffer, res);
return 0; return 0;
} }
@ -1198,6 +1198,8 @@ int pa_stream_write_ext_free(pa_stream *s,
const void *src = data; const void *src = data;
size_t towrite = nbytes, dsize; size_t towrite = nbytes, dsize;
pw_log_debug("stream %p: write %zd bytes", s, nbytes);
while (towrite > 0) { while (towrite > 0) {
dsize = towrite; dsize = towrite;
@ -1211,7 +1213,7 @@ int pa_stream_write_ext_free(pa_stream *s,
s->buffer_offset += dsize; s->buffer_offset += dsize;
if (s->buffer_offset >= s->buffer_size) { if (s->buffer_offset >= s->buffer_size || towrite == dsize) {
s->buffer->buffer->datas[0].chunk->offset = 0; s->buffer->buffer->datas[0].chunk->offset = 0;
s->buffer->buffer->datas[0].chunk->size = s->buffer_offset; s->buffer->buffer->datas[0].chunk->size = s->buffer_offset;
queue_buffer(s); queue_buffer(s);
@ -1229,8 +1231,8 @@ int pa_stream_write_ext_free(pa_stream *s,
s->buffer->buffer->datas[0].chunk->size = nbytes; s->buffer->buffer->datas[0].chunk->size = nbytes;
queue_buffer(s); queue_buffer(s);
} }
s->timing_info.write_index += nbytes;
update_timing_info(s); pw_log_debug("stream %p: written %zd bytes", s, nbytes);
return 0; return 0;
} }
@ -1334,7 +1336,6 @@ pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *use
pw_log_debug("stream %p", s); pw_log_debug("stream %p", s);
pw_stream_flush(s->stream, true); pw_stream_flush(s->stream, true);
update_timing_info(s);
o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack));
d = o->userdata; d = o->userdata;
d->cb = cb; d->cb = cb;
@ -1541,6 +1542,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
pw_log_debug("stream %p: cork %d->%d", s, s->corked, b);
s->corked = b; s->corked = b;
if (!b) if (!b)
pw_stream_set_active(s->stream, true); pw_stream_set_active(s->stream, true);
@ -1566,11 +1568,11 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
pw_stream_flush(s->stream, false); pw_stream_flush(s->stream, false);
update_timing_info(s);
o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack));
d = o->userdata; d = o->userdata;
d->cb = cb; d->cb = cb;
d->userdata = userdata; d->userdata = userdata;
s->timing_info.write_index = s->timing_info.read_index;
pa_operation_sync(o); pa_operation_sync(o);
return o; return o;
@ -1663,13 +1665,17 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); update_timing_info(s);
clock_gettime(CLOCK_MONOTONIC, &ts); clock_gettime(CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_USEC(&ts); now = SPA_TIMESPEC_TO_USEC(&ts);
i = &s->timing_info; i = &s->timing_info;
if (s->have_time)
delay = now - SPA_TIMEVAL_TO_USEC(&i->timestamp); delay = now - SPA_TIMEVAL_TO_USEC(&i->timestamp);
else
delay = 0;
read_time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec); read_time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec);
res = delay + read_time; res = delay + read_time;
@ -1677,7 +1683,8 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
if (r_usec) if (r_usec)
*r_usec = res; *r_usec = res;
pw_log_trace("stream %p: %"PRIu64" %"PRIu64" %"PRIu64" %"PRIi64" %"PRIi64" %"PRIi64" %"PRIu64, pw_log_trace("stream %p: now:%"PRIu64" delay:%"PRIu64" read_time:%"PRIu64
" write-index:%"PRIi64" read_index:%"PRIi64" diff:%"PRIi64" res:%"PRIu64,
s, now, delay, read_time, s, now, delay, read_time,
i->write_index, i->read_index, i->write_index, i->read_index,
i->write_index - i->read_index, i->write_index - i->read_index,
@ -1716,7 +1723,6 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative)
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
pa_stream_get_time(s, &t); pa_stream_get_time(s, &t);
@ -1746,7 +1752,7 @@ const pa_timing_info* pa_stream_get_timing_info(pa_stream *s)
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA); update_timing_info(s);
pw_log_trace("stream %p: %"PRIi64" %"PRIi64" %"PRIi64, s, pw_log_trace("stream %p: %"PRIi64" %"PRIi64" %"PRIi64, s,
s->timing_info.write_index, s->timing_info.read_index, s->timing_info.write_index, s->timing_info.read_index,