mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-12-16 08:56:45 -05:00
bluez5: rework sco i/o + autodetect mtu
Move SCO polling to a single place, and abstract mtu handling. Autodetect suitable tx packet size based on rx, instead of relying on the kernel providing correct values.
This commit is contained in:
parent
518365e350
commit
368182d963
9 changed files with 423 additions and 79 deletions
|
|
@ -111,8 +111,6 @@ struct impl {
|
|||
|
||||
unsigned int started:1;
|
||||
|
||||
struct spa_source source;
|
||||
|
||||
struct spa_io_clock *clock;
|
||||
struct spa_io_position *position;
|
||||
|
||||
|
|
@ -120,7 +118,6 @@ struct impl {
|
|||
sbc_t msbc;
|
||||
bool msbc_seq_initialized;
|
||||
uint8_t msbc_seq;
|
||||
uint8_t read_buffer[4096];
|
||||
|
||||
/* mSBC frame parsing */
|
||||
uint8_t msbc_buffer[MSBC_ENCODED_SIZE];
|
||||
|
|
@ -284,28 +281,6 @@ static void reset_buffers(struct port *port)
|
|||
}
|
||||
}
|
||||
|
||||
static int read_data(struct impl *this, uint8_t *data, uint32_t data_size)
|
||||
{
|
||||
int res = 0;
|
||||
|
||||
again:
|
||||
res = read(this->transport->fd, data, data_size);
|
||||
if (res <= 0) {
|
||||
/* retry if interrupted */
|
||||
if (errno == EINTR)
|
||||
goto again;
|
||||
|
||||
/* return socked has no data */
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
||||
return res;
|
||||
|
||||
/* error */
|
||||
return -errno;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id)
|
||||
{
|
||||
struct buffer *b = &port->buffers[buffer_id];
|
||||
|
|
@ -364,21 +339,14 @@ static void msbc_buffer_append_byte(struct impl *this, uint8_t byte)
|
|||
++this->msbc_buffer_pos;
|
||||
}
|
||||
|
||||
static void sco_on_ready_read(struct spa_source *source)
|
||||
static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read)
|
||||
{
|
||||
struct impl *this = source->data;
|
||||
struct impl *this = userdata;
|
||||
struct port *port = &this->port;
|
||||
struct spa_io_buffers *io = port->io;
|
||||
int size_read;
|
||||
struct spa_data *datas;
|
||||
uint32_t max_out_size;
|
||||
uint8_t *packet;
|
||||
|
||||
/* make sure the source has input data */
|
||||
if ((source->rmask & SPA_IO_IN) == 0) {
|
||||
spa_log_error(this->log, "source has no input data, rmask=%d", source->rmask);
|
||||
goto stop;
|
||||
}
|
||||
if (this->transport == NULL) {
|
||||
spa_log_debug(this->log, "no transport, stop reading");
|
||||
goto stop;
|
||||
|
|
@ -388,7 +356,7 @@ static void sco_on_ready_read(struct spa_source *source)
|
|||
if (!port->current_buffer) {
|
||||
if (spa_list_is_empty(&port->free)) {
|
||||
spa_log_warn(this->log, "buffer not available");
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
port->current_buffer = spa_list_first(&port->free, struct buffer, link);
|
||||
spa_list_remove(&port->current_buffer->link);
|
||||
|
|
@ -398,28 +366,21 @@ static void sco_on_ready_read(struct spa_source *source)
|
|||
|
||||
if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) {
|
||||
max_out_size = MSBC_DECODED_SIZE;
|
||||
packet = this->read_buffer;
|
||||
} else {
|
||||
max_out_size = this->transport->read_mtu;
|
||||
packet = (uint8_t *)datas[0].data + port->ready_offset;
|
||||
}
|
||||
|
||||
/* update the current pts */
|
||||
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
|
||||
|
||||
/* read */
|
||||
size_read = read_data(this, packet, this->transport->read_mtu);
|
||||
if (size_read < 0) {
|
||||
spa_log_error(this->log, "failed to read data");
|
||||
goto stop;
|
||||
}
|
||||
/* handle data read from socket */
|
||||
spa_log_debug(this->log, "read socket data %d", size_read);
|
||||
|
||||
if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) {
|
||||
int i;
|
||||
|
||||
for (i = 0; i < size_read; ++i) {
|
||||
msbc_buffer_append_byte(this, packet[i]);
|
||||
msbc_buffer_append_byte(this, read_data[i]);
|
||||
|
||||
/* Handle found mSBC packets.
|
||||
*
|
||||
|
|
@ -455,8 +416,12 @@ static void sco_on_ready_read(struct spa_source *source)
|
|||
port->ready_offset += written;
|
||||
}
|
||||
}
|
||||
} else
|
||||
} else {
|
||||
uint8_t *packet;
|
||||
packet = (uint8_t *)datas[0].data + port->ready_offset;
|
||||
spa_memmove(packet, read_data, size_read);
|
||||
port->ready_offset += size_read;
|
||||
}
|
||||
|
||||
/* send buffer if full */
|
||||
if ((max_out_size + port->ready_offset) > (this->props.max_latency * port->frame_size)) {
|
||||
|
|
@ -482,7 +447,7 @@ static void sco_on_ready_read(struct spa_source *source)
|
|||
|
||||
/* done if there are no buffers ready */
|
||||
if (spa_list_is_empty(&port->ready))
|
||||
return;
|
||||
return 0;
|
||||
|
||||
/* process the buffer if IO does not have any */
|
||||
if (io->status != SPA_STATUS_HAVE_DATA) {
|
||||
|
|
@ -501,11 +466,24 @@ static void sco_on_ready_read(struct spa_source *source)
|
|||
|
||||
/* notify ready */
|
||||
spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA);
|
||||
return;
|
||||
return 0;
|
||||
|
||||
stop:
|
||||
if (this->source.loop)
|
||||
spa_loop_remove_source(this->data_loop, &this->source);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int do_add_source(struct spa_loop *loop,
|
||||
bool async,
|
||||
uint32_t seq,
|
||||
const void *data,
|
||||
size_t size,
|
||||
void *user_data)
|
||||
{
|
||||
struct impl *this = user_data;
|
||||
|
||||
spa_bt_sco_io_set_source_cb(this->transport->sco_io, sco_source_cb, this);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int do_start(struct impl *this)
|
||||
|
|
@ -538,16 +516,11 @@ static int do_start(struct impl *this)
|
|||
this->msbc_seq_initialized = false;
|
||||
|
||||
this->msbc_buffer_pos = 0;
|
||||
spa_assert(this->transport->read_mtu <= sizeof(this->read_buffer));
|
||||
}
|
||||
|
||||
/* Add the ready read callback */
|
||||
this->source.data = this;
|
||||
this->source.fd = this->transport->fd;
|
||||
this->source.func = sco_on_ready_read;
|
||||
this->source.mask = SPA_IO_IN;
|
||||
this->source.rmask = 0;
|
||||
spa_loop_add_source(this->data_loop, &this->source);
|
||||
/* Start socket i/o */
|
||||
spa_bt_transport_ensure_sco_io(this->transport, this->data_loop);
|
||||
spa_loop_invoke(this->data_loop, do_add_source, 0, NULL, 0, true, this);
|
||||
|
||||
/* Set the started flag */
|
||||
this->started = true;
|
||||
|
|
@ -564,8 +537,7 @@ static int do_remove_source(struct spa_loop *loop,
|
|||
{
|
||||
struct impl *this = user_data;
|
||||
|
||||
if (this->source.loop)
|
||||
spa_loop_remove_source(this->data_loop, &this->source);
|
||||
spa_bt_sco_io_set_source_cb(this->transport->sco_io, NULL, NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue