From e0d6b2bb4f535a74471caa64e4f8fa56ef2c7f13 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 5 Apr 2022 18:10:27 +0200 Subject: [PATCH] avb: implement streams Add in input/output stream, setup the talker/listeners. Implement IEC61883 audio packets, send and receive data. Implement talker encoding. With this, audio can be sent and received from MOTU M64. --- src/daemon/pipewire-avb.conf.in | 3 +- src/modules/meson.build | 3 +- src/modules/module-avb/aaf.h | 37 -- src/modules/module-avb/acmp.c | 70 +++- src/modules/module-avb/avb.c | 15 + src/modules/module-avb/avdecc.c | 8 +- src/modules/module-avb/iec61883.h | 112 ++++++ src/modules/module-avb/internal.h | 22 +- src/modules/module-avb/maap.h | 4 +- src/modules/module-avb/mrp.c | 43 ++- src/modules/module-avb/mrp.h | 4 + src/modules/module-avb/msrp.c | 76 ++-- src/modules/module-avb/mvrp.c | 107 +++++- src/modules/module-avb/stream.c | 572 ++++++++++++++++++++++++++++++ src/modules/module-avb/stream.h | 101 ++++++ 15 files changed, 1083 insertions(+), 94 deletions(-) create mode 100644 src/modules/module-avb/iec61883.h create mode 100644 src/modules/module-avb/stream.c create mode 100644 src/modules/module-avb/stream.h diff --git a/src/daemon/pipewire-avb.conf.in b/src/daemon/pipewire-avb.conf.in index 96dfbd579..d626ed9a8 100644 --- a/src/daemon/pipewire-avb.conf.in +++ b/src/daemon/pipewire-avb.conf.in @@ -33,7 +33,8 @@ context.modules = [ } flags = [ ifexists nofail ] } - { name = libpipewire-module-client-device } + { name = libpipewire-module-protocol-native } + { name = libpipewire-module-client-node } { name = libpipewire-module-adapter } { name = libpipewire-module-avb args = { diff --git a/src/modules/meson.build b/src/modules/meson.build index 04d59d889..a21acddfb 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -533,7 +533,8 @@ pipewire_module_avb = shared_library('pipewire-module-avb', 'module-avb/mrp.c', 'module-avb/msrp.c', 'module-avb/mvrp.c', - 'module-avb/srp.c' + 'module-avb/srp.c', + 'module-avb/stream.c' ], include_directories : [configinc], install : true, diff --git a/src/modules/module-avb/aaf.h b/src/modules/module-avb/aaf.h index 3d6273f4b..6fa5acb8f 100644 --- a/src/modules/module-avb/aaf.h +++ b/src/modules/module-avb/aaf.h @@ -100,41 +100,4 @@ struct avb_packet_aaf { uint8_t payload[0]; } __attribute__ ((__packed__)); -#define AVB_PACKET_AAF_SET_SUBTYPE(p,v) ((p)->subtype = (v)) -#define AVB_PACKET_AAF_SET_SV(p,v) ((p)->sv = (v)) -#define AVB_PACKET_AAF_SET_VERSION(p,v) ((p)->version = (v)) -#define AVB_PACKET_AAF_SET_MR(p,v) ((p)->mr = (v)) -#define AVB_PACKET_AAF_SET_GV(p,v) ((p)->gv = (v)) -#define AVB_PACKET_AAF_SET_TV(p,v) ((p)->tv = (v)) -#define AVB_PACKET_AAF_SET_SEQ_NUM(p,v) ((p)->seq_num = (v)) -#define AVB_PACKET_AAF_SET_TU(p,v) ((p)->tu = (v)) -#define AVB_PACKET_AAF_SET_STREAM_ID(p,v) ((p)->stream_id = htobe64(v)) -#define AVB_PACKET_AAF_SET_TIMESTAMP(p,v) ((p)->timestamp = htonl(v)) -#define AVB_PACKET_AAF_SET_DATA_LEN(p,v) ((p)->data_len = htons(v)) -#define AVB_PACKET_AAF_SET_FORMAT(p,v) ((p)->format = (v)) -#define AVB_PACKET_AAF_SET_NSR(p,v) ((p)->nsr = (v)) -#define AVB_PACKET_AAF_SET_CHAN_PER_FRAME(p,v) ((p)->chan_per_frame = (v)) -#define AVB_PACKET_AAF_SET_BIT_DEPTH(p,v) ((p)->bit_depth = (v)) -#define AVB_PACKET_AAF_SET_SP(p,v) ((p)->sp = (v)) -#define AVB_PACKET_AAF_SET_EVENT(p,v) ((p)->event = (v)) - -#define AVB_PACKET_AAF_GET_SUBTYPE(p) ((p)->subtype) -#define AVB_PACKET_AAF_GET_SV(p) ((p)->sv) -#define AVB_PACKET_AAF_GET_VERSION(p) ((p)->version) -#define AVB_PACKET_AAF_GET_MR(p) ((p)->mr) -#define AVB_PACKET_AAF_GET_GV(p) ((p)->gv) -#define AVB_PACKET_AAF_GET_TV(p) ((p)->tv) -#define AVB_PACKET_AAF_GET_SEQ_NUM(p) ((p)->seq_num) -#define AVB_PACKET_AAF_GET_TU(p) ((p)->tu) -#define AVB_PACKET_AAF_GET_STREAM_ID(p) be64toh((p)->stream_id) -#define AVB_PACKET_AAF_GET_TIMESTAMP(p) ntohl((p)->timestamp) -#define AVB_PACKET_AAF_GET_DATA_LEN(p) ntohs((p)->data_len) -#define AVB_PACKET_AAF_GET_FORMAT(p) ((p)->format) -#define AVB_PACKET_AAF_GET_NSR(p) ((p)->nsr) -#define AVB_PACKET_AAF_GET_CHAN_PER_FRAME(p) ((p)->chan_per_frame) -#define AVB_PACKET_AAF_GET_BIT_DEPTH(p) ((p)->bit_depth) -#define AVB_PACKET_AAF_GET_SP(p) ((p)->sp) -#define AVB_PACKET_AAF_GET_EVENT(p) ((p)->event) - - #endif /* AVB_AAF_H */ diff --git a/src/modules/module-avb/acmp.c b/src/modules/module-avb/acmp.c index 8de2c4b67..3683a8d52 100644 --- a/src/modules/module-avb/acmp.c +++ b/src/modules/module-avb/acmp.c @@ -30,6 +30,7 @@ #include "acmp.h" #include "msrp.h" #include "internal.h" +#include "stream.h" static const uint8_t mac[6] = AVB_BROADCAST_MAC; @@ -53,9 +54,6 @@ struct acmp { #define PENDING_CONTROLLER 2 struct spa_list pending[3]; uint16_t sequence_id[3]; - - struct avb_msrp_attribute *listener_attr; - struct avb_msrp_attribute *talker_attr; }; static void *pending_new(struct acmp *acmp, uint32_t type, uint64_t now, uint32_t timeout_ms, @@ -130,14 +128,33 @@ static int handle_connect_tx_command(struct acmp *acmp, uint64_t now, const void uint8_t buf[len]; const struct avb_packet_acmp *p = m; struct avb_packet_acmp *reply = (struct avb_packet_acmp*)buf; + int status = AVB_ACMP_STATUS_SUCCESS; + uint8_t mac[6] = { 0x91, 0xe0, 0xf0, 0x00, 0x10, 0x00 }; + struct stream *stream; if (be64toh(p->talker_guid) != server->entity_id) return 0; memcpy(reply, m, len); - AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_RESPONSE); - AVB_PACKET_ACMP_SET_STATUS(reply, AVB_ACMP_STATUS_SUCCESS); + stream = server_find_stream(server, SPA_DIRECTION_OUTPUT, + reply->talker_unique_id); + if (stream == NULL) { + status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX; + goto done; + } + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_RESPONSE); + reply->stream_id = htobe64(stream->id); + + memcpy(stream->addr, mac, 6); + + memcpy(reply->stream_dest_mac, stream->addr, 6); + reply->connection_count = htons(1); + reply->stream_vlan_id = htons(2); + stream_activate(stream, now); + +done: + AVB_PACKET_ACMP_SET_STATUS(reply, status); return avb_server_send_packet(server, reply->hdr.eth.dest, AVB_TSN_ETH, reply, len); } @@ -149,6 +166,7 @@ static int handle_connect_tx_response(struct acmp *acmp, uint64_t now, const voi struct avb_packet_acmp *reply; struct pending *pending; uint16_t sequence_id; + struct stream *stream; int res; if (be64toh(resp->listener_guid) != server->entity_id) @@ -165,13 +183,14 @@ static int handle_connect_tx_response(struct acmp *acmp, uint64_t now, const voi reply->sequence_id = htons(pending->old_sequence_id); AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_RX_RESPONSE); - acmp->listener_attr->attr.listener.stream_id = reply->stream_id; - acmp->listener_attr->param = AVB_MSRP_LISTENER_PARAM_READY; - avb_mrp_attribute_begin(acmp->listener_attr->mrp, now); - avb_mrp_attribute_join(acmp->listener_attr->mrp, now, true); + stream = server_find_stream(server, SPA_DIRECTION_INPUT, + reply->listener_unique_id); + if (stream == NULL) + return 0; - acmp->talker_attr->attr.talker.stream_id = reply->stream_id; - avb_mrp_attribute_begin(acmp->talker_attr->mrp, now); + stream->peer_id = be64toh(reply->stream_id); + memcpy(stream->addr, reply->stream_dest_mac, 6); + stream_activate(stream, now); res = avb_server_send_packet(server, reply->hdr.eth.dest, AVB_TSN_ETH, reply, pending->size); @@ -187,14 +206,26 @@ static int handle_disconnect_tx_command(struct acmp *acmp, uint64_t now, const v uint8_t buf[len]; const struct avb_packet_acmp *p = m; struct avb_packet_acmp *reply = (struct avb_packet_acmp*)buf; + int status = AVB_ACMP_STATUS_SUCCESS; + struct stream *stream; if (be64toh(p->talker_guid) != server->entity_id) return 0; memcpy(reply, m, len); - AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_TX_RESPONSE); - AVB_PACKET_ACMP_SET_STATUS(reply, AVB_ACMP_STATUS_SUCCESS); + stream = server_find_stream(server, SPA_DIRECTION_OUTPUT, + reply->talker_unique_id); + if (stream == NULL) { + status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX; + goto done; + } + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_TX_RESPONSE); + + stream_deactivate(stream, now); + +done: + AVB_PACKET_ACMP_SET_STATUS(reply, status); return avb_server_send_packet(server, reply->hdr.eth.dest, AVB_TSN_ETH, reply, len); } @@ -206,6 +237,7 @@ static int handle_disconnect_tx_response(struct acmp *acmp, uint64_t now, const struct avb_packet_acmp *reply; struct pending *pending; uint16_t sequence_id; + struct stream *stream; int res; if (be64toh(resp->listener_guid) != server->entity_id) @@ -222,7 +254,12 @@ static int handle_disconnect_tx_response(struct acmp *acmp, uint64_t now, const reply->sequence_id = htons(pending->old_sequence_id); AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_RX_RESPONSE); - avb_mrp_attribute_leave(acmp->listener_attr->mrp, now); + stream = server_find_stream(server, SPA_DIRECTION_INPUT, + reply->listener_unique_id); + if (stream == NULL) + return 0; + + stream_deactivate(stream, now); res = avb_server_send_packet(server, reply->hdr.eth.dest, AVB_TSN_ETH, reply, pending->size); @@ -423,11 +460,6 @@ struct avb_acmp *avb_acmp_register(struct server *server) spa_list_init(&acmp->pending[PENDING_LISTENER]); spa_list_init(&acmp->pending[PENDING_CONTROLLER]); - acmp->listener_attr = avb_msrp_attribute_new(server->msrp, - AVB_MSRP_ATTRIBUTE_TYPE_LISTENER); - acmp->talker_attr = avb_msrp_attribute_new(server->msrp, - AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); - avdecc_server_add_listener(server, &acmp->server_listener, &server_events, acmp); return (struct avb_acmp*)acmp; diff --git a/src/modules/module-avb/avb.c b/src/modules/module-avb/avb.c index 29560c11d..8b589c87a 100644 --- a/src/modules/module-avb/avb.c +++ b/src/modules/module-avb/avb.c @@ -59,6 +59,21 @@ struct pw_avb *pw_avb_new(struct pw_context *context, impl->context = context; impl->loop = pw_context_get_main_loop(context); impl->props = props; + impl->core = pw_context_get_object(context, PW_TYPE_INTERFACE_Core); + if (impl->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + impl->core = pw_context_connect(context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + impl->do_disconnect = true; + } + if (impl->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto error_free; + } impl->work_queue = pw_context_get_work_queue(context); diff --git a/src/modules/module-avb/avdecc.c b/src/modules/module-avb/avdecc.c index 4b4fbc39e..6adf84edc 100644 --- a/src/modules/module-avb/avdecc.c +++ b/src/modules/module-avb/avdecc.c @@ -40,6 +40,7 @@ #include "avb.h" #include "packets.h" #include "internal.h" +#include "stream.h" #include "acmp.h" #include "adp.h" #include "aecp.h" @@ -270,6 +271,7 @@ struct server *avdecc_server_new(struct impl *impl, const char *ifname, struct s server->ifname = strdup(ifname); spa_hook_list_init(&server->listener_list); spa_list_init(&server->descriptors); + spa_list_init(&server->streams); server->debug_messages = false; @@ -299,10 +301,8 @@ struct server *avdecc_server_new(struct impl *impl, const char *ifname, struct s avb_mrp_attribute_begin(server->domain_attr->mrp, 0); avb_mrp_attribute_join(server->domain_attr->mrp, 0, true); - server->listener_attr = avb_msrp_attribute_new(server->msrp, - AVB_MSRP_ATTRIBUTE_TYPE_LISTENER); - server->listener_attr->attr.listener.stream_id = htobe64(0); - avb_mrp_attribute_begin(server->listener_attr->mrp, 0); + server_create_stream(server, SPA_DIRECTION_INPUT, 0); + server_create_stream(server, SPA_DIRECTION_OUTPUT, 0); return server; diff --git a/src/modules/module-avb/iec61883.h b/src/modules/module-avb/iec61883.h new file mode 100644 index 000000000..be438be88 --- /dev/null +++ b/src/modules/module-avb/iec61883.h @@ -0,0 +1,112 @@ +/* AVB support + * + * Copyright © 2022 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef AVB_IEC61883_H +#define AVB_IEC61883_H + +#include "packets.h" + +struct avb_packet_iec61883 { + struct avb_ethernet_header eth; + uint32_t vlan; + uint8_t subtype; +#if __BYTE_ORDER == __BIG_ENDIAN + unsigned sv:1; + unsigned version:3; + unsigned mr:1; + unsigned _r1:1; + unsigned gv:1; + unsigned tv:1; + + uint8_t seq_number; + + unsigned _r2:7; + unsigned tu:1; +#elif __BYTE_ORDER == __LITTLE_ENDIAN + unsigned tv:1; + unsigned gv:1; + unsigned _r1:1; + unsigned mr:1; + unsigned version:3; + unsigned sv:1; + + uint8_t seq_num; + + unsigned tu:1; + unsigned _r2:7; +#endif + uint64_t stream_id; + uint32_t timestamp; + uint32_t gateway_info; + uint16_t data_len; +#if __BYTE_ORDER == __BIG_ENDIAN + uint8_t tag:2; + uint8_t channel:6; + + uint8_t tcode:4; + uint8_t app:4; + + uint8_t qi1:2; /* CIP Quadlet Indicator 1 */ + uint8_t sid:6; /* CIP Source ID */ + + uint8_t dbs; /* CIP Data Block Size */ + + uint8_t fn:2; /* CIP Fraction Number */ + uint8_t qpc:3; /* CIP Quadlet Padding Count */ + uint8_t sph:1; /* CIP Source Packet Header */ + uint8_t _r3:2; + + uint8_t dbc; /* CIP Data Block Continuity */ + + uint8_t qi2:2; /* CIP Quadlet Indicator 2 */ + uint8_t format_id:6; /* CIP Format ID */ +#elif __BYTE_ORDER == __LITTLE_ENDIAN + uint8_t channel:6; + uint8_t tag:2; + + uint8_t app:4; + uint8_t tcode:4; + + uint8_t sid:6; /* CIP Source ID */ + uint8_t qi1:2; /* CIP Quadlet Indicator 1 */ + + uint8_t dbs; /* CIP Data Block Size */ + + uint8_t _r3:2; + uint8_t sph:1; /* CIP Source Packet Header */ + uint8_t qpc:3; /* CIP Quadlet Padding Count */ + uint8_t fn:2; /* CIP Fraction Number */ + + uint8_t dbc; /* CIP Data Block Continuity */ + + uint8_t format_id:6; /* CIP Format ID */ + uint8_t qi2:2; /* CIP Quadlet Indicator 2 */ +#endif + uint8_t fdf; /* CIP Format Dependent Field */ + uint16_t syt; + + uint8_t payload[0]; +} __attribute__ ((__packed__)); + +#endif /* AVB_IEC61883_H */ diff --git a/src/modules/module-avb/internal.h b/src/modules/module-avb/internal.h index 731bb3b2e..fd3e40be9 100644 --- a/src/modules/module-avb/internal.h +++ b/src/modules/module-avb/internal.h @@ -31,6 +31,9 @@ extern "C" { #include +struct server; +struct avb_mrp; + #define AVB_TSN_ETH 0x22f0 #define AVB_BROADCAST_MAC { 0x91, 0xe0, 0xf0, 0x01, 0x00, 0x00 }; @@ -38,6 +41,8 @@ struct impl { struct pw_loop *loop; struct pw_context *context; struct spa_hook context_listener; + struct pw_core *core; + unsigned do_disconnect:1; struct pw_properties *props; struct pw_work_queue *work_queue; @@ -67,7 +72,6 @@ struct descriptor { void *ptr; }; - struct server { struct spa_list link; struct impl *impl; @@ -83,6 +87,7 @@ struct server { struct spa_hook_list listener_list; struct spa_list descriptors; + struct spa_list streams; unsigned debug_messages:1; @@ -92,9 +97,10 @@ struct server { struct avb_msrp *msrp; struct avb_msrp_attribute *domain_attr; - struct avb_msrp_attribute *listener_attr; }; +#include "stream.h" + static inline const struct descriptor *server_find_descriptor(struct server *server, uint16_t type, uint16_t index) { @@ -124,6 +130,18 @@ static inline void *server_add_descriptor(struct server *server, return d->ptr; } +static inline struct stream *server_find_stream(struct server *server, + enum spa_direction direction, uint16_t index) +{ + struct stream *s; + spa_list_for_each(s, &server->streams, link) { + if (s->direction == direction && + s->index == index) + return s; + } + return NULL; +} + struct server *avdecc_server_new(struct impl *impl, const char *ifname, struct spa_dict *props); void avdecc_server_free(struct server *server); diff --git a/src/modules/module-avb/maap.h b/src/modules/module-avb/maap.h index e9090ba38..680ec2fea 100644 --- a/src/modules/module-avb/maap.h +++ b/src/modules/module-avb/maap.h @@ -31,8 +31,8 @@ #define AVB_TSN_ETH 0x22f0 #define AVB_MAAP_MAC { 0x91, 0xe0, 0xf0, 0x00, 0xff, 0x00 }; -#define AVB_MAAP_MESSAGE_TYPE_PROBE 1 -#define AVB_MAAP_MESSAGE_TYPE_DEFEND 2 +#define AVB_MAAP_MESSAGE_TYPE_PROBE 1 +#define AVB_MAAP_MESSAGE_TYPE_DEFEND 2 #define AVB_MAAP_MESSAGE_TYPE_ANNOUNCE 3 struct avb_packet_maap { diff --git a/src/modules/module-avb/mrp.c b/src/modules/module-avb/mrp.c index 3f0349de4..a66b053d9 100644 --- a/src/modules/module-avb/mrp.c +++ b/src/modules/module-avb/mrp.c @@ -180,6 +180,38 @@ int avb_mrp_parse_packet(struct avb_mrp *mrp, uint64_t now, const void *pkt, int return 0; } +const char *avb_mrp_notify_name(uint8_t notify) +{ + switch(notify) { + case AVB_MRP_NOTIFY_NEW: + return "new"; + case AVB_MRP_NOTIFY_JOIN: + return "join"; + case AVB_MRP_NOTIFY_LEAVE: + return "leave"; + } + return "unknown"; +} + +const char *avb_mrp_send_name(uint8_t send) +{ + switch(send) { + case AVB_MRP_SEND_NEW: + return "new"; + case AVB_MRP_SEND_JOININ: + return "joinin"; + case AVB_MRP_SEND_IN: + return "in"; + case AVB_MRP_SEND_JOINMT: + return "joinmt"; + case AVB_MRP_SEND_MT: + return "mt"; + case AVB_MRP_SEND_LV: + return "leave"; + } + return "unknown"; +} + struct avb_mrp_attribute *avb_mrp_attribute_new(struct avb_mrp *m, size_t user_size) { @@ -198,6 +230,13 @@ struct avb_mrp_attribute *avb_mrp_attribute_new(struct avb_mrp *m, return &a->attr; } +void avb_mrp_attribute_destroy(struct avb_mrp_attribute *attr) +{ + struct attribute *a = SPA_CONTAINER_OF(attr, struct attribute, attr); + spa_list_remove(&a->link); + free(a); +} + void avb_mrp_attribute_add_listener(struct avb_mrp_attribute *attr, struct spa_hook *listener, const struct avb_mrp_attribute_events *events, void *data) { @@ -247,7 +286,7 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now switch (state) { case AVB_MRP_IN: a->leave_timeout = now + MRP_LVTIMER_MS * SPA_NSEC_PER_MSEC; - state = AVB_MRP_LV; + //state = AVB_MRP_LV; break; } break; @@ -276,7 +315,7 @@ void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now } if (a->registrar_state != state || notify) { - pw_log_debug("attr %p: %d %d -> %d %d", a, event, a->registrar_state, state, notify); + pw_log_info("attr %p: %d %d -> %d %d", a, event, a->registrar_state, state, notify); a->registrar_state = state; } diff --git a/src/modules/module-avb/mrp.h b/src/modules/module-avb/mrp.h index e63c69957..0a05d4b4d 100644 --- a/src/modules/module-avb/mrp.h +++ b/src/modules/module-avb/mrp.h @@ -118,6 +118,9 @@ struct avb_packet_mrp_footer { #define AVB_MRP_NOTIFY_JOIN 2 #define AVB_MRP_NOTIFY_LEAVE 3 +const char *avb_mrp_notify_name(uint8_t notify); +const char *avb_mrp_send_name(uint8_t send); + struct avb_mrp_attribute { uint8_t pending_send; void *user_data; @@ -132,6 +135,7 @@ struct avb_mrp_attribute_events { struct avb_mrp_attribute *avb_mrp_attribute_new(struct avb_mrp *mrp, size_t user_size); +void avb_mrp_attribute_destroy(struct avb_mrp_attribute *attr); void avb_mrp_attribute_update_state(struct avb_mrp_attribute *attr, uint64_t now, int event); diff --git a/src/modules/module-avb/msrp.c b/src/modules/module-avb/msrp.c index 3f348797f..09f4f6f9c 100644 --- a/src/modules/module-avb/msrp.c +++ b/src/modules/module-avb/msrp.c @@ -35,6 +35,8 @@ static const uint8_t msrp_mac[6] = AVB_MSRP_MAC; struct attr { struct avb_msrp_attribute attr; + struct msrp *msrp; + struct spa_hook listener; struct spa_list link; }; @@ -69,7 +71,7 @@ static void debug_msrp_talker(const struct avb_packet_msrp_talker *t) static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify) { - pw_log_info("> notify talker: %d", notify); + pw_log_info("> notify talker: %s", avb_mrp_notify_name(notify)); debug_msrp_talker(&attr->attr.attr.talker); } @@ -86,6 +88,35 @@ static int process_talker(struct msrp *msrp, uint64_t now, uint8_t attr_type, } return 0; } +static int encode_talker(struct msrp *msrp, struct attr *a, void *m) +{ + struct avb_packet_msrp_msg *msg = m; + struct avb_packet_mrp_vector *v; + struct avb_packet_msrp_talker *t; + struct avb_packet_mrp_footer *f; + uint8_t *ev; + size_t attr_list_length = sizeof(*v) + sizeof(*t) + sizeof(*f) + 1; + + msg->attribute_type = AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE; + msg->attribute_length = sizeof(*t); + msg->attribute_list_length = htons(attr_list_length); + + v = (struct avb_packet_mrp_vector *)msg->attribute_list; + v->lva = 0; + AVB_MRP_VECTOR_SET_NUM_VALUES(v, 1); + + t = (struct avb_packet_msrp_talker *)v->first_value; + *t = a->attr.attr.talker; + + ev = SPA_PTROFF(t, sizeof(*t), uint8_t); + *ev = a->attr.mrp->pending_send * 6 * 6; + + f = SPA_PTROFF(ev, sizeof(*ev), struct avb_packet_mrp_footer); + f->end_mark = 0; + + return attr_list_length + sizeof(*msg); +} + static void debug_msrp_talker_fail(const struct avb_packet_msrp_talker_fail *t) { @@ -111,17 +142,18 @@ static int process_talker_fail(struct msrp *msrp, uint64_t now, uint8_t attr_typ return 0; } -static void debug_msrp_listener(const struct avb_packet_msrp_listener *l) +static void debug_msrp_listener(const struct avb_packet_msrp_listener *l, uint8_t param) { char buf[128]; pw_log_info("listener"); pw_log_info(" %s", avb_utils_format_id(buf, sizeof(buf), be64toh(l->stream_id))); + pw_log_info(" %d", param); } static void notify_listener(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify) { - pw_log_info("> notify listener: %d", notify); - debug_msrp_listener(&attr->attr.attr.listener); + pw_log_info("> notify listener: %s", avb_mrp_notify_name(notify)); + debug_msrp_listener(&attr->attr.attr.listener, attr->attr.param); } static int process_listener(struct msrp *msrp, uint64_t now, uint8_t attr_type, @@ -167,7 +199,6 @@ static int encode_listener(struct msrp *msrp, struct attr *a, void *m) return attr_list_length + sizeof(*msg); } - static void debug_msrp_domain(const struct avb_packet_msrp_domain *d) { pw_log_info("domain"); @@ -178,7 +209,7 @@ static void debug_msrp_domain(const struct avb_packet_msrp_domain *d) static void notify_domain(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify) { - pw_log_info("> notify domain: %d", notify); + pw_log_info("> notify domain: %s", avb_mrp_notify_name(notify)); debug_msrp_domain(&attr->attr.attr.domain); } @@ -223,12 +254,12 @@ static int encode_domain(struct msrp *msrp, struct attr *a, void *m) static const struct { const char *name; - int (*dispatch) (struct msrp *msrp, uint64_t now, uint8_t attr_type, + int (*process) (struct msrp *msrp, uint64_t now, uint8_t attr_type, const void *m, uint8_t event, uint8_t param, int num); int (*encode) (struct msrp *msrp, struct attr *attr, void *m); void (*notify) (struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify); } dispatch[] = { - [AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE] = { "talker", process_talker, NULL, notify_talker, }, + [AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE] = { "talker", process_talker, encode_talker, notify_talker, }, [AVB_MSRP_ATTRIBUTE_TYPE_TALKER_FAILED] = { "talker-fail", process_talker_fail, NULL, NULL }, [AVB_MSRP_ATTRIBUTE_TYPE_LISTENER] = { "listener", process_listener, encode_listener, notify_listener }, [AVB_MSRP_ATTRIBUTE_TYPE_DOMAIN] = { "domain", process_domain, encode_domain, notify_domain, }, @@ -261,7 +292,7 @@ static int msrp_process(void *data, uint64_t now, uint8_t attribute_type, const uint8_t event, uint8_t param, int index) { struct msrp *msrp = data; - return dispatch[attribute_type].dispatch(msrp, now, + return dispatch[attribute_type].process(msrp, now, attribute_type, value, event, param, index); } @@ -315,6 +346,18 @@ static const struct server_events server_events = { .destroy = msrp_destroy, }; +static void msrp_notify(void *data, uint64_t now, uint8_t notify) +{ + struct attr *a = data; + struct msrp *msrp = a->msrp; + return dispatch[a->attr.type].notify(msrp, now, a, notify); +} + +static const struct avb_mrp_attribute_events mrp_attr_events = { + AVB_VERSION_MRP_ATTRIBUTE_EVENTS, + .notify = msrp_notify, +}; + struct avb_msrp_attribute *avb_msrp_attribute_new(struct avb_msrp *m, uint8_t type) { @@ -325,9 +368,11 @@ struct avb_msrp_attribute *avb_msrp_attribute_new(struct avb_msrp *m, attr = avb_mrp_attribute_new(msrp->server->mrp, sizeof(struct attr)); a = attr->user_data; + a->msrp = msrp; a->attr.mrp = attr; a->attr.type = type; spa_list_append(&msrp->attributes, &a->link); + avb_mrp_attribute_add_listener(attr, &a->listener, &mrp_attr_events, a); return &a->attr; } @@ -351,8 +396,8 @@ static void msrp_event(void *data, uint64_t now, uint8_t event) if (dispatch[a->attr.type].encode == NULL) continue; - pw_log_debug("send %s %d", dispatch[a->attr.type].name, - a->attr.mrp->pending_send); + pw_log_info("send %s %s", dispatch[a->attr.type].name, + avb_mrp_send_name(a->attr.mrp->pending_send)); len = dispatch[a->attr.type].encode(msrp, a, msg); if (len < 0) @@ -370,18 +415,9 @@ static void msrp_event(void *data, uint64_t now, uint8_t event) buffer, total); } -static void msrp_notify(void *data, uint64_t now, struct avb_mrp_attribute *attr, uint8_t notify) -{ - struct msrp *msrp = data; - struct attr *a = attr->user_data; - if (dispatch[a->attr.type].notify != NULL) - dispatch[a->attr.type].notify(msrp, now, a, notify); -} - static const struct avb_mrp_events mrp_events = { AVB_VERSION_MRP_EVENTS, .event = msrp_event, - .notify = msrp_notify }; struct avb_msrp *avb_msrp_register(struct server *server) diff --git a/src/modules/module-avb/mvrp.c b/src/modules/module-avb/mvrp.c index 9d42d257e..2f5f6eaa7 100644 --- a/src/modules/module-avb/mvrp.c +++ b/src/modules/module-avb/mvrp.c @@ -32,12 +32,15 @@ static const uint8_t mvrp_mac[6] = AVB_MVRP_MAC; struct attr { struct avb_mvrp_attribute attr; + struct spa_hook listener; struct spa_list link; + struct mvrp *mvrp; }; struct mvrp { struct server *server; struct spa_hook server_listener; + struct spa_hook mrp_listener; struct spa_source *source; @@ -67,9 +70,8 @@ static int mvrp_attr_event(void *data, uint64_t now, uint8_t attribute_type, uin return 0; } -static void debug_vid(const void *p) +static void debug_vid(const struct avb_packet_mvrp_vid *t) { - const struct avb_packet_mvrp_vid *t = p; pw_log_info("vid"); pw_log_info(" %d", ntohs(t->vlan)); } @@ -80,19 +82,55 @@ static int process_vid(struct mvrp *mvrp, uint64_t now, uint8_t attr_type, return mvrp_attr_event(mvrp, now, attr_type, event); } +static int encode_vid(struct mvrp *mvrp, struct attr *a, void *m) +{ + struct avb_packet_mvrp_msg *msg = m; + struct avb_packet_mrp_vector *v; + struct avb_packet_mvrp_vid *d; + struct avb_packet_mrp_footer *f; + uint8_t *ev; + size_t attr_list_length = sizeof(*v) + sizeof(*d) + sizeof(*f) + 1; + + msg->attribute_type = AVB_MVRP_ATTRIBUTE_TYPE_VID; + msg->attribute_length = sizeof(*d); + + v = (struct avb_packet_mrp_vector *)msg->attribute_list; + v->lva = 0; + AVB_MRP_VECTOR_SET_NUM_VALUES(v, 1); + + d = (struct avb_packet_mvrp_vid *)v->first_value; + *d = a->attr.attr.vid; + + ev = SPA_PTROFF(d, sizeof(*d), uint8_t); + *ev = a->attr.mrp->pending_send * 36; + + f = SPA_PTROFF(ev, sizeof(*ev), struct avb_packet_mrp_footer); + f->end_mark = 0; + + return attr_list_length + sizeof(*msg); +} + +static void notify_vid(struct mvrp *mvrp, uint64_t now, struct attr *attr, uint8_t notify) +{ + pw_log_info("> notify vid: %s", avb_mrp_notify_name(notify)); + debug_vid(&attr->attr.attr.vid); +} + static const struct { - void (*debug) (const void *p); - int (*dispatch) (struct mvrp *mvrp, uint64_t now, uint8_t attr_type, + const char *name; + int (*process) (struct mvrp *mvrp, uint64_t now, uint8_t attr_type, const void *m, uint8_t event, uint8_t param, int num); + int (*encode) (struct mvrp *mvrp, struct attr *attr, void *m); + void (*notify) (struct mvrp *mvrp, uint64_t now, struct attr *attr, uint8_t notify); } dispatch[] = { - [AVB_MVRP_ATTRIBUTE_TYPE_VID] = { debug_vid, process_vid, }, + [AVB_MVRP_ATTRIBUTE_TYPE_VID] = { "vid", process_vid, encode_vid, notify_vid }, }; static int mvrp_process(void *data, uint64_t now, uint8_t attribute_type, const void *value, uint8_t event, uint8_t param, int index) { struct mvrp *mvrp = data; - return dispatch[attribute_type].dispatch(mvrp, now, + return dispatch[attribute_type].process(mvrp, now, attribute_type, value, event, param, index); } @@ -147,6 +185,18 @@ static const struct server_events server_events = { .destroy = mvrp_destroy, }; +static void mvrp_notify(void *data, uint64_t now, uint8_t notify) +{ + struct attr *a = data; + struct mvrp *mvrp = a->mvrp; + return dispatch[a->attr.type].notify(mvrp, now, a, notify); +} + +static const struct avb_mrp_attribute_events mrp_attr_events = { + AVB_VERSION_MRP_ATTRIBUTE_EVENTS, + .notify = mvrp_notify, +}; + struct avb_mvrp_attribute *avb_mvrp_attribute_new(struct avb_mvrp *m, uint8_t type) { @@ -160,10 +210,54 @@ struct avb_mvrp_attribute *avb_mvrp_attribute_new(struct avb_mvrp *m, a->attr.mrp = attr; a->attr.type = type; spa_list_append(&mvrp->attributes, &a->link); + avb_mrp_attribute_add_listener(attr, &a->listener, &mrp_attr_events, a); return &a->attr; } +static void mvrp_event(void *data, uint64_t now, uint8_t event) +{ + struct mvrp *mvrp = data; + uint8_t buffer[2048]; + struct avb_packet_mrp *p = (struct avb_packet_mrp*)buffer; + struct avb_packet_mrp_footer *f; + void *msg = SPA_PTROFF(buffer, sizeof(*p), void); + struct attr *a; + int len, count = 0; + size_t total = sizeof(*p) + 2; + + p->version = AVB_MRP_PROTOCOL_VERSION; + + spa_list_for_each(a, &mvrp->attributes, link) { + if (!a->attr.mrp->pending_send) + continue; + if (dispatch[a->attr.type].encode == NULL) + continue; + + pw_log_debug("send %s %s", dispatch[a->attr.type].name, + avb_mrp_send_name(a->attr.mrp->pending_send)); + + len = dispatch[a->attr.type].encode(mvrp, a, msg); + if (len < 0) + break; + + count++; + msg = SPA_PTROFF(msg, len, void); + total += len; + } + f = (struct avb_packet_mrp_footer *)msg; + f->end_mark = 0; + + if (count > 0) + avb_server_send_packet(mvrp->server, mvrp_mac, AVB_MVRP_ETH, + buffer, total); +} + +static const struct avb_mrp_events mrp_events = { + AVB_VERSION_MRP_EVENTS, + .event = mvrp_event, +}; + struct avb_mvrp *avb_mvrp_register(struct server *server) { struct mvrp *mvrp; @@ -190,6 +284,7 @@ struct avb_mvrp *avb_mvrp_register(struct server *server) goto error_no_source; } avdecc_server_add_listener(server, &mvrp->server_listener, &server_events, mvrp); + avb_mrp_add_listener(server->mrp, &mvrp->mrp_listener, &mrp_events, mvrp); return (struct avb_mvrp*)mvrp; diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c new file mode 100644 index 000000000..e7f61f762 --- /dev/null +++ b/src/modules/module-avb/stream.c @@ -0,0 +1,572 @@ +/* AVB support + * + * Copyright © 2022 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "iec61883.h" +#include "stream.h" +#include "utils.h" +#include "aecp-aem-descriptors.h" + +static void on_stream_destroy(void *d) +{ + struct stream *stream = d; + spa_hook_remove(&stream->stream_listener); + stream->stream = NULL; +} + +static void on_source_stream_process(void *data) +{ + struct stream *stream = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t index, n_bytes; + int32_t avail, wanted; + + if ((buf = pw_stream_dequeue_buffer(stream->stream)) == NULL) { + pw_log_debug("out of buffers: %m"); + return; + } + + d = buf->buffer->datas; + + wanted = buf->requested ? buf->requested * stream->stride : d[0].maxsize; + + n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted); + + avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + + if (avail < wanted) { + pw_log_warn("capture underrun %d < %d", avail, wanted); + memset(d[0].data, 0, n_bytes); + } else { + spa_ringbuffer_read_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + d[0].data, n_bytes); + index += n_bytes; + spa_ringbuffer_read_update(&stream->ring, index); + } + + d[0].chunk->size = n_bytes; + d[0].chunk->stride = stream->stride; + d[0].chunk->offset = 0; + buf->size = n_bytes / stream->stride; + + pw_stream_queue_buffer(stream->stream, buf); +} + +static const struct pw_stream_events source_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = on_stream_destroy, + .process = on_source_stream_process +}; + +static inline void +set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, + uint32_t offset, struct iovec *iov, uint32_t len) +{ + iov[0].iov_len = SPA_MIN(len, size - offset); + iov[0].iov_base = SPA_PTROFF(buffer, offset, void); + iov[1].iov_len = len - iov[0].iov_len; + iov[1].iov_base = buffer; +} + +static int flush_write(struct stream *stream, uint64_t current_time) +{ + int32_t avail; + uint32_t index; + uint64_t ptime, txtime; + int pdu_count; + ssize_t n; + struct avb_packet_iec61883 *p = (struct avb_packet_iec61883*)stream->pdu; + + avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + + pdu_count = (avail / stream->stride) / stream->frames_per_pdu; + + txtime = current_time + stream->t_uncertainty; + ptime = txtime + stream->mtt; + + while (pdu_count--) { + *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; + + set_iovec(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + &stream->iov[1], stream->payload_size); + + p->seq_num = stream->pdu_seq++; + p->tv = 1; + p->timestamp = ptime; + + n = sendmsg(stream->source->fd, &stream->msg, 0); + if (n < 0 || n != (ssize_t)stream->pdu_size) { + pw_log_error("sendmsg() failed %zd != %zd: %m", + n, stream->pdu_size); + } + txtime += stream->pdu_period; + ptime += stream->pdu_period; + index += stream->payload_size; + } + spa_ringbuffer_read_update(&stream->ring, index); + return 0; +} + +static void on_sink_stream_process(void *data) +{ + struct stream *stream = data; + struct pw_buffer *buf; + struct spa_data *d; + int32_t filled; + uint32_t index, offs, avail, size; + struct timespec now; + + if ((buf = pw_stream_dequeue_buffer(stream->stream)) == NULL) { + pw_log_debug("out of buffers: %m"); + return; + } + + d = buf->buffer->datas; + + offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); + size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); + avail = size - offs; + + filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + + if (filled >= (int32_t)stream->buffer_size) { + pw_log_warn("playback overrun %d >= %zd", filled, stream->buffer_size); + } else { + spa_ringbuffer_write_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + SPA_PTROFF(d[0].data, offs, void), avail); + index += avail; + spa_ringbuffer_write_update(&stream->ring, index); + } + pw_stream_queue_buffer(stream->stream, buf); + + clock_gettime(CLOCK_TAI, &now); + flush_write(stream, SPA_TIMESPEC_TO_NSEC(&now)); +} + +static void setup_pdu(struct stream *stream) +{ + struct avb_packet_iec61883 *p = (struct avb_packet_iec61883*)stream->pdu; + ssize_t payload_size, hdr_size, pdu_size; + + hdr_size = sizeof(*p); + payload_size = stream->stride * stream->frames_per_pdu; + pdu_size = hdr_size + payload_size; + + spa_zero(stream->pdu); + p->eth.type = htons(0x8100); + p->vlan = htonl(0x600222f0); + + if (stream->direction == SPA_DIRECTION_OUTPUT) { + p->subtype = AVB_SUBTYPE_61883_IIDC; + p->sv = 1; + p->stream_id = htobe64(stream->id); + p->data_len = htons(payload_size+8); + p->tag = 0x1; + p->channel = 0x1f; + p->tcode = 0xa; + p->sid = 0x3f; + p->dbs = 0x8; + p->qi2 = 0x2; + p->format_id = 0x10; + p->fdf = 0x2; + p->syt = htons(0xffff); + } + stream->hdr_size = hdr_size; + stream->payload_size = payload_size; + stream->pdu_size = pdu_size; +} + +static int setup_msg(struct stream *stream) +{ + stream->iov[0].iov_base = stream->pdu; + stream->iov[0].iov_len = stream->hdr_size; + stream->iov[1].iov_base = SPA_PTROFF(stream->pdu, stream->hdr_size, void); + stream->iov[1].iov_len = stream->payload_size; + stream->iov[2].iov_base = SPA_PTROFF(stream->pdu, stream->hdr_size, void); + stream->iov[2].iov_len = 0; + stream->msg.msg_name = &stream->sock_addr; + stream->msg.msg_namelen = sizeof(stream->sock_addr); + stream->msg.msg_iov = stream->iov; + stream->msg.msg_iovlen = 3; + stream->msg.msg_control = stream->control; + stream->msg.msg_controllen = sizeof(stream->control); + stream->cmsg = CMSG_FIRSTHDR(&stream->msg); + stream->cmsg->cmsg_level = SOL_SOCKET; + stream->cmsg->cmsg_type = SCM_TXTIME; + stream->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64)); + return 0; +} + +static const struct pw_stream_events sink_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = on_stream_destroy, + .process = on_sink_stream_process +}; + +struct stream *server_create_stream(struct server *server, + enum spa_direction direction, uint16_t index) +{ + struct stream *stream; + const struct descriptor *desc; + uint32_t n_params; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct spa_pod_builder b; + int res; + + desc = server_find_descriptor(server, + direction == SPA_DIRECTION_INPUT ? + AVB_AEM_DESC_STREAM_INPUT : + AVB_AEM_DESC_STREAM_OUTPUT, index); + if (desc == NULL) + return NULL; + + stream = calloc(1, sizeof(*stream)); + if (stream == NULL) + return NULL; + + stream->server = server; + stream->direction = direction; + stream->index = index; + stream->desc = desc; + spa_list_append(&server->streams, &stream->link); + + stream->id = (uint64_t)server->mac_addr[0] << 56 | + (uint64_t)server->mac_addr[1] << 48 | + (uint64_t)server->mac_addr[2] << 40 | + (uint64_t)server->mac_addr[3] << 32 | + (uint64_t)server->mac_addr[4] << 24 | + (uint64_t)server->mac_addr[5] << 16 | + htons(index); + + stream->listener_attr = avb_msrp_attribute_new(server->msrp, + AVB_MSRP_ATTRIBUTE_TYPE_LISTENER); + stream->talker_attr = avb_msrp_attribute_new(server->msrp, + AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); + stream->talker_attr->attr.talker.vlan_id = htons(2); + stream->talker_attr->attr.talker.tspec_max_interval_frames = + htons(AVB_MSRP_TSPEC_MAX_INTERVAL_FRAMES_DEFAULT); + stream->talker_attr->attr.talker.priority = AVB_MSRP_PRIORITY_DEFAULT; + stream->talker_attr->attr.talker.rank = AVB_MSRP_RANK_DEFAULT; + stream->talker_attr->attr.talker.accumulated_latency = 0; + + stream->vlan_attr = avb_mvrp_attribute_new(server->mvrp, + AVB_MVRP_ATTRIBUTE_TYPE_VID); + stream->vlan_attr->attr.vid.vlan = htons(2); + + stream->buffer_data = calloc(1, BUFFER_SIZE); + stream->buffer_size = BUFFER_SIZE; + spa_ringbuffer_init(&stream->ring); + + if (direction == SPA_DIRECTION_INPUT) { + stream->stream = pw_stream_new(server->impl->core, "source", + pw_properties_new( + PW_KEY_MEDIA_CLASS, "Audio/Source", + PW_KEY_NODE_NAME, "avb.source", + PW_KEY_NODE_DESCRIPTION, "AVB Source", + PW_KEY_NODE_WANT_DRIVER, "true", + NULL)); + } else { + stream->stream = pw_stream_new(server->impl->core, "sink", + pw_properties_new( + PW_KEY_MEDIA_CLASS, "Audio/Sink", + PW_KEY_NODE_NAME, "avb.sink", + PW_KEY_NODE_DESCRIPTION, "AVB Sink", + PW_KEY_NODE_WANT_DRIVER, "true", + NULL)); + } + if (stream->stream == NULL) + goto error_free; + + pw_stream_add_listener(stream->stream, + &stream->stream_listener, + direction == SPA_DIRECTION_INPUT ? + &source_stream_events : + &sink_stream_events, + stream); + + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE; + stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED; + stream->info.info.raw.rate = 48000; + stream->info.info.raw.channels = 8; + stream->stride = 8 * 4; + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_raw_build(&b, + SPA_PARAM_EnumFormat, &stream->info.info.raw); + + if ((res = pw_stream_connect(stream->stream, + pw_direction_reverse(direction), + PW_ID_ANY, + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_INACTIVE | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + goto error_free_stream; + + stream->frames_per_pdu = 6; + stream->pdu_period = SPA_NSEC_PER_SEC * stream->frames_per_pdu / + stream->info.info.raw.rate; + + setup_pdu(stream); + setup_msg(stream); + + return stream; + +error_free_stream: + pw_stream_destroy(stream->stream); + errno = -res; +error_free: + free(stream); + return NULL; +} + +void stream_destroy(struct stream *stream) +{ + avb_mrp_attribute_destroy(stream->listener_attr->mrp); + spa_list_remove(&stream->link); + free(stream); +} + +static int setup_socket(struct stream *stream) +{ + struct server *server = stream->server; + int fd, res; + char buf[128]; + struct ifreq req; + + fd = socket(AF_PACKET, SOCK_RAW | SOCK_NONBLOCK, htons(ETH_P_ALL)); + if (fd < 0) { + pw_log_error("socket() failed: %m"); + return -errno; + } + + spa_zero(req); + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", server->ifname); + res = ioctl(fd, SIOCGIFINDEX, &req); + if (res < 0) { + pw_log_error("SIOCGIFINDEX %s failed: %m", server->ifname); + res = -errno; + goto error_close; + } + + spa_zero(stream->sock_addr); + stream->sock_addr.sll_family = AF_PACKET; + stream->sock_addr.sll_protocol = htons(ETH_P_TSN); + stream->sock_addr.sll_ifindex = req.ifr_ifindex; + + if (stream->direction == SPA_DIRECTION_OUTPUT) { + struct sock_txtime txtime_cfg; + + res = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &stream->prio, + sizeof(stream->prio)); + if (res < 0) { + pw_log_error("setsockopt(SO_PRIORITY %d) failed: %m", stream->prio); + res = -errno; + goto error_close; + } + + txtime_cfg.clockid = CLOCK_TAI; + txtime_cfg.flags = 0; + res = setsockopt(fd, SOL_SOCKET, SO_TXTIME, &txtime_cfg, + sizeof(txtime_cfg)); + if (res < 0) { + pw_log_error("setsockopt(SO_TXTIME) failed: %m"); + res = -errno; + goto error_close; + } + } else { + struct packet_mreq mreq; + + res = bind(fd, (struct sockaddr *) &stream->sock_addr, sizeof(stream->sock_addr)); + if (res < 0) { + pw_log_error("bind() failed: %m"); + res = -errno; + goto error_close; + } + + spa_zero(mreq); + mreq.mr_ifindex = req.ifr_ifindex; + mreq.mr_type = PACKET_MR_MULTICAST; + mreq.mr_alen = ETH_ALEN; + memcpy(&mreq.mr_address, stream->addr, ETH_ALEN); + res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, + &mreq, sizeof(struct packet_mreq)); + + pw_log_info("join %s", avb_utils_format_addr(buf, 128, stream->addr)); + + if (res < 0) { + pw_log_error("setsockopt(ADD_MEMBERSHIP) failed: %m"); + res = -errno; + goto error_close; + } + } + return fd; + +error_close: + close(fd); + return res; +} + +static void handle_iec61883_packet(struct stream *stream, + struct avb_packet_iec61883 *p, int len) +{ + uint32_t index, n_bytes; + int32_t filled; + bool overrun = false; + + filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + overrun = filled > (int32_t) stream->buffer_size; + if (overrun) { + pw_log_warn("capture overrun %zd < %d", stream->buffer_size, filled); + } else { + n_bytes = ntohs(p->data_len) - 8; + spa_ringbuffer_write_data(&stream->ring, + stream->buffer_data, + stream->buffer_size, + index % stream->buffer_size, + p->payload, n_bytes); + index += n_bytes; + spa_ringbuffer_write_update(&stream->ring, index); + } + +} + +static void on_socket_data(void *data, int fd, uint32_t mask) +{ + struct stream *stream = data; + + if (mask & SPA_IO_IN) { + int len; + uint8_t buffer[2048]; + + len = recv(fd, buffer, sizeof(buffer), 0); + + if (len < 0) { + pw_log_warn("got recv error: %m"); + } + else if (len < (int)sizeof(struct avb_packet_header)) { + pw_log_warn("short packet received (%d < %d)", len, + (int)sizeof(struct avb_packet_header)); + } else { + struct avb_packet_iec61883 *h = (struct avb_packet_iec61883*)buffer; + + if (memcmp(h->eth.dest, stream->addr, 6) != 0 || + h->subtype != AVB_SUBTYPE_61883_IIDC) + return; + + handle_iec61883_packet(stream, h, len); + } + } +} + +int stream_activate(struct stream *stream, uint64_t now) +{ + struct server *server = stream->server; + struct avb_ethernet_header *h = (struct avb_ethernet_header *)stream->pdu; + int fd, res; + + if (stream->source == NULL) { + if ((fd = setup_socket(stream)) < 0) + return fd; + + stream->source = pw_loop_add_io(server->impl->loop, fd, + SPA_IO_IN, true, on_socket_data, stream); + if (stream->source == NULL) { + res = -errno; + pw_log_error("stream %p: can't create source: %m", stream); + close(fd); + return res; + } + } + + avb_mrp_attribute_begin(stream->vlan_attr->mrp, now); + avb_mrp_attribute_join(stream->vlan_attr->mrp, now, true); + + if (stream->direction == SPA_DIRECTION_INPUT) { + stream->listener_attr->attr.listener.stream_id = htobe64(stream->peer_id); + stream->listener_attr->param = AVB_MSRP_LISTENER_PARAM_READY; + avb_mrp_attribute_begin(stream->listener_attr->mrp, now); + avb_mrp_attribute_join(stream->listener_attr->mrp, now, true); + + stream->talker_attr->attr.talker.stream_id = htobe64(stream->peer_id); + avb_mrp_attribute_begin(stream->talker_attr->mrp, now); + } else { + stream->listener_attr->attr.listener.stream_id = htobe64(stream->id); + stream->listener_attr->param = AVB_MSRP_LISTENER_PARAM_IGNORE; + avb_mrp_attribute_begin(stream->listener_attr->mrp, now); + + stream->talker_attr->attr.talker.stream_id = htobe64(stream->id); + memcpy(stream->talker_attr->attr.talker.dest_addr, stream->addr, 6); + + stream->sock_addr.sll_halen = ETH_ALEN; + memcpy(&stream->sock_addr.sll_addr, stream->addr, ETH_ALEN); + memcpy(h->dest, stream->addr, 6); + memcpy(h->src, server->mac_addr, 6); + avb_mrp_attribute_begin(stream->talker_attr->mrp, now); + avb_mrp_attribute_join(stream->talker_attr->mrp, now, true); + } + pw_stream_set_active(stream->stream, true); + return 0; +} + +int stream_deactivate(struct stream *stream, uint64_t now) +{ + pw_stream_set_active(stream->stream, false); + + if (stream->source != NULL) { + pw_loop_destroy_source(stream->server->impl->loop, stream->source); + stream->source = NULL; + } + + avb_mrp_attribute_leave(stream->vlan_attr->mrp, now); + + if (stream->direction == SPA_DIRECTION_INPUT) { + avb_mrp_attribute_leave(stream->listener_attr->mrp, now); + } else { + avb_mrp_attribute_leave(stream->talker_attr->mrp, now); + } + return 0; +} diff --git a/src/modules/module-avb/stream.h b/src/modules/module-avb/stream.h new file mode 100644 index 000000000..f13340052 --- /dev/null +++ b/src/modules/module-avb/stream.h @@ -0,0 +1,101 @@ +/* AVB support + * + * Copyright © 2022 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef AVB_STREAM_H +#define AVB_STREAM_H + +#include +#include +#include +#include + +#include +#include + +#include + +#define BUFFER_SIZE (1u<<16) +#define BUFFER_MASK (BUFFER_SIZE-1) + +struct stream { + struct spa_list link; + + struct server *server; + + uint16_t direction; + uint16_t index; + const struct descriptor *desc; + uint64_t id; + uint64_t peer_id; + + struct pw_stream *stream; + struct spa_hook stream_listener; + + uint8_t addr[6]; + struct spa_source *source; + int prio; + int mtt; + int t_uncertainty; + uint32_t frames_per_pdu; + int ptime_tolerance; + + uint8_t pdu[2048]; + size_t hdr_size; + size_t payload_size; + size_t pdu_size; + int64_t pdu_period; + uint8_t pdu_seq; + uint8_t prev_seq; + + struct iovec iov[3]; + struct sockaddr_ll sock_addr; + struct msghdr msg; + char control[CMSG_SPACE(sizeof(uint64_t))]; + struct cmsghdr *cmsg; + + struct spa_ringbuffer ring; + void *buffer_data; + size_t buffer_size; + + uint64_t format; + uint32_t stride; + struct spa_audio_info info; + + struct avb_msrp_attribute *talker_attr; + struct avb_msrp_attribute *listener_attr; + struct avb_mvrp_attribute *vlan_attr; +}; + +#include "msrp.h" +#include "mvrp.h" + +struct stream *server_create_stream(struct server *server, + enum spa_direction direction, uint16_t index); + +void stream_destroy(struct stream *stream); + +int stream_activate(struct stream *stream, uint64_t now); +int stream_deactivate(struct stream *stream, uint64_t now); + +#endif /* AVB_STREAM_H */