avb: more work

This commit is contained in:
Wim Taymans 2022-03-08 17:19:09 +01:00
parent f95b8263e6
commit 537a38355a
3 changed files with 300 additions and 124 deletions

View file

@ -788,6 +788,8 @@ static int impl_node_process(void *object)
SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT);
input->buffer_id = SPA_ID_INVALID;
spa_avb_write(this);
input->status = SPA_STATUS_OK;
}
return SPA_STATUS_HAVE_DATA;

View file

@ -298,6 +298,7 @@ int spa_avb_init(struct state *state, const struct spa_dict *info)
{
uint32_t i;
state->quantum_limit = 8192;
for (i = 0; info && i < info->n_items; i++) {
const char *k = info->items[i].key;
const char *s = info->items[i].value;
@ -307,6 +308,10 @@ int spa_avb_init(struct state *state, const struct spa_dict *info)
avb_set_param(state, k, s);
}
}
state->ringbuffer_size = state->quantum_limit * 64;
state->ringbuffer_data = calloc(1, state->ringbuffer_size * 4);
spa_ringbuffer_init(&state->ring);
return 0;
}
@ -315,41 +320,6 @@ int spa_avb_clear(struct state *state)
return 0;
}
int spa_avb_open(struct state *state, const char *params)
{
int err;
if (state->opened)
return 0;
if ((err = spa_system_timerfd_create(state->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
goto error_exit_close;
state->timerfd = err;
state->opened = true;
return 0;
error_exit_close:
return err;
}
int spa_avb_close(struct state *state)
{
int err = 0;
if (!state->opened)
return 0;
spa_system_close(state->data_system, state->timerfd);
state->opened = false;
return err;
}
static int spa_format_to_aaf(uint32_t format)
{
switch(format) {
@ -586,22 +556,18 @@ error:
static int setup_msg(struct state *state)
{
struct cmsghdr *cmsg;
state->iov.iov_base = state->pdu;
state->iov.iov_len = state->pdu_size;
state->msg.msg_name = &state->sock_addr;
state->msg.msg_namelen = sizeof(state->sock_addr);
state->msg.msg_iov = &state->iov;
state->msg.msg_iovlen = 1;
state->msg.msg_control = state->control;
state->msg.msg_controllen = sizeof(state->control);
cmsg = CMSG_FIRSTHDR(&state->msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_TXTIME;
cmsg->cmsg_len = CMSG_LEN(sizeof(__u64));
state->cmsg = CMSG_FIRSTHDR(&state->msg);
state->cmsg->cmsg_level = SOL_SOCKET;
state->cmsg->cmsg_type = SCM_TXTIME;
state->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64));
return 0;
}
@ -629,7 +595,7 @@ int spa_avb_set_format(struct state *state, struct spa_audio_info *fmt, uint32_t
return res;
if ((res = spa_system_timerfd_create(state->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
CLOCK_REALTIME, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
goto error_close_sockfd;
state->timerfd = res;
@ -680,48 +646,289 @@ static void reset_buffers(struct state *this, struct port *port)
}
}
static int timer_start(struct state *state, uint64_t time, uint64_t period)
static bool is_pdu_valid(struct state *state)
{
int res;
struct itimerspec ts;
uint64_t time_utc;
state->timer_expirations = 0;
state->timer_period = period;
state->timer_starttime = time;
time_utc = TAI_TO_UTC(time);
ts.it_value.tv_sec = time_utc / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = time_utc % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = state->timer_period;
res = spa_system_timerfd_settime(state->data_system, state->timerfd,
SPA_FD_TIMER_ABSTIME, &ts, NULL);
return res;
}
static int timer_start_playback(struct state *state)
{
int res;
struct timespec now;
uint64_t time, period;
if ((res = clock_gettime(CLOCK_TAI, &now)) < 0) {
spa_log_error(state->log, "clock_gettime(CLOCK_TAI) error: %m");
return -errno;
}
period = SPA_NSEC_PER_SEC * state->period_size / io->rate;
time = now.tv_sec * NSEC_PER_SEC + now.tv_nsec + period;
return timer_start(state, time, period);
return true;
}
static void avb_on_socket_event(struct spa_source *source)
{
int res;
struct state *state = source->data;
ssize_t n;
int32_t filled;
uint32_t subtype, index;
struct avtp_common_pdu *common;
struct props *p = &state->props;
struct avtp_stream_pdu *pdu = state->pdu;
n = recv(state->sockfd, state->pdu, state->pdu_size, 0);
if (n < 0) {
spa_log_error(state->log, "recv() failed: %m");
return;
}
if (n != (ssize_t)state->pdu_size) {
spa_log_error(state->log, "AVTPDU dropped: Invalid size");
return;
}
common = (struct avtp_common_pdu *) pdu;
if ((res = avtp_pdu_get(common, AVTP_FIELD_SUBTYPE, &subtype)) < 0)
return;
if (subtype != AVTP_SUBTYPE_AAF)
return;
if (!is_pdu_valid(state)) {
spa_log_error(state->log, "AAF PDU invalid");
return;
}
filled = spa_ringbuffer_get_write_index(&state->ring, &index);
if (filled > (int32_t) state->ringbuffer_size) {
spa_log_warn(state->log, "overrun %d", filled);
}
spa_ringbuffer_write_data(&state->ring,
state->ringbuffer_data,
state->ringbuffer_size,
index % state->ringbuffer_size,
pdu->avtp_payload,
p->frames_per_pdu * state->stride);
index += p->frames_per_pdu * state->stride;
spa_ringbuffer_write_update(&state->ring, index);
}
static void set_timeout(struct state *state, uint64_t next_time)
{
struct itimerspec ts;
uint64_t time_utc;
spa_log_trace(state->log, "set timeout %"PRIu64, next_time);
time_utc = next_time > TAI_OFFSET ? TAI_TO_UTC(next_time) : 0;
ts.it_value.tv_sec = time_utc / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = time_utc % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(state->data_system,
state->timer_source.fd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
}
int spa_avb_write(struct state *state)
{
int32_t filled;
uint32_t index, to_write;
struct port *port = &state->ports[0];
filled = spa_ringbuffer_get_write_index(&state->ring, &index);
if (filled < 0) {
spa_log_warn(state->log, "underrun %d", filled);
} else if (filled > (int32_t)state->ringbuffer_size) {
spa_log_warn(state->log, "overrun %d", filled);
}
to_write = state->ringbuffer_size - filled;
while (!spa_list_is_empty(&port->ready) && to_write > 0) {
size_t n_bytes, n_frames;
struct buffer *b;
struct spa_data *d;
uint32_t offs, avail, size;
b = spa_list_first(&port->ready, struct buffer, link);
d = b->buf->datas;
offs = SPA_MIN(d[0].chunk->offset + port->ready_offset, d[0].maxsize);
size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs);
avail = (size - offs) / state->stride;
n_frames = SPA_MIN(avail, to_write);
n_bytes = n_frames * state->stride;
spa_ringbuffer_write_data(&state->ring,
state->ringbuffer_data,
state->ringbuffer_size,
index % state->ringbuffer_size,
SPA_PTROFF(d[0].data, offs, void),
n_bytes);
port->ready_offset += n_bytes;
if (port->ready_offset >= size || avail == 0) {
spa_list_remove(&b->link);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
port->io->buffer_id = b->id;
spa_log_trace_fp(state->log, "%p: reuse buffer %u", state, b->id);
spa_node_call_reuse_buffer(&state->callbacks, 0, b->id);
port->ready_offset = 0;
}
to_write -= n_frames;
index += n_bytes;
}
spa_ringbuffer_write_update(&state->ring, index);
return 0;
}
static int handle_play(struct state *state, uint64_t current_time)
{
int res;
int32_t avail;
uint32_t index, n_bytes;
uint64_t ptime, txtime;
int pdu_count;
struct props *p = &state->props;
struct avtp_stream_pdu *pdu = state->pdu;
ssize_t n;
avail = spa_ringbuffer_get_read_index(&state->ring, &index);
if (avail < (int32_t) state->duration) {
spa_log_warn(state->log, "underrun %d", avail);
}
pdu_count = state->duration / p->frames_per_pdu;
txtime = current_time + p->t_uncertainty;
ptime = txtime + p->mtt;
n_bytes = p->frames_per_pdu * state->stride;
while (pdu_count--) {
*(__u64 *)CMSG_DATA(state->cmsg) = txtime;
spa_ringbuffer_read_data(&state->ring,
state->ringbuffer_data,
state->ringbuffer_size,
index % state->ringbuffer_size,
pdu->avtp_payload, n_bytes);
#define PDU_SET(f,v) if ((res = avtp_aaf_pdu_set(pdu, (f), (v))) < 0) return res;
PDU_SET(AVTP_AAF_FIELD_SEQ_NUM, state->pdu_seq++);
PDU_SET(AVTP_AAF_FIELD_TIMESTAMP, ptime);
#undef PDU_SET
n = sendmsg(state->sockfd, &state->msg, 0);
if (n < 0 || n != (ssize_t)state->pdu_size) {
spa_log_error(state->log, "sendmdg() failed: %m");
return -errno;
}
txtime += state->pdu_period;
ptime += state->pdu_period;
index += n_bytes;
}
spa_ringbuffer_read_update(&state->ring, index);
spa_node_call_ready(&state->callbacks, SPA_STATUS_NEED_DATA);
return 0;
}
static int handle_capture(struct state *state, uint64_t current_time)
{
return 0;
}
static void avb_on_timeout_event(struct spa_source *source)
{
struct state *state = source->data;
uint64_t expirations, current_time, duration;
uint32_t rate;
spa_log_trace(state->log, "timeout");
if (spa_system_timerfd_read(state->data_system,
state->timer_source.fd, &expirations) < 0) {
if (errno == EAGAIN)
return;
spa_log_error(state->log, "read timerfd: %m");
}
current_time = state->next_time;
if (SPA_LIKELY(state->position)) {
duration = state->position->clock.duration;
rate = state->position->clock.rate.denom;
} else {
duration = 1024;
rate = 48000;
}
state->duration = duration;
if (state->ports[0].direction == SPA_DIRECTION_INPUT)
handle_play(state, current_time);
else
handle_capture(state, current_time);
state->next_time = current_time + duration * SPA_NSEC_PER_SEC / rate;
if (SPA_LIKELY(state->clock)) {
state->clock->nsec = current_time;
state->clock->position += duration;
state->clock->duration = duration;
state->clock->delay = 0;
state->clock->rate_diff = 1.0;
state->clock->next_nsec = state->next_time;
}
set_timeout(state, state->next_time);
}
static int set_timers(struct state *state)
{
struct timespec now;
int res;
if ((res = spa_system_clock_gettime(state->data_system, CLOCK_TAI, &now)) < 0)
return res;
state->next_time = SPA_TIMESPEC_TO_NSEC(&now);
if (state->following) {
set_timeout(state, 0);
} else {
set_timeout(state, state->next_time);
}
return 0;
}
static inline bool is_following(struct state *state)
{
return state->position && state->clock && state->position->clock.id != state->clock->id;
}
static int do_reassign_follower(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct state *state = user_data;
spa_dll_init(&state->dll);
set_timers(state);
return 0;
}
int spa_avb_reassign_follower(struct state *state)
{
bool following, freewheel;
if (!state->started)
return 0;
following = is_following(state);
if (following != state->following) {
spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following);
state->following = following;
spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state);
}
freewheel = state->position &&
SPA_FLAG_IS_SET(state->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL);
if (state->freewheel != freewheel) {
spa_log_debug(state->log, "%p: freewheel %d->%d", state, state->freewheel, freewheel);
state->freewheel = freewheel;
}
return 0;
}
int spa_avb_start(struct state *state)
@ -747,6 +954,8 @@ int spa_avb_start(struct state *state)
state->timer_source.rmask = 0;
spa_loop_add_source(state->data_loop, &state->timer_source);
state->pdu_seq = 0;
if (state->ports[0].direction == SPA_DIRECTION_OUTPUT) {
state->sock_source.func = avb_on_socket_event;
state->sock_source.data = state;
@ -758,47 +967,13 @@ int spa_avb_start(struct state *state)
reset_buffers(state, &state->ports[0]);
set_timers(state);
state->started = true;
return 0;
}
static int do_reassign_follower(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct state *state = user_data;
spa_dll_init(&state->dll);
return 0;
}
int spa_avb_reassign_follower(struct state *state)
{
bool following, freewheel;
if (!state->started)
return 0;
following = false;
if (following != state->following) {
spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following);
state->following = following;
spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state);
}
freewheel = state->position &&
SPA_FLAG_IS_SET(state->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL);
if (state->freewheel != freewheel) {
spa_log_debug(state->log, "%p: freewheel %d->%d", state, state->freewheel, freewheel);
state->freewheel = freewheel;
}
return 0;
}
static int do_remove_source(struct spa_loop *loop,
bool async,
uint32_t seq,
@ -807,14 +982,8 @@ static int do_remove_source(struct spa_loop *loop,
void *user_data)
{
struct state *state = user_data;
struct itimerspec ts;
spa_loop_remove_source(state->data_loop, &state->timer_source);
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(state->data_system, state->timerfd, 0, &ts, NULL);
if (state->ports[0].direction == SPA_DIRECTION_OUTPUT) {
spa_loop_remove_source(state->data_loop, &state->sock_source);
@ -832,6 +1001,7 @@ int spa_avb_pause(struct state *state)
spa_loop_invoke(state->data_loop, do_remove_source, 0, NULL, 0, true, state);
state->started = false;
set_timeout(state, 0);
return 0;
}

View file

@ -49,6 +49,7 @@ extern "C" {
#include <spa/node/utils.h>
#include <spa/node/io.h>
#include <spa/debug/types.h>
#include <spa/utils/ringbuffer.h>
#include <spa/param/param.h>
#include <spa/param/latency-utils.h>
#include <spa/param/audio/format-utils.h>
@ -119,6 +120,7 @@ struct port {
struct spa_list free;
struct spa_list ready;
uint32_t ready_offset;
};
struct state {
@ -165,19 +167,15 @@ struct state {
struct port ports[1];
uint32_t duration;
uint32_t threshold;
unsigned int following:1;
unsigned int matching:1;
unsigned int resample:1;
unsigned int opened:1;
unsigned int started:1;
unsigned int freewheel:1;
int timerfd;
struct spa_source timer_source;
uint64_t timer_starttime;
uint64_t timer_period;
uint64_t timer_expirations;
uint64_t next_time;
int sockfd;
struct spa_source sock_source;
@ -186,10 +184,16 @@ struct state {
struct avtp_stream_pdu *pdu;
size_t pdu_size;
int64_t pdu_period;
uint8_t pdu_seq;
struct iovec iov;
struct msghdr msg;
char control[CMSG_SPACE(sizeof(__u64))];
struct cmsghdr *cmsg;
uint8_t *ringbuffer_data;
uint32_t ringbuffer_size;
struct spa_ringbuffer ring;
struct spa_dll dll;
double max_error;