module-avb: Introduce changes in the mechanisms how the stream are

built:
* es_builder: create stream with state variables and counters
* acmp: do not use the stream list, go through the descriptor to find
  the index
* stream: do not store redundant information such as the index and
  descriptor
* internal: removing the stream server and function associated to it

module-avb: internal, stream: removing server_find_stream
This commit is contained in:
hackerman-kl 2025-11-27 09:37:37 +01:00 committed by hackerman-kl
parent 546dafa0b0
commit 875dd91bc2
6 changed files with 120 additions and 55 deletions

View file

@ -11,6 +11,8 @@
#include "msrp.h"
#include "internal.h"
#include "stream.h"
#include "aecp-aem-descriptors.h"
#include "aecp-aem-state.h"
static const uint8_t mac[6] = AVB_BROADCAST_MAC;
@ -83,6 +85,48 @@ struct msg_info {
int (*handle) (struct acmp *acmp, uint64_t now, const void *m, int len);
};
static struct stream *find_stream(struct server *server, enum spa_direction direction,
uint16_t index)
{
uint16_t type;
struct descriptor *desc;
struct stream *stream;
switch (direction) {
case SPA_DIRECTION_INPUT:
type = AVB_AEM_DESC_STREAM_INPUT;
break;
case SPA_DIRECTION_OUTPUT:
type = AVB_AEM_DESC_STREAM_OUTPUT;
break;
default:
pw_log_error("Unkown direction\n");
return NULL;
}
desc = server_find_descriptor(server, type, index);
if (!desc) {
pw_log_error("Could not find stream type %u index %u\n",
type, index);
return NULL;
}
switch (direction) {
case SPA_DIRECTION_INPUT:
struct aecp_aem_stream_input_state *stream_in;
stream_in = desc->ptr;
stream = &stream_in->stream;
break;
case SPA_DIRECTION_OUTPUT:
struct aecp_aem_stream_output_state *stream_out;
stream_out = desc->ptr;
stream = &stream_out->stream;
break;
}
return stream;
}
static int reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len)
{
struct server *server = acmp->server;
@ -120,8 +164,7 @@ static int handle_connect_tx_command(struct acmp *acmp, uint64_t now, const void
return 0;
memcpy(buf, m, len);
stream = server_find_stream(server, SPA_DIRECTION_OUTPUT,
reply->talker_unique_id);
stream = find_stream(server, SPA_DIRECTION_OUTPUT, ntohs(reply->talker_unique_id));
if (stream == NULL) {
status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX;
goto done;
@ -130,7 +173,7 @@ static int handle_connect_tx_command(struct acmp *acmp, uint64_t now, const void
AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_RESPONSE);
reply->stream_id = htobe64(stream->id);
stream_activate(stream, now);
stream_activate(stream, ntohs(reply->talker_unique_id), now);
memcpy(reply->stream_dest_mac, stream->addr, 6);
reply->connection_count = htons(1);
@ -169,14 +212,13 @@ static int handle_connect_tx_response(struct acmp *acmp, uint64_t now, const voi
reply->sequence_id = htons(pending->old_sequence_id);
AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_RX_RESPONSE);
stream = server_find_stream(server, SPA_DIRECTION_INPUT,
ntohs(reply->listener_unique_id));
stream = find_stream(server, SPA_DIRECTION_INPUT, ntohs(reply->listener_unique_id));
if (stream == NULL)
return 0;
stream->peer_id = be64toh(reply->stream_id);
memcpy(stream->addr, reply->stream_dest_mac, 6);
stream_activate(stream, now);
stream_activate(stream, ntohs(reply->listener_unique_id), now);
res = avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, pending->size);
@ -199,8 +241,7 @@ static int handle_disconnect_tx_command(struct acmp *acmp, uint64_t now, const v
return 0;
memcpy(buf, m, len);
stream = server_find_stream(server, SPA_DIRECTION_OUTPUT,
reply->talker_unique_id);
stream = find_stream(server, SPA_DIRECTION_OUTPUT, ntohs(reply->talker_unique_id));
if (stream == NULL) {
status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX;
goto done;
@ -243,8 +284,7 @@ static int handle_disconnect_tx_response(struct acmp *acmp, uint64_t now, const
reply->sequence_id = htons(pending->old_sequence_id);
AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_RX_RESPONSE);
stream = server_find_stream(server, SPA_DIRECTION_INPUT,
reply->listener_unique_id);
stream = find_stream(server, SPA_DIRECTION_INPUT, ntohs(reply->listener_unique_id));
if (stream == NULL)
return 0;

View file

@ -254,7 +254,6 @@ struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props)
server->ifname = str ? strdup(str) : NULL;
spa_hook_list_init(&server->listener_list);
spa_list_init(&server->descriptors);
spa_list_init(&server->streams);
server->debug_messages = false;
@ -283,9 +282,6 @@ struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props)
avb_mrp_attribute_begin(server->domain_attr->mrp, 0);
avb_mrp_attribute_join(server->domain_attr->mrp, 0, true);
server_create_stream(server, SPA_DIRECTION_INPUT, 0);
server_create_stream(server, SPA_DIRECTION_OUTPUT, 0);
avb_maap_reserve(server->maap, 1);
init_descriptors(server);

View file

@ -4,7 +4,7 @@
#include "es-builder.h"
#include "aecp-aem-descriptors.h"
#include "aecp-aem-state.h"
/**
* \brief The goal of this modules is to create a an entity and
@ -31,9 +31,72 @@ struct es_builder_st {
es_builder_cb_t build_descriptor_cb;
};
/**
* \brief A generic function to avoid code duplicate for the streams */
static void *es_buidler_desc_stream_general_prepare(struct server *server,
uint16_t type, uint16_t index, size_t size, void *ptr)
{
void *ptr_alloc;
struct stream *stream;
enum spa_direction direction;
switch (type) {
case AVB_AEM_DESC_STREAM_INPUT:
struct aecp_aem_stream_input_state *pstream_input;
struct aecp_aem_stream_input_state stream_input = { 0 };
memcpy(&stream_input.desc, ptr, size);
ptr_alloc = server_add_descriptor(server, type, index,
sizeof(stream_input), &stream_input);
if (!ptr_alloc) {
pw_log_error("Allocation failed\n");
return NULL;
}
pstream_input = ptr_alloc;
stream = &pstream_input->stream;
direction = SPA_DIRECTION_INPUT;
break;
case AVB_AEM_DESC_STREAM_OUTPUT:
struct aecp_aem_stream_output_state *pstream_output;
struct aecp_aem_stream_output_state stream_output = { 0 };
memcpy(&stream_output.desc, ptr, size);
ptr_alloc = server_add_descriptor(server, type, index,
sizeof(stream_output), &stream_output);
if (!ptr_alloc) {
pw_log_error("Allocation failed\n");
return NULL;
}
pstream_output = ptr_alloc;
stream = &pstream_output->stream;
direction = SPA_DIRECTION_OUTPUT;
break;
default:
pw_log_error("Only STREAM_INPUT and STREAM_OUTPUT\n");
return NULL;
}
if (server_create_stream(server, stream, direction, index)) {
pw_log_error("Could not create/initialize a stream");
return NULL;
}
return ptr_alloc;
}
// Assign a ID to an specific builder
#define HELPER_ES_BUIDLER(type, callback) \
[type] = { .build_descriptor_cb = callback }
/** All callback that needs a status information */
static const struct es_builder_st es_builder[AVB_AEM_DESC_LAST_RESERVED_17221] =
{
HELPER_ES_BUIDLER(AVB_AEM_DESC_STREAM_OUTPUT, es_buidler_desc_stream_general_prepare),
HELPER_ES_BUIDLER(AVB_AEM_DESC_STREAM_INPUT, es_buidler_desc_stream_general_prepare),
};
/**

View file

@ -67,7 +67,6 @@ struct server {
struct spa_hook_list listener_list;
struct spa_list descriptors;
struct spa_list streams;
unsigned debug_messages:1;
@ -111,18 +110,6 @@ static inline void *server_add_descriptor(struct server *server,
return d->ptr;
}
static inline struct stream *server_find_stream(struct server *server,
enum spa_direction direction, uint16_t index)
{
struct stream *s;
spa_list_for_each(s, &server->streams, link) {
if (s->direction == direction &&
s->index == index)
return s;
}
return NULL;
}
struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props);
void avdecc_server_free(struct server *server);

View file

@ -15,8 +15,8 @@
#include "iec61883.h"
#include "stream.h"
#include "aecp-aem-state.h"
#include "utils.h"
#include "aecp-aem-descriptors.h"
static void on_stream_destroy(void *d)
{
@ -235,34 +235,17 @@ static const struct pw_stream_events sink_stream_events = {
.process = on_sink_stream_process
};
struct stream *server_create_stream(struct server *server,
struct stream *server_create_stream(struct server *server, struct stream *stream,
enum spa_direction direction, uint16_t index)
{
struct stream *stream;
const struct descriptor *desc;
uint32_t n_params;
const struct spa_pod *params[1];
uint8_t buffer[1024];
struct spa_pod_builder b;
int res;
desc = server_find_descriptor(server,
direction == SPA_DIRECTION_INPUT ?
AVB_AEM_DESC_STREAM_INPUT :
AVB_AEM_DESC_STREAM_OUTPUT, index);
if (desc == NULL)
return NULL;
stream = calloc(1, sizeof(*stream));
if (stream == NULL)
return NULL;
stream->server = server;
stream->direction = direction;
stream->index = index;
stream->desc = desc;
spa_list_append(&server->streams, &stream->link);
stream->prio = AVB_MSRP_PRIORITY_DEFAULT;
stream->vlan_id = AVB_DEFAULT_VLAN;
@ -361,8 +344,6 @@ error_free:
void stream_destroy(struct stream *stream)
{
avb_mrp_attribute_destroy(stream->listener_attr->mrp);
spa_list_remove(&stream->link);
free(stream);
}
static int setup_socket(struct stream *stream)
@ -496,7 +477,7 @@ static void on_socket_data(void *data, int fd, uint32_t mask)
}
}
int stream_activate(struct stream *stream, uint64_t now)
int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
{
struct server *server = stream->server;
struct avb_frame_header *h = (void*)stream->pdu;
@ -528,7 +509,7 @@ int stream_activate(struct stream *stream, uint64_t now)
stream->talker_attr->attr.talker.stream_id = htobe64(stream->peer_id);
avb_mrp_attribute_begin(stream->talker_attr->mrp, now);
} else {
if ((res = avb_maap_get_address(server->maap, stream->addr, stream->index)) < 0)
if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0)
return res;
stream->listener_attr->attr.listener.stream_id = htobe64(stream->id);

View file

@ -24,8 +24,6 @@ struct stream {
struct server *server;
uint16_t direction;
uint16_t index;
const struct descriptor *desc;
uint64_t id;
uint64_t peer_id;
@ -73,12 +71,12 @@ struct stream {
#include "mvrp.h"
#include "maap.h"
struct stream *server_create_stream(struct server *server,
struct stream *server_create_stream(struct server *server, struct stream *stream,
enum spa_direction direction, uint16_t index);
void stream_destroy(struct stream *stream);
int stream_activate(struct stream *stream, uint64_t now);
int stream_activate(struct stream *stream, uint16_t index, uint64_t now);
int stream_deactivate(struct stream *stream, uint64_t now);
#endif /* AVB_STREAM_H */