From afc7724070c3f82fe3776f16623eb1144f8eca8d Mon Sep 17 00:00:00 2001 From: hackerman-kl Date: Sun, 31 May 2026 15:05:27 +0200 Subject: [PATCH] milan-avb: stabilization extras - MRP registrar-before-notify, scoped-fd cleanup, VLAN sub-iface RX, SET_NAME validation, entity/firmware/8ch, MEDIA_UNLOCK 100ms + seq-settle, MRP Leave on teardown, reset STREAM_INPUT counters on bind --- .../aecp-aem-cmds-resps/cmd-get-counters.c | 6 +- src/modules/module-avb/aecp-aem-state.h | 12 +- src/modules/module-avb/avdecc.c | 215 +++++++++++++++- src/modules/module-avb/descriptors.c | 42 +-- .../module-avb/entity-model-milan-v12.h | 8 +- src/modules/module-avb/mrp.c | 31 +-- src/modules/module-avb/msrp.c | 63 ++--- src/modules/module-avb/stream.c | 239 +++++++++++++++--- 8 files changed, 482 insertions(+), 134 deletions(-) diff --git a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c index 793e6f909..91df002e1 100644 --- a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c +++ b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c @@ -360,7 +360,11 @@ static void emit_avb_interface_counters(struct aecp *aecp, uint16_t desc_index, * here, gated by last_counters_emit_ns. */ #define COUNTER_UNSOL_MIN_INTERVAL_NS ((int64_t)SPA_NSEC_PER_SEC) -#define MEDIA_UNLOCK_TIMEOUT_NS ((int64_t)(2 * SPA_NSEC_PER_MSEC)) +/* A frame is "missing" only after a gap far longer than event-loop scheduling + * jitter — 2 ms (16 PDU periods) is processed-not-arrived jitter, not media + * loss, and spuriously unlocks a healthy stream on a busy software listener. + * 100 ms is ~5x the listener prefill and well above loop jitter. */ +#define MEDIA_UNLOCK_TIMEOUT_NS ((int64_t)(100 * SPA_NSEC_PER_MSEC)) static bool counter_rate_limit_elapsed(int64_t now, int64_t last_emit) { diff --git a/src/modules/module-avb/aecp-aem-state.h b/src/modules/module-avb/aecp-aem-state.h index 1864f66ab..41f949af2 100644 --- a/src/modules/module-avb/aecp-aem-state.h +++ b/src/modules/module-avb/aecp-aem-state.h @@ -182,15 +182,17 @@ struct aecp_aem_stream_input_state { * most recent valid PDU; media_locked_state is the current edge. */ int64_t last_frame_rx_ns; bool media_locked_state; + + /* Settle window after a (re)lock: a Listener binding mid-stream behind an + * SRP bridge sees a one-time sequence step as the bridge opens forwarding a + * beat after the talker is already transmitting. Re-seed prev_seq for this + * many post-lock PDUs instead of counting it as SEQ_NUM_MISMATCH. */ + uint8_t seq_settle; }; struct acmp_stream_status_milan_v12 { uint64_t controller_entity_id; - /* Original talker entity_id captured at BIND_RX_COMMAND time, used - * verbatim when emitting CONNECT_TX_COMMAND probes. Needed because the - * stream_id ↔ entity_id round-trip via peer_id_from_entity_id() is - * lossy for non-EUI-64 entity_ids (e.g. MAC | entity_index). */ - uint64_t talker_entity_id; + uint64_t talker_entity_id; /* IEEE 1722.1-2021 Section 7.4.6, BIND_RX_COMMAND talker_guid */ uint32_t acmp_flags; uint8_t probing_status; uint8_t acmp_status; diff --git a/src/modules/module-avb/avdecc.c b/src/modules/module-avb/avdecc.c index 970ceea2e..ba23601d3 100644 --- a/src/modules/module-avb/avdecc.c +++ b/src/modules/module-avb/avdecc.c @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -253,11 +256,187 @@ static int raw_transport_setup(struct server *server) return 0; } +/* milan-avb: hand-rolled netlink VLAN sub-iface creator. + * + * On I210-class NICs with rx-vlan-filter[fixed]=on the silicon drops + * VLAN-tagged frames whose VID is not registered. Registering happens + * implicitly when a VLAN sub-iface is added via RTM_NEWLINK. We create + * . on first use, never delete it (bounded leak: one + * sub-iface per SR class), and bind the listener stream socket to it. + * + * Fall back to PACKET_MR_PROMISC if any step fails (no CAP_NET_ADMIN, + * no 8021q module loaded, etc.). */ + +#define MILAN_NLALIGN(n) (((n) + 3U) & ~3U) + +static int milan_nl_send(int fd, uint16_t mt, uint16_t flags, + uint32_t seq, const void *payload, size_t plen) +{ + struct { + struct nlmsghdr nlh; + char body[2048]; + } msg = { 0 }; + size_t need = NLMSG_HDRLEN + plen; + if (need > sizeof(msg)) + return -EMSGSIZE; + msg.nlh.nlmsg_len = need; + msg.nlh.nlmsg_type = mt; + msg.nlh.nlmsg_flags = flags; + msg.nlh.nlmsg_seq = seq; + msg.nlh.nlmsg_pid = 0; + if (plen) + memcpy(msg.body, payload, plen); + if (send(fd, &msg, need, 0) < 0) + return -errno; + return 0; +} + +static int milan_nl_recv_ack(int fd, uint32_t seq) +{ + char buf[4096]; + for (;;) { + ssize_t n = recv(fd, buf, sizeof(buf), 0); + size_t off = 0; + if (n < 0) + return -errno; + while (off + NLMSG_HDRLEN <= (size_t)n) { + struct nlmsghdr *h = (struct nlmsghdr *)(buf + off); + if (h->nlmsg_len < NLMSG_HDRLEN || + off + h->nlmsg_len > (size_t)n) + break; + if (h->nlmsg_type == NLMSG_ERROR && h->nlmsg_seq == seq) { + struct nlmsgerr *e = NLMSG_DATA(h); + return e->error; + } + off += NLMSG_ALIGN(h->nlmsg_len); + } + } +} + +static size_t milan_nl_attr(char *p, uint16_t type, const void *val, uint16_t vlen) +{ + struct rtattr *r = (struct rtattr *)p; + size_t total; + r->rta_len = RTA_LENGTH(vlen); + r->rta_type = type; + memcpy(RTA_DATA(r), val, vlen); + total = RTA_ALIGN(r->rta_len); + if (total > (size_t)r->rta_len) + memset(p + r->rta_len, 0, total - r->rta_len); + return total; +} + +/* Returns ifindex (>0) or -errno. */ +static int milan_get_ifindex(const char *name) +{ + int fd = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0); + struct ifreq req; + int res, saved; + if (fd < 0) + return -errno; + spa_zero(req); + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", name); + res = ioctl(fd, SIOCGIFINDEX, &req); + saved = -errno; + close(fd); + return res < 0 ? saved : req.ifr_ifindex; +} + +/* Ensure . exists, is UP, and is a VLAN sub-iface of . + * Writes the sub-iface name to out and returns 0 on success. */ +static int milan_ensure_vlan_iface(const char *parent_ifname, uint16_t vid, + char *out, size_t out_size) +{ + int rc, existing, parent_idx, nlfd, err, new_idx; + struct sockaddr_nl sa = { .nl_family = AF_NETLINK }; + char payload[256]; + char *p = payload; + struct ifinfomsg ifi = { 0 }; + uint32_t pidx; + char nested[64]; + char *np = nested; + const char kind[] = "vlan"; + char data[16]; + char *dp = data; + uint16_t vidv = vid; + uint32_t seq = 1; + struct ifinfomsg up_ifi = { 0 }; + + if (vid == 0 || vid >= 4095) + return -EINVAL; + rc = snprintf(out, out_size, "%s.%u", parent_ifname, (unsigned)vid); + if (rc < 0 || (size_t)rc >= out_size) + return -ENAMETOOLONG; + + /* If the sub-iface already exists, assume it is what we want and reuse. */ + existing = milan_get_ifindex(out); + if (existing > 0) + return 0; + + parent_idx = milan_get_ifindex(parent_ifname); + if (parent_idx <= 0) + return parent_idx ? parent_idx : -ENODEV; + + nlfd = socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_ROUTE); + if (nlfd < 0) + return -errno; + if (bind(nlfd, (struct sockaddr *)&sa, sizeof(sa)) < 0) { + int e = -errno; close(nlfd); return e; + } + + /* RTM_NEWLINK: ifinfomsg + IFLA_IFNAME + IFLA_LINK + IFLA_LINKINFO{KIND=vlan, DATA{VLAN_ID}} */ + ifi.ifi_family = AF_UNSPEC; + ifi.ifi_change = 0xFFFFFFFFu; + memcpy(p, &ifi, sizeof(ifi)); p += sizeof(ifi); + p += milan_nl_attr(p, IFLA_IFNAME, out, (uint16_t)(strlen(out) + 1)); + pidx = (uint32_t)parent_idx; + p += milan_nl_attr(p, IFLA_LINK, &pidx, sizeof(pidx)); + + /* LINKINFO is nested: KIND=vlan, DATA={VLAN_ID=vid} */ + np += milan_nl_attr(np, IFLA_INFO_KIND, kind, sizeof(kind)); + dp += milan_nl_attr(dp, IFLA_VLAN_ID, &vidv, sizeof(vidv)); + np += milan_nl_attr(np, IFLA_INFO_DATA, data, (uint16_t)(dp - data)); + p += milan_nl_attr(p, IFLA_LINKINFO, nested, (uint16_t)(np - nested)); + + err = milan_nl_send(nlfd, RTM_NEWLINK, + NLM_F_REQUEST | NLM_F_CREATE | NLM_F_EXCL | NLM_F_ACK, + seq, payload, (size_t)(p - payload)); + if (err == 0) + err = milan_nl_recv_ack(nlfd, seq); + if (err != 0 && err != -EEXIST) { + close(nlfd); + return err; + } + + /* Bring it UP. */ + new_idx = milan_get_ifindex(out); + if (new_idx <= 0) { + close(nlfd); + return new_idx ? new_idx : -ENODEV; + } + up_ifi.ifi_family = AF_UNSPEC; + up_ifi.ifi_index = new_idx; + up_ifi.ifi_flags = IFF_UP; + up_ifi.ifi_change = IFF_UP; + err = milan_nl_send(nlfd, RTM_NEWLINK, NLM_F_REQUEST | NLM_F_ACK, + ++seq, &up_ifi, sizeof(up_ifi)); + if (err == 0) + err = milan_nl_recv_ack(nlfd, seq); + close(nlfd); + if (err != 0) + return err; + + return 0; +} + static int raw_stream_setup_socket(struct server *server, struct stream *stream) { int res; char buf[128]; struct ifreq req; + const char *bind_ifname = server->ifname; + char vlan_ifname[IFNAMSIZ]; + bool used_vlan_subiface = false; spa_autoclose int fd = socket(AF_PACKET, SOCK_RAW | SOCK_CLOEXEC | SOCK_NONBLOCK, htons(ETH_P_ALL)); if (fd < 0) { @@ -265,8 +444,30 @@ static int raw_stream_setup_socket(struct server *server, struct stream *stream) return -errno; } + /* For listener RX: route stream via a VLAN sub-iface so the NIC's + * hardware filter accepts VID-tagged AAF without promisc on parent. */ + /* Listener-only: route stream RX via a VLAN sub-iface so the NIC accepts + * VID-tagged AAF without promisc on parent. OUTPUT direction stays on + * the parent because setup_pdu_milan_v12() already inserts a manual + * 802.1Q tag in the PDU header — the kernel would add a second tag + * (QinQ) if we bound the talker socket to enp6s0.. */ + if (stream->direction == SPA_DIRECTION_INPUT && stream->vlan_id > 0) { + int e = milan_ensure_vlan_iface(server->ifname, + (uint16_t)stream->vlan_id, + vlan_ifname, sizeof(vlan_ifname)); + if (e == 0) { + bind_ifname = vlan_ifname; + used_vlan_subiface = true; + pw_log_info("milan-avb: listener RX via VLAN sub-iface %s (vid %d)", + vlan_ifname, stream->vlan_id); + } else { + pw_log_warn("milan-avb: VLAN sub-iface setup failed (%d), " + "falling back to PACKET_MR_PROMISC on parent", -e); + } + } + spa_zero(req); - snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname); + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", bind_ifname); res = ioctl(fd, SIOCGIFINDEX, &req); if (res < 0) { pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname); @@ -319,6 +520,18 @@ static int raw_stream_setup_socket(struct server *server, struct stream *stream) pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m"); return -errno; } + + /* Fallback: lift promisc only when the VLAN sub-iface path didn't + * take. With the sub-iface, the NIC accepts VID 2 natively. */ + if (!used_vlan_subiface) { + spa_zero(mreq); + mreq.mr_ifindex = req.ifr_ifindex; + mreq.mr_type = PACKET_MR_PROMISC; + res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, + &mreq, sizeof(struct packet_mreq)); + if (res < 0) + pw_log_warn("setsockopt(PACKET_MR_PROMISC) fallback failed: %m"); + } } return spa_steal_fd(fd); } diff --git a/src/modules/module-avb/descriptors.c b/src/modules/module-avb/descriptors.c index 3320f4a4b..034ca743c 100644 --- a/src/modules/module-avb/descriptors.c +++ b/src/modules/module-avb/descriptors.c @@ -4,6 +4,10 @@ /* SPDX-FileCopyrightText: Copyright © 2025 Simon Gapp */ /* SPDX-License-Identifier: MIT */ +#include + +#include + #include "adp.h" #include "aecp-aem.h" #include "aecp-aem-types.h" @@ -300,23 +304,26 @@ static void init_descriptor_legacy_avb(struct server *server) static void init_descriptor_milan_v12(struct server *server) { + /* name the entity after the hostname so each box shows as pw0/pw1/pw2 */ + char hostname[64] = {0}; + if (gethostname(hostname, sizeof(hostname) - 1) != 0 || hostname[0] == '\0') + snprintf(hostname, sizeof(hostname), "%s", DSC_STRINGS_0_DEVICE_NAME); + // TODO PERSISTENCE: retrieve the saved buffers. /**************************************************************************************/ /* IEEE 1722.1-2021, Sec. 7.2.12 - STRINGS Descriptor * Up to 7 localized strings */ - es_builder_add_descriptor(server, AVB_AEM_DESC_STRINGS, 0, - sizeof(struct avb_aem_desc_strings), - &(struct avb_aem_desc_strings) - { - .string_0 = DSC_STRINGS_0_DEVICE_NAME, + struct avb_aem_desc_strings strings = { .string_1 = DSC_STRINGS_1_CONFIGURATION_NAME, .string_2 = DSC_STRINGS_2_MANUFACTURER_NAME, .string_3 = DSC_STRINGS_3_GROUP_NAME, .string_4 = DSC_STRINGS_4_MAINTAINER_0, .string_5 = DSC_STRINGS_4_MAINTAINER_1, - } - ); + }; + snprintf(strings.string_0, sizeof(strings.string_0), "%s", hostname); + es_builder_add_descriptor(server, AVB_AEM_DESC_STRINGS, 0, + sizeof(strings), &strings); /**************************************************************************************/ /* IEEE 1722.1-2021, Sec. 7.2.11 - LOCALE Descriptor */ @@ -334,8 +341,7 @@ static void init_descriptor_milan_v12(struct server *server) /* Milan v1.2, Sec. 5.3.3.1 */ struct avb_entity_config entity_conf = conf_load_entity(server->impl->props); - struct avb_aem_desc_entity entity_desc = - { + struct avb_aem_desc_entity entity = { .entity_id = htobe64(server->entity_id), .entity_model_id = htobe64(DSC_ENTITY_MODEL_ID), .entity_capabilities = htonl(entity_conf.entity_capabilities), @@ -351,21 +357,19 @@ static void init_descriptor_milan_v12(struct server *server) .available_index = htonl(DSC_ENTITY_MODEL_AVAILABLE_INDEX), .association_id = htobe64(DSC_ENTITY_MODEL_ASSOCIATION_ID), - .vendor_name_string = htons(entity_conf.vendor_name), - .model_name_string = htons(entity_conf.model_name), + .vendor_name_string = htons(DSC_ENTITY_MODEL_VENDOR_NAME_STRING), + .model_name_string = htons(DSC_ENTITY_MODEL_MODEL_NAME_STRING), + .group_name = DSC_ENTITY_MODEL_GROUP_NAME, + .serial_number = DSC_ENTITY_MODEL_SERIAL_NUMBER, .configurations_count = htons(DSC_ENTITY_MODEL_CONFIGURATIONS_COUNT), .current_configuration = htons(DSC_ENTITY_MODEL_CURRENT_CONFIGURATION) }; - memcpy(entity_desc.entity_name, entity_conf.entity_name, sizeof(entity_desc.entity_name)); - memcpy(entity_desc.firmware_version, entity_conf.firmware_version, sizeof(entity_desc.firmware_version)); - memcpy(entity_desc.group_name, entity_conf.group_name, sizeof(entity_desc.group_name)); - memcpy(entity_desc.serial_number, entity_conf.serial_number, sizeof(entity_desc.serial_number)); - + snprintf(entity.entity_name, sizeof(entity.entity_name), "%s", hostname); + /* firmware_version = the canonical PipeWire library version */ + snprintf(entity.firmware_version, sizeof(entity.firmware_version), "%s", pw_get_library_version()); es_builder_add_descriptor(server, AVB_AEM_DESC_ENTITY, 0, - sizeof(struct avb_aem_desc_entity), - &entity_desc); - + sizeof(entity), &entity); /**************************************************************************************/ /* IEEE 1722.1-2021, Sec. 7.2.2 - CONFIGURATION Descriptor*/ /* Milan v1.2, Sec. 5.3.3.2 */ diff --git a/src/modules/module-avb/entity-model-milan-v12.h b/src/modules/module-avb/entity-model-milan-v12.h index 1e603a82a..c071cf904 100644 --- a/src/modules/module-avb/entity-model-milan-v12.h +++ b/src/modules/module-avb/entity-model-milan-v12.h @@ -319,9 +319,11 @@ BUILD_SAMPLING_RATE(DSC_AUDIO_UNIT_SAMPLING_RATE_PULL, DSC_AUDIO_UNIT_SAMPLING_R #define DSC_STREAM_INPUT_LOCALIZED_DESCRIPTION AVB_AEM_DESC_INVALID #define DSC_STREAM_INPUT_CLOCK_DOMAIN_INDEX 0 #define DSC_STREAM_INPUT_STREAM_FLAGS (AVB_AEM_DESC_STREAM_FLAG_SYNC_SOURCE | AVB_AEM_DESC_STREAM_FLAG_CLASS_A) -// To match my talker +// Default current_format = AAF INT32/48k/8ch, to match the 8-channel PipeWire talker +// (pw1) without a per-bind set-format. The supported list below keeps the smaller +// channel counts (1/2/4/6) so 4ch talkers like the DS20 still bind via set-format. // TODO: Define based on AUDIO_UNIT etc. -#define DSC_STREAM_INPUT_CURRENT_FORMAT 0x0205022001006000ULL +#define DSC_STREAM_INPUT_CURRENT_FORMAT 0x0205022002006000ULL // TODO: Is 132 here, should be 138 according to spec #define DSC_STREAM_INPUT_FORMATS_OFFSET (4 + sizeof(struct avb_aem_desc_stream)) @@ -342,7 +344,7 @@ BUILD_SAMPLING_RATE(DSC_AUDIO_UNIT_SAMPLING_RATE_PULL, DSC_AUDIO_UNIT_SAMPLING_R #define DSC_STREAM_INPUT_AVB_INTERFACE_INDEX 0 #define DSC_STREAM_INPUT_BUFFER_LENGTH_IN_NS 2126000 -#define DSC_STREAM_INPUT_FORMATS_0 DSC_STREAM_INPUT_CURRENT_FORMAT +#define DSC_STREAM_INPUT_FORMATS_0 0x0205022001006000ULL /* 4ch (DS20) — kept supported */ #define DSC_STREAM_INPUT_FORMATS_1 0x0205022000406000ULL #define DSC_STREAM_INPUT_FORMATS_2 0x0205022000806000ULL #define DSC_STREAM_INPUT_FORMATS_3 0x0205022001806000ULL diff --git a/src/modules/module-avb/mrp.c b/src/modules/module-avb/mrp.c index d511daef0..3cd6d8d3d 100644 --- a/src/modules/module-avb/mrp.c +++ b/src/modules/module-avb/mrp.c @@ -108,16 +108,7 @@ static void mrp_periodic(void *data, uint64_t now) if (now > mrp->lva_timer.leave_all_timeout) { - /* IEEE 802.1Q-2018 §10.7.5.20: when our LeaveAll timer fires, we - * become the TRANSMITTER of LeaveAll. Mark lva_tx_pending so the - * next TX cycle includes the LVA PDU. We must NOT apply RX_LVA to - * our own attributes — per Table 10-4 the registrar transitions - * IN→LV only on RECEIVED LV/LVA, not on our own transmission. - * The old global_event(RX_LVA) here flipped our own SR-class - * Domain registrars to LV for a full 1 s leave_timeout window, - * during which the talker reads our Domain as "leaving" and - * reports MSRP TalkerFailed (failure_code 13). That broke the - * stream every 10–15 s. */ + /* IEEE 802.1Q-2018 Section 10.7.5.20: own LVA timer => TX path only, no RX_LVA */ mrp->lva_timer.state = FSM_LVA_ACTIVE; if (mrp->lva_timer.leave_all_timeout > 0) { mrp->lva_tx_pending = true; @@ -452,11 +443,7 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now } break; case AVB_MRP_EVENT_TX_LVA: - /* IEEE 802.1Q-2018 Table 10-4: TX events do NOT trigger - * registrar transitions. The previous code lumped TX_LVA with - * RX_LVA and so transitioned our own registrar IN→LV for the - * full 1 s leave_timeout, briefly hiding the peer's attribute - * from upper layers. Leave the registrar untouched on TX_LVA. */ + /* IEEE 802.1Q-2018 Table 10-4: TX events do not transition the registrar */ break; case AVB_MRP_EVENT_FLUSH: switch (state) { @@ -477,11 +464,10 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now default: break; } - if (notify) { - mrp_attribute_emit_notify(a, now, notify); - mrp_emit_notify(mrp, now, &a->attr, notify); - } - + /* commit registrar_state BEFORE notify: callbacks (e.g. notify_talker -> + * refresh_listener_param) read the registrar state, so they must see the new + * one. Emitting notify first made the Listener latch AskingFailed off a stale + * MT state and never recompute -> SRP bridge refused to forward the stream. */ if (a->registrar_state != state || notify) { pw_log_debug("REG: attr %p: %s %s %s -> %s notify=%s", a, a->attr.name, avb_mrp_event_name(event), avb_registrar_state_name(a->registrar_state), @@ -490,6 +476,11 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now a->registrar_state = state; } + if (notify) { + mrp_attribute_emit_notify(a, now, notify); + mrp_emit_notify(mrp, now, &a->attr, notify); + } + state = a->applicant_state; switch (event) { diff --git a/src/modules/module-avb/msrp.c b/src/modules/module-avb/msrp.c index 31d51fa87..b2458b271 100644 --- a/src/modules/module-avb/msrp.c +++ b/src/modules/module-avb/msrp.c @@ -58,6 +58,16 @@ static void debug_msrp_talker(const struct avb_packet_msrp_talker *t) /* IEEE 802.1Q Section 35.2.2.4.4: Listener may declare Ready only once the matching * Talker Advertise is registered; otherwise it stays in AskingFailed. */ +/* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN, else AskingFailed */ +static void refresh_listener_param(struct stream_common *sc) +{ + bool ta_in = sc->tastream_attr.mrp != NULL && + avb_mrp_attribute_get_registrar_state(sc->tastream_attr.mrp) == AVB_MRP_IN; + sc->lstream_attr.param = ta_in + ? AVB_MSRP_LISTENER_PARAM_READY + : AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; +} + static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify) { struct stream_common *sc; @@ -76,12 +86,8 @@ static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, ui sc = SPA_CONTAINER_OF(attr->attr, struct stream_common, tastream_attr); if (sc->stream.direction == SPA_DIRECTION_INPUT) { - if (notify == AVB_MRP_NOTIFY_NEW || notify == AVB_MRP_NOTIFY_JOIN) - sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_READY; - else if (notify == AVB_MRP_NOTIFY_LEAVE) - sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; - /* Milan Table 5.10: TA registrar state flips flags_ex.REGISTERING - * in the listener-side GET_STREAM_INFO answer — emit an unsol. */ + refresh_listener_param(sc); + /* Milan Table 5.10: TA registrar state flips flags_ex.REGISTERING */ avb_aecp_aem_mark_stream_info_dirty(msrp->server, AVB_AEM_DESC_STREAM_INPUT, sc->stream.index); } @@ -103,21 +109,7 @@ static void notify_talker_failed(struct msrp *msrp, uint64_t now, struct attr *a sc = SPA_CONTAINER_OF(attr->attr, struct stream_common, tfstream_attr); if (sc->stream.direction == SPA_DIRECTION_INPUT) { - /* IEEE 802.1Q-2018 §35.2.2.4.4 + Milan v1.2 §4.3.3.1: when the - * Talker has transitioned to TalkerFailed, the Listener must - * declare AskingFailed so the Talker has the recovery signal - * ("I still want this stream, please re-evaluate"). Without - * this, the Listener was stuck at the previous Ready - * declaration from before the failure, the Talker read - * "listener is satisfied" and never re-attempted the advertise. - * Symptom on the wire: Talker oscillates Advertise → Failed → - * silence; stream drops after a few seconds and never resumes - * for the same bind. */ - if (notify == AVB_MRP_NOTIFY_NEW || notify == AVB_MRP_NOTIFY_JOIN) - sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; - /* Milan Table 5.10: TF registrar state also flips - * flags_ex.REGISTERING on the listener side; emit an unsol - * when it changes. */ + refresh_listener_param(sc); avb_aecp_aem_mark_stream_info_dirty(msrp->server, AVB_AEM_DESC_STREAM_INPUT, sc->stream.index); } @@ -340,35 +332,18 @@ static int process_domain(struct msrp *msrp, uint64_t now, uint8_t attr_type, } if (msrp->server->avb_mode == AVB_MODE_MILAN_V12) { - /* Milan v1.2 §4.2.7.2.1: "re-adjust the domain" means - * align the priority/vid of the matching SR class — NOT - * overwrite every locally-held Domain attribute with the - * incoming values. The old code iterated all Domain - * attributes and smashed each one whose (id, prio, vid) - * didn't match the incoming, so when the talker - * advertised SR-B (id=5) both of pipewire's Domain - * attributes (originally SR-A id=6 and SR-B id=5) were - * overwritten to SR-B; when the talker then advertised - * SR-A, both flipped back to SR-A. Pipewire transmitted - * only one of the two SR classes at any moment, so the - * talker never saw a stable SR-A Domain declaration from - * pipewire and locked into TalkerFailed code 13. - * - * Correct re-adjust: only the locally-held attribute - * with the SAME sr_class_id gets its priority/vid - * updated. */ + /* Milan v1.2 Section 4.2.7.2.1: re-adjust scoped to matching sr_class_id only */ + bool mismatch; if (a->attr->attr.domain.sr_class_id != d->sr_class_id) continue; - bool mismatch = (a->attr->attr.domain.sr_class_priority != d->sr_class_priority + mismatch = (a->attr->attr.domain.sr_class_priority != d->sr_class_priority || a->attr->attr.domain.sr_class_vid != d->sr_class_vid); if (mismatch) { - pw_log_info("Domain re-adjust (sr_class_id=%u): prio %u->%u vid %u->%u", - d->sr_class_id, - a->attr->attr.domain.sr_class_priority, - d->sr_class_priority, - ntohs(a->attr->attr.domain.sr_class_vid), + pw_log_info("Domain re-adjust sr_class_id=%u prio %u->%u vid %u->%u", + d->sr_class_id, a->attr->attr.domain.sr_class_priority, + d->sr_class_priority, ntohs(a->attr->attr.domain.sr_class_vid), ntohs(d->sr_class_vid)); a->attr->attr.domain.sr_class_priority = d->sr_class_priority; a->attr->attr.domain.sr_class_vid = d->sr_class_vid; diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index cec2cbe71..41c8f9045 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -82,17 +82,18 @@ * * Per-counter wiring status (Milan Section 5.4.5.3, Table 5.16 Stream Input): * FRAMES_RX live: handle_aaf_packet / handle_iec61883_packet - * STREAM_INTERRUPTED live: ringbuffer overrun in the same handlers + * STREAM_INTERRUPTED live: handle_aaf_packet, on the loss of several + * AVTPDUs (seq gap >= AVB_STREAM_INTERRUPT_MIN_LOST) * MEDIA_LOCKED live: first-frame edge in handle_*_packet * MEDIA_UNLOCKED live: cmd-get-counters periodic when last_frame_rx_ns * ages past MEDIA_UNLOCK_TIMEOUT_NS - * SEQ_NUM_MISMATCH TODO: compare p->seq_num against expected (last + 1 - * modulo 256), tick on mismatch and resync expected + * SEQ_NUM_MISMATCH live: handle_aaf_packet, p->seq_num != expected + * (last + 1 mod 256); resyncs expected each frame * MEDIA_RESET_IN TODO: tick when AVTPDU header sets the mr bit * (header reset notification) * TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header - * UNSUPPORTED_FORMAT TODO: tick when subtype/format mismatch the bound - * descriptor's current_format + * UNSUPPORTED_FORMAT live: handle_aaf_packet drops + ticks any AAF PDU + * whose media format is not the Milan base format * LATE_TIMESTAMP TODO: tick when p->timestamp < CLOCK_TAI now * (frame missed its presentation deadline) * EARLY_TIMESTAMP TODO: tick when p->timestamp > now + max_transit_time @@ -118,6 +119,7 @@ #include #include #include +#include #include "aaf.h" #include "iec61883.h" @@ -263,17 +265,26 @@ static void on_source_stream_process(void *data) avail = spa_ringbuffer_get_read_index(&stream->ring, &index); - if (avail < wanted) { - pw_log_debug("capture underrun %d < %d", avail, wanted); + /* Milan v1.2 Section 5.4.5.3: partial-read on underrun, zero-pad tail. */ + if (avail <= 0) { memset(d[0].data, 0, n_bytes); - } else { + } else if ((uint32_t)avail >= n_bytes) { spa_ringbuffer_read_data(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, d[0].data, n_bytes); - index += n_bytes; - spa_ringbuffer_read_update(&stream->ring, index); + spa_ringbuffer_read_update(&stream->ring, index + n_bytes); + } else { + uint32_t use = (uint32_t)avail; + spa_ringbuffer_read_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + d[0].data, use); + memset(SPA_PTROFF(d[0].data, use, void), 0, n_bytes - use); + spa_ringbuffer_read_update(&stream->ring, index + use); + pw_log_debug("capture partial-underrun %u/%u", use, n_bytes); } d[0].chunk->size = n_bytes; @@ -614,7 +625,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream if (stream->format) avb_aem_stream_format_decode(stream->format, &fi); - stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE; + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S32_BE; stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED; stream->info.info.raw.rate = fi.is_audio && fi.rate ? fi.rate : 48000; stream->info.info.raw.channels = fi.is_audio && fi.channels ? fi.channels : 8; @@ -732,6 +743,14 @@ error_free: void stream_destroy(struct stream *stream) { struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream); + struct timespec now_ts; + uint64_t now = 0; + + /* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart + * or replug doesn't strand a stale reservation on the bridge (socket still open here). */ + if (clock_gettime(CLOCK_TAI, &now_ts) == 0) + now = SPA_TIMESPEC_TO_NSEC(&now_ts); + stream_deactivate(stream, now); if (stream->direction == SPA_DIRECTION_INPUT) { struct aecp_aem_stream_input_state *si = @@ -753,6 +772,30 @@ static int setup_socket(struct stream *stream) return avb_server_stream_setup_socket(stream->server, stream); } +/* Milan 5.4.5.3 STREAM_INTERRUPTED: playback is interrupted by the loss of + * "several" AVTPDUs (the spec leaves the count implementation-defined). A + * single dropped/reordered PDU is a SEQ_NUM_MISMATCH but not a full + * interruption; a gap of this many or more missing PDUs is. */ +#define AVB_STREAM_INTERRUPT_MIN_LOST 2 + +/* PDUs after a (re)lock during which a sequence step is absorbed (re-seeded) and + * NOT counted as SEQ_NUM_MISMATCH — covers the one-time bind/SRP-path-open gap of + * a Listener that joins mid-stream. Small, so genuine ongoing loss still counts. */ +#define AVB_STREAM_SEQ_SETTLE 8 + +/* Milan v1.2 Section 5.4: the listener supports only the Milan base stream + * format for decode — AAF PCM, 32-bit integer, 48 kHz, non-sparse. Channel + * count is a stream parameter (the ring buffers by data_len), not part of the + * format check, so any Milan channel count passes. */ +static inline bool aaf_is_milan_format(const struct avb_packet_aaf *p) +{ + return p->subtype == AVB_SUBTYPE_AAF && + p->format == AVB_AAF_FORMAT_INT_32BIT && + p->nsr == AVB_AAF_PCM_NSR_48KHZ && + p->bit_depth == 32 && + p->sp == AVB_AAF_PCM_SP_NORMAL; +} + static void handle_aaf_packet(struct stream *stream, struct avb_packet_aaf *p, int len) { @@ -764,32 +807,54 @@ static void handle_aaf_packet(struct stream *stream, filled = spa_ringbuffer_get_write_index(&stream->ring, &index); n_bytes = ntohs(p->data_len); - if (n_bytes > (uint32_t)(len - (int)sizeof(*p))) - return; - /* IEEE 1722.1 Section 7.4.42 / Milan v1.2 Section 5.4.5.3: FRAMES_RX counts - * every valid AVTPDU received on the wire — independent of whether the - * listener pipeline could absorb it. */ + /* milan-avb: support only the Milan format. EVERY received frame that is + * not a well-formed Milan AAF PDU — bad length, or subtype/format/sample- + * rate/bit-depth/sparse not the Milan base format — bumps UNSUPPORTED_FORMAT + * and is dropped, per frame: not counted as a valid frame, not media-locked, + * not written. (Channel count is a stream parameter, not part of the format + * check, so a valid 4ch talker like the DS20 is not flagged.) */ + if (n_bytes > (uint32_t)(len - (int)sizeof(*p)) || !aaf_is_milan_format(p)) { + cnt->unsupported_format++; + stream_in_mark_counters_dirty(stream); + return; + } + + /* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every + * valid AVTPDU received on the wire — independent of whether the listener + * pipeline could absorb it. */ cnt->frame_rx++; clock_gettime(CLOCK_MONOTONIC, &now_ts); si->last_frame_rx_ns = SPA_TIMESPEC_TO_NSEC(&now_ts); + if (!si->media_locked_state) { cnt->media_locked++; si->media_locked_state = true; + stream->prev_seq = p->seq_num; /* (re)lock: seed seq, no gap */ + si->seq_settle = AVB_STREAM_SEQ_SETTLE; /* grace the bind/path-open step */ + } else if (si->seq_settle > 0) { + /* settling just after a (re)lock: a Listener that binds mid-stream + * behind an SRP bridge gets a one-time sequence step as the bridge + * opens forwarding — re-seed and don't count it. */ + si->seq_settle--; + stream->prev_seq = p->seq_num; + } else { + uint8_t expected = (uint8_t)(stream->prev_seq + 1); + if (p->seq_num != expected) { + /* IEEE 1722.1 7.4: SEQ_NUM_MISMATCH on any sequence + * discontinuity (loss, reorder or duplicate). */ + uint8_t lost = (uint8_t)(p->seq_num - expected); + cnt->seq_mistmatch++; + /* STREAM_INTERRUPTED only when several PDUs are missing. */ + if (lost >= AVB_STREAM_INTERRUPT_MIN_LOST) + cnt->stream_interrupted++; + } + stream->prev_seq = p->seq_num; } if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) { - /* Ringbuffer overrun. Per Milan v1.2 Section 5.4.5.3 the - * STREAM_INTERRUPTED counter is reserved for stream-level - * interruptions (e.g. loss of the SRP TalkerAdvertise) and - * MUST NOT be bumped per dropped frame. Without a consumer - * draining the ring (e.g. an ALSA sink hooked up), this branch - * would fire on every received AVTPDU and look identical to a - * total link failure on the controller dashboard — defeating - * the goals.md "error counters stay zero" target. Drop the - * frame silently; an out-of-band metric should be used to - * surface unconsumed-stream conditions. */ + /* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */ uint32_t r_index; spa_ringbuffer_get_read_index(&stream->ring, &r_index); spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes); @@ -833,10 +898,7 @@ static void handle_iec61883_packet(struct stream *stream, } if (filled + n_bytes > stream->buffer_size) { - /* Same reasoning as the AAF handler above: do NOT bump - * STREAM_INTERRUPTED on a per-frame ringbuffer overrun. - * Milan v1.2 §5.4.5.3 reserves that counter for stream-level - * interruptions (lost SRP TalkerAdvertise, etc.). */ + /* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */ uint32_t r_index; spa_ringbuffer_get_read_index(&stream->ring, &r_index); spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes); @@ -900,6 +962,30 @@ static void on_socket_data(void *data, int fd, uint32_t mask) } } +/* Milan v1.2 Table 5.6: a Stream Input resets its diagnostic counters on the + * not-bound -> bound transition (and NOT on bound -> not-bound). Also re-arms + * the media-lock / seq-settle state: the unlock edge is detected only inside the + * GET_COUNTERS poll (100 ms silence), so a fast unbind/rebind can leave + * media_locked_state == true and miscount the bridge-open step as + * SEQ_NUM_MISMATCH / STREAM_INTERRUPTED. Called from stream_activate(). */ +static void stream_input_reset_counters(struct aecp_aem_stream_input_state *si) +{ + si->counters.media_locked = 0; + si->counters.media_unlocked = 0; + si->counters.stream_interrupted = 0; + si->counters.seq_mistmatch = 0; + si->counters.media_reset = 0; + si->counters.tu = 0; + si->counters.unsupported_format = 0; + si->counters.late_timestamp = 0; + si->counters.early_timestamp = 0; + si->counters.frame_rx = 0; + si->media_locked_state = false; + si->seq_settle = 0; + si->last_frame_rx_ns = 0; + si->counters_dirty = true; +} + int stream_activate(struct stream *stream, uint16_t index, uint64_t now) { struct server *server = stream->server; @@ -908,6 +994,25 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) struct stream_common *common; common = SPA_CONTAINER_OF(stream, struct stream_common, stream); + /* milan-avb: SR-class priority + VLAN id come from the MSRP Domain, not a + * hardcoded default. process_domain() re-adjusts the AVB_INTERFACE domain + * to the network-declared sr_class_priority/sr_class_vid, so this is the + * authoritative source. Read it before setup_socket() — the listener uses + * stream->vlan_id to select its VLAN sub-iface. */ + { + struct descriptor *avbif = server_find_descriptor(server, + AVB_AEM_DESC_AVB_INTERFACE, 0); + if (avbif != NULL) { + struct aecp_aem_avb_interface_state *ifs = avbif->ptr; + uint8_t dprio = ifs->domain_attr.attr.domain.sr_class_priority; + uint16_t dvid = ntohs(ifs->domain_attr.attr.domain.sr_class_vid); + if (dvid != 0 && dvid < 4095) { + stream->prio = dprio; + stream->vlan_id = dvid; + } + } + } + if (stream->source == NULL) { if ((fd = setup_socket(stream)) < 0) return fd; @@ -925,11 +1030,57 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) struct aecp_aem_stream_input_state *input_stream; input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); - /* lstream_attr.listener.stream_id is already populated by the - * ACMP FSM from PROBE_TX_RESPONSE. Don't overwrite it here. - * Milan Section 4.3.3.1: Listener starts in AskingFailed; notify_talker - * promotes to Ready once the Talker Advertise registrar is IN. */ - common->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; + /* Milan v1.2 Table 5.6: reset diagnostic counters + re-arm the + * media-lock / seq-settle state on the not-bound -> bound transition. */ + stream_input_reset_counters(input_stream); + + /* Prime ring with one PipeWire quantum of silence (Milan v1.2 Section 5.4.5.3). */ + spa_ringbuffer_init(&stream->ring); + if (stream->frames_per_pdu > 0) { + uint32_t prefill_pdus = 1024u / stream->frames_per_pdu; + if (prefill_pdus > 0) + pad_ringbuffer_with_silence(stream, (int)prefill_pdus); + } + + /* milan-avb: publish our contribution to graph latency so wpctl/pw-cli + * report it. Latency is the prefill: one PipeWire quantum at 48 kHz. */ + { + struct spa_latency_info latency = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT); + uint32_t rate = stream->info.info.raw.rate ? stream->info.info.raw.rate : 48000; + uint8_t lbuf[256]; + struct spa_pod_builder lb = { 0 }; + const struct spa_pod *lp; + char buf[64]; + struct pw_properties *props; + latency.min_quantum = 1.0f; + latency.max_quantum = 1.0f; + latency.min_rate = 1024; + latency.max_rate = 1024; + latency.min_ns = (uint64_t)1024 * SPA_NSEC_PER_SEC / rate; + latency.max_ns = latency.min_ns; + spa_pod_builder_init(&lb, lbuf, sizeof(lbuf)); + lp = spa_latency_build(&lb, SPA_PARAM_Latency, &latency); + pw_stream_update_params(stream->stream, &lp, 1); + + props = pw_properties_new(NULL, NULL); + snprintf(buf, sizeof(buf), "%llu", (unsigned long long)latency.min_ns); + pw_properties_set(props, "milan.avb.latency.prefill.ns", buf); + snprintf(buf, sizeof(buf), "%u", 1024u); + pw_properties_set(props, "milan.avb.latency.prefill.frames", buf); + snprintf(buf, sizeof(buf), "%u", (unsigned)stream->frames_per_pdu); + pw_properties_set(props, "milan.avb.frames_per_pdu", buf); + pw_stream_update_properties(stream->stream, &props->dict); + pw_properties_free(props); + } + + /* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN. + * Compute from current state so a reconnect picks up an already-IN TA + * registrar (no NEW/JOIN event fires when the registrar didn't transition). */ + common->lstream_attr.param = + (common->tastream_attr.mrp != NULL && + avb_mrp_attribute_get_registrar_state(common->tastream_attr.mrp) == AVB_MRP_IN) + ? AVB_MSRP_LISTENER_PARAM_READY + : AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; avb_mrp_attribute_begin(common->lstream_attr.mrp, now); avb_mrp_attribute_join(common->lstream_attr.mrp, now, true); @@ -987,6 +1138,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) int stream_deactivate(struct stream *stream, uint64_t now) { struct stream_common *common; + struct aecp_aem_stream_input_state *si; common = SPA_CONTAINER_OF(stream, struct stream_common, stream); pw_stream_set_active(stream->stream, false); @@ -1000,14 +1152,19 @@ int stream_deactivate(struct stream *stream, uint64_t now) stream->flush_timer = NULL; stream->flush_last_ns = 0; } -#if 0 - avb_mrp_attribute_leave(stream->vlan_attr->mrp, now); -#endif // - - if (stream->direction == SPA_DIRECTION_INPUT) + /* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the + * reservation immediately (Leave) instead of holding stale state until its + * LeaveAll timer — otherwise a stop/restart or replug to another port can't + * re-register (the old port's Talker/Listener/VLAN entry still pins the stream). */ + if (stream->direction == SPA_DIRECTION_INPUT) { + si = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); avb_mrp_attribute_leave(common->lstream_attr.mrp, now); - else + if (si->mvrp_attr.mrp) { + avb_mrp_attribute_leave(si->mvrp_attr.mrp, now); + } + } else { avb_mrp_attribute_leave(common->tastream_attr.mrp, now); + } /* Milan Table 5.17: STREAM_STOP counter ticks each transition the * other way. */