bluez5: sco-source: implement sco-source the same way as a2dp-source

This commit is contained in:
Pauli Virtanen 2022-06-18 20:03:41 +03:00 committed by Wim Taymans
parent 51356ea3d0
commit e1cb7c6fb2

View file

@ -58,11 +58,11 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.source.sco"
#undef SPA_LOG_TOPIC_DEFAULT
#define SPA_LOG_TOPIC_DEFAULT &log_topic
#include "decode-buffer.h"
#define DEFAULT_CLOCK_NAME "clock.system.monotonic"
struct props {
uint32_t min_latency;
uint32_t max_latency;
char clock_name[64];
};
@ -101,8 +101,7 @@ struct port {
struct spa_list free;
struct spa_list ready;
struct buffer *current_buffer;
uint32_t ready_offset;
struct spa_bt_decode_buffer buffer;
};
struct impl {
@ -116,6 +115,8 @@ struct impl {
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
uint32_t quantum_limit;
uint64_t info_all;
struct spa_node_info info;
#define IDX_PropInfo 0
@ -132,10 +133,18 @@ struct impl {
unsigned int started:1;
unsigned int following:1;
unsigned int matching:1;
unsigned int resampling:1;
struct spa_source timer_source;
int timerfd;
struct spa_io_clock *clock;
struct spa_io_position *position;
uint64_t current_time;
uint64_t next_time;
/* mSBC */
sbc_t msbc;
bool msbc_seq_initialized;
@ -150,13 +159,8 @@ struct impl {
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0)
static const uint32_t default_min_latency = 128;
static const uint32_t default_max_latency = 512;
static void reset_props(struct props *props)
{
props->min_latency = default_min_latency;
props->max_latency = default_max_latency;
strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name));
}
@ -184,23 +188,7 @@ static int impl_node_enum_params(void *object, int seq,
switch (id) {
case SPA_PARAM_PropInfo:
{
struct props *p = &this->props;
switch (result.index) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency),
SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency),
SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX));
break;
default:
return 0;
}
@ -208,15 +196,7 @@ static int impl_node_enum_params(void *object, int seq,
}
case SPA_PARAM_Props:
{
struct props *p = &this->props;
switch (result.index) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Props, id,
SPA_PROP_minLatency, SPA_POD_Int(p->min_latency),
SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency));
break;
default:
return 0;
}
@ -237,6 +217,41 @@ static int impl_node_enum_params(void *object, int seq,
return 0;
}
static int set_timeout(struct impl *this, uint64_t time)
{
struct itimerspec ts;
ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
return spa_system_timerfd_settime(this->data_system,
this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
}
static int set_timers(struct impl *this)
{
struct timespec now;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
this->next_time = SPA_TIMESPEC_TO_NSEC(&now);
return set_timeout(this, this->following ? 0 : this->next_time);
}
static int do_reassign_follower(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct impl *this = user_data;
struct port *port = &this->port;
spa_bt_decode_buffer_recover(&port->buffer);
return 0;
}
static inline bool is_following(struct impl *this)
{
return this->position && this->clock && this->position->clock.id != this->clock->id;
@ -269,6 +284,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
if (this->started && following != this->following) {
spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following);
this->following = following;
spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this);
}
return 0;
@ -284,10 +300,7 @@ static int apply_props(struct impl *this, const struct spa_pod *param)
if (param == NULL) {
reset_props(&new_props);
} else {
spa_pod_parse_object(param,
SPA_TYPE_OBJECT_Props, NULL,
SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency),
SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency));
/* noop */
}
changed = (memcmp(&new_props, &this->props, sizeof(struct props)) != 0);
@ -326,8 +339,6 @@ static void reset_buffers(struct port *port)
spa_list_init(&port->free);
spa_list_init(&port->ready);
port->current_buffer = NULL;
for (i = 0; i < port->n_buffers; i++) {
struct buffer *b = &port->buffers[i];
spa_list_append(&port->free, &b->link);
@ -422,116 +433,107 @@ static bool is_zero_packet(uint8_t *data, int size)
return true;
}
static void preprocess_and_decode_msbc_data(void *userdata, uint8_t *read_data, int size_read)
static uint32_t preprocess_and_decode_msbc_data(void *userdata, uint8_t *read_data, int size_read)
{
struct impl *this = userdata;
struct port *port = &this->port;
struct spa_data *datas = port->current_buffer->buf->datas;
uint32_t decoded = 0;
int i;
spa_log_trace(this->log, "handling mSBC data");
/* check if the packet contains only zeros - if so ignore the packet.
This is necessary, because some kernels insert bogus "all-zero" packets
into the datastream.
See https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/549 */
if (is_zero_packet(read_data, size_read)) {
return;
}
/*
* Check if the packet contains only zeros - if so ignore the packet.
* This is necessary, because some kernels insert bogus "all-zero" packets
* into the datastream.
* See https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/549
*/
if (is_zero_packet(read_data, size_read))
return 0;
int i;
for (i = 0; i < size_read; ++i) {
void *buf;
uint32_t avail;
int seq, processed;
size_t written;
msbc_buffer_append_byte(this, read_data[i]);
/* Handle found mSBC packets.
*
* XXX: if there's no space for the decoded audio in
* XXX: the current buffer, we'll drop data.
if (this->msbc_buffer_pos != MSBC_ENCODED_SIZE)
continue;
/*
* Handle found mSBC packet
*/
if (this->msbc_buffer_pos == MSBC_ENCODED_SIZE) {
spa_log_trace(this->log, "Received full mSBC packet, start processing it");
if (port->ready_offset + MSBC_DECODED_SIZE <= datas[0].maxsize) {
int seq, processed;
size_t written;
spa_log_trace(this->log,
"Output buffer has space, processing mSBC packet");
buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail);
/* Check sequence number */
seq = ((this->msbc_buffer[1] >> 4) & 1) |
((this->msbc_buffer[1] >> 6) & 2);
/* Check sequence number */
seq = ((this->msbc_buffer[1] >> 4) & 1) |
((this->msbc_buffer[1] >> 6) & 2);
spa_log_trace(this->log, "mSBC packet seq=%u", seq);
if (!this->msbc_seq_initialized) {
this->msbc_seq_initialized = true;
this->msbc_seq = seq;
} else if (seq != this->msbc_seq) {
spa_log_info(this->log,
"missing mSBC packet: %u != %u", seq, this->msbc_seq);
this->msbc_seq = seq;
/* TODO: Implement PLC. */
}
this->msbc_seq = (this->msbc_seq + 1) % 4;
/* decode frame */
processed = sbc_decode(
&this->msbc, this->msbc_buffer + 2, MSBC_ENCODED_SIZE - 3,
(uint8_t *)datas[0].data + port->ready_offset, MSBC_DECODED_SIZE,
&written);
if (processed < 0) {
spa_log_warn(this->log, "sbc_decode failed: %d", processed);
/* TODO: manage errors */
continue;
}
port->ready_offset += written;
} else {
spa_log_warn(this->log, "Output buffer full, dropping mSBC packet");
}
spa_log_trace(this->log, "mSBC packet seq=%u", seq);
if (!this->msbc_seq_initialized) {
this->msbc_seq_initialized = true;
this->msbc_seq = seq;
} else if (seq != this->msbc_seq) {
/* TODO: PLC (too late to insert data now) */
spa_log_info(this->log,
"missing mSBC packet: %u != %u", seq, this->msbc_seq);
this->msbc_seq = seq;
}
this->msbc_seq = (this->msbc_seq + 1) % 4;
if (avail < MSBC_DECODED_SIZE)
spa_log_warn(this->log, "Output buffer full, dropping msbc data");
/* decode frame */
processed = sbc_decode(
&this->msbc, this->msbc_buffer + 2, MSBC_ENCODED_SIZE - 3,
buf, avail, &written);
if (processed < 0) {
spa_log_warn(this->log, "sbc_decode failed: %d", processed);
/* TODO: manage errors */
continue;
}
spa_bt_decode_buffer_write_packet(&port->buffer, written);
decoded += written;
}
return decoded;
}
static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read)
{
struct impl *this = userdata;
struct port *port = &this->port;
struct spa_io_buffers *io = port->io;
struct spa_data *datas;
uint32_t min_data;
uint32_t decoded;
uint64_t dt;
if (this->transport == NULL) {
spa_log_debug(this->log, "no transport, stop reading");
goto stop;
}
/* get buffer */
if (!port->current_buffer) {
if (spa_list_is_empty(&port->free)) {
spa_log_warn(this->log, "buffer not available");
return 0;
}
port->current_buffer = spa_list_first(&port->free, struct buffer, link);
spa_list_remove(&port->current_buffer->link);
port->ready_offset = 0;
}
datas = port->current_buffer->buf->datas;
/* update the current pts */
dt = SPA_TIMESPEC_TO_NSEC(&this->now);
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
dt = SPA_TIMESPEC_TO_NSEC(&this->now) - dt;
/* handle data read from socket */
spa_log_trace(this->log, "read socket data %d", size_read);
#if 0
hexdump_to_log(this, read_data, size_read);
#endif
if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) {
preprocess_and_decode_msbc_data(userdata, read_data, size_read);
decoded = preprocess_and_decode_msbc_data(userdata, read_data, size_read);
} else {
uint32_t avail;
uint8_t *packet;
if (size_read != 48 && is_zero_packet(read_data, size_read)) {
/* Adapter is returning non-standard CVSD stream. For example
* Intel 8087:0029 at Firmware revision 0.0 build 191 week 21 2021
@ -539,64 +541,94 @@ static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read)
*/
return 0;
}
packet = (uint8_t *)datas[0].data + port->ready_offset;
spa_memmove(packet, read_data, size_read);
port->ready_offset += size_read;
packet = spa_bt_decode_buffer_get_write(&port->buffer, &avail);
avail = SPA_MIN(avail, (uint32_t)size_read);
spa_memmove(packet, read_data, avail);
spa_bt_decode_buffer_write_packet(&port->buffer, avail);
decoded = avail;
}
/* send buffer if full */
min_data = SPA_MIN(this->props.min_latency * port->frame_size, datas[0].maxsize / 2);
if (port->ready_offset >= min_data) {
uint64_t sample_count;
spa_log_trace(this->log, "read socket data size:%d decoded frames:%d dt:%d dms",
size_read, decoded / port->frame_size,
(int)(dt / 100000));
datas[0].chunk->offset = 0;
datas[0].chunk->size = port->ready_offset;
datas[0].chunk->stride = port->frame_size;
sample_count = datas[0].chunk->size / port->frame_size;
spa_list_append(&port->ready, &port->current_buffer->link);
port->current_buffer = NULL;
if (!this->following && this->clock) {
this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now);
this->clock->duration = sample_count * this->clock->rate.denom / port->current_format.info.raw.rate;
this->clock->position += this->clock->duration;
this->clock->delay = 0;
this->clock->rate_diff = 1.0f;
this->clock->next_nsec = this->clock->nsec;
}
}
/* done if there are no buffers ready */
if (spa_list_is_empty(&port->ready))
return 0;
if (this->following)
return 0;
/* process the buffer if IO does not have any */
if (io->status != SPA_STATUS_HAVE_DATA) {
struct buffer *b;
if (io->buffer_id < port->n_buffers)
recycle_buffer(this, port, io->buffer_id);
b = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&b->link);
b->outstanding = true;
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_DATA;
}
/* notify ready */
spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA);
return 0;
stop:
return 1;
}
static int setup_matching(struct impl *this)
{
struct port *port = &this->port;
if (this->position && port->rate_match) {
port->rate_match->rate = 1 / port->buffer.corr;
this->matching = this->following;
this->resampling = this->matching ||
(port->current_format.info.raw.rate != this->position->clock.rate.denom);
} else {
this->matching = false;
this->resampling = false;
}
if (port->rate_match)
SPA_FLAG_UPDATE(port->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE, this->matching);
return 0;
}
static void sco_on_timeout(struct spa_source *source)
{
struct impl *this = source->data;
struct port *port = &this->port;
uint64_t exp, duration;
uint32_t rate;
struct spa_io_buffers *io = port->io;
uint64_t prev_time, now_time;
if (this->transport == NULL)
return;
if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
prev_time = this->current_time;
now_time = this->current_time = this->next_time;
spa_log_trace(this->log, "%p: timer %"PRIu64" %"PRIu64"", this,
now_time, now_time - prev_time);
if (SPA_LIKELY(this->position)) {
duration = this->position->clock.duration;
rate = this->position->clock.rate.denom;
} else {
duration = 1024;
rate = 48000;
}
setup_matching(this);
this->next_time = now_time + duration * SPA_NSEC_PER_SEC / port->buffer.corr / rate;
if (SPA_LIKELY(this->clock)) {
this->clock->nsec = now_time;
this->clock->position += duration;
this->clock->duration = duration;
this->clock->rate_diff = port->buffer.corr;
this->clock->next_nsec = this->next_time;
}
spa_log_trace(this->log, "%p: %d", this, io->status);
io->status = SPA_STATUS_HAVE_DATA;
spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA);
set_timeout(this, this->next_time);
}
static int do_add_source(struct spa_loop *loop,
bool async,
uint32_t seq,
@ -613,6 +645,7 @@ static int do_add_source(struct spa_loop *loop,
static int do_start(struct impl *this)
{
struct port *port = &this->port;
bool do_accept;
int res;
@ -636,7 +669,13 @@ static int do_start(struct impl *this)
return res;
/* Reset the buffers and sample count */
reset_buffers(&this->port);
reset_buffers(port);
spa_bt_decode_buffer_clear(&port->buffer);
if ((res = spa_bt_decode_buffer_init(&port->buffer, this->log,
port->frame_size, port->current_format.info.raw.rate,
this->quantum_limit, this->quantum_limit)) < 0)
return res;
/* Init mSBC if needed */
if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) {
@ -653,6 +692,17 @@ static int do_start(struct impl *this)
goto fail;
spa_loop_invoke(this->data_loop, do_add_source, 0, NULL, 0, true, this);
/* Start timer */
this->timer_source.data = this;
this->timer_source.fd = this->timerfd;
this->timer_source.func = sco_on_timeout;
this->timer_source.mask = SPA_IO_IN;
this->timer_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->timer_source);
setup_matching(this);
set_timers(this);
/* Set the started flag */
this->started = true;
@ -671,15 +721,25 @@ static int do_remove_source(struct spa_loop *loop,
void *user_data)
{
struct impl *this = user_data;
struct itimerspec ts;
if (this->transport && this->transport->sco_io)
spa_bt_sco_io_set_source_cb(this->transport->sco_io, NULL, NULL);
if (this->timer_source.loop)
spa_loop_remove_source(this->data_loop, &this->timer_source);
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL);
return 0;
}
static int do_stop(struct impl *this)
{
struct port *port = &this->port;
int res = 0;
if (!this->started)
@ -696,6 +756,8 @@ static int do_stop(struct impl *this)
res = spa_bt_transport_release(this->transport);
}
spa_bt_decode_buffer_clear(&port->buffer);
return res;
}
@ -738,12 +800,9 @@ static void emit_node_info(struct impl *this, bool full)
{ SPA_KEY_MEDIA_CLASS, "Audio/Source" },
{ SPA_KEY_NODE_DRIVER, "true" },
};
char latency[64] = "128/8000";
const struct spa_dict_item ag_node_info_items[] = {
{ SPA_KEY_DEVICE_API, "bluez5" },
{ SPA_KEY_MEDIA_CLASS, "Stream/Output/Audio" },
{ SPA_KEY_NODE_LATENCY, latency },
{ "media.name", ((this->transport && this->transport->device->name) ?
this->transport->device->name : "HSP/HFP") },
};
@ -753,9 +812,6 @@ static void emit_node_info(struct impl *this, bool full)
if (full)
this->info.change_mask = this->info_all;
if (this->info.change_mask) {
if (this->transport && this->port.have_format)
snprintf(latency, sizeof(latency), "%d/%d", (int)this->props.min_latency,
(int)this->port.current_format.info.raw.rate);
this->info.props = is_ag ?
&SPA_DICT_INIT_ARRAY(ag_node_info_items) :
&SPA_DICT_INIT_ARRAY(hu_node_info_items);
@ -902,11 +958,11 @@ impl_node_port_enum_params(void *object, int seq,
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS),
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
this->props.max_latency * port->frame_size,
this->props.min_latency * port->frame_size,
this->quantum_limit * port->frame_size,
16 * port->frame_size,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size));
break;
@ -976,7 +1032,6 @@ static int clear_buffers(struct impl *this, struct port *port)
spa_list_init(&port->ready);
port->n_buffers = 0;
}
port->current_buffer = NULL;
return 0;
}
@ -1147,6 +1202,79 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t
return 0;
}
static uint32_t get_samples(struct impl *this, uint32_t *duration)
{
struct port *port = &this->port;
uint32_t samples;
if (SPA_LIKELY(port->rate_match) && this->resampling) {
samples = port->rate_match->size;
} else {
if (SPA_LIKELY(this->position))
samples = this->position->clock.duration * port->current_format.info.raw.rate
/ this->position->clock.rate.denom;
else
samples = 1024;
}
if (SPA_LIKELY(this->position))
*duration = this->position->clock.duration * port->current_format.info.raw.rate
/ this->position->clock.rate.denom;
else if (SPA_LIKELY(this->clock))
*duration = this->clock->duration * port->current_format.info.raw.rate
/ this->clock->rate.denom;
else
*duration = 1024 * port->current_format.info.raw.rate / 48000;
return samples;
}
static void process_buffering(struct impl *this)
{
struct port *port = &this->port;
uint32_t duration;
const uint32_t samples = get_samples(this, &duration);
void *buf;
uint32_t avail;
spa_bt_decode_buffer_process(&port->buffer, samples, duration);
setup_matching(this);
buf = spa_bt_decode_buffer_get_read(&port->buffer, &avail);
/* copy data to buffers */
if (!spa_list_is_empty(&port->free) && avail > 0) {
struct buffer *buffer;
struct spa_data *datas;
uint32_t data_size;
data_size = samples * port->frame_size;
avail = SPA_MIN(avail, data_size);
spa_bt_decode_buffer_read(&port->buffer, avail);
buffer = spa_list_first(&port->free, struct buffer, link);
spa_list_remove(&buffer->link);
spa_log_trace(this->log, "dequeue %d", buffer->id);
datas = buffer->buf->datas;
spa_assert(datas[0].maxsize >= data_size);
datas[0].chunk->offset = 0;
datas[0].chunk->size = avail;
datas[0].chunk->stride = port->frame_size;
memcpy(datas[0].data, buf, avail);
/* ready buffer if full */
spa_log_trace(this->log, "queue %d frames:%d", buffer->id, (int)avail / port->frame_size);
spa_list_append(&port->ready, &buffer->link);
}
}
static int impl_node_process(void *object)
{
struct impl *this = object;
@ -1170,6 +1298,9 @@ static int impl_node_process(void *object)
io->buffer_id = SPA_ID_INVALID;
}
/* Produce data */
process_buffering(this);
/* Return if there are no buffers ready to be processed */
if (spa_list_is_empty(&port->ready))
return SPA_STATUS_OK;
@ -1252,6 +1383,8 @@ static int impl_clear(struct spa_handle *handle)
struct impl *this = (struct impl *) handle;
if (this->transport)
spa_hook_remove(&this->transport_listener);
spa_system_close(this->data_system, this->timerfd);
spa_bt_decode_buffer_clear(&this->port.buffer);
return 0;
}
@ -1341,6 +1474,10 @@ impl_init(const struct spa_handle_factory *factory,
spa_list_init(&port->ready);
spa_list_init(&port->free);
this->quantum_limit = 8192;
if (info && (str = spa_dict_lookup(info, "clock.quantum-limit")))
spa_atou32(str, &this->quantum_limit, 0);
if (info && (str = spa_dict_lookup(info, SPA_KEY_API_BLUEZ5_TRANSPORT)))
sscanf(str, "pointer:%p", &this->transport);
@ -1351,6 +1488,9 @@ impl_init(const struct spa_handle_factory *factory,
spa_bt_transport_add_listener(this->transport,
&this->transport_listener, &transport_events, this);
this->timerfd = spa_system_timerfd_create(this->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
return 0;
}