diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index d22d41152..7c4a90b9e 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -126,7 +126,8 @@ struct stream { #define IDX_IO 2 #define IDX_Format 3 #define IDX_Buffers 4 -#define N_PORT_PARAMS 5 +#define IDX_Latency 5 +#define N_PORT_PARAMS 6 struct spa_param_info port_params[N_PORT_PARAMS]; struct spa_list param_list; @@ -151,6 +152,8 @@ struct stream { struct pw_time time; uint64_t base_pos; uint32_t clock_id; + struct spa_latency_info latency; + uint64_t quantum; unsigned int disconnecting:1; unsigned int disconnect_core:1; @@ -184,6 +187,8 @@ static int get_port_param_index(uint32_t id) return IDX_Format; case SPA_PARAM_Buffers: return IDX_Buffers; + case SPA_PARAM_Latency: + return IDX_Latency; default: return -1; } @@ -693,6 +698,24 @@ static void clear_buffers(struct pw_stream *stream) clear_queue(impl, &impl->queued); } +static int parse_latency(struct pw_stream *stream, const struct spa_pod *param) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + struct spa_latency_info latency; + int res; + + if (param == NULL) + return 0; + + if ((res = spa_latency_parse(param, &latency)) < 0) + return res; + if (latency.direction == impl->direction) + return 0; + + impl->latency = latency; + return 0; +} + static int impl_port_set_param(void *object, enum spa_direction direction, uint32_t port_id, uint32_t id, uint32_t flags, @@ -714,8 +737,16 @@ static int impl_port_set_param(void *object, if ((res = update_params(impl, id, ¶m, param ? 1 : 0)) < 0) return res; - if (id == SPA_PARAM_Format) + switch (id) { + case SPA_PARAM_Format: clear_buffers(stream); + break; + case SPA_PARAM_Latency: + parse_latency(stream, param); + break; + default: + break; + } pw_stream_emit_param_changed(stream, id, param); @@ -819,8 +850,9 @@ static inline void copy_position(struct stream *impl, int64_t queued) impl->clock_id = p->clock.id; } impl->time.ticks = p->clock.position - impl->base_pos; - impl->time.delay = p->clock.delay; + impl->time.delay = 0; impl->time.queued = queued; + impl->quantum = p->clock.duration; SEQ_WRITE(impl->seq); } } @@ -1610,6 +1642,7 @@ pw_stream_connect(struct pw_stream *stream, impl->port_params[IDX_IO] = SPA_PARAM_INFO(SPA_PARAM_IO, 0); impl->port_params[IDX_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); impl->port_params[IDX_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); + impl->port_params[IDX_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, 0); impl->port_info.props = &impl->port_props->dict; impl->port_info.params = impl->port_params; impl->port_info.n_params = N_PORT_PARAMS; @@ -1926,6 +1959,10 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) else time->queued = (int64_t)(impl->queued.incount - time->queued); + time->delay += ((impl->latency.min_quantum + impl->latency.max_quantum) / 2) * impl->quantum; + time->delay += (impl->latency.min_rate + impl->latency.max_rate) / 2; + time->delay += ((impl->latency.min_ns + impl->latency.max_ns) / 2) * time->rate.denom / SPA_NSEC_PER_SEC; + pw_log_trace(NAME" %p: %"PRIi64" %"PRIi64" %"PRIu64" %d/%d %"PRIu64" %" PRIu64" %"PRIu64" %"PRIu64" %"PRIu64, stream, time->now, time->delay, time->ticks,