diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 70f13eaf0..a90e01087 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -116,6 +116,8 @@ struct stream { struct spa_io_clock *clock; struct spa_io_position *position; struct spa_io_buffers *io; + struct spa_io_rate_match *rate_match; + uint32_t rate_queued; struct { struct spa_io_position *position; } rt; @@ -669,6 +671,12 @@ static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t else impl->io = NULL; break; + case SPA_IO_RateMatch: + if (data && size >= sizeof(struct spa_io_rate_match)) + impl->rate_match = data; + else + impl->rate_match = NULL; + break; } pw_stream_emit_io_changed(stream, id, data, size); @@ -917,8 +925,9 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe static inline void copy_position(struct stream *impl, int64_t queued) { struct spa_io_position *p = impl->rt.position; - if (SPA_UNLIKELY(p != NULL)) { - SEQ_WRITE(impl->seq); + + SEQ_WRITE(impl->seq); + if (SPA_LIKELY(p != NULL)) { impl->time.now = p->clock.nsec; impl->time.rate = p->clock.rate; if (SPA_UNLIKELY(impl->clock_id != p->clock.id)) { @@ -929,8 +938,10 @@ static inline void copy_position(struct stream *impl, int64_t queued) impl->time.delay = 0; impl->time.queued = queued; impl->quantum = p->clock.duration; - SEQ_WRITE(impl->seq); } + if (SPA_LIKELY(impl->rate_match != NULL)) + impl->rate_queued = impl->rate_match->delay; + SEQ_WRITE(impl->seq); } static int impl_node_process_input(void *object) @@ -2061,10 +2072,12 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); uintptr_t seq1, seq2; + uint32_t rate_queued; do { seq1 = SEQ_READ(impl->seq); *time = impl->time; + rate_queued = impl->rate_queued; seq2 = SEQ_READ(impl->seq); } while (!SEQ_READ_SUCCESS(seq1, seq2)); @@ -2073,6 +2086,8 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) else time->queued = (int64_t)(impl->queued.incount - time->queued); + time->queued += rate_queued; + time->delay += ((impl->latency.min_quantum + impl->latency.max_quantum) / 2) * impl->quantum; time->delay += (impl->latency.min_rate + impl->latency.max_rate) / 2; time->delay += ((impl->latency.min_ns + impl->latency.max_ns) / 2) * time->rate.denom / SPA_NSEC_PER_SEC; @@ -2109,7 +2124,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) errno = -res; return NULL; } - pw_log_trace("%p: dequeue buffer %d", stream, b->id); + pw_log_trace("%p: dequeue buffer %d size:%"PRIu64, stream, b->id, b->this.size); if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) { if (ATOMIC_INC(b->busy->count) > 1) {