stream: improve timing

Improve the delay, make sure we don't overflow
Flush and free pending memory
Keep track of requested_bytes
This commit is contained in:
Wim Taymans 2020-04-14 10:22:34 +02:00
parent 4fd43733c2
commit 6d24a034fb

View file

@ -458,19 +458,19 @@ static void update_timing_info(pa_stream *s)
s->have_time = false; s->have_time = false;
} }
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
ti->sink_usec = -delay; ti->sink_usec = delay;
ti->configured_sink_usec = delay;
ti->read_index = index; ti->read_index = index;
} else { } else {
ti->source_usec = delay; ti->source_usec = delay;
ti->configured_source_usec = delay;
ti->write_index = index; ti->write_index = index;
} }
ti->configured_sink_usec = 0;
ti->configured_source_usec = 0;
ti->since_underrun = 0; ti->since_underrun = 0;
s->timing_info_valid = true; s->timing_info_valid = true;
pw_log_trace("stream %p: %"PRIu64" rate:%d delay:%"PRIi64, s, pwt.queued, pwt.rate.denom, delay); pw_log_debug("stream %p: %"PRIu64" rate:%d delay:%"PRIi64 " read:%"PRIu64" write:%"PRIu64,
s, pwt.queued, pwt.rate.denom, delay, ti->read_index, ti->write_index);
} }
static void push_output(pa_stream *s) static void push_output(pa_stream *s)
@ -507,12 +507,13 @@ static void stream_process(void *data)
update_timing_info(s); update_timing_info(s);
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
s->requested_bytes += s->buffer_attr.tlength;
if (s->ready_bytes < s->buffer_attr.tlength) if (s->ready_bytes < s->buffer_attr.tlength)
s->requested_bytes = s->buffer_attr.tlength - s->ready_bytes; s->requested_bytes -= s->ready_bytes;
else else
s->requested_bytes = 0; s->requested_bytes = 0;
if (s->write_callback && s->requested_bytes) if (s->write_callback && s->requested_bytes > 0)
s->write_callback(s, s->requested_bytes, s->write_userdata); s->write_callback(s, s->requested_bytes, s->write_userdata);
push_output(s); push_output(s);
@ -633,7 +634,7 @@ static pa_stream* stream_new(pa_context *c, const char *name,
s->buffer_attr.prebuf = (uint32_t) -1; s->buffer_attr.prebuf = (uint32_t) -1;
s->buffer_attr.fragsize = (uint32_t) -1; s->buffer_attr.fragsize = (uint32_t) -1;
s->maxblock = INT_MAX; s->maxblock = INT_MAX;
s->requested_bytes = s->buffer_attr.tlength; s->requested_bytes = 0;
s->device_index = PA_INVALID_INDEX; s->device_index = PA_INVALID_INDEX;
s->device_name = NULL; s->device_name = NULL;
@ -695,6 +696,7 @@ static void stream_unlink(pa_stream *s)
static void stream_free(pa_stream *s) static void stream_free(pa_stream *s)
{ {
int i; int i;
struct pa_mem *m;
pw_log_debug("stream %p", s); pw_log_debug("stream %p", s);
@ -703,6 +705,10 @@ static void stream_free(pa_stream *s)
pw_stream_destroy(s->stream); pw_stream_destroy(s->stream);
} }
spa_list_consume(m, &s->free, link) {
spa_list_remove(&m->link);
free(m);
}
if (s->proplist) if (s->proplist)
pa_proplist_free(s->proplist); pa_proplist_free(s->proplist);
@ -1056,7 +1062,8 @@ int pa_stream_connect_record(
static void on_disconnected(pa_operation *o, void *userdata) static void on_disconnected(pa_operation *o, void *userdata)
{ {
pa_stream_set_state(o->stream, PA_STREAM_TERMINATED); pa_stream *s = o->stream;
pa_stream_set_state(s, PA_STREAM_TERMINATED);
} }
SPA_EXPORT SPA_EXPORT
@ -1246,6 +1253,7 @@ int pa_stream_write_ext_free(pa_stream *s,
release_mem(s); release_mem(s);
} }
s->timing_info.write_index += nbytes; s->timing_info.write_index += nbytes;
s->requested_bytes = s->requested_bytes > nbytes ? s->requested_bytes - nbytes : 0;
pw_log_trace("stream %p: written %zd bytes", s, nbytes); pw_log_trace("stream %p: written %zd bytes", s, nbytes);
return 0; return 0;
@ -1325,6 +1333,7 @@ size_t pa_stream_readable_size(PA_CONST pa_stream *s)
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD,
PA_ERR_BADSTATE, (size_t) -1); PA_ERR_BADSTATE, (size_t) -1);
pw_log_trace("stream %p: %zd", s, s->requested_bytes);
return s->requested_bytes; return s->requested_bytes;
} }
@ -1580,6 +1589,7 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
{ {
pa_operation *o; pa_operation *o;
struct success_ack *d; struct success_ack *d;
struct pa_mem *m;
spa_assert(s); spa_assert(s);
spa_assert(s->refcount >= 1); spa_assert(s->refcount >= 1);
@ -1592,6 +1602,12 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
d = o->userdata; d = o->userdata;
d->cb = cb; d->cb = cb;
d->userdata = userdata; d->userdata = userdata;
spa_list_consume(m, &s->ready, link) {
spa_list_remove(&m->link);
spa_list_append(&s->free, &m->link);
}
s->ready_bytes = 0;
s->timing_info.write_index = s->timing_info.read_index; s->timing_info.write_index = s->timing_info.read_index;
pa_operation_sync(o); pa_operation_sync(o);
@ -1675,9 +1691,8 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe
SPA_EXPORT SPA_EXPORT
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
{ {
pa_usec_t res;
struct timespec ts; struct timespec ts;
uint64_t now, delay, time; uint64_t now, res;
pa_timing_info *i; pa_timing_info *i;
spa_assert(s); spa_assert(s);
@ -1685,33 +1700,31 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
update_timing_info(s);
i = &s->timing_info;
if (s->direction == PA_STREAM_PLAYBACK) {
res = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec);
if (res > i->sink_usec)
res -= i->sink_usec;
else
res = 0;
} else {
res = pa_bytes_to_usec((uint64_t) i->write_index, &s->sample_spec);
res += i->source_usec;
}
clock_gettime(CLOCK_MONOTONIC, &ts); clock_gettime(CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_USEC(&ts); now = SPA_TIMESPEC_TO_USEC(&ts);
i = &s->timing_info;
if (s->have_time) if (s->have_time)
delay = now - SPA_TIMEVAL_TO_USEC(&i->timestamp); res += now - SPA_TIMEVAL_TO_USEC(&i->timestamp);
else
delay = 0;
if (s->direction == PA_STREAM_PLAYBACK)
time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec) + i->sink_usec;
else
time = pa_bytes_to_usec((uint64_t) i->write_index, &s->sample_spec) + i->source_usec;
res = delay + time;
if (r_usec) if (r_usec)
*r_usec = res; *r_usec = res;
pw_log_trace("stream %p: now:%"PRIu64" delay:%"PRIu64" time:%"PRIu64 pw_log_debug("stream %p: now:%"PRIu64" diff:%"PRIi64
" write-index:%"PRIi64" read_index:%"PRIi64" diff:%"PRIi64" res:%"PRIu64, " write-index:%"PRIi64" read_index:%"PRIi64" res:%"PRIu64,
s, now, delay, time, s, now, now - res, i->write_index, i->read_index, res);
i->write_index, i->read_index,
i->write_index - i->read_index,
res);
return 0; return 0;
} }
@ -1775,7 +1788,6 @@ const pa_timing_info* pa_stream_get_timing_info(pa_stream *s)
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
update_timing_info(s);
pw_log_trace("stream %p: %"PRIi64" %"PRIi64" %"PRIi64, s, pw_log_trace("stream %p: %"PRIi64" %"PRIi64" %"PRIi64, s,
s->timing_info.write_index, s->timing_info.read_index, s->timing_info.write_index, s->timing_info.read_index,