From 6eb6e31bebcbb327256a96342e405c654afda2f6 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 19 Jul 2018 13:35:46 +0200 Subject: [PATCH] stream: improve timing Use the new stream time information to get more accurate read and write pointers. --- src/stream.c | 214 +++++++++++++++++++++++++++++---------------------- 1 file changed, 124 insertions(+), 90 deletions(-) diff --git a/src/stream.c b/src/stream.c index 0692f8ca5..25d0f79a7 100644 --- a/src/stream.c +++ b/src/stream.c @@ -32,16 +32,7 @@ #define MIN_QUEUED 1 -struct pending_data { - struct spa_list link; - - const void *data; - size_t nbytes; - size_t offset; - - pa_free_cb_t free_cb; - void *free_cb_data; -}; +#define MAX_SIZE (4*1024*1024) static const uint32_t audio_formats[] = { [PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8), @@ -76,7 +67,7 @@ static inline pa_sample_format_t format_id2pa(pa_stream *s, uint32_t id) return PA_SAMPLE_INVALID; } -static int dequeue_buffer(pa_stream *s) +static inline int dequeue_buffer(pa_stream *s) { struct pw_buffer *buf; uint32_t index; @@ -340,13 +331,46 @@ static void stream_remove_buffer(void *data, struct pw_buffer *buffer) s->maxsize -= buffer->buffer->datas[0].maxsize; } +static void update_timing_info(pa_stream *s) +{ + struct pw_time pwt; + pa_timing_info *ti = &s->timing_info; + size_t stride = pa_frame_size(&s->sample_spec); + + pw_stream_get_time(s->stream, &pwt); + s->timing_info_valid = false; + + if (pwt.rate.num == 0) + return; + + pa_timeval_store(&ti->timestamp, pwt.now / SPA_NSEC_PER_USEC); + ti->synchronized_clocks = true; + if (s->direction == PA_STREAM_PLAYBACK) + ti->sink_usec = (-pwt.delay * SPA_USEC_PER_SEC / pwt.rate.num); + else + ti->source_usec = (pwt.delay * SPA_USEC_PER_SEC / pwt.rate.num); + ti->transport_usec = 0; + ti->playing = 1; + ti->write_index_corrupt = false; + ti->write_index = pwt.queued + (pwt.ticks * s->sample_spec.rate / pwt.rate.num) * stride; + ti->read_index_corrupt = false; + ti->read_index = ((pwt.ticks + pwt.delay) * s->sample_spec.rate / pwt.rate.num) * stride; + + ti->configured_sink_usec = 0; + ti->configured_source_usec = 0; + ti->since_underrun = 0; + s->timing_info_valid = true; +} + static void stream_process(void *data) { pa_stream *s = data; - s->timing_info_valid = true; + update_timing_info(s); - if (dequeue_buffer(s) < 0 && s->dequeued_size <= 0) + while (dequeue_buffer(s) == 0); + + if (s->dequeued_size <= 0) return; if (s->direction == PA_STREAM_PLAYBACK) { @@ -826,6 +850,9 @@ int queue_buffer(pa_stream *s) s->dequeued_size -= s->buffer->buffer->datas[0].chunk->size; spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1); + s->buffer->size = s->buffer->buffer->datas[0].chunk->size; + pw_log_debug("%d %"PRIu64, s->buffer->buffer->id, s->buffer->size); + pw_stream_queue_buffer(s->stream, s->buffer); s->buffer = NULL; return 0; @@ -848,13 +875,15 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); if ((res = peek_buffer(s)) < 0) { - pw_log_warn("stream %p: no buffer", s); *data = NULL; *nbytes = 0; - return 0; } - *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); - *nbytes = s->buffer_size - s->buffer_offset; + else { + size_t max = s->buffer_size - s->buffer_offset; + *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); + *nbytes = *nbytes != -1 ? SPA_MIN(*nbytes, max) : max; + } + pw_log_debug("peek buffer %p %zd", *data, *nbytes); return 0; } @@ -868,46 +897,12 @@ int pa_stream_cancel_write(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + pw_log_debug("cancel %p %p %d", s->buffer, s->buffer_data, s->buffer_size); s->buffer = NULL; return 0; } -static void flush_pending(pa_stream *s) -{ - struct pending_data *p; - void *data; - size_t nbytes; - bool flush; - - while(!spa_list_is_empty(&s->pending)) { - p = spa_list_first(&s->pending, struct pending_data, link); - - pa_stream_begin_write(s, &data, &nbytes); - if (data == NULL || nbytes == 0) - break; - - nbytes = SPA_MIN(nbytes, p->nbytes - p->offset); - memcpy(data, p->data + p->offset, nbytes); - - p->offset += nbytes; - s->buffer_offset += nbytes; - - flush = p->offset >= p->nbytes; - - if (flush) { - spa_list_remove(&p->link); - if (p->free_cb) - p->free_cb(p->free_cb_data); - pa_xfree(p); - } - if (flush || s->buffer_offset >= s->buffer_size) { - s->buffer->buffer->datas[0].chunk->size = s->buffer_offset; - queue_buffer(s); - } - } -} - int pa_stream_write(pa_stream *s, const void *data, size_t nbytes, @@ -946,17 +941,30 @@ int pa_stream_write_ext_free(pa_stream *s, PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID); if (s->buffer == NULL) { - struct pending_data *p; + void *dst; + const void *src = data; + size_t towrite = nbytes, dsize; - p = pa_xmalloc(sizeof(struct pending_data)); - p->data = data; - p->nbytes = nbytes; - p->offset = 0; - p->free_cb = free_cb; - p->free_cb_data = free_cb_data; - spa_list_append(&s->pending, &p->link); + while (towrite > 0) { + dsize = towrite; - flush_pending(s); + if (pa_stream_begin_write(s, &dst, &dsize) < 0 || + dst == NULL || dsize == 0) { + pw_log_debug("out of buffers"); + break; + } + + memcpy(dst, src, dsize); + + s->buffer->buffer->datas[0].chunk->offset = 0; + s->buffer->buffer->datas[0].chunk->size = dsize; + queue_buffer(s); + + towrite -= dsize; + src += dsize; + } + if (free_cb) + free_cb(free_cb_data); } else { s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data; @@ -964,17 +972,8 @@ int pa_stream_write_ext_free(pa_stream *s, queue_buffer(s); } - /* Update the write index in the already available latency data */ - if (s->timing_info_valid) { - if (seek == PA_SEEK_ABSOLUTE) { - s->timing_info.write_index_corrupt = false; - s->timing_info.write_index = offset + (int64_t) nbytes; - } else if (seek == PA_SEEK_RELATIVE) { - if (!s->timing_info.write_index_corrupt) - s->timing_info.write_index += offset + (int64_t) nbytes; - } else - s->timing_info.write_index_corrupt = true; - } + update_timing_info(s); + return 0; } @@ -1081,8 +1080,9 @@ static void on_timing_success(pa_operation *o, void *userdata) { struct success_ack *d = userdata; pa_stream *s = o->stream; + + update_timing_info(s); pa_operation_done(o); - s->timing_info_valid = true; if (d->cb) d->cb(s, s->timing_info_valid, d->userdata); @@ -1352,10 +1352,10 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { - struct pw_time t; pa_usec_t res; struct timespec ts; - uint64_t now, delay; + uint64_t now, delay, read_time; + pa_timing_info *i; spa_assert(s); spa_assert(s->refcount >= 1); @@ -1364,28 +1364,49 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); - pw_stream_get_time(s->stream, &t); - clock_gettime(CLOCK_MONOTONIC, &ts); now = SPA_TIMESPEC_TO_TIME(&ts); - delay = (now - t.now) / PA_NSEC_PER_USEC; - if (t.rate.num != 0) - res = delay + ((t.ticks * t.rate.denom * PA_USEC_PER_SEC) / t.rate.num); - else - res = 0; + i = &s->timing_info; + delay = (now - SPA_TIMEVAL_TO_TIME(&i->timestamp)) / SPA_NSEC_PER_USEC; + read_time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec); + + res = delay + read_time; if (r_usec) *r_usec = res; - pw_log_debug("stream %p: %ld %ld %ld %ld %d/%d %ld", - s, now, t.now, delay, t.ticks, t.rate.num, t.rate.denom, res); + pw_log_debug("stream %p: %ld %ld %ld %ld %ld %ld %ld", s, now, delay, read_time, + i->write_index, i->read_index, + i->write_index - i->read_index, + res); return 0; } +static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) { + pa_assert(s); + pa_assert(s->refcount >= 1); + + if (negative) + *negative = 0; + + if (a >= b) + return a-b; + else { + if (negative && s->direction == PA_STREAM_RECORD) { + *negative = 1; + return b-a; + } else + return 0; + } +} + int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) { + pa_usec_t t, c; + int64_t cindex; + spa_assert(s); spa_assert(s->refcount >= 1); spa_assert(r_usec); @@ -1394,11 +1415,22 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); - pw_log_warn("Not Implemented"); - if (r_usec) - *r_usec = 0; - if (negative) - *negative = 0; + pa_stream_get_time(s, &t); + + if (s->direction == PA_STREAM_PLAYBACK) + cindex = s->timing_info.write_index; + else + cindex = s->timing_info.read_index; + + if (cindex < 0) + cindex = 0; + + c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec); + + if (s->direction == PA_STREAM_PLAYBACK) + *r_usec = time_counter_diff(s, c, t, negative); + else + *r_usec = time_counter_diff(s, t, c, negative); return 0; } @@ -1412,7 +1444,9 @@ const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA); - pw_log_warn("Not Implemented"); + pw_log_debug("stream %p: %ld %ld %ld", s, + s->timing_info.write_index, s->timing_info.read_index, + (s->timing_info.write_index - s->timing_info.read_index)); return &s->timing_info; }