From abf268d168f0175a011b51419c02c542b08c001b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 24 Sep 2020 17:28:30 +0200 Subject: [PATCH] stream: tweak the timings and buffer attributes Use tlength as the latency. Round attributes to frame size Only use the delay in get_time(). Tweak queued, writable and required bytes for callbacks. --- pipewire-pulseaudio/src/stream.c | 86 ++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 31 deletions(-) diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index 137782230..c658e87c3 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -144,8 +144,8 @@ static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *att blocks = 1; stride = pa_frame_size(&s->sample_spec); - maxsize = attr->tlength; - size = attr->minreq; + maxsize = attr->tlength * MAX_BUFFERS; + size = attr->tlength; buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS); pw_log_debug("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize, @@ -219,21 +219,25 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag if (attr->tlength == (uint32_t) -1 || attr->tlength == 0) attr->tlength = (uint32_t) pa_usec_to_bytes(2*PA_USEC_PER_SEC, &s->sample_spec); attr->tlength = SPA_MIN(attr->tlength, attr->maxlength); + attr->tlength -= attr->tlength % stride; attr->tlength = SPA_MAX(attr->tlength, MIN_SAMPLES * stride * MIN_BUFFERS); if (attr->minreq == (uint32_t) -1 || attr->minreq == 0) attr->minreq = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); attr->minreq = SPA_MIN(attr->minreq, attr->tlength / MIN_BUFFERS); + attr->minreq -= attr->minreq % stride; attr->minreq = SPA_MAX(attr->minreq, stride); if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) attr->fragsize = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); attr->fragsize = SPA_MIN(attr->fragsize, attr->tlength / MIN_BUFFERS); + attr->fragsize -= attr->fragsize % stride; attr->fragsize = SPA_MAX(attr->fragsize, stride); if (attr->prebuf == (uint32_t) -1 || attr->prebuf == 0) attr->prebuf = attr->tlength - attr->minreq; attr->prebuf = SPA_MIN(attr->prebuf, attr->tlength - attr->minreq); + attr->prebuf -= attr->prebuf % stride; attr->prebuf = SPA_MAX(attr->prebuf, stride); dump_buffer_attr(s, attr); @@ -289,9 +293,10 @@ static void stream_control_info(void *data, uint32_t id, const struct pw_stream_ static void stream_add_buffer(void *data, struct pw_buffer *buffer) { pa_stream *s = data; - buffer->size = buffer->buffer->datas[0].maxsize; - s->maxsize += buffer->size; - s->maxblock = SPA_MIN(buffer->size, s->maxblock); + uint32_t maxsize = buffer->buffer->datas[0].maxsize; + buffer->size = 0; + s->maxsize += maxsize; + s->maxblock = SPA_MIN(maxsize, s->maxblock); } static void stream_remove_buffer(void *data, struct pw_buffer *buffer) @@ -326,10 +331,11 @@ static void update_timing_info(pa_stream *s) ti->read_index_corrupt = false; if (pwt.rate.denom > 0) { + uint64_t ticks = pwt.ticks; if (!s->have_time) - s->ticks_base = pwt.ticks + pwt.delay; - if (pwt.ticks > s->ticks_base) - pos = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; + s->ticks_base = ticks; + if (ticks > s->ticks_base) + pos = ((ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; else pos = 0; delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; @@ -347,12 +353,12 @@ static void update_timing_info(pa_stream *s) ti->configured_source_usec = delay; ti->write_index = pos; } - s->timing_info_valid = true; s->queued_bytes = pwt.queued + s->ready_bytes; + s->timing_info_valid = true; pw_log_trace("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64 " write:%"PRIu64" queued:%"PRIi64, - s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, delay, + s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, pwt.delay, ti->read_index, ti->write_index, ti->read_index - ti->write_index); } @@ -427,12 +433,32 @@ static void pull_input(pa_stream *s) m->user_data = buf; buf->user_data = m; - pw_log_trace("input %p", m); + pw_log_trace("input %p, size:%zd ready:%zd", m, m->size, s->ready_bytes); spa_list_append(&s->ready, &m->link); s->ready_bytes += m->size; } } + +static inline uint32_t queued_size(const pa_stream *s, uint64_t elapsed) +{ + uint64_t queued; + const pa_timing_info *i = &s->timing_info; + queued = i->write_index - SPA_MIN(i->read_index, i->write_index); + queued -= SPA_MIN(queued, s->queued_bytes); + queued -= SPA_MIN(queued, elapsed); + return queued; +} + +static inline uint32_t writable_size(const pa_stream *s, uint64_t queued) +{ + return s->maxsize - SPA_MIN(queued, s->maxsize); +} +static inline uint32_t required_size(const pa_stream *s) +{ + return s->buffer_attr.tlength; +} + static void stream_process(void *data) { pa_stream *s = data; @@ -445,12 +471,12 @@ static void stream_process(void *data) queue_output(s); - queued = s->queued_bytes; - writable = s->maxsize - SPA_MIN(queued, s->maxsize); - required = SPA_MIN(s->maxblock, s->buffer_attr.minreq); + queued = queued_size(s, 0); + writable = writable_size(s, queued); + required = required_size(s); - if (s->write_callback && s->state == PA_STREAM_READY && writable >= required) - s->write_callback(s, writable, s->write_userdata); + if (s->write_callback && s->state == PA_STREAM_READY && queued < required && writable >= required) + s->write_callback(s, required, s->write_userdata); } else { pull_input(s); @@ -954,7 +980,10 @@ static int create_stream(pa_stream_direction_t direction, str = "Music"; stride = pa_frame_size(&s->sample_spec); - sprintf(latency, "%u/%u", s->buffer_attr.minreq / stride, s->sample_spec.rate); + if (monitor) + sprintf(latency, "%u/%u", s->buffer_attr.fragsize / stride, s->sample_spec.rate); + else + sprintf(latency, "%u/%u", s->buffer_attr.tlength / stride, s->sample_spec.rate); n_items = 0; items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_TYPE, "Audio"); @@ -1257,11 +1286,9 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s) elapsed = 0; } - queued = s->queued_bytes; - queued -= SPA_MIN(queued, elapsed); - - writable = s->maxsize - SPA_MIN(queued, s->maxsize); - required = SPA_MIN(s->maxblock, s->buffer_attr.minreq); + queued = queued_size(s, elapsed); + writable = writable_size(s, queued); + required = required_size(s); pw_log_debug("stream %p: %"PRIu64" minreq:%u maxblock:%zu", s, writable, s->buffer_attr.minreq, s->maxblock); @@ -1669,10 +1696,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) if (s->direction == PA_STREAM_PLAYBACK) { res = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec); - if (res > i->sink_usec) - res -= i->sink_usec; - else - res = 0; + res -= SPA_MIN(res, i->sink_usec); } else { res = pa_bytes_to_usec((uint64_t) i->write_index, &s->sample_spec); res += i->source_usec; @@ -1688,9 +1712,9 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) *r_usec = res; pw_log_trace("stream %p: now:%"PRIu64" diff:%"PRIi64 - " write-index:%"PRIi64" read_index:%"PRIi64" res:%"PRIu64, - s, now, now - res, i->write_index, i->read_index, res); - + " write-index:%"PRIi64" read_index:%"PRIi64" rw-diff:%"PRIi64" res:%"PRIu64, + s, now, now - res, i->write_index, i->read_index, + i->write_index - i->read_index, res); return 0; } @@ -1742,8 +1766,8 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) else *r_usec = time_counter_diff(s, t, c, negative); - pw_log_trace("stream %p: now:%"PRIu64" stream:%"PRIu64 - " res:%"PRIu64, s, t, c, *r_usec); + pw_log_trace("stream %p: now:%"PRIu64" stream:%"PRIu64" cindex:%"PRIi64 + " res:%"PRIu64, s, t, c, cindex, *r_usec); return 0; }