bluez5: decode-buffer: sub-sample accurate fill level tracking

Take resampler delay into account when computing the buffer fill level,
including the fractional part.

If decode-buffer is fed nominal packet reference times in
write_packet(), it now converges the total buffer + resampler latency to
the target with sub-sample accuracy.

This is needed for aligning RX of ISO streams in the same group, so that
e.g. stereo pair alignment is achieved even though the streams have
separate resamplers. Resampler phases get aligned via independent rate
matching.
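
A minimal standalone sketch of that fill-level computation (illustration
only, not part of the patch; fill_level, buffered_samples and rx_position
are placeholder names for values tracked in spa_bt_decode_buffer, and
delay/delay_frac are assumed to be the resampler delay in whole samples
plus its fractional part scaled by 1e9):

    #include <stdint.h>

    #define NSEC_PER_SEC 1000000000LL

    /*
     * Sub-sample fill level: time from the packet's nominal reference time
     * (rx_nsec) to the next cycle start (next_nsec), converted to source
     * samples with the current rate correction, plus the samples queued in
     * the buffer and inside the resampler, minus the samples already
     * accounted for by the packet's reference time.
     */
    double fill_level(int64_t next_nsec, int64_t rx_nsec,
                      double rate_diff, uint32_t rate,
                      uint32_t buffered_samples, int64_t rx_position,
                      int32_t delay, int32_t delay_frac)
    {
            double dt_samples = (double)(next_nsec - rx_nsec) * rate_diff
                    * rate / NSEC_PER_SEC;

            return dt_samples + buffered_samples
                    + delay + delay_frac / 1e9
                    - rx_position;
    }

In the patch, write_packet() computes this from the stored rx reference
time/position and the delay values passed in via process().
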
Pauli Virtanen 2025-09-07 16:53:10 +03:00 committed by Wim Taymans
parent e446e3aac5
commit 94c354c290
2 changed files with 96 additions and 66 deletions

@@ -21,8 +21,9 @@
* The regular timer cycle cannot be aligned with this, so process()
* may occur at any time.
*
* The buffer level is the position of last received sample, relative to the current
* playback position. If it is larger than duration, there is no underrun.
* The instantaneous buffer level is the time position (in samples) of the last
* received sample, relative to the nominal time position of the last sample of the last
* received packet. If it is always larger than duration, there is no underrun.
*
* The rate correction aims to maintain the average level at a safety margin.
*/
@@ -59,6 +60,13 @@ struct spa_bt_decode_buffer
uint32_t frame_size;
uint32_t rate;
int64_t avg_period;
double rate_diff_max;
int32_t target; /**< target buffer (0: automatic) */
int32_t max_extra;
bool no_overrun_drop;
uint8_t *buffer_decoded;
uint32_t buffer_size;
@@ -72,15 +80,20 @@ struct spa_bt_decode_buffer
struct spa_bt_rate_control ctl;
double corr;
uint32_t duration;
uint32_t pos;
int32_t target; /**< target buffer (0: automatic) */
int32_t max_extra;
int32_t level;
uint64_t next_nsec;
int64_t duration_ns;
int64_t next_nsec;
double rate_diff;
int32_t delay;
int32_t delay_frac;
double level;
struct {
int64_t nsec;
int64_t position;
} rx;
uint8_t buffering:1;
};
@@ -99,6 +112,8 @@ static inline int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, s
this->target = 0;
this->buffering = true;
this->max_extra = INT32_MAX;
this->avg_period = BUFFERING_SHORT_MSEC * SPA_NSEC_PER_MSEC;
this->rate_diff_max = BUFFERING_RATE_DIFF_MAX;
spa_bt_rate_control_init(&this->ctl, 0);
@@ -182,33 +197,28 @@ static inline size_t spa_bt_decode_buffer_get_size(struct spa_bt_decode_buffer *
static inline void spa_bt_decode_buffer_write_packet(struct spa_bt_decode_buffer *this, uint32_t size, uint64_t nsec)
{
int32_t remain;
uint32_t avail;
const int32_t duration = this->duration_ns * this->rate / SPA_NSEC_PER_SEC;
if (nsec) {
this->rx.nsec = nsec;
this->rx.position = size / this->frame_size;
} else {
this->rx.position += size / this->frame_size;
}
spa_assert(size % this->frame_size == 0);
this->write_index += size;
spa_bt_ptp_update(&this->packet_size, size / this->frame_size, size / this->frame_size);
if (nsec && this->next_nsec && this->rate_diff != 0.0) {
int64_t dt = (this->next_nsec >= nsec) ?
(int64_t)(this->next_nsec - nsec) : -(int64_t)(nsec - this->next_nsec);
remain = (int32_t)SPA_CLAMP(dt * this->rate_diff * this->rate / SPA_NSEC_PER_SEC,
-(int32_t)this->duration, this->duration);
if (this->rx.nsec && this->next_nsec) {
uint32_t avail = spa_bt_decode_buffer_get_size(this) / this->frame_size;
int64_t dt = this->next_nsec - this->rx.nsec;
this->level = dt * this->rate_diff * this->rate / SPA_NSEC_PER_SEC
+ avail + this->delay + this->delay_frac/1e9 - this->rx.position;
} else {
remain = 0;
this->level = spa_bt_decode_buffer_get_size(this) / this->frame_size - duration;
}
spa_bt_decode_buffer_get_read(this, &avail);
this->level = avail / this->frame_size + remain;
}
static inline void spa_bt_decode_buffer_recover(struct spa_bt_decode_buffer *this)
{
int32_t size = (this->write_index - this->read_index) / this->frame_size;
this->level = size;
this->corr = 1.0;
spa_bt_rate_control_init(&this->ctl, size);
}
static inline void spa_bt_decode_buffer_set_target_latency(struct spa_bt_decode_buffer *this, int32_t samples)
@@ -223,7 +233,7 @@ static inline void spa_bt_decode_buffer_set_max_extra_latency(struct spa_bt_deco
static inline int32_t spa_bt_decode_buffer_get_auto_latency(struct spa_bt_decode_buffer *this)
{
const int32_t duration = this->duration;
const int32_t duration = this->duration_ns * this->rate / SPA_NSEC_PER_SEC;
const int32_t packet_size = SPA_CLAMP(this->packet_size.max, 0, INT32_MAX/8);
const int32_t max_buf = (this->buffer_size - this->buffer_reserve) / this->frame_size;
const int32_t spike = SPA_CLAMP(this->spike.max, 0, max_buf);
@@ -243,21 +253,37 @@ static inline int32_t spa_bt_decode_buffer_get_target_latency(struct spa_bt_deco
return spa_bt_decode_buffer_get_auto_latency(this);
}
static inline void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *this, uint32_t samples, uint32_t duration,
double rate_diff, uint64_t next_nsec)
static inline void spa_bt_decode_buffer_recover(struct spa_bt_decode_buffer *this)
{
int32_t target = spa_bt_decode_buffer_get_target_latency(this);
this->rx.nsec = 0;
this->corr = 1.0;
spa_bt_rate_control_init(&this->ctl, target * SPA_NSEC_PER_SEC / this->rate);
spa_bt_decode_buffer_write_packet(this, 0, 0);
}
static inline void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *this, uint32_t samples, int64_t duration_ns,
double rate_diff, int64_t next_nsec, int32_t delay, int32_t delay_frac)
{
const uint32_t data_size = samples * this->frame_size;
const int32_t packet_size = SPA_CLAMP(this->packet_size.max, 0, INT32_MAX/8);
const int32_t max_level = SPA_MAX(8 * packet_size, (int32_t)duration);
const uint32_t avg_period = (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000;
int32_t target;
uint32_t avail;
double level;
this->rate_diff = rate_diff;
this->next_nsec = next_nsec;
this->delay = delay;
this->delay_frac = delay_frac;
if (SPA_UNLIKELY(duration != this->duration)) {
this->duration = duration;
/* The fractional delay is given at the start of current cycle. Make it relative
* to next_nsec used for the level calculations.
*/
this->delay_frac += (int32_t)(1e9 * samples - duration_ns * this->rate * this->rate_diff);
if (SPA_UNLIKELY(duration_ns != this->duration_ns)) {
this->duration_ns = duration_ns;
spa_bt_decode_buffer_recover(this);
}
@@ -275,57 +301,60 @@ static inline void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *thi
else
return;
spa_bt_ptp_update(&this->spike, packet_size, duration);
spa_bt_ptp_update(&this->spike, packet_size, samples);
spa_bt_decode_buffer_recover(this);
}
spa_bt_decode_buffer_get_read(this, &avail);
/* Track buffer level */
this->level = SPA_MAX(this->level, -max_level);
level = SPA_MAX(this->level, 0);
spa_bt_ptp_update(&this->spike, (int32_t)this->ctl.avg - this->level, duration);
spa_bt_ptp_update(&this->spike,
(int32_t)(this->ctl.avg * this->rate / SPA_NSEC_PER_SEC - level),
samples);
if (this->level > SPA_MAX(4 * target, 3*(int32_t)samples) &&
avail > data_size) {
if (!this->no_overrun_drop &&
level > SPA_MAX(4 * target, 3*(int32_t)samples) && avail > data_size) {
/* Lagging too much: drop data */
uint32_t size = SPA_MIN(avail - data_size,
(this->level - target) * this->frame_size);
((int32_t)ceil(level) - target) * this->frame_size);
spa_bt_decode_buffer_read(this, size);
spa_log_trace(this->log, "%p overrun samples:%d level:%d target:%d",
spa_log_trace(this->log, "%p overrun samples:%d level:%.2f target:%d",
this, (int)size/this->frame_size,
(int)this->level, (int)target);
level, (int)target);
spa_bt_decode_buffer_recover(this);
}
this->pos += duration;
if (this->pos > this->rate) {
spa_log_debug(this->log,
"%p avg:%d target:%d level:%d buffer:%d spike:%d corr:%f",
this->pos += samples;
enum spa_log_level log_level = (this->pos > this->rate) ? SPA_LOG_LEVEL_DEBUG : SPA_LOG_LEVEL_TRACE;
if (SPA_UNLIKELY(spa_log_level_topic_enabled(this->log, SPA_LOG_TOPIC_DEFAULT, log_level))) {
spa_log_lev(this->log, log_level,
"%p avg:%.2f target:%d level:%.2f buffer:%d spike:%d corr:%g",
this,
(int)this->ctl.avg,
this->ctl.avg * this->rate / SPA_NSEC_PER_SEC,
(int)target,
(int)this->level,
level,
(int)(avail / this->frame_size),
(int)this->spike.max,
(double)this->corr);
(double)this->corr - 1);
this->pos = 0;
}
this->corr = spa_bt_rate_control_update(&this->ctl,
this->level, target, duration, avg_period,
BUFFERING_RATE_DIFF_MAX);
this->level -= samples;
level * SPA_NSEC_PER_SEC / this->rate,
((double)target + 0.5/this->rate) * SPA_NSEC_PER_SEC / this->rate,
duration_ns, this->avg_period, this->rate_diff_max);
spa_bt_decode_buffer_get_read(this, &avail);
if (avail < data_size) {
spa_log_trace(this->log, "%p underrun samples:%d", this,
spa_log_debug(this->log, "%p underrun samples:%d", this,
(data_size - avail) / this->frame_size);
this->buffering = true;
spa_bt_ptp_update(&this->spike, (int32_t)this->ctl.avg - this->level, duration);
spa_bt_ptp_update(&this->spike, samples, 0);
}
}
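
As a rough worked example of the fractional-delay re-referencing above
(the 1e9 * samples - duration_ns * rate * rate_diff term), with
hypothetical numbers and assuming delay_frac is in units of 1e-9 samples:

    #include <stdio.h>
    #include <stdint.h>

    int main(void)
    {
            /* Hypothetical: 48 kHz stream, 10 ms graph cycle, the resampler
             * pulls 481 input samples this cycle, no clock drift. */
            uint32_t rate = 48000;
            int64_t duration_ns = 10 * 1000000LL;
            uint32_t samples = 481;
            double rate_diff = 1.0;

            /* Same expression as in the patch; result is in 1e-9 samples. */
            int64_t adj = (int64_t)(1e9 * samples - duration_ns * rate * rate_diff);

            /* Prints 1.000: the resampler consumes one sample more than the
             * nominal 480 over the cycle, so the fractional delay referenced
             * to next_nsec is one sample larger than the value reported at
             * the start of the cycle. */
            printf("delay_frac adjustment: %.3f samples\n", adj / 1e9);
            return 0;
    }
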

@@ -1622,7 +1622,7 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t
return 0;
}
static uint32_t get_samples(struct impl *this, uint32_t *result_duration)
static uint32_t get_samples(struct impl *this, int64_t *duration_ns)
{
struct port *port = &this->port;
uint32_t samples, rate_denom;
@@ -1636,12 +1636,12 @@ static uint32_t get_samples(struct impl *this, uint32_t *result_duration)
rate_denom = port->current_format.info.raw.rate;
}
*result_duration = duration * port->current_format.info.raw.rate / rate_denom;
*duration_ns = duration * SPA_NSEC_PER_SEC / rate_denom;
if (SPA_LIKELY(port->rate_match) && this->resampling) {
samples = port->rate_match->size;
} else {
samples = *result_duration;
samples = duration;
}
return samples;
}
@@ -1649,7 +1649,7 @@ static uint32_t get_samples(struct impl *this, uint32_t *result_duration)
static void update_target_latency(struct impl *this)
{
struct port *port = &this->port;
uint32_t samples, duration, latency;
uint32_t samples, latency;
int64_t delay_sink;
if (this->transport == NULL || !port->have_format)
@@ -1669,8 +1669,6 @@ static void update_target_latency(struct impl *this)
if (this->transport->delay_us == SPA_BT_UNKNOWN_DELAY)
return;
get_samples(this, &duration);
/* Presentation delay for BAP server
*
* This assumes the time when we receive the packet is (on average)
@@ -1708,8 +1706,8 @@ static void update_target_latency(struct impl *this)
static void process_buffering(struct impl *this)
{
struct port *port = &this->port;
uint32_t duration;
const uint32_t samples = get_samples(this, &duration);
int64_t duration_ns;
const uint32_t samples = get_samples(this, &duration_ns);
uint32_t data_size = samples * port->frame_size;
uint32_t avail;
@@ -1726,9 +1724,11 @@ static void process_buffering(struct impl *this)
setup_matching(this);
spa_bt_decode_buffer_process(&port->buffer, samples, duration,
spa_bt_decode_buffer_process(&port->buffer, samples, duration_ns,
this->position ? this->position->clock.rate_diff : 1.0,
this->position ? this->position->clock.next_nsec : 0);
this->position ? this->position->clock.next_nsec : 0,
this->resampling ? this->port.rate_match->delay : 0,
this->resampling ? this->port.rate_match->delay_frac : 0);
/* copy data to buffers */
if (!spa_list_is_empty(&port->free)) {
@@ -1780,6 +1780,7 @@ static void process_buffering(struct impl *this)
if (this->update_delay_event) {
int32_t target = spa_bt_decode_buffer_get_target_latency(&port->buffer);
uint32_t decoder_delay = 0;
uint32_t duration = this->position ? this->position->clock.duration : 1024;
if (this->codec->get_delay)
this->codec->get_delay(this->codec_data, NULL, &decoder_delay);