From 4d6c71bf12c489e0f18bcbaa0718325c68c72cee Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 10 Mar 2022 10:43:59 +0100 Subject: [PATCH] avb: use iovec to read from ringbuffer --- spa/plugins/avb/avb-pcm.c | 145 +++++++++++++++++++++----------------- spa/plugins/avb/avb-pcm.h | 4 +- 2 files changed, 84 insertions(+), 65 deletions(-) diff --git a/spa/plugins/avb/avb-pcm.c b/spa/plugins/avb/avb-pcm.c index 1117f6915..28291b8f1 100644 --- a/spa/plugins/avb/avb-pcm.c +++ b/spa/plugins/avb/avb-pcm.c @@ -631,10 +631,11 @@ static int setup_packet(struct state *state, struct spa_audio_info *fmt) int res; struct avtp_stream_pdu *pdu; struct props *p = &state->props; - ssize_t payload_size, pdu_size; + ssize_t payload_size, hdr_size, pdu_size; + hdr_size = sizeof(*pdu); payload_size = state->stride * p->frames_per_pdu; - pdu_size = sizeof(*pdu) + payload_size; + pdu_size = hdr_size + payload_size; if ((pdu = calloc(1, pdu_size)) == NULL) return -errno; @@ -653,6 +654,8 @@ static int setup_packet(struct state *state, struct spa_audio_info *fmt) #undef PDU_SET } state->pdu = pdu; + state->hdr_size = hdr_size; + state->payload_size = payload_size; state->pdu_size = pdu_size; return 0; @@ -664,19 +667,22 @@ error: static int setup_msg(struct state *state) { - state->iov.iov_base = state->pdu; - state->iov.iov_len = state->pdu_size; - state->msg.msg_name = &state->sock_addr; - state->msg.msg_namelen = sizeof(state->sock_addr); - state->msg.msg_iov = &state->iov; - state->msg.msg_iovlen = 1; - state->msg.msg_control = state->control; - state->msg.msg_controllen = sizeof(state->control); - state->cmsg = CMSG_FIRSTHDR(&state->msg); - state->cmsg->cmsg_level = SOL_SOCKET; - state->cmsg->cmsg_type = SCM_TXTIME; - state->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64)); - + state->iov[0].iov_base = state->pdu; + state->iov[0].iov_len = state->hdr_size; + state->iov[1].iov_base = state->pdu->avtp_payload; + state->iov[1].iov_len = state->payload_size; + state->iov[2].iov_base = state->pdu->avtp_payload; + state->iov[2].iov_len = 0; + state->msg.msg_name = &state->sock_addr; + state->msg.msg_namelen = sizeof(state->sock_addr); + state->msg.msg_iov = state->iov; + state->msg.msg_iovlen = 3; + state->msg.msg_control = state->control; + state->msg.msg_controllen = sizeof(state->control); + state->cmsg = CMSG_FIRSTHDR(&state->msg); + state->cmsg->cmsg_level = SOL_SOCKET; + state->cmsg->cmsg_type = SCM_TXTIME; + state->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64)); return 0; } @@ -769,18 +775,42 @@ static bool is_pdu_valid(struct state *state) return true; } +static inline void +set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, + uint32_t offset, struct iovec *iov, uint32_t len) +{ + iov[0].iov_len = SPA_MIN(len, size - offset); + iov[0].iov_base = SPA_PTROFF(buffer, offset, void); + iov[1].iov_len = len - iov[0].iov_len; + iov[1].iov_base = buffer; +} + static void avb_on_socket_event(struct spa_source *source) { int res; struct state *state = source->data; ssize_t n; int32_t filled; - uint32_t subtype, index, n_bytes; + uint32_t subtype, index; struct avtp_common_pdu *common; - struct props *p = &state->props; struct avtp_stream_pdu *pdu = state->pdu; + bool overrun = false; - n = recv(state->sockfd, state->pdu, state->pdu_size, 0); + filled = spa_ringbuffer_get_write_index(&state->ring, &index); + overrun = filled > (int32_t) state->ringbuffer_size; + if (overrun) { + state->iov[1].iov_base = state->pdu->avtp_payload; + state->iov[1].iov_len = state->payload_size; + state->iov[2].iov_len = 0; + } else { + set_iovec(&state->ring, + state->ringbuffer_data, + state->ringbuffer_size, + index % state->ringbuffer_size, + &state->iov[1], state->payload_size); + } + + n = recvmsg(state->sockfd, &state->msg, 0); if (n < 0) { spa_log_error(state->log, "recv() failed: %m"); return; @@ -794,27 +824,19 @@ static void avb_on_socket_event(struct spa_source *source) if ((res = avtp_pdu_get(common, AVTP_FIELD_SUBTYPE, &subtype)) < 0) return; - if (subtype != AVTP_SUBTYPE_AAF) + if (subtype != AVTP_SUBTYPE_AAF) { + spa_log_error(state->log, "non supported subtype"); return; - + } if (!is_pdu_valid(state)) { spa_log_error(state->log, "AAF PDU invalid"); return; } - - filled = spa_ringbuffer_get_write_index(&state->ring, &index); - if (filled > (int32_t) state->ringbuffer_size) { + if (overrun) { spa_log_warn(state->log, "overrun %d", filled); return; } - n_bytes = p->frames_per_pdu * state->stride; - spa_ringbuffer_write_data(&state->ring, - state->ringbuffer_data, - state->ringbuffer_size, - index % state->ringbuffer_size, - pdu->avtp_payload, n_bytes); - - index += n_bytes; + index += state->payload_size; spa_ringbuffer_write_update(&state->ring, index); } @@ -895,7 +917,7 @@ static int handle_play(struct state *state, uint64_t current_time) { int res; int32_t avail, wanted; - uint32_t index, n_bytes; + uint32_t index; uint64_t ptime, txtime; int pdu_count; struct props *p = &state->props; @@ -913,16 +935,15 @@ static int handle_play(struct state *state, uint64_t current_time) txtime = current_time + p->t_uncertainty; ptime = txtime + p->mtt; - n_bytes = p->frames_per_pdu * state->stride; while (pdu_count--) { *(__u64 *)CMSG_DATA(state->cmsg) = txtime; - spa_ringbuffer_read_data(&state->ring, - state->ringbuffer_data, - state->ringbuffer_size, - index % state->ringbuffer_size, - pdu->avtp_payload, n_bytes); + set_iovec(&state->ring, + state->ringbuffer_data, + state->ringbuffer_size, + index % state->ringbuffer_size, + &state->iov[1], state->payload_size); #define PDU_SET(f,v) if ((res = avtp_aaf_pdu_set(pdu, (f), (v))) < 0) return res; PDU_SET(AVTP_AAF_FIELD_SEQ_NUM, state->pdu_seq++); @@ -935,10 +956,9 @@ static int handle_play(struct state *state, uint64_t current_time) } txtime += state->pdu_period; ptime += state->pdu_period; - index += n_bytes; + index += state->payload_size; } spa_ringbuffer_read_update(&state->ring, index); - done: spa_node_call_ready(&state->callbacks, SPA_STATUS_NEED_DATA); @@ -951,6 +971,8 @@ int spa_avb_read(struct state *state) uint32_t index; struct port *port = &state->ports[0]; struct buffer *b; + struct spa_data *d; + uint32_t n_bytes; if (state->position) state->duration = state->position->clock.duration; @@ -962,36 +984,31 @@ int spa_avb_read(struct state *state) spa_log_warn(state->log, "capture underrun %d < %d", avail, wanted); return 0; } - while (avail >= wanted) { - struct spa_data *d; - uint32_t n_bytes; + if (spa_list_is_empty(&port->free)) + return 0; - if (spa_list_is_empty(&port->free)) - break; + b = spa_list_first(&port->free, struct buffer, link); + d = b->buf->datas; - b = spa_list_first(&port->free, struct buffer, link); - d = b->buf->datas; + n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted); - n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted); + spa_ringbuffer_read_data(&state->ring, + state->ringbuffer_data, + state->ringbuffer_size, + index % state->ringbuffer_size, + d[0].data, n_bytes); - spa_ringbuffer_read_data(&state->ring, - state->ringbuffer_data, - state->ringbuffer_size, - index % state->ringbuffer_size, - d[0].data, n_bytes); + d[0].chunk->offset = 0; + d[0].chunk->size = n_bytes; + d[0].chunk->stride = state->stride; + d[0].chunk->flags = 0; - d[0].chunk->offset = 0; - d[0].chunk->size = n_bytes; - d[0].chunk->stride = state->stride; - d[0].chunk->flags = 0; + spa_list_remove(&b->link); + spa_list_append(&port->ready, &b->link); - spa_list_remove(&b->link); - spa_list_append(&port->ready, &b->link); - - index += n_bytes; - avail -= n_bytes; - spa_ringbuffer_read_update(&state->ring, index); - } + index += n_bytes; + avail -= n_bytes; + spa_ringbuffer_read_update(&state->ring, index); return 0; } diff --git a/spa/plugins/avb/avb-pcm.h b/spa/plugins/avb/avb-pcm.h index 93af975e1..dfa369c2f 100644 --- a/spa/plugins/avb/avb-pcm.h +++ b/spa/plugins/avb/avb-pcm.h @@ -237,12 +237,14 @@ struct state { struct sockaddr_ll sock_addr; struct avtp_stream_pdu *pdu; + size_t hdr_size; + size_t payload_size; size_t pdu_size; int64_t pdu_period; uint8_t pdu_seq; uint8_t prev_seq; - struct iovec iov; + struct iovec iov[3]; struct msghdr msg; char control[CMSG_SPACE(sizeof(__u64))]; struct cmsghdr *cmsg;