From 14310e66feb4393fb33798a0e6e85507af9050b2 Mon Sep 17 00:00:00 2001 From: "Christian F.K. Schaller" Date: Tue, 7 Apr 2026 17:39:04 -0400 Subject: [PATCH] module-avb: extend transport abstraction to stream data path Add stream_setup_socket and stream_send ops to avb_transport_ops so the stream data plane can use the same pluggable transport backend as the control plane. Move the raw AF_PACKET socket setup from stream.c into avdecc.c as raw_stream_setup_socket(), and add a raw_stream_send() wrapper around sendmsg(). Add a stream list (spa_list) to struct server so streams can be iterated after creation, and add stream_activate_virtual() for lightweight activation without MRP/MAAP network operations. Implement loopback stream ops: eventfd-based dummy sockets and no-op send that discards audio data. This enables virtual AVB nodes that work without network hardware or privileges. Co-Authored-By: Claude Opus 4.6 --- .../module-avb/avb-transport-loopback.h | 35 ++++++ src/modules/module-avb/avdecc.c | 99 +++++++++++++++++ src/modules/module-avb/internal.h | 14 +++ src/modules/module-avb/stream.c | 105 +++++------------- src/modules/module-avb/stream.h | 1 + 5 files changed, 176 insertions(+), 78 deletions(-) diff --git a/src/modules/module-avb/avb-transport-loopback.h b/src/modules/module-avb/avb-transport-loopback.h index d783ad55c..18508b99a 100644 --- a/src/modules/module-avb/avb-transport-loopback.h +++ b/src/modules/module-avb/avb-transport-loopback.h @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include "internal.h" #include "packets.h" @@ -104,11 +106,44 @@ static inline void avb_loopback_destroy(struct server *server) server->transport_data = NULL; } +/** + * Create a dummy stream socket using eventfd. + * No AF_PACKET, no ioctls, no privileges needed. + */ +static inline int avb_loopback_stream_setup_socket(struct server *server, + struct stream *stream) +{ + int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (fd < 0) + return -errno; + + spa_zero(stream->sock_addr); + stream->sock_addr.sll_family = AF_PACKET; + stream->sock_addr.sll_halen = ETH_ALEN; + + return fd; +} + +/** + * No-op stream send — pretend the send succeeded. + * Audio data is consumed from the ringbuffer but goes nowhere. + */ +static inline ssize_t avb_loopback_stream_send(struct server *server, + struct stream *stream, struct msghdr *msg, int flags) +{ + ssize_t total = 0; + for (size_t i = 0; i < msg->msg_iovlen; i++) + total += msg->msg_iov[i].iov_len; + return total; +} + static const struct avb_transport_ops avb_transport_loopback = { .setup = avb_loopback_setup, .send_packet = avb_loopback_send_packet, .make_socket = avb_loopback_make_socket, .destroy = avb_loopback_destroy, + .stream_setup_socket = avb_loopback_stream_setup_socket, + .stream_send = avb_loopback_stream_send, }; /** Get the number of captured sent packets */ diff --git a/src/modules/module-avb/avdecc.c b/src/modules/module-avb/avdecc.c index b3a40133d..8729d6421 100644 --- a/src/modules/module-avb/avdecc.c +++ b/src/modules/module-avb/avdecc.c @@ -257,6 +257,102 @@ error_no_source: return res; } +static int raw_stream_setup_socket(struct server *server, struct stream *stream) +{ + int fd, res; + char buf[128]; + struct ifreq req; + + fd = socket(AF_PACKET, SOCK_RAW | SOCK_NONBLOCK, htons(ETH_P_ALL)); + if (fd < 0) { + pw_log_error("socket() failed: %m"); + return -errno; + } + + spa_zero(req); + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname); + res = ioctl(fd, SIOCGIFINDEX, &req); + if (res < 0) { + pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname); + res = -errno; + goto error_close; + } + + spa_zero(stream->sock_addr); + stream->sock_addr.sll_family = AF_PACKET; + stream->sock_addr.sll_protocol = htons(ETH_P_TSN); + stream->sock_addr.sll_ifindex = req.ifr_ifindex; + + if (stream->direction == SPA_DIRECTION_OUTPUT) { + struct sock_txtime txtime_cfg; + + res = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &stream->prio, + sizeof(stream->prio)); + if (res < 0) { + pw_log_error("setsockopt(SO_PRIORITY %d) failed: %m", stream->prio); + res = -errno; + goto error_close; + } + + txtime_cfg.clockid = CLOCK_TAI; + txtime_cfg.flags = 0; + res = setsockopt(fd, SOL_SOCKET, SO_TXTIME, &txtime_cfg, + sizeof(txtime_cfg)); + if (res < 0) { + pw_log_error("setsockopt(SO_TXTIME) failed: %m"); + res = -errno; + goto error_close; + } + } else { + struct packet_mreq mreq; + + res = bind(fd, (struct sockaddr *) &stream->sock_addr, sizeof(stream->sock_addr)); + if (res < 0) { + pw_log_error("bind() failed: %m"); + res = -errno; + goto error_close; + } + + spa_zero(mreq); + mreq.mr_ifindex = req.ifr_ifindex; + mreq.mr_type = PACKET_MR_MULTICAST; + mreq.mr_alen = ETH_ALEN; + memcpy(&mreq.mr_address, stream->addr, ETH_ALEN); + res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, + &mreq, sizeof(struct packet_mreq)); + + pw_log_info("join %s", avb_utils_format_addr(buf, 128, stream->addr)); + + if (res < 0) { + pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m"); + res = -errno; + goto error_close; + } + } + return fd; + +error_close: + close(fd); + return res; +} + +static ssize_t raw_stream_send(struct server *server, struct stream *stream, + struct msghdr *msg, int flags) +{ + return sendmsg(stream->source->fd, msg, flags); +} + +int avb_server_stream_setup_socket(struct server *server, struct stream *stream) +{ + return server->transport->stream_setup_socket(server, stream); +} + +ssize_t avb_server_stream_send(struct server *server, struct stream *stream, + struct msghdr *msg, int flags) +{ + return server->transport->stream_send(server, stream, msg, flags); +} + static void raw_transport_destroy(struct server *server) { struct impl *impl = server->impl; @@ -270,6 +366,8 @@ const struct avb_transport_ops avb_transport_raw = { .send_packet = raw_send_packet, .make_socket = raw_make_socket, .destroy = raw_transport_destroy, + .stream_setup_socket = raw_stream_setup_socket, + .stream_send = raw_stream_send, }; struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props) @@ -294,6 +392,7 @@ struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props) spa_hook_list_init(&server->listener_list); spa_list_init(&server->descriptors); + spa_list_init(&server->streams); server->debug_messages = false; diff --git a/src/modules/module-avb/internal.h b/src/modules/module-avb/internal.h index f3ddb172e..bb4961674 100644 --- a/src/modules/module-avb/internal.h +++ b/src/modules/module-avb/internal.h @@ -5,6 +5,8 @@ #ifndef AVB_INTERNAL_H #define AVB_INTERNAL_H +#include + #include #ifdef __cplusplus @@ -17,6 +19,8 @@ struct avb_mrp; #define AVB_TSN_ETH 0x22f0 #define AVB_BROADCAST_MAC { 0x91, 0xe0, 0xf0, 0x01, 0x00, 0x00 }; +struct stream; + struct avb_transport_ops { int (*setup)(struct server *server); int (*send_packet)(struct server *server, const uint8_t dest[6], @@ -24,6 +28,11 @@ struct avb_transport_ops { int (*make_socket)(struct server *server, uint16_t type, const uint8_t mac[6]); void (*destroy)(struct server *server); + + /* stream data plane ops */ + int (*stream_setup_socket)(struct server *server, struct stream *stream); + ssize_t (*stream_send)(struct server *server, struct stream *stream, + struct msghdr *msg, int flags); }; struct impl { @@ -95,6 +104,7 @@ struct server { struct spa_hook_list listener_list; struct spa_list descriptors; + struct spa_list streams; unsigned debug_messages:1; @@ -163,6 +173,10 @@ int avb_server_make_socket(struct server *server, uint16_t type, const uint8_t m int avb_server_send_packet(struct server *server, const uint8_t dest[6], uint16_t type, void *data, size_t size); +int avb_server_stream_setup_socket(struct server *server, struct stream *stream); +ssize_t avb_server_stream_send(struct server *server, struct stream *stream, + struct msghdr *msg, int flags); + struct aecp { struct server *server; struct spa_hook server_listener; diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index f7101bdf0..26a3a795b 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -116,9 +116,10 @@ static int flush_write(struct stream *stream, uint64_t current_time) p->timestamp = ptime; p->dbc = dbc; - n = sendmsg(stream->source->fd, &stream->msg, MSG_NOSIGNAL); + n = avb_server_stream_send(stream->server, stream, + &stream->msg, MSG_NOSIGNAL); if (n < 0 || n != (ssize_t)stream->pdu_size) { - pw_log_error("sendmsg() failed %zd != %zd: %m", + pw_log_error("stream send failed %zd != %zd: %m", n, stream->pdu_size); } txtime += stream->pdu_period; @@ -331,6 +332,8 @@ struct stream *server_create_stream(struct server *server, struct stream *stream stream->talker_attr->attr.talker.rank = AVB_MSRP_RANK_DEFAULT; stream->talker_attr->attr.talker.accumulated_latency = htonl(95); + spa_list_append(&server->streams, &stream->link); + return stream; error_free_stream: @@ -348,82 +351,7 @@ void stream_destroy(struct stream *stream) static int setup_socket(struct stream *stream) { - struct server *server = stream->server; - int fd, res; - char buf[128]; - struct ifreq req; - - fd = socket(AF_PACKET, SOCK_RAW | SOCK_NONBLOCK, htons(ETH_P_ALL)); - if (fd < 0) { - pw_log_error("socket() failed: %m"); - return -errno; - } - - spa_zero(req); - snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname); - res = ioctl(fd, SIOCGIFINDEX, &req); - if (res < 0) { - pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname); - res = -errno; - goto error_close; - } - - spa_zero(stream->sock_addr); - stream->sock_addr.sll_family = AF_PACKET; - stream->sock_addr.sll_protocol = htons(ETH_P_TSN); - stream->sock_addr.sll_ifindex = req.ifr_ifindex; - - if (stream->direction == SPA_DIRECTION_OUTPUT) { - struct sock_txtime txtime_cfg; - - res = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &stream->prio, - sizeof(stream->prio)); - if (res < 0) { - pw_log_error("setsockopt(SO_PRIORITY %d) failed: %m", stream->prio); - res = -errno; - goto error_close; - } - - txtime_cfg.clockid = CLOCK_TAI; - txtime_cfg.flags = 0; - res = setsockopt(fd, SOL_SOCKET, SO_TXTIME, &txtime_cfg, - sizeof(txtime_cfg)); - if (res < 0) { - pw_log_error("setsockopt(SO_TXTIME) failed: %m"); - res = -errno; - goto error_close; - } - } else { - struct packet_mreq mreq; - - res = bind(fd, (struct sockaddr *) &stream->sock_addr, sizeof(stream->sock_addr)); - if (res < 0) { - pw_log_error("bind() failed: %m"); - res = -errno; - goto error_close; - } - - spa_zero(mreq); - mreq.mr_ifindex = req.ifr_ifindex; - mreq.mr_type = PACKET_MR_MULTICAST; - mreq.mr_alen = ETH_ALEN; - memcpy(&mreq.mr_address, stream->addr, ETH_ALEN); - res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, - &mreq, sizeof(struct packet_mreq)); - - pw_log_info("join %s", avb_utils_format_addr(buf, 128, stream->addr)); - - if (res < 0) { - pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m"); - res = -errno; - goto error_close; - } - } - return fd; - -error_close: - close(fd); - return res; + return avb_server_stream_setup_socket(stream->server, stream); } static void handle_iec61883_packet(struct stream *stream, @@ -548,3 +476,24 @@ int stream_deactivate(struct stream *stream, uint64_t now) } return 0; } + +int stream_activate_virtual(struct stream *stream, uint16_t index) +{ + struct server *server = stream->server; + int fd; + + if (stream->source == NULL) { + fd = setup_socket(stream); + if (fd < 0) + return fd; + + stream->source = pw_loop_add_io(server->impl->loop, fd, + SPA_IO_IN, true, on_socket_data, stream); + if (stream->source == NULL) { + close(fd); + return -errno; + } + } + pw_stream_set_active(stream->stream, true); + return 0; +} diff --git a/src/modules/module-avb/stream.h b/src/modules/module-avb/stream.h index f650cc216..4cc02ddd3 100644 --- a/src/modules/module-avb/stream.h +++ b/src/modules/module-avb/stream.h @@ -78,5 +78,6 @@ void stream_destroy(struct stream *stream); int stream_activate(struct stream *stream, uint16_t index, uint64_t now); int stream_deactivate(struct stream *stream, uint64_t now); +int stream_activate_virtual(struct stream *stream, uint16_t index); #endif /* AVB_STREAM_H */