From 875dd91bc20227051de8399ae6a73dac2eb1cc1a Mon Sep 17 00:00:00 2001 From: hackerman-kl Date: Thu, 27 Nov 2025 09:37:37 +0100 Subject: [PATCH] module-avb: Introduce changes in the mechanisms how the stream are built: * es_builder: create stream with state variables and counters * acmp: do not use the stream list, go through the descriptor to find the index * stream: do not store redundant information such as the index and descriptor * internal: removing the stream server and function associated to it module-avb: internal, stream: removing server_find_stream --- src/modules/module-avb/acmp.c | 60 +++++++++++++++++++++----- src/modules/module-avb/avdecc.c | 4 -- src/modules/module-avb/es-builder.c | 65 ++++++++++++++++++++++++++++- src/modules/module-avb/internal.h | 13 ------ src/modules/module-avb/stream.c | 27 ++---------- src/modules/module-avb/stream.h | 6 +-- 6 files changed, 120 insertions(+), 55 deletions(-) diff --git a/src/modules/module-avb/acmp.c b/src/modules/module-avb/acmp.c index 951538be3..1d7f8af07 100644 --- a/src/modules/module-avb/acmp.c +++ b/src/modules/module-avb/acmp.c @@ -11,6 +11,8 @@ #include "msrp.h" #include "internal.h" #include "stream.h" +#include "aecp-aem-descriptors.h" +#include "aecp-aem-state.h" static const uint8_t mac[6] = AVB_BROADCAST_MAC; @@ -83,6 +85,48 @@ struct msg_info { int (*handle) (struct acmp *acmp, uint64_t now, const void *m, int len); }; +static struct stream *find_stream(struct server *server, enum spa_direction direction, + uint16_t index) +{ + uint16_t type; + struct descriptor *desc; + struct stream *stream; + + switch (direction) { + case SPA_DIRECTION_INPUT: + type = AVB_AEM_DESC_STREAM_INPUT; + break; + case SPA_DIRECTION_OUTPUT: + type = AVB_AEM_DESC_STREAM_OUTPUT; + break; + default: + pw_log_error("Unkown direction\n"); + return NULL; + } + + desc = server_find_descriptor(server, type, index); + if (!desc) { + pw_log_error("Could not find stream type %u index %u\n", + type, index); + return NULL; + } + + switch (direction) { + case SPA_DIRECTION_INPUT: + struct aecp_aem_stream_input_state *stream_in; + stream_in = desc->ptr; + stream = &stream_in->stream; + break; + case SPA_DIRECTION_OUTPUT: + struct aecp_aem_stream_output_state *stream_out; + stream_out = desc->ptr; + stream = &stream_out->stream; + break; + } + + return stream; +} + static int reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len) { struct server *server = acmp->server; @@ -120,8 +164,7 @@ static int handle_connect_tx_command(struct acmp *acmp, uint64_t now, const void return 0; memcpy(buf, m, len); - stream = server_find_stream(server, SPA_DIRECTION_OUTPUT, - reply->talker_unique_id); + stream = find_stream(server, SPA_DIRECTION_OUTPUT, ntohs(reply->talker_unique_id)); if (stream == NULL) { status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX; goto done; @@ -130,7 +173,7 @@ static int handle_connect_tx_command(struct acmp *acmp, uint64_t now, const void AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_RESPONSE); reply->stream_id = htobe64(stream->id); - stream_activate(stream, now); + stream_activate(stream, ntohs(reply->talker_unique_id), now); memcpy(reply->stream_dest_mac, stream->addr, 6); reply->connection_count = htons(1); @@ -169,14 +212,13 @@ static int handle_connect_tx_response(struct acmp *acmp, uint64_t now, const voi reply->sequence_id = htons(pending->old_sequence_id); AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_RX_RESPONSE); - stream = server_find_stream(server, SPA_DIRECTION_INPUT, - ntohs(reply->listener_unique_id)); + stream = find_stream(server, SPA_DIRECTION_INPUT, ntohs(reply->listener_unique_id)); if (stream == NULL) return 0; stream->peer_id = be64toh(reply->stream_id); memcpy(stream->addr, reply->stream_dest_mac, 6); - stream_activate(stream, now); + stream_activate(stream, ntohs(reply->listener_unique_id), now); res = avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, pending->size); @@ -199,8 +241,7 @@ static int handle_disconnect_tx_command(struct acmp *acmp, uint64_t now, const v return 0; memcpy(buf, m, len); - stream = server_find_stream(server, SPA_DIRECTION_OUTPUT, - reply->talker_unique_id); + stream = find_stream(server, SPA_DIRECTION_OUTPUT, ntohs(reply->talker_unique_id)); if (stream == NULL) { status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX; goto done; @@ -243,8 +284,7 @@ static int handle_disconnect_tx_response(struct acmp *acmp, uint64_t now, const reply->sequence_id = htons(pending->old_sequence_id); AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_RX_RESPONSE); - stream = server_find_stream(server, SPA_DIRECTION_INPUT, - reply->listener_unique_id); + stream = find_stream(server, SPA_DIRECTION_INPUT, ntohs(reply->listener_unique_id)); if (stream == NULL) return 0; diff --git a/src/modules/module-avb/avdecc.c b/src/modules/module-avb/avdecc.c index 183798fb6..067fb5e42 100644 --- a/src/modules/module-avb/avdecc.c +++ b/src/modules/module-avb/avdecc.c @@ -254,7 +254,6 @@ struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props) server->ifname = str ? strdup(str) : NULL; spa_hook_list_init(&server->listener_list); spa_list_init(&server->descriptors); - spa_list_init(&server->streams); server->debug_messages = false; @@ -283,9 +282,6 @@ struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props) avb_mrp_attribute_begin(server->domain_attr->mrp, 0); avb_mrp_attribute_join(server->domain_attr->mrp, 0, true); - server_create_stream(server, SPA_DIRECTION_INPUT, 0); - server_create_stream(server, SPA_DIRECTION_OUTPUT, 0); - avb_maap_reserve(server->maap, 1); init_descriptors(server); diff --git a/src/modules/module-avb/es-builder.c b/src/modules/module-avb/es-builder.c index 6d6330782..8d2d80c07 100644 --- a/src/modules/module-avb/es-builder.c +++ b/src/modules/module-avb/es-builder.c @@ -4,7 +4,7 @@ #include "es-builder.h" -#include "aecp-aem-descriptors.h" +#include "aecp-aem-state.h" /** * \brief The goal of this modules is to create a an entity and @@ -31,9 +31,72 @@ struct es_builder_st { es_builder_cb_t build_descriptor_cb; }; +/** + * \brief A generic function to avoid code duplicate for the streams */ +static void *es_buidler_desc_stream_general_prepare(struct server *server, + uint16_t type, uint16_t index, size_t size, void *ptr) +{ + void *ptr_alloc; + struct stream *stream; + enum spa_direction direction; + + switch (type) { + case AVB_AEM_DESC_STREAM_INPUT: + struct aecp_aem_stream_input_state *pstream_input; + struct aecp_aem_stream_input_state stream_input = { 0 }; + + memcpy(&stream_input.desc, ptr, size); + ptr_alloc = server_add_descriptor(server, type, index, + sizeof(stream_input), &stream_input); + if (!ptr_alloc) { + pw_log_error("Allocation failed\n"); + return NULL; + } + + pstream_input = ptr_alloc; + stream = &pstream_input->stream; + direction = SPA_DIRECTION_INPUT; + break; + case AVB_AEM_DESC_STREAM_OUTPUT: + struct aecp_aem_stream_output_state *pstream_output; + struct aecp_aem_stream_output_state stream_output = { 0 }; + + memcpy(&stream_output.desc, ptr, size); + ptr_alloc = server_add_descriptor(server, type, index, + sizeof(stream_output), &stream_output); + if (!ptr_alloc) { + pw_log_error("Allocation failed\n"); + return NULL; + } + + pstream_output = ptr_alloc; + stream = &pstream_output->stream; + direction = SPA_DIRECTION_OUTPUT; + + break; + default: + pw_log_error("Only STREAM_INPUT and STREAM_OUTPUT\n"); + return NULL; + } + + if (server_create_stream(server, stream, direction, index)) { + pw_log_error("Could not create/initialize a stream"); + return NULL; + } + + return ptr_alloc; +} + + +// Assign a ID to an specific builder +#define HELPER_ES_BUIDLER(type, callback) \ + [type] = { .build_descriptor_cb = callback } + /** All callback that needs a status information */ static const struct es_builder_st es_builder[AVB_AEM_DESC_LAST_RESERVED_17221] = { + HELPER_ES_BUIDLER(AVB_AEM_DESC_STREAM_OUTPUT, es_buidler_desc_stream_general_prepare), + HELPER_ES_BUIDLER(AVB_AEM_DESC_STREAM_INPUT, es_buidler_desc_stream_general_prepare), }; /** diff --git a/src/modules/module-avb/internal.h b/src/modules/module-avb/internal.h index 29e45c57c..c0600600b 100644 --- a/src/modules/module-avb/internal.h +++ b/src/modules/module-avb/internal.h @@ -67,7 +67,6 @@ struct server { struct spa_hook_list listener_list; struct spa_list descriptors; - struct spa_list streams; unsigned debug_messages:1; @@ -111,18 +110,6 @@ static inline void *server_add_descriptor(struct server *server, return d->ptr; } -static inline struct stream *server_find_stream(struct server *server, - enum spa_direction direction, uint16_t index) -{ - struct stream *s; - spa_list_for_each(s, &server->streams, link) { - if (s->direction == direction && - s->index == index) - return s; - } - return NULL; -} - struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props); void avdecc_server_free(struct server *server); diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index baa68cad7..f7101bdf0 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -15,8 +15,8 @@ #include "iec61883.h" #include "stream.h" +#include "aecp-aem-state.h" #include "utils.h" -#include "aecp-aem-descriptors.h" static void on_stream_destroy(void *d) { @@ -235,34 +235,17 @@ static const struct pw_stream_events sink_stream_events = { .process = on_sink_stream_process }; -struct stream *server_create_stream(struct server *server, +struct stream *server_create_stream(struct server *server, struct stream *stream, enum spa_direction direction, uint16_t index) { - struct stream *stream; - const struct descriptor *desc; uint32_t n_params; const struct spa_pod *params[1]; uint8_t buffer[1024]; struct spa_pod_builder b; int res; - desc = server_find_descriptor(server, - direction == SPA_DIRECTION_INPUT ? - AVB_AEM_DESC_STREAM_INPUT : - AVB_AEM_DESC_STREAM_OUTPUT, index); - if (desc == NULL) - return NULL; - - stream = calloc(1, sizeof(*stream)); - if (stream == NULL) - return NULL; - stream->server = server; stream->direction = direction; - stream->index = index; - stream->desc = desc; - spa_list_append(&server->streams, &stream->link); - stream->prio = AVB_MSRP_PRIORITY_DEFAULT; stream->vlan_id = AVB_DEFAULT_VLAN; @@ -361,8 +344,6 @@ error_free: void stream_destroy(struct stream *stream) { avb_mrp_attribute_destroy(stream->listener_attr->mrp); - spa_list_remove(&stream->link); - free(stream); } static int setup_socket(struct stream *stream) @@ -496,7 +477,7 @@ static void on_socket_data(void *data, int fd, uint32_t mask) } } -int stream_activate(struct stream *stream, uint64_t now) +int stream_activate(struct stream *stream, uint16_t index, uint64_t now) { struct server *server = stream->server; struct avb_frame_header *h = (void*)stream->pdu; @@ -528,7 +509,7 @@ int stream_activate(struct stream *stream, uint64_t now) stream->talker_attr->attr.talker.stream_id = htobe64(stream->peer_id); avb_mrp_attribute_begin(stream->talker_attr->mrp, now); } else { - if ((res = avb_maap_get_address(server->maap, stream->addr, stream->index)) < 0) + if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) return res; stream->listener_attr->attr.listener.stream_id = htobe64(stream->id); diff --git a/src/modules/module-avb/stream.h b/src/modules/module-avb/stream.h index 64de0a94a..f650cc216 100644 --- a/src/modules/module-avb/stream.h +++ b/src/modules/module-avb/stream.h @@ -24,8 +24,6 @@ struct stream { struct server *server; uint16_t direction; - uint16_t index; - const struct descriptor *desc; uint64_t id; uint64_t peer_id; @@ -73,12 +71,12 @@ struct stream { #include "mvrp.h" #include "maap.h" -struct stream *server_create_stream(struct server *server, +struct stream *server_create_stream(struct server *server, struct stream *stream, enum spa_direction direction, uint16_t index); void stream_destroy(struct stream *stream); -int stream_activate(struct stream *stream, uint64_t now); +int stream_activate(struct stream *stream, uint16_t index, uint64_t now); int stream_deactivate(struct stream *stream, uint64_t now); #endif /* AVB_STREAM_H */