milan-avb: stream: wire Milan Section 5.4.5 stream counters, TX heartbeat, and MAX_TRANSIT_TIME plumbing

This commit is contained in:
hackerman-kl 2026-04-26 16:19:29 +02:00 committed by Wim Taymans
parent 16d793db38
commit 9c0007173b
11 changed files with 487 additions and 46 deletions

View file

@ -2,6 +2,112 @@
/* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */
/* SPDX-License-Identifier: MIT */
/*
* stream.c AVTP stream data plane.
*
* Each STREAM_INPUT and STREAM_OUTPUT descriptor in the AEM model owns a
* `struct stream` here. The stream wraps both:
* - a PipeWire `pw_stream` (so audio reaches/leaves the local node graph
* as an `avb.source`/`avb.sink` Audio/Source or Audio/Sink), and
* - a raw AF_PACKET socket on the AVB interface (for AVTP frames on the
* wire at ethertype 0x22f0).
*
* Direction map (PipeWire vs AVB):
* AVB STREAM_OUTPUT (talker) = PipeWire AUDIO/SINK
* PipeWire pushes samples in via on_sink_stream_process(); we send
* AVTP frames to a MAAP-allocated dest_mac.
* AVB STREAM_INPUT (listener) = PipeWire AUDIO/SOURCE
* We receive AVTP frames into the ringbuffer; PipeWire pulls samples
* out via on_source_stream_process().
*
* --------------------------------------------------------------------------
* TX heartbeat (output direction)
* --------------------------------------------------------------------------
*
* Why a timer drives flush_write_* instead of the PipeWire process tick:
*
* The AVTP wire schedule is dictated by the talker frames must leave
* every `pdu_period` (= SPA_NSEC_PER_SEC * frames_per_pdu / sample_rate;
* 125 µs at 48 kHz / 6 frames). A bound listener computes its own
* presentation_time relative to those wire arrivals and expects them to
* keep coming. If we tied flush_write to PipeWire's process callback we
* would only emit frames when an upstream PipeWire node feeds samples;
* the moment nothing is connected to avb.sink-N (the common case during
* bring-up, conformance testing, or whenever the user's audio app hasn't
* started yet), the wire goes silent, the listener's media_locked
* counter stays at 0, and Milan Section 4.3.3.1 / Hive treat the talker as
* absent.
*
* So an output stream owns its own periodic timer (`flush_timer`,
* AVB_FLUSH_TICK_NS = 1 ms = 8 PDUs). Each tick:
*
* 1. computes how many PDUs are owed since the last drain
* (`(now - flush_last_ns) / pdu_period`),
* 2. tops up the ringbuffer with zero samples if PipeWire hasn't
* kept up (`pad_ringbuffer_with_silence`), and
* 3. drains via flush_write_milan_v12 / flush_write_legacy.
*
* When PipeWire IS connected and feeding samples in time, step 2
* no-ops because filled needed the timer just becomes the metronome.
* When PipeWire is silent or under-runs, step 2 fills the deficit with
* zeros so the wire keeps a valid AVTP frame coming. Listeners receive
* silent (but spec-correct, tv=1) frames and remain locked.
*
* on_sink_stream_process() therefore only writes into the ringbuffer; it
* no longer calls flush_write_*. Calling both would double-send each
* PDU.
*
* --------------------------------------------------------------------------
* Counter unsolicited notifications
* --------------------------------------------------------------------------
*
* The data-plane sites in this file (flush_write_*, handle_aaf_packet,
* handle_iec61883_packet, stream_activate, stream_deactivate) increment
* the per-descriptor counters in `aecp_aem_stream_input_counters` /
* `aecp_aem_stream_output_counters` and mark `counters_dirty = true` on
* the descriptor's state via stream_in_mark_counters_dirty() and
* stream_out_mark_counters_dirty().
*
* The AECP periodic in cmd-get-counters.c (cmd_get_counters_periodic_milan_v12)
* scans descriptors at the server-tick rate (~100 ms) and, for each
* dirty descriptor where COUNTER_UNSOL_MIN_INTERVAL_NS (= 1 s) has
* elapsed since the last emit, sends one unsolicited GET_COUNTERS
* RESPONSE with the *current* values and clears the dirty flag.
*
* Net effect: a counter that ticks 1000 times in a second produces ONE
* unsolicited notification per second per descriptor, carrying the
* latest aggregate count (since the read happens at emit time). A
* counter that doesn't change produces no notification Hive's GET_COUNTERS
* refresh still sees the latest values via the synchronous handler.
*
* Per-counter wiring status (Milan Section 5.4.5.3, Table 5.16 Stream Input):
* FRAMES_RX live: handle_aaf_packet / handle_iec61883_packet
* STREAM_INTERRUPTED live: ringbuffer overrun in the same handlers
* MEDIA_LOCKED live: first-frame edge in handle_*_packet
* MEDIA_UNLOCKED live: cmd-get-counters periodic when last_frame_rx_ns
* ages past MEDIA_UNLOCK_TIMEOUT_NS
* SEQ_NUM_MISMATCH TODO: compare p->seq_num against expected (last + 1
* modulo 256), tick on mismatch and resync expected
* MEDIA_RESET_IN TODO: tick when AVTPDU header sets the mr bit
* (header reset notification)
* TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header
* UNSUPPORTED_FORMAT TODO: tick when subtype/format mismatch the bound
* descriptor's current_format
* LATE_TIMESTAMP TODO: tick when p->timestamp < CLOCK_TAI now
* (frame missed its presentation deadline)
* EARLY_TIMESTAMP TODO: tick when p->timestamp > now + max_transit_time
* (frame arrived too far ahead of its deadline)
* Table 5.17 Stream Output:
* FRAMES_TX live: per send in flush_write_milan_v12 / _legacy
* STREAM_START live: stream_activate (first activation per session)
* STREAM_STOP live: stream_deactivate
* MEDIA_RESET_OUT TODO: tick when the AVTPDU mr bit is asserted by us
* TIMESTAMP_UNCERTAIN_OUT TODO: tick when we set tu in an outgoing frame
* (e.g. PipeWire underrun forced silent fill)
*
* --------------------------------------------------------------------------
*/
#include <unistd.h>
#include <linux/if_ether.h>
#include <linux/if_packet.h>
@ -22,6 +128,42 @@
#include "mvrp.h"
#include "utils.h"
static inline struct aecp_aem_stream_input_state *stream_in_state(struct stream *s)
{
struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream);
return SPA_CONTAINER_OF(c, struct aecp_aem_stream_input_state, common);
}
static inline struct aecp_aem_stream_input_counters *stream_in_counters(struct stream *s)
{
return &stream_in_state(s)->counters;
}
static inline struct aecp_aem_stream_output_counters *stream_out_counters(struct stream *s)
{
struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream);
struct aecp_aem_stream_output_state *so =
SPA_CONTAINER_OF(c, struct aecp_aem_stream_output_state, common);
return &so->counters;
}
static inline void stream_in_mark_counters_dirty(struct stream *s)
{
struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream);
struct aecp_aem_stream_input_state *si =
SPA_CONTAINER_OF(c, struct aecp_aem_stream_input_state, common);
si->counters_dirty = true;
}
static inline void stream_out_mark_counters_dirty(struct stream *s)
{
struct stream_common *c = SPA_CONTAINER_OF(s, struct stream_common, stream);
struct aecp_aem_stream_output_state *so =
SPA_CONTAINER_OF(c, struct aecp_aem_stream_output_state, common);
so->counters_dirty = true;
}
#define AVB_FLUSH_TICK_NS ((uint64_t)1000000)
static int flush_write_milan_v12(struct stream *stream, uint64_t current_time);
static int flush_write_legacy(struct stream *stream, uint64_t current_time);
static void on_stream_destroy(void *d)
{
struct stream *stream = d;
@ -29,6 +171,77 @@ static void on_stream_destroy(void *d)
stream->stream = NULL;
}
static void pad_ringbuffer_with_silence(struct stream *stream, int owed)
{
uint32_t index;
int32_t filled;
size_t needed;
size_t deficit;
size_t off;
void *base;
if (owed <= 0)
return;
filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
if (filled < 0)
filled = 0;
needed = (size_t)owed * stream->stride * stream->frames_per_pdu;
if ((size_t)filled >= needed)
return;
deficit = needed - (size_t)filled;
if ((size_t)filled + deficit > stream->buffer_size)
deficit = stream->buffer_size - (size_t)filled;
off = index % stream->buffer_size;
base = stream->buffer_data;
if (off + deficit <= stream->buffer_size) {
memset(SPA_PTROFF(base, off, void), 0, deficit);
} else {
size_t tail = stream->buffer_size - off;
memset(SPA_PTROFF(base, off, void), 0, tail);
memset(base, 0, deficit - tail);
}
spa_ringbuffer_write_update(&stream->ring, index + (uint32_t)deficit);
}
static void on_flush_tick(void *data, uint64_t expirations)
{
struct stream *stream = data;
struct server *server = stream->server;
struct timespec now_ts;
uint64_t now_ns;
int owed;
(void)expirations;
if (clock_gettime(CLOCK_TAI, &now_ts) < 0)
return;
now_ns = SPA_TIMESPEC_TO_NSEC(&now_ts);
if (stream->flush_last_ns == 0) {
stream->flush_last_ns = now_ns;
return;
}
if (stream->pdu_period == 0)
return;
owed = (int)((now_ns - stream->flush_last_ns) / (uint64_t)stream->pdu_period);
if (owed <= 0)
return;
stream->flush_last_ns += (uint64_t)owed * (uint64_t)stream->pdu_period;
pad_ringbuffer_with_silence(stream, owed);
if (server->avb_mode == AVB_MODE_MILAN_V12)
flush_write_milan_v12(stream, now_ns);
else
flush_write_legacy(stream, now_ns);
}
static void on_source_stream_process(void *data)
{
struct stream *stream = data;
@ -122,11 +335,14 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time)
if (n < 0 || n != (ssize_t)stream->pdu_size)
pw_log_error("stream send failed %zd != %zd: %m",
n, stream->pdu_size);
else
stream_out_counters(stream)->frame_tx++;
txtime += stream->pdu_period;
ptime += stream->pdu_period;
index += stream->payload_size;
}
stream_out_mark_counters_dirty(stream);
spa_ringbuffer_read_update(&stream->ring, index);
return 0;
}
@ -168,6 +384,8 @@ static int flush_write_legacy(struct stream *stream, uint64_t current_time)
if (n < 0 || n != (ssize_t)stream->pdu_size)
pw_log_error("stream send failed %zd != %zd: %m",
n, stream->pdu_size);
else
stream_out_counters(stream)->frame_tx++;
txtime += stream->pdu_period;
ptime += stream->pdu_period;
index += stream->payload_size;
@ -175,6 +393,7 @@ static int flush_write_legacy(struct stream *stream, uint64_t current_time)
}
stream->dbc = dbc;
stream_out_mark_counters_dirty(stream);
spa_ringbuffer_read_update(&stream->ring, index);
return 0;
}
@ -186,7 +405,6 @@ static void on_sink_stream_process(void *data)
struct spa_data *d;
int32_t filled;
uint32_t index, offs, avail, size;
struct timespec now;
if ((buf = pw_stream_dequeue_buffer(stream->stream)) == NULL) {
pw_log_debug("out of buffers: %m");
@ -214,11 +432,6 @@ static void on_sink_stream_process(void *data)
}
pw_stream_queue_buffer(stream->stream, buf);
clock_gettime(CLOCK_TAI, &now);
if (stream->server->avb_mode == AVB_MODE_MILAN_V12)
flush_write_milan_v12(stream, SPA_TIMESPEC_TO_NSEC(&now));
else
flush_write_legacy(stream, SPA_TIMESPEC_TO_NSEC(&now));
}
static void setup_pdu_milan_v12(struct stream *stream)
@ -339,6 +552,11 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
stream->index = index;
stream->prio = AVB_MSRP_PRIORITY_DEFAULT;
stream->vlan_id = AVB_DEFAULT_VLAN;
stream->mtt = 2000000;
/* TX timestamp jitter budget added on top of CLOCK_TAI now. 125 µs is
* the upper bound at 1 GbE class-A traffic per IEEE 802.1Qav; safe
* default until we have a way to measure it from gPTP. */
stream->t_uncertainty = 125000;
stream->id = (uint64_t)server->mac_addr[0] << 56 |
(uint64_t)server->mac_addr[1] << 48 |
@ -380,11 +598,25 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
&sink_stream_events,
stream);
stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE;
stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED;
stream->info.info.raw.rate = 48000;
stream->info.info.raw.channels = 8;
stream->stride = stream->info.info.raw.channels * 4;
{
uint16_t desc_type = (direction == SPA_DIRECTION_INPUT)
? AVB_AEM_DESC_STREAM_INPUT
: AVB_AEM_DESC_STREAM_OUTPUT;
struct descriptor *desc = server_find_descriptor(server, desc_type, index);
struct avb_aem_desc_stream *body =
desc ? descriptor_body(desc) : NULL;
struct avb_aem_stream_format_info fi = { 0 };
stream->format = body ? body->current_format : 0;
if (stream->format)
avb_aem_stream_format_decode(stream->format, &fi);
stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE;
stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED;
stream->info.info.raw.rate = fi.is_audio && fi.rate ? fi.rate : 48000;
stream->info.info.raw.channels = fi.is_audio && fi.channels ? fi.channels : 8;
stream->stride = stream->info.info.raw.channels * 4;
}
n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
@ -520,28 +752,53 @@ static int setup_socket(struct stream *stream)
static void handle_aaf_packet(struct stream *stream,
struct avb_packet_aaf *p, int len)
{
struct aecp_aem_stream_input_state *si = stream_in_state(stream);
struct aecp_aem_stream_input_counters *cnt = &si->counters;
struct timespec now_ts;
uint32_t index, n_bytes;
int32_t filled;
filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
n_bytes = ntohs(p->data_len);
if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) {
pw_log_debug("capture overrun");
} else {
spa_ringbuffer_write_data(&stream->ring,
stream->buffer_data,
stream->buffer_size,
index % stream->buffer_size,
p->payload, n_bytes);
index += n_bytes;
spa_ringbuffer_write_update(&stream->ring, index);
/* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every valid
* AVTPDU received on the wire independent of whether the listener
* pipeline could absorb it. A ringbuffer overrun is a separate event
* that bumps STREAM_INTERRUPTED. Counting both unconditionally keeps
* Hive's dashboard meaningful even when no PipeWire consumer is
* draining the source side. */
cnt->frame_rx++;
clock_gettime(CLOCK_MONOTONIC, &now_ts);
si->last_frame_rx_ns = SPA_TIMESPEC_TO_NSEC(&now_ts);
if (!si->media_locked_state) {
cnt->media_locked++;
si->media_locked_state = true;
}
if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) {
uint32_t r_index;
spa_ringbuffer_get_read_index(&stream->ring, &r_index);
spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes);
cnt->stream_interrupted++;
filled -= n_bytes;
}
spa_ringbuffer_write_data(&stream->ring,
stream->buffer_data,
stream->buffer_size,
index % stream->buffer_size,
p->payload, n_bytes);
index += n_bytes;
spa_ringbuffer_write_update(&stream->ring, index);
stream_in_mark_counters_dirty(stream);
}
static void handle_iec61883_packet(struct stream *stream,
struct avb_packet_iec61883 *p, int len)
{
struct aecp_aem_stream_input_state *si = stream_in_state(stream);
struct aecp_aem_stream_input_counters *cnt = &si->counters;
struct timespec now_ts;
uint32_t index, n_bytes;
uint16_t data_len;
int32_t filled;
@ -554,17 +811,30 @@ static void handle_iec61883_packet(struct stream *stream,
if (n_bytes > (uint32_t)(len - (int)sizeof(*p)))
return;
if (filled + n_bytes > stream->buffer_size) {
pw_log_debug("capture overrun");
} else {
spa_ringbuffer_write_data(&stream->ring,
stream->buffer_data,
stream->buffer_size,
index % stream->buffer_size,
p->payload, n_bytes);
index += n_bytes;
spa_ringbuffer_write_update(&stream->ring, index);
cnt->frame_rx++;
clock_gettime(CLOCK_MONOTONIC, &now_ts);
si->last_frame_rx_ns = SPA_TIMESPEC_TO_NSEC(&now_ts);
if (!si->media_locked_state) {
cnt->media_locked++;
si->media_locked_state = true;
}
if (filled + n_bytes > stream->buffer_size) {
uint32_t r_index;
spa_ringbuffer_get_read_index(&stream->ring, &r_index);
spa_ringbuffer_read_update(&stream->ring, r_index + n_bytes);
cnt->stream_interrupted++;
filled -= n_bytes;
}
spa_ringbuffer_write_data(&stream->ring,
stream->buffer_data,
stream->buffer_size,
index % stream->buffer_size,
p->payload, n_bytes);
index += n_bytes;
spa_ringbuffer_write_update(&stream->ring, index);
stream_in_mark_counters_dirty(stream);
}
static void on_socket_data(void *data, int fd, uint32_t mask)
@ -668,6 +938,31 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
pw_stream_set_active(stream->stream, true);
/* Milan Table 5.17: STREAM_START counter ticks each time the stream
* transitions from stopped started. */
if (stream->direction == SPA_DIRECTION_OUTPUT) {
stream_out_counters(stream)->stream_start++;
stream_out_mark_counters_dirty(stream);
if (stream->flush_timer == NULL) {
struct timespec value = {
.tv_sec = (time_t)(AVB_FLUSH_TICK_NS / SPA_NSEC_PER_SEC),
.tv_nsec = (long)(AVB_FLUSH_TICK_NS % SPA_NSEC_PER_SEC),
};
struct timespec interval = value;
stream->flush_last_ns = 0;
stream->flush_timer = pw_loop_add_timer(server->impl->loop,
on_flush_tick, stream);
if (stream->flush_timer)
pw_loop_update_timer(server->impl->loop,
stream->flush_timer,
&value, &interval, false);
else
pw_log_warn("stream %p: no flush_timer (will rely on PipeWire pace)",
stream);
}
}
return 0;
}
@ -682,15 +977,27 @@ int stream_deactivate(struct stream *stream, uint64_t now)
pw_loop_destroy_source(stream->server->impl->loop, stream->source);
stream->source = NULL;
}
if (stream->flush_timer != NULL) {
pw_loop_destroy_source(stream->server->impl->loop, stream->flush_timer);
stream->flush_timer = NULL;
stream->flush_last_ns = 0;
}
#if 0
avb_mrp_attribute_leave(stream->vlan_attr->mrp, now);
#endif //
#endif //
if (stream->direction == SPA_DIRECTION_INPUT)
avb_mrp_attribute_leave(common->lstream_attr.mrp, now);
else
avb_mrp_attribute_leave(common->tastream_attr.mrp, now);
/* Milan Table 5.17: STREAM_STOP counter ticks each transition the
* other way. */
if (stream->direction == SPA_DIRECTION_OUTPUT) {
stream_out_counters(stream)->stream_stop++;
stream_out_mark_counters_dirty(stream);
}
return 0;
}