diff --git a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c index 09915167b..793e6f909 100644 --- a/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c +++ b/src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c @@ -360,6 +360,8 @@ static void emit_avb_interface_counters(struct aecp *aecp, uint16_t desc_index, * here, gated by last_counters_emit_ns. */ #define COUNTER_UNSOL_MIN_INTERVAL_NS ((int64_t)SPA_NSEC_PER_SEC) +#define MEDIA_UNLOCK_TIMEOUT_NS ((int64_t)(2 * SPA_NSEC_PER_MSEC)) + static bool counter_rate_limit_elapsed(int64_t now, int64_t last_emit) { if (last_emit == 0) @@ -386,18 +388,34 @@ void cmd_get_counters_periodic_milan_v12(struct aecp *aecp, int64_t now) ifs->last_counters_emit_ns = now; } } - for (i = 0; i < UINT16_MAX; i++) { - struct descriptor *d = server_find_descriptor(server, - AVB_AEM_DESC_STREAM_INPUT, i); - struct aecp_aem_stream_input_state *si; - if (d == NULL) - break; - si = d->ptr; - if (si->counters_dirty && - counter_rate_limit_elapsed(now, si->last_counters_emit_ns)) { - emit_stream_input_counters(aecp, i, si); - si->counters_dirty = false; - si->last_counters_emit_ns = now; + { + struct timespec mono_ts; + int64_t mono_now = 0; + clock_gettime(CLOCK_MONOTONIC, &mono_ts); + mono_now = SPA_TIMESPEC_TO_NSEC(&mono_ts); + + for (i = 0; i < UINT16_MAX; i++) { + struct descriptor *d = server_find_descriptor(server, + AVB_AEM_DESC_STREAM_INPUT, i); + struct aecp_aem_stream_input_state *si; + if (d == NULL) + break; + si = d->ptr; + + if (si->media_locked_state && + si->last_frame_rx_ns != 0 && + (mono_now - si->last_frame_rx_ns) > MEDIA_UNLOCK_TIMEOUT_NS) { + si->counters.media_unlocked++; + si->media_locked_state = false; + si->counters_dirty = true; + } + + if (si->counters_dirty && + counter_rate_limit_elapsed(now, si->last_counters_emit_ns)) { + emit_stream_input_counters(aecp, i, si); + si->counters_dirty = false; + si->last_counters_emit_ns = now; + } } } for (i = 0; i < UINT16_MAX; i++) { diff --git a/src/modules/module-avb/aecp-aem-state.h b/src/modules/module-avb/aecp-aem-state.h index 64296beba..21c5743c8 100644 --- a/src/modules/module-avb/aecp-aem-state.h +++ b/src/modules/module-avb/aecp-aem-state.h @@ -172,6 +172,13 @@ struct aecp_aem_stream_input_state { /* Milan Section 5.4.5 counter unsolicited rate-limit (see avb_interface_state). */ bool counters_dirty; int64_t last_counters_emit_ns; + + /* Milan Section 5.4.5.3 / Table 5.16: MEDIA_LOCKED ticks on the first valid + * AVTPDU after a silence gap; MEDIA_UNLOCKED ticks when the gap + * exceeds AVB_MEDIA_UNLOCK_TIMEOUT_NS. last_frame_rx_ns tracks the + * most recent valid PDU; media_locked_state is the current edge. */ + int64_t last_frame_rx_ns; + bool media_locked_state; }; struct acmp_stream_status_milan_v12 { diff --git a/src/modules/module-avb/aecp-aem-types.h b/src/modules/module-avb/aecp-aem-types.h index b1ce23a83..68da03894 100644 --- a/src/modules/module-avb/aecp-aem-types.h +++ b/src/modules/module-avb/aecp-aem-types.h @@ -105,6 +105,8 @@ #define AVB_AECP_AEM_CMD_SET_STREAM_BACKUP 0x0049 #define AVB_AECP_AEM_CMD_GET_STREAM_BACKUP 0x004a #define AVB_AECP_AEM_CMD_GET_DYNAMIC_INFO 0x004b +#define AVB_AECP_AEM_CMD_SET_MAX_TRANSIT_TIME 0x004c +#define AVB_AECP_AEM_CMD_GET_MAX_TRANSIT_TIME 0x004d #define AVB_AECP_AEM_CMD_EXPANSION 0x7fff #define AVB_AEM_ACQUIRE_ENTITY_PERSISTENT_FLAG (1<<0) diff --git a/src/modules/module-avb/aecp-aem.c b/src/modules/module-avb/aecp-aem.c index 90402e304..d3b204c76 100644 --- a/src/modules/module-avb/aecp-aem.c +++ b/src/modules/module-avb/aecp-aem.c @@ -25,6 +25,8 @@ #include "aecp-aem-cmds-resps/cmd-get-set-stream-info.h" #include "aecp-aem-cmds-resps/cmd-start-stop-streaming.h" #include "aecp-aem-cmds-resps/cmd-get-counters.h" +#include "aecp-aem-cmds-resps/cmd-get-as-path.h" +#include "aecp-aem-cmds-resps/cmd-get-set-max-transit-time.h" /* ACQUIRE_ENTITY */ @@ -175,7 +177,10 @@ static int handle_get_avb_info_common(struct aecp *aecp, int64_t now, i->gptp_grandmaster_id = avb_interface->clock_identity; i->propagation_delay = htonl(0); i->gptp_domain_number = avb_interface->domain_number; - i->flags = 0; + /* IEEE 1722.1-2021 Section 7.4.40: GPTP_ENABLED / GPTP_GRANDMASTER_SUPPORTED + * stay 0 until the gPTP interface lands. SRP_ENABLED is on because + * MSRP is running on every AVB interface this module manages. */ + i->flags = AVB_AEM_AVB_INFO_FLAG_SRP_ENABLED; i->msrp_mappings_count = htons(0); return avb_server_send_packet(server, h->src, AVB_TSN_ETH, buf, size); @@ -234,7 +239,35 @@ static const char * const cmd_names[] = { [AVB_AECP_AEM_CMD_GET_VIDEO_MAP] = "get-video-map", [AVB_AECP_AEM_CMD_ADD_VIDEO_MAPPINGS] = "add-video-mappings", [AVB_AECP_AEM_CMD_REMOVE_VIDEO_MAPPINGS] = "remove-video-mappings", - [AVB_AECP_AEM_CMD_GET_SENSOR_MAP] = "get-sensor-map" + [AVB_AECP_AEM_CMD_GET_SENSOR_MAP] = "get-sensor-map", + [AVB_AECP_AEM_CMD_ADD_SENSOR_MAPPINGS] = "add-sensor-mappings", + [AVB_AECP_AEM_CMD_REMOVE_SENSOR_MAPPINGS] = "remove-sensor-mappings", + [AVB_AECP_AEM_CMD_START_OPERATION] = "start-operation", + [AVB_AECP_AEM_CMD_ABORT_OPERATION] = "abort-operation", + [AVB_AECP_AEM_CMD_OPERATION_STATUS] = "operation-status", + [AVB_AECP_AEM_CMD_AUTH_ADD_KEY] = "auth-add-key", + [AVB_AECP_AEM_CMD_AUTH_DELETE_KEY] = "auth-delete-key", + [AVB_AECP_AEM_CMD_AUTH_GET_KEY_LIST] = "auth-get-key-list", + [AVB_AECP_AEM_CMD_AUTH_GET_KEY] = "auth-get-key", + [AVB_AECP_AEM_CMD_AUTH_ADD_KEY_TO_CHAIN] = "auth-add-key-to-chain", + [AVB_AECP_AEM_CMD_AUTH_DELETE_KEY_FROM_CHAIN] = "auth-delete-key-from-chain", + [AVB_AECP_AEM_CMD_AUTH_GET_KEYCHAIN_LIST] = "auth-get-keychain-list", + [AVB_AECP_AEM_CMD_AUTH_GET_IDENTITY] = "auth-get-identity", + [AVB_AECP_AEM_CMD_AUTH_ADD_TOKEN] = "auth-add-token", + [AVB_AECP_AEM_CMD_AUTH_DELETE_TOKEN] = "auth-delete-token", + [AVB_AECP_AEM_CMD_AUTHENTICATE] = "authenticate", + [AVB_AECP_AEM_CMD_DEAUTHENTICATE] = "deauthenticate", + [AVB_AECP_AEM_CMD_ENABLE_TRANSPORT_SECURITY] = "enable-transport-security", + [AVB_AECP_AEM_CMD_DISABLE_TRANSPORT_SECURITY] = "disable-transport-security", + [AVB_AECP_AEM_CMD_ENABLE_STREAM_ENCRYPTION] = "enable-stream-encryption", + [AVB_AECP_AEM_CMD_DISABLE_STREAM_ENCRYPTION] = "disable-stream-encryption", + [AVB_AECP_AEM_CMD_SET_MEMORY_OBJECT_LENGTH] = "set-memory-object-length", + [AVB_AECP_AEM_CMD_GET_MEMORY_OBJECT_LENGTH] = "get-memory-object-length", + [AVB_AECP_AEM_CMD_SET_STREAM_BACKUP] = "set-stream-backup", + [AVB_AECP_AEM_CMD_GET_STREAM_BACKUP] = "get-stream-backup", + [AVB_AECP_AEM_CMD_GET_DYNAMIC_INFO] = "get-dynamic-info", + [AVB_AECP_AEM_CMD_SET_MAX_TRANSIT_TIME] = "set-max-transit-time", + [AVB_AECP_AEM_CMD_GET_MAX_TRANSIT_TIME] = "get-max-transit-time", }; /* AEM_COMMAND */ @@ -387,6 +420,21 @@ static const struct cmd_info cmd_info_milan_v12[] = { /* Milan v1.2 Section 5.4.2.25 */ AECP_AEM_HANDLE_CMD(AVB_AECP_AEM_CMD_GET_COUNTERS, true, handle_cmd_get_counters_milan_v12), + + /* IEEE 1722.1-2021 Section 7.4.41. Returns NOT_IMPLEMENTED until gPTP lands; + * dispatched here so the response payload size matches what the + * controller computed (4-byte command echo). */ + AECP_AEM_HANDLE_CMD(AVB_AECP_AEM_CMD_GET_AS_PATH, true, + handle_cmd_get_as_path_milan_v12), + + /* SET_MAX_TRANSIT_TIME = 0x004C, GET_MAX_TRANSIT_TIME = 0x004D + * (relocated from IEEE 1722.1-2021 originals 0x004B/0x004C because + * 0x004B is GET_DYNAMIC_INFO in this stack). GET reflects the MSRP + * accumulated_latency floor; SET rejects values below it. */ + AECP_AEM_HANDLE_CMD(AVB_AECP_AEM_CMD_SET_MAX_TRANSIT_TIME, false, + handle_cmd_set_max_transit_time_milan_v12), + AECP_AEM_HANDLE_CMD(AVB_AECP_AEM_CMD_GET_MAX_TRANSIT_TIME, true, + handle_cmd_get_max_transit_time_milan_v12), }; static const struct { diff --git a/src/modules/module-avb/descriptors.c b/src/modules/module-avb/descriptors.c index 1781ef8af..f983cb191 100644 --- a/src/modules/module-avb/descriptors.c +++ b/src/modules/module-avb/descriptors.c @@ -188,6 +188,41 @@ static void init_descriptor_legacy_avb(struct server *server) es_builder_add_descriptor(server, AVB_AEM_DESC_STREAM_INPUT, 0, sizeof(stream_input_0), &stream_input_0); + /* CRF clock-reference stream input (Milan v1.2 Section 5.4.x / IEEE 1722-2016 + * Section 10). One format only. No audio cluster mapping — CRF carries clock + * timestamps, not media. Constants are pre-defined in + * entity-model-milan-v12.h. */ + struct { + struct avb_aem_desc_stream desc; + uint64_t stream_formats[DSC_STREAM_INPUT_CRF_NUMBER_OF_FORMATS]; + } __attribute__ ((__packed__)) stream_input_1_crf = + { + { + .object_name = DSC_STREAM_INPUT_CRF_OBJECT_NAME, + .localized_description = htons(DSC_STREAM_INPUT_CRF_LOCALIZED_DESCRIPTION), + .clock_domain_index = htons(DSC_STREAM_INPUT_CRF_CLOCK_DOMAIN_INDEX), + .stream_flags = htons(DSC_STREAM_INPUT_CRF_STREAM_FLAGS), + .current_format = htobe64(DSC_STREAM_INPUT_CRF_CURRENT_FORMAT), + .formats_offset = htons(DSC_STREAM_INPUT_CRF_FORMATS_OFFSET), + .number_of_formats = htons(DSC_STREAM_INPUT_CRF_NUMBER_OF_FORMATS), + .backup_talker_entity_id_0 = htobe64(0), + .backup_talker_unique_id_0 = htons(0), + .backup_talker_entity_id_1 = htobe64(0), + .backup_talker_unique_id_1 = htons(0), + .backup_talker_entity_id_2 = htobe64(0), + .backup_talker_unique_id_2 = htons(0), + .backedup_talker_entity_id = htobe64(0), + .backedup_talker_unique = htons(0), + .avb_interface_index = htons(DSC_STREAM_INPUT_CRF_AVB_INTERFACE_INDEX), + .buffer_length = htonl(DSC_STREAM_INPUT_CRF_BUFFER_LENGTH_IN_NS) + }, + .stream_formats = { + htobe64(DSC_STREAM_INPUT_CRF_FORMATS_0), + }, + }; + es_builder_add_descriptor(server, AVB_AEM_DESC_STREAM_INPUT, 1, + sizeof(stream_input_1_crf), &stream_input_1_crf); + struct { struct avb_aem_desc_stream desc; uint64_t stream_formats[6]; diff --git a/src/modules/module-avb/entity-model-milan-v12.h b/src/modules/module-avb/entity-model-milan-v12.h index f729326db..49181b20a 100644 --- a/src/modules/module-avb/entity-model-milan-v12.h +++ b/src/modules/module-avb/entity-model-milan-v12.h @@ -98,6 +98,10 @@ #define DSC_CONFIGURATION_DESCRIPTOR_COUNTS_OFFSET 74 #define DSC_CONFIGURATION_NO_OF_AUDIO_UNITS 1 +/* Two STREAM_INPUTs: index 0 = audio (AAF/IEC 61883-6), index 1 = CRF media + * clock reference (IEEE 1722-2016 Section 10). Today only index 0 is built in + * descriptors.c — index 1 (CRF) is still TODO; until it lands, Hive will get + * NO_SUCH_DESCRIPTOR on a READ_DESCRIPTOR for STREAM_INPUT/1. */ #define DSC_CONFIGURATION_NO_OF_STREAM_INPUTS 2 #define DSC_CONFIGURATION_NO_OF_STREAM_OUTPUTS 1 diff --git a/src/modules/module-avb/es-builder.c b/src/modules/module-avb/es-builder.c index f652b1490..b56c37e74 100644 --- a/src/modules/module-avb/es-builder.c +++ b/src/modules/module-avb/es-builder.c @@ -4,6 +4,7 @@ #include "es-builder.h" +#include "aecp-aem.h" #include "aecp-aem-state.h" #include "utils.h" @@ -28,9 +29,11 @@ static struct descriptor *es_buidler_desc_stream_general_prepare(struct server * struct descriptor *desc; struct stream *stream; enum spa_direction direction; + uint64_t out_mtt_ns = 0; if (type == AVB_AEM_DESC_STREAM_INPUT) { struct aecp_aem_stream_input_state_milan_v12 *w; + const struct avb_aem_desc_stream *body = ptr; desc = server_add_descriptor(server, type, index, sizeof(*w), size, ptr); @@ -42,6 +45,12 @@ static struct descriptor *es_buidler_desc_stream_general_prepare(struct server * w = desc->ptr; /* Milan v1.2 Section 5.3.8.7: started/stopped state defaults to started. */ w->stream_in_sta.started = true; + + struct avb_aem_stream_format_info fi; + avb_aem_stream_format_decode(body->current_format, &fi); + if (fi.kind == AVB_AEM_STREAM_FORMAT_KIND_CRF) + return desc; + stream = &w->stream_in_sta.common.stream; direction = SPA_DIRECTION_INPUT; } else if (type == AVB_AEM_DESC_STREAM_OUTPUT) { @@ -57,6 +66,8 @@ static struct descriptor *es_buidler_desc_stream_general_prepare(struct server * w = desc->ptr; /* Milan v1.2 Section 5.3.7.6: default presentation time offset is 2 ms. */ w->stream_out_sta.presentation_time_offset_ns = 2000000; + w->stream_out_sta.max_transit_time_ns = 2000000; + out_mtt_ns = w->stream_out_sta.max_transit_time_ns; stream = &w->stream_out_sta.common.stream; direction = SPA_DIRECTION_OUTPUT; } else { @@ -69,6 +80,9 @@ static struct descriptor *es_buidler_desc_stream_general_prepare(struct server * return NULL; } + if (direction == SPA_DIRECTION_OUTPUT) + stream->mtt = (int)out_mtt_ns; + return desc; } diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index 87b669eae..0cbae95ee 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -2,6 +2,112 @@ /* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */ /* SPDX-License-Identifier: MIT */ +/* + * stream.c — AVTP stream data plane. + * + * Each STREAM_INPUT and STREAM_OUTPUT descriptor in the AEM model owns a + * `struct stream` here. The stream wraps both: + * - a PipeWire `pw_stream` (so audio reaches/leaves the local node graph + * as an `avb.source`/`avb.sink` Audio/Source or Audio/Sink), and + * - a raw AF_PACKET socket on the AVB interface (for AVTP frames on the + * wire at ethertype 0x22f0). + * + * Direction map (PipeWire vs AVB): + * AVB STREAM_OUTPUT (talker) = PipeWire AUDIO/SINK + * PipeWire pushes samples in via on_sink_stream_process(); we send + * AVTP frames to a MAAP-allocated dest_mac. + * AVB STREAM_INPUT (listener) = PipeWire AUDIO/SOURCE + * We receive AVTP frames into the ringbuffer; PipeWire pulls samples + * out via on_source_stream_process(). + * + * -------------------------------------------------------------------------- + * TX heartbeat (output direction) + * -------------------------------------------------------------------------- + * + * Why a timer drives flush_write_* instead of the PipeWire process tick: + * + * The AVTP wire schedule is dictated by the talker — frames must leave + * every `pdu_period` (= SPA_NSEC_PER_SEC * frames_per_pdu / sample_rate; + * 125 µs at 48 kHz / 6 frames). A bound listener computes its own + * presentation_time relative to those wire arrivals and expects them to + * keep coming. If we tied flush_write to PipeWire's process callback we + * would only emit frames when an upstream PipeWire node feeds samples; + * the moment nothing is connected to avb.sink-N (the common case during + * bring-up, conformance testing, or whenever the user's audio app hasn't + * started yet), the wire goes silent, the listener's media_locked + * counter stays at 0, and Milan Section 4.3.3.1 / Hive treat the talker as + * absent. + * + * So an output stream owns its own periodic timer (`flush_timer`, + * AVB_FLUSH_TICK_NS = 1 ms = 8 PDUs). Each tick: + * + * 1. computes how many PDUs are owed since the last drain + * (`(now - flush_last_ns) / pdu_period`), + * 2. tops up the ringbuffer with zero samples if PipeWire hasn't + * kept up (`pad_ringbuffer_with_silence`), and + * 3. drains via flush_write_milan_v12 / flush_write_legacy. + * + * When PipeWire IS connected and feeding samples in time, step 2 + * no-ops because filled ≥ needed — the timer just becomes the metronome. + * When PipeWire is silent or under-runs, step 2 fills the deficit with + * zeros so the wire keeps a valid AVTP frame coming. Listeners receive + * silent (but spec-correct, tv=1) frames and remain locked. + * + * on_sink_stream_process() therefore only writes into the ringbuffer; it + * no longer calls flush_write_*. Calling both would double-send each + * PDU. + * + * -------------------------------------------------------------------------- + * Counter unsolicited notifications + * -------------------------------------------------------------------------- + * + * The data-plane sites in this file (flush_write_*, handle_aaf_packet, + * handle_iec61883_packet, stream_activate, stream_deactivate) increment + * the per-descriptor counters in `aecp_aem_stream_input_counters` / + * `aecp_aem_stream_output_counters` and mark `counters_dirty = true` on + * the descriptor's state via stream_in_mark_counters_dirty() and + * stream_out_mark_counters_dirty(). + * + * The AECP periodic in cmd-get-counters.c (cmd_get_counters_periodic_milan_v12) + * scans descriptors at the server-tick rate (~100 ms) and, for each + * dirty descriptor where COUNTER_UNSOL_MIN_INTERVAL_NS (= 1 s) has + * elapsed since the last emit, sends one unsolicited GET_COUNTERS + * RESPONSE with the *current* values and clears the dirty flag. + * + * Net effect: a counter that ticks 1000 times in a second produces ONE + * unsolicited notification per second per descriptor, carrying the + * latest aggregate count (since the read happens at emit time). A + * counter that doesn't change produces no notification — Hive's GET_COUNTERS + * refresh still sees the latest values via the synchronous handler. + * + * Per-counter wiring status (Milan Section 5.4.5.3, Table 5.16 Stream Input): + * FRAMES_RX live: handle_aaf_packet / handle_iec61883_packet + * STREAM_INTERRUPTED live: ringbuffer overrun in the same handlers + * MEDIA_LOCKED live: first-frame edge in handle_*_packet + * MEDIA_UNLOCKED live: cmd-get-counters periodic when last_frame_rx_ns + * ages past MEDIA_UNLOCK_TIMEOUT_NS + * SEQ_NUM_MISMATCH TODO: compare p->seq_num against expected (last + 1 + * modulo 256), tick on mismatch and resync expected + * MEDIA_RESET_IN TODO: tick when AVTPDU header sets the mr bit + * (header reset notification) + * TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header + * UNSUPPORTED_FORMAT TODO: tick when subtype/format mismatch the bound + * descriptor's current_format + * LATE_TIMESTAMP TODO: tick when p->timestamp < CLOCK_TAI now + * (frame missed its presentation deadline) + * EARLY_TIMESTAMP TODO: tick when p->timestamp > now + max_transit_time + * (frame arrived too far ahead of its deadline) + * Table 5.17 Stream Output: + * FRAMES_TX live: per send in flush_write_milan_v12 / _legacy + * STREAM_START live: stream_activate (first activation per session) + * STREAM_STOP live: stream_deactivate + * MEDIA_RESET_OUT TODO: tick when the AVTPDU mr bit is asserted by us + * TIMESTAMP_UNCERTAIN_OUT TODO: tick when we set tu in an outgoing frame + * (e.g. PipeWire underrun forced silent fill) + * + * -------------------------------------------------------------------------- + */ + #include #include #include @@ -22,6 +128,42 @@ #include "mvrp.h" #include "utils.h" +static inline struct aecp_aem_stream_input_state *stream_in_state(struct stream *s) +{ + struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream); + return SPA_CONTAINER_OF(c, struct aecp_aem_stream_input_state, common); +} +static inline struct aecp_aem_stream_input_counters *stream_in_counters(struct stream *s) +{ + return &stream_in_state(s)->counters; +} +static inline struct aecp_aem_stream_output_counters *stream_out_counters(struct stream *s) +{ + struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream); + struct aecp_aem_stream_output_state *so = + SPA_CONTAINER_OF(c, struct aecp_aem_stream_output_state, common); + return &so->counters; +} +static inline void stream_in_mark_counters_dirty(struct stream *s) +{ + struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream); + struct aecp_aem_stream_input_state *si = + SPA_CONTAINER_OF(c, struct aecp_aem_stream_input_state, common); + si->counters_dirty = true; +} +static inline void stream_out_mark_counters_dirty(struct stream *s) +{ + struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream); + struct aecp_aem_stream_output_state *so = + SPA_CONTAINER_OF(c, struct aecp_aem_stream_output_state, common); + so->counters_dirty = true; +} + +#define AVB_FLUSH_TICK_NS ((uint64_t)1000000) + +static int flush_write_milan_v12(struct stream *stream, uint64_t current_time); +static int flush_write_legacy(struct stream *stream, uint64_t current_time); + static void on_stream_destroy(void *d) { struct stream *stream = d; @@ -29,6 +171,77 @@ static void on_stream_destroy(void *d) stream->stream = NULL; } +static void pad_ringbuffer_with_silence(struct stream *stream, int owed) +{ + uint32_t index; + int32_t filled; + size_t needed; + size_t deficit; + size_t off; + void *base; + + if (owed <= 0) + return; + + filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + if (filled < 0) + filled = 0; + + needed = (size_t)owed * stream->stride * stream->frames_per_pdu; + if ((size_t)filled >= needed) + return; + + deficit = needed - (size_t)filled; + if ((size_t)filled + deficit > stream->buffer_size) + deficit = stream->buffer_size - (size_t)filled; + + off = index % stream->buffer_size; + base = stream->buffer_data; + + if (off + deficit <= stream->buffer_size) { + memset(SPA_PTROFF(base, off, void), 0, deficit); + } else { + size_t tail = stream->buffer_size - off; + memset(SPA_PTROFF(base, off, void), 0, tail); + memset(base, 0, deficit - tail); + } + spa_ringbuffer_write_update(&stream->ring, index + (uint32_t)deficit); +} + +static void on_flush_tick(void *data, uint64_t expirations) +{ + struct stream *stream = data; + struct server *server = stream->server; + struct timespec now_ts; + uint64_t now_ns; + int owed; + + (void)expirations; + + if (clock_gettime(CLOCK_TAI, &now_ts) < 0) + return; + now_ns = SPA_TIMESPEC_TO_NSEC(&now_ts); + + if (stream->flush_last_ns == 0) { + stream->flush_last_ns = now_ns; + return; + } + if (stream->pdu_period == 0) + return; + + owed = (int)((now_ns - stream->flush_last_ns) / (uint64_t)stream->pdu_period); + if (owed <= 0) + return; + stream->flush_last_ns += (uint64_t)owed * (uint64_t)stream->pdu_period; + + pad_ringbuffer_with_silence(stream, owed); + + if (server->avb_mode == AVB_MODE_MILAN_V12) + flush_write_milan_v12(stream, now_ns); + else + flush_write_legacy(stream, now_ns); +} + static void on_source_stream_process(void *data) { struct stream *stream = data; @@ -122,11 +335,14 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time) if (n < 0 || n != (ssize_t)stream->pdu_size) pw_log_error("stream send failed %zd != %zd: %m", n, stream->pdu_size); + else + stream_out_counters(stream)->frame_tx++; txtime += stream->pdu_period; ptime += stream->pdu_period; index += stream->payload_size; } + stream_out_mark_counters_dirty(stream); spa_ringbuffer_read_update(&stream->ring, index); return 0; } @@ -168,6 +384,8 @@ static int flush_write_legacy(struct stream *stream, uint64_t current_time) if (n < 0 || n != (ssize_t)stream->pdu_size) pw_log_error("stream send failed %zd != %zd: %m", n, stream->pdu_size); + else + stream_out_counters(stream)->frame_tx++; txtime += stream->pdu_period; ptime += stream->pdu_period; index += stream->payload_size; @@ -175,6 +393,7 @@ static int flush_write_legacy(struct stream *stream, uint64_t current_time) } stream->dbc = dbc; + stream_out_mark_counters_dirty(stream); spa_ringbuffer_read_update(&stream->ring, index); return 0; } @@ -186,7 +405,6 @@ static void on_sink_stream_process(void *data) struct spa_data *d; int32_t filled; uint32_t index, offs, avail, size; - struct timespec now; if ((buf = pw_stream_dequeue_buffer(stream->stream)) == NULL) { pw_log_debug("out of buffers: %m"); @@ -214,11 +432,6 @@ static void on_sink_stream_process(void *data) } pw_stream_queue_buffer(stream->stream, buf); - clock_gettime(CLOCK_TAI, &now); - if (stream->server->avb_mode == AVB_MODE_MILAN_V12) - flush_write_milan_v12(stream, SPA_TIMESPEC_TO_NSEC(&now)); - else - flush_write_legacy(stream, SPA_TIMESPEC_TO_NSEC(&now)); } static void setup_pdu_milan_v12(struct stream *stream) @@ -339,6 +552,11 @@ struct stream *server_create_stream(struct server *server, struct stream *stream stream->index = index; stream->prio = AVB_MSRP_PRIORITY_DEFAULT; stream->vlan_id = AVB_DEFAULT_VLAN; + stream->mtt = 2000000; + /* TX timestamp jitter budget added on top of CLOCK_TAI now. 125 µs is + * the upper bound at 1 GbE class-A traffic per IEEE 802.1Qav; safe + * default until we have a way to measure it from gPTP. */ + stream->t_uncertainty = 125000; stream->id = (uint64_t)server->mac_addr[0] << 56 | (uint64_t)server->mac_addr[1] << 48 | @@ -380,11 +598,25 @@ struct stream *server_create_stream(struct server *server, struct stream *stream &sink_stream_events, stream); - stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE; - stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED; - stream->info.info.raw.rate = 48000; - stream->info.info.raw.channels = 8; - stream->stride = stream->info.info.raw.channels * 4; + { + uint16_t desc_type = (direction == SPA_DIRECTION_INPUT) + ? AVB_AEM_DESC_STREAM_INPUT + : AVB_AEM_DESC_STREAM_OUTPUT; + struct descriptor *desc = server_find_descriptor(server, desc_type, index); + struct avb_aem_desc_stream *body = + desc ? descriptor_body(desc) : NULL; + struct avb_aem_stream_format_info fi = { 0 }; + + stream->format = body ? body->current_format : 0; + if (stream->format) + avb_aem_stream_format_decode(stream->format, &fi); + + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE; + stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED; + stream->info.info.raw.rate = fi.is_audio && fi.rate ? fi.rate : 48000; + stream->info.info.raw.channels = fi.is_audio && fi.channels ? fi.channels : 8; + stream->stride = stream->info.info.raw.channels * 4; + } n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); @@ -520,28 +752,53 @@ static int setup_socket(struct stream *stream) static void handle_aaf_packet(struct stream *stream, struct avb_packet_aaf *p, int len) { + struct aecp_aem_stream_input_state *si = stream_in_state(stream); + struct aecp_aem_stream_input_counters *cnt = &si->counters; + struct timespec now_ts; uint32_t index, n_bytes; int32_t filled; filled = spa_ringbuffer_get_write_index(&stream->ring, &index); n_bytes = ntohs(p->data_len); - if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) { - pw_log_debug("capture overrun"); - } else { - spa_ringbuffer_write_data(&stream->ring, - stream->buffer_data, - stream->buffer_size, - index % stream->buffer_size, - p->payload, n_bytes); - index += n_bytes; - spa_ringbuffer_write_update(&stream->ring, index); + /* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every valid + * AVTPDU received on the wire — independent of whether the listener + * pipeline could absorb it. A ringbuffer overrun is a separate event + * that bumps STREAM_INTERRUPTED. Counting both unconditionally keeps + * Hive's dashboard meaningful even when no PipeWire consumer is + * draining the source side. */ + cnt->frame_rx++; + + clock_gettime(CLOCK_MONOTONIC, &now_ts); + si->last_frame_rx_ns = SPA_TIMESPEC_TO_NSEC(&now_ts); + if (!si->media_locked_state) { + cnt->media_locked++; + si->media_locked_state = true; } + + if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) { + uint32_t r_index; + spa_ringbuffer_get_read_index(&stream->ring, &r_index); + spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes); + cnt->stream_interrupted++; + filled -= n_bytes; + } + spa_ringbuffer_write_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + p->payload, n_bytes); + index += n_bytes; + spa_ringbuffer_write_update(&stream->ring, index); + stream_in_mark_counters_dirty(stream); } static void handle_iec61883_packet(struct stream *stream, struct avb_packet_iec61883 *p, int len) { + struct aecp_aem_stream_input_state *si = stream_in_state(stream); + struct aecp_aem_stream_input_counters *cnt = &si->counters; + struct timespec now_ts; uint32_t index, n_bytes; uint16_t data_len; int32_t filled; @@ -554,17 +811,30 @@ static void handle_iec61883_packet(struct stream *stream, if (n_bytes > (uint32_t)(len - (int)sizeof(*p))) return; - if (filled + n_bytes > stream->buffer_size) { - pw_log_debug("capture overrun"); - } else { - spa_ringbuffer_write_data(&stream->ring, - stream->buffer_data, - stream->buffer_size, - index % stream->buffer_size, - p->payload, n_bytes); - index += n_bytes; - spa_ringbuffer_write_update(&stream->ring, index); + cnt->frame_rx++; + + clock_gettime(CLOCK_MONOTONIC, &now_ts); + si->last_frame_rx_ns = SPA_TIMESPEC_TO_NSEC(&now_ts); + if (!si->media_locked_state) { + cnt->media_locked++; + si->media_locked_state = true; } + + if (filled + n_bytes > stream->buffer_size) { + uint32_t r_index; + spa_ringbuffer_get_read_index(&stream->ring, &r_index); + spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes); + cnt->stream_interrupted++; + filled -= n_bytes; + } + spa_ringbuffer_write_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + p->payload, n_bytes); + index += n_bytes; + spa_ringbuffer_write_update(&stream->ring, index); + stream_in_mark_counters_dirty(stream); } static void on_socket_data(void *data, int fd, uint32_t mask) @@ -668,6 +938,31 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) pw_stream_set_active(stream->stream, true); + /* Milan Table 5.17: STREAM_START counter ticks each time the stream + * transitions from stopped → started. */ + if (stream->direction == SPA_DIRECTION_OUTPUT) { + stream_out_counters(stream)->stream_start++; + stream_out_mark_counters_dirty(stream); + + if (stream->flush_timer == NULL) { + struct timespec value = { + .tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC), + .tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC), + }; + struct timespec interval = value; + stream->flush_last_ns = 0; + stream->flush_timer = pw_loop_add_timer(server->impl->loop, + on_flush_tick, stream); + if (stream->flush_timer) + pw_loop_update_timer(server->impl->loop, + stream->flush_timer, + &value, &interval, false); + else + pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)", + stream); + } + } + return 0; } @@ -682,15 +977,27 @@ int stream_deactivate(struct stream *stream, uint64_t now) pw_loop_destroy_source(stream->server->impl->loop, stream->source); stream->source = NULL; } + if (stream->flush_timer != NULL) { + pw_loop_destroy_source(stream->server->impl->loop, stream->flush_timer); + stream->flush_timer = NULL; + stream->flush_last_ns = 0; + } #if 0 avb_mrp_attribute_leave(stream->vlan_attr->mrp, now); -#endif // +#endif // if (stream->direction == SPA_DIRECTION_INPUT) avb_mrp_attribute_leave(common->lstream_attr.mrp, now); else avb_mrp_attribute_leave(common->tastream_attr.mrp, now); + /* Milan Table 5.17: STREAM_STOP counter ticks each transition the + * other way. */ + if (stream->direction == SPA_DIRECTION_OUTPUT) { + stream_out_counters(stream)->stream_stop++; + stream_out_mark_counters_dirty(stream); + } + return 0; } diff --git a/src/modules/module-avb/stream.h b/src/modules/module-avb/stream.h index ede0ece8d..87c623732 100644 --- a/src/modules/module-avb/stream.h +++ b/src/modules/module-avb/stream.h @@ -35,6 +35,8 @@ struct stream { int vlan_id; struct spa_source *source; + struct spa_source *flush_timer; + uint64_t flush_last_ns; int prio; int mtt; int t_uncertainty; diff --git a/src/tools/meson.build b/src/tools/meson.build index dc88dc93a..ef5afe4fc 100644 --- a/src/tools/meson.build +++ b/src/tools/meson.build @@ -135,6 +135,8 @@ if build_avb_virtual '../modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-stream-info.c', '../modules/module-avb/aecp-aem-cmds-resps/cmd-start-stop-streaming.c', '../modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c', + '../modules/module-avb/aecp-aem-cmds-resps/cmd-get-as-path.c', + '../modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-max-transit-time.c', '../modules/module-avb/aecp-aem-cmds-resps/reply-unsol-helpers.c', '../modules/module-avb/es-builder.c', '../modules/module-avb/avdecc.c', diff --git a/test/meson.build b/test/meson.build index 1005777af..44e603b21 100644 --- a/test/meson.build +++ b/test/meson.build @@ -190,6 +190,8 @@ if build_avb_virtual '../src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-stream-info.c', '../src/modules/module-avb/aecp-aem-cmds-resps/cmd-start-stop-streaming.c', '../src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-counters.c', + '../src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-as-path.c', + '../src/modules/module-avb/aecp-aem-cmds-resps/cmd-get-set-max-transit-time.c', '../src/modules/module-avb/aecp-aem-cmds-resps/reply-unsol-helpers.c', '../src/modules/module-avb/es-builder.c', '../src/modules/module-avb/avdecc.c',