bluez5: use kernel-provided RX timestamps

Use kernel-provided packet reception timestamps to get less jitter in
packet timings. Mostly matters for ISO/SCO which have regular schedule.

A2DP (L2CAP) doesn't currently do RX timestamps in kernel, but we can as
well use the same mechanism for it.
This commit is contained in:
Pauli Virtanen 2025-03-22 14:18:58 +02:00 committed by Wim Taymans
parent 081116906d
commit 0d61cc1b1d
8 changed files with 165 additions and 31 deletions

View file

@ -3269,10 +3269,10 @@ static int spa_bt_transport_stop_volume_timer(struct spa_bt_transport *transport
}
int spa_bt_transport_ensure_sco_io(struct spa_bt_transport *t, struct spa_loop *data_loop)
int spa_bt_transport_ensure_sco_io(struct spa_bt_transport *t, struct spa_loop *data_loop, struct spa_system *data_system)
{
if (t->sco_io == NULL) {
t->sco_io = spa_bt_sco_io_create(t, data_loop, t->monitor->log);
t->sco_io = spa_bt_sco_io_create(t, data_loop, data_system, t->monitor->log);
if (t->sco_io == NULL)
return -ENOMEM;
}

View file

@ -5,6 +5,7 @@
#ifndef SPA_BLUEZ5_BT_LATENCY_H
#define SPA_BLUEZ5_BT_LATENCY_H
#include <time.h>
#include <sys/socket.h>
#include <linux/net_tstamp.h>
#include <linux/errqueue.h>

View file

@ -31,6 +31,11 @@
#define SPA_BLUEZ5_DECODE_BUFFER_H
#include <stdlib.h>
#include <time.h>
#include <sys/socket.h>
#include <linux/net_tstamp.h>
#include <linux/errqueue.h>
#include <spa/utils/defs.h>
#include <spa/support/log.h>
@ -308,4 +313,121 @@ static inline void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *thi
}
}
struct spa_bt_recvmsg_data {
struct spa_log *log;
struct spa_system *data_system;
int fd;
int64_t offset;
int64_t err;
};
static inline void spa_bt_recvmsg_update_clock(struct spa_bt_recvmsg_data *data, uint64_t *now)
{
const int64_t max_resync = (50 * SPA_NSEC_PER_USEC);
const int64_t n_avg = 10;
struct timespec ts1, ts2, ts3;
int64_t t1, t2, t3, offset, err;
spa_system_clock_gettime(data->data_system, CLOCK_MONOTONIC, &ts1);
spa_system_clock_gettime(data->data_system, CLOCK_REALTIME, &ts2);
spa_system_clock_gettime(data->data_system, CLOCK_MONOTONIC, &ts3);
t1 = SPA_TIMESPEC_TO_NSEC(&ts1);
t2 = SPA_TIMESPEC_TO_NSEC(&ts2);
t3 = SPA_TIMESPEC_TO_NSEC(&ts3);
if (now)
*now = t1;
offset = t1 + (t3 - t1) / 2 - t2;
/* Moving average smoothing, discarding outliers */
err = offset - data->offset;
if (SPA_ABS(err) > max_resync) {
/* Clock jump */
spa_log_debug(data->log, "%p: nsec err %"PRIi64" > max_resync %"PRIi64", resetting",
data, err, max_resync);
data->offset = offset;
data->err = 0;
err = 0;
} else if (SPA_ABS(err) / 2 <= data->err) {
data->offset += err / n_avg;
}
data->err += (SPA_ABS(err) - data->err) / n_avg;
}
static inline ssize_t spa_bt_recvmsg(struct spa_bt_recvmsg_data *r, void *buf, size_t max_size, uint64_t *rx_time)
{
char control[1024];
struct iovec data = {
.iov_base = buf,
.iov_len = max_size
};
struct msghdr msg = {
.msg_iov = &data,
.msg_iovlen = 1,
.msg_control = &control,
.msg_controllen = sizeof(control),
};
struct cmsghdr *cmsg;
uint64_t t = 0, now;
ssize_t res;
res = recvmsg(r->fd, &msg, MSG_DONTWAIT);
if (res < 0 || !rx_time)
return res;
spa_bt_recvmsg_update_clock(r, &now);
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
struct scm_timestamping *tss;
if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_TIMESTAMPING)
continue;
tss = (struct scm_timestamping *)CMSG_DATA(cmsg);
t = SPA_TIMESPEC_TO_NSEC(&tss->ts[0]);
break;
}
if (!t) {
*rx_time = now;
return res;
}
*rx_time = t + r->offset;
/* CLOCK_REALTIME may jump, so sanity check */
if (*rx_time > now || *rx_time + 20 * SPA_NSEC_PER_MSEC < now)
*rx_time = now;
spa_log_trace(r->log, "%p: rx:%" PRIu64 " now:%" PRIu64 " d:%"PRIu64" off:%"PRIi64,
r, *rx_time, now, now - *rx_time, r->offset);
return res;
}
static inline void spa_bt_recvmsg_init(struct spa_bt_recvmsg_data *data, int fd,
struct spa_system *data_system, struct spa_log *log)
{
int flags = 0;
socklen_t len = sizeof(flags);
data->log = log;
data->data_system = data_system;
data->fd = fd;
data->offset = 0;
data->err = 0;
if (getsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &flags, &len) < 0)
spa_log_info(log, "failed to get SO_TIMESTAMPING");
flags |= SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_RX_SOFTWARE;
if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &flags, sizeof(flags)) < 0)
spa_log_info(log, "failed to set SO_TIMESTAMPING");
}
#endif

View file

@ -584,9 +584,10 @@ struct spa_bt_iso_io;
struct spa_bt_sco_io;
struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, struct spa_loop *data_loop, struct spa_log *log);
struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, struct spa_loop *data_loop,
struct spa_system *data_system, struct spa_log *log);
void spa_bt_sco_io_destroy(struct spa_bt_sco_io *io);
void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void *userdata, uint8_t *data, int size), void *userdata);
void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void *userdata, uint8_t *data, int size, uint64_t rx_time), void *userdata);
void spa_bt_sco_io_set_sink_cb(struct spa_bt_sco_io *io, int (*sink_cb)(void *userdata), void *userdata);
int spa_bt_sco_io_write(struct spa_bt_sco_io *io, uint8_t *data, int size);
@ -703,7 +704,7 @@ bool spa_bt_transport_volume_enabled(struct spa_bt_transport *transport);
int spa_bt_transport_acquire(struct spa_bt_transport *t, bool optional);
int spa_bt_transport_release(struct spa_bt_transport *t);
int spa_bt_transport_keepalive(struct spa_bt_transport *t, bool keepalive);
int spa_bt_transport_ensure_sco_io(struct spa_bt_transport *t, struct spa_loop *data_loop);
int spa_bt_transport_ensure_sco_io(struct spa_bt_transport *t, struct spa_loop *data_loop, struct spa_system *data_system);
#define spa_bt_transport_emit(t,m,v,...) spa_hook_list_call(&(t)->listener_list, \
struct spa_bt_transport_events, \

View file

@ -159,7 +159,7 @@ struct impl {
struct spa_audio_info codec_format;
uint8_t buffer_read[4096];
struct timespec now;
uint64_t now;
uint64_t sample_count;
uint32_t errqueue_count;
@ -167,6 +167,8 @@ struct impl {
struct delay_info delay;
int64_t delay_sink;
struct spa_source *update_delay_event;
struct spa_bt_recvmsg_data recv;
};
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0)
@ -420,13 +422,14 @@ static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer
}
}
static int32_t read_data(struct impl *this) {
static int32_t read_data(struct impl *this, uint64_t *rx_time)
{
const ssize_t b_size = sizeof(this->buffer_read);
int32_t size_read = 0;
again:
/* read data from socket */
size_read = recv(this->fd, this->buffer_read, b_size, MSG_DONTWAIT);
size_read = spa_bt_recvmsg(&this->recv, this->buffer_read, b_size, rx_time);
if (size_read == 0)
return 0;
@ -499,11 +502,11 @@ static void media_on_ready_read(struct spa_source *source)
{
struct impl *this = source->data;
struct port *port = &this->port;
struct timespec now;
void *buf;
int32_t size_read, decoded;
uint32_t avail;
uint64_t dt;
uint64_t now;
/* make sure the source is an input */
if ((source->rmask & SPA_IO_IN) == 0) {
@ -524,11 +527,8 @@ static void media_on_ready_read(struct spa_source *source)
spa_log_trace(this->log, "socket poll");
/* update the current pts */
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
/* read */
size_read = read_data (this);
size_read = read_data (this, &now);
if (size_read == 0)
return;
if (size_read < 0) {
@ -559,11 +559,10 @@ static void media_on_ready_read(struct spa_source *source)
if (!this->started)
return;
spa_bt_decode_buffer_write_packet(&port->buffer, decoded, SPA_TIMESPEC_TO_NSEC(&now));
spa_bt_decode_buffer_write_packet(&port->buffer, decoded, now);
dt = SPA_TIMESPEC_TO_NSEC(&this->now);
dt = now - this->now;
this->now = now;
dt = SPA_TIMESPEC_TO_NSEC(&this->now) - dt;
spa_log_trace(this->log, "decoded socket data size:%d frames:%d dt:%d dms",
(int)decoded, (int)decoded/port->frame_size,
@ -766,6 +765,8 @@ static int transport_start(struct impl *this)
if (setsockopt(this->fd, SOL_SOCKET, SO_PRIORITY, &val, sizeof(val)) < 0)
spa_log_warn(this->log, "SO_PRIORITY failed: %m");
spa_bt_recvmsg_init(&this->recv, this->fd, this->data_system, this->log);
reset_buffers(port);
spa_bt_decode_buffer_clear(&port->buffer);
@ -1549,7 +1550,7 @@ static void process_buffering(struct impl *this)
if (buffer->h) {
buffer->h->seq = this->sample_count;
buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
buffer->h->pts = this->now;
buffer->h->dts_offset = 0;
}

View file

@ -34,6 +34,8 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.sco-io");
#undef SPA_LOG_TOPIC_DEFAULT
#define SPA_LOG_TOPIC_DEFAULT &log_topic
#include "decode-buffer.h"
/* We'll use the read rx data size to find the correct packet size for writing,
* since kernel might not report it as the socket MTU, see
@ -60,9 +62,12 @@ struct spa_bt_sco_io {
struct spa_log *log;
struct spa_loop *data_loop;
struct spa_system *data_system;
struct spa_source source;
int (*source_cb)(void *userdata, uint8_t *data, int size);
struct spa_bt_recvmsg_data recv;
int (*source_cb)(void *userdata, uint8_t *data, int size, uint64_t rx_time);
void *source_userdata;
int (*sink_cb)(void *userdata);
@ -92,9 +97,10 @@ static void sco_io_on_ready(struct spa_source *source)
if (SPA_FLAG_IS_SET(source->rmask, SPA_IO_IN)) {
int res;
uint64_t rx_time;
read_again:
res = recv(io->fd, io->read_buffer, SPA_MIN(io->read_mtu, MAX_MTU), MSG_DONTWAIT);
res = spa_bt_recvmsg(&io->recv, io->read_buffer, SPA_MIN(io->read_mtu, MAX_MTU), &rx_time);
if (res <= 0) {
if (errno == EINTR) {
/* retry if interrupted */
@ -115,7 +121,7 @@ static void sco_io_on_ready(struct spa_source *source)
if (io->source_cb) {
int res;
res = io->source_cb(io->source_userdata, io->read_buffer, io->read_size);
res = io->source_cb(io->source_userdata, io->read_buffer, io->read_size, rx_time);
if (res) {
io->source_cb = NULL;
}
@ -193,7 +199,8 @@ int spa_bt_sco_io_write(struct spa_bt_sco_io *io, uint8_t *buf, int size)
}
struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, struct spa_loop *data_loop, struct spa_log *log)
struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, struct spa_loop *data_loop,
struct spa_system *data_system, struct spa_log *log)
{
struct spa_bt_sco_io *io;
@ -207,6 +214,7 @@ struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, s
io->read_mtu = transport->read_mtu;
io->write_mtu = transport->write_mtu;
io->data_loop = data_loop;
io->data_system = data_system;
io->log = log;
if (transport->device->adapter->bus_type == BUS_TYPE_USB) {
@ -230,6 +238,8 @@ struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, s
spa_log_debug(io->log, "%p: initial packet size:%d", io, io->read_size);
spa_bt_recvmsg_init(&io->recv, io->fd, io->data_system, io->log);
/* Add the ready callback */
io->source.data = io;
io->source.fd = io->fd;
@ -271,7 +281,7 @@ void spa_bt_sco_io_destroy(struct spa_bt_sco_io *io)
* This function should only be called from the data thread.
* Callback is called (in data loop) with data just read from the socket.
*/
void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void *, uint8_t *, int), void *userdata)
void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void *, uint8_t *, int, uint64_t), void *userdata)
{
io->source_cb = source_cb;
io->source_userdata = userdata;

View file

@ -823,7 +823,7 @@ static int transport_start(struct impl *this)
(int64_t)(spa_bt_transport_get_delay_nsec(this->transport) / SPA_NSEC_PER_MSEC));
/* start socket i/o */
if ((res = spa_bt_transport_ensure_sco_io(this->transport, this->data_loop)) < 0)
if ((res = spa_bt_transport_ensure_sco_io(this->transport, this->data_loop, this->data_system)) < 0)
goto fail;
this->flush_timer_source.data = this;

View file

@ -153,7 +153,7 @@ struct impl {
void *lc3;
#endif
struct timespec now;
uint64_t now;
};
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0)
@ -538,7 +538,7 @@ static uint32_t preprocess_and_decode_codec_data(void *userdata, uint8_t *read_d
return decoded;
}
static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read)
static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read, uint64_t rx_time)
{
struct impl *this = userdata;
struct port *port = &this->port;
@ -555,9 +555,8 @@ static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read)
}
/* update the current pts */
dt = SPA_TIMESPEC_TO_NSEC(&this->now);
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
dt = SPA_TIMESPEC_TO_NSEC(&this->now) - dt;
dt = rx_time - this->now;
this->now = rx_time;
/* handle data read from socket */
#if 0
@ -566,7 +565,7 @@ static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read)
if (this->transport->codec == HFP_AUDIO_CODEC_MSBC ||
this->transport->codec == HFP_AUDIO_CODEC_LC3_SWB) {
decoded = preprocess_and_decode_codec_data(userdata, read_data, size_read, SPA_TIMESPEC_TO_NSEC(&this->now));
decoded = preprocess_and_decode_codec_data(userdata, read_data, size_read, this->now);
} else {
uint32_t avail;
uint8_t *packet;
@ -591,7 +590,7 @@ static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read)
packet = spa_bt_decode_buffer_get_write(&port->buffer, &avail);
avail = SPA_MIN(avail, (uint32_t)size_read);
spa_memmove(packet, read_data, avail);
spa_bt_decode_buffer_write_packet(&port->buffer, avail, SPA_TIMESPEC_TO_NSEC(&this->now));
spa_bt_decode_buffer_write_packet(&port->buffer, avail, this->now);
decoded = avail;
}
@ -763,7 +762,7 @@ static int transport_start(struct impl *this)
this->io_error = false;
/* Start socket i/o */
if ((res = spa_bt_transport_ensure_sco_io(this->transport, this->data_loop)) < 0)
if ((res = spa_bt_transport_ensure_sco_io(this->transport, this->data_loop, this->data_system)) < 0)
goto fail;
spa_loop_invoke(this->data_loop, do_add_source, 0, NULL, 0, true, this);