From bf95887a021c446502d34817c7c97a6c1641f5bf Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 15 Apr 2020 16:19:42 +0200 Subject: [PATCH] pulse: improve timing Make writable size more accurate by using the clock. mplayer uses this to check if the clock is advancing. Remove requested_bytes, we can use ready_bytes for the ready bytes and use the queued bytes for playback. Reset after a flush, wait for a new timing update. --- pipewire-pulseaudio/src/internal.h | 1 - pipewire-pulseaudio/src/stream.c | 73 ++++++++++++++++++------------ 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h index 7df4b7846..4c9bf343a 100644 --- a/pipewire-pulseaudio/src/internal.h +++ b/pipewire-pulseaudio/src/internal.h @@ -400,7 +400,6 @@ struct pa_stream { size_t maxsize; size_t maxblock; - size_t requested_bytes; struct pa_mem *mem; /* current mem for playback */ struct spa_list free; /* free to fill */ diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index 0387de131..eef256510 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -445,7 +445,7 @@ static void update_timing_info(pa_stream *s) ti->read_index_corrupt = false; if (pwt.rate.denom > 0) { - if (s->ticks_base == (uint64_t)-1) + if (!s->have_time) s->ticks_base = pwt.ticks + pwt.delay; if (pwt.ticks > s->ticks_base) pos = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; @@ -470,10 +470,9 @@ static void update_timing_info(pa_stream *s) s->timing_info_valid = true; pw_log_debug("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64 - " write:%"PRIu64" diff:%"PRIi64, + " write:%"PRIu64" queued:%"PRIi64, s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, delay, - ti->read_index, ti->write_index, - ti->read_index - ti->write_index); + ti->read_index, ti->write_index, ti->read_index - ti->write_index); } static void queue_output(pa_stream *s) @@ -551,23 +550,21 @@ static void stream_process(void *data) update_timing_info(s); if (s->direction == PA_STREAM_PLAYBACK) { - s->requested_bytes += s->maxblock; - if (s->ready_bytes < s->maxblock) - s->requested_bytes -= s->ready_bytes; - else - s->requested_bytes = 0; - - if (s->write_callback && s->requested_bytes > 0) - s->write_callback(s, s->requested_bytes, s->write_userdata); + int64_t queued, requested; queue_output(s); + + queued = s->timing_info.write_index - s->timing_info.read_index; + requested = s->maxsize - SPA_MIN(queued, (int64_t)s->maxsize); + + if (s->write_callback && requested > 0) + s->write_callback(s, requested, s->write_userdata); } else { pull_input(s); - s->requested_bytes = s->ready_bytes; - if (s->read_callback && s->requested_bytes > 0) - s->read_callback(s, s->requested_bytes, s->read_userdata); + if (s->read_callback && s->ready_bytes > 0) + s->read_callback(s, s->ready_bytes, s->read_userdata); } } @@ -638,7 +635,7 @@ static pa_stream* stream_new(pa_context *c, const char *name, s->direction = PA_STREAM_NODIRECTION; s->state = PA_STREAM_UNCONNECTED; s->flags = 0; - s->ticks_base = -1; + s->have_time = false; if (ss) s->sample_spec = *ss; @@ -681,7 +678,6 @@ static pa_stream* stream_new(pa_context *c, const char *name, s->buffer_attr.prebuf = (uint32_t) -1; s->buffer_attr.fragsize = (uint32_t) -1; s->maxblock = INT_MAX; - s->requested_bytes = 0; s->device_index = PA_INVALID_INDEX; s->device_name = NULL; @@ -1164,7 +1160,7 @@ int pa_stream_begin_write( *data = s->mem->data; *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, s->mem->maxsize) : s->mem->maxsize; - pw_log_trace("peek buffer %p %zd %p", *data, *nbytes, s->mem); + pw_log_trace("buffer %p %zd %p", *data, *nbytes, s->mem); return 0; } @@ -1262,8 +1258,7 @@ int pa_stream_write_ext_free(pa_stream *s, free_cb(free_cb_data); s->timing_info.write_index += nbytes; - s->requested_bytes = s->requested_bytes > nbytes ? s->requested_bytes - nbytes : 0; - pw_log_trace("stream %p: written %zd bytes", s, nbytes); + pw_log_debug("stream %p: written %zd bytes", s, nbytes); return 0; } @@ -1314,10 +1309,9 @@ int pa_stream_drop(pa_stream *s) pw_log_trace("stream %p %zd", s, nbytes); spa_list_remove(&s->mem->link); + s->ready_bytes -= nbytes; s->timing_info.read_index += nbytes; - s->ready_bytes -= nbytes; - s->requested_bytes = s->ready_bytes; buf = s->mem->user_data; pw_stream_queue_buffer(s->stream, buf); @@ -1332,6 +1326,10 @@ int pa_stream_drop(pa_stream *s) SPA_EXPORT size_t pa_stream_writable_size(PA_CONST pa_stream *s) { + const pa_timing_info *i; + uint64_t now, queued, writable, elapsed, min; + struct timespec ts; + spa_assert(s); spa_assert(s->refcount >= 1); @@ -1340,8 +1338,25 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s) PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - pw_log_trace("stream %p: %zd", s, s->requested_bytes); - return s->requested_bytes; + if (!s->have_time) + return 0; + + i = &s->timing_info; + + min = SPA_MIN(i->read_index, i->write_index); + if (s->direction == PA_STREAM_PLAYBACK) + queued = i->write_index - min; + else + queued = i->read_index - min; + + clock_gettime(CLOCK_MONOTONIC, &ts); + now = SPA_TIMESPEC_TO_USEC(&ts); + elapsed = pa_usec_to_bytes(now - SPA_TIMEVAL_TO_USEC(&i->timestamp), &s->sample_spec); + queued -= SPA_MIN(queued, elapsed); + + writable = s->maxblock - SPA_MIN(queued, s->maxblock); + pw_log_debug("stream %p: %lu", s, writable); + return writable; } SPA_EXPORT @@ -1355,8 +1370,8 @@ size_t pa_stream_readable_size(PA_CONST pa_stream *s) PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - pw_log_trace("stream %p: %zd", s, s->requested_bytes); - return s->requested_bytes; + pw_log_trace("stream %p: %zd", s, s->ready_bytes); + return s->ready_bytes; } struct success_ack { @@ -1619,6 +1634,7 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + pw_log_debug("stream %p:", s); pw_stream_flush(s->stream, false); o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); d = o->userdata; @@ -1630,7 +1646,8 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use spa_list_append(&s->free, &m->link); } s->ready_bytes = 0; - s->timing_info.write_index = s->timing_info.read_index; + s->timing_info.write_index = s->timing_info.read_index = 0; + s->have_time = false; pa_operation_sync(o); return o; @@ -1744,7 +1761,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) if (r_usec) *r_usec = res; - pw_log_trace("stream %p: now:%"PRIu64" diff:%"PRIi64 + pw_log_debug("stream %p: now:%"PRIu64" diff:%"PRIi64 " write-index:%"PRIi64" read_index:%"PRIi64" res:%"PRIu64, s, now, now - res, i->write_index, i->read_index, res);