milan-avb: bit-perfect AAF audio path — avb.sink/avb.source self-driven drivers (drive_timer + trigger_process), 4-quanta ring (no per-quantum overwrite), rate_diff=1.0 coherent talker drive (no pw-cat resample), monotonic PHC-anchored AVTP timestamps, listener sample-lock driven at recovered mc.rate

This commit is contained in:
hackerman-kl 2026-06-01 20:06:07 +00:00 committed by Wim Taymans
parent 5b8fa0a9b6
commit 2bcec2f3f0
2 changed files with 263 additions and 150 deletions

View file

@ -94,9 +94,10 @@
* TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header * TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header
* UNSUPPORTED_FORMAT live: handle_aaf_packet drops + ticks per PDU any AAF PDU * UNSUPPORTED_FORMAT live: handle_aaf_packet drops + ticks per PDU any AAF PDU
* whose format != the Stream Input current format * whose format != the Stream Input current format
* LATE_TIMESTAMP TODO: tick when p->timestamp < CLOCK_TAI now * LATE_TIMESTAMP TODO: tick when p->timestamp < stream_gptp_now() (the PHC,
* NOT CLOCK_TAI no phc2sys, so the system clock is not gPTP)
* (frame missed its presentation deadline) * (frame missed its presentation deadline)
* EARLY_TIMESTAMP TODO: tick when p->timestamp > now + max_transit_time * EARLY_TIMESTAMP TODO: tick when p->timestamp > stream_gptp_now() (PHC) + max_transit_time
* (frame arrived too far ahead of its deadline) * (frame arrived too far ahead of its deadline)
* Table 5.17 Stream Output: * Table 5.17 Stream Output:
* FRAMES_TX live: per send in flush_write_milan_v12 / _legacy * FRAMES_TX live: per send in flush_write_milan_v12 / _legacy
@ -235,10 +236,7 @@ static void on_flush_tick(void *data, uint64_t expirations)
(void)expirations; (void)expirations;
/* Pace the send rate off CLOCK_MONOTONIC (a stable local 1x clock); use the gPTP /* Pace the flush drain off CLOCK_MONOTONIC, the SAME clock as the graph-fill driver (the drive_timer runs on a CLOCK_MONOTONIC timerfd, which cannot be _RAW) so producer and consumer of the ring stay rate-matched; gPTP is used only for the AVTP timestamp. Pacing on _RAW here decouples drain from fill -> ring drift -> glitch noise (measured -55dB vs -99dB THD+N). The independent gPTP/PHC reference still uses CLOCK_MONOTONIC_RAW (gptp-clock.h). */
* clock only for the AVTP presentation timestamp. Pacing must not ride the absolute
* PHC interpolation, whose steps during gPTP re-convergence burst the talker past
* its SRP reservation and get the stream policed away by the bridge. */
clock_gettime(CLOCK_MONOTONIC, &ts); clock_gettime(CLOCK_MONOTONIC, &ts);
now_mono = SPA_TIMESPEC_TO_NSEC(&ts); now_mono = SPA_TIMESPEC_TO_NSEC(&ts);
now_gptp = stream_gptp_now(server); now_gptp = stream_gptp_now(server);
@ -267,9 +265,7 @@ static void on_flush_tick(void *data, uint64_t expirations)
} }
} }
/* Talker egress pacing runs on the RT data loop (impl->data_loop). A source /* Talker egress pacing runs on the RT data loop (impl->data_loop); a source cannot be added/removed off-thread, so the flush timer is created and destroyed ON the RT thread via pw_loop_invoke. */
* cannot be added to or removed from a running loop off-thread, so the flush
* timer is created and destroyed ON the RT thread via pw_loop_invoke. */
static int do_add_flush_timer(struct spa_loop *loop, bool async, uint32_t seq, static int do_add_flush_timer(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data) const void *data, size_t size, void *user_data)
{ {
@ -343,30 +339,28 @@ static void on_source_stream_process(void *data)
} }
} }
/* milan-avb: consume-side actuator. While AAF is the clock source and the /* milan-avb: consume-side actuator, FOLLOWER path only; when avb.source DRIVES the graph at the recovered mc.rate there is no resampler on its output, so we deliver the ring samples 1:1 (bit-perfect) and must NOT trim a ratio. */
* adapter gave us a resampler, trim its ratio to hold the ring fill and feed if (!stream->driving && stream->mc_aaf_active && stream->io_rate_match != NULL) {
* the recovered media rate forward (play-loop.h). Off otherwise, so the
* default rate=1.0 path is untouched. */
if (stream->mc_aaf_active && stream->io_rate_match != NULL) {
uint32_t rate = stream->info.info.raw.rate; uint32_t rate = stream->info.info.raw.rate;
int32_t avail_samples = avail / (int32_t)stream->stride; int32_t avail_samples = avail / (int32_t)stream->stride;
uint32_t quantum = buf->requested ? (uint32_t)buf->requested : uint32_t quantum = buf->requested ? (uint32_t)buf->requested :
(stream->io_position ? stream->io_position->clock.duration : 1024); (stream->io_position ? stream->io_position->clock.duration : 1024);
int32_t ring_samples = (int32_t)(stream->buffer_size / stream->stride); int32_t ring_samples = (int32_t)(stream->buffer_size / stream->stride);
/* Target ~½ quantum: that is where the ring sits on average, so it is /* Target ~½ quantum: where the ring sits on average so it is reachable; a full quantum never is, so the error saturates and the DLL winds up. */
* reachable. A full quantum never is, so the error stays saturated and
* the DLL winds up (rate ramps without bound). */
int32_t target = (int32_t)(quantum / 2); int32_t target = (int32_t)(quantum / 2);
double max_error = 2.0 * rate / 1000.0; /* 2 ms, == module-rtp ERROR_MSEC */ double max_error = 2.0 * rate / 1000.0; /* 2 ms, == module-rtp ERROR_MSEC */
double ff, error, r; double ff, error, r;
const char *env_target = getenv("MILAN_AVB_PLAY_TARGET"); const char *env_target = getenv("MILAN_AVB_PLAY_TARGET");
if (env_target) if (env_target) {
target = atoi(env_target); target = atoi(env_target);
if (target < (int32_t)(rate / 1000)) /* >= ~1 ms underrun margin */ }
if (target < (int32_t)(rate / 1000)) { /* >= ~1 ms underrun margin */
target = (int32_t)(rate / 1000); target = (int32_t)(rate / 1000);
if (target > ring_samples / 2) /* keep well inside the ring */ }
if (target > ring_samples / 2) { /* keep well inside the ring */
target = ring_samples / 2; target = ring_samples / 2;
}
stream->play_target = target; stream->play_target = target;
ff = stream->mc.rate > 1.0 ? (double)rate / stream->mc.rate : 1.0; ff = stream->mc.rate > 1.0 ? (double)rate / stream->mc.rate : 1.0;
@ -374,17 +368,16 @@ static void on_source_stream_process(void *data)
r = play_loop_update(&stream->play, error, max_error, ff, quantum, rate); r = play_loop_update(&stream->play, error, max_error, ff, quantum, rate);
pw_stream_set_rate(stream->stream, r); pw_stream_set_rate(stream->stream, r);
} else if (stream->play.init) { } else if (stream->play.init) {
/* clock source switched away from AAF: release the resampler so the /* clock source switched away from AAF: release the resampler so the graph free-runs at nominal again, and re-prime for next engage. */
* graph free-runs at nominal again, and re-prime for next engage. */
pw_stream_set_rate(stream->stream, 1.0); pw_stream_set_rate(stream->stream, 1.0);
play_loop_reset(&stream->play); play_loop_reset(&stream->play);
} }
/* milan-avb: ~1 Hz log of the local consume rate (Δticks/Δtai, mapped to TAI /* milan-avb: ~1 Hz log of the local consume rate (Δticks/Δtai, mapped to TAI) next to mc.rate and the actuator state. */
* via a monotonic/TAI offset) next to mc.rate and the actuator state. */
if (stream->mc_aaf_active || getenv("MILAN_AVB_PLAY_LOG")) { if (stream->mc_aaf_active || getenv("MILAN_AVB_PLAY_LOG")) {
struct timespec ts_mono; struct timespec ts_mono;
uint64_t mono_ns; uint64_t mono_ns;
/* CLOCK_MONOTONIC (NOT _RAW): mono_ns is offset against pwt.now below, which PipeWire reports in the CLOCK_MONOTONIC domain — they must match. */
clock_gettime(CLOCK_MONOTONIC, &ts_mono); clock_gettime(CLOCK_MONOTONIC, &ts_mono);
mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono); mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono);
if (!stream->play_primed || if (!stream->play_primed ||
@ -392,8 +385,7 @@ static void on_source_stream_process(void *data)
struct pw_time pwt; struct pw_time pwt;
if (pw_stream_get_time_n(stream->stream, &pwt, sizeof(pwt)) == 0) { if (pw_stream_get_time_n(stream->stream, &pwt, sizeof(pwt)) == 0) {
uint64_t tai_ns, consume_tai; uint64_t tai_ns, consume_tai;
/* milan-avb: gPTP time from the PHC so the consume clock /* milan-avb: gPTP time from the PHC so the consume clock stays in the gPTP domain even with NTP on the system clock. */
* stays in the gPTP domain even with NTP on the system clock. */
tai_ns = stream_gptp_now(stream->server); tai_ns = stream_gptp_now(stream->server);
consume_tai = (uint64_t)pwt.now + (tai_ns - mono_ns); consume_tai = (uint64_t)pwt.now + (tai_ns - mono_ns);
if (stream->play_primed) { if (stream->play_primed) {
@ -467,15 +459,19 @@ static void on_source_stream_io_changed(void *data, uint32_t id,
case SPA_IO_Buffers: name = "Buffers"; break; case SPA_IO_Buffers: name = "Buffers"; break;
default: name = "?"; break; default: name = "?"; break;
} }
/* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator /* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator knob) on this source. */
* knob) on this source. */
pw_log_info("milan-avb: io_changed id=%u (%s) area=%p size=%u", pw_log_info("milan-avb: io_changed id=%u (%s) area=%p size=%u",
id, name, area, (unsigned)size); id, name, area, (unsigned)size);
} }
/* generic: arms the self-driving timer on STREAMING (defined below, used by both source and sink stream-event tables). */
static void on_sink_stream_state_changed(void *data, enum pw_stream_state old,
enum pw_stream_state state, const char *error);
static const struct pw_stream_events source_stream_events = { static const struct pw_stream_events source_stream_events = {
PW_VERSION_STREAM_EVENTS, PW_VERSION_STREAM_EVENTS,
.destroy = on_stream_destroy, .destroy = on_stream_destroy,
.state_changed = on_sink_stream_state_changed,
.io_changed = on_source_stream_io_changed, .io_changed = on_source_stream_io_changed,
.process = on_source_stream_process .process = on_source_stream_process
}; };
@ -494,22 +490,35 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, i
{ {
int32_t avail; int32_t avail;
uint32_t index; uint32_t index;
uint64_t ptime, txtime; uint64_t ptime;
int pdu_count; int pdu_count;
ssize_t n; ssize_t n;
struct avb_frame_header *h = (void*)stream->pdu; struct avb_frame_header *h = (void*)stream->pdu;
struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void); struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void);
uint64_t base;
int64_t err;
avail = spa_ringbuffer_get_read_index(&stream->ring, &index); avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
pdu_count = (avail / stream->stride) / stream->frames_per_pdu; pdu_count = (avail / stream->stride) / stream->frames_per_pdu;
/* Pace to real time: only drain what is due this tick, so the ETF /* Pace to real time: drain only what is due this tick so the ETF launch schedule cannot run ahead and overflow the qdisc backlog. */
* launch schedule cannot run ahead and overflow the qdisc backlog. */ if (pdu_count > max_pdus) {
if (pdu_count > max_pdus)
pdu_count = max_pdus; pdu_count = max_pdus;
}
txtime = current_time + stream->t_uncertainty; /* M2: monotonic AVTP timestamps anchored to the PHC; advance by pdu_period per PDU and slow-leak (err/1024) toward the live PHC so the rate reflects the real gPTP media clock without per-tick interpolation jitter (audible FM at the listener); re-anchor hard on a >1s gap (gPTP re-converge). */
ptime = txtime + stream->mtt; base = current_time + stream->t_uncertainty + stream->mtt;
if (stream->tx_pts == 0) {
stream->tx_pts = base;
} else {
err = (int64_t)(base - stream->tx_pts);
if (err > (int64_t)SPA_NSEC_PER_SEC || err < -(int64_t)SPA_NSEC_PER_SEC) {
stream->tx_pts = base;
} else {
stream->tx_pts += err / 1024;
}
}
ptime = stream->tx_pts;
while (pdu_count--) { while (pdu_count--) {
/* CBS-exclusive: no SCM_TXTIME; txtime feeds ptime only */ /* CBS-exclusive: no SCM_TXTIME; txtime feeds ptime only */
@ -526,15 +535,17 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, i
n = avb_server_stream_send(stream->server, stream, n = avb_server_stream_send(stream->server, stream,
&stream->msg, MSG_NOSIGNAL); &stream->msg, MSG_NOSIGNAL);
if (n < 0 || n != (ssize_t)stream->pdu_size) if (n < 0 || n != (ssize_t)stream->pdu_size) {
pw_log_error("stream send failed %zd != %zd: %m", pw_log_error("stream send failed %zd != %zd: %m",
n, stream->pdu_size); n, stream->pdu_size);
else } else {
stream_out_counters(stream)->frame_tx++; stream_out_counters(stream)->frame_tx++;
txtime += stream->pdu_period; }
ptime += stream->pdu_period; ptime += stream->pdu_period;
index += stream->payload_size; index += stream->payload_size;
} }
/* M2: keep the accumulator monotonic across ticks (advance by emitted PDUs). */
stream->tx_pts = ptime;
stream_out_mark_counters_dirty(stream); stream_out_mark_counters_dirty(stream);
spa_ringbuffer_read_update(&stream->ring, index); spa_ringbuffer_read_update(&stream->ring, index);
@ -716,17 +727,144 @@ static int setup_msg(struct stream *stream)
stream->msg.msg_namelen = sizeof(stream->sock_addr); stream->msg.msg_namelen = sizeof(stream->sock_addr);
stream->msg.msg_iov = stream->iov; stream->msg.msg_iov = stream->iov;
stream->msg.msg_iovlen = 3; stream->msg.msg_iovlen = 3;
/* CBS/Qav-exclusive: no SCM_TXTIME control message -- CBS and SO_TXTIME /* CBS/Qav-exclusive: no SCM_TXTIME control message -- CBS and SO_TXTIME cannot coexist; the egress CBS qdisc paces the stream. */
* cannot coexist; the egress CBS qdisc paces the stream. */
stream->msg.msg_control = NULL; stream->msg.msg_control = NULL;
stream->msg.msg_controllen = 0; stream->msg.msg_controllen = 0;
stream->cmsg = NULL; stream->cmsg = NULL;
return 0; return 0;
} }
/* milan-avb: arm the self-driving one-shot timer at absolute time `when` (ns on CLOCK_MONOTONIC); when==0 disarms; runs on the RT data loop. */
static void set_drive_timeout(struct stream *stream, uint64_t when)
{
struct timespec ts;
struct timespec interval = { 0, 0 };
if (stream->drive_timer == NULL) {
return;
}
ts.tv_sec = (time_t)(when / SPA_NSEC_PER_SEC);
ts.tv_nsec = (long)(when % SPA_NSEC_PER_SEC);
pw_loop_update_timer(stream->server->impl->data_loop,
stream->drive_timer, &ts, &interval, true);
}
/* milan-avb: graph driver tick (pipe-tunnel pattern); fires once per quantum, fills io_position->clock so the core schedules followers against our clock, re-arms the next tick, then triggers the cycle exactly once from the data loop (never re-entrantly from process()). */
static void on_drive_timeout(void *data, uint64_t expirations)
{
struct stream *stream = data;
struct spa_io_position *pos = stream->io_position;
struct timespec ts;
uint64_t duration = 1024, mono_now, nominal_ns;
uint32_t rate = 48000;
uint64_t phc_now;
uint64_t this_time;
double nom;
(void)expirations;
if (!stream->driving) {
return;
}
if (pos != NULL) {
if (pos->clock.target_duration != 0) {
duration = pos->clock.target_duration;
}
if (pos->clock.target_rate.denom != 0) {
rate = pos->clock.target_rate.denom;
}
}
clock_gettime(CLOCK_MONOTONIC, &ts);
mono_now = SPA_TIMESPEC_TO_NSEC(&ts);
nominal_ns = duration * SPA_NSEC_PER_SEC / rate;
/* LISTENER (avb.source): pace at the RECOVERED talker rate (mc.rate from mc_recover) so the ring drain rate == the AAF arrival rate and process() delivers samples 1:1 with no resampling (bit-perfect, sample-locked). */
if (stream->direction == SPA_DIRECTION_INPUT &&
stream->mc_aaf_active && stream->mc.rate > 1.0) {
nominal_ns = (uint64_t)((double)duration * (double)SPA_NSEC_PER_SEC
/ stream->mc.rate);
}
/* TALKER (sink): pace at the EXACT nominal rate so the exported clock has rate_diff==1.0 CONSTANT; a varying rate_diff makes pw-cat's adapter resample (FM baked into the wire), 1.0 gives adapter passthrough = bit-perfect, and the listener recovers the rate from timestamp arrival. */
phc_now = stream_gptp_now(stream->server);
(void)phc_now;
stream->drive_phc_last = phc_now;
stream->drive_mono_last = mono_now;
/* Export the SMOOTH scheduled time (not the jittery wake-up mono_now) so the follower resampler sees an evenly-paced clock; rate_diff=nom/nominal keeps nsec/next_nsec/duration/rate_diff self-consistent (pipe-tunnel sets corr, not 1.0). */
this_time = stream->drive_next_time;
nom = (double)duration * (double)SPA_NSEC_PER_SEC / (double)rate;
stream->drive_next_time += nominal_ns;
if (pos != NULL) {
pos->clock.nsec = this_time;
pos->clock.rate = pos->clock.target_rate;
pos->clock.position += pos->clock.duration;
pos->clock.duration = pos->clock.target_duration;
pos->clock.delay = 0;
pos->clock.rate_diff = nominal_ns > 0 ? nom / (double)nominal_ns : 1.0;
pos->clock.next_nsec = stream->drive_next_time;
}
set_drive_timeout(stream, stream->drive_next_time);
pw_stream_trigger_process(stream->stream);
}
/* milan-avb: avb.sink/avb.source is created as a DRIVER; when it reaches STREAMING and the core elected it (pw_stream_is_driving), start the self-driving timer. */
static void on_sink_stream_state_changed(void *data, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct stream *stream = data;
struct timespec ts;
(void)old; (void)error;
switch (state) {
case PW_STREAM_STATE_STREAMING:
stream->driving = pw_stream_is_driving(stream->stream);
pw_log_info("milan-avb: avb.sink STREAMING driving=%d", stream->driving);
if (stream->driving) {
clock_gettime(CLOCK_MONOTONIC, &ts);
stream->drive_next_time = SPA_TIMESPEC_TO_NSEC(&ts);
stream->drive_phc_last = 0;
stream->drive_mono_last = 0;
stream->drive_ratio_ema = 0.0;
set_drive_timeout(stream, stream->drive_next_time);
}
break;
case PW_STREAM_STATE_PAUSED:
case PW_STREAM_STATE_ERROR:
case PW_STREAM_STATE_UNCONNECTED:
stream->driving = false;
set_drive_timeout(stream, 0);
break;
default:
break;
}
}
/* milan-avb: capture the driver clock/position areas the core hands the driver node. */
static void on_sink_stream_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
struct stream *stream = data;
(void)size;
switch (id) {
case SPA_IO_Position:
stream->io_position = area;
break;
case SPA_IO_RateMatch:
stream->io_rate_match = area;
break;
default:
break;
}
}
static const struct pw_stream_events sink_stream_events = { static const struct pw_stream_events sink_stream_events = {
PW_VERSION_STREAM_EVENTS, PW_VERSION_STREAM_EVENTS,
.destroy = on_stream_destroy, .destroy = on_stream_destroy,
.state_changed = on_sink_stream_state_changed,
.io_changed = on_sink_stream_io_changed,
.process = on_sink_stream_process .process = on_sink_stream_process
}; };
@ -746,9 +884,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
stream->prio = AVB_MSRP_PRIORITY_DEFAULT; stream->prio = AVB_MSRP_PRIORITY_DEFAULT;
stream->vlan_id = AVB_DEFAULT_VLAN; stream->vlan_id = AVB_DEFAULT_VLAN;
stream->mtt = 2000000; stream->mtt = 2000000;
/* TX timestamp jitter budget added on top of CLOCK_TAI now. 125 µs is /* TX timestamp jitter budget added on top of the gPTP (PHC) time; 125 µs is the upper bound at 1 GbE class-A per IEEE 802.1Qav, safe default until we measure it from gPTP. */
* the upper bound at 1 GbE class-A traffic per IEEE 802.1Qav; safe
* default until we have a way to measure it from gPTP. */
stream->t_uncertainty = 0; stream->t_uncertainty = 0;
stream->id = (uint64_t)server->mac_addr[0] << 56 | stream->id = (uint64_t)server->mac_addr[0] << 56 |
@ -771,7 +907,9 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
PW_KEY_MEDIA_CLASS, "Audio/Source", PW_KEY_MEDIA_CLASS, "Audio/Source",
PW_KEY_NODE_NAME, "avb.source", PW_KEY_NODE_NAME, "avb.source",
PW_KEY_NODE_DESCRIPTION, "AVB Source", PW_KEY_NODE_DESCRIPTION, "AVB Source",
PW_KEY_NODE_WANT_DRIVER, "true", /* milan-avb: avb.source IS the listener's media clock; it drives the graph at the recovered talker rate (mc.rate) so consumers run sample-locked (no resampling, bit-perfect); NODE_DRIVER + high priority elects it over the fallback Dummy-Driver. */
PW_KEY_NODE_DRIVER, "true",
PW_KEY_PRIORITY_DRIVER, "300000",
NULL)); NULL));
} else { } else {
stream->stream = pw_stream_new(server->impl->core, "sink", stream->stream = pw_stream_new(server->impl->core, "sink",
@ -779,7 +917,9 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
PW_KEY_MEDIA_CLASS, "Audio/Sink", PW_KEY_MEDIA_CLASS, "Audio/Sink",
PW_KEY_NODE_NAME, "avb.sink", PW_KEY_NODE_NAME, "avb.sink",
PW_KEY_NODE_DESCRIPTION, "AVB Sink", PW_KEY_NODE_DESCRIPTION, "AVB Sink",
PW_KEY_NODE_WANT_DRIVER, "true", /* milan-avb: avb.sink IS the graph driver (self-clocked off the AVTP/PHC rate), not a follower; NODE_DRIVER + high PRIORITY_DRIVER elect it over the fallback Dummy-Driver (priority 200000) so pw-cat clocks to us. */
PW_KEY_NODE_DRIVER, "true",
PW_KEY_PRIORITY_DRIVER, "300000",
NULL)); NULL));
} }
@ -825,10 +965,21 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
PW_ID_ANY, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_INACTIVE | PW_STREAM_FLAG_INACTIVE |
PW_STREAM_FLAG_RT_PROCESS, PW_STREAM_FLAG_RT_PROCESS |
/* milan-avb: both directions drive the graph themselves (talker off its media clock, listener off the recovered AAF clock), staying INACTIVE until a Milan ACMP/MSRP connection activates them. */
PW_STREAM_FLAG_DRIVER,
params, n_params)) < 0) params, n_params)) < 0)
goto error_free_stream; goto error_free_stream;
/* milan-avb: the self-driving timer lives on the RT data loop and is armed once the stream reaches STREAMING (state_changed); both directions drive (talker off its media clock, listener off the recovered AAF clock). */
if (!stream->is_crf) {
stream->drive_timer = pw_loop_add_timer(server->impl->data_loop,
on_drive_timeout, stream);
if (stream->drive_timer == NULL) {
pw_log_warn("avb stream: no drive_timer; core will pick a driver");
}
}
stream->frames_per_pdu = 6; stream->frames_per_pdu = 6;
stream->pdu_period = SPA_NSEC_PER_SEC * stream->frames_per_pdu / stream->pdu_period = SPA_NSEC_PER_SEC * stream->frames_per_pdu /
stream->info.info.raw.rate; stream->info.info.raw.rate;
@ -862,15 +1013,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
goto error_free; goto error_free;
} }
/* Milan Section 5.3.8.8 / Section 5.4.2.10.1.1: a Listener observes foreign /* Milan Section 5.3.8.8 / 5.4.2.10.1.1: a Listener observes foreign Talker Advertise PDUs matching the bound talker's stream_id; create the registrar attribute now (stream_id set later at BIND_RX, cleared at UNBIND_RX) and start its FSM without a join (observer, not declarant); once a matching TA arrives msrp.c populates attr.talker (accumulated_latency, dest_addr, vlan_id), moves the registrar to IN, and the Listener answers GET_STREAM_INFO with the real msrp_accumulated_latency. */
* Talker Advertise PDUs matching the bound talker's stream_id.
* Create the registrar attribute now (stream_id is set later at
* BIND_RX, cleared at UNBIND_RX) and start its FSM without a
* join we are an observer, not a declarant. Once a matching TA
* arrives from the wire, msrp.c populates attr.talker
* (accumulated_latency, dest_addr, vlan_id) and moves the
* registrar to IN. The Listener side reads those fields to
* answer GET_STREAM_INFO with real msrp_accumulated_latency. */
res = avb_msrp_attribute_new(server->msrp, &common->tastream_attr, res = avb_msrp_attribute_new(server->msrp, &common->tastream_attr,
AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE);
if (res) { if (res) {
@ -891,25 +1034,19 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
goto error_free; goto error_free;
} }
/* Milan v1.2 Section 4.3.3.1: pre-create lstream_attr with our talker /* Milan v1.2 Section 4.3.3.1: pre-create lstream_attr with our talker stream_id so foreign Listener declarations from peers reach it via process_listener and are observed through notify_listener (sets listener_observed on stream_output_state). */
* stream_id so foreign Listener declarations from peers are
* delivered to it via process_listener and observed through
* notify_listener (sets listener_observed on stream_output_state). */
common->lstream_attr.attr.listener.stream_id = htobe64(stream->id); common->lstream_attr.attr.listener.stream_id = htobe64(stream->id);
common->tastream_attr.attr.talker.vlan_id = htons(stream->vlan_id); common->tastream_attr.attr.talker.vlan_id = htons(stream->vlan_id);
if (server->avb_mode == AVB_MODE_MILAN_V12) /* Milan v1.2 Section 4.3.3.2 Table 4.4: MaxFrameSize is the AVTPDU (header + payload) ONLY plus 1 byte for PAAD sampling-clock drift; the Ethernet header and FCS are added by the bandwidth rule (F = MaxFrameSize + 22), so exclude our avb_frame_header (the L2 header) from pdu_size. */
/* Milan v1.2 Section 4.3.3.2 Table 4.4: MaxFrameSize is the AVTPDU if (server->avb_mode == AVB_MODE_MILAN_V12) {
* (header + payload) ONLY, plus 1 byte to account for the PAAD
* sampling clock possibly running slightly fast. The Ethernet header
* and FCS are added separately by the bandwidth rule (F = MaxFrameSize
* + 22), so exclude our avb_frame_header (the L2 header) from pdu_size. */
common->tastream_attr.attr.talker.tspec_max_frame_size = common->tastream_attr.attr.talker.tspec_max_frame_size =
htons((uint16_t)(stream->pdu_size - htons((uint16_t)(stream->pdu_size -
sizeof(struct avb_frame_header) + 1)); sizeof(struct avb_frame_header) + 1));
else } else {
common->tastream_attr.attr.talker.tspec_max_frame_size = common->tastream_attr.attr.talker.tspec_max_frame_size =
htons((uint16_t)(32 + stream->frames_per_pdu * stream->stride)); htons((uint16_t)(32 + stream->frames_per_pdu * stream->stride));
}
common->tastream_attr.attr.talker.tspec_max_interval_frames = common->tastream_attr.attr.talker.tspec_max_interval_frames =
htons(AVB_MSRP_TSPEC_MAX_INTERVAL_FRAMES_DEFAULT); htons(AVB_MSRP_TSPEC_MAX_INTERVAL_FRAMES_DEFAULT);
common->tastream_attr.attr.talker.priority = stream->prio; common->tastream_attr.attr.talker.priority = stream->prio;
@ -933,8 +1070,7 @@ void stream_destroy(struct stream *stream)
struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream); struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream);
uint64_t now; uint64_t now;
/* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart /* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart or replug doesn't strand a stale reservation on the bridge (socket still open here). */
* or replug doesn't strand a stale reservation on the bridge (socket still open here). */
now = stream_gptp_now(stream->server); now = stream_gptp_now(stream->server);
stream_deactivate(stream, now); stream_deactivate(stream, now);
@ -952,6 +1088,12 @@ void stream_destroy(struct stream *stream)
avb_mrp_attribute_destroy(common->tfstream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp);
} }
if (stream->drive_timer != NULL) {
set_drive_timeout(stream, 0);
pw_loop_destroy_source(stream->server->impl->data_loop, stream->drive_timer);
stream->drive_timer = NULL;
}
if (stream->raw_dump_fp) { if (stream->raw_dump_fp) {
fclose(stream->raw_dump_fp); fclose(stream->raw_dump_fp);
stream->raw_dump_fp = NULL; stream->raw_dump_fp = NULL;
@ -963,11 +1105,7 @@ static int setup_socket(struct stream *stream)
return avb_server_stream_setup_socket(stream->server, stream); return avb_server_stream_setup_socket(stream->server, stream);
} }
/* milan-avb: media-clock recovery ------------------------------------------- /* milan-avb: media-clock recovery; returns the CLOCK_SOURCE descriptor selected by CLOCK_DOMAIN 0 (or NULL); selection is clock_source_index, set at boot (Internal=0) and updated on the wire by SET_CLOCK_SOURCE (IEEE 1722.1 Section 7.4.23). */
*
* Returns the CLOCK_SOURCE descriptor currently selected by CLOCK_DOMAIN 0,
* or NULL. The selection is clock_source_index, set at boot (Internal = 0)
* and updated on the wire by SET_CLOCK_SOURCE (IEEE 1722.1 Section 7.4.23). */
static struct avb_aem_desc_clock_source *selected_clock_source(struct server *server) static struct avb_aem_desc_clock_source *selected_clock_source(struct server *server)
{ {
struct descriptor *dom; struct descriptor *dom;
@ -986,9 +1124,7 @@ static struct avb_aem_desc_clock_source *selected_clock_source(struct server *se
return descriptor_body(src); return descriptor_body(src);
} }
/* True when the CLOCK_DOMAIN selects an AAF (INPUT_STREAM) clock source whose /* True when the CLOCK_DOMAIN selects an AAF (INPUT_STREAM) clock source whose location points at this listener stream; CRF (MEDIA_CLOCK_STREAM) is out of scope and returns false. */
* location points at this listener stream. CRF (MEDIA_CLOCK_STREAM) is out of
* scope and returns false. */
static bool stream_mc_aaf_selected(struct stream *stream) static bool stream_mc_aaf_selected(struct stream *stream)
{ {
struct avb_aem_desc_clock_source *cs; struct avb_aem_desc_clock_source *cs;
@ -1029,43 +1165,36 @@ void avb_stream_update_clock_source(struct server *server)
} }
} }
/* Recover the talker media rate from a PDU's avtp_timestamp. The timestamps /* Recover the talker media rate from a PDU's avtp_timestamp (which carries the talker media clock in gPTP time); inter-PDU deltas give the rate, a second-order DLL (spa_dll) tracks phase+frequency, drives mc_rate. */
* carry the talker media clock in gPTP time; their inter-PDU deltas give its
* rate. A second-order DLL (spa_dll) tracks phase+frequency. Observe-only:
* drives mc_rate; consumption retiming is the next step. */
static void stream_mc_recover(struct stream *stream, const struct avb_packet_aaf *p) static void stream_mc_recover(struct stream *stream, const struct avb_packet_aaf *p)
{ {
uint32_t avtp_ts; uint32_t avtp_ts;
double rate; double rate;
if (!stream->mc_aaf_active || !p->tv) if (!stream->mc_aaf_active || !p->tv) {
return; return;
}
avtp_ts = ntohl(p->timestamp); avtp_ts = ntohl(p->timestamp);
rate = mc_recover_update(&stream->mc, avtp_ts, stream->frames_per_pdu, rate = mc_recover_update(&stream->mc, avtp_ts, stream->frames_per_pdu,
stream->info.info.raw.rate, stream->pdu_period); stream->info.info.raw.rate, stream->pdu_period);
if (stream->mc.pdus < 40 || (stream->mc.pdus % 8000) == 1) if (stream->mc.pdus < 40 || (stream->mc.pdus % 8000) == 1) {
pw_log_info("milan-avb: mc-recovery stream=%u pdus=%llu avtp_ts=%u model_lo=%u nom=%u pdu_ns=%lld rate=%.4f corr=%.8f err_ns=%d ppm=%.3f", pw_log_info("milan-avb: mc-recovery stream=%u pdus=%llu avtp_ts=%u model_lo=%u nom=%u pdu_ns=%lld rate=%.4f corr=%.8f err_ns=%d ppm=%.3f",
stream->index, (unsigned long long)stream->mc.pdus, avtp_ts, stream->index, (unsigned long long)stream->mc.pdus, avtp_ts,
(uint32_t)stream->mc.model_ns, (unsigned)stream->info.info.raw.rate, (uint32_t)stream->mc.model_ns, (unsigned)stream->info.info.raw.rate,
(long long)stream->pdu_period, rate, stream->mc.corr, (long long)stream->pdu_period, rate, stream->mc.corr,
stream->mc.last_err_ns, (stream->mc.corr - 1.0) * 1e6); stream->mc.last_err_ns, (stream->mc.corr - 1.0) * 1e6);
} }
}
/* Milan 5.4.5.3 STREAM_INTERRUPTED: playback is interrupted by the loss of /* Milan 5.4.5.3 STREAM_INTERRUPTED: playback interrupted by loss of "several" AVTPDUs (count implementation-defined); a single dropped/reordered PDU is a SEQ_NUM_MISMATCH, a gap of this many or more is an interruption. */
* "several" AVTPDUs (the spec leaves the count implementation-defined). A
* single dropped/reordered PDU is a SEQ_NUM_MISMATCH but not a full
* interruption; a gap of this many or more missing PDUs is. */
#define AVB_STREAM_INTERRUPT_MIN_LOST 2 #define AVB_STREAM_INTERRUPT_MIN_LOST 2
/* PDUs after a (re)lock during which a sequence step is absorbed (re-seeded) and /* PDUs after a (re)lock during which a sequence step is absorbed (re-seeded) and NOT counted as SEQ_NUM_MISMATCH — covers the one-time bind/SRP-path-open gap of a mid-stream join; small, so genuine ongoing loss still counts. */
* NOT counted as SEQ_NUM_MISMATCH covers the one-time bind/SRP-path-open gap of
* a Listener that joins mid-stream. Small, so genuine ongoing loss still counts. */
#define AVB_STREAM_SEQ_SETTLE 8 #define AVB_STREAM_SEQ_SETTLE 8
/* Milan v1.2 Section 5.4: a received AAF AVTPDU matches the current format when /* Milan v1.2 Section 5.4: a received AAF AVTPDU matches the current format when subtype, format, nsr, bit depth, channels and sparse all match. */
* subtype, format, nsr, bit depth, channels and sparse all match. */
static inline bool aaf_pdu_format_matches(const struct avb_packet_aaf *p, static inline bool aaf_pdu_format_matches(const struct avb_packet_aaf *p,
const struct avb_aem_stream_format_info *fi) const struct avb_aem_stream_format_info *fi)
{ {
@ -1077,8 +1206,7 @@ static inline bool aaf_pdu_format_matches(const struct avb_packet_aaf *p,
p->sp == fi->sparse; p->sp == fi->sparse;
} }
/* Read the current format from the Stream Input descriptor. SET_STREAM_FORMAT /* Read the current format from the Stream Input descriptor; SET_STREAM_FORMAT updates it there, so this is always the current one. */
* updates it there, so this is always the current one. */
static void stream_in_current_format(struct stream *stream, static void stream_in_current_format(struct stream *stream,
struct avb_aem_stream_format_info *out) struct avb_aem_stream_format_info *out)
{ {
@ -1104,8 +1232,7 @@ static void handle_aaf_packet(struct stream *stream,
filled = spa_ringbuffer_get_write_index(&stream->ring, &index); filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
n_bytes = ntohs(p->data_len); n_bytes = ntohs(p->data_len);
/* IEEE 1722.1-2021 Table 7-156: per-PDU, bump UNSUPPORTED_FORMAT on any AVTPDU /* IEEE 1722.1-2021 Table 7-156: per-PDU, bump UNSUPPORTED_FORMAT on any AVTPDU whose format != the Stream Input current format (from descriptor), or malformed. */
* whose format != the Stream Input current format (from descriptor), or malformed. */
stream_in_current_format(stream, &cur); stream_in_current_format(stream, &cur);
if (n_bytes > (uint32_t)(len - (int)sizeof(*p)) || !aaf_pdu_format_matches(p, &cur)) { if (n_bytes > (uint32_t)(len - (int)sizeof(*p)) || !aaf_pdu_format_matches(p, &cur)) {
cnt->unsupported_format++; cnt->unsupported_format++;
@ -1113,9 +1240,7 @@ static void handle_aaf_packet(struct stream *stream,
return; return;
} }
/* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every /* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every valid AVTPDU received on the wire, independent of whether the listener pipeline could absorb it. */
* valid AVTPDU received on the wire independent of whether the listener
* pipeline could absorb it. */
cnt->frame_rx++; cnt->frame_rx++;
clock_gettime(CLOCK_MONOTONIC, &now_ts); clock_gettime(CLOCK_MONOTONIC, &now_ts);
@ -1127,27 +1252,24 @@ static void handle_aaf_packet(struct stream *stream,
stream->prev_seq = p->seq_num; /* (re)lock: seed seq, no gap */ stream->prev_seq = p->seq_num; /* (re)lock: seed seq, no gap */
si->seq_settle = AVB_STREAM_SEQ_SETTLE; /* grace the bind/path-open step */ si->seq_settle = AVB_STREAM_SEQ_SETTLE; /* grace the bind/path-open step */
} else if (si->seq_settle > 0) { } else if (si->seq_settle > 0) {
/* settling just after a (re)lock: a Listener that binds mid-stream /* settling just after a (re)lock: a Listener that binds mid-stream behind an SRP bridge gets a one-time sequence step as the bridge opens forwarding — re-seed and don't count it. */
* behind an SRP bridge gets a one-time sequence step as the bridge
* opens forwarding re-seed and don't count it. */
si->seq_settle--; si->seq_settle--;
stream->prev_seq = p->seq_num; stream->prev_seq = p->seq_num;
} else { } else {
uint8_t expected = (uint8_t)(stream->prev_seq + 1); uint8_t expected = (uint8_t)(stream->prev_seq + 1);
if (p->seq_num != expected) { if (p->seq_num != expected) {
/* IEEE 1722.1 7.4: SEQ_NUM_MISMATCH on any sequence /* IEEE 1722.1 7.4: SEQ_NUM_MISMATCH on any sequence discontinuity (loss, reorder or duplicate). */
* discontinuity (loss, reorder or duplicate). */
uint8_t lost = (uint8_t)(p->seq_num - expected); uint8_t lost = (uint8_t)(p->seq_num - expected);
cnt->seq_mistmatch++; cnt->seq_mistmatch++;
/* STREAM_INTERRUPTED only when several PDUs are missing. */ /* STREAM_INTERRUPTED only when several PDUs are missing. */
if (lost >= AVB_STREAM_INTERRUPT_MIN_LOST) if (lost >= AVB_STREAM_INTERRUPT_MIN_LOST) {
cnt->stream_interrupted++; cnt->stream_interrupted++;
} }
}
stream->prev_seq = p->seq_num; stream->prev_seq = p->seq_num;
} }
/* milan-avb: AAF media-clock recovery (active only when selected via the /* milan-avb: AAF media-clock recovery (active only when selected via the CLOCK_DOMAIN); recovers the talker media rate from avtp_timestamps. */
* CLOCK_DOMAIN). Recovers the talker media rate from avtp_timestamps. */
stream_mc_recover(stream, p); stream_mc_recover(stream, p);
/* milan-avb: latency observability (throttled to 1 Hz, env-gated). */ /* milan-avb: latency observability (throttled to 1 Hz, env-gated). */
@ -1277,8 +1399,7 @@ static void handle_iec61883_packet(struct stream *stream,
} }
} }
/* TODO: RX is on the main loop, not the RT data_loop — preemption can drop PDUs /* TODO: RX is on the main loop, not the RT data_loop — preemption can drop PDUs (SEQ_NUM_MISMATCH); move it to data_loop + a big SO_RCVBUF, like the flush_timer. */
* (SEQ_NUM_MISMATCH). Move it to data_loop + a big SO_RCVBUF, like the flush_timer. */
static void on_socket_data(void *data, int fd, uint32_t mask) static void on_socket_data(void *data, int fd, uint32_t mask)
{ {
struct stream *stream = data; struct stream *stream = data;
@ -1316,8 +1437,7 @@ static void on_socket_data(void *data, int fd, uint32_t mask)
len - (int)sizeof(*h)); len - (int)sizeof(*h));
break; break;
case AVB_SUBTYPE_CRF: case AVB_SUBTYPE_CRF:
/* CRF clock-reference stream: no audio data plane. /* CRF clock-reference stream: no audio data plane; consume and ignore (clock recovery is future work). */
* Consume and ignore (clock recovery is future work). */
break; break;
default: default:
pw_log_warn("unsupported subtype 0x%02x", ph->subtype); pw_log_warn("unsupported subtype 0x%02x", ph->subtype);
@ -1327,12 +1447,7 @@ static void on_socket_data(void *data, int fd, uint32_t mask)
} }
} }
/* Milan v1.2 Table 5.6: a Stream Input resets its diagnostic counters on the /* Milan v1.2 Table 5.6: a Stream Input resets its diagnostic counters on the not-bound -> bound transition (NOT the reverse); also re-arms the media-lock / seq-settle state, since the unlock edge is detected only in the GET_COUNTERS poll (100 ms silence) so a fast unbind/rebind could leave media_locked_state==true and miscount the bridge-open step as SEQ_NUM_MISMATCH / STREAM_INTERRUPTED. Called from stream_activate(). */
* not-bound -> bound transition (and NOT on bound -> not-bound). Also re-arms
* the media-lock / seq-settle state: the unlock edge is detected only inside the
* GET_COUNTERS poll (100 ms silence), so a fast unbind/rebind can leave
* media_locked_state == true and miscount the bridge-open step as
* SEQ_NUM_MISMATCH / STREAM_INTERRUPTED. Called from stream_activate(). */
static void stream_input_reset_counters(struct aecp_aem_stream_input_state *si) static void stream_input_reset_counters(struct aecp_aem_stream_input_state *si)
{ {
si->counters.media_locked = 0; si->counters.media_locked = 0;
@ -1359,11 +1474,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
struct stream_common *common; struct stream_common *common;
common = SPA_CONTAINER_OF(stream, struct stream_common, stream); common = SPA_CONTAINER_OF(stream, struct stream_common, stream);
/* milan-avb: SR-class priority + VLAN id come from the MSRP Domain, not a /* milan-avb: SR-class priority + VLAN id come from the MSRP Domain (the authoritative network-declared values), not a hardcoded default; read before setup_socket() since the listener uses stream->vlan_id to select its VLAN sub-iface. */
* hardcoded default. process_domain() re-adjusts the AVB_INTERFACE domain
* to the network-declared sr_class_priority/sr_class_vid, so this is the
* authoritative source. Read it before setup_socket() the listener uses
* stream->vlan_id to select its VLAN sub-iface. */
{ {
struct descriptor *avbif = server_find_descriptor(server, struct descriptor *avbif = server_find_descriptor(server,
AVB_AEM_DESC_AVB_INTERFACE, 0); AVB_AEM_DESC_AVB_INTERFACE, 0);
@ -1395,26 +1506,25 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
struct aecp_aem_stream_input_state *input_stream; struct aecp_aem_stream_input_state *input_stream;
input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common);
/* Milan v1.2 Table 5.6: reset diagnostic counters + re-arm the /* Milan v1.2 Table 5.6: reset diagnostic counters + re-arm the media-lock / seq-settle state on the not-bound -> bound transition. */
* media-lock / seq-settle state on the not-bound -> bound transition. */
stream_input_reset_counters(input_stream); stream_input_reset_counters(input_stream);
/* Prime ring with one PipeWire quantum of silence (Milan v1.2 Section 5.4.5.3). */ /* Prime ring with one PipeWire quantum of silence (Milan v1.2 Section 5.4.5.3). */
spa_ringbuffer_init(&stream->ring); spa_ringbuffer_init(&stream->ring);
if (stream->frames_per_pdu > 0) { if (stream->frames_per_pdu > 0) {
uint32_t prefill_pdus = 1024u / stream->frames_per_pdu; uint32_t prefill_pdus = 1024u / stream->frames_per_pdu;
if (prefill_pdus > 0) if (prefill_pdus > 0) {
pad_ringbuffer_with_silence(stream, (int)prefill_pdus); pad_ringbuffer_with_silence(stream, (int)prefill_pdus);
} }
}
/* milan-avb: pick up the current media-clock selection for this input /* milan-avb: pick up the current media-clock selection for this input (AAF recovery vs internal/gPTP); re-prime the DLL on a fresh bind. */
* (AAF recovery vs internal/gPTP); re-prime the DLL on a fresh bind. */
stream->mc_aaf_active = stream_mc_aaf_selected(stream); stream->mc_aaf_active = stream_mc_aaf_selected(stream);
if (stream->mc_aaf_active) if (stream->mc_aaf_active) {
stream_mc_reset(stream); stream_mc_reset(stream);
}
/* milan-avb: publish our contribution to graph latency so wpctl/pw-cli /* milan-avb: publish our contribution to graph latency (the prefill: one PipeWire quantum at 48 kHz) so wpctl/pw-cli report it. */
* report it. Latency is the prefill: one PipeWire quantum at 48 kHz. */
{ {
struct spa_latency_info latency = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT); struct spa_latency_info latency = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT);
uint32_t rate = stream->info.info.raw.rate ? stream->info.info.raw.rate : 48000; uint32_t rate = stream->info.info.raw.rate ? stream->info.info.raw.rate : 48000;
@ -1444,9 +1554,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
pw_properties_free(props); pw_properties_free(props);
} }
/* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN. /* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN; compute from current state so a reconnect picks up an already-IN TA registrar (no NEW/JOIN event fires when the registrar didn't transition). */
* Compute from current state so a reconnect picks up an already-IN TA
* registrar (no NEW/JOIN event fires when the registrar didn't transition). */
common->lstream_attr.param = common->lstream_attr.param =
(common->tastream_attr.mrp != NULL && (common->tastream_attr.mrp != NULL &&
avb_mrp_attribute_get_registrar_state(common->tastream_attr.mrp) == AVB_MRP_IN) avb_mrp_attribute_get_registrar_state(common->tastream_attr.mrp) == AVB_MRP_IN)
@ -1459,8 +1567,12 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
avb_mrp_attribute_begin(input_stream->mvrp_attr.mrp, now); avb_mrp_attribute_begin(input_stream->mvrp_attr.mrp, now);
avb_mrp_attribute_join(input_stream->mvrp_attr.mrp, now, true); avb_mrp_attribute_join(input_stream->mvrp_attr.mrp, now, true);
} else { } else {
if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) {
return res; return res;
}
/* M2: re-anchor the presentation-timestamp accumulator on connect. */
stream->tx_pts = 0;
common->tastream_attr.attr.talker.stream_id = htobe64(stream->id); common->tastream_attr.attr.talker.stream_id = htobe64(stream->id);
memcpy(common->tastream_attr.attr.talker.dest_addr, stream->addr, 6); memcpy(common->tastream_attr.attr.talker.dest_addr, stream->addr, 6);
@ -1478,8 +1590,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
pw_stream_set_active(stream->stream, true); pw_stream_set_active(stream->stream, true);
/* Milan Table 5.17: STREAM_START counter ticks each time the stream /* Milan Table 5.17: STREAM_START counter ticks each time the stream transitions from stopped → started. */
* transitions from stopped started. */
if (stream->direction == SPA_DIRECTION_OUTPUT) { if (stream->direction == SPA_DIRECTION_OUTPUT) {
stream_out_counters(stream)->stream_start++; stream_out_counters(stream)->stream_start++;
stream_out_mark_counters_dirty(stream); stream_out_mark_counters_dirty(stream);
@ -1509,10 +1620,7 @@ int stream_deactivate(struct stream *stream, uint64_t now)
pw_loop_invoke(stream->server->impl->data_loop, do_remove_flush_timer, pw_loop_invoke(stream->server->impl->data_loop, do_remove_flush_timer,
0, NULL, 0, true, stream); 0, NULL, 0, true, stream);
} }
/* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the /* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the reservation immediately (Leave) instead of holding stale state until its LeaveAll timer — otherwise a stop/restart or replug to another port can't re-register (the old port's Talker/Listener/VLAN entry still pins the stream). */
* reservation immediately (Leave) instead of holding stale state until its
* LeaveAll timer otherwise a stop/restart or replug to another port can't
* re-register (the old port's Talker/Listener/VLAN entry still pins the stream). */
if (stream->direction == SPA_DIRECTION_INPUT) { if (stream->direction == SPA_DIRECTION_INPUT) {
si = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); si = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common);
avb_mrp_attribute_leave(common->lstream_attr.mrp, now); avb_mrp_attribute_leave(common->lstream_attr.mrp, now);
@ -1524,8 +1632,7 @@ int stream_deactivate(struct stream *stream, uint64_t now)
avb_mrp_attribute_leave(common->tastream_attr.mrp, now); avb_mrp_attribute_leave(common->tastream_attr.mrp, now);
} }
/* Milan Table 5.17: STREAM_STOP counter ticks each transition the /* Milan Table 5.17: STREAM_STOP counter ticks each transition the other way. */
* other way. */
if (stream->direction == SPA_DIRECTION_OUTPUT) { if (stream->direction == SPA_DIRECTION_OUTPUT) {
stream_out_counters(stream)->stream_stop++; stream_out_counters(stream)->stream_stop++;
stream_out_mark_counters_dirty(stream); stream_out_mark_counters_dirty(stream);

View file

@ -19,7 +19,8 @@
#include <pipewire/pipewire.h> #include <pipewire/pipewire.h>
#define BUFFER_SIZE (1u<<16) /* milan-avb: the talker ring must hold SEVERAL graph quanta so the burst-fill (one quantum/cycle) and the 125us flush-drain decouple; at 1<<16 the ring equalled ONE quantum so each cycle overwrote unread samples (~40 glitches/s). 1<<18 = 4 quanta. */
#define BUFFER_SIZE (1u<<18)
#define BUFFER_MASK (BUFFER_SIZE-1) #define BUFFER_MASK (BUFFER_SIZE-1)
struct stream { struct stream {
@ -41,6 +42,17 @@ struct stream {
struct spa_source *source; struct spa_source *source;
struct spa_source *flush_timer; struct spa_source *flush_timer;
uint64_t flush_last_ns; uint64_t flush_last_ns;
/* milan-avb: self-driving timer (the node IS the graph DRIVER); a data-loop one-shot fills io_position->clock, re-arms to drive_next_time, and triggers one graph cycle (pipe-tunnel pattern) so the graph fill and AVTP egress share one clock (no follower-resampler FM). */
struct spa_source *drive_timer;
uint64_t drive_next_time;
bool driving;
uint64_t drive_phc_last;
uint64_t drive_mono_last;
double drive_ratio_ema; /* M2: EMA of PHC/MONOTONIC rate ratio */
/* M2: monotonic AVTP presentation-timestamp accumulator (PHC domain); advances by exactly pdu_period per PDU, slow-leaked toward the live PHC, so the listener's recovered media clock stays jitter-free (per-tick PHC reads inject interpolation jitter, audible FM). */
uint64_t tx_pts;
bool is_crf; bool is_crf;
uint64_t next_txtime; uint64_t next_txtime;
int prio; int prio;
@ -72,15 +84,11 @@ struct stream {
uint32_t stride; uint32_t stride;
struct spa_audio_info info; struct spa_audio_info info;
/* milan-avb: AAF media-clock recovery (listener / STREAM_INPUT only). /* milan-avb: AAF media-clock recovery (listener / STREAM_INPUT only); active only while the CLOCK_DOMAIN selects the AAF (INPUT_STREAM) source pointing at this stream; recovered from avtp_timestamp deltas (mc-recover.h). */
* Active only while the CLOCK_DOMAIN selects the AAF (INPUT_STREAM)
* clock source whose location points at this stream. Estimator state in
* struct mc_recover (mc-recover.h); recovered from avtp_timestamp deltas. */
bool mc_aaf_active; bool mc_aaf_active;
struct mc_recover mc; struct mc_recover mc;
/* milan-avb: actuator I/O areas (set via .io_changed). io_rate_match is the /* milan-avb: actuator I/O areas (set via .io_changed); io_rate_match is the resampler knob, NULL unless the adapter inserted a resampler. */
* resampler knob NULL unless the adapter inserted a resampler. */
struct spa_io_rate_match *io_rate_match; struct spa_io_rate_match *io_rate_match;
struct spa_io_position *io_position; struct spa_io_position *io_position;
@ -112,9 +120,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now);
int stream_deactivate(struct stream *stream, uint64_t now); int stream_deactivate(struct stream *stream, uint64_t now);
int stream_activate_virtual(struct stream *stream, uint16_t index); int stream_activate_virtual(struct stream *stream, uint16_t index);
/* milan-avb: re-evaluate each input stream's media-clock recovery against the /* milan-avb: re-evaluate each input stream's media-clock recovery against the current CLOCK_DOMAIN selection; call after SET_CLOCK_SOURCE for on-the-fly clock-source switching. */
* current CLOCK_DOMAIN selection. Call after SET_CLOCK_SOURCE for on-the-fly
* clock-source switching. */
void avb_stream_update_clock_source(struct server *server); void avb_stream_update_clock_source(struct server *server);
#endif /* AVB_STREAM_H */ #endif /* AVB_STREAM_H */