From 63df661eff4633988e1555828b62193b9f92baab Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Wed, 24 Sep 2025 20:18:56 +0200 Subject: [PATCH] module-rtp: Handle Latency and ProcessLatency in stream --- src/modules/module-raop-sink.c | 45 ++----------------- src/modules/module-rtp/stream.c | 80 ++++++++++++++++++++++++++++++++- src/modules/module-rtp/stream.h | 3 ++ 3 files changed, 86 insertions(+), 42 deletions(-) diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index 6e9fbf8ab..5f464628c 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -38,7 +38,6 @@ #include #include #include -#include #include #include @@ -272,8 +271,6 @@ struct impl { bool mute; float volume; - struct spa_process_latency_info process_latency; - struct spa_ringbuffer ring; uint8_t buffer[BUFFER_SIZE]; @@ -852,30 +849,13 @@ static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) return (uint64_t) msec * impl->rate / 1000; } -static void update_latency(struct impl *impl) -{ - uint32_t n_params = 0; - const struct spa_pod *params[2]; - uint8_t buffer[1024]; - struct spa_pod_builder b; - struct spa_latency_info latency; - - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - - latency = SPA_LATENCY_INFO(PW_DIRECTION_INPUT); - - spa_process_latency_info_add(&impl->process_latency, &latency); - params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &latency); - params[n_params++] = spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, &impl->process_latency); - rtp_stream_update_params(impl->stream, params, n_params); -} - static int rtsp_record_reply(void *data, int status, const struct spa_dict *headers, const struct pw_array *content) { struct impl *impl = data; const char *str; char progress[128]; struct timespec timeout, interval; + struct spa_process_latency_info process_latency; pw_log_info("record status: %d", status); switch (status) { @@ -903,9 +883,10 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head if (spa_atou32(str, &l, 0)) impl->latency = SPA_MAX(l, impl->latency); } - impl->process_latency.rate = impl->latency + msec_to_samples(impl, RAOP_LATENCY_MS); + spa_zero(process_latency); + process_latency.rate = impl->latency + msec_to_samples(impl, RAOP_LATENCY_MS); - update_latency(impl); + rtp_stream_update_process_latency(impl->stream, &process_latency); rtp_stream_set_first(impl->stream); @@ -1668,20 +1649,6 @@ static void stream_props_changed(struct impl *impl, uint32_t id, const struct sp rtp_stream_set_param(impl->stream, id, param); } -static void param_process_latency_changed(struct impl *impl, const struct spa_pod *param) -{ - struct spa_process_latency_info info; - - if (param == NULL) - spa_zero(info); - else if (spa_process_latency_parse(param, &info) < 0) - return; - if (spa_process_latency_info_compare(&impl->process_latency, &info) == 0) - return; - impl->process_latency = info; - update_latency(impl); -} - static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) { struct impl *impl = data; @@ -1697,9 +1664,6 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * if (param != NULL) stream_props_changed(impl, id, param); break; - case SPA_PARAM_ProcessLatency: - param_process_latency_changed(impl, param); - break; default: break; } @@ -1990,7 +1954,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_log_error("can't create raop stream: %m"); goto error; } - update_latency(impl); impl->headers = pw_properties_new(NULL, NULL); diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index bb9da4995..35c6009f8 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -185,6 +186,9 @@ struct impl { /* And some bookkeping for the sender processing */ uint64_t rtp_base_ts; uint32_t rtp_last_ts; + + /* The process latency, set by on_stream_param_changed(). */ + struct spa_process_latency_info process_latency; }; /* Atomic internal_state accessors. @@ -482,10 +486,70 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, } } +static void update_latency_params(struct impl *impl) +{ + uint32_t n_params = 0; + const struct spa_pod *params[2]; + uint8_t buffer[1024]; + struct spa_pod_builder b; + struct spa_latency_info main_latency; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + + /* main_latency is the latency in the direction indicated by impl->direction. + * In RTP streams, this consists solely of the process latency. (In theory, + * PipeWire SPA nodes could have additional latencies on top of the process + * latency, but this is not the case here.) The other direction is already + * handled by pw_stream. + * + * The main_latncy is passed as updated SPA_PARAM_Latency params to the stream. + * That way, the stream always gets information of latency for _both_ directions; + * the direction indicated by impl->direction is covered by main_latency, and + * the opposite direction is already taken care of by the default pw_stream + * param handling. + * + * The process latency is also passed on as an SPA_PARAM_ProcessLatency param. + */ + + main_latency = SPA_LATENCY_INFO(impl->direction); + spa_process_latency_info_add(&impl->process_latency, &main_latency); + + params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &main_latency); + params[n_params++] = spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, + &impl->process_latency); + + pw_stream_update_params(impl->stream, params, n_params); +} + +static void param_process_latency_changed(struct impl *impl, const struct spa_pod *param) +{ + struct spa_process_latency_info process_latency; + + if (param == NULL) + spa_zero(process_latency); + + else if (spa_process_latency_parse(param, &process_latency) < 0) + return; + if (spa_process_latency_info_compare(&impl->process_latency, &process_latency) == 0) + return; + + impl->process_latency = process_latency; + + update_latency_params(impl); +} + static void on_stream_param_changed (void *d, uint32_t id, const struct spa_pod *param) { struct impl *impl = d; - rtp_stream_emit_param_changed(impl, id, param); + + switch (id) { + case SPA_PARAM_ProcessLatency: + param_process_latency_changed(impl, param); + break; + default: + rtp_stream_emit_param_changed(impl, id, param); + break; + } }; static const struct pw_stream_events stream_events = { @@ -1007,3 +1071,17 @@ int rtp_stream_update_params(struct rtp_stream *s, struct impl *impl = (struct impl*)s; return pw_stream_update_params(impl->stream, params, n_params); } + +void rtp_stream_update_process_latency(struct rtp_stream *s, + const struct spa_process_latency_info *process_latency) +{ + struct impl *impl = (struct impl*)s; + + if (spa_process_latency_info_compare(&impl->process_latency, process_latency) == 0) + return; + + spa_memcpy(&(impl->process_latency), process_latency, + sizeof(const struct spa_process_latency_info)); + + update_latency_params(impl); +} diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 67908d706..ea358f350 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -82,6 +82,9 @@ int rtp_stream_update_params(struct rtp_stream *stream, const struct spa_pod **params, uint32_t n_params); +void rtp_stream_update_process_latency(struct rtp_stream *stream, + const struct spa_process_latency_info *process_latency); + #ifdef __cplusplus } #endif