diff --git a/spa/plugins/bluez5/iso-io.c b/spa/plugins/bluez5/iso-io.c index 69f6748ea..8015a81de 100644 --- a/spa/plugins/bluez5/iso-io.c +++ b/spa/plugins/bluez5/iso-io.c @@ -35,6 +35,27 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.iso"); #define LATENCY_PERIOD (1000 * SPA_NSEC_PER_MSEC) #define MAX_LATENCY (50 * SPA_NSEC_PER_MSEC) +#define CLOCK_SYNC_AVG_PERIOD (500 * SPA_NSEC_PER_MSEC) +#define CLOCK_SYNC_RATE_DIFF_MAX 0.005 + +#define ISO_BUFFERING_AVG_PERIOD (50 * SPA_NSEC_PER_MSEC) +#define ISO_BUFFERING_RATE_DIFF_MAX 0.05 + +struct clock_sync { + /** Reference monotonic time for streams in the group */ + int64_t base_time; + + /** Average error for current cycle */ + int64_t avg_err; + unsigned int avg_num; + + /** Log rate limiting */ + uint64_t log_pos; + + /** Rate matching ISO clock to monotonic clock */ + struct spa_bt_rate_control dll; +}; + struct group { struct spa_log *log; struct spa_loop *data_loop; @@ -43,10 +64,13 @@ struct group { struct spa_list streams; int timerfd; uint8_t id; - uint64_t next; - uint64_t duration; + int64_t next; + int64_t duration_tx; + int64_t duration_rx; bool flush; bool started; + + struct clock_sync rx_sync; }; struct stream { @@ -65,6 +89,12 @@ struct stream { struct spa_bt_latency tx_latency; struct spa_bt_decode_buffer *source_buf; + + /** Stream packet sequence number, relative to group::rx_sync */ + int64_t rx_pos; + + /** Current graph clock position */ + uint64_t position; }; struct modify_info @@ -151,11 +181,11 @@ static uint64_t get_time_ns(struct spa_system *system, clockid_t clockid) static int set_timers(struct group *group) { - if (group->duration == 0) + if (group->duration_tx == 0) return -EINVAL; - group->next = SPA_ROUND_UP(get_time_ns(group->data_system, CLOCK_MONOTONIC) + group->duration, - group->duration); + group->next = SPA_ROUND_UP(get_time_ns(group->data_system, CLOCK_MONOTONIC) + group->duration_tx, + group->duration_tx); return set_timeout(group, group->next); } @@ -309,7 +339,7 @@ static void group_on_timeout(struct spa_source *source) done: /* Pull data for the next interval */ - group->next += exp * group->duration; + group->next += exp * group->duration_tx; spa_list_for_each(stream, &group->streams, link) { if (!stream->sink) @@ -355,7 +385,6 @@ static struct group *group_create(struct spa_bt_transport *t, group->log = log; group->data_loop = data_loop; group->data_system = data_system; - group->duration = 0; spa_list_init(&group->streams); @@ -412,12 +441,15 @@ static struct stream *stream_create(struct spa_bt_transport *t, struct group *gr struct spa_audio_info format = { 0 }; int res; bool sink; + int64_t interval, *duration; if (t->profile == SPA_BT_PROFILE_BAP_SINK || t->profile == SPA_BT_PROFILE_BAP_BROADCAST_SINK) { sink = true; + duration = &group->duration_tx; } else { sink = false; + duration = &group->duration_rx; } if (t->media_codec->kind != MEDIA_CODEC_BAP || !t->media_codec->get_interval) { @@ -425,39 +457,40 @@ static struct stream *stream_create(struct spa_bt_transport *t, struct group *gr goto fail; } - if (sink) { - uint64_t interval; + res = t->media_codec->validate_config(t->media_codec, 0, t->configuration, t->configuration_len, &format); + if (res < 0) + goto fail; - res = t->media_codec->validate_config(t->media_codec, 0, t->configuration, t->configuration_len, &format); - if (res < 0) - goto fail; + codec_data = t->media_codec->init(t->media_codec, 0, t->configuration, t->configuration_len, + &format, NULL, t->write_mtu); + if (!codec_data) { + res = -EINVAL; + goto fail; + } - codec_data = t->media_codec->init(t->media_codec, 0, t->configuration, t->configuration_len, - &format, NULL, t->write_mtu); - if (!codec_data) { - res = -EINVAL; - goto fail; - } + block_size = t->media_codec->get_block_size(codec_data); + if (block_size < 0 || block_size > EMPTY_BUF_SIZE) { + res = -EINVAL; + goto fail; + } - block_size = t->media_codec->get_block_size(codec_data); - if (block_size < 0 || block_size > EMPTY_BUF_SIZE) { - res = -EINVAL; - goto fail; - } + interval = t->media_codec->get_interval(codec_data); + if (interval <= 5000) { + res = -EINVAL; + goto fail; + } - interval = t->media_codec->get_interval(codec_data); - if (interval <= 5000) { - res = -EINVAL; - goto fail; - } + if (*duration == 0) { + *duration = interval; + } else if (interval != *duration) { + /* SDU_Interval in ISO group must be same for each direction */ + res = -EINVAL; + goto fail; + } - if (group->duration == 0) { - group->duration = interval; - } else if (interval != group->duration) { - /* SDU_Interval in ISO group must be same for each direction */ - res = -EINVAL; - goto fail; - } + if (!sink) { + t->media_codec->deinit(codec_data); + codec_data = NULL; } stream = calloc(1, sizeof(struct stream)); @@ -467,7 +500,7 @@ static struct stream *stream_create(struct spa_bt_transport *t, struct group *gr stream->fd = t->fd; stream->sink = sink; stream->group = group; - stream->this.duration = sink ? group->duration : 0; + stream->this.duration = *duration; stream->codec = t->media_codec; stream->this.codec_data = codec_data; @@ -597,26 +630,234 @@ int spa_bt_iso_io_recv_errqueue(struct spa_bt_iso_io *this) return spa_bt_latency_recv_errqueue(&stream->tx_latency, stream->fd, group->log); } -/** Must be called from data thread */ +/** + * Set decode buffer used by a stream when it has packet RX. Set to NULL when stream is + * inactive. + * + * Must be called from data thread. + */ void spa_bt_iso_io_set_source_buffer(struct spa_bt_iso_io *this, struct spa_bt_decode_buffer *buffer) { struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this); + struct group *group = stream->group; + struct clock_sync *sync = &group->rx_sync; + + spa_zero(sync->dll); stream->source_buf = buffer; + if (buffer) { + /* Take over buffer overrun handling */ + buffer->no_overrun_drop = true; + buffer->buffering = false; + buffer->avg_period = ISO_BUFFERING_AVG_PERIOD; + buffer->rate_diff_max = ISO_BUFFERING_RATE_DIFF_MAX; + } } -/** Must be called from data thread */ -void spa_bt_iso_io_update_source_latency(struct spa_bt_iso_io *this) +/** + * Get automatic group-wide stream RX target latency. This is useful only for BAP Client. + * BAP Server target latency is determined by the presentation delay. + * + * Must be called from data thread. + */ +int32_t spa_bt_iso_io_get_source_target_latency(struct spa_bt_iso_io *this) { struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this); struct group *group = stream->group; struct stream *s; int32_t latency = 0; + if (!stream->source_buf) + return 0; + spa_list_for_each(s, &group->streams, link) if (s->source_buf) latency = SPA_MAX(latency, spa_bt_decode_buffer_get_auto_latency(s->source_buf)); - if (stream->source_buf) - spa_bt_decode_buffer_set_target_latency(stream->source_buf, latency); + return latency; +} + +/** + * Called on stream packet RX with packet monotonic timestamp. + * + * Returns the logical SDU reference time, with respect to which decode-buffer should + * target its fill level. This is needed so that all streams converge to same latency + * (with sub-sample accuracy needed for eg. stereo stream alignment). + * + * Determines the ISO group clock rate matching from individual stream packet RX times. + * Packet arrival time is decomposed to + * + * now = group::rx_sync::base_time + stream::rx_pos * group::duration_rx + err + * + * Clock rate matching is done by drifting base_time by the rate difference, so that `err` + * is zero on average across different streams. If stream's rx_pos appears to be out of + * sync, it is resynchronized to a new position. + * + * The logical SDU timestamps for different streams are aligned and occur at equal + * intervals, but the RX timestamp `now` we actually get here is a software timestamp + * indicating when packet was received by kernel. In practice, they are not equally spaced + * but are approximately aligned between different streams. + * + * The Core v6.1 specification does **not** provide any way to synchronize Controller and + * Host clocks, so we can attempt to sync to ISO clock only based on the RX timestamps. + * + * Because the actual packet RX times are not equally spaced, it's ambiguous what the + * logical SDU reference time is. It's then impossible to achieve clock synchronization with + * better accuracy than this jitter (on Intel AX210 it's several ms jitter in a regular + * pattern, plus some random noise). + * + * Aligned playback for different devices cannot be implemented with the tools provided in + * the specification. Some implementation-defined clock synchronization mechanism is + * needed, but kernel (6.17) doesn't have anything and it's not clear such vendor-defined + * mechanisms exist over USB. + * + * The HW timestamps on packets do not help with this, as they are in controller's clock + * domain. They are only useful for aligning packets from different streams. They are also + * optional in the specification and controllers don't necessarily implement them. They + * are not used here. + * + * Must be called from data thread. + */ +int64_t spa_bt_iso_io_recv(struct spa_bt_iso_io *this, int64_t now) +{ + struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this); + struct group *group = stream->group; + struct clock_sync *sync = &group->rx_sync; + struct stream *s; + bool resync = false; + int64_t err, t; + + spa_assert(stream->source_buf); + + if (sync->dll.corr == 0) { + sync->base_time = now; + spa_bt_rate_control_init(&sync->dll, 0); + } + + stream->rx_pos++; + t = sync->base_time + group->duration_rx * stream->rx_pos; + err = now - t; + + if (SPA_ABS(err) > group->duration_rx) { + resync = true; + spa_log_debug(group->log, "%p: ISO rx-resync large group:%u fd:%d", + group, group->id, stream->fd); + } + + spa_list_for_each(s, &group->streams, link) { + if (s == stream || !s->source_buf) + continue; + if (SPA_ABS(now - s->source_buf->rx.nsec) < group->duration_rx / 2 && + stream->rx_pos != s->rx_pos) { + spa_log_debug(group->log, "%p: ISO rx-resync balance group:%u fd:%d fd:%d", + group, group->id, stream->fd, s->fd); + resync = true; + break; + } + } + + if (resync) { + stream->rx_pos = (now - sync->base_time + group->duration_rx/2) / group->duration_rx; + t = sync->base_time + group->duration_rx * stream->rx_pos; + err = now - t; + spa_log_debug(group->log, "%p: ISO rx-resync group:%u fd:%d err:%"PRIi64, + group, group->id, stream->fd, err); + } + + sync->avg_err = (sync->avg_err * sync->avg_num + err) / (sync->avg_num + 1); + sync->avg_num++; + + return t; +} + +/** + * Call at end of stream process(), after consuming data. + * + * Apply ISO clock rate matching. + * + * Realign stream RX to target latency, if it is too far off, so that rate matching + * converges faster to alignment. + * + * Must be called from data thread + */ +void spa_bt_iso_io_check_rx_sync(struct spa_bt_iso_io *this, uint64_t position) +{ + struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this); + struct group *group = stream->group; + struct stream *s; + const int64_t max_err = group->duration_rx; + struct clock_sync *sync = &group->rx_sync; + int32_t target; + bool overrun = false; + double corr; + + if (!stream->source_buf) + return; + + /* Check sync after all input streams have completed process() on same cycle */ + stream->position = position; + spa_list_for_each(s, &group->streams, link) { + if (!s->source_buf) + continue; + if (s->position != stream->position) + return; + } + + target = stream->source_buf->target; + + /* Rate match ISO clock */ + corr = spa_bt_rate_control_update(&sync->dll, sync->avg_err, 0, + group->duration_rx, CLOCK_SYNC_AVG_PERIOD, CLOCK_SYNC_RATE_DIFF_MAX); + sync->base_time += (int64_t)(group->duration_rx * (corr - 1)); + + enum spa_log_level log_level = (sync->log_pos > SPA_NSEC_PER_SEC) ? SPA_LOG_LEVEL_DEBUG + : SPA_LOG_LEVEL_TRACE; + if (SPA_UNLIKELY(spa_log_level_topic_enabled(group->log, SPA_LOG_TOPIC_DEFAULT, log_level))) { + spa_log_lev(group->log, log_level, + "%p: ISO rx-sync group:%u base:%"PRIi64" avg:%g err:%"PRIi64" corr:%g", + group, group->id, sync->base_time, sync->dll.avg, sync->avg_err, corr-1); + sync->log_pos = 0; + } + sync->log_pos += stream->source_buf->duration_ns; + + sync->avg_err = 0; + sync->avg_num = 0; + + /* Handle overrun (e.g. resyncs streams after initial buffering) */ + spa_list_for_each(s, &group->streams, link) { + if (s->source_buf) { + double level = s->source_buf->level; + int max_level = target + max_err * s->source_buf->rate / SPA_NSEC_PER_SEC; + + if (level > max_level) + overrun = true; + } + } + + if (!overrun) + return; + + spa_list_for_each(s, &group->streams, link) { + if (!s->source_buf) + continue; + + int32_t level = (int32_t)s->source_buf->level; + + if (level > target) { + uint32_t drop = (level - target) * s->source_buf->frame_size; + uint32_t avail = spa_bt_decode_buffer_get_size(s->source_buf); + + drop = SPA_MIN(drop, avail); + + spa_log_debug(group->log, "%p: ISO overrun group:%u fd:%d level:%f target:%d drop:%u", + group, group->id, s->fd, + s->source_buf->level, + target, + drop/s->source_buf->frame_size); + + spa_bt_decode_buffer_read(s->source_buf, drop); + } + + spa_bt_decode_buffer_recover(s->source_buf); + } } diff --git a/spa/plugins/bluez5/iso-io.h b/spa/plugins/bluez5/iso-io.h index 1ff6285c1..c59ebc075 100644 --- a/spa/plugins/bluez5/iso-io.h +++ b/spa/plugins/bluez5/iso-io.h @@ -48,6 +48,8 @@ void spa_bt_iso_io_set_cb(struct spa_bt_iso_io *io, spa_bt_iso_io_pull_t pull, v int spa_bt_iso_io_recv_errqueue(struct spa_bt_iso_io *io); void spa_bt_iso_io_set_source_buffer(struct spa_bt_iso_io *io, struct spa_bt_decode_buffer *buffer); -void spa_bt_iso_io_update_source_latency(struct spa_bt_iso_io *io); +int32_t spa_bt_iso_io_get_source_target_latency(struct spa_bt_iso_io *io); +void spa_bt_iso_io_check_rx_sync(struct spa_bt_iso_io *io, uint64_t position); +int64_t spa_bt_iso_io_recv(struct spa_bt_iso_io *io, int64_t now); #endif diff --git a/spa/plugins/bluez5/media-source.c b/spa/plugins/bluez5/media-source.c index 7b01b7f01..4fab044a0 100644 --- a/spa/plugins/bluez5/media-source.c +++ b/spa/plugins/bluez5/media-source.c @@ -572,6 +572,9 @@ static void add_data(struct impl *this, uint8_t *src, uint32_t src_size, uint64_ spa_log_trace(this->log, "%p: read socket data size:%d", this, src_size); + if (this->transport->iso_io) + now = spa_bt_iso_io_recv(this->transport->iso_io, now); + do { int32_t consumed; uint32_t avail; @@ -674,8 +677,10 @@ stop: this->io_error = true; if (this->source.loop) spa_loop_remove_source(this->data_loop, &this->source); - if (this->transport && this->transport->iso_io) + if (this->transport && this->transport->iso_io) { spa_bt_iso_io_set_cb(this->transport->iso_io, NULL, NULL); + spa_bt_iso_io_set_source_buffer(this->transport->iso_io, NULL); + } } static int media_sco_pull(void *userdata, uint8_t *buffer_read, int size_read, uint64_t now) @@ -931,6 +936,8 @@ static int transport_start(struct impl *this) if (this->codec->kind != MEDIA_CODEC_HFP) { spa_bt_recvmsg_init(&this->recv, this->fd, this->data_system, this->log); + spa_loop_locked(this->data_loop, do_start_sco_iso_io, 0, NULL, 0, this); + this->source.data = this; this->source.fd = this->fd; @@ -944,10 +951,8 @@ static int transport_start(struct impl *this) spa_zero(this->source); if (spa_bt_transport_ensure_sco_io(this->transport, this->data_loop, this->data_system) < 0) goto fail; - } - - if (this->transport->iso_io || this->transport->sco_io) spa_loop_locked(this->data_loop, do_start_sco_iso_io, 0, NULL, 0, this); + } this->transport_started = true; @@ -1045,8 +1050,10 @@ static int do_remove_transport_source(struct spa_loop *loop, if (this->source.loop) spa_loop_remove_source(this->data_loop, &this->source); - if (this->transport->iso_io) + if (this->transport->iso_io) { spa_bt_iso_io_set_cb(this->transport->iso_io, NULL, NULL); + spa_bt_iso_io_set_source_buffer(this->transport->iso_io, NULL); + } if (this->transport->sco_io) spa_bt_sco_io_set_source_cb(this->transport->sco_io, NULL, NULL); @@ -1661,8 +1668,11 @@ static void update_target_latency(struct impl *this) /* BAP Client. Should use same buffer size for all streams in the same * group, so that capture is in sync. */ - if (this->transport->iso_io) - spa_bt_iso_io_update_source_latency(this->transport->iso_io); + if (this->transport->iso_io) { + int32_t target = spa_bt_iso_io_get_source_target_latency(this->transport->iso_io); + + spa_bt_decode_buffer_set_target_latency(&port->buffer, target); + } return; } @@ -1777,6 +1787,9 @@ static void process_buffering(struct impl *this) spa_list_append(&port->ready, &buffer->link); } + if (this->transport->iso_io && this->position) + spa_bt_iso_io_check_rx_sync(this->transport->iso_io, this->position->clock.position); + if (this->update_delay_event) { int32_t target = spa_bt_decode_buffer_get_target_latency(&port->buffer); uint32_t decoder_delay = 0;