bluez5: replace sco-sink with media-sink

Change media-sink to use sco-io for HFP codecs.

Move SCO fragmentation to sco-io side.

Replace sco-sink with media-sink.

sco-sink is mostly copypaste from media-sink, and only differed in the
fragmentation detail, which can as well be handled on sco-io side.
This commit is contained in:
Pauli Virtanen 2025-06-08 13:19:05 +03:00
parent f9b0bf3f95
commit 5b4e9dc33e
5 changed files with 141 additions and 1731 deletions

View file

@ -609,8 +609,7 @@ struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, s
struct spa_system *data_system, struct spa_log *log);
void spa_bt_sco_io_destroy(struct spa_bt_sco_io *io);
void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void *userdata, uint8_t *data, int size, uint64_t rx_time), void *userdata);
void spa_bt_sco_io_set_sink_cb(struct spa_bt_sco_io *io, int (*sink_cb)(void *userdata), void *userdata);
int spa_bt_sco_io_write(struct spa_bt_sco_io *io, uint8_t *data, int size);
int spa_bt_sco_io_write(struct spa_bt_sco_io *io, const uint8_t *buf, size_t size);
#define SPA_BT_VOLUME_ID_RX 0
#define SPA_BT_VOLUME_ID_TX 1

View file

@ -685,6 +685,8 @@ static int get_transport_unsent_size(struct impl *this)
if (this->tx_latency.enabled) {
res = 0;
value = this->tx_latency.unsent;
} else if (this->codec->kind == MEDIA_CODEC_HFP) {
value = 0;
} else {
res = ioctl(this->flush_source.fd, TIOCOUTQ, &value);
if (res < 0) {
@ -705,14 +707,20 @@ static int send_buffer(struct impl *this)
int written, unsent;
struct timespec ts_pre;
unsent = get_transport_unsent_size(this);
if (unsent >= 0)
this->codec->abr_process(this->codec_data, unsent);
if (this->codec->abr_process) {
unsent = get_transport_unsent_size(this);
if (unsent >= 0)
this->codec->abr_process(this->codec_data, unsent);
}
spa_system_clock_gettime(this->data_system, CLOCK_REALTIME, &ts_pre);
written = spa_bt_send(this->flush_source.fd, this->buffer, this->buffer_used,
&this->tx_latency, SPA_TIMESPEC_TO_NSEC(&ts_pre));
if (this->codec->kind == MEDIA_CODEC_HFP) {
written = spa_bt_sco_io_write(this->transport->sco_io, this->buffer, this->buffer_used);
} else {
written = spa_bt_send(this->flush_source.fd, this->buffer, this->buffer_used,
&this->tx_latency, SPA_TIMESPEC_TO_NSEC(&ts_pre));
}
if (SPA_UNLIKELY(spa_log_level_topic_enabled(this->log, SPA_LOG_TOPIC_DEFAULT, SPA_LOG_LEVEL_TRACE))) {
struct timespec ts;
@ -870,6 +878,7 @@ static int flush_data(struct impl *this, uint64_t now_time)
{
struct port *port = &this->port;
bool is_asha = this->codec->kind == MEDIA_CODEC_ASHA;
bool is_sco = this->codec->kind == MEDIA_CODEC_HFP;
uint32_t total_frames;
int written;
int unsent_buffer;
@ -877,10 +886,12 @@ static int flush_data(struct impl *this, uint64_t now_time)
spa_assert(this->transport_started);
/* I/O in error state? */
if (this->transport == NULL || (!this->flush_source.loop && !is_asha))
if (this->transport == NULL || (!this->flush_source.loop && !is_asha && !is_sco))
return -EIO;
if (!this->flush_timer_source.loop && !this->transport->iso_io && !is_asha)
return -EIO;
if (!this->transport->sco_io && is_sco)
return -EIO;
if (this->transport->iso_io && !this->iso_pending)
return 0;
@ -1021,7 +1032,10 @@ again:
if (written == -EAGAIN) {
spa_log_trace(this->log, "%p: fail flush", this);
if (now_time - this->last_error > SPA_NSEC_PER_SEC / 2) {
int res = this->codec->reduce_bitpool(this->codec_data);
int res = 0;
if (this->codec->reduce_bitpool)
res = this->codec->reduce_bitpool(this->codec_data);
spa_log_debug(this->log, "%p: reduce bitpool: %i", this, res);
this->last_error = now_time;
@ -1092,7 +1106,10 @@ again:
if (now_time - this->last_error > SPA_NSEC_PER_SEC) {
if (unsent_buffer == 0) {
int res = this->codec->increase_bitpool(this->codec_data);
int res = 0;
if (this->codec->increase_bitpool)
res = this->codec->increase_bitpool(this->codec_data);
spa_log_debug(this->log, "%p: increase bitpool: %i", this, res);
}
@ -1437,6 +1454,7 @@ static int transport_start(struct impl *this)
uint8_t *conf;
uint32_t flags;
bool is_asha;
bool is_sco;
if (this->transport_started)
return 0;
@ -1452,6 +1470,7 @@ static int transport_start(struct impl *this)
conf = this->transport->configuration;
size = this->transport->configuration_len;
is_asha = this->codec->kind == MEDIA_CODEC_ASHA;
is_sco = this->codec->kind == MEDIA_CODEC_HFP;
spa_log_debug(this->log, "Transport configuration:");
spa_debug_log_mem(this->log, SPA_LOG_LEVEL_DEBUG, 2, conf, (size_t)size);
@ -1494,7 +1513,7 @@ static int transport_start(struct impl *this)
if (this->block_size > sizeof(this->tmp_buffer)) {
spa_log_error(this->log, "block-size %d > %zu",
this->block_size, sizeof(this->tmp_buffer));
return -EIO;
goto fail;
}
spa_log_debug(this->log, "%p: block_size %d", this, this->block_size);
@ -1525,6 +1544,14 @@ static int transport_start(struct impl *this)
this->update_delay_event = spa_loop_utils_add_event(this->loop_utils, update_delay_event, this);
spa_zero(this->tx_latency);
if (is_sco) {
int res;
if ((res = spa_bt_transport_ensure_sco_io(this->transport, this->data_loop, this->data_system)) < 0)
goto fail;
}
if (!this->transport->iso_io && !is_asha) {
this->flush_timer_source.data = this;
this->flush_timer_source.fd = this->flush_timerfd;
@ -1533,10 +1560,11 @@ static int transport_start(struct impl *this)
this->flush_timer_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_timer_source);
spa_bt_latency_init(&this->tx_latency, this->transport, LATENCY_PERIOD, this->log);
if (!is_sco)
spa_bt_latency_init(&this->tx_latency, this->transport, LATENCY_PERIOD, this->log);
}
if (!is_asha) {
if (!is_asha && !is_sco) {
this->flush_source.data = this;
this->flush_source.fd = this->transport->fd;
this->flush_source.func = media_on_flush_error;
@ -1578,6 +1606,15 @@ static int transport_start(struct impl *this)
return 0;
fail:
if (this->codec_data) {
if (this->own_codec_data)
this->codec->deinit(this->codec_data);
this->own_codec_data = false;
this->codec_data = NULL;
}
return -EIO;
}
static int do_start(struct impl *this)
@ -1595,7 +1632,8 @@ static int do_start(struct impl *this)
this->start_ready = true;
if ((res = spa_bt_transport_acquire(this->transport, false)) < 0) {
bool do_accept = this->transport->profile & SPA_BT_PROFILE_HEADSET_AUDIO_GATEWAY;
if ((res = spa_bt_transport_acquire(this->transport, do_accept)) < 0) {
this->start_ready = false;
return res;
}
@ -1748,6 +1786,8 @@ static void emit_node_info(struct impl *this, bool full)
{
char node_group_buf[256];
char *node_group = NULL;
const char *media_role = NULL;
const char *codec_profile = media_codec_kind_str(this->codec);
if (this->transport && (this->transport->profile & SPA_BT_PROFILE_BAP_SINK)) {
spa_scnprintf(node_group_buf, sizeof(node_group_buf), "[\"bluez-iso-%s-cig-%d\"]",
@ -1765,7 +1805,10 @@ static void emit_node_info(struct impl *this, bool full)
node_group = node_group_buf;
}
const char *codec_profile = media_codec_kind_str(this->codec);
if (!this->is_output && this->transport &&
(this->transport->profile & SPA_BT_PROFILE_HEADSET_AUDIO_GATEWAY))
media_role = "Communication";
struct spa_dict_item node_info_items[] = {
{ SPA_KEY_DEVICE_API, "bluez5" },
{ SPA_KEY_MEDIA_CLASS, this->is_internal ? "Audio/Sink/Internal" :
@ -1774,6 +1817,7 @@ static void emit_node_info(struct impl *this, bool full)
this->transport->device->name : codec_profile ) },
{ SPA_KEY_NODE_DRIVER, this->is_output ? "true" : "false" },
{ "node.group", node_group },
{ SPA_KEY_MEDIA_ROLE, media_role },
};
uint64_t old = full ? this->info.change_mask : 0;
if (full)
@ -2035,7 +2079,8 @@ static int port_set_format(struct impl *this, struct port *port,
port->frame_size = info.info.raw.channels;
switch (info.info.raw.format) {
case SPA_AUDIO_FORMAT_S16:
case SPA_AUDIO_FORMAT_S16_LE:
case SPA_AUDIO_FORMAT_S16_BE:
port->frame_size *= 2;
break;
case SPA_AUDIO_FORMAT_S24:
@ -2525,6 +2570,8 @@ impl_init(const struct spa_handle_factory *factory,
if (this->codec->kind == MEDIA_CODEC_BAP)
this->is_output = this->transport->bap_initiator;
else if (this->transport->profile & SPA_BT_PROFILE_HEADSET_AUDIO_GATEWAY)
this->is_output = false;
else
this->is_output = true;
@ -2602,3 +2649,13 @@ const struct spa_handle_factory spa_a2dp_sink_factory = {
impl_init,
impl_enum_interface_info,
};
/* Retained for backward compatibility: */
const struct spa_handle_factory spa_sco_sink_factory = {
SPA_VERSION_HANDLE_FACTORY,
SPA_NAME_API_BLUEZ5_SCO_SINK,
&info,
impl_get_size,
impl_init,
impl_enum_interface_info,
};

View file

@ -16,7 +16,6 @@ bluez5_sources = [
'media-codecs.c',
'media-sink.c',
'media-source.c',
'sco-sink.c',
'sco-source.c',
'sco-io.c',
'iso-io.c',

View file

@ -56,7 +56,10 @@ struct spa_bt_sco_io {
bool started;
uint8_t read_buffer[MAX_MTU];
uint32_t read_size;
size_t read_size;
uint8_t write_buffer[MAX_MTU];
size_t write_size;
int fd;
uint16_t read_mtu;
@ -71,28 +74,9 @@ struct spa_bt_sco_io {
int (*source_cb)(void *userdata, uint8_t *data, int size, uint64_t rx_time);
void *source_userdata;
int (*sink_cb)(void *userdata);
void *sink_userdata;
};
static void update_source(struct spa_bt_sco_io *io)
{
int enabled;
int changed = 0;
enabled = io->sink_cb != NULL;
if (SPA_FLAG_IS_SET(io->source.mask, SPA_IO_OUT) != enabled) {
SPA_FLAG_UPDATE(io->source.mask, SPA_IO_OUT, enabled);
changed = 1;
}
if (changed) {
spa_loop_update_source(io->data_loop, &io->source);
}
}
static void sco_io_on_ready(struct spa_source *source)
{
struct spa_bt_sco_io *io = source->data;
@ -116,9 +100,13 @@ static void sco_io_on_ready(struct spa_source *source)
goto stop;
}
if (res != (int)io->read_size)
if (res != (int)io->read_size) {
spa_log_trace(io->log, "%p: packet size:%d", io, res);
/* drop buffer when packet size changes */
io->write_size = 0;
}
io->read_size = res;
if (io->source_cb) {
@ -131,23 +119,9 @@ static void sco_io_on_ready(struct spa_source *source)
}
read_done:
if (SPA_FLAG_IS_SET(source->rmask, SPA_IO_OUT)) {
if (io->sink_cb) {
int res;
res = io->sink_cb(io->sink_userdata);
if (res) {
io->sink_cb = NULL;
}
}
}
if (SPA_FLAG_IS_SET(source->rmask, SPA_IO_ERR) || SPA_FLAG_IS_SET(source->rmask, SPA_IO_HUP)) {
goto stop;
}
/* Poll socket in/out only if necessary */
update_source(io);
return;
stop:
@ -157,47 +131,80 @@ stop:
}
}
static int write_packets(struct spa_bt_sco_io *io, const uint8_t **buf, size_t *size, size_t packet_size)
{
while (*size >= packet_size) {
ssize_t written;
written = send(io->fd, *buf, packet_size, MSG_DONTWAIT | MSG_NOSIGNAL);
if (written < 0) {
if (errno == EINTR)
continue;
return -errno;
}
*buf += written;
*size -= written;
}
return 0;
}
/*
* Write data to socket in correctly sized blocks.
* Returns the number of bytes written, 0 when data cannot be written now or
* there is too little of it to write, and <0 on write error.
* Returns the number of bytes written or buffered, and <0 on write error.
*/
int spa_bt_sco_io_write(struct spa_bt_sco_io *io, uint8_t *buf, int size)
int spa_bt_sco_io_write(struct spa_bt_sco_io *io, const uint8_t *buf, size_t size)
{
uint16_t packet_size;
uint8_t *buf_start = buf;
const size_t orig_size = size;
const uint8_t *pos;
size_t packet_size;
int res;
if (io->read_size == 0) {
/* The proper write packet size is not known yet */
return 0;
}
packet_size = SPA_MIN(io->write_mtu, io->read_size);
packet_size = SPA_MIN(SPA_MIN(io->write_mtu, io->read_size), sizeof(io->write_buffer));
if (size < packet_size) {
return 0;
if (io->write_size >= packet_size) {
/* packet size changed, drop data */
io->write_size = 0;
} else if (io->write_size) {
/* write fragment */
size_t need = SPA_MIN(packet_size - io->write_size, size);
memcpy(io->write_buffer + io->write_size, buf, need);
buf += need;
size -= need;
io->write_size += need;
if (io->write_size < packet_size)
return orig_size;
pos = io->write_buffer;
if ((res = write_packets(io, &pos, &io->write_size, packet_size)) < 0)
goto fail;
if (io->write_size)
goto fail;
}
do {
int written;
/* write */
if ((res = write_packets(io, &buf, &size, packet_size)) < 0)
goto fail;
written = send(io->fd, buf, packet_size, MSG_DONTWAIT | MSG_NOSIGNAL);
if (written < 0) {
if (errno == EINTR) {
/* retry if interrupted */
continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Don't continue writing */
break;
}
return -errno;
}
spa_assert(size < packet_size);
buf += written;
size -= written;
} while (size >= packet_size);
/* store fragment */
io->write_size = size;
if (size)
memcpy(io->write_buffer, buf, size);
return buf - buf_start;
return orig_size;
fail:
io->write_size = 0;
return res;
}
@ -236,7 +243,7 @@ struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, s
}
}
spa_log_debug(io->log, "%p: initial packet size:%d", io, io->read_size);
spa_log_debug(io->log, "%p: initial packet size:%d", io, (int)io->read_size);
spa_bt_recvmsg_init(&io->recv, io->fd, io->data_system, io->log);
@ -244,7 +251,7 @@ struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_bt_transport *transport, s
io->source.data = io;
io->source.fd = io->fd;
io->source.func = sco_io_on_ready;
io->source.mask = SPA_IO_IN | SPA_IO_OUT | SPA_IO_ERR | SPA_IO_HUP;
io->source.mask = SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP;
io->source.rmask = 0;
spa_loop_add_source(io->data_loop, &io->source);
@ -285,22 +292,4 @@ void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void
{
io->source_cb = source_cb;
io->source_userdata = userdata;
if (io->started) {
update_source(io);
}
}
/* Set sink callback.
* This function should only be called from the data thread.
* Callback is called (in data loop) when socket can be written to.
*/
void spa_bt_sco_io_set_sink_cb(struct spa_bt_sco_io *io, int (*sink_cb)(void *), void *userdata)
{
io->sink_cb = sink_cb;
io->sink_userdata = userdata;
if (io->started) {
update_source(io);
}
}

File diff suppressed because it is too large Load diff