module-rtp: Allow aes67 send with a non PTP clock

Our current AES67 sender setup requires that that PTP driver drive the
entire graph. This adds support for allowing the AES67 RTP sink to be
driven by an arbitrary driver, while still using the PTP driver for
sending data on the network.

When aes67.driver-group is specified a pw_filter is created with no
ports, node.always-process = true and node.group set to the
aes67.driver-group. When set to PTP, this gives us process callbacks at
the PTP rate which we use to get the current PTP time in the RTP sender
by interpolating the clock snapshots from the pw-filter.

Implementation ideas from Wim Taymans. Co-authored with Sanchayan Maity.

For a detailed reference, refer the following papers by Fons Adriaensen.
- Using a DLL to filter time
  (https://kokkinizita.linuxaudio.org/papers/usingdll.pdf)
- Controlling adaptive resampling
  (http://kokkinizita.linuxaudio.org/papers/adapt-resamp.pdf)
This commit is contained in:
Arun Raghavan 2024-08-15 11:26:13 -04:00 committed by Arun Raghavan
parent 9ccf62d4f6
commit 9f643fec7e
4 changed files with 289 additions and 7 deletions

View file

@ -214,7 +214,7 @@ set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size,
iov[1].iov_base = buffer;
}
static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets)
static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uint64_t set_timestamp)
{
int32_t avail, tosend;
uint32_t stride, timestamp;
@ -251,6 +251,7 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets)
header.m = 0;
header.sequence_number = htons(impl->seq);
header.timestamp = htonl(impl->ts_offset + timestamp);
header.timestamp = htonl(impl->ts_offset + set_timestamp ? set_timestamp : timestamp);
set_iovec(&impl->ring,
impl->buffer, BUFFER_SIZE,
@ -289,7 +290,7 @@ static void rtp_audio_flush_timeout(struct impl *impl, uint64_t expirations)
{
if (expirations > 1)
pw_log_warn("missing timeout %"PRIu64, expirations);
rtp_audio_flush_packets(impl, expirations);
rtp_audio_flush_packets(impl, expirations, 0);
}
static void rtp_audio_process_capture(void *data)
@ -303,6 +304,12 @@ static void rtp_audio_process_capture(void *data)
struct spa_io_position *pos;
uint64_t next_nsec, quantum;
if (impl->separate_sender) {
/* apply the DLL rate */
SPA_FLAG_SET(impl->io_rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE);
impl->io_rate_match->rate = impl->ptp_corr;
}
if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) {
pw_log_info("Out of stream buffers: %m");
return;
@ -322,6 +329,14 @@ static void rtp_audio_process_capture(void *data)
timestamp = pos->clock.position * impl->rate / rate;
next_nsec = pos->clock.next_nsec;
quantum = (uint64_t)(pos->clock.duration * SPA_NSEC_PER_SEC / (rate * pos->clock.rate_diff));
if (impl->separate_sender) {
/* the sender process() function uses this for managing the DLL */
impl->sink_nsec = pos->clock.nsec;
impl->sink_next_nsec = pos->clock.next_nsec;
impl->sink_resamp_delay = impl->io_rate_match->delay;
impl->sink_quantum = (uint64_t)(pos->clock.duration * SPA_NSEC_PER_SEC / rate);
}
} else {
timestamp = expected_timestamp;
next_nsec = 0;
@ -336,6 +351,12 @@ static void rtp_audio_process_capture(void *data)
impl->have_sync = true;
expected_timestamp = timestamp;
filled = 0;
if (impl->separate_sender) {
/* the sender should know that the sync state has changed, and that it should
* refill the buffer */
impl->refilling = true;
}
} else {
if (SPA_ABS((int)expected_timestamp - (int)timestamp) > (int)quantum) {
pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp);
@ -347,6 +368,8 @@ static void rtp_audio_process_capture(void *data)
}
}
pw_log_trace("writing %u samples at %u", wanted, expected_timestamp);
spa_ringbuffer_write_data(&impl->ring,
impl->buffer,
BUFFER_SIZE,
@ -357,12 +380,17 @@ static void rtp_audio_process_capture(void *data)
pw_stream_queue_buffer(impl->stream, buf);
if (impl->separate_sender) {
/* sending will happen in a separate process() */
return;
}
pending = filled / impl->psamples;
num_queued = (filled + wanted) / impl->psamples;
if (num_queued > 0) {
/* flush all previous packets plus new one right away */
rtp_audio_flush_packets(impl, pending + 1);
rtp_audio_flush_packets(impl, pending + 1, 0);
num_queued -= SPA_MIN(num_queued, pending + 1);
if (num_queued > 0) {
@ -375,13 +403,210 @@ static void rtp_audio_process_capture(void *data)
}
}
static int rtp_audio_init(struct impl *impl, enum spa_direction direction)
static void ptp_sender_destroy(void *d)
{
struct impl *impl = d;
spa_hook_remove(&impl->ptp_sender_listener);
impl->ptp_sender = NULL;
}
static void ptp_sender_process(void *d, struct spa_io_position *position)
{
struct impl *impl = d;
uint64_t nsec, next_nsec, quantum, quantum_nsec;
uint32_t ptp_timestamp, rtp_timestamp, read_idx;
uint32_t rate;
uint32_t filled;
double error, in_flight, delay;
nsec = position->clock.nsec;
next_nsec = position->clock.next_nsec;
/* the ringbuffer indices are in sink timetamp domain */
filled = spa_ringbuffer_get_read_index(&impl->ring, &read_idx);
if (SPA_LIKELY(position)) {
rate = position->clock.rate.denom;
quantum = position->clock.duration;
quantum_nsec = (uint64_t)(quantum * SPA_NSEC_PER_SEC / rate);
/* PTP time tells us what time it is */
ptp_timestamp = position->clock.position * impl->rate / rate;
/* RTP time is based on when we sent the first packet after the last sync */
rtp_timestamp = impl->rtp_base_ts + read_idx;
} else {
pw_log_warn("No clock information, skipping");
return;
}
pw_log_trace("sink nsec:%lu, sink next_nsec:%lu, ptp nsec:%lu, ptp next_sec:%lu",
impl->sink_nsec, impl->sink_next_nsec, nsec, next_nsec);
/* If send is lagging by more than 2 or more quanta, reset */
if (!impl->refilling && impl->rtp_last_ts &&
SPA_ABS((int32_t)ptp_timestamp - (int32_t)impl->rtp_last_ts) >= (int32_t)(2 * quantum)) {
pw_log_warn("expected %u - timestamp %u = %d >= 2 * %lu quantum", rtp_timestamp, impl->rtp_last_ts,
(int)rtp_timestamp - (int)impl->rtp_last_ts, quantum);
goto resync;
}
if (!impl->have_sync) {
pw_log_trace("Waiting for sync");
return;
}
in_flight = (double)impl->sink_quantum * impl->rate / SPA_NSEC_PER_SEC *
(double)(nsec - impl->sink_nsec) / (impl->sink_next_nsec - impl->sink_nsec);
delay = filled + in_flight + impl->sink_resamp_delay;
/* Make sure the PTP node wake up times are within the bounds of sink
* node wake up times (with a little bit of tolerance). */
if (SPA_LIKELY(nsec > impl->sink_nsec - quantum_nsec &&
nsec < impl->sink_next_nsec + quantum_nsec)) {
/* Start adjusting if we're at/past the target delay. We requested ~1/2 the buffer
* size as the sink latency, so doing so ensures that we have two sink quanta of
* data, making the chance of and underrun low even for small buffer values */
if (impl->refilling && (double)impl->target_buffer - delay <= 0) {
impl->refilling = false;
/* Store the offset for the PTP time at which we start sending */
impl->rtp_base_ts = ptp_timestamp - read_idx;
rtp_timestamp = impl->rtp_base_ts + read_idx; /* = ptp_timestamp */
pw_log_debug("start sending. sink quantum:%lu, ptp quantum:%lu", impl->sink_quantum, quantum_nsec);
}
if (!impl->refilling) {
/*
* As per Controlling Adaptive Resampling paper[1], maintain
* W(t) - R(t) - delta = 0. We keep delta as target_buffer.
*
* [1] http://kokkinizita.linuxaudio.org/papers/adapt-resamp.pdf
*/
error = delay - impl->target_buffer;
error = SPA_CLAMP(error, -impl->max_error, impl->max_error);
impl->ptp_corr = spa_dll_update(&impl->ptp_dll, error);
pw_log_debug("filled:%u in_flight:%g delay:%g target:%u error:%f corr:%f",
filled, in_flight, delay, impl->target_buffer, error, impl->ptp_corr);
if (filled >= impl->psamples) {
rtp_audio_flush_packets(impl, 1, rtp_timestamp);
impl->rtp_last_ts = rtp_timestamp;
}
}
} else {
pw_log_warn("PTP node wake up time out of bounds !(%lu < %lu < %lu)",
impl->sink_nsec, nsec, impl->sink_next_nsec);
goto resync;
}
return;
resync:
impl->have_sync = false;
impl->rtp_last_ts = 0;
return;
}
static const struct pw_filter_events ptp_sender_events = {
PW_VERSION_FILTER_EVENTS,
.destroy = ptp_sender_destroy,
.process = ptp_sender_process
};
static int setup_ptp_sender(struct impl *impl, struct pw_core *core, enum pw_direction direction, const char *driver_grp)
{
const struct spa_pod *params[4];
struct pw_properties *filter_props = NULL;
struct spa_pod_builder b;
uint32_t n_params;
uint8_t buffer[1024];
int ret;
if (direction != PW_DIRECTION_INPUT)
return 0;
if (driver_grp == NULL) {
pw_log_info("AES67 driver group not specified, no separate sender configured");
return 0;
}
pw_log_info("AES67 driver group: %s, setting up separate sender", driver_grp);
spa_dll_init(&impl->ptp_dll);
/* BW selected empirically, as it converges most quickly and holds reasonably well in testing */
spa_dll_set_bw(&impl->ptp_dll, SPA_DLL_BW_MAX, impl->psamples, impl->rate);
impl->ptp_corr = 1.0;
n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
filter_props = pw_properties_new(NULL, NULL);
if (filter_props == NULL) {
int res = -errno;
pw_log_error( "can't create properties: %m");
return res;
}
pw_properties_set(filter_props, PW_KEY_NODE_GROUP, driver_grp);
pw_properties_setf(filter_props, PW_KEY_NODE_NAME, "%s-ptp-sender", pw_stream_get_name(impl->stream));
pw_properties_set(filter_props, PW_KEY_NODE_ALWAYS_PROCESS, "true");
/*
* sess.latency.msec defines how much data is buffered before it is
* sent out on the network. This is done by setting the node.latency
* to that value, and process function will get chunks of that size.
* It is then split up into psamples chunks and send every ptime.
*
* With this separate sender mechanism we have some latency in stream
* via node.latency, and some in ringbuffer between sink and sender.
* Ideally we want to have a total latency that still corresponds to
* sess.latency.msec. We do this by using the property setting and
* splitting some of it as stream latency and some as ringbuffer
* latency. The ringbuffer latency is actually determined by how
* long we wait before setting `refilling` to false and start the
* sending. Also, see `filter_process`.
*/
pw_properties_setf(filter_props, PW_KEY_NODE_FORCE_QUANTUM, "%u", impl->psamples);
pw_properties_setf(filter_props, PW_KEY_NODE_FORCE_RATE, "%u", impl->rate);
impl->ptp_sender = pw_filter_new(core, NULL, filter_props);
if (impl->ptp_sender == NULL)
return -errno;
pw_filter_add_listener(impl->ptp_sender, &impl->ptp_sender_listener,
&ptp_sender_events, impl);
n_params = 0;
params[n_params++] = spa_format_audio_raw_build(&b,
SPA_PARAM_EnumFormat, &impl->info.info.raw);
params[n_params++] = spa_format_audio_raw_build(&b,
SPA_PARAM_Format, &impl->info.info.raw);
ret = pw_filter_connect(impl->ptp_sender,
PW_FILTER_FLAG_RT_PROCESS,
params, n_params);
if (ret == 0) {
pw_log_info("created pw_filter for separate sender");
impl->separate_sender = true;
} else {
pw_log_error("failed to create pw_filter for separate sender");
impl->separate_sender = false;
}
return ret;
}
static int rtp_audio_init(struct impl *impl, struct pw_core *core, enum spa_direction direction, const char *ptp_driver)
{
if (direction == SPA_DIRECTION_INPUT)
impl->stream_events.process = rtp_audio_process_capture;
else
impl->stream_events.process = rtp_audio_process_playback;
impl->receive_rtp = rtp_audio_receive;
impl->flush_timeout = rtp_audio_flush_timeout;
setup_ptp_sender(impl, core, direction, ptp_driver);
return 0;
}

View file

@ -98,6 +98,27 @@ struct impl {
int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len);
void (*flush_timeout)(struct impl *impl, uint64_t expirations);
/*
* pw_filter where the filter would be driven at the PTP clock
* rate with RTP sink being driven at the sink driver clock rate
* or some ALSA clock rate.
*/
struct pw_filter *ptp_sender;
struct spa_hook ptp_sender_listener;
struct spa_dll ptp_dll;
double ptp_corr;
bool separate_sender;
bool refilling;
/* Track some variables we need from the sink driver */
uint64_t sink_next_nsec;
uint64_t sink_nsec;
uint64_t sink_resamp_delay;
uint64_t sink_quantum;
/* And some bookkeping for the sender processing */
uint64_t rtp_base_ts;
uint32_t rtp_last_ts;
};
static int do_emit_state_changed(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
@ -161,7 +182,18 @@ static int stream_start(struct impl *impl)
rtp_stream_emit_state_changed(impl, true, NULL);
if (impl->separate_sender) {
struct spa_dict_item items[1];
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_ALWAYS_PROCESS, "true");
pw_filter_set_active(impl->ptp_sender, true);
pw_filter_update_properties(impl->ptp_sender, NULL, &SPA_DICT_INIT(items, 1));
pw_log_info("activated pw_filter for separate sender");
}
impl->started = true;
return 0;
}
@ -174,6 +206,16 @@ static int stream_stop(struct impl *impl)
if (!impl->timer_running)
rtp_stream_emit_state_changed(impl, false, NULL);
if (impl->separate_sender) {
struct spa_dict_item items[1];
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_ALWAYS_PROCESS, "false");
pw_filter_update_properties(impl->ptp_sender, NULL, &SPA_DICT_INIT(items, 1));
pw_log_info("deactivating pw_filter for separate sender");
pw_filter_set_active(impl->ptp_sender, false);
}
impl->started = false;
return 0;
}
@ -304,7 +346,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
const struct rtp_stream_events *events, void *data)
{
struct impl *impl;
const char *str;
const char *str, *aes67_driver;
char tmp[64];
uint8_t buffer[1024];
struct spa_pod_builder b;
@ -516,11 +558,18 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
impl->target_buffer = (uint32_t)((impl->target_buffer / ptime) * impl->psamples);
}
aes67_driver = pw_properties_get(props, "aes67.driver-group");
pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", impl->rate);
if (direction == PW_DIRECTION_INPUT) {
if (direction == PW_DIRECTION_INPUT && !aes67_driver) {
/* While sending, we accept latency-sized buffers, and break it
* up and send in ptime intervals using a timer */
pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d",
impl->target_buffer, impl->rate);
} else {
/* For receive, and with split sending, we break up the latency
* as half being in stream latency, and the rest in our own
* ringbuffer latency */
pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d",
impl->target_buffer / 2, impl->rate);
}
@ -559,7 +608,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
params[n_params++] = spa_format_audio_build(&b,
SPA_PARAM_EnumFormat, &impl->stream_info);
flags |= PW_STREAM_FLAG_AUTOCONNECT;
rtp_audio_init(impl, direction);
rtp_audio_init(impl, core, direction, aes67_driver);
break;
case SPA_MEDIA_SUBTYPE_control:
params[n_params++] = spa_pod_builder_add_object(&b,