bluez5: handle any quantum in a2dp-sink

Remove quantum limitation from a2dp-sink, and adjust how flushing is
done.

The "low-latency" A2DP codecs are not able to flush all data at once, so
for them flush based on a timeout, such that "excess samples" for each
quantum is bounded.  We also limit excess samples for the other A2DP
codecs, based on some testing on flaky headset/adapter combinations (for
most cases, this does not appear to matter).

Leave decision of packet sizes to the codecs. Currently, we send packets
based on min_latency, but sendinf full packets might help with stutter
on some headset/adapter combinations.  The slightly increased latency
hardly matters against the 100ms delays in BT headsets.

Bump codec API version.
This commit is contained in:
Pauli Virtanen 2021-11-06 12:54:02 +02:00
parent 9377ae5fcd
commit 377bc16eb5
5 changed files with 107 additions and 34 deletions

View file

@ -43,7 +43,7 @@
#define SPA_TYPE_INTERFACE_Bluez5CodecA2DP SPA_TYPE_INFO_INTERFACE_BASE "Bluez5:Codec:A2DP:Private"
#define SPA_VERSION_BLUEZ5_CODEC_A2DP 0
#define SPA_VERSION_BLUEZ5_CODEC_A2DP 1
struct spa_bluez5_codec_a2dp {
struct spa_interface iface;

View file

@ -70,6 +70,9 @@ struct props {
#define FILL_FRAMES 2
#define MAX_BUFFERS 32
#define MIN_LATENCY 128
#define MAX_LATENCY 8192
#define BUFFER_SIZE (MAX_LATENCY*8)
struct buffer {
uint32_t id;
@ -137,6 +140,8 @@ struct impl {
struct spa_source source;
int timerfd;
struct spa_source flush_source;
struct spa_source flush_timer_source;
int flush_timerfd;
struct spa_io_clock *clock;
struct spa_io_position *position;
@ -153,14 +158,14 @@ struct impl {
int need_flush;
uint32_t block_size;
uint8_t buffer[4096];
uint8_t buffer[BUFFER_SIZE];
uint32_t buffer_used;
uint32_t header_size;
uint32_t frame_count;
uint16_t seqnum;
uint32_t timestamp;
uint64_t sample_count;
uint8_t tmp_buffer[4096];
uint8_t tmp_buffer[BUFFER_SIZE];
uint32_t tmp_buffer_used;
uint32_t fd_buffer_size;
};
@ -169,11 +174,7 @@ struct impl {
static void reset_props(struct impl *this, struct props *props)
{
if (this->codec->id == SPA_BLUETOOTH_AUDIO_CODEC_APTX_LL) {
props->min_latency = 256;
} else {
props->min_latency = MIN_LATENCY;
}
props->min_latency = MIN_LATENCY;
props->max_latency = MAX_LATENCY;
props->latency_offset = 0;
strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name));
@ -481,11 +482,6 @@ static int send_buffer(struct impl *this)
return written;
}
static bool want_flush(struct impl *this)
{
return (this->frame_count * this->block_size / this->port.frame_size >= this->props.min_latency);
}
static int encode_buffer(struct impl *this, const void *data, uint32_t size)
{
int processed;
@ -542,7 +538,7 @@ static int flush_buffer(struct impl *this)
spa_log_trace(this->log, "%p: used:%d block_size:%d", this,
this->buffer_used, this->block_size);
if (this->need_flush || want_flush(this))
if (this->need_flush)
return send_buffer(this);
return 0;
@ -565,12 +561,25 @@ static int add_data(struct impl *this, const void *data, uint32_t size)
return total;
}
static void enable_flush(struct impl *this, bool enabled)
static void enable_flush(struct impl *this, bool enabled, uint64_t timeout)
{
if (SPA_FLAG_IS_SET(this->flush_source.mask, SPA_IO_OUT) != enabled) {
SPA_FLAG_UPDATE(this->flush_source.mask, SPA_IO_OUT, enabled);
bool flush_enabled = enabled && (timeout == 0);
struct itimerspec ts;
if (SPA_FLAG_IS_SET(this->flush_source.mask, SPA_IO_OUT) != flush_enabled) {
SPA_FLAG_UPDATE(this->flush_source.mask, SPA_IO_OUT, flush_enabled);
spa_loop_update_source(this->data_loop, &this->flush_source);
}
if (!enabled)
timeout = 0;
ts.it_value.tv_sec = timeout / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = timeout % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system,
this->flush_timerfd, 0, &ts, NULL);
}
static int flush_data(struct impl *this, uint64_t now_time)
@ -645,7 +654,7 @@ again:
}
if (written > 0 && this->buffer_used == this->header_size) {
enable_flush(this, false);
enable_flush(this, false, 0);
return 0;
}
@ -672,10 +681,31 @@ again:
spa_log_trace(this->log, "%p: error flushing %s", this,
spa_strerror(written));
reset_buffer(this);
enable_flush(this, false);
enable_flush(this, false, 0);
return written;
}
else if (written > 0) {
/*
* We cannot write all data we have at once, since this can exceed
* device buffers. We'll want a limited number of "excess"
* samples. This is an issue for the "low-latency" A2DP codecs.
*
* Flushing the rest of the data (if any) is delayed after a timeout,
* selected on an average-rate basis:
*
* npackets = quantum / packet_samples
* write_end_time = npackets * timeout
* max_excess = quantum - sample_rate * write_end_time
* packet_time = packet_samples / sample_rate
* => timeout = (quantum - max_excess)/quantum * packet_time
*/
uint64_t max_excess = 2*256;
uint64_t packet_samples = this->frame_count * this->block_size / port->frame_size;
uint64_t packet_time = packet_samples * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate;
uint64_t quantum = SPA_LIKELY(this->clock) ? this->clock->duration : 0;
uint64_t timeout = (quantum > max_excess) ?
(packet_time * (quantum - max_excess) / quantum) : 0;
reset_buffer(this);
if (now_time - this->last_error > SPA_NSEC_PER_SEC) {
if (get_transport_unused_size(this) == (int)this->fd_buffer_size) {
@ -684,15 +714,20 @@ again:
}
this->last_error = now_time;
}
if (!spa_list_is_empty(&port->ready))
goto again;
enable_flush(this, false);
if (!spa_list_is_empty(&port->ready)) {
spa_log_trace(this->log, "%p: flush after %d ns", this, (int)timeout);
if (timeout == 0)
goto again;
else
enable_flush(this, true, timeout);
} else {
enable_flush(this, false, 0);
}
}
else {
/* Don't want to flush yet, or failed to write anything */
spa_log_trace(this->log, "%p: skip flush", this);
enable_flush(this, false);
enable_flush(this, false, 0);
}
return 0;
}
@ -709,6 +744,30 @@ static void a2dp_on_flush(struct spa_source *source)
spa_loop_remove_source(this->data_loop, &this->flush_source);
return;
}
if (this->transport == NULL) {
enable_flush(this, false, 0);
return;
}
flush_data(this, this->current_time);
}
static void a2dp_on_flush_timeout(struct spa_source *source)
{
struct impl *this = source->data;
uint64_t exp;
spa_log_trace(this->log, "%p: flush on timeout", this);
if (spa_system_timerfd_read(this->data_system, this->flush_timerfd, &exp) < 0)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
if (this->transport == NULL) {
enable_flush(this, false, 0);
return;
}
flush_data(this, this->current_time);
}
@ -852,6 +911,13 @@ static int do_start(struct impl *this)
this->source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->source);
this->flush_timer_source.data = this;
this->flush_timer_source.fd = this->flush_timerfd;
this->flush_timer_source.func = a2dp_on_flush_timeout;
this->flush_timer_source.mask = SPA_IO_IN;
this->flush_timer_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_timer_source);
this->flush_source.data = this;
this->flush_source.fd = this->transport->fd;
this->flush_source.func = a2dp_on_flush;
@ -882,9 +948,18 @@ static int do_remove_source(struct spa_loop *loop,
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL);
if (this->flush_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_source);
if (this->flush_timer_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_timer_source);
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, this->flush_timerfd, 0, &ts, NULL);
return 0;
}
@ -945,22 +1020,15 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
static void emit_node_info(struct impl *this, bool full)
{
char latency[64] = SPA_STRINGIFY(MIN_LATENCY)"/48000";
struct spa_dict_item node_info_items[] = {
{ SPA_KEY_DEVICE_API, "bluez5" },
{ SPA_KEY_MEDIA_CLASS, "Audio/Sink" },
{ SPA_KEY_NODE_DRIVER, "true" },
{ SPA_KEY_NODE_LATENCY, latency },
};
uint64_t old = full ? this->info.change_mask : 0;
if (full)
this->info.change_mask = this->info_all;
if (this->info.change_mask) {
if (this->transport && this->port.have_format)
snprintf(latency, sizeof(latency), "%d/%d", (int)this->props.min_latency,
(int)this->port.current_format.info.raw.rate);
else
snprintf(latency, sizeof(latency), "%d/48000", (int)this->props.min_latency);
this->info.props = &SPA_DICT_INIT_ARRAY(node_info_items);
spa_node_emit_info(&this->hooks, &this->info);
this->info.change_mask = old;
@ -1443,6 +1511,7 @@ static int impl_clear(struct spa_handle *handle)
if (this->transport)
spa_hook_remove(&this->transport_listener);
spa_system_close(this->data_system, this->timerfd);
spa_system_close(this->data_system, this->flush_timerfd);
return 0;
}
@ -1550,6 +1619,9 @@ impl_init(const struct spa_handle_factory *factory,
this->timerfd = spa_system_timerfd_create(this->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
this->flush_timerfd = spa_system_timerfd_create(this->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
return 0;
}

View file

@ -70,6 +70,8 @@ struct props {
#define FILL_FRAMES 2
#define MAX_BUFFERS 32
#define MIN_LATENCY 512
#define MAX_LATENCY 1024
struct buffer {
uint32_t id;

View file

@ -60,9 +60,6 @@ extern "C" {
#define PIPEWIRE_BATTERY_PROVIDER "/org/freedesktop/pipewire/battery"
#define MIN_LATENCY 512
#define MAX_LATENCY 1024
#define OBJECT_MANAGER_INTROSPECT_XML \
DBUS_INTROSPECT_1_0_XML_DOCTYPE_DECL_NODE \
"<node>\n" \

View file

@ -66,6 +66,8 @@ struct props {
};
#define MAX_BUFFERS 32
#define MIN_LATENCY 512
#define MAX_LATENCY 1024
struct buffer {
uint32_t id;