From 2ae4322cca8412fae52cd975015d1d8563ce2a53 Mon Sep 17 00:00:00 2001 From: Julian Bouzas Date: Tue, 7 Jul 2020 10:01:41 -0400 Subject: [PATCH] a2dpsource: fill the buffer as much as possible Fills the buffer until the buffer data size is maxsize / frame_size --- spa/plugins/bluez5/a2dp-source.c | 269 ++++++++++++++++--------------- 1 file changed, 141 insertions(+), 128 deletions(-) diff --git a/spa/plugins/bluez5/a2dp-source.c b/spa/plugins/bluez5/a2dp-source.c index 5f383242a..4724be11c 100644 --- a/spa/plugins/bluez5/a2dp-source.c +++ b/spa/plugins/bluez5/a2dp-source.c @@ -89,7 +89,8 @@ struct port { struct spa_list free; struct spa_list ready; - size_t ready_offset; + struct buffer *current_buffer; + uint32_t ready_offset; }; struct impl { @@ -302,107 +303,81 @@ static void reset_buffers(struct port *port) } } -static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size) +static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id) { - const size_t header_size = sizeof(struct rtp_header) + sizeof(struct rtp_payload); - struct port *port = &this->port; - struct spa_io_buffers *io = port->io; - int32_t io_done_status = io->status; - struct buffer *buffer; - struct spa_data *data; - uint8_t *dest; - size_t decoded, dest_size, written; + struct buffer *b = &port->buffers[buffer_id]; - if (src_size <= header_size) { - spa_log_error(this->log, "not valid header found. dropping data..."); - return; + if (b->outstanding) { + spa_log_trace(this->log, NAME " %p: recycle buffer %u", this, buffer_id); + spa_list_append(&port->free, &b->link); + b->outstanding = false; + } +} + +static int32_t read_data(struct impl *this) { + const ssize_t b_size = sizeof(this->buffer_read); + int32_t size_read = 0; + +again: + /* read data from socket */ + size_read = read(this->transport->fd, this->buffer_read, b_size); + + if (size_read == 0) + return 0; + else if (size_read < 0) { + /* retry if interrupted */ + if (errno == EINTR) + goto again; + + /* return socked has no data */ + if (errno == EAGAIN || errno == EWOULDBLOCK) + return 0; + + /* go to 'stop' if socket has an error */ + spa_log_error(this->log, "read error: %s", strerror(errno)); + return -errno; } - /* Skip the header */ + return size_read; +} + +static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size, + uint8_t *dst, uint32_t dst_size) +{ + const uint32_t header_size = sizeof(struct rtp_header) + sizeof(struct rtp_payload); + ssize_t processed; + size_t written, avail; + + /* skip the header */ + spa_return_val_if_fail (src_size > header_size, -EINVAL); src += header_size; src_size -= header_size; - /* Decode data if we have a buffer free */ - if (!spa_list_is_empty(&port->free)) { - /* Get the free buffer and remove it from the free list */ - buffer = spa_list_first(&port->free, struct buffer, link); - spa_list_remove(&buffer->link); + /* decode */ + avail = dst_size; + while (src_size > 0) { + processed = sbc_decode(&this->sbc, src, src_size, dst, avail, &written); + if (processed <= 0) + return processed; - /* Set the header */ - if (buffer->h) { - buffer->h->seq = this->sample_count; - buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); - buffer->h->dts_offset = 0; - } - - /* get the dest data values */ - data = buffer->buf->datas; - dest = data[0].data; - dest_size = data[0].maxsize; - - /* decode the source data */ - spa_log_debug(this->log, "decoding data for buffer_id=%d %zd %zd", - buffer->id, src_size, dest_size); - while (src_size > 0 && dest_size > 0) { - decoded = sbc_decode(&this->sbc, - src, src_size, - dest, dest_size, &written); - if (decoded <= 0) { - spa_log_error(this->log, "Decoding error. (%zd)", decoded); - return; - } - - /* update source and dest pointers */ - src_size -= decoded; - src += decoded; - dest_size -= written; - dest += written; - } - - /* make sure all data has been decoded */ - spa_assert(src_size <= 0); - - /* set the decoded data */ - data[0].chunk->offset = 0; - data[0].chunk->size = data[0].maxsize - dest_size; - data[0].chunk->stride = port->frame_size; - - /* update the sample count */ - this->sample_count += data[0].chunk->size / port->frame_size; - - /* add the buffer to the queue */ - spa_log_debug(this->log, "data decoded %d successfully for buffer_id=%d", - data[0].chunk->size, buffer->id); - buffer->outstanding = true; - spa_list_append(&port->ready, &buffer->link); + /* update source and dest pointers */ + spa_return_val_if_fail (avail > written, -ENOSPC); + src_size -= processed; + src += processed; + avail -= written; + dst += written; } - /* Process a buffer if there is one ready and IO does not have one */ - if (!spa_list_is_empty(&port->ready) && io->status != SPA_STATUS_HAVE_DATA) { - /* Get the ready buffer and remove it from the ready list */ - buffer = spa_list_first(&port->ready, struct buffer, link); - spa_list_remove(&buffer->link); - - /* Mark the buffer to be processed */ - io->buffer_id = buffer->id; - io->status = SPA_STATUS_HAVE_DATA; - - /* Add the buffer to the free list */ - spa_list_append(&port->free, &buffer->link); - buffer->outstanding = false; - - /* Set the done status as have buffer */ - io_done_status = SPA_STATUS_HAVE_DATA; - } - - spa_node_call_ready(&this->callbacks, io_done_status); + return dst_size - avail; } static void a2dp_on_ready_read(struct spa_source *source) { struct impl *this = source->data; - const ssize_t buffer_size = sizeof(this->buffer_read); - ssize_t size_read; + struct port *port = &this->port; + struct spa_io_buffers *io = port->io; + int32_t size_read; + struct spa_data *datas; /* make sure the source is an input */ if ((source->rmask & SPA_IO_IN) == 0) { @@ -417,35 +392,79 @@ static void a2dp_on_ready_read(struct spa_source *source) /* update the current pts */ spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); -again: - /* read data from socket */ - size_read = read(this->transport->fd, this->buffer_read, buffer_size); - spa_log_debug(this->log, "read socket data %zd/%zd", size_read, buffer_size); - - if (size_read == 0) { + /* read */ + size_read = read_data (this); + if (size_read == 0) + return; + if (size_read < 0) { + spa_log_error(this->log, "failed to read data"); goto stop; } - else if (size_read < 0) { - /* retry if interrupted */ - if (errno == EINTR) - goto again; + spa_log_debug(this->log, "read socket data %d", size_read); - /* return socked has no data */ - if (errno == EAGAIN || errno == EWOULDBLOCK) - return; - - /* go to 'stop' if socket has an error */ - spa_log_error(this->log, "read error: %s", strerror(errno)); + /* decode */ + uint8_t read_decoded [4096]; + int32_t decoded = decode_data(this, this->buffer_read, size_read, + read_decoded, sizeof (read_decoded)); + if (decoded <= 0) { + spa_log_error(this->log, "failed to decode data"); goto stop; } + spa_log_debug(this->log, "decoded socket data %d", decoded); - /* make sure size_read is not bigger than the buffer_size */ - spa_assert(size_read <= buffer_size); + /* get buffer */ + if (!port->current_buffer) { + if (spa_list_is_empty(&port->free)) { + spa_log_warn(this->log, "buffer not available"); + return; + } + port->current_buffer = spa_list_first(&port->free, struct buffer, link); + spa_list_remove(&port->current_buffer->link); + if (port->current_buffer->h) { + port->current_buffer->h->seq = this->sample_count; + port->current_buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); + port->current_buffer->h->dts_offset = 0; + } + port->ready_offset = 0; + } + datas = port->current_buffer->buf->datas; - /* decode the data */ - decode_sbc_data(this, this->buffer_read, size_read); + /* copy data into buffer */ + memcpy ((uint8_t *)datas[0].data + port->ready_offset, read_decoded, decoded); + port->ready_offset += decoded; - /* done reading */ + /* send buffer if full */ + if ((port->ready_offset + sizeof (read_decoded)) >= datas[0].maxsize / port->frame_size) { + datas[0].chunk->offset = 0; + datas[0].chunk->size = port->ready_offset; + datas[0].chunk->stride = port->frame_size; + + this->sample_count += datas[0].chunk->size / port->frame_size; + spa_list_append(&port->ready, &port->current_buffer->link); + port->current_buffer = NULL; + } + + /* done if there are no buffers ready */ + if (spa_list_is_empty(&port->ready)) + return; + + /* process the buffer if IO does not have any */ + if (io != NULL && io->status != SPA_STATUS_HAVE_DATA) { + struct buffer *b; + + if (io->buffer_id < port->n_buffers) + recycle_buffer(this, port, io->buffer_id); + + b = spa_list_first(&port->ready, struct buffer, link); + spa_list_remove(&b->link); + b->outstanding = true; + + io->buffer_id = b->id; + io->status = SPA_STATUS_HAVE_DATA; + } + + /* notify ready */ + spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); return; stop: @@ -760,10 +779,12 @@ impl_node_port_enum_params(void *object, int seq, param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamBuffers, id, - /* 8 buffers are enough to make sure we always have one available when decoding */ - SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), - SPA_PARAM_BUFFERS_size, SPA_POD_Int(this->props.max_latency * port->frame_size), + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( + this->props.max_latency * port->frame_size, + this->props.min_latency * port->frame_size, + INT32_MAX), SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size), SPA_PARAM_BUFFERS_align, SPA_POD_Int(16)); break; @@ -968,17 +989,6 @@ impl_node_port_set_io(void *object, return 0; } -static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id) -{ - struct buffer *b = &port->buffers[buffer_id]; - - if (b->outstanding) { - spa_log_trace(this->log, NAME " %p: recycle buffer %u", this, buffer_id); - spa_list_append(&port->free, &b->link); - b->outstanding = false; - } -} - static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) { struct impl *this = object; @@ -1017,22 +1027,25 @@ static int impl_node_process(void *object) if (io->status == SPA_STATUS_HAVE_DATA) return SPA_STATUS_HAVE_DATA; - /* Return if there is not buffers ready to be processed */ + /* Recycle */ + if (io->buffer_id < port->n_buffers) { + recycle_buffer(this, port, io->buffer_id); + io->buffer_id = SPA_ID_INVALID; + } + + /* Return if there are no buffers ready to be processed */ if (spa_list_is_empty(&port->ready)) - return io->status; + return SPA_STATUS_OK; /* Get the new buffer from the ready list */ buffer = spa_list_first(&port->ready, struct buffer, link); spa_list_remove(&buffer->link); + buffer->outstanding = false; /* Set the new buffer in IO */ io->buffer_id = buffer->id; io->status = SPA_STATUS_HAVE_DATA; - /* Add the buffer to the free list */ - spa_list_append(&port->free, &buffer->link); - buffer->outstanding = false; - /* Notify we have a buffer ready to be processed */ return SPA_STATUS_HAVE_DATA; }