stream: tweak the timings and buffer attributes

Use tlength as the latency.
Round attributes to frame size
Only use the delay in get_time().
Tweak queued, writable and required bytes for callbacks.
This commit is contained in:
Wim Taymans 2020-09-24 17:28:30 +02:00
parent 584ae678c6
commit abf268d168

View file

@ -144,8 +144,8 @@ static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *att
blocks = 1; blocks = 1;
stride = pa_frame_size(&s->sample_spec); stride = pa_frame_size(&s->sample_spec);
maxsize = attr->tlength; maxsize = attr->tlength * MAX_BUFFERS;
size = attr->minreq; size = attr->tlength;
buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS); buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS);
pw_log_debug("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize, pw_log_debug("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize,
@ -219,21 +219,25 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag
if (attr->tlength == (uint32_t) -1 || attr->tlength == 0) if (attr->tlength == (uint32_t) -1 || attr->tlength == 0)
attr->tlength = (uint32_t) pa_usec_to_bytes(2*PA_USEC_PER_SEC, &s->sample_spec); attr->tlength = (uint32_t) pa_usec_to_bytes(2*PA_USEC_PER_SEC, &s->sample_spec);
attr->tlength = SPA_MIN(attr->tlength, attr->maxlength); attr->tlength = SPA_MIN(attr->tlength, attr->maxlength);
attr->tlength -= attr->tlength % stride;
attr->tlength = SPA_MAX(attr->tlength, MIN_SAMPLES * stride * MIN_BUFFERS); attr->tlength = SPA_MAX(attr->tlength, MIN_SAMPLES * stride * MIN_BUFFERS);
if (attr->minreq == (uint32_t) -1 || attr->minreq == 0) if (attr->minreq == (uint32_t) -1 || attr->minreq == 0)
attr->minreq = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); attr->minreq = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec);
attr->minreq = SPA_MIN(attr->minreq, attr->tlength / MIN_BUFFERS); attr->minreq = SPA_MIN(attr->minreq, attr->tlength / MIN_BUFFERS);
attr->minreq -= attr->minreq % stride;
attr->minreq = SPA_MAX(attr->minreq, stride); attr->minreq = SPA_MAX(attr->minreq, stride);
if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0)
attr->fragsize = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); attr->fragsize = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec);
attr->fragsize = SPA_MIN(attr->fragsize, attr->tlength / MIN_BUFFERS); attr->fragsize = SPA_MIN(attr->fragsize, attr->tlength / MIN_BUFFERS);
attr->fragsize -= attr->fragsize % stride;
attr->fragsize = SPA_MAX(attr->fragsize, stride); attr->fragsize = SPA_MAX(attr->fragsize, stride);
if (attr->prebuf == (uint32_t) -1 || attr->prebuf == 0) if (attr->prebuf == (uint32_t) -1 || attr->prebuf == 0)
attr->prebuf = attr->tlength - attr->minreq; attr->prebuf = attr->tlength - attr->minreq;
attr->prebuf = SPA_MIN(attr->prebuf, attr->tlength - attr->minreq); attr->prebuf = SPA_MIN(attr->prebuf, attr->tlength - attr->minreq);
attr->prebuf -= attr->prebuf % stride;
attr->prebuf = SPA_MAX(attr->prebuf, stride); attr->prebuf = SPA_MAX(attr->prebuf, stride);
dump_buffer_attr(s, attr); dump_buffer_attr(s, attr);
@ -289,9 +293,10 @@ static void stream_control_info(void *data, uint32_t id, const struct pw_stream_
static void stream_add_buffer(void *data, struct pw_buffer *buffer) static void stream_add_buffer(void *data, struct pw_buffer *buffer)
{ {
pa_stream *s = data; pa_stream *s = data;
buffer->size = buffer->buffer->datas[0].maxsize; uint32_t maxsize = buffer->buffer->datas[0].maxsize;
s->maxsize += buffer->size; buffer->size = 0;
s->maxblock = SPA_MIN(buffer->size, s->maxblock); s->maxsize += maxsize;
s->maxblock = SPA_MIN(maxsize, s->maxblock);
} }
static void stream_remove_buffer(void *data, struct pw_buffer *buffer) static void stream_remove_buffer(void *data, struct pw_buffer *buffer)
@ -326,10 +331,11 @@ static void update_timing_info(pa_stream *s)
ti->read_index_corrupt = false; ti->read_index_corrupt = false;
if (pwt.rate.denom > 0) { if (pwt.rate.denom > 0) {
uint64_t ticks = pwt.ticks;
if (!s->have_time) if (!s->have_time)
s->ticks_base = pwt.ticks + pwt.delay; s->ticks_base = ticks;
if (pwt.ticks > s->ticks_base) if (ticks > s->ticks_base)
pos = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; pos = ((ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
else else
pos = 0; pos = 0;
delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
@ -347,12 +353,12 @@ static void update_timing_info(pa_stream *s)
ti->configured_source_usec = delay; ti->configured_source_usec = delay;
ti->write_index = pos; ti->write_index = pos;
} }
s->timing_info_valid = true;
s->queued_bytes = pwt.queued + s->ready_bytes; s->queued_bytes = pwt.queued + s->ready_bytes;
s->timing_info_valid = true;
pw_log_trace("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64 pw_log_trace("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64
" write:%"PRIu64" queued:%"PRIi64, " write:%"PRIu64" queued:%"PRIi64,
s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, delay, s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, pwt.delay,
ti->read_index, ti->write_index, ti->read_index - ti->write_index); ti->read_index, ti->write_index, ti->read_index - ti->write_index);
} }
@ -427,12 +433,32 @@ static void pull_input(pa_stream *s)
m->user_data = buf; m->user_data = buf;
buf->user_data = m; buf->user_data = m;
pw_log_trace("input %p", m); pw_log_trace("input %p, size:%zd ready:%zd", m, m->size, s->ready_bytes);
spa_list_append(&s->ready, &m->link); spa_list_append(&s->ready, &m->link);
s->ready_bytes += m->size; s->ready_bytes += m->size;
} }
} }
static inline uint32_t queued_size(const pa_stream *s, uint64_t elapsed)
{
uint64_t queued;
const pa_timing_info *i = &s->timing_info;
queued = i->write_index - SPA_MIN(i->read_index, i->write_index);
queued -= SPA_MIN(queued, s->queued_bytes);
queued -= SPA_MIN(queued, elapsed);
return queued;
}
static inline uint32_t writable_size(const pa_stream *s, uint64_t queued)
{
return s->maxsize - SPA_MIN(queued, s->maxsize);
}
static inline uint32_t required_size(const pa_stream *s)
{
return s->buffer_attr.tlength;
}
static void stream_process(void *data) static void stream_process(void *data)
{ {
pa_stream *s = data; pa_stream *s = data;
@ -445,12 +471,12 @@ static void stream_process(void *data)
queue_output(s); queue_output(s);
queued = s->queued_bytes; queued = queued_size(s, 0);
writable = s->maxsize - SPA_MIN(queued, s->maxsize); writable = writable_size(s, queued);
required = SPA_MIN(s->maxblock, s->buffer_attr.minreq); required = required_size(s);
if (s->write_callback && s->state == PA_STREAM_READY && writable >= required) if (s->write_callback && s->state == PA_STREAM_READY && queued < required && writable >= required)
s->write_callback(s, writable, s->write_userdata); s->write_callback(s, required, s->write_userdata);
} }
else { else {
pull_input(s); pull_input(s);
@ -954,7 +980,10 @@ static int create_stream(pa_stream_direction_t direction,
str = "Music"; str = "Music";
stride = pa_frame_size(&s->sample_spec); stride = pa_frame_size(&s->sample_spec);
sprintf(latency, "%u/%u", s->buffer_attr.minreq / stride, s->sample_spec.rate); if (monitor)
sprintf(latency, "%u/%u", s->buffer_attr.fragsize / stride, s->sample_spec.rate);
else
sprintf(latency, "%u/%u", s->buffer_attr.tlength / stride, s->sample_spec.rate);
n_items = 0; n_items = 0;
items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_TYPE, "Audio"); items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_TYPE, "Audio");
@ -1257,11 +1286,9 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s)
elapsed = 0; elapsed = 0;
} }
queued = s->queued_bytes; queued = queued_size(s, elapsed);
queued -= SPA_MIN(queued, elapsed); writable = writable_size(s, queued);
required = required_size(s);
writable = s->maxsize - SPA_MIN(queued, s->maxsize);
required = SPA_MIN(s->maxblock, s->buffer_attr.minreq);
pw_log_debug("stream %p: %"PRIu64" minreq:%u maxblock:%zu", s, pw_log_debug("stream %p: %"PRIu64" minreq:%u maxblock:%zu", s,
writable, s->buffer_attr.minreq, s->maxblock); writable, s->buffer_attr.minreq, s->maxblock);
@ -1669,10 +1696,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
res = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec); res = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec);
if (res > i->sink_usec) res -= SPA_MIN(res, i->sink_usec);
res -= i->sink_usec;
else
res = 0;
} else { } else {
res = pa_bytes_to_usec((uint64_t) i->write_index, &s->sample_spec); res = pa_bytes_to_usec((uint64_t) i->write_index, &s->sample_spec);
res += i->source_usec; res += i->source_usec;
@ -1688,9 +1712,9 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
*r_usec = res; *r_usec = res;
pw_log_trace("stream %p: now:%"PRIu64" diff:%"PRIi64 pw_log_trace("stream %p: now:%"PRIu64" diff:%"PRIi64
" write-index:%"PRIi64" read_index:%"PRIi64" res:%"PRIu64, " write-index:%"PRIi64" read_index:%"PRIi64" rw-diff:%"PRIi64" res:%"PRIu64,
s, now, now - res, i->write_index, i->read_index, res); s, now, now - res, i->write_index, i->read_index,
i->write_index - i->read_index, res);
return 0; return 0;
} }
@ -1742,8 +1766,8 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative)
else else
*r_usec = time_counter_diff(s, t, c, negative); *r_usec = time_counter_diff(s, t, c, negative);
pw_log_trace("stream %p: now:%"PRIu64" stream:%"PRIu64 pw_log_trace("stream %p: now:%"PRIu64" stream:%"PRIu64" cindex:%"PRIi64
" res:%"PRIu64, s, t, c, *r_usec); " res:%"PRIu64, s, t, c, cindex, *r_usec);
return 0; return 0;
} }