From 537a38355a87e4412690a54cf83a4d33259aab3d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 8 Mar 2022 17:19:09 +0100 Subject: [PATCH] avb: more work --- spa/plugins/avb/avb-pcm-sink.c | 2 + spa/plugins/avb/avb-pcm.c | 408 +++++++++++++++++++++++---------- spa/plugins/avb/avb-pcm.h | 14 +- 3 files changed, 300 insertions(+), 124 deletions(-) diff --git a/spa/plugins/avb/avb-pcm-sink.c b/spa/plugins/avb/avb-pcm-sink.c index 56cfc615c..183bc16a0 100644 --- a/spa/plugins/avb/avb-pcm-sink.c +++ b/spa/plugins/avb/avb-pcm-sink.c @@ -788,6 +788,8 @@ static int impl_node_process(void *object) SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT); input->buffer_id = SPA_ID_INVALID; + spa_avb_write(this); + input->status = SPA_STATUS_OK; } return SPA_STATUS_HAVE_DATA; diff --git a/spa/plugins/avb/avb-pcm.c b/spa/plugins/avb/avb-pcm.c index 3a89f9e57..e7834fbc0 100644 --- a/spa/plugins/avb/avb-pcm.c +++ b/spa/plugins/avb/avb-pcm.c @@ -298,6 +298,7 @@ int spa_avb_init(struct state *state, const struct spa_dict *info) { uint32_t i; + state->quantum_limit = 8192; for (i = 0; info && i < info->n_items; i++) { const char *k = info->items[i].key; const char *s = info->items[i].value; @@ -307,6 +308,10 @@ int spa_avb_init(struct state *state, const struct spa_dict *info) avb_set_param(state, k, s); } } + + state->ringbuffer_size = state->quantum_limit * 64; + state->ringbuffer_data = calloc(1, state->ringbuffer_size * 4); + spa_ringbuffer_init(&state->ring); return 0; } @@ -315,41 +320,6 @@ int spa_avb_clear(struct state *state) return 0; } -int spa_avb_open(struct state *state, const char *params) -{ - int err; - - if (state->opened) - return 0; - - if ((err = spa_system_timerfd_create(state->data_system, - CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0) - goto error_exit_close; - - state->timerfd = err; - - state->opened = true; - - return 0; - -error_exit_close: - return err; -} - -int spa_avb_close(struct state *state) -{ - int err = 0; - - if (!state->opened) - return 0; - - spa_system_close(state->data_system, state->timerfd); - - state->opened = false; - - return err; -} - static int spa_format_to_aaf(uint32_t format) { switch(format) { @@ -586,22 +556,18 @@ error: static int setup_msg(struct state *state) { - struct cmsghdr *cmsg; - state->iov.iov_base = state->pdu; state->iov.iov_len = state->pdu_size; - state->msg.msg_name = &state->sock_addr; state->msg.msg_namelen = sizeof(state->sock_addr); state->msg.msg_iov = &state->iov; state->msg.msg_iovlen = 1; state->msg.msg_control = state->control; state->msg.msg_controllen = sizeof(state->control); - - cmsg = CMSG_FIRSTHDR(&state->msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_TXTIME; - cmsg->cmsg_len = CMSG_LEN(sizeof(__u64)); + state->cmsg = CMSG_FIRSTHDR(&state->msg); + state->cmsg->cmsg_level = SOL_SOCKET; + state->cmsg->cmsg_type = SCM_TXTIME; + state->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64)); return 0; } @@ -629,7 +595,7 @@ int spa_avb_set_format(struct state *state, struct spa_audio_info *fmt, uint32_t return res; if ((res = spa_system_timerfd_create(state->data_system, - CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0) + CLOCK_REALTIME, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0) goto error_close_sockfd; state->timerfd = res; @@ -680,48 +646,289 @@ static void reset_buffers(struct state *this, struct port *port) } } -static int timer_start(struct state *state, uint64_t time, uint64_t period) +static bool is_pdu_valid(struct state *state) { - int res; - struct itimerspec ts; - uint64_t time_utc; - - state->timer_expirations = 0; - state->timer_period = period; - state->timer_starttime = time; - - time_utc = TAI_TO_UTC(time); - ts.it_value.tv_sec = time_utc / SPA_NSEC_PER_SEC; - ts.it_value.tv_nsec = time_utc % SPA_NSEC_PER_SEC; - ts.it_interval.tv_sec = 0; - ts.it_interval.tv_nsec = state->timer_period; - res = spa_system_timerfd_settime(state->data_system, state->timerfd, - SPA_FD_TIMER_ABSTIME, &ts, NULL); - return res; -} - -static int timer_start_playback(struct state *state) -{ - int res; - struct timespec now; - uint64_t time, period; - - if ((res = clock_gettime(CLOCK_TAI, &now)) < 0) { - spa_log_error(state->log, "clock_gettime(CLOCK_TAI) error: %m"); - return -errno; - } - - period = SPA_NSEC_PER_SEC * state->period_size / io->rate; - time = now.tv_sec * NSEC_PER_SEC + now.tv_nsec + period; - return timer_start(state, time, period); + return true; } static void avb_on_socket_event(struct spa_source *source) { + int res; + struct state *state = source->data; + ssize_t n; + int32_t filled; + uint32_t subtype, index; + struct avtp_common_pdu *common; + struct props *p = &state->props; + struct avtp_stream_pdu *pdu = state->pdu; + + n = recv(state->sockfd, state->pdu, state->pdu_size, 0); + if (n < 0) { + spa_log_error(state->log, "recv() failed: %m"); + return; + } + if (n != (ssize_t)state->pdu_size) { + spa_log_error(state->log, "AVTPDU dropped: Invalid size"); + return; + } + + common = (struct avtp_common_pdu *) pdu; + + if ((res = avtp_pdu_get(common, AVTP_FIELD_SUBTYPE, &subtype)) < 0) + return; + if (subtype != AVTP_SUBTYPE_AAF) + return; + + if (!is_pdu_valid(state)) { + spa_log_error(state->log, "AAF PDU invalid"); + return; + } + + filled = spa_ringbuffer_get_write_index(&state->ring, &index); + if (filled > (int32_t) state->ringbuffer_size) { + spa_log_warn(state->log, "overrun %d", filled); + } + spa_ringbuffer_write_data(&state->ring, + state->ringbuffer_data, + state->ringbuffer_size, + index % state->ringbuffer_size, + pdu->avtp_payload, + p->frames_per_pdu * state->stride); + + index += p->frames_per_pdu * state->stride; + spa_ringbuffer_write_update(&state->ring, index); +} + +static void set_timeout(struct state *state, uint64_t next_time) +{ + struct itimerspec ts; + uint64_t time_utc; + + spa_log_trace(state->log, "set timeout %"PRIu64, next_time); + + time_utc = next_time > TAI_OFFSET ? TAI_TO_UTC(next_time) : 0; + ts.it_value.tv_sec = time_utc / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = time_utc % SPA_NSEC_PER_SEC; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + spa_system_timerfd_settime(state->data_system, + state->timer_source.fd, SPA_FD_TIMER_ABSTIME, &ts, NULL); +} + +int spa_avb_write(struct state *state) +{ + int32_t filled; + uint32_t index, to_write; + struct port *port = &state->ports[0]; + + filled = spa_ringbuffer_get_write_index(&state->ring, &index); + if (filled < 0) { + spa_log_warn(state->log, "underrun %d", filled); + } else if (filled > (int32_t)state->ringbuffer_size) { + spa_log_warn(state->log, "overrun %d", filled); + } + to_write = state->ringbuffer_size - filled; + + while (!spa_list_is_empty(&port->ready) && to_write > 0) { + size_t n_bytes, n_frames; + struct buffer *b; + struct spa_data *d; + uint32_t offs, avail, size; + + b = spa_list_first(&port->ready, struct buffer, link); + d = b->buf->datas; + + offs = SPA_MIN(d[0].chunk->offset + port->ready_offset, d[0].maxsize); + size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); + avail = (size - offs) / state->stride; + + n_frames = SPA_MIN(avail, to_write); + n_bytes = n_frames * state->stride; + + spa_ringbuffer_write_data(&state->ring, + state->ringbuffer_data, + state->ringbuffer_size, + index % state->ringbuffer_size, + SPA_PTROFF(d[0].data, offs, void), + n_bytes); + + port->ready_offset += n_bytes; + + if (port->ready_offset >= size || avail == 0) { + spa_list_remove(&b->link); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + port->io->buffer_id = b->id; + spa_log_trace_fp(state->log, "%p: reuse buffer %u", state, b->id); + + spa_node_call_reuse_buffer(&state->callbacks, 0, b->id); + + port->ready_offset = 0; + } + to_write -= n_frames; + index += n_bytes; + } + spa_ringbuffer_write_update(&state->ring, index); + return 0; +} + +static int handle_play(struct state *state, uint64_t current_time) +{ + int res; + int32_t avail; + uint32_t index, n_bytes; + uint64_t ptime, txtime; + int pdu_count; + struct props *p = &state->props; + struct avtp_stream_pdu *pdu = state->pdu; + ssize_t n; + + avail = spa_ringbuffer_get_read_index(&state->ring, &index); + if (avail < (int32_t) state->duration) { + spa_log_warn(state->log, "underrun %d", avail); + } + + pdu_count = state->duration / p->frames_per_pdu; + + txtime = current_time + p->t_uncertainty; + ptime = txtime + p->mtt; + n_bytes = p->frames_per_pdu * state->stride; + + while (pdu_count--) { + *(__u64 *)CMSG_DATA(state->cmsg) = txtime; + + spa_ringbuffer_read_data(&state->ring, + state->ringbuffer_data, + state->ringbuffer_size, + index % state->ringbuffer_size, + pdu->avtp_payload, n_bytes); + +#define PDU_SET(f,v) if ((res = avtp_aaf_pdu_set(pdu, (f), (v))) < 0) return res; + PDU_SET(AVTP_AAF_FIELD_SEQ_NUM, state->pdu_seq++); + PDU_SET(AVTP_AAF_FIELD_TIMESTAMP, ptime); +#undef PDU_SET + n = sendmsg(state->sockfd, &state->msg, 0); + if (n < 0 || n != (ssize_t)state->pdu_size) { + spa_log_error(state->log, "sendmdg() failed: %m"); + return -errno; + } + txtime += state->pdu_period; + ptime += state->pdu_period; + index += n_bytes; + } + spa_ringbuffer_read_update(&state->ring, index); + + spa_node_call_ready(&state->callbacks, SPA_STATUS_NEED_DATA); + + return 0; +} + +static int handle_capture(struct state *state, uint64_t current_time) +{ + return 0; } static void avb_on_timeout_event(struct spa_source *source) { + struct state *state = source->data; + uint64_t expirations, current_time, duration; + uint32_t rate; + + spa_log_trace(state->log, "timeout"); + + if (spa_system_timerfd_read(state->data_system, + state->timer_source.fd, &expirations) < 0) { + if (errno == EAGAIN) + return; + spa_log_error(state->log, "read timerfd: %m"); + } + + current_time = state->next_time; + if (SPA_LIKELY(state->position)) { + duration = state->position->clock.duration; + rate = state->position->clock.rate.denom; + } else { + duration = 1024; + rate = 48000; + } + state->duration = duration; + + if (state->ports[0].direction == SPA_DIRECTION_INPUT) + handle_play(state, current_time); + else + handle_capture(state, current_time); + + state->next_time = current_time + duration * SPA_NSEC_PER_SEC / rate; + + if (SPA_LIKELY(state->clock)) { + state->clock->nsec = current_time; + state->clock->position += duration; + state->clock->duration = duration; + state->clock->delay = 0; + state->clock->rate_diff = 1.0; + state->clock->next_nsec = state->next_time; + } + + set_timeout(state, state->next_time); +} + +static int set_timers(struct state *state) +{ + struct timespec now; + int res; + + if ((res = spa_system_clock_gettime(state->data_system, CLOCK_TAI, &now)) < 0) + return res; + + state->next_time = SPA_TIMESPEC_TO_NSEC(&now); + + if (state->following) { + set_timeout(state, 0); + } else { + set_timeout(state, state->next_time); + } + return 0; +} + +static inline bool is_following(struct state *state) +{ + return state->position && state->clock && state->position->clock.id != state->clock->id; +} + +static int do_reassign_follower(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct state *state = user_data; + spa_dll_init(&state->dll); + set_timers(state); + return 0; +} + +int spa_avb_reassign_follower(struct state *state) +{ + bool following, freewheel; + + if (!state->started) + return 0; + + following = is_following(state); + if (following != state->following) { + spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following); + state->following = following; + spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state); + } + + freewheel = state->position && + SPA_FLAG_IS_SET(state->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL); + + if (state->freewheel != freewheel) { + spa_log_debug(state->log, "%p: freewheel %d->%d", state, state->freewheel, freewheel); + state->freewheel = freewheel; + } + return 0; } int spa_avb_start(struct state *state) @@ -747,6 +954,8 @@ int spa_avb_start(struct state *state) state->timer_source.rmask = 0; spa_loop_add_source(state->data_loop, &state->timer_source); + state->pdu_seq = 0; + if (state->ports[0].direction == SPA_DIRECTION_OUTPUT) { state->sock_source.func = avb_on_socket_event; state->sock_source.data = state; @@ -758,47 +967,13 @@ int spa_avb_start(struct state *state) reset_buffers(state, &state->ports[0]); + set_timers(state); + state->started = true; return 0; } -static int do_reassign_follower(struct spa_loop *loop, - bool async, - uint32_t seq, - const void *data, - size_t size, - void *user_data) -{ - struct state *state = user_data; - spa_dll_init(&state->dll); - return 0; -} - -int spa_avb_reassign_follower(struct state *state) -{ - bool following, freewheel; - - if (!state->started) - return 0; - - following = false; - if (following != state->following) { - spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following); - state->following = following; - spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state); - } - - freewheel = state->position && - SPA_FLAG_IS_SET(state->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL); - - if (state->freewheel != freewheel) { - spa_log_debug(state->log, "%p: freewheel %d->%d", state, state->freewheel, freewheel); - state->freewheel = freewheel; - } - return 0; -} - static int do_remove_source(struct spa_loop *loop, bool async, uint32_t seq, @@ -807,14 +982,8 @@ static int do_remove_source(struct spa_loop *loop, void *user_data) { struct state *state = user_data; - struct itimerspec ts; spa_loop_remove_source(state->data_loop, &state->timer_source); - ts.it_value.tv_sec = 0; - ts.it_value.tv_nsec = 0; - ts.it_interval.tv_sec = 0; - ts.it_interval.tv_nsec = 0; - spa_system_timerfd_settime(state->data_system, state->timerfd, 0, &ts, NULL); if (state->ports[0].direction == SPA_DIRECTION_OUTPUT) { spa_loop_remove_source(state->data_loop, &state->sock_source); @@ -832,6 +1001,7 @@ int spa_avb_pause(struct state *state) spa_loop_invoke(state->data_loop, do_remove_source, 0, NULL, 0, true, state); state->started = false; + set_timeout(state, 0); return 0; } diff --git a/spa/plugins/avb/avb-pcm.h b/spa/plugins/avb/avb-pcm.h index bc65149d4..29a55f1b4 100644 --- a/spa/plugins/avb/avb-pcm.h +++ b/spa/plugins/avb/avb-pcm.h @@ -49,6 +49,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -119,6 +120,7 @@ struct port { struct spa_list free; struct spa_list ready; + uint32_t ready_offset; }; struct state { @@ -165,19 +167,15 @@ struct state { struct port ports[1]; uint32_t duration; - uint32_t threshold; unsigned int following:1; unsigned int matching:1; unsigned int resample:1; - unsigned int opened:1; unsigned int started:1; unsigned int freewheel:1; int timerfd; struct spa_source timer_source; - uint64_t timer_starttime; - uint64_t timer_period; - uint64_t timer_expirations; + uint64_t next_time; int sockfd; struct spa_source sock_source; @@ -186,10 +184,16 @@ struct state { struct avtp_stream_pdu *pdu; size_t pdu_size; int64_t pdu_period; + uint8_t pdu_seq; struct iovec iov; struct msghdr msg; char control[CMSG_SPACE(sizeof(__u64))]; + struct cmsghdr *cmsg; + + uint8_t *ringbuffer_data; + uint32_t ringbuffer_size; + struct spa_ringbuffer ring; struct spa_dll dll; double max_error;