diff --git a/src/daemon/pipewire-aes67.conf.in b/src/daemon/pipewire-aes67.conf.in index 34c432f65..479c12c9b 100644 --- a/src/daemon/pipewire-aes67.conf.in +++ b/src/daemon/pipewire-aes67.conf.in @@ -135,6 +135,10 @@ context.modules = [ audio.channels = 2 # These channel names will be visible both to applications and AES67 receivers node.channel-names = ["CH1", "CH2"] + # Uncomment this and comment node.group in send/recv stream.props to allow + # separate drivers for the RTP sink and PTP sending (i.e. force rate matching on + # the AES67 node rather than other nodes) + #aes67.driver-group = "pipewire.ptp0" stream.props = { ### Please change the sink name, this is necessary when you create multiple sinks diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index 4a682ab54..c8e2daeb9 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -65,6 +65,8 @@ * - `sess.ts-refclk = `: the name of a reference clock * - `sess.media = `: the media type audio|midi|opus, default audio * - `stream.props = {}`: properties to be passed to the stream + * - `aes67.driver-group = `: for AES67 streams, can be specified in order to allow + * the sink to be driven by a different node than the PTP driver. * * ## General options * @@ -145,6 +147,7 @@ PW_LOG_TOPIC(mod_topic, "mod." NAME); "( audio.rate= ) " \ "( audio.channels= ) " \ "( audio.position= ) " \ + "( aes67.driver-group= ) " \ "( stream.props= { key=value ... } ) " static const struct spa_dict_item module_info[] = { @@ -549,6 +552,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, "sess.max-ptime"); copy_props(impl, props, "sess.latency.msec"); copy_props(impl, props, "sess.ts-refclk"); + copy_props(impl, props, "aes67.driver-group"); str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 56d70f7d3..453179330 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -214,7 +214,7 @@ set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, iov[1].iov_base = buffer; } -static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets) +static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uint64_t set_timestamp) { int32_t avail, tosend; uint32_t stride, timestamp; @@ -251,6 +251,7 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets) header.m = 0; header.sequence_number = htons(impl->seq); header.timestamp = htonl(impl->ts_offset + timestamp); + header.timestamp = htonl(impl->ts_offset + set_timestamp ? set_timestamp : timestamp); set_iovec(&impl->ring, impl->buffer, BUFFER_SIZE, @@ -289,7 +290,7 @@ static void rtp_audio_flush_timeout(struct impl *impl, uint64_t expirations) { if (expirations > 1) pw_log_warn("missing timeout %"PRIu64, expirations); - rtp_audio_flush_packets(impl, expirations); + rtp_audio_flush_packets(impl, expirations, 0); } static void rtp_audio_process_capture(void *data) @@ -303,6 +304,12 @@ static void rtp_audio_process_capture(void *data) struct spa_io_position *pos; uint64_t next_nsec, quantum; + if (impl->separate_sender) { + /* apply the DLL rate */ + SPA_FLAG_SET(impl->io_rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); + impl->io_rate_match->rate = impl->ptp_corr; + } + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_info("Out of stream buffers: %m"); return; @@ -322,6 +329,14 @@ static void rtp_audio_process_capture(void *data) timestamp = pos->clock.position * impl->rate / rate; next_nsec = pos->clock.next_nsec; quantum = (uint64_t)(pos->clock.duration * SPA_NSEC_PER_SEC / (rate * pos->clock.rate_diff)); + + if (impl->separate_sender) { + /* the sender process() function uses this for managing the DLL */ + impl->sink_nsec = pos->clock.nsec; + impl->sink_next_nsec = pos->clock.next_nsec; + impl->sink_resamp_delay = impl->io_rate_match->delay; + impl->sink_quantum = (uint64_t)(pos->clock.duration * SPA_NSEC_PER_SEC / rate); + } } else { timestamp = expected_timestamp; next_nsec = 0; @@ -336,6 +351,12 @@ static void rtp_audio_process_capture(void *data) impl->have_sync = true; expected_timestamp = timestamp; filled = 0; + + if (impl->separate_sender) { + /* the sender should know that the sync state has changed, and that it should + * refill the buffer */ + impl->refilling = true; + } } else { if (SPA_ABS((int)expected_timestamp - (int)timestamp) > (int)quantum) { pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); @@ -347,6 +368,8 @@ static void rtp_audio_process_capture(void *data) } } + pw_log_trace("writing %u samples at %u", wanted, expected_timestamp); + spa_ringbuffer_write_data(&impl->ring, impl->buffer, BUFFER_SIZE, @@ -357,12 +380,17 @@ static void rtp_audio_process_capture(void *data) pw_stream_queue_buffer(impl->stream, buf); + if (impl->separate_sender) { + /* sending will happen in a separate process() */ + return; + } + pending = filled / impl->psamples; num_queued = (filled + wanted) / impl->psamples; if (num_queued > 0) { /* flush all previous packets plus new one right away */ - rtp_audio_flush_packets(impl, pending + 1); + rtp_audio_flush_packets(impl, pending + 1, 0); num_queued -= SPA_MIN(num_queued, pending + 1); if (num_queued > 0) { @@ -375,13 +403,210 @@ static void rtp_audio_process_capture(void *data) } } -static int rtp_audio_init(struct impl *impl, enum spa_direction direction) +static void ptp_sender_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->ptp_sender_listener); + impl->ptp_sender = NULL; +} + +static void ptp_sender_process(void *d, struct spa_io_position *position) +{ + struct impl *impl = d; + uint64_t nsec, next_nsec, quantum, quantum_nsec; + uint32_t ptp_timestamp, rtp_timestamp, read_idx; + uint32_t rate; + uint32_t filled; + double error, in_flight, delay; + + nsec = position->clock.nsec; + next_nsec = position->clock.next_nsec; + + /* the ringbuffer indices are in sink timetamp domain */ + filled = spa_ringbuffer_get_read_index(&impl->ring, &read_idx); + + if (SPA_LIKELY(position)) { + rate = position->clock.rate.denom; + quantum = position->clock.duration; + quantum_nsec = (uint64_t)(quantum * SPA_NSEC_PER_SEC / rate); + /* PTP time tells us what time it is */ + ptp_timestamp = position->clock.position * impl->rate / rate; + /* RTP time is based on when we sent the first packet after the last sync */ + rtp_timestamp = impl->rtp_base_ts + read_idx; + } else { + pw_log_warn("No clock information, skipping"); + return; + } + + pw_log_trace("sink nsec:%lu, sink next_nsec:%lu, ptp nsec:%lu, ptp next_sec:%lu", + impl->sink_nsec, impl->sink_next_nsec, nsec, next_nsec); + + /* If send is lagging by more than 2 or more quanta, reset */ + if (!impl->refilling && impl->rtp_last_ts && + SPA_ABS((int32_t)ptp_timestamp - (int32_t)impl->rtp_last_ts) >= (int32_t)(2 * quantum)) { + pw_log_warn("expected %u - timestamp %u = %d >= 2 * %lu quantum", rtp_timestamp, impl->rtp_last_ts, + (int)rtp_timestamp - (int)impl->rtp_last_ts, quantum); + goto resync; + } + + if (!impl->have_sync) { + pw_log_trace("Waiting for sync"); + return; + } + + in_flight = (double)impl->sink_quantum * impl->rate / SPA_NSEC_PER_SEC * + (double)(nsec - impl->sink_nsec) / (impl->sink_next_nsec - impl->sink_nsec); + delay = filled + in_flight + impl->sink_resamp_delay; + + /* Make sure the PTP node wake up times are within the bounds of sink + * node wake up times (with a little bit of tolerance). */ + if (SPA_LIKELY(nsec > impl->sink_nsec - quantum_nsec && + nsec < impl->sink_next_nsec + quantum_nsec)) { + /* Start adjusting if we're at/past the target delay. We requested ~1/2 the buffer + * size as the sink latency, so doing so ensures that we have two sink quanta of + * data, making the chance of and underrun low even for small buffer values */ + if (impl->refilling && (double)impl->target_buffer - delay <= 0) { + impl->refilling = false; + /* Store the offset for the PTP time at which we start sending */ + impl->rtp_base_ts = ptp_timestamp - read_idx; + rtp_timestamp = impl->rtp_base_ts + read_idx; /* = ptp_timestamp */ + pw_log_debug("start sending. sink quantum:%lu, ptp quantum:%lu", impl->sink_quantum, quantum_nsec); + } + + if (!impl->refilling) { + /* + * As per Controlling Adaptive Resampling paper[1], maintain + * W(t) - R(t) - delta = 0. We keep delta as target_buffer. + * + * [1] http://kokkinizita.linuxaudio.org/papers/adapt-resamp.pdf + */ + error = delay - impl->target_buffer; + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); + impl->ptp_corr = spa_dll_update(&impl->ptp_dll, error); + + pw_log_debug("filled:%u in_flight:%g delay:%g target:%u error:%f corr:%f", + filled, in_flight, delay, impl->target_buffer, error, impl->ptp_corr); + + if (filled >= impl->psamples) { + rtp_audio_flush_packets(impl, 1, rtp_timestamp); + impl->rtp_last_ts = rtp_timestamp; + } + } + } else { + pw_log_warn("PTP node wake up time out of bounds !(%lu < %lu < %lu)", + impl->sink_nsec, nsec, impl->sink_next_nsec); + goto resync; + } + + return; + +resync: + impl->have_sync = false; + impl->rtp_last_ts = 0; + + return; +} + +static const struct pw_filter_events ptp_sender_events = { + PW_VERSION_FILTER_EVENTS, + .destroy = ptp_sender_destroy, + .process = ptp_sender_process +}; + +static int setup_ptp_sender(struct impl *impl, struct pw_core *core, enum pw_direction direction, const char *driver_grp) +{ + const struct spa_pod *params[4]; + struct pw_properties *filter_props = NULL; + struct spa_pod_builder b; + uint32_t n_params; + uint8_t buffer[1024]; + int ret; + + if (direction != PW_DIRECTION_INPUT) + return 0; + + if (driver_grp == NULL) { + pw_log_info("AES67 driver group not specified, no separate sender configured"); + return 0; + } + + pw_log_info("AES67 driver group: %s, setting up separate sender", driver_grp); + + spa_dll_init(&impl->ptp_dll); + /* BW selected empirically, as it converges most quickly and holds reasonably well in testing */ + spa_dll_set_bw(&impl->ptp_dll, SPA_DLL_BW_MAX, impl->psamples, impl->rate); + impl->ptp_corr = 1.0; + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + + filter_props = pw_properties_new(NULL, NULL); + if (filter_props == NULL) { + int res = -errno; + pw_log_error( "can't create properties: %m"); + return res; + } + + pw_properties_set(filter_props, PW_KEY_NODE_GROUP, driver_grp); + pw_properties_setf(filter_props, PW_KEY_NODE_NAME, "%s-ptp-sender", pw_stream_get_name(impl->stream)); + pw_properties_set(filter_props, PW_KEY_NODE_ALWAYS_PROCESS, "true"); + + /* + * sess.latency.msec defines how much data is buffered before it is + * sent out on the network. This is done by setting the node.latency + * to that value, and process function will get chunks of that size. + * It is then split up into psamples chunks and send every ptime. + * + * With this separate sender mechanism we have some latency in stream + * via node.latency, and some in ringbuffer between sink and sender. + * Ideally we want to have a total latency that still corresponds to + * sess.latency.msec. We do this by using the property setting and + * splitting some of it as stream latency and some as ringbuffer + * latency. The ringbuffer latency is actually determined by how + * long we wait before setting `refilling` to false and start the + * sending. Also, see `filter_process`. + */ + pw_properties_setf(filter_props, PW_KEY_NODE_FORCE_QUANTUM, "%u", impl->psamples); + pw_properties_setf(filter_props, PW_KEY_NODE_FORCE_RATE, "%u", impl->rate); + + impl->ptp_sender = pw_filter_new(core, NULL, filter_props); + if (impl->ptp_sender == NULL) + return -errno; + + pw_filter_add_listener(impl->ptp_sender, &impl->ptp_sender_listener, + &ptp_sender_events, impl); + + n_params = 0; + params[n_params++] = spa_format_audio_raw_build(&b, + SPA_PARAM_EnumFormat, &impl->info.info.raw); + params[n_params++] = spa_format_audio_raw_build(&b, + SPA_PARAM_Format, &impl->info.info.raw); + + ret = pw_filter_connect(impl->ptp_sender, + PW_FILTER_FLAG_RT_PROCESS, + params, n_params); + if (ret == 0) { + pw_log_info("created pw_filter for separate sender"); + impl->separate_sender = true; + } else { + pw_log_error("failed to create pw_filter for separate sender"); + impl->separate_sender = false; + } + + return ret; +} + +static int rtp_audio_init(struct impl *impl, struct pw_core *core, enum spa_direction direction, const char *ptp_driver) { if (direction == SPA_DIRECTION_INPUT) impl->stream_events.process = rtp_audio_process_capture; else impl->stream_events.process = rtp_audio_process_playback; + impl->receive_rtp = rtp_audio_receive; impl->flush_timeout = rtp_audio_flush_timeout; + + setup_ptp_sender(impl, core, direction, ptp_driver); + return 0; } diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 466f63df7..27f8af98d 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -98,6 +98,27 @@ struct impl { int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len); void (*flush_timeout)(struct impl *impl, uint64_t expirations); + + /* + * pw_filter where the filter would be driven at the PTP clock + * rate with RTP sink being driven at the sink driver clock rate + * or some ALSA clock rate. + */ + struct pw_filter *ptp_sender; + struct spa_hook ptp_sender_listener; + struct spa_dll ptp_dll; + double ptp_corr; + bool separate_sender; + bool refilling; + + /* Track some variables we need from the sink driver */ + uint64_t sink_next_nsec; + uint64_t sink_nsec; + uint64_t sink_resamp_delay; + uint64_t sink_quantum; + /* And some bookkeping for the sender processing */ + uint64_t rtp_base_ts; + uint32_t rtp_last_ts; }; static int do_emit_state_changed(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) @@ -161,7 +182,18 @@ static int stream_start(struct impl *impl) rtp_stream_emit_state_changed(impl, true, NULL); + if (impl->separate_sender) { + struct spa_dict_item items[1]; + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_ALWAYS_PROCESS, "true"); + + pw_filter_set_active(impl->ptp_sender, true); + pw_filter_update_properties(impl->ptp_sender, NULL, &SPA_DICT_INIT(items, 1)); + + pw_log_info("activated pw_filter for separate sender"); + } + impl->started = true; + return 0; } @@ -174,6 +206,16 @@ static int stream_stop(struct impl *impl) if (!impl->timer_running) rtp_stream_emit_state_changed(impl, false, NULL); + if (impl->separate_sender) { + struct spa_dict_item items[1]; + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_ALWAYS_PROCESS, "false"); + + pw_filter_update_properties(impl->ptp_sender, NULL, &SPA_DICT_INIT(items, 1)); + + pw_log_info("deactivating pw_filter for separate sender"); + pw_filter_set_active(impl->ptp_sender, false); + } + impl->started = false; return 0; } @@ -304,7 +346,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, const struct rtp_stream_events *events, void *data) { struct impl *impl; - const char *str; + const char *str, *aes67_driver; char tmp[64]; uint8_t buffer[1024]; struct spa_pod_builder b; @@ -516,11 +558,18 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->target_buffer = (uint32_t)((impl->target_buffer / ptime) * impl->psamples); } + aes67_driver = pw_properties_get(props, "aes67.driver-group"); + pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", impl->rate); - if (direction == PW_DIRECTION_INPUT) { + if (direction == PW_DIRECTION_INPUT && !aes67_driver) { + /* While sending, we accept latency-sized buffers, and break it + * up and send in ptime intervals using a timer */ pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d", impl->target_buffer, impl->rate); } else { + /* For receive, and with split sending, we break up the latency + * as half being in stream latency, and the rest in our own + * ringbuffer latency */ pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d", impl->target_buffer / 2, impl->rate); } @@ -559,7 +608,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, params[n_params++] = spa_format_audio_build(&b, SPA_PARAM_EnumFormat, &impl->stream_info); flags |= PW_STREAM_FLAG_AUTOCONNECT; - rtp_audio_init(impl, direction); + rtp_audio_init(impl, core, direction, aes67_driver); break; case SPA_MEDIA_SUBTYPE_control: params[n_params++] = spa_pod_builder_add_object(&b,