mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2026-04-09 08:21:08 -04:00
module-avb: extend transport abstraction to stream data path
Add stream_setup_socket and stream_send ops to avb_transport_ops so the stream data plane can use the same pluggable transport backend as the control plane. Move the raw AF_PACKET socket setup from stream.c into avdecc.c as raw_stream_setup_socket(), and add a raw_stream_send() wrapper around sendmsg(). Add a stream list (spa_list) to struct server so streams can be iterated after creation, and add stream_activate_virtual() for lightweight activation without MRP/MAAP network operations. Implement loopback stream ops: eventfd-based dummy sockets and no-op send that discards audio data. This enables virtual AVB nodes that work without network hardware or privileges. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
ffa855d76e
commit
14310e66fe
5 changed files with 176 additions and 78 deletions
|
|
@ -10,6 +10,8 @@
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <sys/eventfd.h>
|
#include <sys/eventfd.h>
|
||||||
|
#include <linux/if_ether.h>
|
||||||
|
#include <linux/if_packet.h>
|
||||||
|
|
||||||
#include "internal.h"
|
#include "internal.h"
|
||||||
#include "packets.h"
|
#include "packets.h"
|
||||||
|
|
@ -104,11 +106,44 @@ static inline void avb_loopback_destroy(struct server *server)
|
||||||
server->transport_data = NULL;
|
server->transport_data = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a dummy stream socket using eventfd.
|
||||||
|
* No AF_PACKET, no ioctls, no privileges needed.
|
||||||
|
*/
|
||||||
|
static inline int avb_loopback_stream_setup_socket(struct server *server,
|
||||||
|
struct stream *stream)
|
||||||
|
{
|
||||||
|
int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
|
||||||
|
if (fd < 0)
|
||||||
|
return -errno;
|
||||||
|
|
||||||
|
spa_zero(stream->sock_addr);
|
||||||
|
stream->sock_addr.sll_family = AF_PACKET;
|
||||||
|
stream->sock_addr.sll_halen = ETH_ALEN;
|
||||||
|
|
||||||
|
return fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* No-op stream send — pretend the send succeeded.
|
||||||
|
* Audio data is consumed from the ringbuffer but goes nowhere.
|
||||||
|
*/
|
||||||
|
static inline ssize_t avb_loopback_stream_send(struct server *server,
|
||||||
|
struct stream *stream, struct msghdr *msg, int flags)
|
||||||
|
{
|
||||||
|
ssize_t total = 0;
|
||||||
|
for (size_t i = 0; i < msg->msg_iovlen; i++)
|
||||||
|
total += msg->msg_iov[i].iov_len;
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
static const struct avb_transport_ops avb_transport_loopback = {
|
static const struct avb_transport_ops avb_transport_loopback = {
|
||||||
.setup = avb_loopback_setup,
|
.setup = avb_loopback_setup,
|
||||||
.send_packet = avb_loopback_send_packet,
|
.send_packet = avb_loopback_send_packet,
|
||||||
.make_socket = avb_loopback_make_socket,
|
.make_socket = avb_loopback_make_socket,
|
||||||
.destroy = avb_loopback_destroy,
|
.destroy = avb_loopback_destroy,
|
||||||
|
.stream_setup_socket = avb_loopback_stream_setup_socket,
|
||||||
|
.stream_send = avb_loopback_stream_send,
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Get the number of captured sent packets */
|
/** Get the number of captured sent packets */
|
||||||
|
|
|
||||||
|
|
@ -257,6 +257,102 @@ error_no_source:
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int raw_stream_setup_socket(struct server *server, struct stream *stream)
|
||||||
|
{
|
||||||
|
int fd, res;
|
||||||
|
char buf[128];
|
||||||
|
struct ifreq req;
|
||||||
|
|
||||||
|
fd = socket(AF_PACKET, SOCK_RAW | SOCK_NONBLOCK, htons(ETH_P_ALL));
|
||||||
|
if (fd < 0) {
|
||||||
|
pw_log_error("socket() failed: %m");
|
||||||
|
return -errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
spa_zero(req);
|
||||||
|
snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname);
|
||||||
|
res = ioctl(fd, SIOCGIFINDEX, &req);
|
||||||
|
if (res < 0) {
|
||||||
|
pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname);
|
||||||
|
res = -errno;
|
||||||
|
goto error_close;
|
||||||
|
}
|
||||||
|
|
||||||
|
spa_zero(stream->sock_addr);
|
||||||
|
stream->sock_addr.sll_family = AF_PACKET;
|
||||||
|
stream->sock_addr.sll_protocol = htons(ETH_P_TSN);
|
||||||
|
stream->sock_addr.sll_ifindex = req.ifr_ifindex;
|
||||||
|
|
||||||
|
if (stream->direction == SPA_DIRECTION_OUTPUT) {
|
||||||
|
struct sock_txtime txtime_cfg;
|
||||||
|
|
||||||
|
res = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &stream->prio,
|
||||||
|
sizeof(stream->prio));
|
||||||
|
if (res < 0) {
|
||||||
|
pw_log_error("setsockopt(SO_PRIORITY %d) failed: %m", stream->prio);
|
||||||
|
res = -errno;
|
||||||
|
goto error_close;
|
||||||
|
}
|
||||||
|
|
||||||
|
txtime_cfg.clockid = CLOCK_TAI;
|
||||||
|
txtime_cfg.flags = 0;
|
||||||
|
res = setsockopt(fd, SOL_SOCKET, SO_TXTIME, &txtime_cfg,
|
||||||
|
sizeof(txtime_cfg));
|
||||||
|
if (res < 0) {
|
||||||
|
pw_log_error("setsockopt(SO_TXTIME) failed: %m");
|
||||||
|
res = -errno;
|
||||||
|
goto error_close;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
struct packet_mreq mreq;
|
||||||
|
|
||||||
|
res = bind(fd, (struct sockaddr *) &stream->sock_addr, sizeof(stream->sock_addr));
|
||||||
|
if (res < 0) {
|
||||||
|
pw_log_error("bind() failed: %m");
|
||||||
|
res = -errno;
|
||||||
|
goto error_close;
|
||||||
|
}
|
||||||
|
|
||||||
|
spa_zero(mreq);
|
||||||
|
mreq.mr_ifindex = req.ifr_ifindex;
|
||||||
|
mreq.mr_type = PACKET_MR_MULTICAST;
|
||||||
|
mreq.mr_alen = ETH_ALEN;
|
||||||
|
memcpy(&mreq.mr_address, stream->addr, ETH_ALEN);
|
||||||
|
res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
|
||||||
|
&mreq, sizeof(struct packet_mreq));
|
||||||
|
|
||||||
|
pw_log_info("join %s", avb_utils_format_addr(buf, 128, stream->addr));
|
||||||
|
|
||||||
|
if (res < 0) {
|
||||||
|
pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m");
|
||||||
|
res = -errno;
|
||||||
|
goto error_close;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fd;
|
||||||
|
|
||||||
|
error_close:
|
||||||
|
close(fd);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ssize_t raw_stream_send(struct server *server, struct stream *stream,
|
||||||
|
struct msghdr *msg, int flags)
|
||||||
|
{
|
||||||
|
return sendmsg(stream->source->fd, msg, flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
int avb_server_stream_setup_socket(struct server *server, struct stream *stream)
|
||||||
|
{
|
||||||
|
return server->transport->stream_setup_socket(server, stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize_t avb_server_stream_send(struct server *server, struct stream *stream,
|
||||||
|
struct msghdr *msg, int flags)
|
||||||
|
{
|
||||||
|
return server->transport->stream_send(server, stream, msg, flags);
|
||||||
|
}
|
||||||
|
|
||||||
static void raw_transport_destroy(struct server *server)
|
static void raw_transport_destroy(struct server *server)
|
||||||
{
|
{
|
||||||
struct impl *impl = server->impl;
|
struct impl *impl = server->impl;
|
||||||
|
|
@ -270,6 +366,8 @@ const struct avb_transport_ops avb_transport_raw = {
|
||||||
.send_packet = raw_send_packet,
|
.send_packet = raw_send_packet,
|
||||||
.make_socket = raw_make_socket,
|
.make_socket = raw_make_socket,
|
||||||
.destroy = raw_transport_destroy,
|
.destroy = raw_transport_destroy,
|
||||||
|
.stream_setup_socket = raw_stream_setup_socket,
|
||||||
|
.stream_send = raw_stream_send,
|
||||||
};
|
};
|
||||||
|
|
||||||
struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props)
|
struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props)
|
||||||
|
|
@ -294,6 +392,7 @@ struct server *avdecc_server_new(struct impl *impl, struct spa_dict *props)
|
||||||
|
|
||||||
spa_hook_list_init(&server->listener_list);
|
spa_hook_list_init(&server->listener_list);
|
||||||
spa_list_init(&server->descriptors);
|
spa_list_init(&server->descriptors);
|
||||||
|
spa_list_init(&server->streams);
|
||||||
|
|
||||||
server->debug_messages = false;
|
server->debug_messages = false;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@
|
||||||
#ifndef AVB_INTERNAL_H
|
#ifndef AVB_INTERNAL_H
|
||||||
#define AVB_INTERNAL_H
|
#define AVB_INTERNAL_H
|
||||||
|
|
||||||
|
#include <sys/socket.h>
|
||||||
|
|
||||||
#include <pipewire/pipewire.h>
|
#include <pipewire/pipewire.h>
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
@ -17,6 +19,8 @@ struct avb_mrp;
|
||||||
#define AVB_TSN_ETH 0x22f0
|
#define AVB_TSN_ETH 0x22f0
|
||||||
#define AVB_BROADCAST_MAC { 0x91, 0xe0, 0xf0, 0x01, 0x00, 0x00 };
|
#define AVB_BROADCAST_MAC { 0x91, 0xe0, 0xf0, 0x01, 0x00, 0x00 };
|
||||||
|
|
||||||
|
struct stream;
|
||||||
|
|
||||||
struct avb_transport_ops {
|
struct avb_transport_ops {
|
||||||
int (*setup)(struct server *server);
|
int (*setup)(struct server *server);
|
||||||
int (*send_packet)(struct server *server, const uint8_t dest[6],
|
int (*send_packet)(struct server *server, const uint8_t dest[6],
|
||||||
|
|
@ -24,6 +28,11 @@ struct avb_transport_ops {
|
||||||
int (*make_socket)(struct server *server, uint16_t type,
|
int (*make_socket)(struct server *server, uint16_t type,
|
||||||
const uint8_t mac[6]);
|
const uint8_t mac[6]);
|
||||||
void (*destroy)(struct server *server);
|
void (*destroy)(struct server *server);
|
||||||
|
|
||||||
|
/* stream data plane ops */
|
||||||
|
int (*stream_setup_socket)(struct server *server, struct stream *stream);
|
||||||
|
ssize_t (*stream_send)(struct server *server, struct stream *stream,
|
||||||
|
struct msghdr *msg, int flags);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct impl {
|
struct impl {
|
||||||
|
|
@ -95,6 +104,7 @@ struct server {
|
||||||
struct spa_hook_list listener_list;
|
struct spa_hook_list listener_list;
|
||||||
|
|
||||||
struct spa_list descriptors;
|
struct spa_list descriptors;
|
||||||
|
struct spa_list streams;
|
||||||
|
|
||||||
unsigned debug_messages:1;
|
unsigned debug_messages:1;
|
||||||
|
|
||||||
|
|
@ -163,6 +173,10 @@ int avb_server_make_socket(struct server *server, uint16_t type, const uint8_t m
|
||||||
int avb_server_send_packet(struct server *server, const uint8_t dest[6],
|
int avb_server_send_packet(struct server *server, const uint8_t dest[6],
|
||||||
uint16_t type, void *data, size_t size);
|
uint16_t type, void *data, size_t size);
|
||||||
|
|
||||||
|
int avb_server_stream_setup_socket(struct server *server, struct stream *stream);
|
||||||
|
ssize_t avb_server_stream_send(struct server *server, struct stream *stream,
|
||||||
|
struct msghdr *msg, int flags);
|
||||||
|
|
||||||
struct aecp {
|
struct aecp {
|
||||||
struct server *server;
|
struct server *server;
|
||||||
struct spa_hook server_listener;
|
struct spa_hook server_listener;
|
||||||
|
|
|
||||||
|
|
@ -116,9 +116,10 @@ static int flush_write(struct stream *stream, uint64_t current_time)
|
||||||
p->timestamp = ptime;
|
p->timestamp = ptime;
|
||||||
p->dbc = dbc;
|
p->dbc = dbc;
|
||||||
|
|
||||||
n = sendmsg(stream->source->fd, &stream->msg, MSG_NOSIGNAL);
|
n = avb_server_stream_send(stream->server, stream,
|
||||||
|
&stream->msg, MSG_NOSIGNAL);
|
||||||
if (n < 0 || n != (ssize_t)stream->pdu_size) {
|
if (n < 0 || n != (ssize_t)stream->pdu_size) {
|
||||||
pw_log_error("sendmsg() failed %zd != %zd: %m",
|
pw_log_error("stream send failed %zd != %zd: %m",
|
||||||
n, stream->pdu_size);
|
n, stream->pdu_size);
|
||||||
}
|
}
|
||||||
txtime += stream->pdu_period;
|
txtime += stream->pdu_period;
|
||||||
|
|
@ -331,6 +332,8 @@ struct stream *server_create_stream(struct server *server, struct stream *stream
|
||||||
stream->talker_attr->attr.talker.rank = AVB_MSRP_RANK_DEFAULT;
|
stream->talker_attr->attr.talker.rank = AVB_MSRP_RANK_DEFAULT;
|
||||||
stream->talker_attr->attr.talker.accumulated_latency = htonl(95);
|
stream->talker_attr->attr.talker.accumulated_latency = htonl(95);
|
||||||
|
|
||||||
|
spa_list_append(&server->streams, &stream->link);
|
||||||
|
|
||||||
return stream;
|
return stream;
|
||||||
|
|
||||||
error_free_stream:
|
error_free_stream:
|
||||||
|
|
@ -348,82 +351,7 @@ void stream_destroy(struct stream *stream)
|
||||||
|
|
||||||
static int setup_socket(struct stream *stream)
|
static int setup_socket(struct stream *stream)
|
||||||
{
|
{
|
||||||
struct server *server = stream->server;
|
return avb_server_stream_setup_socket(stream->server, stream);
|
||||||
int fd, res;
|
|
||||||
char buf[128];
|
|
||||||
struct ifreq req;
|
|
||||||
|
|
||||||
fd = socket(AF_PACKET, SOCK_RAW | SOCK_NONBLOCK, htons(ETH_P_ALL));
|
|
||||||
if (fd < 0) {
|
|
||||||
pw_log_error("socket() failed: %m");
|
|
||||||
return -errno;
|
|
||||||
}
|
|
||||||
|
|
||||||
spa_zero(req);
|
|
||||||
snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname);
|
|
||||||
res = ioctl(fd, SIOCGIFINDEX, &req);
|
|
||||||
if (res < 0) {
|
|
||||||
pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname);
|
|
||||||
res = -errno;
|
|
||||||
goto error_close;
|
|
||||||
}
|
|
||||||
|
|
||||||
spa_zero(stream->sock_addr);
|
|
||||||
stream->sock_addr.sll_family = AF_PACKET;
|
|
||||||
stream->sock_addr.sll_protocol = htons(ETH_P_TSN);
|
|
||||||
stream->sock_addr.sll_ifindex = req.ifr_ifindex;
|
|
||||||
|
|
||||||
if (stream->direction == SPA_DIRECTION_OUTPUT) {
|
|
||||||
struct sock_txtime txtime_cfg;
|
|
||||||
|
|
||||||
res = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &stream->prio,
|
|
||||||
sizeof(stream->prio));
|
|
||||||
if (res < 0) {
|
|
||||||
pw_log_error("setsockopt(SO_PRIORITY %d) failed: %m", stream->prio);
|
|
||||||
res = -errno;
|
|
||||||
goto error_close;
|
|
||||||
}
|
|
||||||
|
|
||||||
txtime_cfg.clockid = CLOCK_TAI;
|
|
||||||
txtime_cfg.flags = 0;
|
|
||||||
res = setsockopt(fd, SOL_SOCKET, SO_TXTIME, &txtime_cfg,
|
|
||||||
sizeof(txtime_cfg));
|
|
||||||
if (res < 0) {
|
|
||||||
pw_log_error("setsockopt(SO_TXTIME) failed: %m");
|
|
||||||
res = -errno;
|
|
||||||
goto error_close;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
struct packet_mreq mreq;
|
|
||||||
|
|
||||||
res = bind(fd, (struct sockaddr *) &stream->sock_addr, sizeof(stream->sock_addr));
|
|
||||||
if (res < 0) {
|
|
||||||
pw_log_error("bind() failed: %m");
|
|
||||||
res = -errno;
|
|
||||||
goto error_close;
|
|
||||||
}
|
|
||||||
|
|
||||||
spa_zero(mreq);
|
|
||||||
mreq.mr_ifindex = req.ifr_ifindex;
|
|
||||||
mreq.mr_type = PACKET_MR_MULTICAST;
|
|
||||||
mreq.mr_alen = ETH_ALEN;
|
|
||||||
memcpy(&mreq.mr_address, stream->addr, ETH_ALEN);
|
|
||||||
res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
|
|
||||||
&mreq, sizeof(struct packet_mreq));
|
|
||||||
|
|
||||||
pw_log_info("join %s", avb_utils_format_addr(buf, 128, stream->addr));
|
|
||||||
|
|
||||||
if (res < 0) {
|
|
||||||
pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m");
|
|
||||||
res = -errno;
|
|
||||||
goto error_close;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fd;
|
|
||||||
|
|
||||||
error_close:
|
|
||||||
close(fd);
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void handle_iec61883_packet(struct stream *stream,
|
static void handle_iec61883_packet(struct stream *stream,
|
||||||
|
|
@ -548,3 +476,24 @@ int stream_deactivate(struct stream *stream, uint64_t now)
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int stream_activate_virtual(struct stream *stream, uint16_t index)
|
||||||
|
{
|
||||||
|
struct server *server = stream->server;
|
||||||
|
int fd;
|
||||||
|
|
||||||
|
if (stream->source == NULL) {
|
||||||
|
fd = setup_socket(stream);
|
||||||
|
if (fd < 0)
|
||||||
|
return fd;
|
||||||
|
|
||||||
|
stream->source = pw_loop_add_io(server->impl->loop, fd,
|
||||||
|
SPA_IO_IN, true, on_socket_data, stream);
|
||||||
|
if (stream->source == NULL) {
|
||||||
|
close(fd);
|
||||||
|
return -errno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pw_stream_set_active(stream->stream, true);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -78,5 +78,6 @@ void stream_destroy(struct stream *stream);
|
||||||
|
|
||||||
int stream_activate(struct stream *stream, uint16_t index, uint64_t now);
|
int stream_activate(struct stream *stream, uint16_t index, uint64_t now);
|
||||||
int stream_deactivate(struct stream *stream, uint64_t now);
|
int stream_deactivate(struct stream *stream, uint64_t now);
|
||||||
|
int stream_activate_virtual(struct stream *stream, uint16_t index);
|
||||||
|
|
||||||
#endif /* AVB_STREAM_H */
|
#endif /* AVB_STREAM_H */
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue