bluez: process available buffers in read_ready callback for a2dp-source and sco-source

This commit is contained in:
Julian Bouzas 2019-08-25 18:29:53 -04:00 committed by Wim Taymans
parent 84405dae2a
commit 5363d3352c
2 changed files with 172 additions and 160 deletions

View file

@ -302,6 +302,8 @@ static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size)
{ {
const ssize_t header_size = sizeof(struct rtp_header) + sizeof(struct rtp_payload); const ssize_t header_size = sizeof(struct rtp_header) + sizeof(struct rtp_payload);
struct port *port = &this->port; struct port *port = &this->port;
struct spa_io_buffers *io = port->io;
int32_t io_done_status = io->status;
struct buffer *buffer; struct buffer *buffer;
struct spa_data *data; struct spa_data *data;
uint8_t *dest; uint8_t *dest;
@ -312,73 +314,84 @@ static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size)
return; return;
} }
/* skip the header */ /* Skip the header */
src += header_size; src += header_size;
src_size -= header_size; src_size -= header_size;
/* check if we have a new buffer */ /* Decode data if we have a buffer free */
if (spa_list_is_empty(&port->free)) { if (!spa_list_is_empty(&port->free)) {
spa_log_warn(this->log, "no more buffers available, dropping data..."); /* Get the free buffer and remove it from the free list */
return; buffer = spa_list_first(&port->free, struct buffer, link);
} spa_list_remove(&buffer->link);
/* get the buffer */ /* Set the header */
buffer = spa_list_first(&port->free, struct buffer, link); if (buffer->h) {
buffer->h->seq = this->sample_count;
/* remove the the buffer from the list */ buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
spa_list_remove(&buffer->link); buffer->h->dts_offset = 0;
/* update the outstanding flag */
buffer->outstanding = true;
/* set the header */
if (buffer->h) {
buffer->h->seq = this->sample_count;
buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
buffer->h->dts_offset = 0;
}
/* get the dest data values */
data = buffer->buf->datas;
dest = data[0].data;
dest_size = data[0].maxsize;
/* decode the source data */
spa_log_debug(this->log, "decoding data for buffer_id=%d %zd %zd",
buffer->id, src_size, dest_size);
while (src_size > 0 && dest_size > 0) {
decoded = sbc_decode(&this->sbc,
src, src_size,
dest, dest_size, &written);
if (decoded <= 0) {
spa_log_error(this->log, "Decoding error. (%zd)", decoded);
return;
} }
/* update source and dest pointers */ /* get the dest data values */
src_size -= decoded; data = buffer->buf->datas;
src += decoded; dest = data[0].data;
dest_size -= written; dest_size = data[0].maxsize;
dest += written;
/* decode the source data */
spa_log_debug(this->log, "decoding data for buffer_id=%d %zd %zd",
buffer->id, src_size, dest_size);
while (src_size > 0 && dest_size > 0) {
decoded = sbc_decode(&this->sbc,
src, src_size,
dest, dest_size, &written);
if (decoded <= 0) {
spa_log_error(this->log, "Decoding error. (%zd)", decoded);
return;
}
/* update source and dest pointers */
src_size -= decoded;
src += decoded;
dest_size -= written;
dest += written;
}
/* make sure all data has been decoded */
spa_assert(src_size <= 0);
/* set the decoded data */
data[0].chunk->offset = 0;
data[0].chunk->size = data[0].maxsize - dest_size;
data[0].chunk->stride = port->frame_size;
/* update the sample count */
this->sample_count += data[0].chunk->size / port->frame_size;
/* add the buffer to the queue */
spa_log_debug(this->log, "data decoded %d successfully for buffer_id=%d",
data[0].chunk->size, buffer->id);
buffer->outstanding = true;
spa_list_append(&port->ready, &buffer->link);
} }
/* make sure all data has been decoded */ /* Process a buffer if there is one ready and IO does not have one */
spa_assert(src_size <= 0); if (!spa_list_is_empty(&port->ready) && io->status != SPA_STATUS_HAVE_BUFFER) {
/* Get the ready buffer and remove it from the ready list */
buffer = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&buffer->link);
/* set the decoded data */ /* Mark the buffer to be processed */
data[0].chunk->offset = 0; io->buffer_id = buffer->id;
data[0].chunk->size = data[0].maxsize - dest_size; io->status = SPA_STATUS_HAVE_BUFFER;
data[0].chunk->stride = port->frame_size;
/* update the sample count */ /* Add the buffer to the free list */
this->sample_count += data[0].chunk->size / port->frame_size; spa_list_append(&port->free, &buffer->link);
buffer->outstanding = false;
/* add the buffer to the queue */ /* Set the done status as have buffer */
spa_log_debug(this->log, "data decoded %d successfully for buffer_id=%d", io_done_status = SPA_STATUS_HAVE_BUFFER;
data[0].chunk->size, buffer->id); }
spa_list_append(&port->ready, &buffer->link);
spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_BUFFER); spa_node_call_ready(&this->callbacks, io_done_status);
} }
static void a2dp_on_ready_read(struct spa_source *source) static void a2dp_on_ready_read(struct spa_source *source)
@ -761,10 +774,7 @@ impl_node_port_enum_params(void *object, int seq,
return -ENOENT; return -ENOENT;
} }
/* TODO: why filer is != NULL when linking it with a2dp-sink? */ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
/* if filter is null a2dp-source cannot be linked with a2dp-sink,
* so for now we always pass NULL */
if (spa_pod_filter(&b, &result.param, param, NULL) < 0)
goto next; goto next;
spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
@ -964,39 +974,35 @@ static int impl_node_process(void *object)
struct impl *this = object; struct impl *this = object;
struct port *port; struct port *port;
struct spa_io_buffers *io; struct spa_io_buffers *io;
struct buffer *b; struct buffer *buffer;
/* get IO */
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
port = &this->port; port = &this->port;
io = port->io; io = port->io;
spa_return_val_if_fail(io != NULL, -EIO); spa_return_val_if_fail(io != NULL, -EIO);
/* don't do anything if IO does not need a buffer */ /* Return if we already have a buffer */
if (io->status != SPA_STATUS_NEED_BUFFER) if (io->status == SPA_STATUS_HAVE_BUFFER)
return io->status;
/* Recycle previously played buffer */
if (io->buffer_id != SPA_ID_INVALID &&
io->buffer_id < port->n_buffers) {
spa_log_debug(this->log, "recycling buffer_id=%d", io->buffer_id);
recycle_buffer(this, port, io->buffer_id);
io->buffer_id = SPA_ID_INVALID;
}
/* Check if we have new buffers in the queue */
if (spa_list_is_empty(&port->ready))
return SPA_STATUS_HAVE_BUFFER; return SPA_STATUS_HAVE_BUFFER;
/* Pop the new buffer from the queue */ /* Return if there is not buffers ready to be processed */
b = spa_list_first(&port->ready, struct buffer, link); if (spa_list_is_empty(&port->ready))
spa_list_remove(&b->link); return io->status;
/* Set the new buffer in IO to be played */ /* Get the new buffer from the ready list */
io->buffer_id = b->id; buffer = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&buffer->link);
/* Set the new buffer in IO */
io->buffer_id = buffer->id;
io->status = SPA_STATUS_HAVE_BUFFER; io->status = SPA_STATUS_HAVE_BUFFER;
/* Add the buffer to the free list */
spa_list_append(&port->free, &buffer->link);
buffer->outstanding = false;
/* Notify we have a buffer ready to be processed */
return SPA_STATUS_HAVE_BUFFER; return SPA_STATUS_HAVE_BUFFER;
} }

View file

@ -53,6 +53,7 @@ struct props {
uint32_t max_latency; uint32_t max_latency;
}; };
#define FILL_FRAMES 2
#define MAX_BUFFERS 32 #define MAX_BUFFERS 32
struct buffer { struct buffer {
@ -295,22 +296,19 @@ static bool read_data(struct impl *this, uint8_t *data, uint32_t size, uint32_t
const uint32_t mtu_size = this->transport->read_mtu; const uint32_t mtu_size = this->transport->read_mtu;
uint32_t local_total_read = 0; uint32_t local_total_read = 0;
/* TODO: For now we assume the size is always a mutliple of mtu_size */ /* Read chunks of mtu_size */
while (local_total_read < (size - mtu_size)) { while (local_total_read <= (size - mtu_size)) {
const int bytes_read = read(this->sock_fd, data, mtu_size); const int bytes_read = read(this->sock_fd, data, mtu_size);
if (bytes_read == 0) { if (bytes_read < 0) {
/* Stop */
return false;
} else if (bytes_read < 0) {
/* Retry */ /* Retry */
if (errno == EINTR) if (errno == EINTR)
continue; continue;
/* Socked has no data so return total data read */ /* Socked has no data */
if (errno == EAGAIN || errno == EWOULDBLOCK) if (errno == EAGAIN || errno == EWOULDBLOCK)
goto done; goto done;
/* Print error and stop */ /* Error */
spa_log_error(this->log, "read error: %s", strerror(errno)); spa_log_error(this->log, "read error: %s", strerror(errno));
return false; return false;
} }
@ -325,55 +323,81 @@ done:
return true; return true;
} }
static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id)
{
struct buffer *b = &port->buffers[buffer_id];
if (b->outstanding) {
spa_log_trace(this->log, NAME " %p: recycle buffer %u", this, buffer_id);
spa_list_append(&port->free, &b->link);
b->outstanding = false;
}
}
static void sco_on_ready_read(struct spa_source *source) static void sco_on_ready_read(struct spa_source *source)
{ {
struct impl *this = source->data; struct impl *this = source->data;
struct port *port = &this->port; struct port *port = &this->port;
struct spa_io_buffers *io = port->io;
int32_t io_done_status = io->status;
struct buffer *buffer; struct buffer *buffer;
struct spa_data *buffer_data; struct spa_data *buffer_data;
uint32_t total_read; uint32_t total_read;
/* update the current pts */ spa_return_if_fail(io != NULL);
clock_gettime(CLOCK_MONOTONIC, &this->now);
/* check if we have a new buffer */ /* Read a buffer if there is one free */
if (spa_list_is_empty(&port->free)) { if (!spa_list_is_empty(&port->free)) {
spa_log_warn(this->log, "waiting for buffer"); /* Get the free buffer and remove it from the free list */
return; buffer = spa_list_first(&port->free, struct buffer, link);
spa_list_remove(&buffer->link);
buffer_data = &buffer->buf->datas[0];
spa_assert(buffer_data->data);
/* Read sco data */
if (!read_data(this, buffer_data->data, buffer_data->maxsize, &total_read)) {
if (this->source.loop)
spa_loop_remove_source(this->data_loop, &this->source);
return;
}
/* Append a ready buffer if data could be read */
if (total_read > 0) {
/* Update the buffer offset, size and stride */
buffer_data->chunk->offset = 0;
buffer_data->chunk->size = total_read;
buffer_data->chunk->stride = port->frame_size;
/* Update the sample count */
this->sample_count += buffer_data->chunk->size / port->frame_size;
/* Add the buffer to the ready list */
buffer->outstanding = true;
spa_list_append(&port->ready, &buffer->link);
}
} }
/* get the buffer data */ /* Process a buffer if there is one ready and IO does not have one */
buffer = spa_list_first(&port->free, struct buffer, link); if (!spa_list_is_empty(&port->ready) && io->status != SPA_STATUS_HAVE_BUFFER) {
buffer_data = &buffer->buf->datas[0]; /* Get the ready buffer and remove it from the ready list */
spa_assert(buffer_data->data); buffer = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&buffer->link);
/* read data */ /* Mark the buffer to be processed */
if (!read_data(this, buffer_data->data, buffer_data->maxsize, &total_read)) io->buffer_id = buffer->id;
goto stop; io->status = SPA_STATUS_HAVE_BUFFER;
if (total_read == 0)
return;
/* update the buffer offset, size and stride */ /* Add the buffer to the free list */
buffer_data->chunk->offset = 0; spa_list_append(&port->free, &buffer->link);
buffer_data->chunk->size = total_read; buffer->outstanding = false;
buffer_data->chunk->stride = port->frame_size;
/* update the sample count */ /* Set the done status as have buffer */
this->sample_count += buffer_data->chunk->size / port->frame_size; io_done_status = SPA_STATUS_HAVE_BUFFER;
}
/* remove the buffer from the free list and add it to the ready list */ /* Notify the current status */
spa_list_remove(&buffer->link); spa_node_call_ready(&this->callbacks, io_done_status);
buffer->outstanding = true;
spa_list_append(&port->ready, &buffer->link);
/* Notify we are ready for the next buffer */
spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_BUFFER);
return;
stop:
if (this->source.loop)
spa_loop_remove_source(this->data_loop, &this->source);
} }
static int do_start(struct impl *this) static int do_start(struct impl *this)
@ -397,12 +421,12 @@ static int do_start(struct impl *this)
return -1; return -1;
/* Set the write MTU */ /* Set the write MTU */
val = this->transport->write_mtu; val = FILL_FRAMES * this->transport->write_mtu;
if (setsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)) < 0) if (setsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)) < 0)
spa_log_warn(this->log, "sco-source %p: SO_SNDBUF %m", this); spa_log_warn(this->log, "sco-source %p: SO_SNDBUF %m", this);
/* Set the read MTU */ /* Set the read MTU */
val = this->transport->read_mtu; val = FILL_FRAMES * this->transport->read_mtu;
if (setsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)) < 0) if (setsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)) < 0)
spa_log_warn(this->log, "sco-source %p: SO_RCVBUF %m", this); spa_log_warn(this->log, "sco-source %p: SO_RCVBUF %m", this);
@ -681,10 +705,7 @@ impl_node_port_enum_params(void *object, int seq,
return -ENOENT; return -ENOENT;
} }
/* TODO: why filer is != NULL when linking it with sco-sink? */ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
/* if filter is null sco-source cannot be linked with sco-sink,
* so for now we always pass NULL */
if (spa_pod_filter(&b, &result.param, param, NULL) < 0)
goto next; goto next;
spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
@ -847,17 +868,6 @@ impl_node_port_set_io(void *object,
return 0; return 0;
} }
static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id)
{
struct buffer *b = &port->buffers[buffer_id];
if (b->outstanding) {
spa_log_trace(this->log, NAME " %p: recycle buffer %u", this, buffer_id);
spa_list_append(&port->free, &b->link);
b->outstanding = false;
}
}
static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
{ {
struct impl *this = object; struct impl *this = object;
@ -884,39 +894,35 @@ static int impl_node_process(void *object)
struct impl *this = object; struct impl *this = object;
struct port *port; struct port *port;
struct spa_io_buffers *io; struct spa_io_buffers *io;
struct buffer *b; struct buffer *buffer;
/* get IO */
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
port = &this->port; port = &this->port;
io = port->io; io = port->io;
spa_return_val_if_fail(io != NULL, -EIO); spa_return_val_if_fail(io != NULL, -EIO);
/* don't do anything if IO does not need a buffer */ /* Return if we already have a buffer */
if (io->status != SPA_STATUS_NEED_BUFFER) if (io->status == SPA_STATUS_HAVE_BUFFER)
return io->status;
/* Recycle previously played buffer */
if (io->buffer_id != SPA_ID_INVALID &&
io->buffer_id < port->n_buffers) {
spa_log_debug(this->log, "recycling buffer_id=%d", io->buffer_id);
recycle_buffer(this, port, io->buffer_id);
io->buffer_id = SPA_ID_INVALID;
}
/* Check if we have new buffers in the queue */
if (spa_list_is_empty(&port->ready))
return SPA_STATUS_HAVE_BUFFER; return SPA_STATUS_HAVE_BUFFER;
/* Pop the new buffer from the queue */ /* Return if there is not buffers ready to be processed */
b = spa_list_first(&port->ready, struct buffer, link); if (spa_list_is_empty(&port->ready))
spa_list_remove(&b->link); return io->status;
/* Set the new buffer in IO to be played */ /* Get the new buffer from the ready list */
io->buffer_id = b->id; buffer = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&buffer->link);
/* Set the new buffer in IO */
io->buffer_id = buffer->id;
io->status = SPA_STATUS_HAVE_BUFFER; io->status = SPA_STATUS_HAVE_BUFFER;
/* Add the buffer to the free list */
spa_list_append(&port->free, &buffer->link);
buffer->outstanding = false;
/* Notify we have a buffer ready to be processed */
return SPA_STATUS_HAVE_BUFFER; return SPA_STATUS_HAVE_BUFFER;
} }