diff --git a/spa/plugins/bluez5/decode-buffer.h b/spa/plugins/bluez5/decode-buffer.h index 6fb2d3c93..0441f041c 100644 --- a/spa/plugins/bluez5/decode-buffer.h +++ b/spa/plugins/bluez5/decode-buffer.h @@ -21,12 +21,8 @@ * The regular timer cycle cannot be aligned with this, so process() * may occur at any time. * - * The buffer level is the difference between the number of samples in - * buffer immediately after receiving a packet, and the samples consumed - * before receiving the next packet. - * - * The buffer level indicates how much any packet can be delayed without - * underrun. If it is positive, there are no underruns. + * The buffer level is the position of last received sample, relative to the current + * playback position. If it is larger than duration, there is no underrun. * * The rate correction aims to maintain the average level at a safety margin. */ @@ -45,16 +41,6 @@ #define BUFFERING_RATE_DIFF_MAX 0.005 -/** - * Safety margin. - * - * The spike is the long-window maximum difference - * between minimum and average buffer level. - */ -#define BUFFERING_TARGET(spike,packet_size,max_buf) \ - SPA_CLAMP((spike)*3/2, (packet_size), (max_buf) - 2*(packet_size)) - - struct spa_bt_decode_buffer { struct spa_log *log; @@ -74,20 +60,20 @@ struct spa_bt_decode_buffer struct spa_bt_rate_control ctl; double corr; - uint32_t prev_consumed; - uint32_t prev_avail; - uint32_t prev_duration; - uint32_t underrun; + uint32_t duration; uint32_t pos; int32_t target; /**< target buffer (0: automatic) */ - int32_t max_target; + int32_t max_extra; + + int32_t level; + uint64_t next_nsec; + double rate_diff; - uint8_t received:1; uint8_t buffering:1; }; -static int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct spa_log *log, +static inline int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct spa_log *log, uint32_t frame_size, uint32_t rate, uint32_t quantum_limit, uint32_t reserve) { spa_zero(*this); @@ -100,7 +86,7 @@ static int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct s this->corr = 1.0; this->target = 0; this->buffering = true; - this->max_target = INT32_MAX; + this->max_extra = INT32_MAX; spa_bt_rate_control_init(&this->ctl, 0); @@ -114,13 +100,13 @@ static int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct s return 0; } -static void spa_bt_decode_buffer_clear(struct spa_bt_decode_buffer *this) +static inline void spa_bt_decode_buffer_clear(struct spa_bt_decode_buffer *this) { free(this->buffer_decoded); spa_zero(*this); } -static void spa_bt_decode_buffer_compact(struct spa_bt_decode_buffer *this) +static inline void spa_bt_decode_buffer_compact(struct spa_bt_decode_buffer *this) { uint32_t avail; @@ -153,23 +139,7 @@ done: spa_assert(this->buffer_size - this->write_index >= this->buffer_reserve); } -static void *spa_bt_decode_buffer_get_write(struct spa_bt_decode_buffer *this, uint32_t *avail) -{ - spa_bt_decode_buffer_compact(this); - spa_assert(this->buffer_size >= this->write_index); - *avail = this->buffer_size - this->write_index; - return SPA_PTROFF(this->buffer_decoded, this->write_index, void); -} - -static void spa_bt_decode_buffer_write_packet(struct spa_bt_decode_buffer *this, uint32_t size) -{ - spa_assert(size % this->frame_size == 0); - this->write_index += size; - this->received = true; - spa_bt_ptp_update(&this->packet_size, size / this->frame_size, size / this->frame_size); -} - -static void *spa_bt_decode_buffer_get_read(struct spa_bt_decode_buffer *this, uint32_t *avail) +static inline void *spa_bt_decode_buffer_get_read(struct spa_bt_decode_buffer *this, uint32_t *avail) { spa_assert(this->write_index >= this->read_index); if (!this->buffering) @@ -179,25 +149,49 @@ static void *spa_bt_decode_buffer_get_read(struct spa_bt_decode_buffer *this, ui return SPA_PTROFF(this->buffer_decoded, this->read_index, void); } -static void spa_bt_decode_buffer_read(struct spa_bt_decode_buffer *this, uint32_t size) +static inline void spa_bt_decode_buffer_read(struct spa_bt_decode_buffer *this, uint32_t size) { spa_assert(size % this->frame_size == 0); this->read_index += size; } -static void spa_bt_decode_buffer_recover(struct spa_bt_decode_buffer *this) +static inline void *spa_bt_decode_buffer_get_write(struct spa_bt_decode_buffer *this, uint32_t *avail) +{ + spa_bt_decode_buffer_compact(this); + spa_assert(this->buffer_size >= this->write_index); + *avail = this->buffer_size - this->write_index; + return SPA_PTROFF(this->buffer_decoded, this->write_index, void); +} + +static inline void spa_bt_decode_buffer_write_packet(struct spa_bt_decode_buffer *this, uint32_t size, uint64_t nsec) +{ + int32_t remain; + uint32_t avail; + + spa_assert(size % this->frame_size == 0); + this->write_index += size; + spa_bt_ptp_update(&this->packet_size, size / this->frame_size, size / this->frame_size); + + if (nsec && this->next_nsec && this->rate_diff != 0.0) { + int64_t dt = (this->next_nsec >= nsec) ? + (int64_t)(this->next_nsec - nsec) : -(int64_t)(nsec - this->next_nsec); + remain = (int32_t)SPA_CLAMP(dt * this->rate_diff * this->rate / SPA_NSEC_PER_SEC, + -(int32_t)this->duration, this->duration); + } else { + remain = 0; + } + + spa_bt_decode_buffer_get_read(this, &avail); + this->level = avail / this->frame_size + remain; +} + +static inline void spa_bt_decode_buffer_recover(struct spa_bt_decode_buffer *this) { int32_t size = (this->write_index - this->read_index) / this->frame_size; - int32_t level; - this->prev_avail = size * this->frame_size; - this->prev_consumed = this->prev_duration; - - level = (int32_t)this->prev_avail/this->frame_size - - (int32_t)this->prev_duration; + this->level = size; this->corr = 1.0; - - spa_bt_rate_control_init(&this->ctl, level); + spa_bt_rate_control_init(&this->ctl, size); } static inline void spa_bt_decode_buffer_set_target_latency(struct spa_bt_decode_buffer *this, int32_t samples) @@ -205,37 +199,49 @@ static inline void spa_bt_decode_buffer_set_target_latency(struct spa_bt_decode_ this->target = samples; } -static inline void spa_bt_decode_buffer_set_max_latency(struct spa_bt_decode_buffer *this, int32_t samples) +static inline void spa_bt_decode_buffer_set_max_extra_latency(struct spa_bt_decode_buffer *this, int32_t samples) { - this->max_target = samples; + this->max_extra = samples; } -static inline int32_t spa_bt_decode_buffer_get_target(struct spa_bt_decode_buffer *this) +static inline int32_t spa_bt_decode_buffer_get_target_latency(struct spa_bt_decode_buffer *this) { + const int32_t duration = this->duration; const int32_t packet_size = SPA_CLAMP(this->packet_size.max, 0, INT32_MAX/8); const int32_t max_buf = (this->buffer_size - this->buffer_reserve) / this->frame_size; + const int32_t spike = SPA_CLAMP(this->spike.max, 0, max_buf); int32_t target; if (this->target) target = this->target; else - target = BUFFERING_TARGET(this->spike.max, packet_size, max_buf); + target = SPA_CLAMP(SPA_ROUND_UP(SPA_MAX(spike * 3/2, duration), + SPA_CLAMP((int)this->rate / 50, 1, INT32_MAX)), + duration, max_buf - 2*packet_size); - return SPA_MIN(target, this->max_target); + return SPA_MIN(target, duration + SPA_CLAMP(this->max_extra, 0, INT32_MAX - duration)); } -static void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *this, uint32_t samples, uint32_t duration) +static inline void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *this, uint32_t samples, uint32_t duration, + double rate_diff, uint64_t next_nsec) { const uint32_t data_size = samples * this->frame_size; const int32_t packet_size = SPA_CLAMP(this->packet_size.max, 0, INT32_MAX/8); const int32_t max_level = SPA_MAX(8 * packet_size, (int32_t)duration); + const uint32_t avg_period = (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000; + int32_t target; uint32_t avail; - if (SPA_UNLIKELY(duration != this->prev_duration)) { - this->prev_duration = duration; + this->rate_diff = rate_diff; + this->next_nsec = next_nsec; + + if (SPA_UNLIKELY(duration != this->duration)) { + this->duration = duration; spa_bt_decode_buffer_recover(this); } + target = spa_bt_decode_buffer_get_target_latency(this); + if (SPA_UNLIKELY(this->buffering)) { int32_t size = (this->write_index - this->read_index) / this->frame_size; @@ -243,83 +249,63 @@ static void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *this, uint spa_log_trace(this->log, "%p buffering size:%d", this, (int)size); - if (this->received && - packet_size > 0 && - size >= SPA_MAX(3*packet_size, (int32_t)duration)) + if (size >= SPA_MAX((int)duration, target)) this->buffering = false; else return; + spa_bt_ptp_update(&this->spike, packet_size, duration); spa_bt_decode_buffer_recover(this); } spa_bt_decode_buffer_get_read(this, &avail); - if (this->received) { - const uint32_t avg_period = (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000; - int32_t level, target; + /* Track buffer level */ + this->level = SPA_MAX(this->level, -max_level); - /* Track buffer level */ - level = (int32_t)(this->prev_avail/this->frame_size) - (int32_t)this->prev_consumed; - level = SPA_MAX(level, -max_level); - this->prev_consumed = SPA_MIN(this->prev_consumed, avg_period); + spa_bt_ptp_update(&this->spike, (int32_t)this->ctl.avg - this->level, duration); - spa_bt_ptp_update(&this->spike, (int32_t)(this->ctl.avg - level), this->prev_consumed); + if (this->level > SPA_MAX(4 * target, 3*(int32_t)duration) && + avail > data_size) { + /* Lagging too much: drop data */ + uint32_t size = SPA_MIN(avail - data_size, + (this->level - target) * this->frame_size); - /* Update target level */ - target = spa_bt_decode_buffer_get_target(this); + spa_bt_decode_buffer_read(this, size); + spa_log_trace(this->log, "%p overrun samples:%d level:%d target:%d", + this, (int)size/this->frame_size, + (int)this->level, (int)target); - if (level > SPA_MAX(4 * target, 2*(int32_t)duration) && - avail > data_size) { - /* Lagging too much: drop data */ - uint32_t size = SPA_MIN(avail - data_size, - (level - target) * this->frame_size); - - spa_bt_decode_buffer_read(this, size); - spa_log_trace(this->log, "%p overrun samples:%d level:%d target:%d", - this, (int)size/this->frame_size, - (int)level, (int)target); - - spa_bt_decode_buffer_recover(this); - } - - this->pos += this->prev_consumed; - if (this->pos > this->rate) { - spa_log_debug(this->log, - "%p avg:%d target:%d level:%d buffer:%d spike:%d corr:%f", - this, - (int)this->ctl.avg, - (int)target, - (int)level, - (int)(avail / this->frame_size), - (int)this->spike.max, - (double)this->corr); - this->pos = 0; - } - - this->corr = spa_bt_rate_control_update(&this->ctl, - level, target, this->prev_consumed, avg_period, - BUFFERING_RATE_DIFF_MAX); - - spa_bt_decode_buffer_get_read(this, &avail); - - this->prev_consumed = 0; - this->prev_avail = avail; - this->underrun = 0; - this->received = false; + spa_bt_decode_buffer_recover(this); } + this->pos += duration; + if (this->pos > this->rate) { + spa_log_debug(this->log, + "%p avg:%d target:%d level:%d buffer:%d spike:%d corr:%f", + this, + (int)this->ctl.avg, + (int)target, + (int)this->level, + (int)(avail / this->frame_size), + (int)this->spike.max, + (double)this->corr); + this->pos = 0; + } + + this->corr = spa_bt_rate_control_update(&this->ctl, + this->level, target, duration, avg_period, + BUFFERING_RATE_DIFF_MAX); + + this->level -= duration; + + spa_bt_decode_buffer_get_read(this, &avail); if (avail < data_size) { spa_log_trace(this->log, "%p underrun samples:%d", this, (data_size - avail) / this->frame_size); - this->underrun += samples; - if (this->underrun >= SPA_MIN((uint32_t)max_level, this->buffer_size / this->frame_size)) { - this->buffering = true; - spa_log_debug(this->log, "%p underrun too much: start buffering", this); - } + this->buffering = true; + spa_bt_ptp_update(&this->spike, (int32_t)this->ctl.avg - this->level, duration); } - - this->prev_consumed += samples; } #endif diff --git a/spa/plugins/bluez5/media-source.c b/spa/plugins/bluez5/media-source.c index e002e0ee0..5f3fb522e 100644 --- a/spa/plugins/bluez5/media-source.c +++ b/spa/plugins/bluez5/media-source.c @@ -521,6 +521,9 @@ static void media_on_ready_read(struct spa_source *source) spa_log_trace(this->log, "socket poll"); + /* update the current pts */ + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); + /* read */ size_read = read_data (this); if (size_read == 0) @@ -530,9 +533,6 @@ static void media_on_ready_read(struct spa_source *source) goto stop; } - /* update the current pts */ - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); - if (this->codec_props_changed && this->codec_props && this->codec->update_props) { this->codec->update_props(this->codec_data, this->codec_props); @@ -556,7 +556,7 @@ static void media_on_ready_read(struct spa_source *source) if (!this->started) return; - spa_bt_decode_buffer_write_packet(&port->buffer, decoded); + spa_bt_decode_buffer_write_packet(&port->buffer, decoded, SPA_TIMESPEC_TO_NSEC(&now)); dt = SPA_TIMESPEC_TO_NSEC(&this->now); this->now = now; @@ -681,7 +681,7 @@ static void update_transport_delay(struct impl *this) info.v = __atomic_load_n(&this->delay.v, __ATOMIC_RELAXED); /* Latency to sink */ - latency = info.buffer + info.duration + latency = info.buffer + port->latency[SPA_DIRECTION_INPUT].min_rate + port->latency[SPA_DIRECTION_INPUT].min_quantum * info.duration; @@ -772,8 +772,8 @@ static int transport_start(struct impl *this) return res; if (this->is_duplex) { - /* 80 ms max buffer */ - spa_bt_decode_buffer_set_max_latency(&port->buffer, + /* 80 ms max extra buffer */ + spa_bt_decode_buffer_set_max_extra_latency(&port->buffer, port->current_format.info.raw.rate * 80 / 1000); } @@ -1474,7 +1474,7 @@ static void update_target_latency(struct impl *this) port->current_format.info.raw.rate / SPA_USEC_PER_SEC; delay_sink = __atomic_load_n(&this->delay_sink, __ATOMIC_RELAXED); - latency = duration + delay_sink * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC; + latency = delay_sink * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC; if (samples > latency) samples -= latency; @@ -1502,7 +1502,9 @@ static void process_buffering(struct impl *this) update_target_latency(this); - spa_bt_decode_buffer_process(&port->buffer, samples, duration); + spa_bt_decode_buffer_process(&port->buffer, samples, duration, + this->position ? this->position->clock.rate_diff : 1.0, + this->position ? this->position->clock.next_nsec : 0); setup_matching(this); @@ -1557,7 +1559,7 @@ static void process_buffering(struct impl *this) } if (this->update_delay_event) { - int32_t target = spa_bt_decode_buffer_get_target(&port->buffer); + int32_t target = spa_bt_decode_buffer_get_target_latency(&port->buffer); if (target != this->delay.buffer || duration != this->delay.duration) { struct delay_info info = { .buffer = target, .duration = duration }; diff --git a/spa/plugins/bluez5/sco-source.c b/spa/plugins/bluez5/sco-source.c index 71725e492..dc2e1f070 100644 --- a/spa/plugins/bluez5/sco-source.c +++ b/spa/plugins/bluez5/sco-source.c @@ -458,7 +458,7 @@ static int lc3_decode_frame(struct impl *this, const void *src, size_t src_size, #endif } -static uint32_t preprocess_and_decode_codec_data(void *userdata, uint8_t *read_data, int size_read) +static uint32_t preprocess_and_decode_codec_data(void *userdata, uint8_t *read_data, int size_read, uint64_t now) { struct impl *this = userdata; struct port *port = &this->port; @@ -531,7 +531,7 @@ static uint32_t preprocess_and_decode_codec_data(void *userdata, uint8_t *read_d continue; } - spa_bt_decode_buffer_write_packet(&port->buffer, written); + spa_bt_decode_buffer_write_packet(&port->buffer, written, now); decoded += written; } @@ -566,7 +566,7 @@ static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read) if (this->transport->codec == HFP_AUDIO_CODEC_MSBC || this->transport->codec == HFP_AUDIO_CODEC_LC3_SWB) { - decoded = preprocess_and_decode_codec_data(userdata, read_data, size_read); + decoded = preprocess_and_decode_codec_data(userdata, read_data, size_read, SPA_TIMESPEC_TO_NSEC(&this->now)); } else { uint32_t avail; uint8_t *packet; @@ -591,7 +591,7 @@ static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read) packet = spa_bt_decode_buffer_get_write(&port->buffer, &avail); avail = SPA_MIN(avail, (uint32_t)size_read); spa_memmove(packet, read_data, avail); - spa_bt_decode_buffer_write_packet(&port->buffer, avail); + spa_bt_decode_buffer_write_packet(&port->buffer, avail, SPA_TIMESPEC_TO_NSEC(&this->now)); decoded = avail; } @@ -728,8 +728,8 @@ static int transport_start(struct impl *this) this->quantum_limit, this->quantum_limit)) < 0) return res; - /* 40 ms max buffer */ - spa_bt_decode_buffer_set_max_latency(&port->buffer, + /* 40 ms max buffer (on top of duration) */ + spa_bt_decode_buffer_set_max_extra_latency(&port->buffer, port->current_format.info.raw.rate * 40 / 1000); /* Init mSBC/LC3 if needed */ @@ -1402,7 +1402,9 @@ static void process_buffering(struct impl *this) void *buf; uint32_t avail; - spa_bt_decode_buffer_process(&port->buffer, samples, duration); + spa_bt_decode_buffer_process(&port->buffer, samples, duration, + this->position ? this->position->clock.rate_diff : 1.0, + this->position ? this->position->clock.next_nsec : 0); setup_matching(this);