* rename "latency correction" to "write index correction"

* add read index invalidation code
* rename "ipol_event" stuff to "auto_timing_update"
* remove buffer_usec field from pa_timing_info, since it can be easily calculated from write_index and read_index anyway
* add read_index_corrupt field to "pa_timing_info", similar to the already existing write_index_corrupt field
* restart automatic timing update event every time a query is issued, not just when the last event elapsed
* proper invalidation code for pa_stream_flush()
* do tarsnport/sink/source latency correction for playback time only when device is not corked


git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@686 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2006-04-12 17:17:23 +00:00
parent 77c2a1f561
commit 49b3150434
3 changed files with 217 additions and 135 deletions

View file

@ -204,13 +204,14 @@ typedef enum pa_subscription_event_type {
* pa_stream_update_timing_info() and pa_stream_get_timing_info(). The * pa_stream_update_timing_info() and pa_stream_get_timing_info(). The
* total output latency a sample that is written with * total output latency a sample that is written with
* pa_stream_write() takes to be played may be estimated by * pa_stream_write() takes to be played may be estimated by
* sink_usec+buffer_usec+transport_usec. The output buffer to which * sink_usec+buffer_usec+transport_usec. (where buffer_usec is defined
* buffer_usec relates may be manipulated freely (with * as pa_bytes_to_usec(write_index-read_index)) The output buffer
* which buffer_usec relates to may be manipulated freely (with
* pa_stream_write()'s seek argument, pa_stream_flush() and friends), * pa_stream_write()'s seek argument, pa_stream_flush() and friends),
* the buffers sink_usec and source_usec relate to are first-in * the buffers sink_usec and source_usec relate to are first-in
* first-out (FIFO) buffers which cannot be flushed or manipulated in any * first-out (FIFO) buffers which cannot be flushed or manipulated in
* way. The total input latency a sample that is recorded takes to be * any way. The total input latency a sample that is recorded takes to
* delivered to the application is: * be delivered to the application is:
* source_usec+buffer_usec+transport_usec-sink_usec. (Take care of * source_usec+buffer_usec+transport_usec-sink_usec. (Take care of
* sign issues!) When connected to a monitor source sink_usec contains * sign issues!) When connected to a monitor source sink_usec contains
* the latency of the owning sink. The two latency estimations * the latency of the owning sink. The two latency estimations
@ -226,7 +227,6 @@ typedef struct pa_timing_info {
* limited und unreliable itself. \since * limited und unreliable itself. \since
* 0.5 */ * 0.5 */
pa_usec_t buffer_usec; /**< Time in usecs the current buffer takes to play. For both playback and record streams. */
pa_usec_t sink_usec; /**< Time in usecs a sample takes to be played on the sink. For playback streams and record streams connected to a monitor source. */ pa_usec_t sink_usec; /**< Time in usecs a sample takes to be played on the sink. For playback streams and record streams connected to a monitor source. */
pa_usec_t source_usec; /**< Time in usecs a sample takes from being recorded to being delivered to the application. Only for record streams. \since 0.5*/ pa_usec_t source_usec; /**< Time in usecs a sample takes from being recorded to being delivered to the application. Only for record streams. \since 0.5*/
pa_usec_t transport_usec; /**< Estimated time in usecs a sample takes to be transferred to/from the daemon. For both playback and record streams. \since 0.5 */ pa_usec_t transport_usec; /**< Estimated time in usecs a sample takes to be transferred to/from the daemon. For both playback and record streams. \since 0.5 */
@ -240,7 +240,7 @@ typedef struct pa_timing_info {
* info was current . Only write * info was current . Only write
* commands with SEEK_RELATIVE_ON_READ * commands with SEEK_RELATIVE_ON_READ
* and SEEK_RELATIVE_END can corrupt * and SEEK_RELATIVE_END can corrupt
* write_index. */ * write_index. \since 0.8 */
int64_t write_index; /**< Current write index into the int64_t write_index; /**< Current write index into the
* playback buffer in bytes. Think twice before * playback buffer in bytes. Think twice before
* using this for seeking purposes: it * using this for seeking purposes: it
@ -248,6 +248,13 @@ typedef struct pa_timing_info {
* want to use it. Consider using * want to use it. Consider using
* PA_SEEK_RELATIVE instead. \since * PA_SEEK_RELATIVE instead. \since
* 0.8 */ * 0.8 */
int read_index_corrupt; /**< Non-zero if read_index is not
* up-to-date because a local pause or
* flush request that corrupted it has
* been issued in the time since this
* latency info was current. \since 0.8 */
int64_t read_index; /**< Current read index into the int64_t read_index; /**< Current read index into the
* playback buffer in bytes. Think twice before * playback buffer in bytes. Think twice before
* using this for seeking purposes: it * using this for seeking purposes: it

View file

@ -83,14 +83,14 @@ struct pa_context {
pa_client_conf *conf; pa_client_conf *conf;
}; };
#define PA_MAX_LATENCY_CORRECTIONS 10 #define PA_MAX_WRITE_INDEX_CORRECTIONS 10
typedef struct pa_latency_correction { typedef struct pa_index_correction {
uint32_t tag; uint32_t tag;
int valid; int valid;
int64_t value; int64_t value;
int absolute, corrupt; int absolute, corrupt;
} pa_latency_correction; } pa_index_correction;
struct pa_stream { struct pa_stream {
int ref; int ref;
@ -124,13 +124,18 @@ struct pa_stream {
/* Use to make sure that time advances monotonically */ /* Use to make sure that time advances monotonically */
pa_usec_t previous_time; pa_usec_t previous_time;
/* Latency correction stuff */ /* time updates with tags older than these are invalid */
pa_latency_correction latency_corrections[PA_MAX_LATENCY_CORRECTIONS]; uint32_t write_index_not_before;
int idx_latency_correction; uint32_t read_index_not_before;
/* Data about individual timing update correctoins */
pa_index_correction write_index_corrections[PA_MAX_WRITE_INDEX_CORRECTIONS];
int current_write_index_correction;
/* Latency interpolation stuff */ /* Latency interpolation stuff */
pa_time_event *ipol_event; pa_time_event *auto_timing_update_event;
int ipol_requested; int auto_timing_update_requested;
pa_usec_t ipol_usec; pa_usec_t ipol_usec;
int ipol_usec_valid; int ipol_usec_valid;
struct timeval ipol_timestamp; struct timeval ipol_timestamp;

View file

@ -90,18 +90,21 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->previous_time = 0; s->previous_time = 0;
s->timing_info_valid = 0; s->timing_info_valid = 0;
s->read_index_not_before = 0;
s->write_index_not_before = 0;
for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
s->write_index_corrections[i].valid = 0;
s->current_write_index_correction = 0;
s->corked = 0; s->corked = 0;
s->ipol_usec_valid = 0; s->ipol_usec_valid = 0;
s->ipol_timestamp.tv_sec = 0; s->ipol_timestamp.tv_sec = 0;
s->ipol_timestamp.tv_usec = 0; s->ipol_timestamp.tv_usec = 0;
s->ipol_event = NULL;
s->ipol_requested = 0;
for (i = 0; i < PA_MAX_LATENCY_CORRECTIONS; i++) s->auto_timing_update_event = NULL;
s->latency_corrections[i].valid = 0; s->auto_timing_update_requested = 0;
s->idx_latency_correction = 0;
PA_LLIST_PREPEND(pa_stream, c->streams, s); PA_LLIST_PREPEND(pa_stream, c->streams, s);
@ -119,9 +122,9 @@ static void stream_free(pa_stream *s) {
pa_context_unref(s->context); pa_context_unref(s->context);
if (s->ipol_event) { if (s->auto_timing_update_event) {
assert(s->mainloop); assert(s->mainloop);
s->mainloop->time_free(s->ipol_event); s->mainloop->time_free(s->auto_timing_update_event);
} }
if (s->peek_memchunk.memblock) if (s->peek_memchunk.memblock)
@ -296,34 +299,77 @@ void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC
} }
} }
finish: finish:
pa_context_unref(c); pa_context_unref(c);
} }
static void ipol_callback(pa_mainloop_api *m, pa_time_event *e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) { static void request_auto_timing_update(pa_stream *s, int force) {
struct timeval next; struct timeval next;
pa_stream *s = userdata; assert(s);
pa_stream_ref(s); if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
return;
/* pa_log("requesting new ipol data"); */ if (s->state == PA_STREAM_READY &&
(force || !s->auto_timing_update_requested)) {
if (s->state == PA_STREAM_READY && !s->ipol_requested) {
pa_operation *o; pa_operation *o;
/* pa_log("automatically requesting new timing data"); */
if ((o = pa_stream_update_timing_info(s, NULL, NULL))) { if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
pa_operation_unref(o); pa_operation_unref(o);
s->ipol_requested = 1; s->auto_timing_update_requested = 1;
} }
} }
pa_gettimeofday(&next); pa_gettimeofday(&next);
pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC); pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
m->time_restart(e, &next); s->mainloop->time_restart(s->auto_timing_update_event, &next);
pa_stream_unref(s);
} }
static void invalidate_indexes(pa_stream *s, int r, int w) {
assert(s);
pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
if (s->state != PA_STREAM_READY)
return;
if (w) {
s->write_index_not_before = s->context->ctag;
if (s->timing_info_valid)
s->timing_info.write_index_corrupt = 1;
pa_log("write_index invalidated");
}
if (r) {
s->read_index_not_before = s->context->ctag;
if (s->timing_info_valid)
s->timing_info.read_index_corrupt = 1;
pa_log("read_index invalidated");
}
if ((s->direction == PA_STREAM_PLAYBACK && r) ||
(s->direction == PA_STREAM_RECORD && w))
s->ipol_usec_valid = 0;
request_auto_timing_update(s, 1);
}
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
pa_stream *s = userdata;
pa_log("time event");
pa_stream_ref(s);
request_auto_timing_update(s, 0);
pa_stream_unref(s);
}
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) { void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_stream *s = userdata; pa_stream *s = userdata;
@ -371,6 +417,8 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
/* We add an extra ref as long as we're connected (i.e. in the dynarray) */ /* We add an extra ref as long as we're connected (i.e. in the dynarray) */
pa_stream_ref(s); pa_stream_ref(s);
pa_stream_set_state(s, PA_STREAM_READY);
if (s->direction != PA_STREAM_UPLOAD && if (s->direction != PA_STREAM_UPLOAD &&
s->flags & PA_STREAM_AUTO_TIMING_UPDATE) { s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
struct timeval tv; struct timeval tv;
@ -378,11 +426,11 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
pa_gettimeofday(&tv); pa_gettimeofday(&tv);
tv.tv_usec += LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */ tv.tv_usec += LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
assert(!s->ipol_event); assert(!s->auto_timing_update_event);
s->ipol_event = s->mainloop->time_new(s->mainloop, &tv, &ipol_callback, s); s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
}
pa_stream_set_state(s, PA_STREAM_READY); request_auto_timing_update(s, 1);
}
if (s->requested_bytes > 0 && s->ref > 1 && s->write_callback) if (s->requested_bytes > 0 && s->ref > 1 && s->write_callback)
s->write_callback(s, s->requested_bytes, s->write_userdata); s->write_callback(s, s->requested_bytes, s->write_userdata);
@ -548,18 +596,19 @@ int pa_stream_write(
s->requested_bytes = 0; s->requested_bytes = 0;
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
/* Update latency request correction */ /* Update latency request correction */
if (s->latency_corrections[s->idx_latency_correction].valid) { if (s->write_index_corrections[s->current_write_index_correction].valid) {
if (seek == PA_SEEK_ABSOLUTE) { if (seek == PA_SEEK_ABSOLUTE) {
s->latency_corrections[s->idx_latency_correction].corrupt = 0; s->write_index_corrections[s->current_write_index_correction].corrupt = 0;
s->latency_corrections[s->idx_latency_correction].absolute = 1; s->write_index_corrections[s->current_write_index_correction].absolute = 1;
s->latency_corrections[s->idx_latency_correction].value = offset + length; s->write_index_corrections[s->current_write_index_correction].value = offset + length;
} else if (seek == PA_SEEK_RELATIVE) { } else if (seek == PA_SEEK_RELATIVE) {
if (!s->latency_corrections[s->idx_latency_correction].corrupt) if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
s->latency_corrections[s->idx_latency_correction].value += offset + length; s->write_index_corrections[s->current_write_index_correction].value += offset + length;
} else } else
s->latency_corrections[s->idx_latency_correction].corrupt = 1; s->write_index_corrections[s->current_write_index_correction].corrupt = 1;
} }
/* Update the write index in the already available latency data */ /* Update the write index in the already available latency data */
@ -574,6 +623,9 @@ int pa_stream_write(
} else } else
s->timing_info.write_index_corrupt = 1; s->timing_info.write_index_corrupt = 1;
} }
if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
request_auto_timing_update(s, 1);
} }
return 0; return 0;
@ -672,15 +724,20 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
assert(o->context); assert(o->context);
i = &o->stream->timing_info; i = &o->stream->timing_info;
pa_log("pre corrupt w:%u r:%u\n", !o->stream->timing_info_valid || i->write_index_corrupt,!o->stream->timing_info_valid || i->read_index_corrupt);
o->stream->timing_info_valid = 0; o->stream->timing_info_valid = 0;
i->write_index_corrupt = 0; i->write_index_corrupt = 0;
i->read_index_corrupt = 0;
pa_log("timing update %u\n", tag);
if (command != PA_COMMAND_REPLY) { if (command != PA_COMMAND_REPLY) {
if (pa_context_handle_error(o->context, command, t) < 0) if (pa_context_handle_error(o->context, command, t) < 0)
goto finish; goto finish;
} else if (pa_tagstruct_get_usec(t, &i->buffer_usec) < 0 || } else if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
pa_tagstruct_get_usec(t, &i->source_usec) < 0 || pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
pa_tagstruct_get_boolean(t, &i->playing) < 0 || pa_tagstruct_get_boolean(t, &i->playing) < 0 ||
pa_tagstruct_get_timeval(t, &local) < 0 || pa_tagstruct_get_timeval(t, &local) < 0 ||
@ -692,8 +749,11 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
goto finish; goto finish;
} else { } else {
o->stream->timing_info_valid = 1;
pa_gettimeofday(&now); pa_gettimeofday(&now);
/* Calculcate timestamps */
if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) { if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
/* local and remote seem to have synchronized clocks */ /* local and remote seem to have synchronized clocks */
@ -712,6 +772,13 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
pa_timeval_add(&i->timestamp, i->transport_usec); pa_timeval_add(&i->timestamp, i->transport_usec);
} }
/* Invalidate read and write indexes if necessary */
if (tag < o->stream->read_index_not_before)
i->read_index_corrupt = 1;
if (tag < o->stream->write_index_not_before)
i->write_index_corrupt = 1;
if (o->stream->direction == PA_STREAM_PLAYBACK) { if (o->stream->direction == PA_STREAM_PLAYBACK) {
/* Write index correction */ /* Write index correction */
@ -720,50 +787,51 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command,
/* Go through the saved correction values and add up the total correction.*/ /* Go through the saved correction values and add up the total correction.*/
for (n = 0, j = o->stream->idx_latency_correction; for (n = 0, j = o->stream->current_write_index_correction+1;
n < PA_MAX_LATENCY_CORRECTIONS; n < PA_MAX_WRITE_INDEX_CORRECTIONS;
n++, j = (j + 1) % PA_MAX_LATENCY_CORRECTIONS) { n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
/* Step over invalid data or out-of-date data */ /* Step over invalid data or out-of-date data */
if (!o->stream->latency_corrections[j].valid || if (!o->stream->write_index_corrections[j].valid ||
o->stream->latency_corrections[j].tag < ctag) o->stream->write_index_corrections[j].tag < ctag)
continue; continue;
/* Make sure that everything is in order */ /* Make sure that everything is in order */
ctag = o->stream->latency_corrections[j].tag+1; ctag = o->stream->write_index_corrections[j].tag+1;
/* Now fix the write index */ /* Now fix the write index */
if (o->stream->latency_corrections[j].corrupt) { if (o->stream->write_index_corrections[j].corrupt) {
/* A corrupting seek was made */ /* A corrupting seek was made */
i->write_index = 0; i->write_index = 0;
i->write_index_corrupt = 1; i->write_index_corrupt = 1;
} else if (o->stream->latency_corrections[j].absolute) { } else if (o->stream->write_index_corrections[j].absolute) {
/* An absolute seek was made */ /* An absolute seek was made */
i->write_index = o->stream->latency_corrections[j].value; i->write_index = o->stream->write_index_corrections[j].value;
i->write_index_corrupt = 0; i->write_index_corrupt = 0;
} else if (!i->write_index_corrupt) { } else if (!i->write_index_corrupt) {
/* A relative seek was made */ /* A relative seek was made */
i->write_index += o->stream->latency_corrections[j].value; i->write_index += o->stream->write_index_corrections[j].value;
} }
} }
} }
o->stream->timing_info_valid = 1;
o->stream->ipol_timestamp = now; o->stream->ipol_timestamp = now;
o->stream->ipol_usec_valid = 0; o->stream->ipol_usec_valid = 0;
} }
o->stream->auto_timing_update_requested = 0;
pa_log("post corrupt w:%u r:%u\n", i->write_index_corrupt || !o->stream->timing_info_valid, i->read_index_corrupt || !o->stream->timing_info_valid);
/* Clear old correction entries */ /* Clear old correction entries */
if (o->stream->direction == PA_STREAM_PLAYBACK) { if (o->stream->direction == PA_STREAM_PLAYBACK) {
int n; int n;
for (n = 0; n < PA_MAX_LATENCY_CORRECTIONS; n++) { for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
if (!o->stream->latency_corrections[n].valid) if (!o->stream->write_index_corrections[n].valid)
continue; continue;
if (o->stream->latency_corrections[n].tag <= tag) if (o->stream->write_index_corrections[n].tag <= tag)
o->stream->latency_corrections[n].valid = 0; o->stream->write_index_corrections[n].valid = 0;
} }
} }
@ -793,13 +861,12 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
/* Find a place to store the write_index correction data for this entry */ /* Find a place to store the write_index correction data for this entry */
cidx = (s->idx_latency_correction + 1) % PA_MAX_LATENCY_CORRECTIONS; cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
/* Check if we could allocate a correction slot. If not, there are too many outstanding queries */ /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->latency_corrections[cidx].valid, PA_ERR_INTERNAL); PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
} }
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
t = pa_tagstruct_command( t = pa_tagstruct_command(
s->context, s->context,
@ -813,14 +880,16 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
/* Fill in initial correction data */ /* Fill in initial correction data */
o->stream->idx_latency_correction = cidx; o->stream->current_write_index_correction = cidx;
o->stream->latency_corrections[cidx].valid = 1; o->stream->write_index_corrections[cidx].valid = 1;
o->stream->latency_corrections[cidx].tag = tag; o->stream->write_index_corrections[cidx].tag = tag;
o->stream->latency_corrections[cidx].absolute = 0; o->stream->write_index_corrections[cidx].absolute = 0;
o->stream->latency_corrections[cidx].value = 0; o->stream->write_index_corrections[cidx].value = 0;
o->stream->latency_corrections[cidx].corrupt = 0; o->stream->write_index_corrections[cidx].corrupt = 0;
} }
pa_log("requesting update %u\n", tag);
return pa_operation_ref(o); return pa_operation_ref(o);
} }
@ -946,7 +1015,6 @@ finish:
pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) { pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
pa_operation *o; pa_operation *o;
pa_operation *lo;
pa_tagstruct *t; pa_tagstruct *t;
uint32_t tag; uint32_t tag;
@ -956,15 +1024,6 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
if (!s->corked && b) {
/* Refresh the interpolated data just befor pausing */
pa_stream_get_time(s, NULL);
} else if (s->corked && !b)
/* Unpausing */
pa_gettimeofday(&s->ipol_timestamp);
}
s->corked = b; s->corked = b;
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
@ -978,8 +1037,8 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi
pa_pstream_send_tagstruct(s->context->pstream, t); pa_pstream_send_tagstruct(s->context->pstream, t);
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
if ((lo = pa_stream_update_timing_info(s, NULL, NULL))) if (s->direction == PA_STREAM_PLAYBACK)
pa_operation_unref(lo); invalidate_indexes(s, 1, 0);
return pa_operation_ref(o); return pa_operation_ref(o);
} }
@ -1010,10 +1069,20 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
if ((o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata))) { if ((o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata))) {
pa_operation *lo;
if ((lo = pa_stream_update_timing_info(s, NULL, NULL))) if (s->direction == PA_STREAM_PLAYBACK) {
pa_operation_unref(lo); if (s->write_index_corrections[s->current_write_index_correction].valid)
s->write_index_corrections[s->current_write_index_correction].corrupt = 1;
if (s->timing_info_valid)
s->timing_info.write_index_corrupt = 1;
if (s->buffer_attr.prebuf > 0)
invalidate_indexes(s, 1, 0);
else
request_auto_timing_update(s, 1);
} else
invalidate_indexes(s, 0, 1);
} }
return o; return o;
@ -1023,13 +1092,10 @@ pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *us
pa_operation *o; pa_operation *o;
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
if ((o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata))) { if ((o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
pa_operation *lo; invalidate_indexes(s, 1, 0);
if ((lo = pa_stream_update_timing_info(s, NULL, NULL)))
pa_operation_unref(lo);
}
return o; return o;
} }
@ -1038,13 +1104,10 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u
pa_operation *o; pa_operation *o;
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
if ((o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata))) { if ((o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
pa_operation *lo; invalidate_indexes(s, 1, 0);
if ((lo = pa_stream_update_timing_info(s, NULL, NULL)))
pa_operation_unref(lo);
}
return o; return o;
} }
@ -1084,8 +1147,10 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
if (s->flags & PA_STREAM_INTERPOLATE_TIMING && s->ipol_usec_valid ) if (s->flags & PA_STREAM_INTERPOLATE_TIMING && s->ipol_usec_valid)
usec = s->ipol_usec; usec = s->ipol_usec;
else { else {
if (s->direction == PA_STREAM_PLAYBACK) { if (s->direction == PA_STREAM_PLAYBACK) {
@ -1093,41 +1158,45 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
* had this time value associated */ * had this time value associated */
usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec); usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
/* Because the latency info took a little time to come if (!s->corked) {
* to us, we assume that the real output time is actually /* Because the latency info took a little time to come
* a little ahead */ * to us, we assume that the real output time is actually
usec += s->timing_info.transport_usec; * a little ahead */
usec += s->timing_info.transport_usec;
/* However, the output device usually maintains a buffer /* However, the output device usually maintains a buffer
too, hence the real sample currently played is a little too, hence the real sample currently played is a little
back */ back */
if (s->timing_info.sink_usec >= usec) if (s->timing_info.sink_usec >= usec)
usec = 0; usec = 0;
else else
usec -= s->timing_info.sink_usec; usec -= s->timing_info.sink_usec;
}
} else if (s->direction == PA_STREAM_RECORD) { } else if (s->direction == PA_STREAM_RECORD) {
/* The last byte written into the server side queue had /* The last byte written into the server side queue had
* this time value associated */ * this time value associated */
usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec); usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
/* Add transport latency */ if (!s->corked) {
usec += s->timing_info.transport_usec; /* Add transport latency */
usec += s->timing_info.transport_usec;
/* Add latency of data in device buffer */ /* Add latency of data in device buffer */
usec += s->timing_info.source_usec; usec += s->timing_info.source_usec;
/* If this is a monitor source, we need to correct the /* If this is a monitor source, we need to correct the
* time by the playback device buffer */ * time by the playback device buffer */
if (s->timing_info.sink_usec >= usec) if (s->timing_info.sink_usec >= usec)
usec = 0; usec = 0;
else else
usec -= s->timing_info.sink_usec; usec -= s->timing_info.sink_usec;
}
} }
if (s->flags & PA_STREAM_INTERPOLATE_TIMING) { if (s->flags & PA_STREAM_INTERPOLATE_TIMING) {
s->ipol_usec_valid = 1;
s->ipol_usec = usec; s->ipol_usec = usec;
s->ipol_usec_valid = 1;
} }
} }
@ -1189,6 +1258,7 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
if ((r = pa_stream_get_time(s, &t)) < 0) if ((r = pa_stream_get_time(s, &t)) < 0)
return r; return r;