bluez5: add spa_bt_iso_io that does the low-level part for ISO

Add factored out helper for ISO socket I/O.

ISO sockets need synchronization of writes and audio position for
different stream fds in the same isochronous group, and it's easier to
separate out the part that coordinates it.
This commit is contained in:
Pauli Virtanen 2023-03-25 16:58:27 +02:00 committed by Wim Taymans
parent 0ed124f0fb
commit cec050ac25
5 changed files with 494 additions and 1 deletions

View file

@ -35,6 +35,7 @@
#include "config.h"
#include "codec-loader.h"
#include "player.h"
#include "iso-io.h"
#include "defs.h"
static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5");
@ -78,7 +79,9 @@ struct spa_bt_monitor {
struct spa_log *log;
struct spa_loop *main_loop;
struct spa_loop *data_loop;
struct spa_system *main_system;
struct spa_system *data_system;
struct spa_plugin_loader *plugin_loader;
struct spa_dbus *dbus;
struct spa_dbus_connection *dbus_connection;
@ -2304,6 +2307,9 @@ void spa_bt_transport_free(struct spa_bt_transport *transport)
transport->sco_io = NULL;
}
if (transport->iso_io)
spa_bt_iso_io_destroy(transport->iso_io);
spa_bt_transport_destroy(transport);
if (transport->acquire_call) {
@ -2951,6 +2957,48 @@ static int transport_set_volume(void *data, int id, float volume)
return 0;
}
static int transport_create_iso_io(struct spa_bt_transport *transport)
{
struct spa_bt_monitor *monitor = transport->monitor;
struct spa_bt_transport *t;
bool sink = (transport->profile & SPA_BT_PROFILE_BAP_SINK) != 0;
if (!(transport->profile & (SPA_BT_PROFILE_BAP_SINK | SPA_BT_PROFILE_BAP_SOURCE)))
return 0;
if (transport->bap_cig == 0xff || transport->bap_cis == 0xff)
return -EINVAL;
if (transport->iso_io) {
spa_log_debug(monitor->log, "transport %p: remove ISO IO", transport);
spa_bt_iso_io_destroy(transport->iso_io);
transport->iso_io = NULL;
}
/* Transports in same connected iso group share the same i/o */
spa_list_for_each(t, &monitor->transport_list, link) {
if (!(t->profile & (SPA_BT_PROFILE_BAP_SINK | SPA_BT_PROFILE_BAP_SOURCE)))
continue;
if (t->bap_cig != transport->bap_cig)
continue;
if (t->iso_io) {
spa_log_debug(monitor->log, "transport %p: attach ISO IO to %p",
transport, t);
transport->iso_io = spa_bt_iso_io_attach(t->iso_io, transport->fd, sink);
return 0;
}
}
spa_log_debug(monitor->log, "transport %p: new ISO IO", transport);
transport->iso_io = spa_bt_iso_io_create(transport->fd, sink,
monitor->log, monitor->data_loop, monitor->data_system);
if (transport->iso_io == NULL)
return -errno;
return 0;
}
static void transport_acquire_reply(DBusPendingCall *pending, void *user_data)
{
struct spa_bt_transport *transport = user_data;
@ -2979,6 +3027,12 @@ static void transport_acquire_reply(DBusPendingCall *pending, void *user_data)
dbus_error_init(&err);
if (transport->fd >= 0) {
spa_log_error(monitor->log, "transport %p: invalid duplicate acquire", transport);
ret = -EINVAL;
goto finish;
}
if (!dbus_message_get_args(r, &err,
DBUS_TYPE_UNIX_FD, &transport->fd,
DBUS_TYPE_UINT16, &transport->read_mtu,
@ -3003,8 +3057,13 @@ finish:
dbus_message_unref(r);
if (ret < 0)
spa_bt_transport_set_state(transport, SPA_BT_TRANSPORT_STATE_ERROR);
else
else {
if (transport_create_iso_io(transport) < 0)
spa_log_error(monitor->log, "transport %p: transport_create_iso_io failed",
transport);
spa_bt_transport_set_state(transport, SPA_BT_TRANSPORT_STATE_ACTIVE);
}
/* For LE Audio, multiple transport from the same device may share the same
* stream (CIS) and group (CIG) but for different direction, e.g. a speaker and
@ -3023,6 +3082,10 @@ finish:
spa_log_debug(monitor->log, "transport %p: linked Acquired %s, fd %d MTU %d:%d", t_linked,
t_linked->path, t_linked->fd, t_linked->read_mtu, t_linked->write_mtu);
if (transport_create_iso_io(t_linked) < 0)
spa_log_error(monitor->log, "transport %p: transport_create_iso_io failed",
t_linked);
spa_bt_transport_set_state(t_linked, SPA_BT_TRANSPORT_STATE_ACTIVE);
}
}
@ -3146,11 +3209,19 @@ static int do_transport_release(struct spa_bt_transport *transport)
spa_bt_player_set_state(transport->device->adapter->dummy_player, SPA_BT_PLAYER_STOPPED);
spa_bt_transport_set_state(transport, SPA_BT_TRANSPORT_STATE_IDLE);
if (transport->acquire_call) {
dbus_pending_call_cancel(transport->acquire_call);
transport->acquire_call = NULL;
}
if (transport->iso_io) {
spa_log_debug(monitor->log, "transport %p: remove ISO IO", transport);
spa_bt_iso_io_destroy(transport->iso_io);
transport->iso_io = NULL;
}
/* For LE Audio, multiple transport stream (CIS) can be linked together (CIG).
* If they are part of the same device they re-use the same fd, and call to
* release should be done for the last one only.
@ -5344,7 +5415,9 @@ impl_init(const struct spa_handle_factory *factory,
this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
this->dbus = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DBus);
this->main_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Loop);
this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop);
this->main_system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System);
this->data_system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem);
this->plugin_loader = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_PluginLoader);
spa_log_topic_init(this->log, &log_topic);

View file

@ -522,6 +522,8 @@ void spa_bt_device_update_last_bluez_action_time(struct spa_bt_device *device);
#define spa_bt_device_add_listener(d,listener,events,data) \
spa_hook_list_append(&(d)->listener_list, listener, events, data)
struct spa_bt_iso_io;
struct spa_bt_sco_io;
struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_loop *data_loop, int fd, uint16_t read_mtu, uint16_t write_mtu);
@ -611,6 +613,7 @@ struct spa_bt_transport {
uint8_t bap_cig;
uint8_t bap_cis;
struct spa_bt_iso_io *iso_io;
struct spa_bt_sco_io *sco_io;
struct spa_source volume_timer;

377
spa/plugins/bluez5/iso-io.c Normal file
View file

@ -0,0 +1,377 @@
/* Spa ISO I/O */
/* SPDX-FileCopyrightText: Copyright © 2023 Pauli Virtanen. */
/* SPDX-License-Identifier: MIT */
#include <unistd.h>
#include <stddef.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/socket.h>
#include <spa/support/loop.h>
#include <spa/support/log.h>
#include <spa/utils/list.h>
#include <spa/utils/string.h>
#include <spa/utils/result.h>
#include <spa/node/io.h>
#include <bluetooth/bluetooth.h>
#include "config.h"
#include "iso-io.h"
static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.iso");
#undef SPA_LOG_TOPIC_DEFAULT
#define SPA_LOG_TOPIC_DEFAULT &log_topic
#define IDLE_TIME (100 * SPA_NSEC_PER_MSEC)
struct group {
struct spa_log *log;
struct spa_log_topic log_topic;
struct spa_loop *data_loop;
struct spa_system *data_system;
struct spa_source source;
struct spa_list streams;
int timerfd;
uint8_t cig;
uint64_t next;
uint64_t duration;
uint32_t paused;
};
struct stream {
struct spa_bt_iso_io this;
struct spa_list link;
struct group *group;
int fd;
bool sink;
bool idle;
spa_bt_iso_io_pull_t pull;
};
struct modify_info
{
struct stream *stream;
struct spa_list *streams;
};
static int do_modify(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct modify_info *info = user_data;
if (info->streams)
spa_list_append(info->streams, &info->stream->link);
else
spa_list_remove(&info->stream->link);
return 0;
}
static void stream_link(struct group *group, struct stream *stream)
{
struct modify_info info = { .stream = stream, .streams = &group->streams };
int res;
res = spa_loop_invoke(group->data_loop, do_modify, 0, NULL, 0, true, &info);
spa_assert_se(res == 0);
}
static void stream_unlink(struct stream *stream)
{
struct modify_info info = { .stream = stream, .streams = NULL };
int res;
res = spa_loop_invoke(stream->group->data_loop, do_modify, 0, NULL, 0, true, &info);
spa_assert_se(res == 0);
}
static int set_timeout(struct group *group, uint64_t time)
{
struct itimerspec ts;
ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
return spa_system_timerfd_settime(group->data_system,
group->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
}
static int set_timers(struct group *group)
{
struct timespec now;
spa_system_clock_gettime(group->data_system, CLOCK_MONOTONIC, &now);
group->next = SPA_ROUND_UP(SPA_TIMESPEC_TO_NSEC(&now) + group->duration,
group->duration);
return set_timeout(group, group->next);
}
static void group_on_timeout(struct spa_source *source)
{
struct group *group = source->data;
struct stream *stream;
uint64_t exp;
int res;
bool active = false;
if ((res = spa_system_timerfd_read(group->data_system, group->timerfd, &exp)) < 0) {
if (res != -EAGAIN)
spa_log_warn(group->log, "%p: ISO group:%u error reading timerfd: %s",
group, group->cig, spa_strerror(res));
return;
}
/*
* If an idle stream activates when another stream is already active,
* pause output of all streams for a while to avoid desynchronization.
*/
spa_list_for_each(stream, &group->streams, link) {
if (!stream->sink)
continue;
if (!stream->idle) {
active = true;
break;
}
}
spa_list_for_each(stream, &group->streams, link) {
if (!stream->sink)
continue;
if (stream->idle && stream->this.size > 0 && active && !group->paused)
group->paused = 1u + IDLE_TIME / group->duration;
stream->idle = (stream->this.size == 0);
}
if (group->paused) {
--group->paused;
spa_log_debug(group->log, "%p: ISO group:%d paused:%u", group, group->cig, group->paused);
}
/* Produce output */
spa_list_for_each(stream, &group->streams, link) {
int res;
if (!stream->sink)
continue;
if (stream->idle)
continue;
if (group->paused) {
stream->this.size = 0;
continue;
}
res = send(stream->fd, stream->this.buf, stream->this.size, MSG_DONTWAIT | MSG_NOSIGNAL);
if (res < 0)
res = -errno;
spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u res:%d",
group, group->cig, stream->fd, (unsigned)stream->this.size,
(unsigned)stream->this.timestamp, res);
stream->this.size = 0;
}
/* Pull data for the next interval */
group->next += exp * group->duration;
spa_list_for_each(stream, &group->streams, link) {
if (!stream->sink)
continue;
if (stream->pull) {
stream->this.now = group->next;
stream->pull(&stream->this);
} else {
stream->this.size = 0;
}
}
set_timeout(group, group->next);
}
static struct group *group_create(int fd, struct spa_log *log, struct spa_loop *data_loop,
struct spa_system *data_system)
{
#ifdef HAVE_BLUETOOTH_BAP
struct group *group;
struct bt_iso_qos qos;
socklen_t len;
len = sizeof(qos);
if (getsockopt(fd, SOL_BLUETOOTH, BT_ISO_QOS, &qos, &len) < 0)
return NULL;
if (qos.out.interval <= 5000) {
errno = EINVAL;
return NULL;
}
group = calloc(1, sizeof(struct group));
if (group == NULL)
return NULL;
spa_log_topic_init(log, &log_topic);
group->cig = qos.cig;
group->log = log;
group->data_loop = data_loop;
group->data_system = data_system;
group->duration = qos.out.interval * SPA_NSEC_PER_USEC;
spa_list_init(&group->streams);
group->timerfd = spa_system_timerfd_create(group->data_system,
CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
if (group->timerfd < 0) {
int err = errno;
free(group);
errno = err;
return NULL;
}
group->source.data = group;
group->source.fd = group->timerfd;
group->source.func = group_on_timeout;
group->source.mask = SPA_IO_IN;
group->source.rmask = 0;
spa_loop_add_source(group->data_loop, &group->source);
return group;
#else
errno = EOPNOTSUPP;
return NULL;
#endif
}
static int do_remove_source(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct group *group = user_data;
if (group->source.loop)
spa_loop_remove_source(group->data_loop, &group->source);
set_timeout(group, 0);
return 0;
}
static void group_destroy(struct group *group)
{
int res;
spa_assert(spa_list_is_empty(&group->streams));
res = spa_loop_invoke(group->data_loop, do_remove_source, 0, NULL, 0, true, group);
spa_assert_se(res == 0);
close(group->timerfd);
free(group);
}
struct stream *stream_create(int fd, bool sink, struct group *group)
{
struct stream *stream;
stream = calloc(1, sizeof(struct stream));
if (stream == NULL)
return NULL;
stream->fd = fd;
stream->sink = sink;
stream->group = group;
stream->this.duration = group->duration;
stream_link(group, stream);
return stream;
}
struct spa_bt_iso_io *spa_bt_iso_io_create(int fd, bool sink, struct spa_log *log,
struct spa_loop *data_loop, struct spa_system *data_system)
{
struct stream *stream;
struct group *group;
group = group_create(fd, log, data_loop, data_system);
if (group == NULL)
return NULL;
stream = stream_create(fd, sink, group);
if (stream == NULL) {
int err = errno;
group_destroy(group);
errno = err;
return NULL;
}
return &stream->this;
}
struct spa_bt_iso_io *spa_bt_iso_io_attach(struct spa_bt_iso_io *this, int fd, bool sink)
{
struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this);
stream = stream_create(fd, sink, stream->group);
if (stream == NULL)
return NULL;
return &stream->this;
}
void spa_bt_iso_io_destroy(struct spa_bt_iso_io *this)
{
struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this);
stream_unlink(stream);
if (spa_list_is_empty(&stream->group->streams))
group_destroy(stream->group);
free(stream);
}
static bool group_is_enabled(struct group *group)
{
struct stream *stream;
spa_list_for_each(stream, &group->streams, link)
if (stream->pull)
return true;
return false;
}
/** Must be called from data thread */
void spa_bt_iso_io_set_cb(struct spa_bt_iso_io *this, spa_bt_iso_io_pull_t pull, void *user_data)
{
struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this);
bool was_enabled, enabled;
was_enabled = group_is_enabled(stream->group);
stream->pull = pull;
stream->this.user_data = user_data;
enabled = group_is_enabled(stream->group);
if (!enabled && was_enabled)
set_timeout(stream->group, 0);
else if (enabled && !was_enabled)
set_timers(stream->group);
if (pull == NULL) {
stream->this.size = 0;
return;
}
/* Pull data now for the next interval */
stream->this.now = stream->group->next;
stream->pull(&stream->this);
}

View file

@ -0,0 +1,39 @@
/* Spa Bluez5 ISO I/O */
/* SPDX-FileCopyrightText: Copyright © 2023 Pauli Virtanen */
/* SPDX-License-Identifier: MIT */
#ifndef SPA_BLUEZ5_ISO_IO_H
#define SPA_BLUEZ5_ISO_IO_H
#include <spa/utils/defs.h>
#include <spa/support/loop.h>
#include <spa/support/log.h>
#include <spa/node/io.h>
/**
* ISO I/O.
*
* Synchronizes related writes from different streams in the same group
* to occur at same real time instant (or not at all).
*/
struct spa_bt_iso_io
{
uint64_t now;
uint64_t duration;
uint32_t timestamp;
uint8_t buf[4096];
size_t size;
void *user_data;
};
typedef void (*spa_bt_iso_io_pull_t)(struct spa_bt_iso_io *io);
struct spa_bt_iso_io *spa_bt_iso_io_create(int fd, bool sink, struct spa_log *log,
struct spa_loop *data_loop, struct spa_system *data_system);
struct spa_bt_iso_io *spa_bt_iso_io_attach(struct spa_bt_iso_io *io, int fd, bool sink);
void spa_bt_iso_io_destroy(struct spa_bt_iso_io *io);
void spa_bt_iso_io_set_cb(struct spa_bt_iso_io *io, spa_bt_iso_io_pull_t pull, void *user_data);
#endif

View file

@ -19,6 +19,7 @@ bluez5_sources = [
'sco-sink.c',
'sco-source.c',
'sco-io.c',
'iso-io.c',
'quirks.c',
'player.c',
'bluez5-device.c',