milan-avb: stabilization extras - MRP registrar-before-notify, scoped-fd cleanup, VLAN sub-iface RX, SET_NAME validation, entity/firmware/8ch, MEDIA_UNLOCK 100ms + seq-settle, MRP Leave on teardown, reset STREAM_INPUT counters on bind

This commit is contained in:
hackerman-kl 2026-05-31 15:05:27 +02:00 committed by Wim Taymans
parent 93e4957959
commit afc7724070
8 changed files with 482 additions and 134 deletions

View file

@ -360,7 +360,11 @@ static void emit_avb_interface_counters(struct aecp *aecp, uint16_t desc_index,
* here, gated by last_counters_emit_ns. */
#define COUNTER_UNSOL_MIN_INTERVAL_NS ((int64_t)SPA_NSEC_PER_SEC)
#define MEDIA_UNLOCK_TIMEOUT_NS ((int64_t)(2 * SPA_NSEC_PER_MSEC))
/* A frame is "missing" only after a gap far longer than event-loop scheduling
* jitter 2 ms (16 PDU periods) is processed-not-arrived jitter, not media
* loss, and spuriously unlocks a healthy stream on a busy software listener.
* 100 ms is ~5x the listener prefill and well above loop jitter. */
#define MEDIA_UNLOCK_TIMEOUT_NS ((int64_t)(100 * SPA_NSEC_PER_MSEC))
static bool counter_rate_limit_elapsed(int64_t now, int64_t last_emit)
{

View file

@ -182,15 +182,17 @@ struct aecp_aem_stream_input_state {
* most recent valid PDU; media_locked_state is the current edge. */
int64_t last_frame_rx_ns;
bool media_locked_state;
/* Settle window after a (re)lock: a Listener binding mid-stream behind an
* SRP bridge sees a one-time sequence step as the bridge opens forwarding a
* beat after the talker is already transmitting. Re-seed prev_seq for this
* many post-lock PDUs instead of counting it as SEQ_NUM_MISMATCH. */
uint8_t seq_settle;
};
struct acmp_stream_status_milan_v12 {
uint64_t controller_entity_id;
/* Original talker entity_id captured at BIND_RX_COMMAND time, used
* verbatim when emitting CONNECT_TX_COMMAND probes. Needed because the
* stream_id entity_id round-trip via peer_id_from_entity_id() is
* lossy for non-EUI-64 entity_ids (e.g. MAC | entity_index). */
uint64_t talker_entity_id;
uint64_t talker_entity_id; /* IEEE 1722.1-2021 Section 7.4.6, BIND_RX_COMMAND talker_guid */
uint32_t acmp_flags;
uint8_t probing_status;
uint8_t acmp_status;

View file

@ -5,6 +5,9 @@
#include <linux/if_ether.h>
#include <linux/if_packet.h>
#include <linux/filter.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <linux/if_link.h>
#include <linux/net_tstamp.h>
#include <limits.h>
#include <net/if.h>
@ -253,11 +256,187 @@ static int raw_transport_setup(struct server *server)
return 0;
}
/* milan-avb: hand-rolled netlink VLAN sub-iface creator.
*
* On I210-class NICs with rx-vlan-filter[fixed]=on the silicon drops
* VLAN-tagged frames whose VID is not registered. Registering happens
* implicitly when a VLAN sub-iface is added via RTM_NEWLINK. We create
* <parent>.<vid> on first use, never delete it (bounded leak: one
* sub-iface per SR class), and bind the listener stream socket to it.
*
* Fall back to PACKET_MR_PROMISC if any step fails (no CAP_NET_ADMIN,
* no 8021q module loaded, etc.). */
#define MILAN_NLALIGN(n) (((n) + 3U) & ~3U)
static int milan_nl_send(int fd, uint16_t mt, uint16_t flags,
uint32_t seq, const void *payload, size_t plen)
{
struct {
struct nlmsghdr nlh;
char body[2048];
} msg = { 0 };
size_t need = NLMSG_HDRLEN + plen;
if (need > sizeof(msg))
return -EMSGSIZE;
msg.nlh.nlmsg_len = need;
msg.nlh.nlmsg_type = mt;
msg.nlh.nlmsg_flags = flags;
msg.nlh.nlmsg_seq = seq;
msg.nlh.nlmsg_pid = 0;
if (plen)
memcpy(msg.body, payload, plen);
if (send(fd, &msg, need, 0) < 0)
return -errno;
return 0;
}
static int milan_nl_recv_ack(int fd, uint32_t seq)
{
char buf[4096];
for (;;) {
ssize_t n = recv(fd, buf, sizeof(buf), 0);
size_t off = 0;
if (n < 0)
return -errno;
while (off + NLMSG_HDRLEN <= (size_t)n) {
struct nlmsghdr *h = (struct nlmsghdr *)(buf + off);
if (h->nlmsg_len < NLMSG_HDRLEN ||
off + h->nlmsg_len > (size_t)n)
break;
if (h->nlmsg_type == NLMSG_ERROR && h->nlmsg_seq == seq) {
struct nlmsgerr *e = NLMSG_DATA(h);
return e->error;
}
off += NLMSG_ALIGN(h->nlmsg_len);
}
}
}
static size_t milan_nl_attr(char *p, uint16_t type, const void *val, uint16_t vlen)
{
struct rtattr *r = (struct rtattr *)p;
size_t total;
r->rta_len = RTA_LENGTH(vlen);
r->rta_type = type;
memcpy(RTA_DATA(r), val, vlen);
total = RTA_ALIGN(r->rta_len);
if (total > (size_t)r->rta_len)
memset(p + r->rta_len, 0, total - r->rta_len);
return total;
}
/* Returns ifindex (>0) or -errno. */
static int milan_get_ifindex(const char *name)
{
int fd = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0);
struct ifreq req;
int res, saved;
if (fd < 0)
return -errno;
spa_zero(req);
snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", name);
res = ioctl(fd, SIOCGIFINDEX, &req);
saved = -errno;
close(fd);
return res < 0 ? saved : req.ifr_ifindex;
}
/* Ensure <parent>.<vid> exists, is UP, and is a VLAN sub-iface of <parent>.
* Writes the sub-iface name to out and returns 0 on success. */
static int milan_ensure_vlan_iface(const char *parent_ifname, uint16_t vid,
char *out, size_t out_size)
{
int rc, existing, parent_idx, nlfd, err, new_idx;
struct sockaddr_nl sa = { .nl_family = AF_NETLINK };
char payload[256];
char *p = payload;
struct ifinfomsg ifi = { 0 };
uint32_t pidx;
char nested[64];
char *np = nested;
const char kind[] = "vlan";
char data[16];
char *dp = data;
uint16_t vidv = vid;
uint32_t seq = 1;
struct ifinfomsg up_ifi = { 0 };
if (vid == 0 || vid >= 4095)
return -EINVAL;
rc = snprintf(out, out_size, "%s.%u", parent_ifname, (unsigned)vid);
if (rc < 0 || (size_t)rc >= out_size)
return -ENAMETOOLONG;
/* If the sub-iface already exists, assume it is what we want and reuse. */
existing = milan_get_ifindex(out);
if (existing > 0)
return 0;
parent_idx = milan_get_ifindex(parent_ifname);
if (parent_idx <= 0)
return parent_idx ? parent_idx : -ENODEV;
nlfd = socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_ROUTE);
if (nlfd < 0)
return -errno;
if (bind(nlfd, (struct sockaddr *)&sa, sizeof(sa)) < 0) {
int e = -errno; close(nlfd); return e;
}
/* RTM_NEWLINK: ifinfomsg + IFLA_IFNAME + IFLA_LINK + IFLA_LINKINFO{KIND=vlan, DATA{VLAN_ID}} */
ifi.ifi_family = AF_UNSPEC;
ifi.ifi_change = 0xFFFFFFFFu;
memcpy(p, &ifi, sizeof(ifi)); p += sizeof(ifi);
p += milan_nl_attr(p, IFLA_IFNAME, out, (uint16_t)(strlen(out) + 1));
pidx = (uint32_t)parent_idx;
p += milan_nl_attr(p, IFLA_LINK, &pidx, sizeof(pidx));
/* LINKINFO is nested: KIND=vlan, DATA={VLAN_ID=vid} */
np += milan_nl_attr(np, IFLA_INFO_KIND, kind, sizeof(kind));
dp += milan_nl_attr(dp, IFLA_VLAN_ID, &vidv, sizeof(vidv));
np += milan_nl_attr(np, IFLA_INFO_DATA, data, (uint16_t)(dp - data));
p += milan_nl_attr(p, IFLA_LINKINFO, nested, (uint16_t)(np - nested));
err = milan_nl_send(nlfd, RTM_NEWLINK,
NLM_F_REQUEST | NLM_F_CREATE | NLM_F_EXCL | NLM_F_ACK,
seq, payload, (size_t)(p - payload));
if (err == 0)
err = milan_nl_recv_ack(nlfd, seq);
if (err != 0 && err != -EEXIST) {
close(nlfd);
return err;
}
/* Bring it UP. */
new_idx = milan_get_ifindex(out);
if (new_idx <= 0) {
close(nlfd);
return new_idx ? new_idx : -ENODEV;
}
up_ifi.ifi_family = AF_UNSPEC;
up_ifi.ifi_index = new_idx;
up_ifi.ifi_flags = IFF_UP;
up_ifi.ifi_change = IFF_UP;
err = milan_nl_send(nlfd, RTM_NEWLINK, NLM_F_REQUEST | NLM_F_ACK,
++seq, &up_ifi, sizeof(up_ifi));
if (err == 0)
err = milan_nl_recv_ack(nlfd, seq);
close(nlfd);
if (err != 0)
return err;
return 0;
}
static int raw_stream_setup_socket(struct server *server, struct stream *stream)
{
int res;
char buf[128];
struct ifreq req;
const char *bind_ifname = server->ifname;
char vlan_ifname[IFNAMSIZ];
bool used_vlan_subiface = false;
spa_autoclose int fd = socket(AF_PACKET, SOCK_RAW | SOCK_CLOEXEC | SOCK_NONBLOCK, htons(ETH_P_ALL));
if (fd < 0) {
@ -265,8 +444,30 @@ static int raw_stream_setup_socket(struct server *server, struct stream *stream)
return -errno;
}
/* For listener RX: route stream via a VLAN sub-iface so the NIC's
* hardware filter accepts VID-tagged AAF without promisc on parent. */
/* Listener-only: route stream RX via a VLAN sub-iface so the NIC accepts
* VID-tagged AAF without promisc on parent. OUTPUT direction stays on
* the parent because setup_pdu_milan_v12() already inserts a manual
* 802.1Q tag in the PDU header the kernel would add a second tag
* (QinQ) if we bound the talker socket to enp6s0.<vid>. */
if (stream->direction == SPA_DIRECTION_INPUT && stream->vlan_id > 0) {
int e = milan_ensure_vlan_iface(server->ifname,
(uint16_t)stream->vlan_id,
vlan_ifname, sizeof(vlan_ifname));
if (e == 0) {
bind_ifname = vlan_ifname;
used_vlan_subiface = true;
pw_log_info("milan-avb: listener RX via VLAN sub-iface %s (vid %d)",
vlan_ifname, stream->vlan_id);
} else {
pw_log_warn("milan-avb: VLAN sub-iface setup failed (%d), "
"falling back to PACKET_MR_PROMISC on parent", -e);
}
}
spa_zero(req);
snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname);
snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", bind_ifname);
res = ioctl(fd, SIOCGIFINDEX, &req);
if (res < 0) {
pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname);
@ -319,6 +520,18 @@ static int raw_stream_setup_socket(struct server *server, struct stream *stream)
pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m");
return -errno;
}
/* Fallback: lift promisc only when the VLAN sub-iface path didn't
* take. With the sub-iface, the NIC accepts VID 2 natively. */
if (!used_vlan_subiface) {
spa_zero(mreq);
mreq.mr_ifindex = req.ifr_ifindex;
mreq.mr_type = PACKET_MR_PROMISC;
res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
&mreq, sizeof(struct packet_mreq));
if (res < 0)
pw_log_warn("setsockopt(PACKET_MR_PROMISC) fallback failed: %m");
}
}
return spa_steal_fd(fd);
}

View file

@ -4,6 +4,10 @@
/* SPDX-FileCopyrightText: Copyright © 2025 Simon Gapp <simon.gapp@kebag-logic.com> */
/* SPDX-License-Identifier: MIT */
#include <unistd.h>
#include <pipewire/pipewire.h>
#include "adp.h"
#include "aecp-aem.h"
#include "aecp-aem-types.h"
@ -300,23 +304,26 @@ static void init_descriptor_legacy_avb(struct server *server)
static void init_descriptor_milan_v12(struct server *server)
{
/* name the entity after the hostname so each box shows as pw0/pw1/pw2 */
char hostname[64] = {0};
if (gethostname(hostname, sizeof(hostname) - 1) != 0 || hostname[0] == '\0')
snprintf(hostname, sizeof(hostname), "%s", DSC_STRINGS_0_DEVICE_NAME);
// TODO PERSISTENCE: retrieve the saved buffers.
/**************************************************************************************/
/* IEEE 1722.1-2021, Sec. 7.2.12 - STRINGS Descriptor
* Up to 7 localized strings
*/
es_builder_add_descriptor(server, AVB_AEM_DESC_STRINGS, 0,
sizeof(struct avb_aem_desc_strings),
&(struct avb_aem_desc_strings)
{
.string_0 = DSC_STRINGS_0_DEVICE_NAME,
struct avb_aem_desc_strings strings = {
.string_1 = DSC_STRINGS_1_CONFIGURATION_NAME,
.string_2 = DSC_STRINGS_2_MANUFACTURER_NAME,
.string_3 = DSC_STRINGS_3_GROUP_NAME,
.string_4 = DSC_STRINGS_4_MAINTAINER_0,
.string_5 = DSC_STRINGS_4_MAINTAINER_1,
}
);
};
snprintf(strings.string_0, sizeof(strings.string_0), "%s", hostname);
es_builder_add_descriptor(server, AVB_AEM_DESC_STRINGS, 0,
sizeof(strings), &strings);
/**************************************************************************************/
/* IEEE 1722.1-2021, Sec. 7.2.11 - LOCALE Descriptor */
@ -334,8 +341,7 @@ static void init_descriptor_milan_v12(struct server *server)
/* Milan v1.2, Sec. 5.3.3.1 */
struct avb_entity_config entity_conf = conf_load_entity(server->impl->props);
struct avb_aem_desc_entity entity_desc =
{
struct avb_aem_desc_entity entity = {
.entity_id = htobe64(server->entity_id),
.entity_model_id = htobe64(DSC_ENTITY_MODEL_ID),
.entity_capabilities = htonl(entity_conf.entity_capabilities),
@ -351,21 +357,19 @@ static void init_descriptor_milan_v12(struct server *server)
.available_index = htonl(DSC_ENTITY_MODEL_AVAILABLE_INDEX),
.association_id = htobe64(DSC_ENTITY_MODEL_ASSOCIATION_ID),
.vendor_name_string = htons(entity_conf.vendor_name),
.model_name_string = htons(entity_conf.model_name),
.vendor_name_string = htons(DSC_ENTITY_MODEL_VENDOR_NAME_STRING),
.model_name_string = htons(DSC_ENTITY_MODEL_MODEL_NAME_STRING),
.group_name = DSC_ENTITY_MODEL_GROUP_NAME,
.serial_number = DSC_ENTITY_MODEL_SERIAL_NUMBER,
.configurations_count = htons(DSC_ENTITY_MODEL_CONFIGURATIONS_COUNT),
.current_configuration = htons(DSC_ENTITY_MODEL_CURRENT_CONFIGURATION)
};
memcpy(entity_desc.entity_name, entity_conf.entity_name, sizeof(entity_desc.entity_name));
memcpy(entity_desc.firmware_version, entity_conf.firmware_version, sizeof(entity_desc.firmware_version));
memcpy(entity_desc.group_name, entity_conf.group_name, sizeof(entity_desc.group_name));
memcpy(entity_desc.serial_number, entity_conf.serial_number, sizeof(entity_desc.serial_number));
snprintf(entity.entity_name, sizeof(entity.entity_name), "%s", hostname);
/* firmware_version = the canonical PipeWire library version */
snprintf(entity.firmware_version, sizeof(entity.firmware_version), "%s", pw_get_library_version());
es_builder_add_descriptor(server, AVB_AEM_DESC_ENTITY, 0,
sizeof(struct avb_aem_desc_entity),
&entity_desc);
sizeof(entity), &entity);
/**************************************************************************************/
/* IEEE 1722.1-2021, Sec. 7.2.2 - CONFIGURATION Descriptor*/
/* Milan v1.2, Sec. 5.3.3.2 */

View file

@ -319,9 +319,11 @@ BUILD_SAMPLING_RATE(DSC_AUDIO_UNIT_SAMPLING_RATE_PULL, DSC_AUDIO_UNIT_SAMPLING_R
#define DSC_STREAM_INPUT_LOCALIZED_DESCRIPTION AVB_AEM_DESC_INVALID
#define DSC_STREAM_INPUT_CLOCK_DOMAIN_INDEX 0
#define DSC_STREAM_INPUT_STREAM_FLAGS (AVB_AEM_DESC_STREAM_FLAG_SYNC_SOURCE | AVB_AEM_DESC_STREAM_FLAG_CLASS_A)
// To match my talker
// Default current_format = AAF INT32/48k/8ch, to match the 8-channel PipeWire talker
// (pw1) without a per-bind set-format. The supported list below keeps the smaller
// channel counts (1/2/4/6) so 4ch talkers like the DS20 still bind via set-format.
// TODO: Define based on AUDIO_UNIT etc.
#define DSC_STREAM_INPUT_CURRENT_FORMAT 0x0205022001006000ULL
#define DSC_STREAM_INPUT_CURRENT_FORMAT 0x0205022002006000ULL
// TODO: Is 132 here, should be 138 according to spec
#define DSC_STREAM_INPUT_FORMATS_OFFSET (4 + sizeof(struct avb_aem_desc_stream))
@ -342,7 +344,7 @@ BUILD_SAMPLING_RATE(DSC_AUDIO_UNIT_SAMPLING_RATE_PULL, DSC_AUDIO_UNIT_SAMPLING_R
#define DSC_STREAM_INPUT_AVB_INTERFACE_INDEX 0
#define DSC_STREAM_INPUT_BUFFER_LENGTH_IN_NS 2126000
#define DSC_STREAM_INPUT_FORMATS_0 DSC_STREAM_INPUT_CURRENT_FORMAT
#define DSC_STREAM_INPUT_FORMATS_0 0x0205022001006000ULL /* 4ch (DS20) — kept supported */
#define DSC_STREAM_INPUT_FORMATS_1 0x0205022000406000ULL
#define DSC_STREAM_INPUT_FORMATS_2 0x0205022000806000ULL
#define DSC_STREAM_INPUT_FORMATS_3 0x0205022001806000ULL

View file

@ -108,16 +108,7 @@ static void mrp_periodic(void *data, uint64_t now)
if (now > mrp->lva_timer.leave_all_timeout) {
/* IEEE 802.1Q-2018 §10.7.5.20: when our LeaveAll timer fires, we
* become the TRANSMITTER of LeaveAll. Mark lva_tx_pending so the
* next TX cycle includes the LVA PDU. We must NOT apply RX_LVA to
* our own attributes per Table 10-4 the registrar transitions
* INLV only on RECEIVED LV/LVA, not on our own transmission.
* The old global_event(RX_LVA) here flipped our own SR-class
* Domain registrars to LV for a full 1 s leave_timeout window,
* during which the talker reads our Domain as "leaving" and
* reports MSRP TalkerFailed (failure_code 13). That broke the
* stream every 1015 s. */
/* IEEE 802.1Q-2018 Section 10.7.5.20: own LVA timer => TX path only, no RX_LVA */
mrp->lva_timer.state = FSM_LVA_ACTIVE;
if (mrp->lva_timer.leave_all_timeout > 0) {
mrp->lva_tx_pending = true;
@ -452,11 +443,7 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now
}
break;
case AVB_MRP_EVENT_TX_LVA:
/* IEEE 802.1Q-2018 Table 10-4: TX events do NOT trigger
* registrar transitions. The previous code lumped TX_LVA with
* RX_LVA and so transitioned our own registrar INLV for the
* full 1 s leave_timeout, briefly hiding the peer's attribute
* from upper layers. Leave the registrar untouched on TX_LVA. */
/* IEEE 802.1Q-2018 Table 10-4: TX events do not transition the registrar */
break;
case AVB_MRP_EVENT_FLUSH:
switch (state) {
@ -477,11 +464,10 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now
default:
break;
}
if (notify) {
mrp_attribute_emit_notify(a, now, notify);
mrp_emit_notify(mrp, now, &a->attr, notify);
}
/* commit registrar_state BEFORE notify: callbacks (e.g. notify_talker ->
* refresh_listener_param) read the registrar state, so they must see the new
* one. Emitting notify first made the Listener latch AskingFailed off a stale
* MT state and never recompute -> SRP bridge refused to forward the stream. */
if (a->registrar_state != state || notify) {
pw_log_debug("REG: attr %p: %s %s %s -> %s notify=%s", a, a->attr.name,
avb_mrp_event_name(event), avb_registrar_state_name(a->registrar_state),
@ -490,6 +476,11 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now
a->registrar_state = state;
}
if (notify) {
mrp_attribute_emit_notify(a, now, notify);
mrp_emit_notify(mrp, now, &a->attr, notify);
}
state = a->applicant_state;
switch (event) {

View file

@ -58,6 +58,16 @@ static void debug_msrp_talker(const struct avb_packet_msrp_talker *t)
/* IEEE 802.1Q Section 35.2.2.4.4: Listener may declare Ready only once the matching
* Talker Advertise is registered; otherwise it stays in AskingFailed. */
/* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN, else AskingFailed */
static void refresh_listener_param(struct stream_common *sc)
{
bool ta_in = sc->tastream_attr.mrp != NULL &&
avb_mrp_attribute_get_registrar_state(sc->tastream_attr.mrp) == AVB_MRP_IN;
sc->lstream_attr.param = ta_in
? AVB_MSRP_LISTENER_PARAM_READY
: AVB_MSRP_LISTENER_PARAM_ASKING_FAILED;
}
static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify)
{
struct stream_common *sc;
@ -76,12 +86,8 @@ static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, ui
sc = SPA_CONTAINER_OF(attr->attr, struct stream_common, tastream_attr);
if (sc->stream.direction == SPA_DIRECTION_INPUT) {
if (notify == AVB_MRP_NOTIFY_NEW || notify == AVB_MRP_NOTIFY_JOIN)
sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_READY;
else if (notify == AVB_MRP_NOTIFY_LEAVE)
sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED;
/* Milan Table 5.10: TA registrar state flips flags_ex.REGISTERING
* in the listener-side GET_STREAM_INFO answer emit an unsol. */
refresh_listener_param(sc);
/* Milan Table 5.10: TA registrar state flips flags_ex.REGISTERING */
avb_aecp_aem_mark_stream_info_dirty(msrp->server,
AVB_AEM_DESC_STREAM_INPUT, sc->stream.index);
}
@ -103,21 +109,7 @@ static void notify_talker_failed(struct msrp *msrp, uint64_t now, struct attr *a
sc = SPA_CONTAINER_OF(attr->attr, struct stream_common, tfstream_attr);
if (sc->stream.direction == SPA_DIRECTION_INPUT) {
/* IEEE 802.1Q-2018 §35.2.2.4.4 + Milan v1.2 §4.3.3.1: when the
* Talker has transitioned to TalkerFailed, the Listener must
* declare AskingFailed so the Talker has the recovery signal
* ("I still want this stream, please re-evaluate"). Without
* this, the Listener was stuck at the previous Ready
* declaration from before the failure, the Talker read
* "listener is satisfied" and never re-attempted the advertise.
* Symptom on the wire: Talker oscillates Advertise Failed
* silence; stream drops after a few seconds and never resumes
* for the same bind. */
if (notify == AVB_MRP_NOTIFY_NEW || notify == AVB_MRP_NOTIFY_JOIN)
sc->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED;
/* Milan Table 5.10: TF registrar state also flips
* flags_ex.REGISTERING on the listener side; emit an unsol
* when it changes. */
refresh_listener_param(sc);
avb_aecp_aem_mark_stream_info_dirty(msrp->server,
AVB_AEM_DESC_STREAM_INPUT, sc->stream.index);
}
@ -340,35 +332,18 @@ static int process_domain(struct msrp *msrp, uint64_t now, uint8_t attr_type,
}
if (msrp->server->avb_mode == AVB_MODE_MILAN_V12) {
/* Milan v1.2 §4.2.7.2.1: "re-adjust the domain" means
* align the priority/vid of the matching SR class NOT
* overwrite every locally-held Domain attribute with the
* incoming values. The old code iterated all Domain
* attributes and smashed each one whose (id, prio, vid)
* didn't match the incoming, so when the talker
* advertised SR-B (id=5) both of pipewire's Domain
* attributes (originally SR-A id=6 and SR-B id=5) were
* overwritten to SR-B; when the talker then advertised
* SR-A, both flipped back to SR-A. Pipewire transmitted
* only one of the two SR classes at any moment, so the
* talker never saw a stable SR-A Domain declaration from
* pipewire and locked into TalkerFailed code 13.
*
* Correct re-adjust: only the locally-held attribute
* with the SAME sr_class_id gets its priority/vid
* updated. */
/* Milan v1.2 Section 4.2.7.2.1: re-adjust scoped to matching sr_class_id only */
bool mismatch;
if (a->attr->attr.domain.sr_class_id != d->sr_class_id)
continue;
bool mismatch = (a->attr->attr.domain.sr_class_priority != d->sr_class_priority
mismatch = (a->attr->attr.domain.sr_class_priority != d->sr_class_priority
|| a->attr->attr.domain.sr_class_vid != d->sr_class_vid);
if (mismatch) {
pw_log_info("Domain re-adjust (sr_class_id=%u): prio %u->%u vid %u->%u",
d->sr_class_id,
a->attr->attr.domain.sr_class_priority,
d->sr_class_priority,
ntohs(a->attr->attr.domain.sr_class_vid),
pw_log_info("Domain re-adjust sr_class_id=%u prio %u->%u vid %u->%u",
d->sr_class_id, a->attr->attr.domain.sr_class_priority,
d->sr_class_priority, ntohs(a->attr->attr.domain.sr_class_vid),
ntohs(d->sr_class_vid));
a->attr->attr.domain.sr_class_priority = d->sr_class_priority;
a->attr->attr.domain.sr_class_vid = d->sr_class_vid;

View file

@ -82,17 +82,18 @@
*
* Per-counter wiring status (Milan Section 5.4.5.3, Table 5.16 Stream Input):
* FRAMES_RX live: handle_aaf_packet / handle_iec61883_packet
* STREAM_INTERRUPTED live: ringbuffer overrun in the same handlers
* STREAM_INTERRUPTED live: handle_aaf_packet, on the loss of several
* AVTPDUs (seq gap >= AVB_STREAM_INTERRUPT_MIN_LOST)
* MEDIA_LOCKED live: first-frame edge in handle_*_packet
* MEDIA_UNLOCKED live: cmd-get-counters periodic when last_frame_rx_ns
* ages past MEDIA_UNLOCK_TIMEOUT_NS
* SEQ_NUM_MISMATCH TODO: compare p->seq_num against expected (last + 1
* modulo 256), tick on mismatch and resync expected
* SEQ_NUM_MISMATCH live: handle_aaf_packet, p->seq_num != expected
* (last + 1 mod 256); resyncs expected each frame
* MEDIA_RESET_IN TODO: tick when AVTPDU header sets the mr bit
* (header reset notification)
* TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header
* UNSUPPORTED_FORMAT TODO: tick when subtype/format mismatch the bound
* descriptor's current_format
* UNSUPPORTED_FORMAT live: handle_aaf_packet drops + ticks any AAF PDU
* whose media format is not the Milan base format
* LATE_TIMESTAMP TODO: tick when p->timestamp < CLOCK_TAI now
* (frame missed its presentation deadline)
* EARLY_TIMESTAMP TODO: tick when p->timestamp > now + max_transit_time
@ -118,6 +119,7 @@
#include <spa/debug/mem.h>
#include <spa/pod/builder.h>
#include <spa/param/audio/format-utils.h>
#include <spa/param/latency-utils.h>
#include "aaf.h"
#include "iec61883.h"
@ -263,17 +265,26 @@ static void on_source_stream_process(void *data)
avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
if (avail < wanted) {
pw_log_debug("capture underrun %d < %d", avail, wanted);
/* Milan v1.2 Section 5.4.5.3: partial-read on underrun, zero-pad tail. */
if (avail <= 0) {
memset(d[0].data, 0, n_bytes);
} else {
} else if ((uint32_t)avail >= n_bytes) {
spa_ringbuffer_read_data(&stream->ring,
stream->buffer_data,
stream->buffer_size,
index % stream->buffer_size,
d[0].data, n_bytes);
index += n_bytes;
spa_ringbuffer_read_update(&stream->ring, index);
spa_ringbuffer_read_update(&stream->ring, index + n_bytes);
} else {
uint32_t use = (uint32_t)avail;
spa_ringbuffer_read_data(&stream->ring,
stream->buffer_data,
stream->buffer_size,
index % stream->buffer_size,
d[0].data, use);
memset(SPA_PTROFF(d[0].data, use, void), 0, n_bytes - use);
spa_ringbuffer_read_update(&stream->ring, index + use);
pw_log_debug("capture partial-underrun %u/%u", use, n_bytes);
}
d[0].chunk->size = n_bytes;
@ -614,7 +625,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
if (stream->format)
avb_aem_stream_format_decode(stream->format, &fi);
stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE;
stream->info.info.raw.format = SPA_AUDIO_FORMAT_S32_BE;
stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED;
stream->info.info.raw.rate = fi.is_audio && fi.rate ? fi.rate : 48000;
stream->info.info.raw.channels = fi.is_audio && fi.channels ? fi.channels : 8;
@ -732,6 +743,14 @@ error_free:
void stream_destroy(struct stream *stream)
{
struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream);
struct timespec now_ts;
uint64_t now = 0;
/* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart
* or replug doesn't strand a stale reservation on the bridge (socket still open here). */
if (clock_gettime(CLOCK_TAI, &now_ts) == 0)
now = SPA_TIMESPEC_TO_NSEC(&now_ts);
stream_deactivate(stream, now);
if (stream->direction == SPA_DIRECTION_INPUT) {
struct aecp_aem_stream_input_state *si =
@ -753,6 +772,30 @@ static int setup_socket(struct stream *stream)
return avb_server_stream_setup_socket(stream->server, stream);
}
/* Milan 5.4.5.3 STREAM_INTERRUPTED: playback is interrupted by the loss of
* "several" AVTPDUs (the spec leaves the count implementation-defined). A
* single dropped/reordered PDU is a SEQ_NUM_MISMATCH but not a full
* interruption; a gap of this many or more missing PDUs is. */
#define AVB_STREAM_INTERRUPT_MIN_LOST 2
/* PDUs after a (re)lock during which a sequence step is absorbed (re-seeded) and
* NOT counted as SEQ_NUM_MISMATCH covers the one-time bind/SRP-path-open gap of
* a Listener that joins mid-stream. Small, so genuine ongoing loss still counts. */
#define AVB_STREAM_SEQ_SETTLE 8
/* Milan v1.2 Section 5.4: the listener supports only the Milan base stream
* format for decode AAF PCM, 32-bit integer, 48 kHz, non-sparse. Channel
* count is a stream parameter (the ring buffers by data_len), not part of the
* format check, so any Milan channel count passes. */
static inline bool aaf_is_milan_format(const struct avb_packet_aaf *p)
{
return p->subtype == AVB_SUBTYPE_AAF &&
p->format == AVB_AAF_FORMAT_INT_32BIT &&
p->nsr == AVB_AAF_PCM_NSR_48KHZ &&
p->bit_depth == 32 &&
p->sp == AVB_AAF_PCM_SP_NORMAL;
}
static void handle_aaf_packet(struct stream *stream,
struct avb_packet_aaf *p, int len)
{
@ -764,32 +807,54 @@ static void handle_aaf_packet(struct stream *stream,
filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
n_bytes = ntohs(p->data_len);
if (n_bytes > (uint32_t)(len - (int)sizeof(*p)))
return;
/* IEEE 1722.1 Section 7.4.42 / Milan v1.2 Section 5.4.5.3: FRAMES_RX counts
* every valid AVTPDU received on the wire independent of whether the
* listener pipeline could absorb it. */
/* milan-avb: support only the Milan format. EVERY received frame that is
* not a well-formed Milan AAF PDU bad length, or subtype/format/sample-
* rate/bit-depth/sparse not the Milan base format bumps UNSUPPORTED_FORMAT
* and is dropped, per frame: not counted as a valid frame, not media-locked,
* not written. (Channel count is a stream parameter, not part of the format
* check, so a valid 4ch talker like the DS20 is not flagged.) */
if (n_bytes > (uint32_t)(len - (int)sizeof(*p)) || !aaf_is_milan_format(p)) {
cnt->unsupported_format++;
stream_in_mark_counters_dirty(stream);
return;
}
/* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every
* valid AVTPDU received on the wire independent of whether the listener
* pipeline could absorb it. */
cnt->frame_rx++;
clock_gettime(CLOCK_MONOTONIC, &now_ts);
si->last_frame_rx_ns = SPA_TIMESPEC_TO_NSEC(&now_ts);
if (!si->media_locked_state) {
cnt->media_locked++;
si->media_locked_state = true;
stream->prev_seq = p->seq_num; /* (re)lock: seed seq, no gap */
si->seq_settle = AVB_STREAM_SEQ_SETTLE; /* grace the bind/path-open step */
} else if (si->seq_settle > 0) {
/* settling just after a (re)lock: a Listener that binds mid-stream
* behind an SRP bridge gets a one-time sequence step as the bridge
* opens forwarding re-seed and don't count it. */
si->seq_settle--;
stream->prev_seq = p->seq_num;
} else {
uint8_t expected = (uint8_t)(stream->prev_seq + 1);
if (p->seq_num != expected) {
/* IEEE 1722.1 7.4: SEQ_NUM_MISMATCH on any sequence
* discontinuity (loss, reorder or duplicate). */
uint8_t lost = (uint8_t)(p->seq_num - expected);
cnt->seq_mistmatch++;
/* STREAM_INTERRUPTED only when several PDUs are missing. */
if (lost >= AVB_STREAM_INTERRUPT_MIN_LOST)
cnt->stream_interrupted++;
}
stream->prev_seq = p->seq_num;
}
if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) {
/* Ringbuffer overrun. Per Milan v1.2 Section 5.4.5.3 the
* STREAM_INTERRUPTED counter is reserved for stream-level
* interruptions (e.g. loss of the SRP TalkerAdvertise) and
* MUST NOT be bumped per dropped frame. Without a consumer
* draining the ring (e.g. an ALSA sink hooked up), this branch
* would fire on every received AVTPDU and look identical to a
* total link failure on the controller dashboard defeating
* the goals.md "error counters stay zero" target. Drop the
* frame silently; an out-of-band metric should be used to
* surface unconsumed-stream conditions. */
/* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */
uint32_t r_index;
spa_ringbuffer_get_read_index(&stream->ring, &r_index);
spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes);
@ -833,10 +898,7 @@ static void handle_iec61883_packet(struct stream *stream,
}
if (filled + n_bytes > stream->buffer_size) {
/* Same reasoning as the AAF handler above: do NOT bump
* STREAM_INTERRUPTED on a per-frame ringbuffer overrun.
* Milan v1.2 §5.4.5.3 reserves that counter for stream-level
* interruptions (lost SRP TalkerAdvertise, etc.). */
/* Milan v1.2 Section 5.4.5.3: STREAM_INTERRUPTED is stream-level, not per-frame overrun */
uint32_t r_index;
spa_ringbuffer_get_read_index(&stream->ring, &r_index);
spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes);
@ -900,6 +962,30 @@ static void on_socket_data(void *data, int fd, uint32_t mask)
}
}
/* Milan v1.2 Table 5.6: a Stream Input resets its diagnostic counters on the
* not-bound -> bound transition (and NOT on bound -> not-bound). Also re-arms
* the media-lock / seq-settle state: the unlock edge is detected only inside the
* GET_COUNTERS poll (100 ms silence), so a fast unbind/rebind can leave
* media_locked_state == true and miscount the bridge-open step as
* SEQ_NUM_MISMATCH / STREAM_INTERRUPTED. Called from stream_activate(). */
static void stream_input_reset_counters(struct aecp_aem_stream_input_state *si)
{
si->counters.media_locked = 0;
si->counters.media_unlocked = 0;
si->counters.stream_interrupted = 0;
si->counters.seq_mistmatch = 0;
si->counters.media_reset = 0;
si->counters.tu = 0;
si->counters.unsupported_format = 0;
si->counters.late_timestamp = 0;
si->counters.early_timestamp = 0;
si->counters.frame_rx = 0;
si->media_locked_state = false;
si->seq_settle = 0;
si->last_frame_rx_ns = 0;
si->counters_dirty = true;
}
int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
{
struct server *server = stream->server;
@ -908,6 +994,25 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
struct stream_common *common;
common = SPA_CONTAINER_OF(stream, struct stream_common, stream);
/* milan-avb: SR-class priority + VLAN id come from the MSRP Domain, not a
* hardcoded default. process_domain() re-adjusts the AVB_INTERFACE domain
* to the network-declared sr_class_priority/sr_class_vid, so this is the
* authoritative source. Read it before setup_socket() the listener uses
* stream->vlan_id to select its VLAN sub-iface. */
{
struct descriptor *avbif = server_find_descriptor(server,
AVB_AEM_DESC_AVB_INTERFACE, 0);
if (avbif != NULL) {
struct aecp_aem_avb_interface_state *ifs = avbif->ptr;
uint8_t dprio = ifs->domain_attr.attr.domain.sr_class_priority;
uint16_t dvid = ntohs(ifs->domain_attr.attr.domain.sr_class_vid);
if (dvid != 0 && dvid < 4095) {
stream->prio = dprio;
stream->vlan_id = dvid;
}
}
}
if (stream->source == NULL) {
if ((fd = setup_socket(stream)) < 0)
return fd;
@ -925,11 +1030,57 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
struct aecp_aem_stream_input_state *input_stream;
input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common);
/* lstream_attr.listener.stream_id is already populated by the
* ACMP FSM from PROBE_TX_RESPONSE. Don't overwrite it here.
* Milan Section 4.3.3.1: Listener starts in AskingFailed; notify_talker
* promotes to Ready once the Talker Advertise registrar is IN. */
common->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED;
/* Milan v1.2 Table 5.6: reset diagnostic counters + re-arm the
* media-lock / seq-settle state on the not-bound -> bound transition. */
stream_input_reset_counters(input_stream);
/* Prime ring with one PipeWire quantum of silence (Milan v1.2 Section 5.4.5.3). */
spa_ringbuffer_init(&stream->ring);
if (stream->frames_per_pdu > 0) {
uint32_t prefill_pdus = 1024u / stream->frames_per_pdu;
if (prefill_pdus > 0)
pad_ringbuffer_with_silence(stream, (int)prefill_pdus);
}
/* milan-avb: publish our contribution to graph latency so wpctl/pw-cli
* report it. Latency is the prefill: one PipeWire quantum at 48 kHz. */
{
struct spa_latency_info latency = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT);
uint32_t rate = stream->info.info.raw.rate ? stream->info.info.raw.rate : 48000;
uint8_t lbuf[256];
struct spa_pod_builder lb = { 0 };
const struct spa_pod *lp;
char buf[64];
struct pw_properties *props;
latency.min_quantum = 1.0f;
latency.max_quantum = 1.0f;
latency.min_rate = 1024;
latency.max_rate = 1024;
latency.min_ns = (uint64_t)1024 * SPA_NSEC_PER_SEC / rate;
latency.max_ns = latency.min_ns;
spa_pod_builder_init(&lb, lbuf, sizeof(lbuf));
lp = spa_latency_build(&lb, SPA_PARAM_Latency, &latency);
pw_stream_update_params(stream->stream, &lp, 1);
props = pw_properties_new(NULL, NULL);
snprintf(buf, sizeof(buf), "%llu", (unsigned long long)latency.min_ns);
pw_properties_set(props, "milan.avb.latency.prefill.ns", buf);
snprintf(buf, sizeof(buf), "%u", 1024u);
pw_properties_set(props, "milan.avb.latency.prefill.frames", buf);
snprintf(buf, sizeof(buf), "%u", (unsigned)stream->frames_per_pdu);
pw_properties_set(props, "milan.avb.frames_per_pdu", buf);
pw_stream_update_properties(stream->stream, &props->dict);
pw_properties_free(props);
}
/* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN.
* Compute from current state so a reconnect picks up an already-IN TA
* registrar (no NEW/JOIN event fires when the registrar didn't transition). */
common->lstream_attr.param =
(common->tastream_attr.mrp != NULL &&
avb_mrp_attribute_get_registrar_state(common->tastream_attr.mrp) == AVB_MRP_IN)
? AVB_MSRP_LISTENER_PARAM_READY
: AVB_MSRP_LISTENER_PARAM_ASKING_FAILED;
avb_mrp_attribute_begin(common->lstream_attr.mrp, now);
avb_mrp_attribute_join(common->lstream_attr.mrp, now, true);
@ -987,6 +1138,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
int stream_deactivate(struct stream *stream, uint64_t now)
{
struct stream_common *common;
struct aecp_aem_stream_input_state *si;
common = SPA_CONTAINER_OF(stream, struct stream_common, stream);
pw_stream_set_active(stream->stream, false);
@ -1000,14 +1152,19 @@ int stream_deactivate(struct stream *stream, uint64_t now)
stream->flush_timer = NULL;
stream->flush_last_ns = 0;
}
#if 0
avb_mrp_attribute_leave(stream->vlan_attr->mrp, now);
#endif //
if (stream->direction == SPA_DIRECTION_INPUT)
/* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the
* reservation immediately (Leave) instead of holding stale state until its
* LeaveAll timer otherwise a stop/restart or replug to another port can't
* re-register (the old port's Talker/Listener/VLAN entry still pins the stream). */
if (stream->direction == SPA_DIRECTION_INPUT) {
si = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common);
avb_mrp_attribute_leave(common->lstream_attr.mrp, now);
else
if (si->mvrp_attr.mrp) {
avb_mrp_attribute_leave(si->mvrp_attr.mrp, now);
}
} else {
avb_mrp_attribute_leave(common->tastream_attr.mrp, now);
}
/* Milan Table 5.17: STREAM_STOP counter ticks each transition the
* other way. */