module-rtp: Handle Latency and ProcessLatency in stream

This commit is contained in:
Carlos Rafael Giani 2025-09-24 20:18:56 +02:00
parent f1e1f720bf
commit 63df661eff
3 changed files with 86 additions and 42 deletions

View file

@ -38,7 +38,6 @@
#include <spa/pod/builder.h>
#include <spa/param/audio/format-utils.h>
#include <spa/param/audio/raw.h>
#include <spa/param/latency-utils.h>
#include <pipewire/impl.h>
#include <pipewire/i18n.h>
@ -272,8 +271,6 @@ struct impl {
bool mute;
float volume;
struct spa_process_latency_info process_latency;
struct spa_ringbuffer ring;
uint8_t buffer[BUFFER_SIZE];
@ -852,30 +849,13 @@ static uint32_t msec_to_samples(struct impl *impl, uint32_t msec)
return (uint64_t) msec * impl->rate / 1000;
}
static void update_latency(struct impl *impl)
{
uint32_t n_params = 0;
const struct spa_pod *params[2];
uint8_t buffer[1024];
struct spa_pod_builder b;
struct spa_latency_info latency;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
latency = SPA_LATENCY_INFO(PW_DIRECTION_INPUT);
spa_process_latency_info_add(&impl->process_latency, &latency);
params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
params[n_params++] = spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, &impl->process_latency);
rtp_stream_update_params(impl->stream, params, n_params);
}
static int rtsp_record_reply(void *data, int status, const struct spa_dict *headers, const struct pw_array *content)
{
struct impl *impl = data;
const char *str;
char progress[128];
struct timespec timeout, interval;
struct spa_process_latency_info process_latency;
pw_log_info("record status: %d", status);
switch (status) {
@ -903,9 +883,10 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head
if (spa_atou32(str, &l, 0))
impl->latency = SPA_MAX(l, impl->latency);
}
impl->process_latency.rate = impl->latency + msec_to_samples(impl, RAOP_LATENCY_MS);
spa_zero(process_latency);
process_latency.rate = impl->latency + msec_to_samples(impl, RAOP_LATENCY_MS);
update_latency(impl);
rtp_stream_update_process_latency(impl->stream, &process_latency);
rtp_stream_set_first(impl->stream);
@ -1668,20 +1649,6 @@ static void stream_props_changed(struct impl *impl, uint32_t id, const struct sp
rtp_stream_set_param(impl->stream, id, param);
}
static void param_process_latency_changed(struct impl *impl, const struct spa_pod *param)
{
struct spa_process_latency_info info;
if (param == NULL)
spa_zero(info);
else if (spa_process_latency_parse(param, &info) < 0)
return;
if (spa_process_latency_info_compare(&impl->process_latency, &info) == 0)
return;
impl->process_latency = info;
update_latency(impl);
}
static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param)
{
struct impl *impl = data;
@ -1697,9 +1664,6 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
if (param != NULL)
stream_props_changed(impl, id, param);
break;
case SPA_PARAM_ProcessLatency:
param_process_latency_changed(impl, param);
break;
default:
break;
}
@ -1990,7 +1954,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
pw_log_error("can't create raop stream: %m");
goto error;
}
update_latency(impl);
impl->headers = pw_properties_new(NULL, NULL);

View file

@ -14,6 +14,7 @@
#include <spa/utils/dll.h>
#include <spa/param/audio/format-utils.h>
#include <spa/param/audio/raw-json.h>
#include <spa/param/latency-utils.h>
#include <spa/control/control.h>
#include <spa/control/ump-utils.h>
#include <spa/debug/types.h>
@ -185,6 +186,9 @@ struct impl {
/* And some bookkeping for the sender processing */
uint64_t rtp_base_ts;
uint32_t rtp_last_ts;
/* The process latency, set by on_stream_param_changed(). */
struct spa_process_latency_info process_latency;
};
/* Atomic internal_state accessors.
@ -482,10 +486,70 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old,
}
}
static void update_latency_params(struct impl *impl)
{
uint32_t n_params = 0;
const struct spa_pod *params[2];
uint8_t buffer[1024];
struct spa_pod_builder b;
struct spa_latency_info main_latency;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
/* main_latency is the latency in the direction indicated by impl->direction.
* In RTP streams, this consists solely of the process latency. (In theory,
* PipeWire SPA nodes could have additional latencies on top of the process
* latency, but this is not the case here.) The other direction is already
* handled by pw_stream.
*
* The main_latncy is passed as updated SPA_PARAM_Latency params to the stream.
* That way, the stream always gets information of latency for _both_ directions;
* the direction indicated by impl->direction is covered by main_latency, and
* the opposite direction is already taken care of by the default pw_stream
* param handling.
*
* The process latency is also passed on as an SPA_PARAM_ProcessLatency param.
*/
main_latency = SPA_LATENCY_INFO(impl->direction);
spa_process_latency_info_add(&impl->process_latency, &main_latency);
params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &main_latency);
params[n_params++] = spa_process_latency_build(&b, SPA_PARAM_ProcessLatency,
&impl->process_latency);
pw_stream_update_params(impl->stream, params, n_params);
}
static void param_process_latency_changed(struct impl *impl, const struct spa_pod *param)
{
struct spa_process_latency_info process_latency;
if (param == NULL)
spa_zero(process_latency);
else if (spa_process_latency_parse(param, &process_latency) < 0)
return;
if (spa_process_latency_info_compare(&impl->process_latency, &process_latency) == 0)
return;
impl->process_latency = process_latency;
update_latency_params(impl);
}
static void on_stream_param_changed (void *d, uint32_t id, const struct spa_pod *param)
{
struct impl *impl = d;
rtp_stream_emit_param_changed(impl, id, param);
switch (id) {
case SPA_PARAM_ProcessLatency:
param_process_latency_changed(impl, param);
break;
default:
rtp_stream_emit_param_changed(impl, id, param);
break;
}
};
static const struct pw_stream_events stream_events = {
@ -1007,3 +1071,17 @@ int rtp_stream_update_params(struct rtp_stream *s,
struct impl *impl = (struct impl*)s;
return pw_stream_update_params(impl->stream, params, n_params);
}
void rtp_stream_update_process_latency(struct rtp_stream *s,
const struct spa_process_latency_info *process_latency)
{
struct impl *impl = (struct impl*)s;
if (spa_process_latency_info_compare(&impl->process_latency, process_latency) == 0)
return;
spa_memcpy(&(impl->process_latency), process_latency,
sizeof(const struct spa_process_latency_info));
update_latency_params(impl);
}

View file

@ -82,6 +82,9 @@ int rtp_stream_update_params(struct rtp_stream *stream,
const struct spa_pod **params,
uint32_t n_params);
void rtp_stream_update_process_latency(struct rtp_stream *stream,
const struct spa_process_latency_info *process_latency);
#ifdef __cplusplus
}
#endif