diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h index 4ef5fa933..9fd56719e 100644 --- a/pipewire-pulseaudio/src/internal.h +++ b/pipewire-pulseaudio/src/internal.h @@ -357,12 +357,14 @@ struct pa_stream { char *device_name; pa_timing_info timing_info; + int64_t ticks_base; uint32_t direct_on_input; - bool suspended:1; - bool corked:1; - bool timing_info_valid:1; + unsigned int suspended:1; + unsigned int corked:1; + unsigned int timing_info_valid:1; + unsigned int have_time:1; pa_stream_notify_cb_t state_callback; void *state_userdata; @@ -387,8 +389,6 @@ struct pa_stream { pa_stream_notify_cb_t buffer_attr_callback; void *buffer_attr_userdata; - int64_t offset; - struct pw_buffer *dequeued[MAX_BUFFERS]; struct spa_ringbuffer dequeued_ring; size_t dequeued_size; @@ -405,7 +405,6 @@ struct pa_stream { float channel_volumes[SPA_AUDIO_MAX_CHANNELS]; bool mute; pa_operation *drain; - uint64_t queued; }; void pa_stream_set_state(pa_stream *s, pa_stream_state_t st); diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index 87da005cc..3047a1a5d 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -444,15 +444,12 @@ static void update_timing_info(pa_stream *s) struct pw_time pwt; pa_timing_info *ti = &s->timing_info; size_t stride = pa_frame_size(&s->sample_spec); - int64_t delay, queued, ticks; + int64_t delay, index; pw_stream_get_time(s->stream, &pwt); s->timing_info_valid = false; - s->queued = pwt.queued; - pw_log_debug("stream %p: %"PRIu64" rate:%d", s, s->queued, pwt.rate.denom); - if (pwt.rate.denom == 0) - return; + pw_log_debug("stream %p: %"PRIu64" rate:%d", s, pwt.queued, pwt.rate.denom); pa_timeval_store(&ti->timestamp, pwt.now / SPA_NSEC_PER_USEC); ti->synchronized_clocks = true; @@ -461,21 +458,23 @@ static void update_timing_info(pa_stream *s) ti->write_index_corrupt = false; ti->read_index_corrupt = false; - queued = pwt.queued + (pwt.ticks * s->sample_spec.rate / pwt.rate.denom) * stride; - ticks = ((pwt.ticks + pwt.delay) * s->sample_spec.rate / pwt.rate.denom) * stride; - - delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; + if (pwt.rate.denom > 0) { + if (s->ticks_base == -1) + s->ticks_base = pwt.ticks + pwt.delay; + index = ((pwt.ticks + pwt.delay - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; + delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; + s->have_time = true; + } else { + index = delay = 0; + s->have_time = false; + } if (s->direction == PA_STREAM_PLAYBACK) { ti->sink_usec = -delay; - ti->write_index = queued; - ti->read_index = ticks; - } - else { + ti->read_index = index; + } else { ti->source_usec = delay; - ti->read_index = queued; - ti->write_index = ticks; + ti->write_index = index; } - ti->configured_sink_usec = 0; ti->configured_source_usec = 0; ti->since_underrun = 0; @@ -486,10 +485,9 @@ static void stream_process(void *data) { pa_stream *s = data; - update_timing_info(s); - while (dequeue_buffer(s) == 0); + pw_log_trace("stream %p: %"PRIu64, s, s->dequeued_size); if (s->dequeued_size <= 0) return; @@ -569,6 +567,7 @@ static pa_stream* stream_new(pa_context *c, const char *name, s->direction = PA_STREAM_NODIRECTION; s->state = PA_STREAM_UNCONNECTED; s->flags = 0; + s->ticks_base = -1; if (ss) s->sample_spec = *ss; @@ -1124,16 +1123,17 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); + pw_log_trace("peek buffer %p %zd %d %d", *data, *nbytes, s->buffer_size, s->buffer_offset); + if ((res = peek_buffer(s)) < 0) { *data = NULL; *nbytes = 0; - } - else { + } else { size_t max = s->buffer_size - s->buffer_offset; *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max; } - pw_log_trace("peek buffer %p %zd", *data, *nbytes); + pw_log_trace("peek buffer %p %zd %p %d", *data, *nbytes, s->buffer, res); return 0; } @@ -1198,6 +1198,8 @@ int pa_stream_write_ext_free(pa_stream *s, const void *src = data; size_t towrite = nbytes, dsize; + pw_log_debug("stream %p: write %zd bytes", s, nbytes); + while (towrite > 0) { dsize = towrite; @@ -1211,7 +1213,7 @@ int pa_stream_write_ext_free(pa_stream *s, s->buffer_offset += dsize; - if (s->buffer_offset >= s->buffer_size) { + if (s->buffer_offset >= s->buffer_size || towrite == dsize) { s->buffer->buffer->datas[0].chunk->offset = 0; s->buffer->buffer->datas[0].chunk->size = s->buffer_offset; queue_buffer(s); @@ -1229,8 +1231,8 @@ int pa_stream_write_ext_free(pa_stream *s, s->buffer->buffer->datas[0].chunk->size = nbytes; queue_buffer(s); } - - update_timing_info(s); + s->timing_info.write_index += nbytes; + pw_log_debug("stream %p: written %zd bytes", s, nbytes); return 0; } @@ -1334,7 +1336,6 @@ pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *use pw_log_debug("stream %p", s); pw_stream_flush(s->stream, true); - update_timing_info(s); o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); d = o->userdata; d->cb = cb; @@ -1541,6 +1542,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + pw_log_debug("stream %p: cork %d->%d", s, s->corked, b); s->corked = b; if (!b) pw_stream_set_active(s->stream, true); @@ -1566,11 +1568,11 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); pw_stream_flush(s->stream, false); - update_timing_info(s); o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); d = o->userdata; d->cb = cb; d->userdata = userdata; + s->timing_info.write_index = s->timing_info.read_index; pa_operation_sync(o); return o; @@ -1663,13 +1665,17 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); + update_timing_info(s); clock_gettime(CLOCK_MONOTONIC, &ts); now = SPA_TIMESPEC_TO_USEC(&ts); i = &s->timing_info; - delay = now - SPA_TIMEVAL_TO_USEC(&i->timestamp); + if (s->have_time) + delay = now - SPA_TIMEVAL_TO_USEC(&i->timestamp); + else + delay = 0; + read_time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec); res = delay + read_time; @@ -1677,7 +1683,8 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) if (r_usec) *r_usec = res; - pw_log_trace("stream %p: %"PRIu64" %"PRIu64" %"PRIu64" %"PRIi64" %"PRIi64" %"PRIi64" %"PRIu64, + pw_log_trace("stream %p: now:%"PRIu64" delay:%"PRIu64" read_time:%"PRIu64 + " write-index:%"PRIi64" read_index:%"PRIi64" diff:%"PRIi64" res:%"PRIu64, s, now, delay, read_time, i->write_index, i->read_index, i->write_index - i->read_index, @@ -1716,7 +1723,6 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); pa_stream_get_time(s, &t); @@ -1746,7 +1752,7 @@ const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA); + update_timing_info(s); pw_log_trace("stream %p: %"PRIi64" %"PRIi64" %"PRIi64, s, s->timing_info.write_index, s->timing_info.read_index,