first draft for ACMP/ timeout handling, and communication between SRP/ADP and the ACMP state machine

This commit is contained in:
hackerman-kl 2026-01-19 09:39:49 +01:00 committed by Wim Taymans
parent ad543e37f5
commit 4856f85de2
18 changed files with 2970 additions and 285 deletions

View file

@ -6,60 +6,6 @@
#include "../aecp-aem-state.h"
#include "../stream.h"
struct pending *pending_find(struct acmp *acmp, uint32_t type, uint16_t sequence_id)
{
struct pending *p;
spa_list_for_each(p, &acmp->pending[type], link)
if (p->sequence_id == sequence_id)
return p;
return NULL;
}
void pending_free(struct acmp *acmp, struct pending *p)
{
spa_list_remove(&p->link);
free(p);
}
void pending_destroy(struct acmp *acmp)
{
struct pending *p, *t;
for (uint32_t list_id = 0; list_id < PENDING_CONTROLLER; list_id++) {
spa_list_for_each_safe(p, t, &acmp->pending[list_id], link) {
pending_free(acmp, p);
}
}
}
void *pending_new(struct acmp *acmp, uint32_t type, uint64_t now, uint32_t timeout_ms,
const void *m, size_t size)
{
struct pending *p;
struct avb_ethernet_header *h;
struct avb_packet_acmp *pm;
p = calloc(1, sizeof(*p) + size);
if (p == NULL)
return NULL;
p->last_time = now;
p->timeout = timeout_ms * SPA_NSEC_PER_MSEC;
p->sequence_id = acmp->sequence_id[type]++;
p->size = size;
p->ptr = SPA_PTROFF(p, sizeof(*p), void);
memcpy(p->ptr, m, size);
h = p->ptr;
pm = SPA_PTROFF(h, sizeof(*h), void);
p->old_sequence_id = ntohs(pm->sequence_id);
pm->sequence_id = htons(p->sequence_id);
spa_list_append(&acmp->pending[type], &p->link);
return p->ptr;
}
struct stream *find_stream(struct server *server, enum spa_direction direction,
uint16_t index)
{
@ -75,13 +21,13 @@ struct stream *find_stream(struct server *server, enum spa_direction direction,
type = AVB_AEM_DESC_STREAM_OUTPUT;
break;
default:
pw_log_error("Unkown direction\n");
pw_log_error("Unkown direction");
return NULL;
}
desc = server_find_descriptor(server, type, index);
if (!desc) {
pw_log_error("Could not find stream type %u index %u\n",
pw_log_error("Could not find stream type %u index %u",
type, index);
return NULL;
}
@ -90,19 +36,19 @@ struct stream *find_stream(struct server *server, enum spa_direction direction,
case SPA_DIRECTION_INPUT:
struct aecp_aem_stream_input_state *stream_in;
stream_in = desc->ptr;
stream = &stream_in->stream;
stream = &stream_in->common.stream;
break;
case SPA_DIRECTION_OUTPUT:
struct aecp_aem_stream_output_state *stream_out;
stream_out = desc->ptr;
stream = &stream_out->stream;
stream = &stream_out->common.stream;
break;
}
return stream;
}
int reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len)
int acmp_reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len)
{
struct server *server = acmp->server;
uint8_t buf[len];
@ -115,12 +61,3 @@ int reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len)
return avb_server_send_packet(server, h->src, AVB_TSN_ETH, buf, len);
}
int retry_pending(struct acmp *acmp, uint64_t now, struct pending *p)
{
struct server *server = acmp->server;
struct avb_ethernet_header *h = p->ptr;
p->retry++;
p->last_time = now;
return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, p->ptr, p->size);
}

View file

@ -9,20 +9,14 @@
#include <pipewire/pipewire.h>
#include "../acmp.h"
struct pending {
struct spa_list link;
uint64_t last_time;
uint64_t timeout;
uint16_t old_sequence_id;
uint16_t sequence_id;
uint16_t retry;
size_t size;
void *ptr;
};
struct acmp {
struct server *server;
struct spa_hook server_listener;
};
struct acmp_legacy_avb {
struct acmp acmp;
#define PENDING_TALKER 0
#define PENDING_LISTENER 1
@ -31,20 +25,18 @@ struct acmp {
uint16_t sequence_id[3];
};
struct pending *pending_find(struct acmp *acmp, uint32_t type, uint16_t sequence_id);
struct acmp_milan_v12 {
struct acmp acmp;
void pending_free(struct acmp *acmp, struct pending *p);
struct spa_list timers_lt;
struct spa_list pending_tk;
uint16_t sequence_id[2];
};
void pending_destroy(struct acmp *acmp);
void *pending_new(struct acmp *acmp, uint32_t type, uint64_t now,
uint32_t timeout_ms, const void *m, size_t size);
int retry_pending(struct acmp *acmp, uint64_t now, struct pending *p);
struct stream *find_stream(struct server *server, enum spa_direction direction,
uint16_t index);
int reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len);
int acmp_reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len);
#endif //AVB_ACMP_COMMON_H

View file

@ -5,6 +5,78 @@
#include "acmp-common.h"
#include "acmp-legacy-avb.h"
struct pending {
struct spa_list link;
uint64_t last_time;
uint64_t timeout;
uint16_t old_sequence_id;
uint16_t sequence_id;
uint16_t retry;
size_t size;
void *ptr;
};
static struct pending *pending_find(struct acmp_legacy_avb *acmp_legacy, uint32_t type, uint16_t sequence_id)
{
struct pending *p;
spa_list_for_each(p, &acmp_legacy->pending[type], link)
if (p->sequence_id == sequence_id)
return p;
return NULL;
}
static void pending_free(struct acmp_legacy_avb *acmp_legacy, struct pending *p)
{
spa_list_remove(&p->link);
free(p);
}
static void pending_destroy(struct acmp_legacy_avb *acmp_legacy)
{
struct pending *p, *t;
for (uint32_t list_id = 0; list_id < PENDING_CONTROLLER; list_id++) {
spa_list_for_each_safe(p, t, &acmp_legacy->pending[list_id], link) {
pending_free(acmp_legacy, p);
}
}
}
static void *pending_new(struct acmp_legacy_avb *acmp_legacy, uint32_t type, uint64_t now, uint32_t timeout_ms,
const void *m, size_t size)
{
struct pending *p;
struct avb_ethernet_header *h;
struct avb_packet_acmp *pm;
p = calloc(1, sizeof(*p) + size);
if (p == NULL)
return NULL;
p->last_time = now;
p->timeout = timeout_ms * SPA_NSEC_PER_MSEC;
p->sequence_id = acmp_legacy->sequence_id[type]++;
p->size = size;
p->ptr = SPA_PTROFF(p, sizeof(*p), void);
memcpy(p->ptr, m, size);
h = p->ptr;
pm = SPA_PTROFF(h, sizeof(*h), void);
p->old_sequence_id = ntohs(pm->sequence_id);
pm->sequence_id = htons(p->sequence_id);
spa_list_append(&acmp_legacy->pending[type], &p->link);
return p->ptr;
}
int retry_pending(struct acmp_legacy_avb *acmp_legacy, uint64_t now, struct pending *p)
{
struct acmp *acmp = (struct acmp*)acmp_legacy;
struct server *server = acmp->server;
struct avb_ethernet_header *h = p->ptr;
p->retry++;
p->last_time = now;
return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, p->ptr, p->size);
}
int handle_connect_tx_command_legacy_avb(struct acmp *acmp, uint64_t now,
const void *m, int len)
{
@ -43,6 +115,7 @@ done:
int handle_connect_tx_response_legacy_avb(struct acmp *acmp, uint64_t now,
const void *m, int len)
{
struct acmp_legacy_avb *acmp_legacy = (struct acmp_legacy_avb *) acmp;
struct server *server = acmp->server;
struct avb_ethernet_header *h;
const struct avb_packet_acmp *resp = SPA_PTROFF(m, sizeof(*h), void);
@ -57,7 +130,7 @@ int handle_connect_tx_response_legacy_avb(struct acmp *acmp, uint64_t now,
sequence_id = ntohs(resp->sequence_id);
pending = pending_find(acmp, PENDING_TALKER, sequence_id);
pending = pending_find(acmp_legacy, PENDING_TALKER, sequence_id);
if (pending == NULL)
return 0;
@ -79,7 +152,7 @@ int handle_connect_tx_response_legacy_avb(struct acmp *acmp, uint64_t now,
res = avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, pending->size);
pending_free(acmp, pending);
pending_free(acmp_legacy, pending);
return res;
}
@ -117,6 +190,7 @@ done:
int handle_disconnect_tx_response_legacy_avb(struct acmp *acmp, uint64_t now,
const void *m, int len)
{
struct acmp_legacy_avb *acmp_legacy = (struct acmp_legacy_avb *) acmp;
struct server *server = acmp->server;
struct avb_ethernet_header *h;
struct avb_packet_acmp *reply;
@ -131,7 +205,7 @@ int handle_disconnect_tx_response_legacy_avb(struct acmp *acmp, uint64_t now,
sequence_id = ntohs(resp->sequence_id);
pending = pending_find(acmp, PENDING_TALKER, sequence_id);
pending = pending_find(acmp_legacy, PENDING_TALKER, sequence_id);
if (pending == NULL)
return 0;
@ -151,7 +225,7 @@ int handle_disconnect_tx_response_legacy_avb(struct acmp *acmp, uint64_t now,
res = avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, pending->size);
pending_free(acmp, pending);
pending_free(acmp_legacy, pending);
return res;
}
@ -159,6 +233,7 @@ int handle_disconnect_tx_response_legacy_avb(struct acmp *acmp, uint64_t now,
int handle_connect_rx_command_legacy_avb(struct acmp *acmp, uint64_t now,
const void *m, int len)
{
struct acmp_legacy_avb *acmp_legacy = (struct acmp_legacy_avb *) acmp;
struct server *server = acmp->server;
struct avb_ethernet_header *h;
const struct avb_packet_acmp *p = SPA_PTROFF(m, sizeof(*h), void);
@ -167,7 +242,7 @@ int handle_connect_rx_command_legacy_avb(struct acmp *acmp, uint64_t now,
if (be64toh(p->listener_guid) != server->entity_id)
return 0;
h = pending_new(acmp, PENDING_TALKER, now,
h = pending_new(acmp_legacy, PENDING_TALKER, now,
AVB_ACMP_TIMEOUT_CONNECT_TX_COMMAND_MS, m, len);
if (h == NULL)
return -errno;
@ -182,6 +257,7 @@ int handle_connect_rx_command_legacy_avb(struct acmp *acmp, uint64_t now,
int handle_disconnect_rx_command_legacy_avb(struct acmp *acmp, uint64_t now,
const void *m, int len)
{
struct acmp_legacy_avb *acmp_legacy = (struct acmp_legacy_avb *) acmp;
struct server *server = acmp->server;
struct avb_ethernet_header *h;
const struct avb_packet_acmp *p = SPA_PTROFF(m, sizeof(*h), void);
@ -190,7 +266,7 @@ int handle_disconnect_rx_command_legacy_avb(struct acmp *acmp, uint64_t now,
if (be64toh(p->listener_guid) != server->entity_id)
return 0;
h = pending_new(acmp, PENDING_TALKER, now,
h = pending_new(acmp_legacy, PENDING_TALKER, now,
AVB_ACMP_TIMEOUT_DISCONNECT_TX_COMMAND_MS, m, len);
if (h == NULL)
return -errno;
@ -201,3 +277,54 @@ int handle_disconnect_rx_command_legacy_avb(struct acmp *acmp, uint64_t now,
return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, len);
}
static void check_timeout(struct acmp *acmp, uint64_t now, uint16_t type)
{
struct acmp_legacy_avb *acmp_avb = (struct acmp_legacy_avb *)acmp;
struct pending *p, *t;
spa_list_for_each_safe(p, t, &acmp_avb->pending[type], link) {
if (p->last_time + p->timeout > now)
continue;
if (p->retry == 0) {
pw_log_info("%p: pending timeout, retry", p);
retry_pending(acmp_avb, now, p);
} else {
pw_log_info("%p: pending timeout, fail", p);
pending_free(acmp_avb, p);
}
}
}
void acmp_periodic_avb_legacy(void *data, uint64_t now)
{
struct acmp *acmp = data;
check_timeout(acmp, now, PENDING_TALKER);
check_timeout(acmp, now, PENDING_LISTENER);
check_timeout(acmp, now, PENDING_CONTROLLER);
}
struct acmp* acmp_server_init_legacy_avb(void)
{
struct acmp_legacy_avb *acmp_avb;
acmp_avb = calloc(1, sizeof(*acmp_avb));
if (acmp_avb == NULL)
return NULL;
spa_list_init(&acmp_avb->pending[PENDING_TALKER]);
spa_list_init(&acmp_avb->pending[PENDING_LISTENER]);
spa_list_init(&acmp_avb->pending[PENDING_CONTROLLER]);
return (struct acmp *)acmp_avb;
}
void acmp_server_destroy_legacy_avb(struct acmp *acmp)
{
struct acmp_legacy_avb *acmp_legacy = (struct acmp_legacy_avb *)acmp;
pending_destroy(acmp_legacy);
free(acmp_legacy);
}

View file

@ -7,6 +7,13 @@
#include <stdint.h>
#include "../acmp.h"
struct acmp* acmp_server_init_legacy_avb(void);
void acmp_periodic_avb_legacy(void *data, uint64_t now);
void acmp_server_destroy_legacy_avb(struct acmp *acmp);
int handle_connect_tx_command_legacy_avb(struct acmp *acmp, uint64_t now,
const void *m, int len);

File diff suppressed because it is too large Load diff

View file

@ -1,11 +1,67 @@
/* SPDX-FileCopyrightText: Copyright © 2027 Alexandre Malki <alexandre.malki@kebag-logic.com> */
/* SPDX-FileCopyrightText: Copyright © 2026 Alexandre Malki <alexandre.malki@kebag-logic.com> */
/* SPDX-License-Identifier: MIT */
#ifndef AVB_ACMP_MILAN_V12_H
#define AVB_ACMP_MILAN_V12_H
#include <stdint.h>
#include "acmp-common.h"
/** Milan v1.2 ACMP */
enum fsm_acmp_state_milan_v12 {
FSM_ACMP_STATE_MILAN_V12_UNBOUND,
FSM_ACMP_STATE_MILAN_V12_PRB_W_AVAIL,
FSM_ACMP_STATE_MILAN_V12_PRB_W_DELAY,
FSM_ACMP_STATE_MILAN_V12_PRB_W_RESP,
FSM_ACMP_STATE_MILAN_V12_PRB_W_RESP2,
FSM_ACMP_STATE_MILAN_V12_PRB_W_RETRY,
FSM_ACMP_STATE_MILAN_V12_SETTLED_NO_RSV,
FSM_ACMP_STATE_MILAN_V12_SETTLED_RSV_OK,
FSM_ACMP_STATE_MILAN_V12_MAX,
};
struct acmp* acmp_server_init_milan_v12(void);
void acmp_destroy_milan_v12(struct acmp *acmp);
void acmp_periodic_milan_v12(struct acmp *acmp, uint64_t now);
int handle_probe_tx_command_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_disconnect_tx_command_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_get_tx_state_command_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_get_tx_connection_command_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_unbind_rx_command_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_bind_rx_command_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_probe_tx_response_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_get_rx_state_command_milan_v12(struct acmp *acmp, uint64_t now,
const void *m, int len);
int handle_evt_tk_discovered_milan_v12(struct acmp *acmp, uint64_t entity);
int handle_evt_tk_departed_milan_v12(struct acmp *acmp, uint64_t entity);
int handle_evt_tk_registered_milan_v12(struct acmp *acmp, uint64_t talker_guid);
int handle_evt_tk_unregistered_milan_v12(struct acmp *acmp, uint64_t talker_guid);
int acmp_tmr_no_resp_milan_v12(struct acmp *acmp, uint64_t now);
int acmp_tmr_retry_milan_v12(struct acmp *acmp, uint64_t now);
int acmp_tmr_delay_milan_v12(struct acmp *acmp, uint64_t now);
int acmp_tmr_no_tk_milan_v12(struct acmp *acmp, uint64_t now);
int handle_acmp_cli_cmd_milan_v12(struct acmp *acmp, const char *args, FILE *out);
#endif //AVB_ACMP_MILAN_V12_H