diff --git a/spa/plugins/bluez5/backend-hsp-native.c b/spa/plugins/bluez5/backend-hsp-native.c index b4995dfc2..14415a21f 100644 --- a/spa/plugins/bluez5/backend-hsp-native.c +++ b/spa/plugins/bluez5/backend-hsp-native.c @@ -265,6 +265,11 @@ static int sco_release_cb(void *data) spa_log_info(backend->log, "Transport %s released", t->path); + if (t->sco_io) { + spa_bt_sco_io_destroy(t->sco_io); + t->sco_io = NULL; + } + /* Shutdown and close the socket */ shutdown(t->fd, SHUT_RDWR); close(t->fd); diff --git a/spa/plugins/bluez5/backend-hsphfpd.c b/spa/plugins/bluez5/backend-hsphfpd.c index 3601f0824..8609e8b66 100644 --- a/spa/plugins/bluez5/backend-hsphfpd.c +++ b/spa/plugins/bluez5/backend-hsphfpd.c @@ -965,6 +965,11 @@ static int hsphfpd_audio_release(void *data) spa_log_debug(backend->log, NAME": transport %p: Release %s", transport, transport->path); + if (transport->sco_io) { + spa_bt_sco_io_destroy(transport->sco_io); + transport->sco_io = NULL; + } + /* shutdown to make sure connection is dropped immediately */ shutdown(transport->fd, SHUT_RDWR); close(transport->fd); diff --git a/spa/plugins/bluez5/backend-ofono.c b/spa/plugins/bluez5/backend-ofono.c index e6b62b186..df57cf3fb 100644 --- a/spa/plugins/bluez5/backend-ofono.c +++ b/spa/plugins/bluez5/backend-ofono.c @@ -246,6 +246,11 @@ static int ofono_audio_release(void *data) spa_log_debug(backend->log, NAME": transport %p: Release %s", transport, transport->path); + if (transport->sco_io) { + spa_bt_sco_io_destroy(transport->sco_io); + transport->sco_io = NULL; + } + /* shutdown to make sure connection is dropped immediately */ shutdown(transport->fd, SHUT_RDWR); close(transport->fd); diff --git a/spa/plugins/bluez5/bluez5-dbus.c b/spa/plugins/bluez5/bluez5-dbus.c index 5dd6f3d00..fe0f6137e 100644 --- a/spa/plugins/bluez5/bluez5-dbus.c +++ b/spa/plugins/bluez5/bluez5-dbus.c @@ -701,6 +701,7 @@ struct spa_bt_transport *spa_bt_transport_create(struct spa_bt_monitor *monitor, t->monitor = monitor; t->path = path; t->fd = -1; + t->sco_io = NULL; t->user_data = SPA_MEMBER(t, sizeof(struct spa_bt_transport), void); spa_hook_list_init(&t->listener_list); @@ -731,6 +732,11 @@ void spa_bt_transport_free(struct spa_bt_transport *transport) spa_bt_transport_emit_destroy(transport); + if (transport->sco_io) { + spa_bt_sco_io_destroy(transport->sco_io); + transport->sco_io = NULL; + } + spa_bt_transport_destroy(transport); if (transport->fd >= 0) { @@ -792,6 +798,16 @@ int spa_bt_transport_release(struct spa_bt_transport *transport) return res; } +void spa_bt_transport_ensure_sco_io(struct spa_bt_transport *t, struct spa_loop *data_loop) +{ + if (t->sco_io == NULL) { + t->sco_io = spa_bt_sco_io_create(data_loop, + t->fd, + t->read_mtu, + t->write_mtu); + } +} + static int transport_update_props(struct spa_bt_transport *transport, DBusMessageIter *props_iter, DBusMessageIter *invalidated_iter) diff --git a/spa/plugins/bluez5/defs.h b/spa/plugins/bluez5/defs.h index 1d6ab2480..1eafac4d7 100644 --- a/spa/plugins/bluez5/defs.h +++ b/spa/plugins/bluez5/defs.h @@ -246,6 +246,14 @@ struct spa_bt_device *spa_bt_device_find_by_address(struct spa_bt_monitor *monit int spa_bt_device_connect_profile(struct spa_bt_device *device, enum spa_bt_profile profile); int spa_bt_device_check_profiles(struct spa_bt_device *device, bool force); +struct spa_bt_sco_io; + +struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_loop *data_loop, int fd, uint16_t write_mtu, uint16_t read_mtu); +void spa_bt_sco_io_destroy(struct spa_bt_sco_io *io); +void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void *userdata, uint8_t *data, int size), void *userdata); +void spa_bt_sco_io_set_sink_cb(struct spa_bt_sco_io *io, int (*sink_cb)(void *userdata), void *userdata); +int spa_bt_sco_io_write(struct spa_bt_sco_io *io, uint8_t *data, int size); + enum spa_bt_transport_state { SPA_BT_TRANSPORT_STATE_IDLE, SPA_BT_TRANSPORT_STATE_PENDING, @@ -290,6 +298,7 @@ struct spa_bt_transport { uint16_t write_mtu; uint16_t delay; void *user_data; + struct spa_bt_sco_io *sco_io; struct spa_hook_list listener_list; struct spa_callbacks impl; @@ -304,6 +313,7 @@ struct spa_bt_transport *spa_bt_transport_find_full(struct spa_bt_monitor *monit int spa_bt_transport_acquire(struct spa_bt_transport *t, bool optional); int spa_bt_transport_release(struct spa_bt_transport *t); +void spa_bt_transport_ensure_sco_io(struct spa_bt_transport *t, struct spa_loop *data_loop); #define spa_bt_transport_emit(t,m,v,...) spa_hook_list_call(&(t)->listener_list, \ struct spa_bt_transport_events, \ diff --git a/spa/plugins/bluez5/meson.build b/spa/plugins/bluez5/meson.build index 07f4016b4..7c40ef67a 100644 --- a/spa/plugins/bluez5/meson.build +++ b/spa/plugins/bluez5/meson.build @@ -6,6 +6,7 @@ bluez5_sources = ['plugin.c', 'a2dp-source.c', 'sco-sink.c', 'sco-source.c', + 'sco-io.c', 'bluez5-device.c', 'bluez5-dbus.c'] diff --git a/spa/plugins/bluez5/sco-io.c b/spa/plugins/bluez5/sco-io.c new file mode 100644 index 000000000..29720384e --- /dev/null +++ b/spa/plugins/bluez5/sco-io.c @@ -0,0 +1,300 @@ +/* Spa SCO I/O + * + * Copyright © 2019 Collabora Ltd. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "defs.h" + + +/* We'll use the read rx data size to find the correct packet size for writing, + * since kernel might not report it as the socket MTU, see + * https://lore.kernel.org/linux-bluetooth/20201210003528.3pmaxvubiwegxmhl@pali/T/ + * + * Since 24 is the packet size for the smallest setting (ALT1), we'll stop + * reading when rx packet of at least this size is seen, and use its size as the + * heuristic maximum write MTU. Of course, if we have a source connected, we'll + * continue reading without stopping. + * + * XXX: when the kernel/backends start giving the right values, the heuristic + * XXX: can be removed + */ +#define HEURISTIC_MIN_MTU 24 + +#define MAX_MTU 1024 + + +struct spa_bt_sco_io { + int started:1; + + uint8_t read_buffer[MAX_MTU]; + uint32_t read_size; + + int fd; + uint16_t read_mtu; + uint16_t write_mtu; + + struct spa_loop *data_loop; + struct spa_source source; + + int (*source_cb)(void *userdata, uint8_t *data, int size); + void *source_userdata; + + int (*sink_cb)(void *userdata); + void *sink_userdata; +}; + + +static void update_source(struct spa_bt_sco_io *io) +{ + int enabled; + int changed = 0; + + enabled = io->source_cb != NULL || io->read_size < HEURISTIC_MIN_MTU; + if (SPA_FLAG_IS_SET(io->source.mask, SPA_IO_IN) != enabled) { + SPA_FLAG_UPDATE(io->source.mask, SPA_IO_IN, enabled); + changed = 1; + } + + enabled = io->sink_cb != NULL; + if (SPA_FLAG_IS_SET(io->source.mask, SPA_IO_OUT) != enabled) { + SPA_FLAG_UPDATE(io->source.mask, SPA_IO_OUT, enabled); + changed = 1; + } + + if (changed) { + spa_loop_update_source(io->data_loop, &io->source); + } +} + +static void sco_io_on_ready(struct spa_source *source) +{ + struct spa_bt_sco_io *io = source->data; + + if (SPA_FLAG_IS_SET(source->rmask, SPA_IO_IN)) { + int res; + + /* + * Note that we will read from the socket for a few times even + * when there is no source callback, to autodetect packet size. + */ + + read_again: + res = read(io->fd, io->read_buffer, SPA_MIN(io->read_mtu, MAX_MTU)); + if (res <= 0) { + if (errno == EINTR) { + /* retry if interrupted */ + goto read_again; + } else if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* no data: try it next time */ + goto read_done; + } + + /* error */ + goto stop; + } + + io->read_size = res; + + if (io->source_cb) { + int res; + res = io->source_cb(io->source_userdata, io->read_buffer, io->read_size); + if (res) { + io->source_cb = NULL; + } + } + } + +read_done: + if (SPA_FLAG_IS_SET(source->rmask, SPA_IO_OUT)) { + if (io->sink_cb) { + int res; + res = io->sink_cb(io->sink_userdata); + if (res) { + io->sink_cb = NULL; + } + } + } + + if (SPA_FLAG_IS_SET(source->rmask, SPA_IO_ERR) || SPA_FLAG_IS_SET(source->rmask, SPA_IO_HUP)) { + goto stop; + } + + /* Poll socket in/out only if necessary */ + update_source(io); + + return; + +stop: + if (io->source.loop) { + spa_loop_remove_source(io->data_loop, &io->source); + io->started = false; + } +} + +/* + * Write data to socket in correctly sized blocks. + * Returns the number of bytes written, 0 when data cannot be written now or + * there is too little of it to write, and <0 on write error. + */ +int spa_bt_sco_io_write(struct spa_bt_sco_io *io, uint8_t *buf, int size) +{ + uint16_t packet_size; + uint8_t *buf_start = buf; + + packet_size = (io->read_size > 0) ? SPA_MIN(io->write_mtu, io->read_size) : io->write_mtu; + spa_assert(packet_size > 0); + + if (size < packet_size) { + return 0; + } + + do { + int written; + + written = write(io->fd, buf, packet_size); + if (written < 0) { + if (errno == EINTR) { + /* retry if interrupted */ + continue; + } else if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* Don't continue writing */ + break; + } + return -errno; + } + + buf += written; + size -= written; + } while (size >= packet_size); + + return buf - buf_start; +} + + +struct spa_bt_sco_io *spa_bt_sco_io_create(struct spa_loop *data_loop, + int fd, + uint16_t read_mtu, + uint16_t write_mtu) +{ + struct spa_bt_sco_io *io; + + io = calloc(1, sizeof(struct spa_bt_sco_io)); + if (io == NULL) + return io; + + io->fd = fd; + io->read_mtu = read_mtu; + io->write_mtu = write_mtu; + io->data_loop = data_loop; + + io->read_size = 0; + + /* Add the ready callback */ + io->source.data = io; + io->source.fd = io->fd; + io->source.func = sco_io_on_ready; + io->source.mask = SPA_IO_IN | SPA_IO_OUT | SPA_IO_ERR | SPA_IO_HUP; + io->source.rmask = 0; + spa_loop_add_source(io->data_loop, &io->source); + + io->started = true; + + return io; +} + +static int do_remove_source(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct spa_bt_sco_io *io = user_data; + + if (io->source.loop) + spa_loop_remove_source(io->data_loop, &io->source); + + return 0; +} + +void spa_bt_sco_io_destroy(struct spa_bt_sco_io *io) +{ + if (io->started) + spa_loop_invoke(io->data_loop, do_remove_source, 0, NULL, 0, true, io); + + io->started = false; + free(io); +} + +/* Set source callback. + * This function should only be called from the data thread. + * Callback is called (in data loop) with data just read from the socket. + */ +void spa_bt_sco_io_set_source_cb(struct spa_bt_sco_io *io, int (*source_cb)(void *, uint8_t *, int), void *userdata) +{ + io->source_cb = source_cb; + io->source_userdata = userdata; + + if (io->started) { + update_source(io); + } +} + +/* Set sink callback. + * This function should only be called from the data thread. + * Callback is called (in data loop) when socket can be written to. + */ +void spa_bt_sco_io_set_sink_cb(struct spa_bt_sco_io *io, int (*sink_cb)(void *), void *userdata) +{ + io->sink_cb = sink_cb; + io->sink_userdata = userdata; + + if (io->started) { + update_source(io); + } +} diff --git a/spa/plugins/bluez5/sco-sink.c b/spa/plugins/bluez5/sco-sink.c index f45f08845..bcb16f338 100644 --- a/spa/plugins/bluez5/sco-sink.c +++ b/spa/plugins/bluez5/sco-sink.c @@ -131,6 +131,7 @@ struct impl { uint8_t *buffer; uint8_t *buffer_head; uint8_t *buffer_next; + int buffer_size; /* Times */ uint64_t start_time; @@ -383,6 +384,12 @@ static void flush_data(struct impl *this) this->start_time = now_time; if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { + /* Encode */ + if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) { + /* Buffer overrun; shouldn't usually happen. Drop data and reset. */ + this->buffer_head = this->buffer_next = this->buffer; + spa_log_warn(this->log, "sco-sink: mSBC buffer overrun, dropping data"); + } this->buffer_next[0] = 0x01; this->buffer_next[1] = sntable[sn]; this->buffer_next[59] = 0x00; @@ -394,27 +401,43 @@ static void flush_data(struct impl *this) return; } this->buffer_next += out_encoded + 3; - } + port->write_buffer_size = 0; -next_write: - written = write(this->transport->fd, packet, this->transport->write_mtu); - if (written <= 0) { - spa_log_debug(this->log, "failed to write data"); - goto stop; - } - port->write_buffer_size = 0; - spa_log_debug(this->log, "wrote socket data %d", written); - - if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { - this->buffer_head += written; - if (this->buffer_next - this->buffer_head >= this->transport->write_mtu) { - packet = this->buffer_head; - goto next_write; + /* Write */ + written = spa_bt_sco_io_write(this->transport->sco_io, packet, this->buffer_next - this->buffer_head); + if (written < 0) { + spa_log_warn(this->log, "failed to write data"); + goto stop; } + spa_log_trace(this->log, "wrote socket data %d", written); + + this->buffer_head += written; + if (this->buffer_head == this->buffer_next) this->buffer_head = this->buffer_next = this->buffer; - } else + else if (this->buffer_next + MSBC_ENCODED_SIZE > this->buffer + this->buffer_size) { + /* Written bytes is not necessarily commensurate + * with MSBC_ENCODED_SIZE. If this occurs, copy data. + */ + int size = this->buffer_next - this->buffer_head; + spa_memmove(this->buffer, this->buffer_head, size); + this->buffer_next = this->buffer + size; + this->buffer_head = this->buffer; + } + } else { + written = spa_bt_sco_io_write(this->transport->sco_io, packet, port->write_buffer_size); + if (written < 0) { + spa_log_warn(this->log, "sco-sink: write failure: %d", written); + goto stop; + } + processed = written; + port->write_buffer_size -= written; + + if (port->write_buffer_size > 0 && written > 0) { + spa_memmove(port->write_buffer, port->write_buffer + written, port->write_buffer_size); + } + } next_timeout = get_next_timeout(this, now_time, processed / port->frame_size); @@ -513,15 +536,22 @@ static int do_start(struct impl *this) /* Libsbc expects audio samples by default in host endianity, mSBC requires little endian */ this->msbc.endian = SBC_LE; - if (this->transport->write_mtu > MSBC_ENCODED_SIZE) - this->transport->write_mtu = MSBC_ENCODED_SIZE; - - this->buffer = calloc(lcm(this->transport->write_mtu, MSBC_ENCODED_SIZE), sizeof(uint8_t)); + /* write_mtu might not be correct at this point, so we'll throw + * in some common ones, at the cost of a potentially larger + * allocation (size <= 120 * write_mtu). If it still fails to be + * commensurate, we may end up doing memmoves, but nothing worse + * is going to happen. + */ + this->buffer_size = lcm(24, lcm(60, lcm(this->transport->write_mtu, 2 * MSBC_ENCODED_SIZE))); + this->buffer = calloc(this->buffer_size, sizeof(uint8_t)); this->buffer_head = this->buffer_next = this->buffer; } spa_return_val_if_fail(this->transport->write_mtu <= sizeof(this->port.write_buffer), -EINVAL); + /* start socket i/o */ + spa_bt_transport_ensure_sco_io(this->transport, this->data_loop); + /* Add the timeout callback */ this->source.data = this; this->source.fd = this->timerfd; diff --git a/spa/plugins/bluez5/sco-source.c b/spa/plugins/bluez5/sco-source.c index 6da7c1d46..cafe9f499 100644 --- a/spa/plugins/bluez5/sco-source.c +++ b/spa/plugins/bluez5/sco-source.c @@ -111,8 +111,6 @@ struct impl { unsigned int started:1; - struct spa_source source; - struct spa_io_clock *clock; struct spa_io_position *position; @@ -120,7 +118,6 @@ struct impl { sbc_t msbc; bool msbc_seq_initialized; uint8_t msbc_seq; - uint8_t read_buffer[4096]; /* mSBC frame parsing */ uint8_t msbc_buffer[MSBC_ENCODED_SIZE]; @@ -284,28 +281,6 @@ static void reset_buffers(struct port *port) } } -static int read_data(struct impl *this, uint8_t *data, uint32_t data_size) -{ - int res = 0; - -again: - res = read(this->transport->fd, data, data_size); - if (res <= 0) { - /* retry if interrupted */ - if (errno == EINTR) - goto again; - - /* return socked has no data */ - if (errno == EAGAIN || errno == EWOULDBLOCK) - return res; - - /* error */ - return -errno; - } - - return res; -} - static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id) { struct buffer *b = &port->buffers[buffer_id]; @@ -364,21 +339,14 @@ static void msbc_buffer_append_byte(struct impl *this, uint8_t byte) ++this->msbc_buffer_pos; } -static void sco_on_ready_read(struct spa_source *source) +static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read) { - struct impl *this = source->data; + struct impl *this = userdata; struct port *port = &this->port; struct spa_io_buffers *io = port->io; - int size_read; struct spa_data *datas; uint32_t max_out_size; - uint8_t *packet; - /* make sure the source has input data */ - if ((source->rmask & SPA_IO_IN) == 0) { - spa_log_error(this->log, "source has no input data, rmask=%d", source->rmask); - goto stop; - } if (this->transport == NULL) { spa_log_debug(this->log, "no transport, stop reading"); goto stop; @@ -388,7 +356,7 @@ static void sco_on_ready_read(struct spa_source *source) if (!port->current_buffer) { if (spa_list_is_empty(&port->free)) { spa_log_warn(this->log, "buffer not available"); - return; + return 0; } port->current_buffer = spa_list_first(&port->free, struct buffer, link); spa_list_remove(&port->current_buffer->link); @@ -398,28 +366,21 @@ static void sco_on_ready_read(struct spa_source *source) if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { max_out_size = MSBC_DECODED_SIZE; - packet = this->read_buffer; } else { max_out_size = this->transport->read_mtu; - packet = (uint8_t *)datas[0].data + port->ready_offset; } /* update the current pts */ spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); - /* read */ - size_read = read_data(this, packet, this->transport->read_mtu); - if (size_read < 0) { - spa_log_error(this->log, "failed to read data"); - goto stop; - } + /* handle data read from socket */ spa_log_debug(this->log, "read socket data %d", size_read); if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { int i; for (i = 0; i < size_read; ++i) { - msbc_buffer_append_byte(this, packet[i]); + msbc_buffer_append_byte(this, read_data[i]); /* Handle found mSBC packets. * @@ -455,8 +416,12 @@ static void sco_on_ready_read(struct spa_source *source) port->ready_offset += written; } } - } else + } else { + uint8_t *packet; + packet = (uint8_t *)datas[0].data + port->ready_offset; + spa_memmove(packet, read_data, size_read); port->ready_offset += size_read; + } /* send buffer if full */ if ((max_out_size + port->ready_offset) > (this->props.max_latency * port->frame_size)) { @@ -482,7 +447,7 @@ static void sco_on_ready_read(struct spa_source *source) /* done if there are no buffers ready */ if (spa_list_is_empty(&port->ready)) - return; + return 0; /* process the buffer if IO does not have any */ if (io->status != SPA_STATUS_HAVE_DATA) { @@ -501,11 +466,24 @@ static void sco_on_ready_read(struct spa_source *source) /* notify ready */ spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); - return; + return 0; stop: - if (this->source.loop) - spa_loop_remove_source(this->data_loop, &this->source); + return 1; +} + +static int do_add_source(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct impl *this = user_data; + + spa_bt_sco_io_set_source_cb(this->transport->sco_io, sco_source_cb, this); + + return 0; } static int do_start(struct impl *this) @@ -538,16 +516,11 @@ static int do_start(struct impl *this) this->msbc_seq_initialized = false; this->msbc_buffer_pos = 0; - spa_assert(this->transport->read_mtu <= sizeof(this->read_buffer)); } - /* Add the ready read callback */ - this->source.data = this; - this->source.fd = this->transport->fd; - this->source.func = sco_on_ready_read; - this->source.mask = SPA_IO_IN; - this->source.rmask = 0; - spa_loop_add_source(this->data_loop, &this->source); + /* Start socket i/o */ + spa_bt_transport_ensure_sco_io(this->transport, this->data_loop); + spa_loop_invoke(this->data_loop, do_add_source, 0, NULL, 0, true, this); /* Set the started flag */ this->started = true; @@ -564,8 +537,7 @@ static int do_remove_source(struct spa_loop *loop, { struct impl *this = user_data; - if (this->source.loop) - spa_loop_remove_source(this->data_loop, &this->source); + spa_bt_sco_io_set_source_cb(this->transport->sco_io, NULL, NULL); return 0; }