diff --git a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c index b50bd8b5a..651b0ad1b 100644 --- a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c +++ b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c @@ -141,6 +141,11 @@ int handle_cmd_set_clock_source_milan_v12(struct aecp *aecp, int64_t now, /** Descriptor always keep the network endianness */ dclk_domain->clock_source_index = htons(clock_src_index); + + /* milan-avb: apply the new selection to the data plane on the fly — + * (de)activate AAF media-clock recovery on the affected input streams. */ + avb_stream_update_clock_source(server); + rc = reply_success(aecp, m, len); if (rc) { pw_log_error("Reply failed for set_clock_source\n"); diff --git a/src/modules/module-avb/mc-recover.h b/src/modules/module-avb/mc-recover.h new file mode 100644 index 000000000..1d0c856bb --- /dev/null +++ b/src/modules/module-avb/mc-recover.h @@ -0,0 +1,113 @@ +/* AVB support */ +/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */ +/* SPDX-License-Identifier: MIT */ + +/* + * mc-recover.h — AAF media-clock recovery estimator (listener side). + * + * Self-contained and pure (no PipeWire/stream deps) so it can be unit-tested + * in isolation. A second-order DLL (spa_dll) recovers the talker media rate + * from the AAF avtp_timestamp progression: each PDU carries a presentation + * time in the talker's gPTP domain, advancing by frames_per_pdu samples. The + * model clock advances by the DLL-corrected period; the phase error against the + * received timestamp drives the DLL. Recovered rate = nominal / corr. + */ + +#ifndef AVB_MC_RECOVER_H +#define AVB_MC_RECOVER_H + +#include +#include + +#include + +struct mc_recover { + bool init; + struct spa_dll dll; + double corr; /* DLL output (period multiplier, ~1.0) */ + double rate; /* recovered media rate, Hz */ + int32_t last_err_ns; /* last phase error (model vs avtp_ts), ns */ + uint64_t model_ns; /* model presentation clock (DLL-tracked) */ + uint32_t last_avtp_ts; /* previous fed timestamp; model advances by actual PDU count */ + uint64_t pdus; /* PDUs since prime */ +}; + +static inline void mc_recover_reset(struct mc_recover *m, double nominal_rate) +{ + m->init = false; + m->corr = 1.0; + m->rate = nominal_rate; + m->last_err_ns = 0; + m->model_ns = 0; + m->last_avtp_ts = 0; + m->pdus = 0; +} + +/* Feed one PDU's presentation timestamp (low 32 bits of CLOCK_TAI ns). Returns + * the recovered media rate in Hz. nominal_rate/frames_per_pdu/pdu_period_ns + * describe the stream's nominal media clock. */ +static inline double mc_recover_update(struct mc_recover *m, uint32_t avtp_ts, + int frames_per_pdu, int nominal_rate, int64_t pdu_period_ns) +{ + int32_t err_ns; + double err_samples; + uint64_t step; + int32_t raw_delta; + int n_pdus; + + if (!m->init) { + spa_dll_init(&m->dll); + spa_dll_set_bw(&m->dll, SPA_DLL_BW_MIN, frames_per_pdu, nominal_rate); + m->corr = 1.0; + m->rate = nominal_rate; + m->last_err_ns = 0; + m->model_ns = avtp_ts; + m->last_avtp_ts = avtp_ts; + m->pdus = 0; + m->init = true; + return m->rate; + } + + /* Advance the model by the ACTUAL number of nominal PDUs elapsed since the + * last fed timestamp (avtp_ts delta rounded to pdu_period), then measure the + * phase error. Using the real PDU count (not a fixed one-per-call) keeps a + * non-1:1 feed — dropped or coalesced PDUs — from accumulating phase error + * and saturating the loop into a ±ppm hunt. err>0 = talker ahead of model; + * spa_dll returns corr<1, step grows, model catches up (negative feedback); + * recovered rate = nominal*corr. A large jump (>8 PDUs: stream gap, reorder, + * or the bind-transient seed) re-seeds the phase rather than slewing the + * deliberately-slow loop, which otherwise wedges it. */ + raw_delta = (int32_t)(avtp_ts - m->last_avtp_ts); + m->last_avtp_ts = avtp_ts; + n_pdus = (int)((double)raw_delta / (double)pdu_period_ns + 0.5); + if (n_pdus < 1) + n_pdus = 1; + if (n_pdus > 8) { + m->model_ns = avtp_ts; + m->last_err_ns = 0; + m->pdus++; + return m->rate; + } + step = (uint64_t)((double)n_pdus * (double)pdu_period_ns / m->corr + 0.5); + m->model_ns += step; + err_ns = (int32_t)(avtp_ts - (uint32_t)m->model_ns); + m->last_err_ns = err_ns; + err_samples = (double)err_ns * (double)nominal_rate / 1e9; + /* bound the response to a single corrupt/late timestamp */ + if (err_samples > 128.0) + err_samples = 128.0; + else if (err_samples < -128.0) + err_samples = -128.0; + + m->corr = spa_dll_update(&m->dll, err_samples); + /* clamp to ±10 % — far beyond any real media clock; guards 1/corr */ + if (m->corr < 0.9) + m->corr = 0.9; + else if (m->corr > 1.1) + m->corr = 1.1; + m->rate = (double)nominal_rate * m->corr; + m->pdus++; + return m->rate; +} + +#endif /* AVB_MC_RECOVER_H */ diff --git a/src/modules/module-avb/play-loop.h b/src/modules/module-avb/play-loop.h new file mode 100644 index 000000000..8d9970ad6 --- /dev/null +++ b/src/modules/module-avb/play-loop.h @@ -0,0 +1,63 @@ +/* AVB support */ +/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */ +/* SPDX-License-Identifier: MIT */ + +/* + * play-loop.h — consume-side actuator for the listener. + * + * Pure (no PipeWire deps) so it can be unit-tested like mc-recover.h. Keeps the + * listener ring at a target fill by trimming the output resampler ratio + * (SPA_IO_RateMatch). Same loop as module-rtp's receiver: + * error = target - avail; corr = spa_dll_update(dll, error); rate = ff / corr + * ff = nominal/recovered rate feeds the recovered clock forward; the DLL trims + * the rest. The sign is the trap (same as the old mc_recover bug); test_play_loop + * locks it: converges on the right sign, diverges on the wrong one. + */ + +#ifndef AVB_PLAY_LOOP_H +#define AVB_PLAY_LOOP_H + +#include + +#include +#include + +struct play_loop { + bool init; + struct spa_dll dll; + double corr; /* DLL output, ~1.0 */ + double rate; /* last applied resampler ratio (ff / corr) */ +}; + +static inline void play_loop_reset(struct play_loop *p) +{ + p->init = false; + p->corr = 1.0; + p->rate = 1.0; +} + +/* One step. error_samples = target - avail; max_error clamps a transient; + * ff_ratio = nominal/recovered rate (1.0 = no feedforward). Returns the ratio + * to apply via pw_stream_set_rate(). */ +static inline double play_loop_update(struct play_loop *p, double error_samples, + double max_error, double ff_ratio, unsigned period, unsigned rate) +{ + if (!p->init) { + spa_dll_init(&p->dll); + spa_dll_set_bw(&p->dll, SPA_DLL_BW_MIN, period, rate); + p->corr = 1.0; + p->rate = ff_ratio; + p->init = true; + } + error_samples = SPA_CLAMPD(error_samples, -max_error, max_error); + p->corr = spa_dll_update(&p->dll, error_samples); + /* clamp ±10 %, guards 1/corr */ + if (p->corr < 0.9) + p->corr = 0.9; + else if (p->corr > 1.1) + p->corr = 1.1; + p->rate = ff_ratio / p->corr; + return p->rate; +} + +#endif /* AVB_PLAY_LOOP_H */ diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index ba73a014a..98701206e 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -109,6 +109,7 @@ * -------------------------------------------------------------------------- */ +#include #include #include #include @@ -297,6 +298,81 @@ static void on_source_stream_process(void *data) } } + /* milan-avb: consume-side actuator. While AAF is the clock source and the + * adapter gave us a resampler, trim its ratio to hold the ring fill and feed + * the recovered media rate forward (play-loop.h). Off otherwise, so the + * default rate=1.0 path is untouched. */ + if (stream->mc_aaf_active && stream->io_rate_match != NULL) { + uint32_t rate = stream->info.info.raw.rate; + int32_t avail_samples = avail / (int32_t)stream->stride; + uint32_t quantum = buf->requested ? (uint32_t)buf->requested : + (stream->io_position ? stream->io_position->clock.duration : 1024); + int32_t ring_samples = (int32_t)(stream->buffer_size / stream->stride); + /* Target ~½ quantum: that is where the ring sits on average, so it is + * reachable. A full quantum never is, so the error stays saturated and + * the DLL winds up (rate ramps without bound). */ + int32_t target = (int32_t)(quantum / 2); + double max_error = 2.0 * rate / 1000.0; /* 2 ms, == module-rtp ERROR_MSEC */ + double ff, error, r; + const char *env_target = getenv("MILAN_AVB_PLAY_TARGET"); + + if (env_target) + target = atoi(env_target); + if (target < (int32_t)(rate / 1000)) /* >= ~1 ms underrun margin */ + target = (int32_t)(rate / 1000); + if (target > ring_samples / 2) /* keep well inside the ring */ + target = ring_samples / 2; + stream->play_target = target; + + ff = stream->mc.rate > 1.0 ? (double)rate / stream->mc.rate : 1.0; + error = (double)target - (double)avail_samples; + r = play_loop_update(&stream->play, error, max_error, ff, quantum, rate); + pw_stream_set_rate(stream->stream, r); + } else if (stream->play.init) { + /* clock source switched away from AAF: release the resampler so the + * graph free-runs at nominal again, and re-prime for next engage. */ + pw_stream_set_rate(stream->stream, 1.0); + play_loop_reset(&stream->play); + } + + /* milan-avb: ~1 Hz log of the local consume rate (Δticks/Δtai, mapped to TAI + * via a monotonic/TAI offset) next to mc.rate and the actuator state. */ + if (stream->mc_aaf_active || getenv("MILAN_AVB_PLAY_LOG")) { + struct timespec ts_mono; + uint64_t mono_ns; + clock_gettime(CLOCK_MONOTONIC, &ts_mono); + mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono); + if (!stream->play_primed || + mono_ns - stream->play_log_last_ns >= SPA_NSEC_PER_SEC) { + struct pw_time pwt; + if (pw_stream_get_time_n(stream->stream, &pwt, sizeof(pwt)) == 0) { + uint64_t tai_ns, consume_tai; + /* milan-avb: gPTP time from the PHC so the consume clock + * stays in the gPTP domain even with NTP on the system clock. */ + tai_ns = stream_gptp_now(stream->server); + consume_tai = (uint64_t)pwt.now + (tai_ns - mono_ns); + if (stream->play_primed) { + int64_t dticks = (int64_t)(pwt.ticks - stream->play_last_ticks); + int64_t dtai = (int64_t)(consume_tai - stream->play_last_consume_tai); + double local_rate = dtai > 0 + ? (double)dticks * 1e9 / (double)dtai : 0.0; + pw_log_info("milan-avb: play measure local_rate=%.4f Hz " + "mc.rate=%.4f corr=%.6f err_ns=%d ticks=%llu | " + "actuator rate=%.6f play_corr=%.6f target=%d avail=%d", + local_rate, stream->mc.rate, stream->mc.corr, + stream->mc.last_err_ns, + (unsigned long long)pwt.ticks, + stream->play.rate, stream->play.corr, + stream->play_target, avail / (int32_t)stream->stride); + } + stream->play_last_ticks = pwt.ticks; + stream->play_last_consume_tai = consume_tai; + stream->play_log_last_ns = mono_ns; + stream->play_primed = true; + } + } + } + /* Milan v1.2 Section 5.4.5.3: partial-read on underrun, zero-pad tail. */ if (avail <= 0) { memset(d[0].data, 0, n_bytes); @@ -327,9 +403,35 @@ static void on_source_stream_process(void *data) pw_stream_queue_buffer(stream->stream, buf); } +static void on_source_stream_io_changed(void *data, uint32_t id, + void *area, uint32_t size) +{ + struct stream *stream = data; + const char *name; + + switch (id) { + case SPA_IO_RateMatch: + stream->io_rate_match = area; + name = "RateMatch"; + break; + case SPA_IO_Position: + stream->io_position = area; + name = "Position"; + break; + case SPA_IO_Clock: name = "Clock"; break; + case SPA_IO_Buffers: name = "Buffers"; break; + default: name = "?"; break; + } + /* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator + * knob) on this source. */ + pw_log_info("milan-avb: io_changed id=%u (%s) area=%p size=%u", + id, name, area, (unsigned)size); +} + static const struct pw_stream_events source_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = on_stream_destroy, + .io_changed = on_source_stream_io_changed, .process = on_source_stream_process }; @@ -799,6 +901,11 @@ void stream_destroy(struct stream *stream) avb_mrp_attribute_destroy(common->tastream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp); } + + if (stream->raw_dump_fp) { + fclose(stream->raw_dump_fp); + stream->raw_dump_fp = NULL; + } } static int setup_socket(struct stream *stream) @@ -806,6 +913,96 @@ static int setup_socket(struct stream *stream) return avb_server_stream_setup_socket(stream->server, stream); } +/* milan-avb: media-clock recovery ------------------------------------------- + * + * Returns the CLOCK_SOURCE descriptor currently selected by CLOCK_DOMAIN 0, + * or NULL. The selection is clock_source_index, set at boot (Internal = 0) + * and updated on the wire by SET_CLOCK_SOURCE (IEEE 1722.1 Section 7.4.23). */ +static struct avb_aem_desc_clock_source *selected_clock_source(struct server *server) +{ + struct descriptor *dom; + struct descriptor *src; + struct avb_aem_desc_clock_domain *d; + uint16_t idx; + + dom = server_find_descriptor(server, AVB_AEM_DESC_CLOCK_DOMAIN, 0); + if (dom == NULL) + return NULL; + d = descriptor_body(dom); + idx = ntohs(d->clock_source_index); + src = server_find_descriptor(server, AVB_AEM_DESC_CLOCK_SOURCE, idx); + if (src == NULL) + return NULL; + return descriptor_body(src); +} + +/* True when the CLOCK_DOMAIN selects an AAF (INPUT_STREAM) clock source whose + * location points at this listener stream. CRF (MEDIA_CLOCK_STREAM) is out of + * scope and returns false. */ +static bool stream_mc_aaf_selected(struct stream *stream) +{ + struct avb_aem_desc_clock_source *cs; + + if (stream->direction != SPA_DIRECTION_INPUT) + return false; + cs = selected_clock_source(stream->server); + if (cs == NULL) + return false; + if (ntohs(cs->clock_source_type) != AVB_AEM_DESC_CLOCK_SOURCE_TYPE_INPUT_STREAM) + return false; + if (ntohs(cs->clock_source_location_type) != AVB_AEM_DESC_STREAM_INPUT) + return false; + return ntohs(cs->clock_source_location_index) == stream->index; +} + +static void stream_mc_reset(struct stream *stream) +{ + mc_recover_reset(&stream->mc, stream->info.info.raw.rate); + play_loop_reset(&stream->play); +} + +void avb_stream_update_clock_source(struct server *server) +{ + struct stream *s; + + spa_list_for_each(s, &server->streams, link) { + bool active; + + if (s->direction != SPA_DIRECTION_INPUT) + continue; + active = stream_mc_aaf_selected(s); + if (active && !s->mc_aaf_active) + stream_mc_reset(s); + s->mc_aaf_active = active; + pw_log_info("milan-avb: stream %u media-clock source -> %s", + s->index, active ? "AAF (recovered)" : "internal/gPTP"); + } +} + +/* Recover the talker media rate from a PDU's avtp_timestamp. The timestamps + * carry the talker media clock in gPTP time; their inter-PDU deltas give its + * rate. A second-order DLL (spa_dll) tracks phase+frequency. Observe-only: + * drives mc_rate; consumption retiming is the next step. */ +static void stream_mc_recover(struct stream *stream, const struct avb_packet_aaf *p) +{ + uint32_t avtp_ts; + double rate; + + if (!stream->mc_aaf_active || !p->tv) + return; + + avtp_ts = ntohl(p->timestamp); + rate = mc_recover_update(&stream->mc, avtp_ts, stream->frames_per_pdu, + stream->info.info.raw.rate, stream->pdu_period); + + if (stream->mc.pdus < 40 || (stream->mc.pdus % 8000) == 1) + pw_log_info("milan-avb: mc-recovery stream=%u pdus=%llu avtp_ts=%u model_lo=%u nom=%u pdu_ns=%lld rate=%.4f corr=%.8f err_ns=%d ppm=%.3f", + stream->index, (unsigned long long)stream->mc.pdus, avtp_ts, + (uint32_t)stream->mc.model_ns, (unsigned)stream->info.info.raw.rate, + (long long)stream->pdu_period, rate, stream->mc.corr, + stream->mc.last_err_ns, (stream->mc.corr - 1.0) * 1e6); +} + /* Milan 5.4.5.3 STREAM_INTERRUPTED: playback is interrupted by the loss of * "several" AVTPDUs (the spec leaves the count implementation-defined). A * single dropped/reordered PDU is a SEQ_NUM_MISMATCH but not a full @@ -887,6 +1084,10 @@ static void handle_aaf_packet(struct stream *stream, stream->prev_seq = p->seq_num; } + /* milan-avb: AAF media-clock recovery (active only when selected via the + * CLOCK_DOMAIN). Recovers the talker media rate from avtp_timestamps. */ + stream_mc_recover(stream, p); + /* milan-avb: latency observability (throttled to 1 Hz, env-gated). */ if (getenv("MILAN_AVB_LATENCY_LOG")) { static uint64_t last_log_ns = 0; @@ -915,6 +1116,26 @@ static void handle_aaf_packet(struct stream *stream, index += n_bytes; spa_ringbuffer_write_update(&stream->ring, index); stream_in_mark_counters_dirty(stream); + + /* milan-avb: env-gated raw PCM dump (S32BE interleaved) for offline THDN. */ + { + const char *dump_dir = getenv("MILAN_AVB_RAW_DUMP_DIR"); + if (dump_dir && stream->raw_dump_fp == NULL) { + char dpath[512]; + snprintf(dpath, sizeof(dpath), + "%s/avb-stream-in-%u.s32be", + dump_dir, stream->index); + stream->raw_dump_fp = fopen(dpath, "wb"); + if (stream->raw_dump_fp) + pw_log_info("milan-avb: dumping raw S32BE PCM to %s", dpath); + else + pw_log_warn("milan-avb: cannot open dump file %s: %m", dpath); + } + if (stream->raw_dump_fp) { + size_t w = fwrite(p->payload, 1, n_bytes, stream->raw_dump_fp); + stream->raw_dump_bytes += w; + } + } } static void handle_iec61883_packet(struct stream *stream, @@ -972,6 +1193,26 @@ static void handle_iec61883_packet(struct stream *stream, index += n_bytes; spa_ringbuffer_write_update(&stream->ring, index); stream_in_mark_counters_dirty(stream); + + /* milan-avb: env-gated raw PCM dump (S32BE interleaved) for offline THDN. */ + { + const char *dump_dir = getenv("MILAN_AVB_RAW_DUMP_DIR"); + if (dump_dir && stream->raw_dump_fp == NULL) { + char dpath[512]; + snprintf(dpath, sizeof(dpath), + "%s/avb-stream-in-%u.s32be", + dump_dir, stream->index); + stream->raw_dump_fp = fopen(dpath, "wb"); + if (stream->raw_dump_fp) + pw_log_info("milan-avb: dumping raw S32BE PCM to %s", dpath); + else + pw_log_warn("milan-avb: cannot open dump file %s: %m", dpath); + } + if (stream->raw_dump_fp) { + size_t w = fwrite(p->payload, 1, n_bytes, stream->raw_dump_fp); + stream->raw_dump_bytes += w; + } + } } static void on_socket_data(void *data, int fd, uint32_t mask) @@ -1102,6 +1343,12 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) pad_ringbuffer_with_silence(stream, (int)prefill_pdus); } + /* milan-avb: pick up the current media-clock selection for this input + * (AAF recovery vs internal/gPTP); re-prime the DLL on a fresh bind. */ + stream->mc_aaf_active = stream_mc_aaf_selected(stream); + if (stream->mc_aaf_active) + stream_mc_reset(stream); + /* milan-avb: publish our contribution to graph latency so wpctl/pw-cli * report it. Latency is the prefill: one PipeWire quantum at 48 kHz. */ { @@ -1222,6 +1469,7 @@ int stream_deactivate(struct stream *stream, uint64_t now) if (si->mvrp_attr.mrp) { avb_mrp_attribute_leave(si->mvrp_attr.mrp, now); } + stream->mc_aaf_active = false; } else { avb_mrp_attribute_leave(common->tastream_attr.mrp, now); } diff --git a/src/modules/module-avb/stream.h b/src/modules/module-avb/stream.h index 7b3895f5a..a8d2bf2eb 100644 --- a/src/modules/module-avb/stream.h +++ b/src/modules/module-avb/stream.h @@ -12,6 +12,10 @@ #include #include +#include + +#include "mc-recover.h" +#include "play-loop.h" #include @@ -38,6 +42,7 @@ struct stream { struct spa_source *flush_timer; uint64_t flush_last_ns; bool is_crf; + uint64_t next_txtime; int prio; int mtt; int t_uncertainty; @@ -66,6 +71,32 @@ struct stream { uint64_t format; uint32_t stride; struct spa_audio_info info; + + /* milan-avb: AAF media-clock recovery (listener / STREAM_INPUT only). + * Active only while the CLOCK_DOMAIN selects the AAF (INPUT_STREAM) + * clock source whose location points at this stream. Estimator state in + * struct mc_recover (mc-recover.h); recovered from avtp_timestamp deltas. */ + bool mc_aaf_active; + struct mc_recover mc; + + /* milan-avb: actuator I/O areas (set via .io_changed). io_rate_match is the + * resampler knob — NULL unless the adapter inserted a resampler. */ + struct spa_io_rate_match *io_rate_match; + struct spa_io_position *io_position; + + /* milan-avb: previous 1 Hz sample for the local consume-rate log. */ + uint64_t play_last_consume_tai; + uint64_t play_last_ticks; + uint64_t play_log_last_ns; + bool play_primed; + + /* milan-avb: actuator state; servos the ring to play_target (play-loop.h). */ + struct play_loop play; + int32_t play_target; + + /* milan-avb: optional raw PCM dump for offline analysis (THDN, waveform). */ + FILE *raw_dump_fp; + size_t raw_dump_bytes; }; #include "msrp.h" @@ -81,4 +112,9 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now); int stream_deactivate(struct stream *stream, uint64_t now); int stream_activate_virtual(struct stream *stream, uint16_t index); +/* milan-avb: re-evaluate each input stream's media-clock recovery against the + * current CLOCK_DOMAIN selection. Call after SET_CLOCK_SOURCE for on-the-fly + * clock-source switching. */ +void avb_stream_update_clock_source(struct server *server); + #endif /* AVB_STREAM_H */