milan-avb: AAF media-clock recovery estimator + play-loop actuator + io_changed, SET_CLOCK_SOURCE switch, skip CRF audio data plane

This commit is contained in:
hackerman-kl 2026-05-31 15:06:14 +02:00 committed by Wim Taymans
parent 66959ca678
commit 90114c9839
5 changed files with 465 additions and 0 deletions

View file

@ -141,6 +141,11 @@ int handle_cmd_set_clock_source_milan_v12(struct aecp *aecp, int64_t now,
/** Descriptor always keep the network endianness */
dclk_domain->clock_source_index = htons(clock_src_index);
/* milan-avb: apply the new selection to the data plane on the fly —
* (de)activate AAF media-clock recovery on the affected input streams. */
avb_stream_update_clock_source(server);
rc = reply_success(aecp, m, len);
if (rc) {
pw_log_error("Reply failed for set_clock_source\n");

View file

@ -0,0 +1,113 @@
/* AVB support */
/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */
/* SPDX-License-Identifier: MIT */
/*
* mc-recover.h AAF media-clock recovery estimator (listener side).
*
* Self-contained and pure (no PipeWire/stream deps) so it can be unit-tested
* in isolation. A second-order DLL (spa_dll) recovers the talker media rate
* from the AAF avtp_timestamp progression: each PDU carries a presentation
* time in the talker's gPTP domain, advancing by frames_per_pdu samples. The
* model clock advances by the DLL-corrected period; the phase error against the
* received timestamp drives the DLL. Recovered rate = nominal / corr.
*/
#ifndef AVB_MC_RECOVER_H
#define AVB_MC_RECOVER_H
#include <stdint.h>
#include <stdbool.h>
#include <spa/utils/dll.h>
struct mc_recover {
bool init;
struct spa_dll dll;
double corr; /* DLL output (period multiplier, ~1.0) */
double rate; /* recovered media rate, Hz */
int32_t last_err_ns; /* last phase error (model vs avtp_ts), ns */
uint64_t model_ns; /* model presentation clock (DLL-tracked) */
uint32_t last_avtp_ts; /* previous fed timestamp; model advances by actual PDU count */
uint64_t pdus; /* PDUs since prime */
};
static inline void mc_recover_reset(struct mc_recover *m, double nominal_rate)
{
m->init = false;
m->corr = 1.0;
m->rate = nominal_rate;
m->last_err_ns = 0;
m->model_ns = 0;
m->last_avtp_ts = 0;
m->pdus = 0;
}
/* Feed one PDU's presentation timestamp (low 32 bits of CLOCK_TAI ns). Returns
* the recovered media rate in Hz. nominal_rate/frames_per_pdu/pdu_period_ns
* describe the stream's nominal media clock. */
static inline double mc_recover_update(struct mc_recover *m, uint32_t avtp_ts,
int frames_per_pdu, int nominal_rate, int64_t pdu_period_ns)
{
int32_t err_ns;
double err_samples;
uint64_t step;
int32_t raw_delta;
int n_pdus;
if (!m->init) {
spa_dll_init(&m->dll);
spa_dll_set_bw(&m->dll, SPA_DLL_BW_MIN, frames_per_pdu, nominal_rate);
m->corr = 1.0;
m->rate = nominal_rate;
m->last_err_ns = 0;
m->model_ns = avtp_ts;
m->last_avtp_ts = avtp_ts;
m->pdus = 0;
m->init = true;
return m->rate;
}
/* Advance the model by the ACTUAL number of nominal PDUs elapsed since the
* last fed timestamp (avtp_ts delta rounded to pdu_period), then measure the
* phase error. Using the real PDU count (not a fixed one-per-call) keeps a
* non-1:1 feed dropped or coalesced PDUs from accumulating phase error
* and saturating the loop into a ±ppm hunt. err>0 = talker ahead of model;
* spa_dll returns corr<1, step grows, model catches up (negative feedback);
* recovered rate = nominal*corr. A large jump (>8 PDUs: stream gap, reorder,
* or the bind-transient seed) re-seeds the phase rather than slewing the
* deliberately-slow loop, which otherwise wedges it. */
raw_delta = (int32_t)(avtp_ts - m->last_avtp_ts);
m->last_avtp_ts = avtp_ts;
n_pdus = (int)((double)raw_delta / (double)pdu_period_ns + 0.5);
if (n_pdus < 1)
n_pdus = 1;
if (n_pdus > 8) {
m->model_ns = avtp_ts;
m->last_err_ns = 0;
m->pdus++;
return m->rate;
}
step = (uint64_t)((double)n_pdus * (double)pdu_period_ns / m->corr + 0.5);
m->model_ns += step;
err_ns = (int32_t)(avtp_ts - (uint32_t)m->model_ns);
m->last_err_ns = err_ns;
err_samples = (double)err_ns * (double)nominal_rate / 1e9;
/* bound the response to a single corrupt/late timestamp */
if (err_samples > 128.0)
err_samples = 128.0;
else if (err_samples < -128.0)
err_samples = -128.0;
m->corr = spa_dll_update(&m->dll, err_samples);
/* clamp to ±10 % — far beyond any real media clock; guards 1/corr */
if (m->corr < 0.9)
m->corr = 0.9;
else if (m->corr > 1.1)
m->corr = 1.1;
m->rate = (double)nominal_rate * m->corr;
m->pdus++;
return m->rate;
}
#endif /* AVB_MC_RECOVER_H */

View file

@ -0,0 +1,63 @@
/* AVB support */
/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */
/* SPDX-License-Identifier: MIT */
/*
* play-loop.h consume-side actuator for the listener.
*
* Pure (no PipeWire deps) so it can be unit-tested like mc-recover.h. Keeps the
* listener ring at a target fill by trimming the output resampler ratio
* (SPA_IO_RateMatch). Same loop as module-rtp's receiver:
* error = target - avail; corr = spa_dll_update(dll, error); rate = ff / corr
* ff = nominal/recovered rate feeds the recovered clock forward; the DLL trims
* the rest. The sign is the trap (same as the old mc_recover bug); test_play_loop
* locks it: converges on the right sign, diverges on the wrong one.
*/
#ifndef AVB_PLAY_LOOP_H
#define AVB_PLAY_LOOP_H
#include <stdbool.h>
#include <spa/utils/defs.h>
#include <spa/utils/dll.h>
struct play_loop {
bool init;
struct spa_dll dll;
double corr; /* DLL output, ~1.0 */
double rate; /* last applied resampler ratio (ff / corr) */
};
static inline void play_loop_reset(struct play_loop *p)
{
p->init = false;
p->corr = 1.0;
p->rate = 1.0;
}
/* One step. error_samples = target - avail; max_error clamps a transient;
* ff_ratio = nominal/recovered rate (1.0 = no feedforward). Returns the ratio
* to apply via pw_stream_set_rate(). */
static inline double play_loop_update(struct play_loop *p, double error_samples,
double max_error, double ff_ratio, unsigned period, unsigned rate)
{
if (!p->init) {
spa_dll_init(&p->dll);
spa_dll_set_bw(&p->dll, SPA_DLL_BW_MIN, period, rate);
p->corr = 1.0;
p->rate = ff_ratio;
p->init = true;
}
error_samples = SPA_CLAMPD(error_samples, -max_error, max_error);
p->corr = spa_dll_update(&p->dll, error_samples);
/* clamp ±10 %, guards 1/corr */
if (p->corr < 0.9)
p->corr = 0.9;
else if (p->corr > 1.1)
p->corr = 1.1;
p->rate = ff_ratio / p->corr;
return p->rate;
}
#endif /* AVB_PLAY_LOOP_H */

View file

@ -109,6 +109,7 @@
* --------------------------------------------------------------------------
*/
#include <stdlib.h>
#include <unistd.h>
#include <linux/if_ether.h>
#include <linux/if_packet.h>
@ -297,6 +298,81 @@ static void on_source_stream_process(void *data)
}
}
/* milan-avb: consume-side actuator. While AAF is the clock source and the
* adapter gave us a resampler, trim its ratio to hold the ring fill and feed
* the recovered media rate forward (play-loop.h). Off otherwise, so the
* default rate=1.0 path is untouched. */
if (stream->mc_aaf_active && stream->io_rate_match != NULL) {
uint32_t rate = stream->info.info.raw.rate;
int32_t avail_samples = avail / (int32_t)stream->stride;
uint32_t quantum = buf->requested ? (uint32_t)buf->requested :
(stream->io_position ? stream->io_position->clock.duration : 1024);
int32_t ring_samples = (int32_t)(stream->buffer_size / stream->stride);
/* Target ~½ quantum: that is where the ring sits on average, so it is
* reachable. A full quantum never is, so the error stays saturated and
* the DLL winds up (rate ramps without bound). */
int32_t target = (int32_t)(quantum / 2);
double max_error = 2.0 * rate / 1000.0; /* 2 ms, == module-rtp ERROR_MSEC */
double ff, error, r;
const char *env_target = getenv("MILAN_AVB_PLAY_TARGET");
if (env_target)
target = atoi(env_target);
if (target < (int32_t)(rate / 1000)) /* >= ~1 ms underrun margin */
target = (int32_t)(rate / 1000);
if (target > ring_samples / 2) /* keep well inside the ring */
target = ring_samples / 2;
stream->play_target = target;
ff = stream->mc.rate > 1.0 ? (double)rate / stream->mc.rate : 1.0;
error = (double)target - (double)avail_samples;
r = play_loop_update(&stream->play, error, max_error, ff, quantum, rate);
pw_stream_set_rate(stream->stream, r);
} else if (stream->play.init) {
/* clock source switched away from AAF: release the resampler so the
* graph free-runs at nominal again, and re-prime for next engage. */
pw_stream_set_rate(stream->stream, 1.0);
play_loop_reset(&stream->play);
}
/* milan-avb: ~1 Hz log of the local consume rate (Δticks/Δtai, mapped to TAI
* via a monotonic/TAI offset) next to mc.rate and the actuator state. */
if (stream->mc_aaf_active || getenv("MILAN_AVB_PLAY_LOG")) {
struct timespec ts_mono;
uint64_t mono_ns;
clock_gettime(CLOCK_MONOTONIC, &ts_mono);
mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono);
if (!stream->play_primed ||
mono_ns - stream->play_log_last_ns >= SPA_NSEC_PER_SEC) {
struct pw_time pwt;
if (pw_stream_get_time_n(stream->stream, &pwt, sizeof(pwt)) == 0) {
uint64_t tai_ns, consume_tai;
/* milan-avb: gPTP time from the PHC so the consume clock
* stays in the gPTP domain even with NTP on the system clock. */
tai_ns = stream_gptp_now(stream->server);
consume_tai = (uint64_t)pwt.now + (tai_ns - mono_ns);
if (stream->play_primed) {
int64_t dticks = (int64_t)(pwt.ticks - stream->play_last_ticks);
int64_t dtai = (int64_t)(consume_tai - stream->play_last_consume_tai);
double local_rate = dtai > 0
? (double)dticks * 1e9 / (double)dtai : 0.0;
pw_log_info("milan-avb: play measure local_rate=%.4f Hz "
"mc.rate=%.4f corr=%.6f err_ns=%d ticks=%llu | "
"actuator rate=%.6f play_corr=%.6f target=%d avail=%d",
local_rate, stream->mc.rate, stream->mc.corr,
stream->mc.last_err_ns,
(unsigned long long)pwt.ticks,
stream->play.rate, stream->play.corr,
stream->play_target, avail / (int32_t)stream->stride);
}
stream->play_last_ticks = pwt.ticks;
stream->play_last_consume_tai = consume_tai;
stream->play_log_last_ns = mono_ns;
stream->play_primed = true;
}
}
}
/* Milan v1.2 Section 5.4.5.3: partial-read on underrun, zero-pad tail. */
if (avail <= 0) {
memset(d[0].data, 0, n_bytes);
@ -327,9 +403,35 @@ static void on_source_stream_process(void *data)
pw_stream_queue_buffer(stream->stream, buf);
}
static void on_source_stream_io_changed(void *data, uint32_t id,
void *area, uint32_t size)
{
struct stream *stream = data;
const char *name;
switch (id) {
case SPA_IO_RateMatch:
stream->io_rate_match = area;
name = "RateMatch";
break;
case SPA_IO_Position:
stream->io_position = area;
name = "Position";
break;
case SPA_IO_Clock: name = "Clock"; break;
case SPA_IO_Buffers: name = "Buffers"; break;
default: name = "?"; break;
}
/* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator
* knob) on this source. */
pw_log_info("milan-avb: io_changed id=%u (%s) area=%p size=%u",
id, name, area, (unsigned)size);
}
static const struct pw_stream_events source_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = on_stream_destroy,
.io_changed = on_source_stream_io_changed,
.process = on_source_stream_process
};
@ -799,6 +901,11 @@ void stream_destroy(struct stream *stream)
avb_mrp_attribute_destroy(common->tastream_attr.mrp);
avb_mrp_attribute_destroy(common->tfstream_attr.mrp);
}
if (stream->raw_dump_fp) {
fclose(stream->raw_dump_fp);
stream->raw_dump_fp = NULL;
}
}
static int setup_socket(struct stream *stream)
@ -806,6 +913,96 @@ static int setup_socket(struct stream *stream)
return avb_server_stream_setup_socket(stream->server, stream);
}
/* milan-avb: media-clock recovery -------------------------------------------
*
* Returns the CLOCK_SOURCE descriptor currently selected by CLOCK_DOMAIN 0,
* or NULL. The selection is clock_source_index, set at boot (Internal = 0)
* and updated on the wire by SET_CLOCK_SOURCE (IEEE 1722.1 Section 7.4.23). */
static struct avb_aem_desc_clock_source *selected_clock_source(struct server *server)
{
struct descriptor *dom;
struct descriptor *src;
struct avb_aem_desc_clock_domain *d;
uint16_t idx;
dom = server_find_descriptor(server, AVB_AEM_DESC_CLOCK_DOMAIN, 0);
if (dom == NULL)
return NULL;
d = descriptor_body(dom);
idx = ntohs(d->clock_source_index);
src = server_find_descriptor(server, AVB_AEM_DESC_CLOCK_SOURCE, idx);
if (src == NULL)
return NULL;
return descriptor_body(src);
}
/* True when the CLOCK_DOMAIN selects an AAF (INPUT_STREAM) clock source whose
* location points at this listener stream. CRF (MEDIA_CLOCK_STREAM) is out of
* scope and returns false. */
static bool stream_mc_aaf_selected(struct stream *stream)
{
struct avb_aem_desc_clock_source *cs;
if (stream->direction != SPA_DIRECTION_INPUT)
return false;
cs = selected_clock_source(stream->server);
if (cs == NULL)
return false;
if (ntohs(cs->clock_source_type) != AVB_AEM_DESC_CLOCK_SOURCE_TYPE_INPUT_STREAM)
return false;
if (ntohs(cs->clock_source_location_type) != AVB_AEM_DESC_STREAM_INPUT)
return false;
return ntohs(cs->clock_source_location_index) == stream->index;
}
static void stream_mc_reset(struct stream *stream)
{
mc_recover_reset(&stream->mc, stream->info.info.raw.rate);
play_loop_reset(&stream->play);
}
void avb_stream_update_clock_source(struct server *server)
{
struct stream *s;
spa_list_for_each(s, &server->streams, link) {
bool active;
if (s->direction != SPA_DIRECTION_INPUT)
continue;
active = stream_mc_aaf_selected(s);
if (active && !s->mc_aaf_active)
stream_mc_reset(s);
s->mc_aaf_active = active;
pw_log_info("milan-avb: stream %u media-clock source -> %s",
s->index, active ? "AAF (recovered)" : "internal/gPTP");
}
}
/* Recover the talker media rate from a PDU's avtp_timestamp. The timestamps
* carry the talker media clock in gPTP time; their inter-PDU deltas give its
* rate. A second-order DLL (spa_dll) tracks phase+frequency. Observe-only:
* drives mc_rate; consumption retiming is the next step. */
static void stream_mc_recover(struct stream *stream, const struct avb_packet_aaf *p)
{
uint32_t avtp_ts;
double rate;
if (!stream->mc_aaf_active || !p->tv)
return;
avtp_ts = ntohl(p->timestamp);
rate = mc_recover_update(&stream->mc, avtp_ts, stream->frames_per_pdu,
stream->info.info.raw.rate, stream->pdu_period);
if (stream->mc.pdus < 40 || (stream->mc.pdus % 8000) == 1)
pw_log_info("milan-avb: mc-recovery stream=%u pdus=%llu avtp_ts=%u model_lo=%u nom=%u pdu_ns=%lld rate=%.4f corr=%.8f err_ns=%d ppm=%.3f",
stream->index, (unsigned long long)stream->mc.pdus, avtp_ts,
(uint32_t)stream->mc.model_ns, (unsigned)stream->info.info.raw.rate,
(long long)stream->pdu_period, rate, stream->mc.corr,
stream->mc.last_err_ns, (stream->mc.corr - 1.0) * 1e6);
}
/* Milan 5.4.5.3 STREAM_INTERRUPTED: playback is interrupted by the loss of
* "several" AVTPDUs (the spec leaves the count implementation-defined). A
* single dropped/reordered PDU is a SEQ_NUM_MISMATCH but not a full
@ -887,6 +1084,10 @@ static void handle_aaf_packet(struct stream *stream,
stream->prev_seq = p->seq_num;
}
/* milan-avb: AAF media-clock recovery (active only when selected via the
* CLOCK_DOMAIN). Recovers the talker media rate from avtp_timestamps. */
stream_mc_recover(stream, p);
/* milan-avb: latency observability (throttled to 1 Hz, env-gated). */
if (getenv("MILAN_AVB_LATENCY_LOG")) {
static uint64_t last_log_ns = 0;
@ -915,6 +1116,26 @@ static void handle_aaf_packet(struct stream *stream,
index += n_bytes;
spa_ringbuffer_write_update(&stream->ring, index);
stream_in_mark_counters_dirty(stream);
/* milan-avb: env-gated raw PCM dump (S32BE interleaved) for offline THDN. */
{
const char *dump_dir = getenv("MILAN_AVB_RAW_DUMP_DIR");
if (dump_dir && stream->raw_dump_fp == NULL) {
char dpath[512];
snprintf(dpath, sizeof(dpath),
"%s/avb-stream-in-%u.s32be",
dump_dir, stream->index);
stream->raw_dump_fp = fopen(dpath, "wb");
if (stream->raw_dump_fp)
pw_log_info("milan-avb: dumping raw S32BE PCM to %s", dpath);
else
pw_log_warn("milan-avb: cannot open dump file %s: %m", dpath);
}
if (stream->raw_dump_fp) {
size_t w = fwrite(p->payload, 1, n_bytes, stream->raw_dump_fp);
stream->raw_dump_bytes += w;
}
}
}
static void handle_iec61883_packet(struct stream *stream,
@ -972,6 +1193,26 @@ static void handle_iec61883_packet(struct stream *stream,
index += n_bytes;
spa_ringbuffer_write_update(&stream->ring, index);
stream_in_mark_counters_dirty(stream);
/* milan-avb: env-gated raw PCM dump (S32BE interleaved) for offline THDN. */
{
const char *dump_dir = getenv("MILAN_AVB_RAW_DUMP_DIR");
if (dump_dir && stream->raw_dump_fp == NULL) {
char dpath[512];
snprintf(dpath, sizeof(dpath),
"%s/avb-stream-in-%u.s32be",
dump_dir, stream->index);
stream->raw_dump_fp = fopen(dpath, "wb");
if (stream->raw_dump_fp)
pw_log_info("milan-avb: dumping raw S32BE PCM to %s", dpath);
else
pw_log_warn("milan-avb: cannot open dump file %s: %m", dpath);
}
if (stream->raw_dump_fp) {
size_t w = fwrite(p->payload, 1, n_bytes, stream->raw_dump_fp);
stream->raw_dump_bytes += w;
}
}
}
static void on_socket_data(void *data, int fd, uint32_t mask)
@ -1102,6 +1343,12 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
pad_ringbuffer_with_silence(stream, (int)prefill_pdus);
}
/* milan-avb: pick up the current media-clock selection for this input
* (AAF recovery vs internal/gPTP); re-prime the DLL on a fresh bind. */
stream->mc_aaf_active = stream_mc_aaf_selected(stream);
if (stream->mc_aaf_active)
stream_mc_reset(stream);
/* milan-avb: publish our contribution to graph latency so wpctl/pw-cli
* report it. Latency is the prefill: one PipeWire quantum at 48 kHz. */
{
@ -1222,6 +1469,7 @@ int stream_deactivate(struct stream *stream, uint64_t now)
if (si->mvrp_attr.mrp) {
avb_mrp_attribute_leave(si->mvrp_attr.mrp, now);
}
stream->mc_aaf_active = false;
} else {
avb_mrp_attribute_leave(common->tastream_attr.mrp, now);
}

View file

@ -12,6 +12,10 @@
#include <spa/utils/ringbuffer.h>
#include <spa/param/audio/format.h>
#include <spa/node/io.h>
#include "mc-recover.h"
#include "play-loop.h"
#include <pipewire/pipewire.h>
@ -38,6 +42,7 @@ struct stream {
struct spa_source *flush_timer;
uint64_t flush_last_ns;
bool is_crf;
uint64_t next_txtime;
int prio;
int mtt;
int t_uncertainty;
@ -66,6 +71,32 @@ struct stream {
uint64_t format;
uint32_t stride;
struct spa_audio_info info;
/* milan-avb: AAF media-clock recovery (listener / STREAM_INPUT only).
* Active only while the CLOCK_DOMAIN selects the AAF (INPUT_STREAM)
* clock source whose location points at this stream. Estimator state in
* struct mc_recover (mc-recover.h); recovered from avtp_timestamp deltas. */
bool mc_aaf_active;
struct mc_recover mc;
/* milan-avb: actuator I/O areas (set via .io_changed). io_rate_match is the
* resampler knob NULL unless the adapter inserted a resampler. */
struct spa_io_rate_match *io_rate_match;
struct spa_io_position *io_position;
/* milan-avb: previous 1 Hz sample for the local consume-rate log. */
uint64_t play_last_consume_tai;
uint64_t play_last_ticks;
uint64_t play_log_last_ns;
bool play_primed;
/* milan-avb: actuator state; servos the ring to play_target (play-loop.h). */
struct play_loop play;
int32_t play_target;
/* milan-avb: optional raw PCM dump for offline analysis (THDN, waveform). */
FILE *raw_dump_fp;
size_t raw_dump_bytes;
};
#include "msrp.h"
@ -81,4 +112,9 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now);
int stream_deactivate(struct stream *stream, uint64_t now);
int stream_activate_virtual(struct stream *stream, uint16_t index);
/* milan-avb: re-evaluate each input stream's media-clock recovery against the
* current CLOCK_DOMAIN selection. Call after SET_CLOCK_SOURCE for on-the-fly
* clock-source switching. */
void avb_stream_update_clock_source(struct server *server);
#endif /* AVB_STREAM_H */