From 6cc669e4e23bd9672851db4a2b92e8bd367e50f6 Mon Sep 17 00:00:00 2001 From: hackerman-kl Date: Sat, 25 Apr 2026 10:20:36 +0200 Subject: [PATCH] milan-avb: stream: Milan listener registrar and stream-output prep --- src/modules/module-avb/stream.c | 54 +++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index 1009c50f8..87b669eae 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -16,8 +16,10 @@ #include "aaf.h" #include "iec61883.h" #include "stream.h" +#include "aecp-aem.h" #include "aecp-aem-state.h" #include "acmp-cmds-resps/acmp-common.h" +#include "mvrp.h" #include "utils.h" static void on_stream_destroy(void *d) @@ -334,6 +336,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream stream->server = server; stream->direction = direction; + stream->index = index; stream->prio = AVB_MSRP_PRIORITY_DEFAULT; stream->vlan_id = AVB_DEFAULT_VLAN; @@ -419,6 +422,37 @@ struct stream *server_create_stream(struct server *server, struct stream *stream goto error_free; } + if (direction == SPA_DIRECTION_INPUT) { + struct aecp_aem_stream_input_state *si = + SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); + res = avb_mvrp_attribute_new(server->mvrp, &si->mvrp_attr, + AVB_MVRP_ATTRIBUTE_TYPE_VID); + if (res) { + avb_mrp_attribute_destroy(common->lstream_attr.mrp); + avb_mrp_attribute_destroy(common->tfstream_attr.mrp); + goto error_free; + } + + /* Milan Section 5.3.8.8 / Section 5.4.2.10.1.1: a Listener observes foreign + * Talker Advertise PDUs matching the bound talker's stream_id. + * Create the registrar attribute now (stream_id is set later at + * BIND_RX, cleared at UNBIND_RX) and start its FSM without a + * join — we are an observer, not a declarant. Once a matching TA + * arrives from the wire, msrp.c populates attr.talker + * (accumulated_latency, dest_addr, vlan_id) and moves the + * registrar to IN. The Listener side reads those fields to + * answer GET_STREAM_INFO with real msrp_accumulated_latency. */ + res = avb_msrp_attribute_new(server->msrp, &common->tastream_attr, + AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); + if (res) { + avb_mrp_attribute_destroy(common->lstream_attr.mrp); + avb_mrp_attribute_destroy(common->tfstream_attr.mrp); + avb_mrp_attribute_destroy(si->mvrp_attr.mrp); + goto error_free; + } + avb_mrp_attribute_begin(common->tastream_attr.mrp, 0); + } + if (direction == SPA_DIRECTION_OUTPUT) { res = avb_msrp_attribute_new(server->msrp, &common->tastream_attr, AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); @@ -428,6 +462,12 @@ struct stream *server_create_stream(struct server *server, struct stream *stream goto error_free; } + /* Milan v1.2 Section 4.3.3.1: pre-create lstream_attr with our talker + * stream_id so foreign Listener declarations from peers are + * delivered to it via process_listener and observed through + * notify_listener (sets listener_observed on stream_output_state). */ + common->lstream_attr.attr.listener.stream_id = htobe64(stream->id); + common->tastream_attr.attr.talker.vlan_id = htons(stream->vlan_id); if (server->avb_mode == AVB_MODE_MILAN_V12) common->tastream_attr.attr.talker.tspec_max_frame_size = @@ -458,8 +498,14 @@ void stream_destroy(struct stream *stream) struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream); if (stream->direction == SPA_DIRECTION_INPUT) { + struct aecp_aem_stream_input_state *si = + SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); avb_mrp_attribute_destroy(common->lstream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp); + if (si->mvrp_attr.mrp) + avb_mrp_attribute_destroy(si->mvrp_attr.mrp); + if (common->tastream_attr.mrp) + avb_mrp_attribute_destroy(common->tastream_attr.mrp); } else { avb_mrp_attribute_destroy(common->tastream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp); @@ -591,8 +637,9 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) struct aecp_aem_stream_input_state *input_stream; input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); - common->lstream_attr.attr.listener.stream_id = htobe64(stream->peer_id); - /* Milan Section 4.3.3.1: Listener starts in AskingFailed; notify_talker + /* lstream_attr.listener.stream_id is already populated by the + * ACMP FSM from PROBE_TX_RESPONSE. Don't overwrite it here. + * Milan Section 4.3.3.1: Listener starts in AskingFailed; notify_talker * promotes to Ready once the Talker Advertise registrar is IN. */ common->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_ASKING_FAILED; avb_mrp_attribute_begin(common->lstream_attr.mrp, now); @@ -614,6 +661,9 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) memcpy(h->src, server->mac_addr, 6); avb_mrp_attribute_begin(common->tastream_attr.mrp, now); avb_mrp_attribute_join(common->tastream_attr.mrp, now, true); + + avb_aecp_aem_mark_stream_info_dirty(server, + AVB_AEM_DESC_STREAM_OUTPUT, stream->index); } pw_stream_set_active(stream->stream, true);