From b2e45b8151f191c98dcdef638470a0fa5fc1e453 Mon Sep 17 00:00:00 2001 From: hackerman-kl Date: Sat, 18 Apr 2026 18:44:02 +0200 Subject: [PATCH] module-avb: milan: es_builder: ensure that the created for the milan differently than for the legacy-avb --- .../acmp-cmds-resps/acmp-milan-v12.c | 29 ++- src/modules/module-avb/aecp-aem-state.h | 4 + src/modules/module-avb/stream.c | 196 +++++++++++++----- 3 files changed, 161 insertions(+), 68 deletions(-) diff --git a/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c b/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c index 2ca19528b..640e1ebc0 100644 --- a/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c +++ b/src/modules/module-avb/acmp-cmds-resps/acmp-milan-v12.c @@ -2169,7 +2169,6 @@ int handle_probe_tx_command_milan_v12(struct acmp *acmp, uint64_t now, struct avb_ethernet_header *h_reply = (struct avb_ethernet_header *)buf; struct avb_packet_acmp *reply = SPA_PTROFF(h_reply, sizeof(*h_reply), void); struct descriptor *desc; - struct aecp_aem_stream_output_state_milan_v12 *stream_out_m; struct aecp_aem_stream_output_state *stream_out; int status = AVB_ACMP_STATUS_SUCCESS; @@ -2186,8 +2185,7 @@ int handle_probe_tx_command_milan_v12(struct acmp *acmp, uint64_t now, goto done; } - stream_out_m = desc->ptr; - stream_out = &stream_out_m->stream_out_sta; + stream_out = desc->ptr; if (!stream_output_on_this_iface(server, stream_out)) { status = AVB_ACMP_STATUS_INCOMPATIBLE_REQUEST; @@ -2205,7 +2203,7 @@ int handle_probe_tx_command_milan_v12(struct acmp *acmp, uint64_t now, } } - stream_out_m->acmp_sta.last_probe_rx_time = (int64_t)now; + stream_out->last_probe_rx_time = (int64_t)now; reply->stream_id = stream_out->common.tastream_attr.attr.talker.stream_id; memcpy(reply->stream_dest_mac, stream_out->common.stream.addr, @@ -2230,7 +2228,7 @@ int handle_disconnect_tx_command_milan_v12(struct acmp *acmp, uint64_t now, struct avb_ethernet_header *h_reply = (struct avb_ethernet_header *)buf; struct avb_packet_acmp *reply = SPA_PTROFF(h_reply, sizeof(*h_reply), void); struct descriptor *desc; - struct aecp_aem_stream_output_state_milan_v12 *stream_out_m; + struct aecp_aem_stream_output_state *stream_out; int status = AVB_ACMP_STATUS_SUCCESS; if (be64toh(p->talker_guid) != server->entity_id) @@ -2246,8 +2244,8 @@ int handle_disconnect_tx_command_milan_v12(struct acmp *acmp, uint64_t now, goto done; } - stream_out_m = desc->ptr; - if (!stream_output_on_this_iface(server, &stream_out_m->stream_out_sta)) + stream_out = desc->ptr; + if (!stream_output_on_this_iface(server, stream_out)) return 0; reply->stream_id = 0; @@ -2271,7 +2269,6 @@ int handle_get_tx_state_command_milan_v12(struct acmp *acmp, uint64_t now, struct avb_ethernet_header *h_reply = (struct avb_ethernet_header *)buf; struct avb_packet_acmp *reply = SPA_PTROFF(h_reply, sizeof(*h_reply), void); struct descriptor *desc; - struct aecp_aem_stream_output_state_milan_v12 *stream_out_m; struct aecp_aem_stream_output_state *stream_out; int status = AVB_ACMP_STATUS_SUCCESS; @@ -2288,8 +2285,7 @@ int handle_get_tx_state_command_milan_v12(struct acmp *acmp, uint64_t now, goto done; } - stream_out_m = desc->ptr; - stream_out = &stream_out_m->stream_out_sta; + stream_out = desc->ptr; if (!stream_output_on_this_iface(server, stream_out)) return 0; @@ -2334,7 +2330,7 @@ void acmp_periodic_milan_v12(struct acmp *acmp, uint64_t now) * within 15 s and no listener attribute is registered. */ for (uint16_t desc_index = 0; desc_index < UINT16_MAX; desc_index++) { struct descriptor *desc; - struct aecp_aem_stream_output_state_milan_v12 *stream_out_m; + struct aecp_aem_stream_output_state *stream_out; struct stream *stream; desc = server_find_descriptor(acmp->server, AVB_AEM_DESC_STREAM_OUTPUT, @@ -2342,19 +2338,18 @@ void acmp_periodic_milan_v12(struct acmp *acmp, uint64_t now) if (desc == NULL) break; - stream_out_m = desc->ptr; - stream = &stream_out_m->stream_out_sta.common.stream; + stream_out = desc->ptr; + stream = &stream_out->common.stream; - if (stream_out_m->acmp_sta.last_probe_rx_time == 0) + if (stream_out->last_probe_rx_time == 0) continue; if (stream->source == NULL) continue; - if (now - (uint64_t)stream_out_m->acmp_sta.last_probe_rx_time - > 15 * SPA_NSEC_PER_SEC) { + if (now - (uint64_t)stream_out->last_probe_rx_time > 15 * SPA_NSEC_PER_SEC) { pw_log_info("talker stream %u: no probe in 15 s, deactivating SRP", desc_index); stream_deactivate(stream, now); - stream_out_m->acmp_sta.last_probe_rx_time = 0; + stream_out->last_probe_rx_time = 0; } } } diff --git a/src/modules/module-avb/aecp-aem-state.h b/src/modules/module-avb/aecp-aem-state.h index 3cd5bf938..830d86e88 100644 --- a/src/modules/module-avb/aecp-aem-state.h +++ b/src/modules/module-avb/aecp-aem-state.h @@ -179,6 +179,10 @@ struct aecp_aem_stream_output_state { struct aecp_aem_stream_output_counters counters; struct stream_common common; + + /** Milan v1.2 Section 4.3.3.1: absolute time of last PROBE_TX_COMMAND received. + * 0 = never received. Reset to 0 when SRP is deactivated. */ + int64_t last_probe_rx_time; }; struct aecp_aem_stream_output_state_milan_v12 { diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index f183fce7b..422b983dc 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -13,6 +13,7 @@ #include #include +#include "aaf.h" #include "iec61883.h" #include "stream.h" #include "aecp-aem-state.h" @@ -88,12 +89,11 @@ static int flush_write(struct stream *stream, uint64_t current_time) { int32_t avail; uint32_t index; - uint64_t ptime, txtime; + uint64_t ptime, txtime; int pdu_count; ssize_t n; struct avb_frame_header *h = (void*)stream->pdu; - struct avb_packet_iec61883 *p = SPA_PTROFF(h, sizeof(*h), void); - uint8_t dbc; + bool is_milan = stream->server->avb_mode == AVB_MODE_MILAN_V12; avail = spa_ringbuffer_get_read_index(&stream->ring, &index); @@ -101,34 +101,63 @@ static int flush_write(struct stream *stream, uint64_t current_time) txtime = current_time + stream->t_uncertainty; ptime = txtime + stream->mtt; - dbc = stream->dbc; - while (pdu_count--) { - *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; + if (is_milan) { + struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void); - set_iovec(&stream->ring, - stream->buffer_data, - stream->buffer_size, - index % stream->buffer_size, - &stream->iov[1], stream->payload_size); + while (pdu_count--) { + *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; - p->seq_num = stream->pdu_seq++; - p->tv = 1; - p->timestamp = ptime; - p->dbc = dbc; + set_iovec(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + &stream->iov[1], stream->payload_size); - n = avb_server_stream_send(stream->server, stream, - &stream->msg, MSG_NOSIGNAL); - if (n < 0 || n != (ssize_t)stream->pdu_size) { - pw_log_error("stream send failed %zd != %zd: %m", - n, stream->pdu_size); + p->seq_num = stream->pdu_seq++; + p->tv = 1; + p->timestamp = htonl((uint32_t)ptime); + + n = avb_server_stream_send(stream->server, stream, + &stream->msg, MSG_NOSIGNAL); + if (n < 0 || n != (ssize_t)stream->pdu_size) + pw_log_error("stream send failed %zd != %zd: %m", + n, stream->pdu_size); + txtime += stream->pdu_period; + ptime += stream->pdu_period; + index += stream->payload_size; } - txtime += stream->pdu_period; - ptime += stream->pdu_period; - index += stream->payload_size; - dbc += stream->frames_per_pdu; + } else { + struct avb_packet_iec61883 *p = SPA_PTROFF(h, sizeof(*h), void); + uint8_t dbc = stream->dbc; + + while (pdu_count--) { + *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; + + set_iovec(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + &stream->iov[1], stream->payload_size); + + p->seq_num = stream->pdu_seq++; + p->tv = 1; + p->timestamp = ptime; + p->dbc = dbc; + + n = avb_server_stream_send(stream->server, stream, + &stream->msg, MSG_NOSIGNAL); + if (n < 0 || n != (ssize_t)stream->pdu_size) + pw_log_error("stream send failed %zd != %zd: %m", + n, stream->pdu_size); + txtime += stream->pdu_period; + ptime += stream->pdu_period; + index += stream->payload_size; + dbc += stream->frames_per_pdu; + } + stream->dbc = dbc; } - stream->dbc = dbc; + spa_ringbuffer_read_update(&stream->ring, index); return 0; } @@ -175,36 +204,64 @@ static void on_sink_stream_process(void *data) static void setup_pdu(struct stream *stream) { struct avb_frame_header *h; - struct avb_packet_iec61883 *p; ssize_t payload_size, hdr_size, pdu_size; + bool is_milan = stream->server->avb_mode == AVB_MODE_MILAN_V12; spa_memzero(stream->pdu, sizeof(stream->pdu)); h = (struct avb_frame_header*)stream->pdu; - p = SPA_PTROFF(h, sizeof(*h), void); - hdr_size = sizeof(*h) + sizeof(*p); payload_size = stream->stride * stream->frames_per_pdu; - pdu_size = hdr_size + payload_size; - h->type = htons(0x8100); - h->prio_cfi_id = htons((stream->prio << 13) | stream->vlan_id); - h->etype = htons(0x22f0); + if (is_milan) { + struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void); - if (stream->direction == SPA_DIRECTION_OUTPUT) { - p->subtype = AVB_SUBTYPE_61883_IIDC; - p->sv = 1; - p->stream_id = htobe64(stream->id); - p->data_len = htons(payload_size+8); - p->tag = 0x1; - p->channel = 0x1f; - p->tcode = 0xa; - p->sid = 0x3f; - p->dbs = stream->info.info.raw.channels; - p->qi2 = 0x2; - p->format_id = 0x10; - p->fdf = 0x2; - p->syt = htons(0x0008); + hdr_size = sizeof(*h) + sizeof(*p); + pdu_size = hdr_size + payload_size; + + h->type = htons(0x8100); + h->prio_cfi_id = htons((stream->prio << 13) | stream->vlan_id); + h->etype = htons(0x22f0); + + if (stream->direction == SPA_DIRECTION_OUTPUT) { + p->subtype = AVB_SUBTYPE_AAF; + p->sv = 1; + p->stream_id = htobe64(stream->id); + p->format = AVB_AAF_FORMAT_INT_32BIT; + p->nsr = AVB_AAF_PCM_NSR_48KHZ; + p->bit_depth = 32; + p->chan_per_frame = stream->info.info.raw.channels; + p->sp = AVB_AAF_PCM_SP_NORMAL; + p->event = 0; + p->seq_num = 0; + p->data_len = htons(payload_size); + } + } else { + struct avb_packet_iec61883 *p = SPA_PTROFF(h, sizeof(*h), void); + + hdr_size = sizeof(*h) + sizeof(*p); + pdu_size = hdr_size + payload_size; + + h->type = htons(0x8100); + h->prio_cfi_id = htons((stream->prio << 13) | stream->vlan_id); + h->etype = htons(0x22f0); + + if (stream->direction == SPA_DIRECTION_OUTPUT) { + p->subtype = AVB_SUBTYPE_61883_IIDC; + p->sv = 1; + p->stream_id = htobe64(stream->id); + p->data_len = htons(payload_size + 8); + p->tag = 0x1; + p->channel = 0x1f; + p->tcode = 0xa; + p->sid = 0x3f; + p->dbs = stream->info.info.raw.channels; + p->qi2 = 0x2; + p->format_id = 0x10; + p->fdf = 0x2; + p->syt = htons(0x0008); + } } + stream->hdr_size = hdr_size; stream->payload_size = payload_size; stream->pdu_size = pdu_size; @@ -342,7 +399,9 @@ struct stream *server_create_stream(struct server *server, struct stream *stream common->tastream_attr.attr.talker.vlan_id = htons(stream->vlan_id); common->tastream_attr.attr.talker.tspec_max_frame_size = - htons(32 + stream->frames_per_pdu * stream->stride); + htons(stream->server->avb_mode == AVB_MODE_MILAN_V12 + ? (uint16_t)stream->pdu_size + : (uint16_t)(32 + stream->frames_per_pdu * stream->stride)); common->tastream_attr.attr.talker.tspec_max_interval_frames = htons(AVB_MSRP_TSPEC_MAX_INTERVAL_FRAMES_DEFAULT); common->tastream_attr.attr.talker.priority = stream->prio; @@ -379,6 +438,28 @@ static int setup_socket(struct stream *stream) return avb_server_stream_setup_socket(stream->server, stream); } +static void handle_aaf_packet(struct stream *stream, + struct avb_packet_aaf *p, int len) +{ + uint32_t index, n_bytes; + int32_t filled; + + filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + n_bytes = ntohs(p->data_len); + + if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) { + pw_log_debug("capture overrun"); + } else { + spa_ringbuffer_write_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + p->payload, n_bytes); + index += n_bytes; + spa_ringbuffer_write_update(&stream->ring, index); + } +} + static void handle_iec61883_packet(struct stream *stream, struct avb_packet_iec61883 *p, int len) { @@ -427,13 +508,26 @@ static void on_socket_data(void *data, int fd, uint32_t mask) sizeof(struct avb_packet_iec61883))); } else { struct avb_ethernet_header *h = (void*)buffer; - struct avb_packet_iec61883 *p = SPA_PTROFF(h, sizeof(*h), void); + struct avb_packet_header *ph = SPA_PTROFF(h, sizeof(*h), void); - if (memcmp(h->dest, stream->addr, 6) != 0 || - p->subtype != AVB_SUBTYPE_61883_IIDC) + if (memcmp(h->dest, stream->addr, 6) != 0) return; - handle_iec61883_packet(stream, p, len - sizeof(*h)); + switch (ph->subtype) { + case AVB_SUBTYPE_AAF: + handle_aaf_packet(stream, + (struct avb_packet_aaf *)ph, + len - (int)sizeof(*h)); + break; + case AVB_SUBTYPE_61883_IIDC: + handle_iec61883_packet(stream, + (struct avb_packet_iec61883 *)ph, + len - (int)sizeof(*h)); + break; + default: + pw_log_warn("unsupported subtype 0x%02x", ph->subtype); + break; + } } } }