milan-avb: read gPTP PHC time for talker/listener via NIC PHC mapped onto CLOCK_MONOTONIC_RAW, decoupled from system clock

This commit is contained in:
hackerman-kl 2026-05-31 15:05:51 +02:00 committed by Wim Taymans
parent afc7724070
commit 66959ca678
3 changed files with 212 additions and 13 deletions

View file

@ -0,0 +1,132 @@
/* AVB support */
/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */
/* SPDX-License-Identifier: MIT */
/* gPTP time read from the NIC PHC (dynamic POSIX clock) mapped onto CLOCK_MONOTONIC_RAW,
* decoupled from the system wall clock so it stays free for NTP. */
#ifndef AVB_GPTP_CLOCK_H
#define AVB_GPTP_CLOCK_H
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <net/if.h>
#include <linux/ethtool.h>
#include <linux/sockios.h>
#include <spa/utils/defs.h>
#define AVB_CLOCKFD 3
#define AVB_FD_TO_CLOCKID(fd) ((~(clockid_t)(fd) << 3) | AVB_CLOCKFD)
#define AVB_GPTP_REFRESH_NS (10 * SPA_NSEC_PER_MSEC) /* re-anchor phase/freq ~100 Hz */
#define AVB_GPTP_READ_BRACKET_NS (50 * SPA_NSEC_PER_USEC) /* reject a jittered PHC read */
struct avb_gptp_clock {
int phc_fd;
clockid_t phc_id;
bool ok;
uint64_t base_mono; /* CLOCK_MONOTONIC_RAW ns at last anchor */
uint64_t base_gptp; /* PHC ns at last anchor */
double ratio; /* d(phc)/d(mono) ~ 1.0 (the frequency offset) */
uint64_t last_refresh_mono;
};
/* Resolve ifname -> PHC index via ETHTOOL_GET_TS_INFO, open /dev/ptpN. >=0 = phc_index. */
static inline int avb_gptp_clock_open(struct avb_gptp_clock *c, const char *ifname)
{
struct ethtool_ts_info tsi;
struct ifreq ifr;
char path[32];
int sock;
memset(c, 0, sizeof(*c));
c->phc_fd = -1;
c->ratio = 1.0;
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
return -1;
}
memset(&tsi, 0, sizeof(tsi));
tsi.cmd = ETHTOOL_GET_TS_INFO;
memset(&ifr, 0, sizeof(ifr));
snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifname);
ifr.ifr_data = (void *)&tsi;
if (ioctl(sock, SIOCETHTOOL, &ifr) < 0) {
close(sock);
return -1;
}
close(sock);
if (tsi.phc_index < 0) {
return -1;
}
snprintf(path, sizeof(path), "/dev/ptp%d", tsi.phc_index);
c->phc_fd = open(path, O_RDONLY);
if (c->phc_fd < 0) {
return -1;
}
c->phc_id = AVB_FD_TO_CLOCKID(c->phc_fd);
c->ok = true;
return tsi.phc_index;
}
/* Re-anchor (mono,gptp) and update the frequency ratio. Off the hot loop (~100 Hz). */
static inline void avb_gptp_clock_refresh(struct avb_gptp_clock *c)
{
struct timespec m1, p, m2;
uint64_t mono, gptp;
double r;
if (!c->ok) {
return;
}
if (clock_gettime(CLOCK_MONOTONIC_RAW, &m1) < 0 ||
clock_gettime(c->phc_id, &p) < 0 ||
clock_gettime(CLOCK_MONOTONIC_RAW, &m2) < 0) {
return;
}
if (SPA_TIMESPEC_TO_NSEC(&m2) - SPA_TIMESPEC_TO_NSEC(&m1) > AVB_GPTP_READ_BRACKET_NS) {
return;
}
mono = (SPA_TIMESPEC_TO_NSEC(&m1) + SPA_TIMESPEC_TO_NSEC(&m2)) / 2;
gptp = SPA_TIMESPEC_TO_NSEC(&p);
if (c->base_mono != 0 && mono > c->base_mono) {
r = (double)(gptp - c->base_gptp) / (double)(mono - c->base_mono);
if (r > 0.999 && r < 1.001) {
c->ratio += 0.10 * (r - c->ratio);
}
}
c->base_mono = mono;
c->base_gptp = gptp;
c->last_refresh_mono = mono;
}
/* gPTP time now, in ns; cheap monotonic read + phase/freq map. 0 if no PHC (caller falls back). */
static inline uint64_t avb_gptp_now(struct avb_gptp_clock *c)
{
struct timespec ts;
uint64_t mono;
if (!c->ok) {
return 0;
}
clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
mono = SPA_TIMESPEC_TO_NSEC(&ts);
if (c->base_mono == 0 || mono - c->last_refresh_mono > AVB_GPTP_REFRESH_NS) {
avb_gptp_clock_refresh(c);
clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
mono = SPA_TIMESPEC_TO_NSEC(&ts);
}
if (c->base_mono == 0) {
return 0;
}
return c->base_gptp + (uint64_t)((double)(mono - c->base_mono) * c->ratio);
}
#endif /* AVB_GPTP_CLOCK_H */

View file

@ -9,6 +9,8 @@
#include <pipewire/pipewire.h> #include <pipewire/pipewire.h>
#include "gptp-clock.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -104,6 +106,11 @@ struct server {
uint64_t entity_id; uint64_t entity_id;
int ifindex; int ifindex;
/* milan-avb: gPTP time read from the NIC PHC (server->ifname), decoupled from the
* system clock. Lazily opened on first use; gclock_tried guards the one-shot open. */
struct avb_gptp_clock gclock;
unsigned gclock_tried:1;
const struct avb_transport_ops *transport; const struct avb_transport_ops *transport;
void *transport_data; void *transport_data;

View file

@ -39,7 +39,7 @@
* absent. * absent.
* *
* So an output stream owns its own periodic timer (`flush_timer`, * So an output stream owns its own periodic timer (`flush_timer`,
* AVB_FLUSH_TICK_NS = 1 ms = 8 PDUs). Each tick: * AVB_FLUSH_TICK_NS = 125 us = one PDU at 48 kHz/6-frame). Each tick:
* *
* 1. computes how many PDUs are owed since the last drain * 1. computes how many PDUs are owed since the last drain
* (`(now - flush_last_ns) / pdu_period`), * (`(now - flush_last_ns) / pdu_period`),
@ -161,9 +161,9 @@ static inline void stream_out_mark_counters_dirty(struct stream *s)
so->counters_dirty = true; so->counters_dirty = true;
} }
#define AVB_FLUSH_TICK_NS ((uint64_t)1000000) #define AVB_FLUSH_TICK_NS ((uint64_t)(125 * SPA_NSEC_PER_USEC))
static int flush_write_milan_v12(struct stream *stream, uint64_t current_time); static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, int max_pdus);
static int flush_write_legacy(struct stream *stream, uint64_t current_time); static int flush_write_legacy(struct stream *stream, uint64_t current_time);
static void on_stream_destroy(void *d) static void on_stream_destroy(void *d)
@ -210,19 +210,33 @@ static void pad_ringbuffer_with_silence(struct stream *stream, int owed)
spa_ringbuffer_write_update(&stream->ring, index + (uint32_t)deficit); spa_ringbuffer_write_update(&stream->ring, index + (uint32_t)deficit);
} }
/* milan-avb: gPTP time (ns) from the NIC PHC of server->ifname; 0 if no PHC. See gptp-clock.h. */
static uint64_t stream_gptp_now(struct server *server)
{
if (!server->gclock.ok && !server->gclock_tried) {
server->gclock_tried = 1;
if (avb_gptp_clock_open(&server->gclock, server->ifname) >= 0) {
pw_log_info("milan-avb: gptp clock = PHC of %s", server->ifname);
} else {
pw_log_warn("milan-avb: no PHC for %s", server->ifname);
}
}
return avb_gptp_now(&server->gclock);
}
static void on_flush_tick(void *data, uint64_t expirations) static void on_flush_tick(void *data, uint64_t expirations)
{ {
struct stream *stream = data; struct stream *stream = data;
struct server *server = stream->server; struct server *server = stream->server;
struct timespec now_ts;
uint64_t now_ns; uint64_t now_ns;
int owed; int owed;
(void)expirations; (void)expirations;
if (clock_gettime(CLOCK_TAI, &now_ts) < 0) now_ns = stream_gptp_now(server);
if (now_ns == 0) {
return; return;
now_ns = SPA_TIMESPEC_TO_NSEC(&now_ts); }
if (stream->flush_last_ns == 0) { if (stream->flush_last_ns == 0) {
stream->flush_last_ns = now_ns; stream->flush_last_ns = now_ns;
@ -239,7 +253,7 @@ static void on_flush_tick(void *data, uint64_t expirations)
pad_ringbuffer_with_silence(stream, owed); pad_ringbuffer_with_silence(stream, owed);
if (server->avb_mode == AVB_MODE_MILAN_V12) if (server->avb_mode == AVB_MODE_MILAN_V12)
flush_write_milan_v12(stream, now_ns); flush_write_milan_v12(stream, now_ns, owed);
else else
flush_write_legacy(stream, now_ns); flush_write_legacy(stream, now_ns);
} }
@ -265,6 +279,24 @@ static void on_source_stream_process(void *data)
avail = spa_ringbuffer_get_read_index(&stream->ring, &index); avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
/* milan-avb: latency observability (throttled, env-gated). */
if (getenv("MILAN_AVB_LATENCY_LOG")) {
static uint64_t last_log_ns = 0;
struct timespec ts_mono;
uint64_t now_mono_ns;
clock_gettime(CLOCK_MONOTONIC, &ts_mono);
now_mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono);
if (now_mono_ns - last_log_ns >= SPA_NSEC_PER_SEC) {
uint64_t residency_ns = stream->stride > 0
? (uint64_t)avail * SPA_NSEC_PER_SEC
/ ((uint64_t)stream->stride * (uint64_t)stream->info.info.raw.rate)
: 0;
pw_log_info("milan-avb: lat C residency_bytes=%d residency_ns=%llu wanted=%u",
avail, (unsigned long long)residency_ns, (unsigned)n_bytes);
last_log_ns = now_mono_ns;
}
}
/* Milan v1.2 Section 5.4.5.3: partial-read on underrun, zero-pad tail. */ /* Milan v1.2 Section 5.4.5.3: partial-read on underrun, zero-pad tail. */
if (avail <= 0) { if (avail <= 0) {
memset(d[0].data, 0, n_bytes); memset(d[0].data, 0, n_bytes);
@ -311,7 +343,7 @@ set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size,
iov[1].iov_base = buffer; iov[1].iov_base = buffer;
} }
static int flush_write_milan_v12(struct stream *stream, uint64_t current_time) static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, int max_pdus)
{ {
int32_t avail; int32_t avail;
uint32_t index; uint32_t index;
@ -324,6 +356,10 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time)
avail = spa_ringbuffer_get_read_index(&stream->ring, &index); avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
pdu_count = (avail / stream->stride) / stream->frames_per_pdu; pdu_count = (avail / stream->stride) / stream->frames_per_pdu;
/* Pace to real time: only drain what is due this tick, so the ETF
* launch schedule cannot run ahead and overflow the qdisc backlog. */
if (pdu_count > max_pdus)
pdu_count = max_pdus;
txtime = current_time + stream->t_uncertainty; txtime = current_time + stream->t_uncertainty;
ptime = txtime + stream->mtt; ptime = txtime + stream->mtt;
@ -567,7 +603,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
/* TX timestamp jitter budget added on top of CLOCK_TAI now. 125 µs is /* TX timestamp jitter budget added on top of CLOCK_TAI now. 125 µs is
* the upper bound at 1 GbE class-A traffic per IEEE 802.1Qav; safe * the upper bound at 1 GbE class-A traffic per IEEE 802.1Qav; safe
* default until we have a way to measure it from gPTP. */ * default until we have a way to measure it from gPTP. */
stream->t_uncertainty = 125000; stream->t_uncertainty = 0;
stream->id = (uint64_t)server->mac_addr[0] << 56 | stream->id = (uint64_t)server->mac_addr[0] << 56 |
(uint64_t)server->mac_addr[1] << 48 | (uint64_t)server->mac_addr[1] << 48 |
@ -743,13 +779,11 @@ error_free:
void stream_destroy(struct stream *stream) void stream_destroy(struct stream *stream)
{ {
struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream); struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream);
struct timespec now_ts; uint64_t now;
uint64_t now = 0;
/* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart /* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart
* or replug doesn't strand a stale reservation on the bridge (socket still open here). */ * or replug doesn't strand a stale reservation on the bridge (socket still open here). */
if (clock_gettime(CLOCK_TAI, &now_ts) == 0) now = stream_gptp_now(stream->server);
now = SPA_TIMESPEC_TO_NSEC(&now_ts);
stream_deactivate(stream, now); stream_deactivate(stream, now);
if (stream->direction == SPA_DIRECTION_INPUT) { if (stream->direction == SPA_DIRECTION_INPUT) {
@ -853,6 +887,19 @@ static void handle_aaf_packet(struct stream *stream,
stream->prev_seq = p->seq_num; stream->prev_seq = p->seq_num;
} }
/* milan-avb: latency observability (throttled to 1 Hz, env-gated). */
if (getenv("MILAN_AVB_LATENCY_LOG")) {
static uint64_t last_log_ns = 0;
uint64_t now_tai_ns = stream_gptp_now(stream->server);
if (now_tai_ns - last_log_ns >= SPA_NSEC_PER_SEC) {
uint32_t avtp_ts = ntohl(p->timestamp);
int32_t talker_to_recv_ns = (int32_t)((uint32_t)now_tai_ns - avtp_ts);
pw_log_info("milan-avb: lat A+B seq=%u avtp_ts=%u talker_to_recv_ns=%d",
(unsigned)p->seq_num, avtp_ts, talker_to_recv_ns);
last_log_ns = now_tai_ns;
}
}
if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) { if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) {
/* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */ /* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */
uint32_t r_index; uint32_t r_index;
@ -897,6 +944,19 @@ static void handle_iec61883_packet(struct stream *stream,
si->media_locked_state = true; si->media_locked_state = true;
} }
/* milan-avb: latency observability (throttled to 1 Hz, env-gated). */
if (getenv("MILAN_AVB_LATENCY_LOG")) {
static uint64_t last_log_ns = 0;
uint64_t now_tai_ns = stream_gptp_now(stream->server);
if (now_tai_ns - last_log_ns >= SPA_NSEC_PER_SEC) {
uint32_t avtp_ts = ntohl(p->timestamp);
int32_t talker_to_recv_ns = (int32_t)((uint32_t)now_tai_ns - avtp_ts);
pw_log_info("milan-avb: lat A+B seq=%u avtp_ts=%u talker_to_recv_ns=%d",
(unsigned)p->seq_num, avtp_ts, talker_to_recv_ns);
last_log_ns = now_tai_ns;
}
}
if (filled + n_bytes > stream->buffer_size) { if (filled + n_bytes > stream->buffer_size) {
/* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */ /* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */
uint32_t r_index; uint32_t r_index;