/* AVB support */ /* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */ /* SPDX-License-Identifier: MIT */ #include #include #include #include #include #include #include #include #include #include "aaf.h" #include "iec61883.h" #include "stream.h" #include "aecp-aem-state.h" #include "acmp-cmds-resps/acmp-common.h" #include "utils.h" static void on_stream_destroy(void *d) { struct stream *stream = d; spa_hook_remove(&stream->stream_listener); stream->stream = NULL; } static void on_source_stream_process(void *data) { struct stream *stream = data; struct pw_buffer *buf; struct spa_data *d; uint32_t index, n_bytes; int32_t avail, wanted; if ((buf = pw_stream_dequeue_buffer(stream->stream)) == NULL) { pw_log_debug("out of buffers: %m"); return; } d = buf->buffer->datas; wanted = buf->requested ? buf->requested * stream->stride : d[0].maxsize; n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted); avail = spa_ringbuffer_get_read_index(&stream->ring, &index); if (avail < wanted) { pw_log_debug("capture underrun %d < %d", avail, wanted); memset(d[0].data, 0, n_bytes); } else { spa_ringbuffer_read_data(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, d[0].data, n_bytes); index += n_bytes; spa_ringbuffer_read_update(&stream->ring, index); } d[0].chunk->size = n_bytes; d[0].chunk->stride = stream->stride; d[0].chunk->offset = 0; buf->size = n_bytes / stream->stride; pw_stream_queue_buffer(stream->stream, buf); } static const struct pw_stream_events source_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = on_stream_destroy, .process = on_source_stream_process }; static inline void set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, uint32_t offset, struct iovec *iov, uint32_t len) { iov[0].iov_len = SPA_MIN(len, size - offset); iov[0].iov_base = SPA_PTROFF(buffer, offset, void); iov[1].iov_len = len - iov[0].iov_len; iov[1].iov_base = buffer; } static int flush_write(struct stream *stream, uint64_t current_time) { int32_t avail; uint32_t index; uint64_t ptime, txtime; int pdu_count; ssize_t n; struct avb_frame_header *h = (void*)stream->pdu; bool is_milan = stream->server->avb_mode == AVB_MODE_MILAN_V12; avail = spa_ringbuffer_get_read_index(&stream->ring, &index); pdu_count = (avail / stream->stride) / stream->frames_per_pdu; txtime = current_time + stream->t_uncertainty; ptime = txtime + stream->mtt; if (is_milan) { struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void); while (pdu_count--) { *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; set_iovec(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, &stream->iov[1], stream->payload_size); p->seq_num = stream->pdu_seq++; p->tv = 1; p->timestamp = htonl((uint32_t)ptime); n = avb_server_stream_send(stream->server, stream, &stream->msg, MSG_NOSIGNAL); if (n < 0 || n != (ssize_t)stream->pdu_size) pw_log_error("stream send failed %zd != %zd: %m", n, stream->pdu_size); txtime += stream->pdu_period; ptime += stream->pdu_period; index += stream->payload_size; } } else { struct avb_packet_iec61883 *p = SPA_PTROFF(h, sizeof(*h), void); uint8_t dbc = stream->dbc; while (pdu_count--) { *(uint64_t*)CMSG_DATA(stream->cmsg) = txtime; set_iovec(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, &stream->iov[1], stream->payload_size); p->seq_num = stream->pdu_seq++; p->tv = 1; p->timestamp = ptime; p->dbc = dbc; n = avb_server_stream_send(stream->server, stream, &stream->msg, MSG_NOSIGNAL); if (n < 0 || n != (ssize_t)stream->pdu_size) pw_log_error("stream send failed %zd != %zd: %m", n, stream->pdu_size); txtime += stream->pdu_period; ptime += stream->pdu_period; index += stream->payload_size; dbc += stream->frames_per_pdu; } stream->dbc = dbc; } spa_ringbuffer_read_update(&stream->ring, index); return 0; } static void on_sink_stream_process(void *data) { struct stream *stream = data; struct pw_buffer *buf; struct spa_data *d; int32_t filled; uint32_t index, offs, avail, size; struct timespec now; if ((buf = pw_stream_dequeue_buffer(stream->stream)) == NULL) { pw_log_debug("out of buffers: %m"); return; } d = buf->buffer->datas; offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); avail = size - offs; filled = spa_ringbuffer_get_write_index(&stream->ring, &index); if (filled >= (int32_t)stream->buffer_size) { pw_log_warn("playback overrun %d >= %zd", filled, stream->buffer_size); } else { spa_ringbuffer_write_data(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, SPA_PTROFF(d[0].data, offs, void), avail); index += avail; spa_ringbuffer_write_update(&stream->ring, index); } pw_stream_queue_buffer(stream->stream, buf); clock_gettime(CLOCK_TAI, &now); flush_write(stream, SPA_TIMESPEC_TO_NSEC(&now)); } static void setup_pdu(struct stream *stream) { struct avb_frame_header *h; ssize_t payload_size, hdr_size, pdu_size; bool is_milan = stream->server->avb_mode == AVB_MODE_MILAN_V12; spa_memzero(stream->pdu, sizeof(stream->pdu)); h = (struct avb_frame_header*)stream->pdu; payload_size = stream->stride * stream->frames_per_pdu; if (is_milan) { struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void); hdr_size = sizeof(*h) + sizeof(*p); pdu_size = hdr_size + payload_size; h->type = htons(0x8100); h->prio_cfi_id = htons((stream->prio << 13) | stream->vlan_id); h->etype = htons(0x22f0); if (stream->direction == SPA_DIRECTION_OUTPUT) { p->subtype = AVB_SUBTYPE_AAF; p->sv = 1; p->stream_id = htobe64(stream->id); p->format = AVB_AAF_FORMAT_INT_32BIT; p->nsr = AVB_AAF_PCM_NSR_48KHZ; p->bit_depth = 32; p->chan_per_frame = stream->info.info.raw.channels; p->sp = AVB_AAF_PCM_SP_NORMAL; p->event = 0; p->seq_num = 0; p->data_len = htons(payload_size); } } else { struct avb_packet_iec61883 *p = SPA_PTROFF(h, sizeof(*h), void); hdr_size = sizeof(*h) + sizeof(*p); pdu_size = hdr_size + payload_size; h->type = htons(0x8100); h->prio_cfi_id = htons((stream->prio << 13) | stream->vlan_id); h->etype = htons(0x22f0); if (stream->direction == SPA_DIRECTION_OUTPUT) { p->subtype = AVB_SUBTYPE_61883_IIDC; p->sv = 1; p->stream_id = htobe64(stream->id); p->data_len = htons(payload_size + 8); p->tag = 0x1; p->channel = 0x1f; p->tcode = 0xa; p->sid = 0x3f; p->dbs = stream->info.info.raw.channels; p->qi2 = 0x2; p->format_id = 0x10; p->fdf = 0x2; p->syt = htons(0x0008); } } stream->hdr_size = hdr_size; stream->payload_size = payload_size; stream->pdu_size = pdu_size; } static int setup_msg(struct stream *stream) { stream->iov[0].iov_base = stream->pdu; stream->iov[0].iov_len = stream->hdr_size; stream->iov[1].iov_base = SPA_PTROFF(stream->pdu, stream->hdr_size, void); stream->iov[1].iov_len = stream->payload_size; stream->iov[2].iov_base = SPA_PTROFF(stream->pdu, stream->hdr_size, void); stream->iov[2].iov_len = 0; stream->msg.msg_name = &stream->sock_addr; stream->msg.msg_namelen = sizeof(stream->sock_addr); stream->msg.msg_iov = stream->iov; stream->msg.msg_iovlen = 3; stream->msg.msg_control = stream->control; stream->msg.msg_controllen = sizeof(stream->control); stream->cmsg = CMSG_FIRSTHDR(&stream->msg); stream->cmsg->cmsg_level = SOL_SOCKET; stream->cmsg->cmsg_type = SCM_TXTIME; stream->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64)); return 0; } static const struct pw_stream_events sink_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = on_stream_destroy, .process = on_sink_stream_process }; struct stream *server_create_stream(struct server *server, struct stream *stream, enum spa_direction direction, uint16_t index) { struct stream_common *common = (struct stream_common *)stream; uint32_t n_params; const struct spa_pod *params[1]; uint8_t buffer[1024]; struct spa_pod_builder b; int res; stream->server = server; stream->direction = direction; stream->prio = AVB_MSRP_PRIORITY_DEFAULT; stream->vlan_id = AVB_DEFAULT_VLAN; stream->id = (uint64_t)server->mac_addr[0] << 56 | (uint64_t)server->mac_addr[1] << 48 | (uint64_t)server->mac_addr[2] << 40 | (uint64_t)server->mac_addr[3] << 32 | (uint64_t)server->mac_addr[4] << 24 | (uint64_t)server->mac_addr[5] << 16 | htons(index); stream->buffer_data = calloc(1, BUFFER_SIZE); stream->buffer_size = BUFFER_SIZE; spa_ringbuffer_init(&stream->ring); if (direction == SPA_DIRECTION_INPUT) { stream->stream = pw_stream_new(server->impl->core, "source", pw_properties_new( PW_KEY_MEDIA_CLASS, "Audio/Source", PW_KEY_NODE_NAME, "avb.source", PW_KEY_NODE_DESCRIPTION, "AVB Source", PW_KEY_NODE_WANT_DRIVER, "true", NULL)); } else { stream->stream = pw_stream_new(server->impl->core, "sink", pw_properties_new( PW_KEY_MEDIA_CLASS, "Audio/Sink", PW_KEY_NODE_NAME, "avb.sink", PW_KEY_NODE_DESCRIPTION, "AVB Sink", PW_KEY_NODE_WANT_DRIVER, "true", NULL)); } if (stream->stream == NULL) goto error_free; pw_stream_add_listener(stream->stream, &stream->stream_listener, direction == SPA_DIRECTION_INPUT ? &source_stream_events : &sink_stream_events, stream); stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_32_BE; stream->info.info.raw.flags = SPA_AUDIO_FLAG_UNPOSITIONED; stream->info.info.raw.rate = 48000; stream->info.info.raw.channels = 8; stream->stride = stream->info.info.raw.channels * 4; n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &stream->info.info.raw); if ((res = pw_stream_connect(stream->stream, pw_direction_reverse(direction), PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_INACTIVE | PW_STREAM_FLAG_RT_PROCESS, params, n_params)) < 0) goto error_free_stream; stream->frames_per_pdu = 6; stream->pdu_period = SPA_NSEC_PER_SEC * stream->frames_per_pdu / stream->info.info.raw.rate; setup_pdu(stream); setup_msg(stream); res = avb_msrp_attribute_new(server->msrp, &common->lstream_attr, AVB_MSRP_ATTRIBUTE_TYPE_LISTENER); if (res) goto error_free; res = avb_msrp_attribute_new(server->msrp, &common->tfstream_attr, AVB_MSRP_ATTRIBUTE_TYPE_TALKER_FAILED); if (res) { avb_mrp_attribute_destroy(common->lstream_attr.mrp); goto error_free; } if (direction == SPA_DIRECTION_OUTPUT) { res = avb_msrp_attribute_new(server->msrp, &common->tastream_attr, AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); if (res) { avb_mrp_attribute_destroy(common->lstream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp); goto error_free; } common->tastream_attr.attr.talker.vlan_id = htons(stream->vlan_id); common->tastream_attr.attr.talker.tspec_max_frame_size = htons(stream->server->avb_mode == AVB_MODE_MILAN_V12 ? (uint16_t)stream->pdu_size : (uint16_t)(32 + stream->frames_per_pdu * stream->stride)); common->tastream_attr.attr.talker.tspec_max_interval_frames = htons(AVB_MSRP_TSPEC_MAX_INTERVAL_FRAMES_DEFAULT); common->tastream_attr.attr.talker.priority = stream->prio; common->tastream_attr.attr.talker.rank = AVB_MSRP_RANK_DEFAULT; common->tastream_attr.attr.talker.accumulated_latency = htonl(95); } spa_list_append(&server->streams, &stream->link); return stream; error_free_stream: pw_stream_destroy(stream->stream); errno = -res; error_free: free(stream->buffer_data); return NULL; } void stream_destroy(struct stream *stream) { struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream); if (stream->direction == SPA_DIRECTION_INPUT) { avb_mrp_attribute_destroy(common->lstream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp); } else { avb_mrp_attribute_destroy(common->tastream_attr.mrp); avb_mrp_attribute_destroy(common->tfstream_attr.mrp); } } static int setup_socket(struct stream *stream) { return avb_server_stream_setup_socket(stream->server, stream); } static void handle_aaf_packet(struct stream *stream, struct avb_packet_aaf *p, int len) { uint32_t index, n_bytes; int32_t filled; filled = spa_ringbuffer_get_write_index(&stream->ring, &index); n_bytes = ntohs(p->data_len); if (filled + (int32_t)n_bytes > (int32_t)stream->buffer_size) { pw_log_debug("capture overrun"); } else { spa_ringbuffer_write_data(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, p->payload, n_bytes); index += n_bytes; spa_ringbuffer_write_update(&stream->ring, index); } } static void handle_iec61883_packet(struct stream *stream, struct avb_packet_iec61883 *p, int len) { uint32_t index, n_bytes; uint16_t data_len; int32_t filled; filled = spa_ringbuffer_get_write_index(&stream->ring, &index); data_len = ntohs(p->data_len); if (data_len < 8) return; n_bytes = data_len - 8; if (n_bytes > (uint32_t)(len - (int)sizeof(*p))) return; if (filled + n_bytes > stream->buffer_size) { pw_log_debug("capture overrun"); } else { spa_ringbuffer_write_data(&stream->ring, stream->buffer_data, stream->buffer_size, index % stream->buffer_size, p->payload, n_bytes); index += n_bytes; spa_ringbuffer_write_update(&stream->ring, index); } } static void on_socket_data(void *data, int fd, uint32_t mask) { struct stream *stream = data; if (mask & SPA_IO_IN) { int len; uint8_t buffer[2048]; len = recv(fd, buffer, sizeof(buffer), 0); if (len < 0) { pw_log_warn("got recv error: %m"); } else if (len < (int)(sizeof(struct avb_ethernet_header) + sizeof(struct avb_packet_iec61883))) { pw_log_warn("short packet received (%d < %d)", len, (int)(sizeof(struct avb_ethernet_header) + sizeof(struct avb_packet_iec61883))); } else { struct avb_ethernet_header *h = (void*)buffer; struct avb_packet_header *ph = SPA_PTROFF(h, sizeof(*h), void); if (memcmp(h->dest, stream->addr, 6) != 0) return; switch (ph->subtype) { case AVB_SUBTYPE_AAF: handle_aaf_packet(stream, (struct avb_packet_aaf *)ph, len - (int)sizeof(*h)); break; case AVB_SUBTYPE_61883_IIDC: handle_iec61883_packet(stream, (struct avb_packet_iec61883 *)ph, len - (int)sizeof(*h)); break; default: pw_log_warn("unsupported subtype 0x%02x", ph->subtype); break; } } } } int stream_activate(struct stream *stream, uint16_t index, uint64_t now) { struct server *server = stream->server; struct avb_frame_header *h = (void*)stream->pdu; int fd, res; struct stream_common *common; common = SPA_CONTAINER_OF(stream, struct stream_common, stream); if (stream->source == NULL) { if ((fd = setup_socket(stream)) < 0) return fd; stream->source = pw_loop_add_io(server->impl->loop, fd, SPA_IO_IN, true, on_socket_data, stream); if (stream->source == NULL) { res = -errno; pw_log_error("stream %p: can't create source: %m", stream); close(fd); return res; } } if (stream->direction == SPA_DIRECTION_INPUT) { struct aecp_aem_stream_input_state *input_stream; input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); common->lstream_attr.attr.listener.stream_id = htobe64(stream->peer_id); common->lstream_attr.param = AVB_MSRP_LISTENER_PARAM_READY; avb_mrp_attribute_begin(common->lstream_attr.mrp, now); avb_mrp_attribute_join(common->lstream_attr.mrp, now, true); input_stream->mvrp_attr.attr.vid.vlan = htons(stream->vlan_id); avb_mrp_attribute_begin(input_stream->mvrp_attr.mrp, now); avb_mrp_attribute_join(input_stream->mvrp_attr.mrp, now, true); } else { if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) return res; common->tastream_attr.attr.talker.stream_id = htobe64(stream->id); memcpy(common->tastream_attr.attr.talker.dest_addr, stream->addr, 6); stream->sock_addr.sll_halen = ETH_ALEN; memcpy(&stream->sock_addr.sll_addr, stream->addr, ETH_ALEN); memcpy(h->dest, stream->addr, 6); memcpy(h->src, server->mac_addr, 6); avb_mrp_attribute_begin(common->tastream_attr.mrp, now); avb_mrp_attribute_join(common->tastream_attr.mrp, now, true); } pw_stream_set_active(stream->stream, true); return 0; } int stream_deactivate(struct stream *stream, uint64_t now) { struct stream_common *common; common = SPA_CONTAINER_OF(stream, struct stream_common, stream); pw_stream_set_active(stream->stream, false); if (stream->source != NULL) { pw_loop_destroy_source(stream->server->impl->loop, stream->source); stream->source = NULL; } #if 0 avb_mrp_attribute_leave(stream->vlan_attr->mrp, now); #endif // if (stream->direction == SPA_DIRECTION_INPUT) avb_mrp_attribute_leave(common->lstream_attr.mrp, now); else avb_mrp_attribute_leave(common->tastream_attr.mrp, now); return 0; } int stream_activate_virtual(struct stream *stream, uint16_t index) { struct server *server = stream->server; int fd; if (stream->source == NULL) { fd = setup_socket(stream); if (fd < 0) return fd; stream->source = pw_loop_add_io(server->impl->loop, fd, SPA_IO_IN, true, on_socket_data, stream); if (stream->source == NULL) { close(fd); return -errno; } } pw_stream_set_active(stream->stream, true); return 0; }