diff --git a/src/modules/module-avb/aaf.h b/src/modules/module-avb/aaf.h index da61dc4a3..b50e7c109 100644 --- a/src/modules/module-avb/aaf.h +++ b/src/modules/module-avb/aaf.h @@ -79,4 +79,22 @@ struct avb_packet_aaf { uint8_t payload[0]; } __attribute__ ((__packed__)); +/* IEEE 1722-2016 Table 18: AAF nsr code to rate in Hz, 0 if user/reserved. */ +static inline uint32_t avb_aaf_nsr_to_rate(uint8_t nsr) +{ + switch (nsr) { + case AVB_AAF_PCM_NSR_8KHZ: return 8000; + case AVB_AAF_PCM_NSR_16KHZ: return 16000; + case AVB_AAF_PCM_NSR_24KHZ: return 24000; + case AVB_AAF_PCM_NSR_32KHZ: return 32000; + case AVB_AAF_PCM_NSR_44_1KHZ: return 44100; + case AVB_AAF_PCM_NSR_48KHZ: return 48000; + case AVB_AAF_PCM_NSR_88_2KHZ: return 88200; + case AVB_AAF_PCM_NSR_96KHZ: return 96000; + case AVB_AAF_PCM_NSR_176_4KHZ: return 176400; + case AVB_AAF_PCM_NSR_192KHZ: return 192000; + default: return 0; + } +} + #endif /* AVB_AAF_H */ diff --git a/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c b/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c index 7d4f56f87..8ead61950 100644 --- a/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c +++ b/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c @@ -53,6 +53,12 @@ static inline uint64_t peer_id_from_entity_id(uint64_t entity_id, uint16_t uniqu return 0; } + /* Non-EUI-64 entity_id (MAC|entity_index): high 48 bits are the MAC, swap the + * low 16 for unique_id. EUI-64 (FF:FE marker) rebuilds the stream_id below. */ + if (((entity_id >> 24) & 0xFFFFULL) != 0xFFFEULL) { + return (entity_id & 0xFFFFFFFFFFFF0000ULL) | unique_id; + } + return (entity_id & 0xFFFFFF0000000000ULL) | ((entity_id & 0xFFFFFFULL) << 16) | unique_id; @@ -69,11 +75,17 @@ static inline void clear_stream_binding(struct aecp_aem_stream_input_state_milan sizeof(stream->stream_in_sta.common.stream.addr)); stream->stream_in_sta.common.stream.vlan_id = AVB_DEFAULT_VLAN; + stream->acmp_sta.talker_entity_id = 0; stream->stream_in_sta.stream_info_dirty = true; } static inline uint64_t stream_talker_entity_id(const struct aecp_aem_stream_input_state_milan_v12 *s) { + /* Prefer the talker entity_id stashed at BIND_RX (round-trip-safe); deriving it + * from the MSRP stream_id is lossy for non-EUI-64 entity_ids. */ + if (s->acmp_sta.talker_entity_id != 0) { + return s->acmp_sta.talker_entity_id; + } return entity_id_from_peer_id(be64toh(s->stream_in_sta.common.lstream_attr.attr.listener.stream_id)); } @@ -455,6 +467,7 @@ static void binding_save_parameters(struct acmp *acmp, uint64_t stream_id = htobe64(peer_id_from_entity_id(be64toh(p->talker_guid), ntohs(p->talker_unique_id))); stream->acmp_sta.controller_entity_id = be64toh(p->controller_guid); + stream->acmp_sta.talker_entity_id = be64toh(p->talker_guid); stream->stream_in_sta.common.lstream_attr.attr.listener.stream_id = stream_id; stream->stream_in_sta.common.tastream_attr.attr.talker.stream_id = stream_id; stream->stream_in_sta.common.tfstream_attr.attr.talker_fail.talker.stream_id = stream_id; @@ -2444,6 +2457,38 @@ void acmp_periodic_milan_v12(struct acmp *acmp, uint64_t now) stream_out->last_probe_rx_time = 0; } } + + /* Milan Section 4.3.3.1 / 5.5.3: a settled Listener with no reservation yet + * (SETTLED_NO_RSV) re-evaluates Listener Ready against the Talker Advertise + * registrar each tick. After a bridge convergence delay the TA arrives with no + * fresh ACMP event, so this re-declares Ready and advances to SETTLED_RSV_OK the + * instant the TA is IN — the stall self-heals, no controller re-bind needed. */ + for (uint16_t desc_index = 0; desc_index < UINT16_MAX; desc_index++) { + struct descriptor *desc; + struct aecp_aem_stream_input_state_milan_v12 *si_m; + struct stream_common *common; + bool ta_in; + + desc = server_find_descriptor(acmp->server, AVB_AEM_DESC_STREAM_INPUT, + desc_index); + if (desc == NULL) + break; + + si_m = desc->ptr; + if (si_m->acmp_sta.fsm_acmp_state != FSM_ACMP_STATE_MILAN_V12_SETTLED_NO_RSV) + continue; + + common = &si_m->stream_in_sta.common; + ta_in = common->tastream_attr.mrp != NULL && + avb_mrp_attribute_get_registrar_state(common->tastream_attr.mrp) == AVB_MRP_IN; + common->lstream_attr.param = ta_in + ? AVB_MSRP_LISTENER_PARAM_READY + : AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; + if (common->lstream_attr.mrp != NULL) + avb_mrp_attribute_join(common->lstream_attr.mrp, now, true); + if (ta_in) + si_m->acmp_sta.fsm_acmp_state = FSM_ACMP_STATE_MILAN_V12_SETTLED_RSV_OK; + } } static const char *probing_status_name(uint8_t s) diff --git a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c index 793e6f909..91df002e1 100644 --- a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c +++ b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c @@ -360,7 +360,11 @@ static void emit_avb_interface_counters(struct aecp *aecp, uint16_t desc_index, * here, gated by last_counters_emit_ns. */ #define COUNTER_UNSOL_MIN_INTERVAL_NS ((int64_t)SPA_NSEC_PER_SEC) -#define MEDIA_UNLOCK_TIMEOUT_NS ((int64_t)(2 * SPA_NSEC_PER_MSEC)) +/* A frame is "missing" only after a gap far longer than event-loop scheduling + * jitter — 2 ms (16 PDU periods) is processed-not-arrived jitter, not media + * loss, and spuriously unlocks a healthy stream on a busy software listener. + * 100 ms is ~5x the listener prefill and well above loop jitter. */ +#define MEDIA_UNLOCK_TIMEOUT_NS ((int64_t)(100 * SPA_NSEC_PER_MSEC)) static bool counter_rate_limit_elapsed(int64_t now, int64_t last_emit) { diff --git a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c index b50bd8b5a..651b0ad1b 100644 --- a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c +++ b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-clock-source.c @@ -141,6 +141,11 @@ int handle_cmd_set_clock_source_milan_v12(struct aecp *aecp, int64_t now, /** Descriptor always keep the network endianness */ dclk_domain->clock_source_index = htons(clock_src_index); + + /* milan-avb: apply the new selection to the data plane on the fly — + * (de)activate AAF media-clock recovery on the affected input streams. */ + avb_stream_update_clock_source(server); + rc = reply_success(aecp, m, len); if (rc) { pw_log_error("Reply failed for set_clock_source\n"); diff --git a/src/modules/module-avb/aecp-aem-state.h b/src/modules/module-avb/aecp-aem-state.h index df3f65c4c..41f949af2 100644 --- a/src/modules/module-avb/aecp-aem-state.h +++ b/src/modules/module-avb/aecp-aem-state.h @@ -182,10 +182,17 @@ struct aecp_aem_stream_input_state { * most recent valid PDU; media_locked_state is the current edge. */ int64_t last_frame_rx_ns; bool media_locked_state; + + /* Settle window after a (re)lock: a Listener binding mid-stream behind an + * SRP bridge sees a one-time sequence step as the bridge opens forwarding a + * beat after the talker is already transmitting. Re-seed prev_seq for this + * many post-lock PDUs instead of counting it as SEQ_NUM_MISMATCH. */ + uint8_t seq_settle; }; struct acmp_stream_status_milan_v12 { uint64_t controller_entity_id; + uint64_t talker_entity_id; /* IEEE 1722.1-2021 Section 7.4.6, BIND_RX_COMMAND talker_guid */ uint32_t acmp_flags; uint8_t probing_status; uint8_t acmp_status; diff --git a/src/modules/module-avb/aecp-aem.h b/src/modules/module-avb/aecp-aem.h index 997747ed8..efcb4be1b 100644 --- a/src/modules/module-avb/aecp-aem.h +++ b/src/modules/module-avb/aecp-aem.h @@ -13,6 +13,7 @@ #include "aecp.h" #include "aecp-aem-types.h" #include "packets.h" +#include "aaf.h" /* AVDECC stream_format decoder. * @@ -23,12 +24,8 @@ * plane plus an `is_audio` shortcut so callers can skip non-media streams * (CRF). Fields are 0 when not applicable. * - * NOTE: bit-level decoding inside each subtype is incomplete. Today the - * decoder identifies the subtype reliably and falls back to the historical - * 8 ch / 48 kHz / 24-bit defaults for audio — sufficient for current Milan - * builds where every stream_input_0 / stream_output_0 uses one of those - * formats. TODO Section H.1 (AAF) and Section F (IEC 61883-6 AM824) fields once a real - * conformance run needs other rates / channel counts. */ + * AAF (Section H.1) decodes each field. IEC 61883-6 (Section F) still uses the + * historical 8 ch / 48 kHz defaults, TODO decode it. */ enum avb_aem_stream_format_kind { AVB_AEM_STREAM_FORMAT_KIND_UNKNOWN = 0, AVB_AEM_STREAM_FORMAT_KIND_AAF, @@ -44,6 +41,9 @@ struct avb_aem_stream_format_info { uint16_t channels; uint8_t bit_depth; uint16_t samples_per_frame; + uint8_t format; + uint8_t nsr; + uint8_t sparse; }; static inline void avb_aem_stream_format_decode(uint64_t fmt_be, @@ -57,15 +57,23 @@ static inline void avb_aem_stream_format_decode(uint64_t fmt_be, out->channels = 0; out->bit_depth = 0; out->samples_per_frame = 0; + out->format = 0; + out->nsr = 0; + out->sparse = 0; switch (out->subtype) { case AVB_SUBTYPE_AAF: + /* Section H.1 quadlet: nsr[51:48] format[47:40] bit_depth[39:32] + * channels[31:22] samples_per_frame[21:12]. */ out->kind = AVB_AEM_STREAM_FORMAT_KIND_AAF; out->is_audio = true; - out->rate = 48000; - out->channels = 8; - out->bit_depth = 24; - out->samples_per_frame = 6; + out->nsr = (uint8_t)((f >> 48) & 0x0F); + out->format = (uint8_t)((f >> 40) & 0xFF); + out->bit_depth = (uint8_t)((f >> 32) & 0xFF); + out->channels = (uint16_t)((f >> 22) & 0x3FF); + out->samples_per_frame = (uint16_t)((f >> 12) & 0x3FF); + out->sparse = AVB_AAF_PCM_SP_NORMAL; + out->rate = avb_aaf_nsr_to_rate(out->nsr); break; case AVB_SUBTYPE_61883_IIDC: out->kind = AVB_AEM_STREAM_FORMAT_KIND_IEC_61883_6; diff --git a/src/modules/module-avb/avb.c b/src/modules/module-avb/avb.c index eeacb7b7c..207155d18 100644 --- a/src/modules/module-avb/avb.c +++ b/src/modules/module-avb/avb.c @@ -41,6 +41,10 @@ struct pw_avb *pw_avb_new(struct pw_context *context, impl->context = context; impl->loop = pw_context_get_main_loop(context); + /* Acquire an RT loop (data.rt class) so the talker flush timer is paced + * under SCHED_FIFO; main-loop scheduling jitter exceeds the 2ms class-A + * presentation margin and produces late timestamps at the listener. */ + impl->data_loop = pw_context_acquire_loop(context, NULL); impl->timer_queue = pw_context_get_timer_queue(context); impl->props = props; impl->core = pw_context_get_object(context, PW_TYPE_INTERFACE_Core); @@ -80,6 +84,9 @@ static void impl_free(struct impl *impl) spa_list_consume(s, &impl->servers, link) avdecc_server_free(s); + if (impl->data_loop != NULL) { + pw_context_release_loop(impl->context, impl->data_loop); + } free(impl); } diff --git a/src/modules/module-avb/avdecc.c b/src/modules/module-avb/avdecc.c index 970ceea2e..41b9322aa 100644 --- a/src/modules/module-avb/avdecc.c +++ b/src/modules/module-avb/avdecc.c @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -253,11 +256,187 @@ static int raw_transport_setup(struct server *server) return 0; } +/* milan-avb: hand-rolled netlink VLAN sub-iface creator. + * + * On I210-class NICs with rx-vlan-filter[fixed]=on the silicon drops + * VLAN-tagged frames whose VID is not registered. Registering happens + * implicitly when a VLAN sub-iface is added via RTM_NEWLINK. We create + * . on first use, never delete it (bounded leak: one + * sub-iface per SR class), and bind the listener stream socket to it. + * + * Fall back to PACKET_MR_PROMISC if any step fails (no CAP_NET_ADMIN, + * no 8021q module loaded, etc.). */ + +#define MILAN_NLALIGN(n) (((n) + 3U) & ~3U) + +static int milan_nl_send(int fd, uint16_t mt, uint16_t flags, + uint32_t seq, const void *payload, size_t plen) +{ + struct { + struct nlmsghdr nlh; + char body[2048]; + } msg = { 0 }; + size_t need = NLMSG_HDRLEN + plen; + if (need > sizeof(msg)) + return -EMSGSIZE; + msg.nlh.nlmsg_len = need; + msg.nlh.nlmsg_type = mt; + msg.nlh.nlmsg_flags = flags; + msg.nlh.nlmsg_seq = seq; + msg.nlh.nlmsg_pid = 0; + if (plen) + memcpy(msg.body, payload, plen); + if (send(fd, &msg, need, 0) < 0) + return -errno; + return 0; +} + +static int milan_nl_recv_ack(int fd, uint32_t seq) +{ + char buf[4096]; + for (;;) { + ssize_t n = recv(fd, buf, sizeof(buf), 0); + size_t off = 0; + if (n < 0) + return -errno; + while (off + NLMSG_HDRLEN <= (size_t)n) { + struct nlmsghdr *h = (struct nlmsghdr *)(buf + off); + if (h->nlmsg_len < NLMSG_HDRLEN || + off + h->nlmsg_len > (size_t)n) + break; + if (h->nlmsg_type == NLMSG_ERROR && h->nlmsg_seq == seq) { + struct nlmsgerr *e = NLMSG_DATA(h); + return e->error; + } + off += NLMSG_ALIGN(h->nlmsg_len); + } + } +} + +static size_t milan_nl_attr(char *p, uint16_t type, const void *val, uint16_t vlen) +{ + struct rtattr *r = (struct rtattr *)p; + size_t total; + r->rta_len = RTA_LENGTH(vlen); + r->rta_type = type; + memcpy(RTA_DATA(r), val, vlen); + total = RTA_ALIGN(r->rta_len); + if (total > (size_t)r->rta_len) + memset(p + r->rta_len, 0, total - r->rta_len); + return total; +} + +/* Returns ifindex (>0) or -errno. */ +static int milan_get_ifindex(const char *name) +{ + int fd = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0); + struct ifreq req; + int res, saved; + if (fd < 0) + return -errno; + spa_zero(req); + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", name); + res = ioctl(fd, SIOCGIFINDEX, &req); + saved = -errno; + close(fd); + return res < 0 ? saved : req.ifr_ifindex; +} + +/* Ensure . exists, is UP, and is a VLAN sub-iface of . + * Writes the sub-iface name to out and returns 0 on success. */ +static int milan_ensure_vlan_iface(const char *parent_ifname, uint16_t vid, + char *out, size_t out_size) +{ + int rc, existing, parent_idx, nlfd, err, new_idx; + struct sockaddr_nl sa = { .nl_family = AF_NETLINK }; + char payload[256]; + char *p = payload; + struct ifinfomsg ifi = { 0 }; + uint32_t pidx; + char nested[64]; + char *np = nested; + const char kind[] = "vlan"; + char data[16]; + char *dp = data; + uint16_t vidv = vid; + uint32_t seq = 1; + struct ifinfomsg up_ifi = { 0 }; + + if (vid == 0 || vid >= 4095) + return -EINVAL; + rc = snprintf(out, out_size, "%s.%u", parent_ifname, (unsigned)vid); + if (rc < 0 || (size_t)rc >= out_size) + return -ENAMETOOLONG; + + /* If the sub-iface already exists, assume it is what we want and reuse. */ + existing = milan_get_ifindex(out); + if (existing > 0) + return 0; + + parent_idx = milan_get_ifindex(parent_ifname); + if (parent_idx <= 0) + return parent_idx ? parent_idx : -ENODEV; + + nlfd = socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_ROUTE); + if (nlfd < 0) + return -errno; + if (bind(nlfd, (struct sockaddr *)&sa, sizeof(sa)) < 0) { + int e = -errno; close(nlfd); return e; + } + + /* RTM_NEWLINK: ifinfomsg + IFLA_IFNAME + IFLA_LINK + IFLA_LINKINFO{KIND=vlan, DATA{VLAN_ID}} */ + ifi.ifi_family = AF_UNSPEC; + ifi.ifi_change = 0xFFFFFFFFu; + memcpy(p, &ifi, sizeof(ifi)); p += sizeof(ifi); + p += milan_nl_attr(p, IFLA_IFNAME, out, (uint16_t)(strlen(out) + 1)); + pidx = (uint32_t)parent_idx; + p += milan_nl_attr(p, IFLA_LINK, &pidx, sizeof(pidx)); + + /* LINKINFO is nested: KIND=vlan, DATA={VLAN_ID=vid} */ + np += milan_nl_attr(np, IFLA_INFO_KIND, kind, sizeof(kind)); + dp += milan_nl_attr(dp, IFLA_VLAN_ID, &vidv, sizeof(vidv)); + np += milan_nl_attr(np, IFLA_INFO_DATA, data, (uint16_t)(dp - data)); + p += milan_nl_attr(p, IFLA_LINKINFO, nested, (uint16_t)(np - nested)); + + err = milan_nl_send(nlfd, RTM_NEWLINK, + NLM_F_REQUEST | NLM_F_CREATE | NLM_F_EXCL | NLM_F_ACK, + seq, payload, (size_t)(p - payload)); + if (err == 0) + err = milan_nl_recv_ack(nlfd, seq); + if (err != 0 && err != -EEXIST) { + close(nlfd); + return err; + } + + /* Bring it UP. */ + new_idx = milan_get_ifindex(out); + if (new_idx <= 0) { + close(nlfd); + return new_idx ? new_idx : -ENODEV; + } + up_ifi.ifi_family = AF_UNSPEC; + up_ifi.ifi_index = new_idx; + up_ifi.ifi_flags = IFF_UP; + up_ifi.ifi_change = IFF_UP; + err = milan_nl_send(nlfd, RTM_NEWLINK, NLM_F_REQUEST | NLM_F_ACK, + ++seq, &up_ifi, sizeof(up_ifi)); + if (err == 0) + err = milan_nl_recv_ack(nlfd, seq); + close(nlfd); + if (err != 0) + return err; + + return 0; +} + static int raw_stream_setup_socket(struct server *server, struct stream *stream) { int res; char buf[128]; struct ifreq req; + const char *bind_ifname = server->ifname; + char vlan_ifname[IFNAMSIZ]; + bool used_vlan_subiface = false; spa_autoclose int fd = socket(AF_PACKET, SOCK_RAW | SOCK_CLOEXEC | SOCK_NONBLOCK, htons(ETH_P_ALL)); if (fd < 0) { @@ -265,8 +444,30 @@ static int raw_stream_setup_socket(struct server *server, struct stream *stream) return -errno; } + /* For listener RX: route stream via a VLAN sub-iface so the NIC's + * hardware filter accepts VID-tagged AAF without promisc on parent. */ + /* Listener-only: route stream RX via a VLAN sub-iface so the NIC accepts + * VID-tagged AAF without promisc on parent. OUTPUT direction stays on + * the parent because setup_pdu_milan_v12() already inserts a manual + * 802.1Q tag in the PDU header — the kernel would add a second tag + * (QinQ) if we bound the talker socket to enp6s0.. */ + if (stream->direction == SPA_DIRECTION_INPUT && stream->vlan_id > 0) { + int e = milan_ensure_vlan_iface(server->ifname, + (uint16_t)stream->vlan_id, + vlan_ifname, sizeof(vlan_ifname)); + if (e == 0) { + bind_ifname = vlan_ifname; + used_vlan_subiface = true; + pw_log_info("milan-avb: listener RX via VLAN sub-iface %s (vid %d)", + vlan_ifname, stream->vlan_id); + } else { + pw_log_warn("milan-avb: VLAN sub-iface setup failed (%d), " + "falling back to PACKET_MR_PROMISC on parent", -e); + } + } + spa_zero(req); - snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname); + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", bind_ifname); res = ioctl(fd, SIOCGIFINDEX, &req); if (res < 0) { pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname); @@ -279,23 +480,15 @@ static int raw_stream_setup_socket(struct server *server, struct stream *stream) stream->sock_addr.sll_ifindex = req.ifr_ifindex; if (stream->direction == SPA_DIRECTION_OUTPUT) { - struct sock_txtime txtime_cfg; - + /* CBS/Qav-exclusive: set only the traffic-class priority so the egress + * CBS qdisc shapes this stream. SO_TXTIME (launch-time/ETF) is NOT set -- + * CBS and SO_TXTIME cannot coexist on the same queue. */ res = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &stream->prio, sizeof(stream->prio)); if (res < 0) { pw_log_error("setsockopt(SO_PRIORITY %d) failed: %m", stream->prio); return -errno; } - - txtime_cfg.clockid = CLOCK_TAI; - txtime_cfg.flags = 0; - res = setsockopt(fd, SOL_SOCKET, SO_TXTIME, &txtime_cfg, - sizeof(txtime_cfg)); - if (res < 0) { - pw_log_error("setsockopt(SO_TXTIME) failed: %m"); - return -errno; - } } else { struct packet_mreq mreq; @@ -319,6 +512,18 @@ static int raw_stream_setup_socket(struct server *server, struct stream *stream) pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m"); return -errno; } + + /* Fallback: lift promisc only when the VLAN sub-iface path didn't + * take. With the sub-iface, the NIC accepts VID 2 natively. */ + if (!used_vlan_subiface) { + spa_zero(mreq); + mreq.mr_ifindex = req.ifr_ifindex; + mreq.mr_type = PACKET_MR_PROMISC; + res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, + &mreq, sizeof(struct packet_mreq)); + if (res < 0) + pw_log_warn("setsockopt(PACKET_MR_PROMISC) fallback failed: %m"); + } } return spa_steal_fd(fd); } diff --git a/src/modules/module-avb/descriptors.c b/src/modules/module-avb/descriptors.c index 3320f4a4b..034ca743c 100644 --- a/src/modules/module-avb/descriptors.c +++ b/src/modules/module-avb/descriptors.c @@ -4,6 +4,10 @@ /* SPDX-FileCopyrightText: Copyright © 2025 Simon Gapp */ /* SPDX-License-Identifier: MIT */ +#include + +#include + #include "adp.h" #include "aecp-aem.h" #include "aecp-aem-types.h" @@ -300,23 +304,26 @@ static void init_descriptor_legacy_avb(struct server *server) static void init_descriptor_milan_v12(struct server *server) { + /* name the entity after the hostname so each box shows as pw0/pw1/pw2 */ + char hostname[64] = {0}; + if (gethostname(hostname, sizeof(hostname) - 1) != 0 || hostname[0] == '\0') + snprintf(hostname, sizeof(hostname), "%s", DSC_STRINGS_0_DEVICE_NAME); + // TODO PERSISTENCE: retrieve the saved buffers. /**************************************************************************************/ /* IEEE 1722.1-2021, Sec. 7.2.12 - STRINGS Descriptor * Up to 7 localized strings */ - es_builder_add_descriptor(server, AVB_AEM_DESC_STRINGS, 0, - sizeof(struct avb_aem_desc_strings), - &(struct avb_aem_desc_strings) - { - .string_0 = DSC_STRINGS_0_DEVICE_NAME, + struct avb_aem_desc_strings strings = { .string_1 = DSC_STRINGS_1_CONFIGURATION_NAME, .string_2 = DSC_STRINGS_2_MANUFACTURER_NAME, .string_3 = DSC_STRINGS_3_GROUP_NAME, .string_4 = DSC_STRINGS_4_MAINTAINER_0, .string_5 = DSC_STRINGS_4_MAINTAINER_1, - } - ); + }; + snprintf(strings.string_0, sizeof(strings.string_0), "%s", hostname); + es_builder_add_descriptor(server, AVB_AEM_DESC_STRINGS, 0, + sizeof(strings), &strings); /**************************************************************************************/ /* IEEE 1722.1-2021, Sec. 7.2.11 - LOCALE Descriptor */ @@ -334,8 +341,7 @@ static void init_descriptor_milan_v12(struct server *server) /* Milan v1.2, Sec. 5.3.3.1 */ struct avb_entity_config entity_conf = conf_load_entity(server->impl->props); - struct avb_aem_desc_entity entity_desc = - { + struct avb_aem_desc_entity entity = { .entity_id = htobe64(server->entity_id), .entity_model_id = htobe64(DSC_ENTITY_MODEL_ID), .entity_capabilities = htonl(entity_conf.entity_capabilities), @@ -351,21 +357,19 @@ static void init_descriptor_milan_v12(struct server *server) .available_index = htonl(DSC_ENTITY_MODEL_AVAILABLE_INDEX), .association_id = htobe64(DSC_ENTITY_MODEL_ASSOCIATION_ID), - .vendor_name_string = htons(entity_conf.vendor_name), - .model_name_string = htons(entity_conf.model_name), + .vendor_name_string = htons(DSC_ENTITY_MODEL_VENDOR_NAME_STRING), + .model_name_string = htons(DSC_ENTITY_MODEL_MODEL_NAME_STRING), + .group_name = DSC_ENTITY_MODEL_GROUP_NAME, + .serial_number = DSC_ENTITY_MODEL_SERIAL_NUMBER, .configurations_count = htons(DSC_ENTITY_MODEL_CONFIGURATIONS_COUNT), .current_configuration = htons(DSC_ENTITY_MODEL_CURRENT_CONFIGURATION) }; - memcpy(entity_desc.entity_name, entity_conf.entity_name, sizeof(entity_desc.entity_name)); - memcpy(entity_desc.firmware_version, entity_conf.firmware_version, sizeof(entity_desc.firmware_version)); - memcpy(entity_desc.group_name, entity_conf.group_name, sizeof(entity_desc.group_name)); - memcpy(entity_desc.serial_number, entity_conf.serial_number, sizeof(entity_desc.serial_number)); - + snprintf(entity.entity_name, sizeof(entity.entity_name), "%s", hostname); + /* firmware_version = the canonical PipeWire library version */ + snprintf(entity.firmware_version, sizeof(entity.firmware_version), "%s", pw_get_library_version()); es_builder_add_descriptor(server, AVB_AEM_DESC_ENTITY, 0, - sizeof(struct avb_aem_desc_entity), - &entity_desc); - + sizeof(entity), &entity); /**************************************************************************************/ /* IEEE 1722.1-2021, Sec. 7.2.2 - CONFIGURATION Descriptor*/ /* Milan v1.2, Sec. 5.3.3.2 */ diff --git a/src/modules/module-avb/entity-model-milan-v12.h b/src/modules/module-avb/entity-model-milan-v12.h index 1e603a82a..c071cf904 100644 --- a/src/modules/module-avb/entity-model-milan-v12.h +++ b/src/modules/module-avb/entity-model-milan-v12.h @@ -319,9 +319,11 @@ BUILD_SAMPLING_RATE(DSC_AUDIO_UNIT_SAMPLING_RATE_PULL, DSC_AUDIO_UNIT_SAMPLING_R #define DSC_STREAM_INPUT_LOCALIZED_DESCRIPTION AVB_AEM_DESC_INVALID #define DSC_STREAM_INPUT_CLOCK_DOMAIN_INDEX 0 #define DSC_STREAM_INPUT_STREAM_FLAGS (AVB_AEM_DESC_STREAM_FLAG_SYNC_SOURCE | AVB_AEM_DESC_STREAM_FLAG_CLASS_A) -// To match my talker +// Default current_format = AAF INT32/48k/8ch, to match the 8-channel PipeWire talker +// (pw1) without a per-bind set-format. The supported list below keeps the smaller +// channel counts (1/2/4/6) so 4ch talkers like the DS20 still bind via set-format. // TODO: Define based on AUDIO_UNIT etc. -#define DSC_STREAM_INPUT_CURRENT_FORMAT 0x0205022001006000ULL +#define DSC_STREAM_INPUT_CURRENT_FORMAT 0x0205022002006000ULL // TODO: Is 132 here, should be 138 according to spec #define DSC_STREAM_INPUT_FORMATS_OFFSET (4 + sizeof(struct avb_aem_desc_stream)) @@ -342,7 +344,7 @@ BUILD_SAMPLING_RATE(DSC_AUDIO_UNIT_SAMPLING_RATE_PULL, DSC_AUDIO_UNIT_SAMPLING_R #define DSC_STREAM_INPUT_AVB_INTERFACE_INDEX 0 #define DSC_STREAM_INPUT_BUFFER_LENGTH_IN_NS 2126000 -#define DSC_STREAM_INPUT_FORMATS_0 DSC_STREAM_INPUT_CURRENT_FORMAT +#define DSC_STREAM_INPUT_FORMATS_0 0x0205022001006000ULL /* 4ch (DS20) — kept supported */ #define DSC_STREAM_INPUT_FORMATS_1 0x0205022000406000ULL #define DSC_STREAM_INPUT_FORMATS_2 0x0205022000806000ULL #define DSC_STREAM_INPUT_FORMATS_3 0x0205022001806000ULL diff --git a/src/modules/module-avb/es-builder.c b/src/modules/module-avb/es-builder.c index 4eb7222dd..cb1b45be4 100644 --- a/src/modules/module-avb/es-builder.c +++ b/src/modules/module-avb/es-builder.c @@ -122,6 +122,15 @@ static struct descriptor *es_buidler_desc_avb_interface(struct server *server, avb_mrp_attribute_begin(if_ptr->domain_attr.mrp, 0); avb_mrp_attribute_join(if_ptr->domain_attr.mrp, 0, true); + /* milan-avb: declare VID membership (MVRP) once per interface, held for the + * life of the interface like the SR Domain above — NOT per stream, so that + * destroying one stream cannot withdraw the VLAN other streams still need. */ + avb_mvrp_attribute_new(server->mvrp, &if_ptr->vlan_attr, + AVB_MVRP_ATTRIBUTE_TYPE_VID); + if_ptr->vlan_attr.attr.vid.vlan = htons(AVB_DEFAULT_VLAN); + avb_mrp_attribute_begin(if_ptr->vlan_attr.mrp, 0); + avb_mrp_attribute_join(if_ptr->vlan_attr.mrp, 0, true); + return desc; } diff --git a/src/modules/module-avb/gptp-clock.h b/src/modules/module-avb/gptp-clock.h new file mode 100644 index 000000000..10cf64353 --- /dev/null +++ b/src/modules/module-avb/gptp-clock.h @@ -0,0 +1,132 @@ +/* AVB support */ +/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */ +/* SPDX-License-Identifier: MIT */ + +/* gPTP time read from the NIC PHC (dynamic POSIX clock) mapped onto CLOCK_MONOTONIC_RAW, + * decoupled from the system wall clock so it stays free for NTP. */ + +#ifndef AVB_GPTP_CLOCK_H +#define AVB_GPTP_CLOCK_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#define AVB_CLOCKFD 3 +#define AVB_FD_TO_CLOCKID(fd) ((~(clockid_t)(fd) << 3) | AVB_CLOCKFD) +#define AVB_GPTP_REFRESH_NS (10 * SPA_NSEC_PER_MSEC) /* re-anchor phase/freq ~100 Hz */ +#define AVB_GPTP_READ_BRACKET_NS (50 * SPA_NSEC_PER_USEC) /* reject a jittered PHC read */ + +struct avb_gptp_clock { + int phc_fd; + clockid_t phc_id; + bool ok; + uint64_t base_mono; /* CLOCK_MONOTONIC_RAW ns at last anchor */ + uint64_t base_gptp; /* PHC ns at last anchor */ + double ratio; /* d(phc)/d(mono) ~ 1.0 (the frequency offset) */ + uint64_t last_refresh_mono; +}; + +/* Resolve ifname -> PHC index via ETHTOOL_GET_TS_INFO, open /dev/ptpN. >=0 = phc_index. */ +static inline int avb_gptp_clock_open(struct avb_gptp_clock *c, const char *ifname) +{ + struct ethtool_ts_info tsi; + struct ifreq ifr; + char path[32]; + int sock; + + memset(c, 0, sizeof(*c)); + c->phc_fd = -1; + c->ratio = 1.0; + + sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + return -1; + } + memset(&tsi, 0, sizeof(tsi)); + tsi.cmd = ETHTOOL_GET_TS_INFO; + memset(&ifr, 0, sizeof(ifr)); + snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifname); + ifr.ifr_data = (void *)&tsi; + if (ioctl(sock, SIOCETHTOOL, &ifr) < 0) { + close(sock); + return -1; + } + close(sock); + if (tsi.phc_index < 0) { + return -1; + } + snprintf(path, sizeof(path), "/dev/ptp%d", tsi.phc_index); + c->phc_fd = open(path, O_RDONLY); + if (c->phc_fd < 0) { + return -1; + } + c->phc_id = AVB_FD_TO_CLOCKID(c->phc_fd); + c->ok = true; + return tsi.phc_index; +} + +/* Re-anchor (mono,gptp) and update the frequency ratio. Off the hot loop (~100 Hz). */ +static inline void avb_gptp_clock_refresh(struct avb_gptp_clock *c) +{ + struct timespec m1, p, m2; + uint64_t mono, gptp; + double r; + + if (!c->ok) { + return; + } + if (clock_gettime(CLOCK_MONOTONIC_RAW, &m1) < 0 || + clock_gettime(c->phc_id, &p) < 0 || + clock_gettime(CLOCK_MONOTONIC_RAW, &m2) < 0) { + return; + } + if (SPA_TIMESPEC_TO_NSEC(&m2) - SPA_TIMESPEC_TO_NSEC(&m1) > AVB_GPTP_READ_BRACKET_NS) { + return; + } + mono = (SPA_TIMESPEC_TO_NSEC(&m1) + SPA_TIMESPEC_TO_NSEC(&m2)) / 2; + gptp = SPA_TIMESPEC_TO_NSEC(&p); + if (c->base_mono != 0 && mono > c->base_mono) { + r = (double)(gptp - c->base_gptp) / (double)(mono - c->base_mono); + if (r > 0.999 && r < 1.001) { + c->ratio += 0.10 * (r - c->ratio); + } + } + c->base_mono = mono; + c->base_gptp = gptp; + c->last_refresh_mono = mono; +} + +/* gPTP time now, in ns; cheap monotonic read + phase/freq map. 0 if no PHC (caller falls back). */ +static inline uint64_t avb_gptp_now(struct avb_gptp_clock *c) +{ + struct timespec ts; + uint64_t mono; + + if (!c->ok) { + return 0; + } + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + mono = SPA_TIMESPEC_TO_NSEC(&ts); + if (c->base_mono == 0 || mono - c->last_refresh_mono > AVB_GPTP_REFRESH_NS) { + avb_gptp_clock_refresh(c); + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + mono = SPA_TIMESPEC_TO_NSEC(&ts); + } + if (c->base_mono == 0) { + return 0; + } + return c->base_gptp + (uint64_t)((double)(mono - c->base_mono) * c->ratio); +} + +#endif /* AVB_GPTP_CLOCK_H */ diff --git a/src/modules/module-avb/gptp.c b/src/modules/module-avb/gptp.c index 8f78d3cc1..1d7e0db51 100644 --- a/src/modules/module-avb/gptp.c +++ b/src/modules/module-avb/gptp.c @@ -230,14 +230,14 @@ static void gptp_invalidate_state(struct gptp *gptp) gptp->path_trace_valid; struct avb_aem_desc_avb_interface *iface; - gptp->data_valid = false; + /* keep last-known GM/data_valid across transient ptp4l query timeouts (see avb_gptp_get_grandmaster_id) */ gptp->data_valid_current = false; gptp->path_trace_valid = false; gptp->path_trace_count = 0; gptp->steps_removed = 0; gptp->offset_from_master_scaled_ns = 0; memset(gptp->clock_id, 0, sizeof(gptp->clock_id)); - memset(gptp->gm_id, 0, sizeof(gptp->gm_id)); + /* gm_id kept (last-known grandmaster) so the ADP does not flap to self on a missed query */ memset(gptp->path_trace, 0, sizeof(gptp->path_trace)); iface = get_avb_interface(gptp); diff --git a/src/modules/module-avb/internal.h b/src/modules/module-avb/internal.h index 4fe8e7e75..f41d50832 100644 --- a/src/modules/module-avb/internal.h +++ b/src/modules/module-avb/internal.h @@ -9,6 +9,8 @@ #include +#include "gptp-clock.h" + #ifdef __cplusplus extern "C" { #endif @@ -38,6 +40,7 @@ struct avb_transport_ops { struct impl { struct pw_loop *loop; + struct pw_loop *data_loop; /* RT (SCHED_FIFO) loop for talker egress pacing */ struct pw_timer_queue *timer_queue; struct pw_context *context; struct spa_hook context_listener; @@ -104,6 +107,11 @@ struct server { uint64_t entity_id; int ifindex; + /* milan-avb: gPTP time read from the NIC PHC (server->ifname), decoupled from the + * system clock. Lazily opened on first use; gclock_tried guards the one-shot open. */ + struct avb_gptp_clock gclock; + unsigned gclock_tried:1; + const struct avb_transport_ops *transport; void *transport_data; diff --git a/src/modules/module-avb/mc-recover.h b/src/modules/module-avb/mc-recover.h new file mode 100644 index 000000000..1d0c856bb --- /dev/null +++ b/src/modules/module-avb/mc-recover.h @@ -0,0 +1,113 @@ +/* AVB support */ +/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */ +/* SPDX-License-Identifier: MIT */ + +/* + * mc-recover.h — AAF media-clock recovery estimator (listener side). + * + * Self-contained and pure (no PipeWire/stream deps) so it can be unit-tested + * in isolation. A second-order DLL (spa_dll) recovers the talker media rate + * from the AAF avtp_timestamp progression: each PDU carries a presentation + * time in the talker's gPTP domain, advancing by frames_per_pdu samples. The + * model clock advances by the DLL-corrected period; the phase error against the + * received timestamp drives the DLL. Recovered rate = nominal / corr. + */ + +#ifndef AVB_MC_RECOVER_H +#define AVB_MC_RECOVER_H + +#include +#include + +#include + +struct mc_recover { + bool init; + struct spa_dll dll; + double corr; /* DLL output (period multiplier, ~1.0) */ + double rate; /* recovered media rate, Hz */ + int32_t last_err_ns; /* last phase error (model vs avtp_ts), ns */ + uint64_t model_ns; /* model presentation clock (DLL-tracked) */ + uint32_t last_avtp_ts; /* previous fed timestamp; model advances by actual PDU count */ + uint64_t pdus; /* PDUs since prime */ +}; + +static inline void mc_recover_reset(struct mc_recover *m, double nominal_rate) +{ + m->init = false; + m->corr = 1.0; + m->rate = nominal_rate; + m->last_err_ns = 0; + m->model_ns = 0; + m->last_avtp_ts = 0; + m->pdus = 0; +} + +/* Feed one PDU's presentation timestamp (low 32 bits of CLOCK_TAI ns). Returns + * the recovered media rate in Hz. nominal_rate/frames_per_pdu/pdu_period_ns + * describe the stream's nominal media clock. */ +static inline double mc_recover_update(struct mc_recover *m, uint32_t avtp_ts, + int frames_per_pdu, int nominal_rate, int64_t pdu_period_ns) +{ + int32_t err_ns; + double err_samples; + uint64_t step; + int32_t raw_delta; + int n_pdus; + + if (!m->init) { + spa_dll_init(&m->dll); + spa_dll_set_bw(&m->dll, SPA_DLL_BW_MIN, frames_per_pdu, nominal_rate); + m->corr = 1.0; + m->rate = nominal_rate; + m->last_err_ns = 0; + m->model_ns = avtp_ts; + m->last_avtp_ts = avtp_ts; + m->pdus = 0; + m->init = true; + return m->rate; + } + + /* Advance the model by the ACTUAL number of nominal PDUs elapsed since the + * last fed timestamp (avtp_ts delta rounded to pdu_period), then measure the + * phase error. Using the real PDU count (not a fixed one-per-call) keeps a + * non-1:1 feed — dropped or coalesced PDUs — from accumulating phase error + * and saturating the loop into a ±ppm hunt. err>0 = talker ahead of model; + * spa_dll returns corr<1, step grows, model catches up (negative feedback); + * recovered rate = nominal*corr. A large jump (>8 PDUs: stream gap, reorder, + * or the bind-transient seed) re-seeds the phase rather than slewing the + * deliberately-slow loop, which otherwise wedges it. */ + raw_delta = (int32_t)(avtp_ts - m->last_avtp_ts); + m->last_avtp_ts = avtp_ts; + n_pdus = (int)((double)raw_delta / (double)pdu_period_ns + 0.5); + if (n_pdus < 1) + n_pdus = 1; + if (n_pdus > 8) { + m->model_ns = avtp_ts; + m->last_err_ns = 0; + m->pdus++; + return m->rate; + } + step = (uint64_t)((double)n_pdus * (double)pdu_period_ns / m->corr + 0.5); + m->model_ns += step; + err_ns = (int32_t)(avtp_ts - (uint32_t)m->model_ns); + m->last_err_ns = err_ns; + err_samples = (double)err_ns * (double)nominal_rate / 1e9; + /* bound the response to a single corrupt/late timestamp */ + if (err_samples > 128.0) + err_samples = 128.0; + else if (err_samples < -128.0) + err_samples = -128.0; + + m->corr = spa_dll_update(&m->dll, err_samples); + /* clamp to ±10 % — far beyond any real media clock; guards 1/corr */ + if (m->corr < 0.9) + m->corr = 0.9; + else if (m->corr > 1.1) + m->corr = 1.1; + m->rate = (double)nominal_rate * m->corr; + m->pdus++; + return m->rate; +} + +#endif /* AVB_MC_RECOVER_H */ diff --git a/src/modules/module-avb/mrp.c b/src/modules/module-avb/mrp.c index 8580bed13..3cd6d8d3d 100644 --- a/src/modules/module-avb/mrp.c +++ b/src/modules/module-avb/mrp.c @@ -108,11 +108,10 @@ static void mrp_periodic(void *data, uint64_t now) if (now > mrp->lva_timer.leave_all_timeout) { - /* 802.1Q-2014 Table 10-5 */ + /* IEEE 802.1Q-2018 Section 10.7.5.20: own LVA timer => TX path only, no RX_LVA */ mrp->lva_timer.state = FSM_LVA_ACTIVE; if (mrp->lva_timer.leave_all_timeout > 0) { mrp->lva_tx_pending = true; - global_event(mrp, now, AVB_MRP_EVENT_RX_LVA); leave_all = true; } } @@ -435,7 +434,6 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now break; case AVB_MRP_EVENT_RX_LV: case AVB_MRP_EVENT_RX_LVA: - case AVB_MRP_EVENT_TX_LVA: case AVB_MRP_EVENT_REDECLARE: switch (state) { case AVB_MRP_IN: @@ -444,6 +442,9 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now break; } break; + case AVB_MRP_EVENT_TX_LVA: + /* IEEE 802.1Q-2018 Table 10-4: TX events do not transition the registrar */ + break; case AVB_MRP_EVENT_FLUSH: switch (state) { case AVB_MRP_LV: @@ -463,11 +464,10 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now default: break; } - if (notify) { - mrp_attribute_emit_notify(a, now, notify); - mrp_emit_notify(mrp, now, &a->attr, notify); - } - + /* commit registrar_state BEFORE notify: callbacks (e.g. notify_talker -> + * refresh_listener_param) read the registrar state, so they must see the new + * one. Emitting notify first made the Listener latch AskingFailed off a stale + * MT state and never recompute -> SRP bridge refused to forward the stream. */ if (a->registrar_state != state || notify) { pw_log_debug("REG: attr %p: %s %s %s -> %s notify=%s", a, a->attr.name, avb_mrp_event_name(event), avb_registrar_state_name(a->registrar_state), @@ -476,6 +476,11 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now a->registrar_state = state; } + if (notify) { + mrp_attribute_emit_notify(a, now, notify); + mrp_emit_notify(mrp, now, &a->attr, notify); + } + state = a->applicant_state; switch (event) { diff --git a/src/modules/module-avb/msrp.c b/src/modules/module-avb/msrp.c index 19c21ecb6..b2458b271 100644 --- a/src/modules/module-avb/msrp.c +++ b/src/modules/module-avb/msrp.c @@ -58,6 +58,16 @@ static void debug_msrp_talker(const struct avb_packet_msrp_talker *t) /* IEEE 802.1Q Section 35.2.2.4.4: Listener may declare Ready only once the matching * Talker Advertise is registered; otherwise it stays in AskingFailed. */ +/* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN, else AskingFailed */ +static void refresh_listener_param(struct stream_common *sc) +{ + bool ta_in = sc->tastream_attr.mrp != NULL && + avb_mrp_attribute_get_registrar_state(sc->tastream_attr.mrp) == AVB_MRP_IN; + sc->lstream_attr.param = ta_in + ? AVB_MSRP_LISTENER_PARAM_READY + : AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; +} + static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify) { struct stream_common *sc; @@ -76,12 +86,8 @@ static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, ui sc = SPA_CONTAINER_OF(attr->attr, struct stream_common, tastream_attr); if (sc->stream.direction == SPA_DIRECTION_INPUT) { - if (notify == AVB_MRP_NOTIFY_NEW || notify == AVB_MRP_NOTIFY_JOIN) - sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_READY; - else if (notify == AVB_MRP_NOTIFY_LEAVE) - sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; - /* Milan Table 5.10: TA registrar state flips flags_ex.REGISTERING - * in the listener-side GET_STREAM_INFO answer — emit an unsol. */ + refresh_listener_param(sc); + /* Milan Table 5.10: TA registrar state flips flags_ex.REGISTERING */ avb_aecp_aem_mark_stream_info_dirty(msrp->server, AVB_AEM_DESC_STREAM_INPUT, sc->stream.index); } @@ -101,12 +107,12 @@ static void notify_talker_failed(struct msrp *msrp, uint64_t now, struct attr *a handle_evt_tk_registration_failed(msrp->server->acmp, attr->attr, now); } - /* Milan Table 5.10: TF registrar state also flips flags_ex.REGISTERING - * on the listener side; emit an unsol when it changes. */ sc = SPA_CONTAINER_OF(attr->attr, struct stream_common, tfstream_attr); - if (sc->stream.direction == SPA_DIRECTION_INPUT) + if (sc->stream.direction == SPA_DIRECTION_INPUT) { + refresh_listener_param(sc); avb_aecp_aem_mark_stream_info_dirty(msrp->server, AVB_AEM_DESC_STREAM_INPUT, sc->stream.index); + } } static int process_talker(struct msrp *msrp, uint64_t now, uint8_t attr_type, @@ -325,18 +331,22 @@ static int process_domain(struct msrp *msrp, uint64_t now, uint8_t attr_type, continue; } - if (msrp->server->avb_mode == AVB_MODE_MILAN_V12) - { - /** Milan V1.2 Section 4.2.7.2.1: - The endstation shall re-adjust the domain according - to the from the MSRPDU received on its interface */ - bool mismatch = (a->attr->attr.domain.sr_class_id != d->sr_class_id - || a->attr->attr.domain.sr_class_priority != d->sr_class_priority + if (msrp->server->avb_mode == AVB_MODE_MILAN_V12) { + /* Milan v1.2 Section 4.2.7.2.1: re-adjust scoped to matching sr_class_id only */ + bool mismatch; + if (a->attr->attr.domain.sr_class_id != d->sr_class_id) + continue; + + mismatch = (a->attr->attr.domain.sr_class_priority != d->sr_class_priority || a->attr->attr.domain.sr_class_vid != d->sr_class_vid); if (mismatch) { - pw_log_info("Domain mismatch re-adjusting"); - a->attr->attr.domain = *d; + pw_log_info("Domain re-adjust sr_class_id=%u prio %u->%u vid %u->%u", + d->sr_class_id, a->attr->attr.domain.sr_class_priority, + d->sr_class_priority, ntohs(a->attr->attr.domain.sr_class_vid), + ntohs(d->sr_class_vid)); + a->attr->attr.domain.sr_class_priority = d->sr_class_priority; + a->attr->attr.domain.sr_class_vid = d->sr_class_vid; avb_mrp_attribute_leave(a->attr->mrp, now); avb_mrp_attribute_begin(a->attr->mrp, now); avb_mrp_attribute_join(a->attr->mrp, now, true); diff --git a/src/modules/module-avb/play-loop.h b/src/modules/module-avb/play-loop.h new file mode 100644 index 000000000..8d9970ad6 --- /dev/null +++ b/src/modules/module-avb/play-loop.h @@ -0,0 +1,63 @@ +/* AVB support */ +/* SPDX-FileCopyrightText: Copyright © 2025 Kebag-Logic */ +/* SPDX-License-Identifier: MIT */ + +/* + * play-loop.h — consume-side actuator for the listener. + * + * Pure (no PipeWire deps) so it can be unit-tested like mc-recover.h. Keeps the + * listener ring at a target fill by trimming the output resampler ratio + * (SPA_IO_RateMatch). Same loop as module-rtp's receiver: + * error = target - avail; corr = spa_dll_update(dll, error); rate = ff / corr + * ff = nominal/recovered rate feeds the recovered clock forward; the DLL trims + * the rest. The sign is the trap (same as the old mc_recover bug); test_play_loop + * locks it: converges on the right sign, diverges on the wrong one. + */ + +#ifndef AVB_PLAY_LOOP_H +#define AVB_PLAY_LOOP_H + +#include + +#include +#include + +struct play_loop { + bool init; + struct spa_dll dll; + double corr; /* DLL output, ~1.0 */ + double rate; /* last applied resampler ratio (ff / corr) */ +}; + +static inline void play_loop_reset(struct play_loop *p) +{ + p->init = false; + p->corr = 1.0; + p->rate = 1.0; +} + +/* One step. error_samples = target - avail; max_error clamps a transient; + * ff_ratio = nominal/recovered rate (1.0 = no feedforward). Returns the ratio + * to apply via pw_stream_set_rate(). */ +static inline double play_loop_update(struct play_loop *p, double error_samples, + double max_error, double ff_ratio, unsigned period, unsigned rate) +{ + if (!p->init) { + spa_dll_init(&p->dll); + spa_dll_set_bw(&p->dll, SPA_DLL_BW_MIN, period, rate); + p->corr = 1.0; + p->rate = ff_ratio; + p->init = true; + } + error_samples = SPA_CLAMPD(error_samples, -max_error, max_error); + p->corr = spa_dll_update(&p->dll, error_samples); + /* clamp ±10 %, guards 1/corr */ + if (p->corr < 0.9) + p->corr = 0.9; + else if (p->corr > 1.1) + p->corr = 1.1; + p->rate = ff_ratio / p->corr; + return p->rate; +} + +#endif /* AVB_PLAY_LOOP_H */ diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index 4b2cdd56a..2687ef1b6 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -39,7 +39,7 @@ * absent. * * So an output stream owns its own periodic timer (`flush_timer`, - * AVB_FLUSH_TICK_NS = 1 ms = 8 PDUs). Each tick: + * AVB_FLUSH_TICK_NS = 125 us = one PDU at 48 kHz/6-frame). Each tick: * * 1. computes how many PDUs are owed since the last drain * (`(now - flush_last_ns) / pdu_period`), @@ -82,20 +82,22 @@ * * Per-counter wiring status (Milan Section 5.4.5.3, Table 5.16 Stream Input): * FRAMES_RX live: handle_aaf_packet / handle_iec61883_packet - * STREAM_INTERRUPTED live: ringbuffer overrun in the same handlers + * STREAM_INTERRUPTED live: handle_aaf_packet, on the loss of several + * AVTPDUs (seq gap >= AVB_STREAM_INTERRUPT_MIN_LOST) * MEDIA_LOCKED live: first-frame edge in handle_*_packet * MEDIA_UNLOCKED live: cmd-get-counters periodic when last_frame_rx_ns * ages past MEDIA_UNLOCK_TIMEOUT_NS - * SEQ_NUM_MISMATCH TODO: compare p->seq_num against expected (last + 1 - * modulo 256), tick on mismatch and resync expected + * SEQ_NUM_MISMATCH live: handle_aaf_packet, p->seq_num != expected + * (last + 1 mod 256); resyncs expected each frame * MEDIA_RESET_IN TODO: tick when AVTPDU header sets the mr bit * (header reset notification) * TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header - * UNSUPPORTED_FORMAT TODO: tick when subtype/format mismatch the bound - * descriptor's current_format - * LATE_TIMESTAMP TODO: tick when p->timestamp < CLOCK_TAI now + * UNSUPPORTED_FORMAT live: handle_aaf_packet drops + ticks per PDU any AAF PDU + * whose format != the Stream Input current format + * LATE_TIMESTAMP TODO: tick when p->timestamp < stream_gptp_now() (the PHC, + * NOT CLOCK_TAI — no phc2sys, so the system clock is not gPTP) * (frame missed its presentation deadline) - * EARLY_TIMESTAMP TODO: tick when p->timestamp > now + max_transit_time + * EARLY_TIMESTAMP TODO: tick when p->timestamp > stream_gptp_now() (PHC) + max_transit_time * (frame arrived too far ahead of its deadline) * Table 5.17 Stream Output: * FRAMES_TX live: per send in flush_write_milan_v12 / _legacy @@ -108,6 +110,7 @@ * -------------------------------------------------------------------------- */ +#include #include #include #include @@ -118,6 +121,7 @@ #include #include #include +#include #include "aaf.h" #include "iec61883.h" @@ -159,9 +163,9 @@ static inline void stream_out_mark_counters_dirty(struct stream *s) so->counters_dirty = true; } -#define AVB_FLUSH_TICK_NS ((uint64_t)1000000) +#define AVB_FLUSH_TICK_NS ((uint64_t)(125 * SPA_NSEC_PER_USEC)) -static int flush_write_milan_v12(struct stream *stream, uint64_t current_time); +static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, int max_pdus); static int flush_write_legacy(struct stream *stream, uint64_t current_time); static void on_stream_destroy(void *d) @@ -208,38 +212,92 @@ static void pad_ringbuffer_with_silence(struct stream *stream, int owed) spa_ringbuffer_write_update(&stream->ring, index + (uint32_t)deficit); } +/* milan-avb: gPTP time (ns) from the NIC PHC of server->ifname; 0 if no PHC. See gptp-clock.h. */ +static uint64_t stream_gptp_now(struct server *server) +{ + if (!server->gclock.ok && !server->gclock_tried) { + server->gclock_tried = 1; + if (avb_gptp_clock_open(&server->gclock, server->ifname) >= 0) { + pw_log_info("milan-avb: gptp clock = PHC of %s", server->ifname); + } else { + pw_log_warn("milan-avb: no PHC for %s", server->ifname); + } + } + return avb_gptp_now(&server->gclock); +} + static void on_flush_tick(void *data, uint64_t expirations) { struct stream *stream = data; struct server *server = stream->server; - struct timespec now_ts; - uint64_t now_ns; + struct timespec ts; + uint64_t now_mono, now_gptp, stamp; int owed; (void)expirations; - if (clock_gettime(CLOCK_TAI, &now_ts) < 0) - return; - now_ns = SPA_TIMESPEC_TO_NSEC(&now_ts); + /* Pace the flush drain off CLOCK_MONOTONIC, the SAME clock as the graph-fill driver (the drive_timer runs on a CLOCK_MONOTONIC timerfd, which cannot be _RAW) so producer and consumer of the ring stay rate-matched; gPTP is used only for the AVTP timestamp. Pacing on _RAW here decouples drain from fill -> ring drift -> glitch noise (measured -55dB vs -99dB THD+N). The independent gPTP/PHC reference still uses CLOCK_MONOTONIC_RAW (gptp-clock.h). */ + clock_gettime(CLOCK_MONOTONIC, &ts); + now_mono = SPA_TIMESPEC_TO_NSEC(&ts); + now_gptp = stream_gptp_now(server); + stamp = now_gptp != 0 ? now_gptp : now_mono; - if (stream->flush_last_ns == 0) { - stream->flush_last_ns = now_ns; + if (stream->pdu_period == 0) { return; } - if (stream->pdu_period == 0) + if (stream->flush_last_ns == 0) { + stream->flush_last_ns = now_mono; return; + } - owed = (int)((now_ns - stream->flush_last_ns) / (uint64_t)stream->pdu_period); - if (owed <= 0) + owed = (int)((now_mono - stream->flush_last_ns) / (uint64_t)stream->pdu_period); + if (owed <= 0) { return; + } stream->flush_last_ns += (uint64_t)owed * (uint64_t)stream->pdu_period; pad_ringbuffer_with_silence(stream, owed); - if (server->avb_mode == AVB_MODE_MILAN_V12) - flush_write_milan_v12(stream, now_ns); - else - flush_write_legacy(stream, now_ns); + if (server->avb_mode == AVB_MODE_MILAN_V12) { + flush_write_milan_v12(stream, stamp, owed); + } else { + flush_write_legacy(stream, stamp); + } +} + +/* Talker egress pacing runs on the RT data loop (impl->data_loop); a source cannot be added/removed off-thread, so the flush timer is created and destroyed ON the RT thread via pw_loop_invoke. */ +static int do_add_flush_timer(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct stream *stream = user_data; + struct pw_loop *dl = stream->server->impl->data_loop; + struct timespec value = { + .tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC), + .tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC), + }; + struct timespec interval = value; + + stream->flush_last_ns = 0; + stream->flush_timer = pw_loop_add_timer(dl, on_flush_tick, stream); + if (stream->flush_timer != NULL) { + pw_loop_update_timer(dl, stream->flush_timer, &value, &interval, false); + } else { + pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)", stream); + } + return 0; +} + +static int do_remove_flush_timer(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct stream *stream = user_data; + + if (stream->flush_timer != NULL) { + pw_loop_destroy_source(stream->server->impl->data_loop, stream->flush_timer); + stream->flush_timer = NULL; + stream->flush_last_ns = 0; + } + return 0; } static void on_source_stream_process(void *data) @@ -263,17 +321,115 @@ static void on_source_stream_process(void *data) avail = spa_ringbuffer_get_read_index(&stream->ring, &index); - if (avail < wanted) { - pw_log_debug("capture underrun %d < %d", avail, wanted); + /* milan-avb: latency observability (throttled, env-gated). */ + if (getenv("MILAN_AVB_LATENCY_LOG")) { + static uint64_t last_log_ns = 0; + struct timespec ts_mono; + uint64_t now_mono_ns; + clock_gettime(CLOCK_MONOTONIC, &ts_mono); + now_mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono); + if (now_mono_ns - last_log_ns >= SPA_NSEC_PER_SEC) { + uint64_t residency_ns = stream->stride > 0 + ? (uint64_t)avail * SPA_NSEC_PER_SEC + / ((uint64_t)stream->stride * (uint64_t)stream->info.info.raw.rate) + : 0; + pw_log_info("milan-avb: lat C residency_bytes=%d residency_ns=%llu wanted=%u", + avail, (unsigned long long)residency_ns, (unsigned)n_bytes); + last_log_ns = now_mono_ns; + } + } + + /* milan-avb: consume-side actuator, FOLLOWER path only; when avb.source DRIVES the graph at the recovered mc.rate there is no resampler on its output, so we deliver the ring samples 1:1 (bit-perfect) and must NOT trim a ratio. */ + if (!stream->driving && stream->mc_aaf_active && stream->io_rate_match != NULL) { + uint32_t rate = stream->info.info.raw.rate; + int32_t avail_samples = avail / (int32_t)stream->stride; + uint32_t quantum = buf->requested ? (uint32_t)buf->requested : + (stream->io_position ? stream->io_position->clock.duration : 1024); + int32_t ring_samples = (int32_t)(stream->buffer_size / stream->stride); + /* Target ~½ quantum: where the ring sits on average so it is reachable; a full quantum never is, so the error saturates and the DLL winds up. */ + int32_t target = (int32_t)(quantum / 2); + double max_error = 2.0 * rate / 1000.0; /* 2 ms, == module-rtp ERROR_MSEC */ + double ff, error, r; + const char *env_target = getenv("MILAN_AVB_PLAY_TARGET"); + + if (env_target) { + target = atoi(env_target); + } + if (target < (int32_t)(rate / 1000)) { /* >= ~1 ms underrun margin */ + target = (int32_t)(rate / 1000); + } + if (target > ring_samples / 2) { /* keep well inside the ring */ + target = ring_samples / 2; + } + stream->play_target = target; + + ff = stream->mc.rate > 1.0 ? (double)rate / stream->mc.rate : 1.0; + error = (double)target - (double)avail_samples; + r = play_loop_update(&stream->play, error, max_error, ff, quantum, rate); + pw_stream_set_rate(stream->stream, r); + } else if (stream->play.init) { + /* clock source switched away from AAF: release the resampler so the graph free-runs at nominal again, and re-prime for next engage. */ + pw_stream_set_rate(stream->stream, 1.0); + play_loop_reset(&stream->play); + } + + /* milan-avb: ~1 Hz log of the local consume rate (Δticks/Δtai, mapped to TAI) next to mc.rate and the actuator state. */ + if (stream->mc_aaf_active || getenv("MILAN_AVB_PLAY_LOG")) { + struct timespec ts_mono; + uint64_t mono_ns; + /* CLOCK_MONOTONIC (NOT _RAW): mono_ns is offset against pwt.now below, which PipeWire reports in the CLOCK_MONOTONIC domain — they must match. */ + clock_gettime(CLOCK_MONOTONIC, &ts_mono); + mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono); + if (!stream->play_primed || + mono_ns - stream->play_log_last_ns >= SPA_NSEC_PER_SEC) { + struct pw_time pwt; + if (pw_stream_get_time_n(stream->stream, &pwt, sizeof(pwt)) == 0) { + uint64_t tai_ns, consume_tai; + /* milan-avb: gPTP time from the PHC so the consume clock stays in the gPTP domain even with NTP on the system clock. */ + tai_ns = stream_gptp_now(stream->server); + consume_tai = (uint64_t)pwt.now + (tai_ns - mono_ns); + if (stream->play_primed) { + int64_t dticks = (int64_t)(pwt.ticks - stream->play_last_ticks); + int64_t dtai = (int64_t)(consume_tai - stream->play_last_consume_tai); + double local_rate = dtai > 0 + ? (double)dticks * 1e9 / (double)dtai : 0.0; + pw_log_info("milan-avb: play measure local_rate=%.4f Hz " + "mc.rate=%.4f corr=%.6f err_ns=%d ticks=%llu | " + "actuator rate=%.6f play_corr=%.6f target=%d avail=%d", + local_rate, stream->mc.rate, stream->mc.corr, + stream->mc.last_err_ns, + (unsigned long long)pwt.ticks, + stream->play.rate, stream->play.corr, + stream->play_target, avail / (int32_t)stream->stride); + } + stream->play_last_ticks = pwt.ticks; + stream->play_last_consume_tai = consume_tai; + stream->play_log_last_ns = mono_ns; + stream->play_primed = true; + } + } + } + + /* Milan v1.2 Section 5.4.5.3: partial-read on underrun, zero-pad tail. */ + if (avail <= 0) { memset(d[0].data, 0, n_bytes); - } else { + } else if ((uint32_t)avail >= n_bytes) { spa_ringbuffer_read_data(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, d[0].data, n_bytes); - index += n_bytes; - spa_ringbuffer_read_update(&stream->ring, index); + spa_ringbuffer_read_update(&stream->ring, index + n_bytes); + } else { + uint32_t use = (uint32_t)avail; + spa_ringbuffer_read_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + d[0].data, use); + memset(SPA_PTROFF(d[0].data, use, void), 0, n_bytes - use); + spa_ringbuffer_read_update(&stream->ring, index + use); + pw_log_debug("capture partial-underrun %u/%u", use, n_bytes); } d[0].chunk->size = n_bytes; @@ -284,9 +440,39 @@ static void on_source_stream_process(void *data) pw_stream_queue_buffer(stream->stream, buf); } +static void on_source_stream_io_changed(void *data, uint32_t id, + void *area, uint32_t size) +{ + struct stream *stream = data; + const char *name; + + switch (id) { + case SPA_IO_RateMatch: + stream->io_rate_match = area; + name = "RateMatch"; + break; + case SPA_IO_Position: + stream->io_position = area; + name = "Position"; + break; + case SPA_IO_Clock: name = "Clock"; break; + case SPA_IO_Buffers: name = "Buffers"; break; + default: name = "?"; break; + } + /* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator knob) on this source. */ + pw_log_info("milan-avb: io_changed id=%u (%s) area=%p size=%u", + id, name, area, (unsigned)size); +} + +/* generic: arms the self-driving timer on STREAMING (defined below, used by both source and sink stream-event tables). */ +static void on_sink_stream_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error); + static const struct pw_stream_events source_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = on_stream_destroy, + .state_changed = on_sink_stream_state_changed, + .io_changed = on_source_stream_io_changed, .process = on_source_stream_process }; @@ -300,25 +486,42 @@ set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, iov[1].iov_base = buffer; } -static int flush_write_milan_v12(struct stream *stream, uint64_t current_time) +static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, int max_pdus) { int32_t avail; uint32_t index; - uint64_t ptime, txtime; + uint64_t ptime; int pdu_count; ssize_t n; struct avb_frame_header *h = (void*)stream->pdu; struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void); + uint64_t base; + int64_t err; avail = spa_ringbuffer_get_read_index(&stream->ring, &index); pdu_count = (avail / stream->stride) / stream->frames_per_pdu; + /* Pace to real time: drain only what is due this tick so the ETF launch schedule cannot run ahead and overflow the qdisc backlog. */ + if (pdu_count > max_pdus) { + pdu_count = max_pdus; + } - txtime = current_time + stream->t_uncertainty; - ptime = txtime + stream->mtt; + /* M2: monotonic AVTP timestamps anchored to the PHC; advance by pdu_period per PDU and slow-leak (err/1024) toward the live PHC so the rate reflects the real gPTP media clock without per-tick interpolation jitter (audible FM at the listener); re-anchor hard on a >1s gap (gPTP re-converge). */ + base = current_time + stream->t_uncertainty + stream->mtt; + if (stream->tx_pts == 0) { + stream->tx_pts = base; + } else { + err = (int64_t)(base - stream->tx_pts); + if (err > (int64_t)SPA_NSEC_PER_SEC || err < -(int64_t)SPA_NSEC_PER_SEC) { + stream->tx_pts = base; + } else { + stream->tx_pts += err / 1024; + } + } + ptime = stream->tx_pts; while (pdu_count--) { - *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; + /* CBS-exclusive: no SCM_TXTIME; txtime feeds ptime only */ set_iovec(&stream->ring, stream->buffer_data, @@ -332,15 +535,17 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time) n = avb_server_stream_send(stream->server, stream, &stream->msg, MSG_NOSIGNAL); - if (n < 0 || n != (ssize_t)stream->pdu_size) + if (n < 0 || n != (ssize_t)stream->pdu_size) { pw_log_error("stream send failed %zd != %zd: %m", n, stream->pdu_size); - else + } else { stream_out_counters(stream)->frame_tx++; - txtime += stream->pdu_period; + } ptime += stream->pdu_period; index += stream->payload_size; } + /* M2: keep the accumulator monotonic across ticks (advance by emitted PDUs). */ + stream->tx_pts = ptime; stream_out_mark_counters_dirty(stream); spa_ringbuffer_read_update(&stream->ring, index); @@ -366,7 +571,7 @@ static int flush_write_legacy(struct stream *stream, uint64_t current_time) ptime = txtime + stream->mtt; while (pdu_count--) { - *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; + /* CBS-exclusive: no SCM_TXTIME; txtime feeds ptime only */ set_iovec(&stream->ring, stream->buffer_data, @@ -522,18 +727,144 @@ static int setup_msg(struct stream *stream) stream->msg.msg_namelen = sizeof(stream->sock_addr); stream->msg.msg_iov = stream->iov; stream->msg.msg_iovlen = 3; - stream->msg.msg_control = stream->control; - stream->msg.msg_controllen = sizeof(stream->control); - stream->cmsg = CMSG_FIRSTHDR(&stream->msg); - stream->cmsg->cmsg_level = SOL_SOCKET; - stream->cmsg->cmsg_type = SCM_TXTIME; - stream->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64)); + /* CBS/Qav-exclusive: no SCM_TXTIME control message -- CBS and SO_TXTIME cannot coexist; the egress CBS qdisc paces the stream. */ + stream->msg.msg_control = NULL; + stream->msg.msg_controllen = 0; + stream->cmsg = NULL; return 0; } +/* milan-avb: arm the self-driving one-shot timer at absolute time `when` (ns on CLOCK_MONOTONIC); when==0 disarms; runs on the RT data loop. */ +static void set_drive_timeout(struct stream *stream, uint64_t when) +{ + struct timespec ts; + struct timespec interval = { 0, 0 }; + + if (stream->drive_timer == NULL) { + return; + } + ts.tv_sec = (time_t)(when / SPA_NSEC_PER_SEC); + ts.tv_nsec = (long)(when % SPA_NSEC_PER_SEC); + pw_loop_update_timer(stream->server->impl->data_loop, + stream->drive_timer, &ts, &interval, true); +} + +/* milan-avb: graph driver tick (pipe-tunnel pattern); fires once per quantum, fills io_position->clock so the core schedules followers against our clock, re-arms the next tick, then triggers the cycle exactly once from the data loop (never re-entrantly from process()). */ +static void on_drive_timeout(void *data, uint64_t expirations) +{ + struct stream *stream = data; + struct spa_io_position *pos = stream->io_position; + struct timespec ts; + uint64_t duration = 1024, mono_now, nominal_ns; + uint32_t rate = 48000; + uint64_t phc_now; + uint64_t this_time; + double nom; + + (void)expirations; + if (!stream->driving) { + return; + } + + if (pos != NULL) { + if (pos->clock.target_duration != 0) { + duration = pos->clock.target_duration; + } + if (pos->clock.target_rate.denom != 0) { + rate = pos->clock.target_rate.denom; + } + } + + clock_gettime(CLOCK_MONOTONIC, &ts); + mono_now = SPA_TIMESPEC_TO_NSEC(&ts); + nominal_ns = duration * SPA_NSEC_PER_SEC / rate; + + /* LISTENER (avb.source): pace at the RECOVERED talker rate (mc.rate from mc_recover) so the ring drain rate == the AAF arrival rate and process() delivers samples 1:1 with no resampling (bit-perfect, sample-locked). */ + if (stream->direction == SPA_DIRECTION_INPUT && + stream->mc_aaf_active && stream->mc.rate > 1.0) { + nominal_ns = (uint64_t)((double)duration * (double)SPA_NSEC_PER_SEC + / stream->mc.rate); + } + + /* TALKER (sink): pace at the EXACT nominal rate so the exported clock has rate_diff==1.0 CONSTANT; a varying rate_diff makes pw-cat's adapter resample (FM baked into the wire), 1.0 gives adapter passthrough = bit-perfect, and the listener recovers the rate from timestamp arrival. */ + phc_now = stream_gptp_now(stream->server); + (void)phc_now; + stream->drive_phc_last = phc_now; + stream->drive_mono_last = mono_now; + + /* Export the SMOOTH scheduled time (not the jittery wake-up mono_now) so the follower resampler sees an evenly-paced clock; rate_diff=nom/nominal keeps nsec/next_nsec/duration/rate_diff self-consistent (pipe-tunnel sets corr, not 1.0). */ + this_time = stream->drive_next_time; + nom = (double)duration * (double)SPA_NSEC_PER_SEC / (double)rate; + stream->drive_next_time += nominal_ns; + if (pos != NULL) { + pos->clock.nsec = this_time; + pos->clock.rate = pos->clock.target_rate; + pos->clock.position += pos->clock.duration; + pos->clock.duration = pos->clock.target_duration; + pos->clock.delay = 0; + pos->clock.rate_diff = nominal_ns > 0 ? nom / (double)nominal_ns : 1.0; + pos->clock.next_nsec = stream->drive_next_time; + } + + set_drive_timeout(stream, stream->drive_next_time); + pw_stream_trigger_process(stream->stream); +} + +/* milan-avb: avb.sink/avb.source is created as a DRIVER; when it reaches STREAMING and the core elected it (pw_stream_is_driving), start the self-driving timer. */ +static void on_sink_stream_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct stream *stream = data; + struct timespec ts; + + (void)old; (void)error; + switch (state) { + case PW_STREAM_STATE_STREAMING: + stream->driving = pw_stream_is_driving(stream->stream); + pw_log_info("milan-avb: avb.sink STREAMING driving=%d", stream->driving); + if (stream->driving) { + clock_gettime(CLOCK_MONOTONIC, &ts); + stream->drive_next_time = SPA_TIMESPEC_TO_NSEC(&ts); + stream->drive_phc_last = 0; + stream->drive_mono_last = 0; + stream->drive_ratio_ema = 0.0; + set_drive_timeout(stream, stream->drive_next_time); + } + break; + case PW_STREAM_STATE_PAUSED: + case PW_STREAM_STATE_ERROR: + case PW_STREAM_STATE_UNCONNECTED: + stream->driving = false; + set_drive_timeout(stream, 0); + break; + default: + break; + } +} + +/* milan-avb: capture the driver clock/position areas the core hands the driver node. */ +static void on_sink_stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + struct stream *stream = data; + + (void)size; + switch (id) { + case SPA_IO_Position: + stream->io_position = area; + break; + case SPA_IO_RateMatch: + stream->io_rate_match = area; + break; + default: + break; + } +} + static const struct pw_stream_events sink_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = on_stream_destroy, + .state_changed = on_sink_stream_state_changed, + .io_changed = on_sink_stream_io_changed, .process = on_sink_stream_process }; @@ -553,10 +884,8 @@ struct stream *server_create_stream(struct server *server, struct stream *stream stream->prio = AVB_MSRP_PRIORITY_DEFAULT; stream->vlan_id = AVB_DEFAULT_VLAN; stream->mtt = 2000000; - /* TX timestamp jitter budget added on top of CLOCK_TAI now. 125 µs is - * the upper bound at 1 GbE class-A traffic per IEEE 802.1Qav; safe - * default until we have a way to measure it from gPTP. */ - stream->t_uncertainty = 125000; + /* TX timestamp jitter budget added on top of the gPTP (PHC) time; 125 µs is the upper bound at 1 GbE class-A per IEEE 802.1Qav, safe default until we measure it from gPTP. */ + stream->t_uncertainty = 0; stream->id = (uint64_t)server->mac_addr[0] << 56 | (uint64_t)server->mac_addr[1] << 48 | @@ -578,7 +907,9 @@ struct stream *server_create_stream(struct server *server, struct stream *stream PW_KEY_MEDIA_CLASS, "Audio/Source", PW_KEY_NODE_NAME, "avb.source", PW_KEY_NODE_DESCRIPTION, "AVB Source", - PW_KEY_NODE_WANT_DRIVER, "true", + /* milan-avb: avb.source IS the listener's media clock; it drives the graph at the recovered talker rate (mc.rate) so consumers run sample-locked (no resampling, bit-perfect); NODE_DRIVER + high priority elects it over the fallback Dummy-Driver. */ + PW_KEY_NODE_DRIVER, "true", + PW_KEY_PRIORITY_DRIVER, "300000", NULL)); } else { stream->stream = pw_stream_new(server->impl->core, "sink", @@ -586,7 +917,9 @@ struct stream *server_create_stream(struct server *server, struct stream *stream PW_KEY_MEDIA_CLASS, "Audio/Sink", PW_KEY_NODE_NAME, "avb.sink", PW_KEY_NODE_DESCRIPTION, "AVB Sink", - PW_KEY_NODE_WANT_DRIVER, "true", + /* milan-avb: avb.sink IS the graph driver (self-clocked off the AVTP/PHC rate), not a follower; NODE_DRIVER + high PRIORITY_DRIVER elect it over the fallback Dummy-Driver (priority 200000) so pw-cat clocks to us. */ + PW_KEY_NODE_DRIVER, "true", + PW_KEY_PRIORITY_DRIVER, "300000", NULL)); } @@ -614,7 +947,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream if (stream->format) avb_aem_stream_format_decode(stream->format, &fi); - stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE; + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S32_BE; stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED; stream->info.info.raw.rate = fi.is_audio && fi.rate ? fi.rate : 48000; stream->info.info.raw.channels = fi.is_audio && fi.channels ? fi.channels : 8; @@ -632,10 +965,21 @@ struct stream *server_create_stream(struct server *server, struct stream *stream PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_INACTIVE | - PW_STREAM_FLAG_RT_PROCESS, + PW_STREAM_FLAG_RT_PROCESS | + /* milan-avb: both directions drive the graph themselves (talker off its media clock, listener off the recovered AAF clock), staying INACTIVE until a Milan ACMP/MSRP connection activates them. */ + PW_STREAM_FLAG_DRIVER, params, n_params)) < 0) goto error_free_stream; + /* milan-avb: the self-driving timer lives on the RT data loop and is armed once the stream reaches STREAMING (state_changed); both directions drive (talker off its media clock, listener off the recovered AAF clock). */ + if (!stream->is_crf) { + stream->drive_timer = pw_loop_add_timer(server->impl->data_loop, + on_drive_timeout, stream); + if (stream->drive_timer == NULL) { + pw_log_warn("avb stream: no drive_timer; core will pick a driver"); + } + } + stream->frames_per_pdu = 6; stream->pdu_period = SPA_NSEC_PER_SEC * stream->frames_per_pdu / stream->info.info.raw.rate; @@ -669,15 +1013,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream goto error_free; } - /* Milan Section 5.3.8.8 / Section 5.4.2.10.1.1: a Listener observes foreign - * Talker Advertise PDUs matching the bound talker's stream_id. - * Create the registrar attribute now (stream_id is set later at - * BIND_RX, cleared at UNBIND_RX) and start its FSM without a - * join — we are an observer, not a declarant. Once a matching TA - * arrives from the wire, msrp.c populates attr.talker - * (accumulated_latency, dest_addr, vlan_id) and moves the - * registrar to IN. The Listener side reads those fields to - * answer GET_STREAM_INFO with real msrp_accumulated_latency. */ + /* Milan Section 5.3.8.8 / 5.4.2.10.1.1: a Listener observes foreign Talker Advertise PDUs matching the bound talker's stream_id; create the registrar attribute now (stream_id set later at BIND_RX, cleared at UNBIND_RX) and start its FSM without a join (observer, not declarant); once a matching TA arrives msrp.c populates attr.talker (accumulated_latency, dest_addr, vlan_id), moves the registrar to IN, and the Listener answers GET_STREAM_INFO with the real msrp_accumulated_latency. */ res = avb_msrp_attribute_new(server->msrp, &common->tastream_attr, AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); if (res) { @@ -698,19 +1034,19 @@ struct stream *server_create_stream(struct server *server, struct stream *stream goto error_free; } - /* Milan v1.2 Section 4.3.3.1: pre-create lstream_attr with our talker - * stream_id so foreign Listener declarations from peers are - * delivered to it via process_listener and observed through - * notify_listener (sets listener_observed on stream_output_state). */ + /* Milan v1.2 Section 4.3.3.1: pre-create lstream_attr with our talker stream_id so foreign Listener declarations from peers reach it via process_listener and are observed through notify_listener (sets listener_observed on stream_output_state). */ common->lstream_attr.attr.listener.stream_id = htobe64(stream->id); common->tastream_attr.attr.talker.vlan_id = htons(stream->vlan_id); - if (server->avb_mode == AVB_MODE_MILAN_V12) + /* Milan v1.2 Section 4.3.3.2 Table 4.4: MaxFrameSize is the AVTPDU (header + payload) ONLY plus 1 byte for PAAD sampling-clock drift; the Ethernet header and FCS are added by the bandwidth rule (F = MaxFrameSize + 22), so exclude our avb_frame_header (the L2 header) from pdu_size. */ + if (server->avb_mode == AVB_MODE_MILAN_V12) { common->tastream_attr.attr.talker.tspec_max_frame_size = - htons((uint16_t)stream->pdu_size); - else + htons((uint16_t)(stream->pdu_size - + sizeof(struct avb_frame_header) + 1)); + } else { common->tastream_attr.attr.talker.tspec_max_frame_size = htons((uint16_t)(32 + stream->frames_per_pdu * stream->stride)); + } common->tastream_attr.attr.talker.tspec_max_interval_frames = htons(AVB_MSRP_TSPEC_MAX_INTERVAL_FRAMES_DEFAULT); common->tastream_attr.attr.talker.priority = stream->prio; @@ -732,6 +1068,11 @@ error_free: void stream_destroy(struct stream *stream) { struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream); + uint64_t now; + + /* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart or replug doesn't strand a stale reservation on the bridge (socket still open here). */ + now = stream_gptp_now(stream->server); + stream_deactivate(stream, now); if (stream->direction == SPA_DIRECTION_INPUT) { struct aecp_aem_stream_input_state *si = @@ -746,6 +1087,17 @@ void stream_destroy(struct stream *stream) avb_mrp_attribute_destroy(common->tastream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp); } + + if (stream->drive_timer != NULL) { + set_drive_timeout(stream, 0); + pw_loop_destroy_source(stream->server->impl->data_loop, stream->drive_timer); + stream->drive_timer = NULL; + } + + if (stream->raw_dump_fp) { + fclose(stream->raw_dump_fp); + stream->raw_dump_fp = NULL; + } } static int setup_socket(struct stream *stream) @@ -753,40 +1105,191 @@ static int setup_socket(struct stream *stream) return avb_server_stream_setup_socket(stream->server, stream); } +/* milan-avb: media-clock recovery; returns the CLOCK_SOURCE descriptor selected by CLOCK_DOMAIN 0 (or NULL); selection is clock_source_index, set at boot (Internal=0) and updated on the wire by SET_CLOCK_SOURCE (IEEE 1722.1 Section 7.4.23). */ +static struct avb_aem_desc_clock_source *selected_clock_source(struct server *server) +{ + struct descriptor *dom; + struct descriptor *src; + struct avb_aem_desc_clock_domain *d; + uint16_t idx; + + dom = server_find_descriptor(server, AVB_AEM_DESC_CLOCK_DOMAIN, 0); + if (dom == NULL) + return NULL; + d = descriptor_body(dom); + idx = ntohs(d->clock_source_index); + src = server_find_descriptor(server, AVB_AEM_DESC_CLOCK_SOURCE, idx); + if (src == NULL) + return NULL; + return descriptor_body(src); +} + +/* True when the CLOCK_DOMAIN selects an AAF (INPUT_STREAM) clock source whose location points at this listener stream; CRF (MEDIA_CLOCK_STREAM) is out of scope and returns false. */ +static bool stream_mc_aaf_selected(struct stream *stream) +{ + struct avb_aem_desc_clock_source *cs; + + if (stream->direction != SPA_DIRECTION_INPUT) + return false; + cs = selected_clock_source(stream->server); + if (cs == NULL) + return false; + if (ntohs(cs->clock_source_type) != AVB_AEM_DESC_CLOCK_SOURCE_TYPE_INPUT_STREAM) + return false; + if (ntohs(cs->clock_source_location_type) != AVB_AEM_DESC_STREAM_INPUT) + return false; + return ntohs(cs->clock_source_location_index) == stream->index; +} + +static void stream_mc_reset(struct stream *stream) +{ + mc_recover_reset(&stream->mc, stream->info.info.raw.rate); + play_loop_reset(&stream->play); +} + +void avb_stream_update_clock_source(struct server *server) +{ + struct stream *s; + + spa_list_for_each(s, &server->streams, link) { + bool active; + + if (s->direction != SPA_DIRECTION_INPUT) + continue; + active = stream_mc_aaf_selected(s); + if (active && !s->mc_aaf_active) + stream_mc_reset(s); + s->mc_aaf_active = active; + pw_log_info("milan-avb: stream %u media-clock source -> %s", + s->index, active ? "AAF (recovered)" : "internal/gPTP"); + } +} + +/* Recover the talker media rate from a PDU's avtp_timestamp (which carries the talker media clock in gPTP time); inter-PDU deltas give the rate, a second-order DLL (spa_dll) tracks phase+frequency, drives mc_rate. */ +static void stream_mc_recover(struct stream *stream, const struct avb_packet_aaf *p) +{ + uint32_t avtp_ts; + double rate; + + if (!stream->mc_aaf_active || !p->tv) { + return; + } + + avtp_ts = ntohl(p->timestamp); + rate = mc_recover_update(&stream->mc, avtp_ts, stream->frames_per_pdu, + stream->info.info.raw.rate, stream->pdu_period); + + if (stream->mc.pdus < 40 || (stream->mc.pdus % 8000) == 1) { + pw_log_info("milan-avb: mc-recovery stream=%u pdus=%llu avtp_ts=%u model_lo=%u nom=%u pdu_ns=%lld rate=%.4f corr=%.8f err_ns=%d ppm=%.3f", + stream->index, (unsigned long long)stream->mc.pdus, avtp_ts, + (uint32_t)stream->mc.model_ns, (unsigned)stream->info.info.raw.rate, + (long long)stream->pdu_period, rate, stream->mc.corr, + stream->mc.last_err_ns, (stream->mc.corr - 1.0) * 1e6); + } +} + +/* Milan 5.4.5.3 STREAM_INTERRUPTED: playback interrupted by loss of "several" AVTPDUs (count implementation-defined); a single dropped/reordered PDU is a SEQ_NUM_MISMATCH, a gap of this many or more is an interruption. */ +#define AVB_STREAM_INTERRUPT_MIN_LOST 2 + +/* PDUs after a (re)lock during which a sequence step is absorbed (re-seeded) and NOT counted as SEQ_NUM_MISMATCH — covers the one-time bind/SRP-path-open gap of a mid-stream join; small, so genuine ongoing loss still counts. */ +#define AVB_STREAM_SEQ_SETTLE 8 + +/* Milan v1.2 Section 5.4: a received AAF AVTPDU matches the current format when subtype, format, nsr, bit depth, channels and sparse all match. */ +static inline bool aaf_pdu_format_matches(const struct avb_packet_aaf *p, + const struct avb_aem_stream_format_info *fi) +{ + return p->subtype == fi->subtype && + p->format == fi->format && + p->nsr == fi->nsr && + p->bit_depth == fi->bit_depth && + p->chan_per_frame == fi->channels && + p->sp == fi->sparse; +} + +/* Read the current format from the Stream Input descriptor; SET_STREAM_FORMAT updates it there, so this is always the current one. */ +static void stream_in_current_format(struct stream *stream, + struct avb_aem_stream_format_info *out) +{ + struct descriptor *desc; + struct avb_aem_desc_stream *body; + + desc = server_find_descriptor(stream->server, AVB_AEM_DESC_STREAM_INPUT, + stream->index); + body = desc ? descriptor_body(desc) : NULL; + avb_aem_stream_format_decode(body ? body->current_format : 0, out); +} + static void handle_aaf_packet(struct stream *stream, struct avb_packet_aaf *p, int len) { struct aecp_aem_stream_input_state *si = stream_in_state(stream); struct aecp_aem_stream_input_counters *cnt = &si->counters; + struct avb_aem_stream_format_info cur; struct timespec now_ts; uint32_t index, n_bytes; int32_t filled; filled = spa_ringbuffer_get_write_index(&stream->ring, &index); n_bytes = ntohs(p->data_len); - if (n_bytes > (uint32_t)(len - (int)sizeof(*p))) - return; - /* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every valid - * AVTPDU received on the wire — independent of whether the listener - * pipeline could absorb it. A ringbuffer overrun is a separate event - * that bumps STREAM_INTERRUPTED. Counting both unconditionally keeps - * Hive's dashboard meaningful even when no PipeWire consumer is - * draining the source side. */ + /* IEEE 1722.1-2021 Table 7-156: per-PDU, bump UNSUPPORTED_FORMAT on any AVTPDU whose format != the Stream Input current format (from descriptor), or malformed. */ + stream_in_current_format(stream, &cur); + if (n_bytes > (uint32_t)(len - (int)sizeof(*p)) || !aaf_pdu_format_matches(p, &cur)) { + cnt->unsupported_format++; + stream_in_mark_counters_dirty(stream); + return; + } + + /* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every valid AVTPDU received on the wire, independent of whether the listener pipeline could absorb it. */ cnt->frame_rx++; clock_gettime(CLOCK_MONOTONIC, &now_ts); si->last_frame_rx_ns = SPA_TIMESPEC_TO_NSEC(&now_ts); + if (!si->media_locked_state) { cnt->media_locked++; si->media_locked_state = true; + stream->prev_seq = p->seq_num; /* (re)lock: seed seq, no gap */ + si->seq_settle = AVB_STREAM_SEQ_SETTLE; /* grace the bind/path-open step */ + } else if (si->seq_settle > 0) { + /* settling just after a (re)lock: a Listener that binds mid-stream behind an SRP bridge gets a one-time sequence step as the bridge opens forwarding — re-seed and don't count it. */ + si->seq_settle--; + stream->prev_seq = p->seq_num; + } else { + uint8_t expected = (uint8_t)(stream->prev_seq + 1); + if (p->seq_num != expected) { + /* IEEE 1722.1 7.4: SEQ_NUM_MISMATCH on any sequence discontinuity (loss, reorder or duplicate). */ + uint8_t lost = (uint8_t)(p->seq_num - expected); + cnt->seq_mistmatch++; + /* STREAM_INTERRUPTED only when several PDUs are missing. */ + if (lost >= AVB_STREAM_INTERRUPT_MIN_LOST) { + cnt->stream_interrupted++; + } + } + stream->prev_seq = p->seq_num; + } + + /* milan-avb: AAF media-clock recovery (active only when selected via the CLOCK_DOMAIN); recovers the talker media rate from avtp_timestamps. */ + stream_mc_recover(stream, p); + + /* milan-avb: latency observability (throttled to 1 Hz, env-gated). */ + if (getenv("MILAN_AVB_LATENCY_LOG")) { + static uint64_t last_log_ns = 0; + uint64_t now_tai_ns = stream_gptp_now(stream->server); + if (now_tai_ns - last_log_ns >= SPA_NSEC_PER_SEC) { + uint32_t avtp_ts = ntohl(p->timestamp); + int32_t talker_to_recv_ns = (int32_t)((uint32_t)now_tai_ns - avtp_ts); + pw_log_info("milan-avb: lat A+B seq=%u avtp_ts=%u talker_to_recv_ns=%d", + (unsigned)p->seq_num, avtp_ts, talker_to_recv_ns); + last_log_ns = now_tai_ns; + } } if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) { + /* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */ uint32_t r_index; spa_ringbuffer_get_read_index(&stream->ring, &r_index); spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes); - cnt->stream_interrupted++; filled -= n_bytes; } spa_ringbuffer_write_data(&stream->ring, @@ -797,6 +1300,26 @@ static void handle_aaf_packet(struct stream *stream, index += n_bytes; spa_ringbuffer_write_update(&stream->ring, index); stream_in_mark_counters_dirty(stream); + + /* milan-avb: env-gated raw PCM dump (S32BE interleaved) for offline THDN. */ + { + const char *dump_dir = getenv("MILAN_AVB_RAW_DUMP_DIR"); + if (dump_dir && stream->raw_dump_fp == NULL) { + char dpath[512]; + snprintf(dpath, sizeof(dpath), + "%s/avb-stream-in-%u.s32be", + dump_dir, stream->index); + stream->raw_dump_fp = fopen(dpath, "wb"); + if (stream->raw_dump_fp) + pw_log_info("milan-avb: dumping raw S32BE PCM to %s", dpath); + else + pw_log_warn("milan-avb: cannot open dump file %s: %m", dpath); + } + if (stream->raw_dump_fp) { + size_t w = fwrite(p->payload, 1, n_bytes, stream->raw_dump_fp); + stream->raw_dump_bytes += w; + } + } } static void handle_iec61883_packet(struct stream *stream, @@ -826,11 +1349,24 @@ static void handle_iec61883_packet(struct stream *stream, si->media_locked_state = true; } + /* milan-avb: latency observability (throttled to 1 Hz, env-gated). */ + if (getenv("MILAN_AVB_LATENCY_LOG")) { + static uint64_t last_log_ns = 0; + uint64_t now_tai_ns = stream_gptp_now(stream->server); + if (now_tai_ns - last_log_ns >= SPA_NSEC_PER_SEC) { + uint32_t avtp_ts = ntohl(p->timestamp); + int32_t talker_to_recv_ns = (int32_t)((uint32_t)now_tai_ns - avtp_ts); + pw_log_info("milan-avb: lat A+B seq=%u avtp_ts=%u talker_to_recv_ns=%d", + (unsigned)p->seq_num, avtp_ts, talker_to_recv_ns); + last_log_ns = now_tai_ns; + } + } + if (filled + n_bytes > stream->buffer_size) { + /* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */ uint32_t r_index; spa_ringbuffer_get_read_index(&stream->ring, &r_index); spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes); - cnt->stream_interrupted++; filled -= n_bytes; } spa_ringbuffer_write_data(&stream->ring, @@ -841,8 +1377,29 @@ static void handle_iec61883_packet(struct stream *stream, index += n_bytes; spa_ringbuffer_write_update(&stream->ring, index); stream_in_mark_counters_dirty(stream); + + /* milan-avb: env-gated raw PCM dump (S32BE interleaved) for offline THDN. */ + { + const char *dump_dir = getenv("MILAN_AVB_RAW_DUMP_DIR"); + if (dump_dir && stream->raw_dump_fp == NULL) { + char dpath[512]; + snprintf(dpath, sizeof(dpath), + "%s/avb-stream-in-%u.s32be", + dump_dir, stream->index); + stream->raw_dump_fp = fopen(dpath, "wb"); + if (stream->raw_dump_fp) + pw_log_info("milan-avb: dumping raw S32BE PCM to %s", dpath); + else + pw_log_warn("milan-avb: cannot open dump file %s: %m", dpath); + } + if (stream->raw_dump_fp) { + size_t w = fwrite(p->payload, 1, n_bytes, stream->raw_dump_fp); + stream->raw_dump_bytes += w; + } + } } +/* TODO: RX is on the main loop, not the RT data_loop — preemption can drop PDUs (SEQ_NUM_MISMATCH); move it to data_loop + a big SO_RCVBUF, like the flush_timer. */ static void on_socket_data(void *data, int fd, uint32_t mask) { struct stream *stream = data; @@ -880,8 +1437,7 @@ static void on_socket_data(void *data, int fd, uint32_t mask) len - (int)sizeof(*h)); break; case AVB_SUBTYPE_CRF: - /* CRF clock-reference stream: no audio data plane. - * Consume and ignore (clock recovery is future work). */ + /* CRF clock-reference stream: no audio data plane; consume and ignore (clock recovery is future work). */ break; default: pw_log_warn("unsupported subtype 0x%02x", ph->subtype); @@ -891,6 +1447,25 @@ static void on_socket_data(void *data, int fd, uint32_t mask) } } +/* Milan v1.2 Table 5.6: a Stream Input resets its diagnostic counters on the not-bound -> bound transition (NOT the reverse); also re-arms the media-lock / seq-settle state, since the unlock edge is detected only in the GET_COUNTERS poll (100 ms silence) so a fast unbind/rebind could leave media_locked_state==true and miscount the bridge-open step as SEQ_NUM_MISMATCH / STREAM_INTERRUPTED. Called from stream_activate(). */ +static void stream_input_reset_counters(struct aecp_aem_stream_input_state *si) +{ + si->counters.media_locked = 0; + si->counters.media_unlocked = 0; + si->counters.stream_interrupted = 0; + si->counters.seq_mistmatch = 0; + si->counters.media_reset = 0; + si->counters.tu = 0; + si->counters.unsupported_format = 0; + si->counters.late_timestamp = 0; + si->counters.early_timestamp = 0; + si->counters.frame_rx = 0; + si->media_locked_state = false; + si->seq_settle = 0; + si->last_frame_rx_ns = 0; + si->counters_dirty = true; +} + int stream_activate(struct stream *stream, uint16_t index, uint64_t now) { struct server *server = stream->server; @@ -899,6 +1474,21 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) struct stream_common *common; common = SPA_CONTAINER_OF(stream, struct stream_common, stream); + /* milan-avb: SR-class priority + VLAN id come from the MSRP Domain (the authoritative network-declared values), not a hardcoded default; read before setup_socket() since the listener uses stream->vlan_id to select its VLAN sub-iface. */ + { + struct descriptor *avbif = server_find_descriptor(server, + AVB_AEM_DESC_AVB_INTERFACE, 0); + if (avbif != NULL) { + struct aecp_aem_avb_interface_state *ifs = avbif->ptr; + uint8_t dprio = ifs->domain_attr.attr.domain.sr_class_priority; + uint16_t dvid = ntohs(ifs->domain_attr.attr.domain.sr_class_vid); + if (dvid != 0 && dvid < 4095) { + stream->prio = dprio; + stream->vlan_id = dvid; + } + } + } + if (stream->source == NULL) { if ((fd = setup_socket(stream)) < 0) return fd; @@ -916,11 +1506,60 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) struct aecp_aem_stream_input_state *input_stream; input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); - /* lstream_attr.listener.stream_id is already populated by the - * ACMP FSM from PROBE_TX_RESPONSE. Don't overwrite it here. - * Milan Section 4.3.3.1: Listener starts in AskingFailed; notify_talker - * promotes to Ready once the Talker Advertise registrar is IN. */ - common->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; + /* Milan v1.2 Table 5.6: reset diagnostic counters + re-arm the media-lock / seq-settle state on the not-bound -> bound transition. */ + stream_input_reset_counters(input_stream); + + /* Prime ring with one PipeWire quantum of silence (Milan v1.2 Section 5.4.5.3). */ + spa_ringbuffer_init(&stream->ring); + if (stream->frames_per_pdu > 0) { + uint32_t prefill_pdus = 1024u / stream->frames_per_pdu; + if (prefill_pdus > 0) { + pad_ringbuffer_with_silence(stream, (int)prefill_pdus); + } + } + + /* milan-avb: pick up the current media-clock selection for this input (AAF recovery vs internal/gPTP); re-prime the DLL on a fresh bind. */ + stream->mc_aaf_active = stream_mc_aaf_selected(stream); + if (stream->mc_aaf_active) { + stream_mc_reset(stream); + } + + /* milan-avb: publish our contribution to graph latency (the prefill: one PipeWire quantum at 48 kHz) so wpctl/pw-cli report it. */ + { + struct spa_latency_info latency = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT); + uint32_t rate = stream->info.info.raw.rate ? stream->info.info.raw.rate : 48000; + uint8_t lbuf[256]; + struct spa_pod_builder lb = { 0 }; + const struct spa_pod *lp; + char buf[64]; + struct pw_properties *props; + latency.min_quantum = 1.0f; + latency.max_quantum = 1.0f; + latency.min_rate = 1024; + latency.max_rate = 1024; + latency.min_ns = (uint64_t)1024 * SPA_NSEC_PER_SEC / rate; + latency.max_ns = latency.min_ns; + spa_pod_builder_init(&lb, lbuf, sizeof(lbuf)); + lp = spa_latency_build(&lb, SPA_PARAM_Latency, &latency); + pw_stream_update_params(stream->stream, &lp, 1); + + props = pw_properties_new(NULL, NULL); + snprintf(buf, sizeof(buf), "%llu", (unsigned long long)latency.min_ns); + pw_properties_set(props, "milan.avb.latency.prefill.ns", buf); + snprintf(buf, sizeof(buf), "%u", 1024u); + pw_properties_set(props, "milan.avb.latency.prefill.frames", buf); + snprintf(buf, sizeof(buf), "%u", (unsigned)stream->frames_per_pdu); + pw_properties_set(props, "milan.avb.frames_per_pdu", buf); + pw_stream_update_properties(stream->stream, &props->dict); + pw_properties_free(props); + } + + /* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN; compute from current state so a reconnect picks up an already-IN TA registrar (no NEW/JOIN event fires when the registrar didn't transition). */ + common->lstream_attr.param = + (common->tastream_attr.mrp != NULL && + avb_mrp_attribute_get_registrar_state(common->tastream_attr.mrp) == AVB_MRP_IN) + ? AVB_MSRP_LISTENER_PARAM_READY + : AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; avb_mrp_attribute_begin(common->lstream_attr.mrp, now); avb_mrp_attribute_join(common->lstream_attr.mrp, now, true); @@ -928,8 +1567,12 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) avb_mrp_attribute_begin(input_stream->mvrp_attr.mrp, now); avb_mrp_attribute_join(input_stream->mvrp_attr.mrp, now, true); } else { - if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) + if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) { return res; + } + + /* M2: re-anchor the presentation-timestamp accumulator on connect. */ + stream->tx_pts = 0; common->tastream_attr.attr.talker.stream_id = htobe64(stream->id); memcpy(common->tastream_attr.attr.talker.dest_addr, stream->addr, 6); @@ -947,28 +1590,14 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) pw_stream_set_active(stream->stream, true); - /* Milan Table 5.17: STREAM_START counter ticks each time the stream - * transitions from stopped → started. */ + /* Milan Table 5.17: STREAM_START counter ticks each time the stream transitions from stopped → started. */ if (stream->direction == SPA_DIRECTION_OUTPUT) { stream_out_counters(stream)->stream_start++; stream_out_mark_counters_dirty(stream); if (stream->flush_timer == NULL) { - struct timespec value = { - .tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC), - .tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC), - }; - struct timespec interval = value; - stream->flush_last_ns = 0; - stream->flush_timer = pw_loop_add_timer(server->impl->loop, - on_flush_tick, stream); - if (stream->flush_timer) - pw_loop_update_timer(server->impl->loop, - stream->flush_timer, - &value, &interval, false); - else - pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)", - stream); + pw_loop_invoke(server->impl->data_loop, do_add_flush_timer, + 0, NULL, 0, true, stream); } } @@ -978,6 +1607,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) int stream_deactivate(struct stream *stream, uint64_t now) { struct stream_common *common; + struct aecp_aem_stream_input_state *si; common = SPA_CONTAINER_OF(stream, struct stream_common, stream); pw_stream_set_active(stream->stream, false); @@ -987,21 +1617,22 @@ int stream_deactivate(struct stream *stream, uint64_t now) stream->source = NULL; } if (stream->flush_timer != NULL) { - pw_loop_destroy_source(stream->server->impl->loop, stream->flush_timer); - stream->flush_timer = NULL; - stream->flush_last_ns = 0; + pw_loop_invoke(stream->server->impl->data_loop, do_remove_flush_timer, + 0, NULL, 0, true, stream); } -#if 0 - avb_mrp_attribute_leave(stream->vlan_attr->mrp, now); -#endif // - - if (stream->direction == SPA_DIRECTION_INPUT) + /* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the reservation immediately (Leave) instead of holding stale state until its LeaveAll timer — otherwise a stop/restart or replug to another port can't re-register (the old port's Talker/Listener/VLAN entry still pins the stream). */ + if (stream->direction == SPA_DIRECTION_INPUT) { + si = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); avb_mrp_attribute_leave(common->lstream_attr.mrp, now); - else + if (si->mvrp_attr.mrp) { + avb_mrp_attribute_leave(si->mvrp_attr.mrp, now); + } + stream->mc_aaf_active = false; + } else { avb_mrp_attribute_leave(common->tastream_attr.mrp, now); + } - /* Milan Table 5.17: STREAM_STOP counter ticks each transition the - * other way. */ + /* Milan Table 5.17: STREAM_STOP counter ticks each transition the other way. */ if (stream->direction == SPA_DIRECTION_OUTPUT) { stream_out_counters(stream)->stream_stop++; stream_out_mark_counters_dirty(stream); diff --git a/src/modules/module-avb/stream.h b/src/modules/module-avb/stream.h index 7b3895f5a..ce34b82cc 100644 --- a/src/modules/module-avb/stream.h +++ b/src/modules/module-avb/stream.h @@ -12,10 +12,15 @@ #include #include +#include + +#include "mc-recover.h" +#include "play-loop.h" #include -#define BUFFER_SIZE (1u<<16) +/* milan-avb: the talker ring must hold SEVERAL graph quanta so the burst-fill (one quantum/cycle) and the 125us flush-drain decouple; at 1<<16 the ring equalled ONE quantum so each cycle overwrote unread samples (~40 glitches/s). 1<<18 = 4 quanta. */ +#define BUFFER_SIZE (1u<<18) #define BUFFER_MASK (BUFFER_SIZE-1) struct stream { @@ -37,7 +42,19 @@ struct stream { struct spa_source *source; struct spa_source *flush_timer; uint64_t flush_last_ns; + + /* milan-avb: self-driving timer (the node IS the graph DRIVER); a data-loop one-shot fills io_position->clock, re-arms to drive_next_time, and triggers one graph cycle (pipe-tunnel pattern) so the graph fill and AVTP egress share one clock (no follower-resampler FM). */ + struct spa_source *drive_timer; + uint64_t drive_next_time; + bool driving; + uint64_t drive_phc_last; + uint64_t drive_mono_last; + double drive_ratio_ema; /* M2: EMA of PHC/MONOTONIC rate ratio */ + + /* M2: monotonic AVTP presentation-timestamp accumulator (PHC domain); advances by exactly pdu_period per PDU, slow-leaked toward the live PHC, so the listener's recovered media clock stays jitter-free (per-tick PHC reads inject interpolation jitter, audible FM). */ + uint64_t tx_pts; bool is_crf; + uint64_t next_txtime; int prio; int mtt; int t_uncertainty; @@ -66,6 +83,28 @@ struct stream { uint64_t format; uint32_t stride; struct spa_audio_info info; + + /* milan-avb: AAF media-clock recovery (listener / STREAM_INPUT only); active only while the CLOCK_DOMAIN selects the AAF (INPUT_STREAM) source pointing at this stream; recovered from avtp_timestamp deltas (mc-recover.h). */ + bool mc_aaf_active; + struct mc_recover mc; + + /* milan-avb: actuator I/O areas (set via .io_changed); io_rate_match is the resampler knob, NULL unless the adapter inserted a resampler. */ + struct spa_io_rate_match *io_rate_match; + struct spa_io_position *io_position; + + /* milan-avb: previous 1 Hz sample for the local consume-rate log. */ + uint64_t play_last_consume_tai; + uint64_t play_last_ticks; + uint64_t play_log_last_ns; + bool play_primed; + + /* milan-avb: actuator state; servos the ring to play_target (play-loop.h). */ + struct play_loop play; + int32_t play_target; + + /* milan-avb: optional raw PCM dump for offline analysis (THDN, waveform). */ + FILE *raw_dump_fp; + size_t raw_dump_bytes; }; #include "msrp.h" @@ -81,4 +120,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now); int stream_deactivate(struct stream *stream, uint64_t now); int stream_activate_virtual(struct stream *stream, uint16_t index); +/* milan-avb: re-evaluate each input stream's media-clock recovery against the current CLOCK_DOMAIN selection; call after SET_CLOCK_SOURCE for on-the-fly clock-source switching. */ +void avb_stream_update_clock_source(struct server *server); + #endif /* AVB_STREAM_H */