bluez5: media-sink: use iso-io for BAP

Use the ISO IO helpers to get synchronized BAP output, and rate match to
the ISO schedule.

The rate matching is necessary, since the driver may be ticking at a
corrected rate, different from the ISO interval rate.
This commit is contained in:
Pauli Virtanen 2023-03-25 18:01:43 +02:00 committed by Wim Taymans
parent cec050ac25
commit 6db234ad0c

View file

@ -39,6 +39,7 @@
#include "rtp.h"
#include "media-codecs.h"
#include "rate-control.h"
#include "iso-io.h"
static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.sink.media");
#undef SPA_LOG_TOPIC_DEFAULT
@ -144,7 +145,8 @@ struct impl {
uint64_t next_time;
uint64_t last_error;
uint64_t process_time;
uint64_t process_position;
uint64_t process_duration;
uint64_t process_rate;
uint64_t prev_flush_time;
uint64_t next_flush_time;
@ -157,6 +159,8 @@ struct impl {
int need_flush;
bool fragment;
bool resync;
bool have_iso_packet;
uint32_t block_size;
uint8_t buffer[BUFFER_SIZE];
uint32_t buffer_used;
@ -168,10 +172,6 @@ struct impl {
uint8_t tmp_buffer[BUFFER_SIZE];
uint32_t tmp_buffer_used;
uint32_t fd_buffer_size;
#ifdef HAVE_BLUETOOTH_BAP
struct bt_iso_qos qos;
#endif
};
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0)
@ -284,52 +284,75 @@ static int set_timers(struct impl *this)
return set_timeout(this, this->following ? 0 : this->next_time);
}
static int do_reassign_follower(struct spa_loop *loop,
static inline bool is_following(struct impl *this)
{
return this->position && this->clock && this->position->clock.id != this->clock->id;
}
struct reassign_io_info {
struct impl *this;
struct spa_io_position *position;
struct spa_io_clock *clock;
};
static int do_reassign_io(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct impl *this = user_data;
set_timers(this);
return 0;
}
struct reassign_io_info *info = user_data;
struct impl *this = info->this;
bool following;
static inline bool is_following(struct impl *this)
{
return this->position && this->clock && this->position->clock.id != this->clock->id;
if (this->position != info->position || this->clock != info->clock)
this->resync = true;
this->position = info->position;
this->clock = info->clock;
following = is_following(this);
if (following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
set_timers(this);
}
return 0;
}
static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
{
struct impl *this = object;
bool following;
struct reassign_io_info info = { .this = this, .position = this->position, .clock = this->clock };
spa_return_val_if_fail(this != NULL, -EINVAL);
switch (id) {
case SPA_IO_Clock:
this->clock = data;
if (this->clock != NULL) {
spa_scnprintf(this->clock->name,
sizeof(this->clock->name),
info.clock = data;
if (info.clock != NULL) {
spa_scnprintf(info.clock->name,
sizeof(info.clock->name),
"%s", this->props.clock_name);
}
break;
case SPA_IO_Position:
this->position = data;
info.position = data;
break;
default:
return -ENOENT;
}
following = is_following(this);
if (this->started && following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this);
if (this->started) {
spa_loop_invoke(this->data_loop, do_reassign_io, 0, NULL, 0, true, &info);
} else {
this->clock = info.clock;
this->position = info.position;
}
return 0;
}
@ -433,35 +456,36 @@ static uint32_t get_queued_frames(struct impl *this)
return bytes / port->frame_size;
}
static uint64_t get_reference_time(struct impl *this, uint64_t *duration_ns)
static uint64_t get_reference_time(struct impl *this, uint64_t *duration_ns_ret)
{
struct port *port = &this->port;
uint64_t t, duration_ns;
spa_assert(this->position);
if (!this->process_rate || !this->process_duration) {
if (this->position) {
this->process_duration = this->position->clock.duration;
this->process_rate = this->position->clock.rate.denom;
} else {
this->process_duration = 1024;
this->process_rate = 48000;
}
}
duration_ns = ((uint64_t)this->process_duration * SPA_NSEC_PER_SEC / this->process_rate);
if (duration_ns_ret)
*duration_ns_ret = duration_ns;
/* Time at the first sample in the current packet. */
*duration_ns = ((uint64_t)this->position->clock.duration * SPA_NSEC_PER_SEC
/ this->position->clock.rate.denom);
return this->process_time + *duration_ns
- ((uint64_t)get_queued_frames(this) * SPA_NSEC_PER_SEC
/ port->current_format.info.raw.rate);
}
t = this->process_time + duration_ns;
t -= ((uint64_t)get_queued_frames(this) * SPA_NSEC_PER_SEC
/ port->current_format.info.raw.rate);
static uint64_t get_reference_position(struct impl *this)
{
struct port *port = &this->port;
uint64_t position;
/* Account for resampling delay */
if (port->rate_match && this->clock && SPA_FLAG_IS_SET(port->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE))
t -= (uint64_t)port->rate_match->delay * SPA_NSEC_PER_SEC
/ this->clock->rate.denom;
/* Sample position at the first sample in the current packet.
* If resampling, may be rounded down by one sample.
*/
if (!this->position)
return this->sample_count;
position = this->process_position * port->current_format.info.raw.rate /
this->position->clock.rate.denom;
return position - get_queued_frames(this);
return t;
}
static int reset_buffer(struct impl *this)
@ -474,7 +498,8 @@ static int reset_buffer(struct impl *this)
this->need_flush = 0;
this->block_count = 0;
this->fragment = false;
this->timestamp = this->codec->bap ? get_reference_position(this) : this->sample_count;
this->timestamp = this->codec->bap ? (get_reference_time(this, NULL) / SPA_NSEC_PER_USEC)
: this->sample_count;
this->buffer_used = this->codec->start_encode(this->codec_data,
this->buffer, sizeof(this->buffer),
++this->seqnum, this->timestamp);
@ -675,79 +700,6 @@ static void enable_flush_timer(struct impl *this, bool enabled)
this->flush_pending = enabled;
}
#ifdef HAVE_BLUETOOTH_BAP
static void sync_iso_frame_start(struct impl *this)
{
struct port *port = &this->port;
uint64_t position;
uint32_t interval_frames;
uint32_t req;
if (!this->codec->bap || !this->qos.out.interval || !this->position)
return;
/* Synchronize packet start sample position to a multiple of the ISO interval.
*
* This ensures that different nodes in the graph create packets containing audio
* aligned at commensurate ISO intervals. This will then also align their flush
* reference times.
*
* The ISO interval generally consists of an integer number of frames, so we
* should do this calculation in frames.
*/
position = get_reference_position(this);
interval_frames = (uint64_t)port->current_format.info.raw.rate * this->qos.out.interval
/ SPA_USEC_PER_SEC;
/* Skip frames: generally, this should only occur once when the node starts. */
req = position % interval_frames;
if (this->position->clock.rate.denom != port->current_format.info.raw.rate) {
/* if resampling, the count may be rounded down by one */
if (req == interval_frames - 1)
req = 0;
}
if (req > 0)
req = interval_frames - req;
if (req > 0) {
spa_log_debug(this->log, "node %p: ISO sync %"PRIu64"->%"PRIu64": skipping %d frames",
this, position, SPA_ROUND_UP(position, interval_frames), req);
}
while (req > 0 && !spa_list_is_empty(&port->ready)) {
struct buffer *b;
struct spa_data *d;
uint32_t avail;
b = spa_list_first(&port->ready, struct buffer, link);
d = b->buf->datas;
avail = d[0].chunk->size - port->ready_offset;
avail /= port->frame_size;
avail = SPA_MIN(avail, req);
port->ready_offset += avail * port->frame_size;
req -= avail;
if (port->ready_offset >= d[0].chunk->size) {
spa_list_remove(&b->link);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
spa_log_trace(this->log, "%p: reuse buffer %u", this, b->id);
this->port.io->buffer_id = b->id;
spa_node_call_reuse_buffer(&this->callbacks, 0, b->id);
port->ready_offset = 0;
}
spa_log_trace(this->log, "%p: skipped %u frames", this, avail);
}
}
#else
static void sync_iso_frame_start(struct impl *this)
{
}
#endif
static int flush_data(struct impl *this, uint64_t now_time)
{
int written;
@ -757,10 +709,11 @@ static int flush_data(struct impl *this, uint64_t now_time)
spa_assert(this->transport_started);
if (this->transport == NULL || !this->flush_source.loop || !this->flush_timer_source.loop) {
/* I/O in error state */
/* I/O in error state? */
if (this->transport == NULL || !this->flush_source.loop)
return -EIO;
if (!this->flush_timer_source.loop && !this->transport->iso_io)
return -EIO;
}
total_frames = 0;
again:
@ -831,6 +784,27 @@ again:
spa_log_trace(this->log, "%p: written %u frames", this, total_frames);
}
if (this->transport->iso_io) {
struct spa_bt_iso_io *iso_io = this->transport->iso_io;
if (this->need_flush && !this->have_iso_packet) {
size_t avail = SPA_MIN(this->buffer_used, sizeof(iso_io->buf));
spa_log_trace(this->log, "%p: ISO put fd:%d size:%u sn:%u ts:%u now:%"PRIu64,
this, this->transport->fd, (unsigned)avail,
(unsigned)this->seqnum, (unsigned)this->timestamp,
iso_io->now);
memcpy(iso_io->buf, this->buffer, avail);
iso_io->size = avail;
iso_io->timestamp = this->timestamp;
this->have_iso_packet = true;
reset_buffer(this);
}
return 0;
}
if (this->flush_pending) {
spa_log_trace(this->log, "%p: wait for flush timer", this);
return 0;
@ -880,8 +854,6 @@ again:
uint64_t packet_time = (uint64_t)packet_samples * SPA_NSEC_PER_SEC
/ port->current_format.info.raw.rate;
sync_iso_frame_start(this);
if (SPA_LIKELY(this->position)) {
uint64_t duration_ns;
@ -942,6 +914,98 @@ again:
return 0;
}
static void drop_frames(struct impl *this, uint32_t req)
{
struct port *port = &this->port;
while (req > 0 && !spa_list_is_empty(&port->ready)) {
struct buffer *b;
struct spa_data *d;
uint32_t avail;
b = spa_list_first(&port->ready, struct buffer, link);
d = b->buf->datas;
avail = d[0].chunk->size - port->ready_offset;
avail /= port->frame_size;
avail = SPA_MIN(avail, req);
port->ready_offset += avail * port->frame_size;
req -= avail;
if (port->ready_offset >= d[0].chunk->size) {
spa_list_remove(&b->link);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
spa_log_trace(this->log, "%p: reuse buffer %u", this, b->id);
this->port.io->buffer_id = b->id;
spa_node_call_reuse_buffer(&this->callbacks, 0, b->id);
port->ready_offset = 0;
}
spa_log_trace(this->log, "%p: skipped %u frames", this, avail);
}
}
static void media_iso_pull(struct spa_bt_iso_io *iso_io)
{
struct impl *this = iso_io->user_data;
struct port *port = &this->port;
const double period = 0.1 * SPA_NSEC_PER_SEC;
uint64_t duration_ns;
double value, target, err;
this->have_iso_packet = false;
if (this->resync || !this->position) {
spa_bt_rate_control_init(&port->ratectl, 0);
goto done;
}
/*
* Rate match sample position so that the graph is 3/2 ISO interval
* ahead of the time instant we have to send data.
*
* Being 1 ISO interval ahead is unavoidable otherwise we underrun,
* and the 1/2 is safety margin for the graph to deliver data
* in time.
*
* This is then the part of the TX latency on PipeWire side. There is
* another part of TX latency on kernel/controller side before the
* controller starts processing the packet.
*/
value = (int64_t)iso_io->now - (int64_t)get_reference_time(this, &duration_ns);
target = iso_io->duration * 3/2;
err = value - target;
if (err > iso_io->duration) {
uint32_t req = err * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC;
spa_log_debug(this->log, "%p: ISO sync reset frames:%u", this, (unsigned int)req);
spa_bt_rate_control_init(&port->ratectl, 0);
drop_frames(this, req);
} else if (-err > iso_io->duration) {
uint32_t req = -err * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC;
spa_log_debug(this->log, "%p: ISO sync skip flush frames:%u", this, (unsigned int)req);
return;
} else {
spa_bt_rate_control_update(&port->ratectl, err, 0,
iso_io->duration, period, RATE_CTL_DIFF_MAX);
spa_log_trace(this->log, "%p: ISO sync err:%+.3f value:%.3f target:%.3f (ms) corr:%g",
this,
port->ratectl.avg / SPA_NSEC_PER_MSEC,
value / SPA_NSEC_PER_MSEC,
target / SPA_NSEC_PER_MSEC,
port->ratectl.corr);
}
done:
flush_data(this, this->current_time);
}
static void media_on_flush_error(struct spa_source *source)
{
struct impl *this = source->data;
@ -955,6 +1019,8 @@ static void media_on_flush_error(struct spa_source *source)
enable_flush_timer(this, false);
if (this->flush_timer_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_timer_source);
if (this->transport && this->transport->iso_io)
spa_bt_iso_io_set_cb(this->transport->iso_io, NULL, NULL);
return;
}
}
@ -1048,6 +1114,15 @@ static void media_on_timeout(struct spa_source *source)
set_timeout(this, this->next_time);
}
static int do_start_iso_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct impl *this = user_data;
spa_bt_iso_io_set_cb(this->transport->iso_io, media_iso_pull, this);
return 0;
}
static int transport_start(struct impl *this)
{
int val, size;
@ -1120,26 +1195,18 @@ static int transport_start(struct impl *this)
if (setsockopt(this->transport->fd, SOL_SOCKET, SO_PRIORITY, &val, sizeof(val)) < 0)
spa_log_warn(this->log, "SO_PRIORITY failed: %m");
#ifdef HAVE_BLUETOOTH_BAP
if (this->codec->bap) {
len = sizeof(this->qos);
if (getsockopt(this->transport->fd, SOL_BLUETOOTH, BT_ISO_QOS, &this->qos, &len) < 0) {
memset(&this->qos, 0, sizeof(this->qos));
spa_log_warn(this->log, "BT_ISO_QOS failed: %m");
}
}
#endif
reset_buffer(this);
spa_bt_rate_control_init(&port->ratectl, 0);
this->flush_timer_source.data = this;
this->flush_timer_source.fd = this->flush_timerfd;
this->flush_timer_source.func = media_on_flush_timeout;
this->flush_timer_source.mask = SPA_IO_IN;
this->flush_timer_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_timer_source);
if (!this->transport->iso_io) {
this->flush_timer_source.data = this;
this->flush_timer_source.fd = this->flush_timerfd;
this->flush_timer_source.func = media_on_flush_timeout;
this->flush_timer_source.mask = SPA_IO_IN;
this->flush_timer_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_timer_source);
}
this->flush_source.data = this;
this->flush_source.fd = this->transport->fd;
@ -1148,10 +1215,15 @@ static int transport_start(struct impl *this)
this->flush_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_source);
this->resync = true;
this->flush_pending = false;
this->transport_started = true;
if (this->transport->iso_io)
spa_loop_invoke(this->data_loop, do_start_iso_io, 0, NULL, 0, true, this);
return 0;
}
@ -1224,6 +1296,9 @@ static int do_remove_transport_source(struct spa_loop *loop,
spa_loop_remove_source(this->data_loop, &this->flush_timer_source);
enable_flush_timer(this, false);
if (this->transport->iso_io)
spa_bt_iso_io_set_cb(this->transport->iso_io, NULL, NULL);
return 0;
}
@ -1635,7 +1710,7 @@ impl_node_port_use_buffers(void *object,
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = &this->port;
spa_log_debug(this->log, "use buffers %d", n_buffers);
spa_log_debug(this->log, "%p: use buffers %d", this, n_buffers);
clear_buffers(this, port);
@ -1703,6 +1778,7 @@ static int impl_node_process(void *object)
struct impl *this = object;
struct port *port;
struct spa_io_buffers *io;
int res;
spa_return_val_if_fail(this != NULL, -EINVAL);
@ -1746,22 +1822,23 @@ static int impl_node_process(void *object)
}
}
if (this->position)
this->process_position = this->position->clock.position;
if (this->position) {
this->process_duration = this->position->clock.duration;
this->process_rate = this->position->clock.rate.denom;
} else {
this->process_duration = 1024;
this->process_rate = 48000;
}
this->process_time = this->current_time;
this->resync = false;
setup_matching(this);
if (!spa_list_is_empty(&port->ready)) {
int res;
spa_log_trace(this->log, "%p: flush on process", this);
if ((res = flush_data(this, this->current_time)) < 0) {
io->status = res;
return SPA_STATUS_STOPPED;
}
} else {
spa_log_trace(this->log, "%p: no flush on process", this);
spa_log_trace(this->log, "%p: on process time:%"PRIu64, this, this->process_time);
if ((res = flush_data(this, this->current_time)) < 0) {
io->status = res;
return SPA_STATUS_STOPPED;
}
return SPA_STATUS_HAVE_DATA;