mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
avb: use iovec to read from ringbuffer
This commit is contained in:
parent
5c3629a165
commit
4d6c71bf12
2 changed files with 84 additions and 65 deletions
|
|
@ -631,10 +631,11 @@ static int setup_packet(struct state *state, struct spa_audio_info *fmt)
|
||||||
int res;
|
int res;
|
||||||
struct avtp_stream_pdu *pdu;
|
struct avtp_stream_pdu *pdu;
|
||||||
struct props *p = &state->props;
|
struct props *p = &state->props;
|
||||||
ssize_t payload_size, pdu_size;
|
ssize_t payload_size, hdr_size, pdu_size;
|
||||||
|
|
||||||
|
hdr_size = sizeof(*pdu);
|
||||||
payload_size = state->stride * p->frames_per_pdu;
|
payload_size = state->stride * p->frames_per_pdu;
|
||||||
pdu_size = sizeof(*pdu) + payload_size;
|
pdu_size = hdr_size + payload_size;
|
||||||
if ((pdu = calloc(1, pdu_size)) == NULL)
|
if ((pdu = calloc(1, pdu_size)) == NULL)
|
||||||
return -errno;
|
return -errno;
|
||||||
|
|
||||||
|
|
@ -653,6 +654,8 @@ static int setup_packet(struct state *state, struct spa_audio_info *fmt)
|
||||||
#undef PDU_SET
|
#undef PDU_SET
|
||||||
}
|
}
|
||||||
state->pdu = pdu;
|
state->pdu = pdu;
|
||||||
|
state->hdr_size = hdr_size;
|
||||||
|
state->payload_size = payload_size;
|
||||||
state->pdu_size = pdu_size;
|
state->pdu_size = pdu_size;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
|
@ -664,19 +667,22 @@ error:
|
||||||
|
|
||||||
static int setup_msg(struct state *state)
|
static int setup_msg(struct state *state)
|
||||||
{
|
{
|
||||||
state->iov.iov_base = state->pdu;
|
state->iov[0].iov_base = state->pdu;
|
||||||
state->iov.iov_len = state->pdu_size;
|
state->iov[0].iov_len = state->hdr_size;
|
||||||
state->msg.msg_name = &state->sock_addr;
|
state->iov[1].iov_base = state->pdu->avtp_payload;
|
||||||
state->msg.msg_namelen = sizeof(state->sock_addr);
|
state->iov[1].iov_len = state->payload_size;
|
||||||
state->msg.msg_iov = &state->iov;
|
state->iov[2].iov_base = state->pdu->avtp_payload;
|
||||||
state->msg.msg_iovlen = 1;
|
state->iov[2].iov_len = 0;
|
||||||
state->msg.msg_control = state->control;
|
state->msg.msg_name = &state->sock_addr;
|
||||||
state->msg.msg_controllen = sizeof(state->control);
|
state->msg.msg_namelen = sizeof(state->sock_addr);
|
||||||
state->cmsg = CMSG_FIRSTHDR(&state->msg);
|
state->msg.msg_iov = state->iov;
|
||||||
state->cmsg->cmsg_level = SOL_SOCKET;
|
state->msg.msg_iovlen = 3;
|
||||||
state->cmsg->cmsg_type = SCM_TXTIME;
|
state->msg.msg_control = state->control;
|
||||||
state->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64));
|
state->msg.msg_controllen = sizeof(state->control);
|
||||||
|
state->cmsg = CMSG_FIRSTHDR(&state->msg);
|
||||||
|
state->cmsg->cmsg_level = SOL_SOCKET;
|
||||||
|
state->cmsg->cmsg_type = SCM_TXTIME;
|
||||||
|
state->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -769,18 +775,42 @@ static bool is_pdu_valid(struct state *state)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline void
|
||||||
|
set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size,
|
||||||
|
uint32_t offset, struct iovec *iov, uint32_t len)
|
||||||
|
{
|
||||||
|
iov[0].iov_len = SPA_MIN(len, size - offset);
|
||||||
|
iov[0].iov_base = SPA_PTROFF(buffer, offset, void);
|
||||||
|
iov[1].iov_len = len - iov[0].iov_len;
|
||||||
|
iov[1].iov_base = buffer;
|
||||||
|
}
|
||||||
|
|
||||||
static void avb_on_socket_event(struct spa_source *source)
|
static void avb_on_socket_event(struct spa_source *source)
|
||||||
{
|
{
|
||||||
int res;
|
int res;
|
||||||
struct state *state = source->data;
|
struct state *state = source->data;
|
||||||
ssize_t n;
|
ssize_t n;
|
||||||
int32_t filled;
|
int32_t filled;
|
||||||
uint32_t subtype, index, n_bytes;
|
uint32_t subtype, index;
|
||||||
struct avtp_common_pdu *common;
|
struct avtp_common_pdu *common;
|
||||||
struct props *p = &state->props;
|
|
||||||
struct avtp_stream_pdu *pdu = state->pdu;
|
struct avtp_stream_pdu *pdu = state->pdu;
|
||||||
|
bool overrun = false;
|
||||||
|
|
||||||
n = recv(state->sockfd, state->pdu, state->pdu_size, 0);
|
filled = spa_ringbuffer_get_write_index(&state->ring, &index);
|
||||||
|
overrun = filled > (int32_t) state->ringbuffer_size;
|
||||||
|
if (overrun) {
|
||||||
|
state->iov[1].iov_base = state->pdu->avtp_payload;
|
||||||
|
state->iov[1].iov_len = state->payload_size;
|
||||||
|
state->iov[2].iov_len = 0;
|
||||||
|
} else {
|
||||||
|
set_iovec(&state->ring,
|
||||||
|
state->ringbuffer_data,
|
||||||
|
state->ringbuffer_size,
|
||||||
|
index % state->ringbuffer_size,
|
||||||
|
&state->iov[1], state->payload_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
n = recvmsg(state->sockfd, &state->msg, 0);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
spa_log_error(state->log, "recv() failed: %m");
|
spa_log_error(state->log, "recv() failed: %m");
|
||||||
return;
|
return;
|
||||||
|
|
@ -794,27 +824,19 @@ static void avb_on_socket_event(struct spa_source *source)
|
||||||
|
|
||||||
if ((res = avtp_pdu_get(common, AVTP_FIELD_SUBTYPE, &subtype)) < 0)
|
if ((res = avtp_pdu_get(common, AVTP_FIELD_SUBTYPE, &subtype)) < 0)
|
||||||
return;
|
return;
|
||||||
if (subtype != AVTP_SUBTYPE_AAF)
|
if (subtype != AVTP_SUBTYPE_AAF) {
|
||||||
|
spa_log_error(state->log, "non supported subtype");
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
if (!is_pdu_valid(state)) {
|
if (!is_pdu_valid(state)) {
|
||||||
spa_log_error(state->log, "AAF PDU invalid");
|
spa_log_error(state->log, "AAF PDU invalid");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (overrun) {
|
||||||
filled = spa_ringbuffer_get_write_index(&state->ring, &index);
|
|
||||||
if (filled > (int32_t) state->ringbuffer_size) {
|
|
||||||
spa_log_warn(state->log, "overrun %d", filled);
|
spa_log_warn(state->log, "overrun %d", filled);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
n_bytes = p->frames_per_pdu * state->stride;
|
index += state->payload_size;
|
||||||
spa_ringbuffer_write_data(&state->ring,
|
|
||||||
state->ringbuffer_data,
|
|
||||||
state->ringbuffer_size,
|
|
||||||
index % state->ringbuffer_size,
|
|
||||||
pdu->avtp_payload, n_bytes);
|
|
||||||
|
|
||||||
index += n_bytes;
|
|
||||||
spa_ringbuffer_write_update(&state->ring, index);
|
spa_ringbuffer_write_update(&state->ring, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -895,7 +917,7 @@ static int handle_play(struct state *state, uint64_t current_time)
|
||||||
{
|
{
|
||||||
int res;
|
int res;
|
||||||
int32_t avail, wanted;
|
int32_t avail, wanted;
|
||||||
uint32_t index, n_bytes;
|
uint32_t index;
|
||||||
uint64_t ptime, txtime;
|
uint64_t ptime, txtime;
|
||||||
int pdu_count;
|
int pdu_count;
|
||||||
struct props *p = &state->props;
|
struct props *p = &state->props;
|
||||||
|
|
@ -913,16 +935,15 @@ static int handle_play(struct state *state, uint64_t current_time)
|
||||||
|
|
||||||
txtime = current_time + p->t_uncertainty;
|
txtime = current_time + p->t_uncertainty;
|
||||||
ptime = txtime + p->mtt;
|
ptime = txtime + p->mtt;
|
||||||
n_bytes = p->frames_per_pdu * state->stride;
|
|
||||||
|
|
||||||
while (pdu_count--) {
|
while (pdu_count--) {
|
||||||
*(__u64 *)CMSG_DATA(state->cmsg) = txtime;
|
*(__u64 *)CMSG_DATA(state->cmsg) = txtime;
|
||||||
|
|
||||||
spa_ringbuffer_read_data(&state->ring,
|
set_iovec(&state->ring,
|
||||||
state->ringbuffer_data,
|
state->ringbuffer_data,
|
||||||
state->ringbuffer_size,
|
state->ringbuffer_size,
|
||||||
index % state->ringbuffer_size,
|
index % state->ringbuffer_size,
|
||||||
pdu->avtp_payload, n_bytes);
|
&state->iov[1], state->payload_size);
|
||||||
|
|
||||||
#define PDU_SET(f,v) if ((res = avtp_aaf_pdu_set(pdu, (f), (v))) < 0) return res;
|
#define PDU_SET(f,v) if ((res = avtp_aaf_pdu_set(pdu, (f), (v))) < 0) return res;
|
||||||
PDU_SET(AVTP_AAF_FIELD_SEQ_NUM, state->pdu_seq++);
|
PDU_SET(AVTP_AAF_FIELD_SEQ_NUM, state->pdu_seq++);
|
||||||
|
|
@ -935,10 +956,9 @@ static int handle_play(struct state *state, uint64_t current_time)
|
||||||
}
|
}
|
||||||
txtime += state->pdu_period;
|
txtime += state->pdu_period;
|
||||||
ptime += state->pdu_period;
|
ptime += state->pdu_period;
|
||||||
index += n_bytes;
|
index += state->payload_size;
|
||||||
}
|
}
|
||||||
spa_ringbuffer_read_update(&state->ring, index);
|
spa_ringbuffer_read_update(&state->ring, index);
|
||||||
|
|
||||||
done:
|
done:
|
||||||
spa_node_call_ready(&state->callbacks, SPA_STATUS_NEED_DATA);
|
spa_node_call_ready(&state->callbacks, SPA_STATUS_NEED_DATA);
|
||||||
|
|
||||||
|
|
@ -951,6 +971,8 @@ int spa_avb_read(struct state *state)
|
||||||
uint32_t index;
|
uint32_t index;
|
||||||
struct port *port = &state->ports[0];
|
struct port *port = &state->ports[0];
|
||||||
struct buffer *b;
|
struct buffer *b;
|
||||||
|
struct spa_data *d;
|
||||||
|
uint32_t n_bytes;
|
||||||
|
|
||||||
if (state->position)
|
if (state->position)
|
||||||
state->duration = state->position->clock.duration;
|
state->duration = state->position->clock.duration;
|
||||||
|
|
@ -962,36 +984,31 @@ int spa_avb_read(struct state *state)
|
||||||
spa_log_warn(state->log, "capture underrun %d < %d", avail, wanted);
|
spa_log_warn(state->log, "capture underrun %d < %d", avail, wanted);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
while (avail >= wanted) {
|
if (spa_list_is_empty(&port->free))
|
||||||
struct spa_data *d;
|
return 0;
|
||||||
uint32_t n_bytes;
|
|
||||||
|
|
||||||
if (spa_list_is_empty(&port->free))
|
b = spa_list_first(&port->free, struct buffer, link);
|
||||||
break;
|
d = b->buf->datas;
|
||||||
|
|
||||||
b = spa_list_first(&port->free, struct buffer, link);
|
n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted);
|
||||||
d = b->buf->datas;
|
|
||||||
|
|
||||||
n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted);
|
spa_ringbuffer_read_data(&state->ring,
|
||||||
|
state->ringbuffer_data,
|
||||||
|
state->ringbuffer_size,
|
||||||
|
index % state->ringbuffer_size,
|
||||||
|
d[0].data, n_bytes);
|
||||||
|
|
||||||
spa_ringbuffer_read_data(&state->ring,
|
d[0].chunk->offset = 0;
|
||||||
state->ringbuffer_data,
|
d[0].chunk->size = n_bytes;
|
||||||
state->ringbuffer_size,
|
d[0].chunk->stride = state->stride;
|
||||||
index % state->ringbuffer_size,
|
d[0].chunk->flags = 0;
|
||||||
d[0].data, n_bytes);
|
|
||||||
|
|
||||||
d[0].chunk->offset = 0;
|
spa_list_remove(&b->link);
|
||||||
d[0].chunk->size = n_bytes;
|
spa_list_append(&port->ready, &b->link);
|
||||||
d[0].chunk->stride = state->stride;
|
|
||||||
d[0].chunk->flags = 0;
|
|
||||||
|
|
||||||
spa_list_remove(&b->link);
|
index += n_bytes;
|
||||||
spa_list_append(&port->ready, &b->link);
|
avail -= n_bytes;
|
||||||
|
spa_ringbuffer_read_update(&state->ring, index);
|
||||||
index += n_bytes;
|
|
||||||
avail -= n_bytes;
|
|
||||||
spa_ringbuffer_read_update(&state->ring, index);
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -237,12 +237,14 @@ struct state {
|
||||||
struct sockaddr_ll sock_addr;
|
struct sockaddr_ll sock_addr;
|
||||||
|
|
||||||
struct avtp_stream_pdu *pdu;
|
struct avtp_stream_pdu *pdu;
|
||||||
|
size_t hdr_size;
|
||||||
|
size_t payload_size;
|
||||||
size_t pdu_size;
|
size_t pdu_size;
|
||||||
int64_t pdu_period;
|
int64_t pdu_period;
|
||||||
uint8_t pdu_seq;
|
uint8_t pdu_seq;
|
||||||
uint8_t prev_seq;
|
uint8_t prev_seq;
|
||||||
|
|
||||||
struct iovec iov;
|
struct iovec iov[3];
|
||||||
struct msghdr msg;
|
struct msghdr msg;
|
||||||
char control[CMSG_SPACE(sizeof(__u64))];
|
char control[CMSG_SPACE(sizeof(__u64))];
|
||||||
struct cmsghdr *cmsg;
|
struct cmsghdr *cmsg;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue