diff --git a/spa/plugins/bluez5/a2dp-source.c b/spa/plugins/bluez5/a2dp-source.c index 4d0ad14f9..9b3fa2ee6 100644 --- a/spa/plugins/bluez5/a2dp-source.c +++ b/spa/plugins/bluez5/a2dp-source.c @@ -302,6 +302,8 @@ static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size) { const ssize_t header_size = sizeof(struct rtp_header) + sizeof(struct rtp_payload); struct port *port = &this->port; + struct spa_io_buffers *io = port->io; + int32_t io_done_status = io->status; struct buffer *buffer; struct spa_data *data; uint8_t *dest; @@ -312,73 +314,84 @@ static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size) return; } - /* skip the header */ + /* Skip the header */ src += header_size; src_size -= header_size; - /* check if we have a new buffer */ - if (spa_list_is_empty(&port->free)) { - spa_log_warn(this->log, "no more buffers available, dropping data..."); - return; - } + /* Decode data if we have a buffer free */ + if (!spa_list_is_empty(&port->free)) { + /* Get the free buffer and remove it from the free list */ + buffer = spa_list_first(&port->free, struct buffer, link); + spa_list_remove(&buffer->link); - /* get the buffer */ - buffer = spa_list_first(&port->free, struct buffer, link); - - /* remove the the buffer from the list */ - spa_list_remove(&buffer->link); - /* update the outstanding flag */ - buffer->outstanding = true; - - /* set the header */ - if (buffer->h) { - buffer->h->seq = this->sample_count; - buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); - buffer->h->dts_offset = 0; - } - - /* get the dest data values */ - data = buffer->buf->datas; - dest = data[0].data; - dest_size = data[0].maxsize; - - /* decode the source data */ - spa_log_debug(this->log, "decoding data for buffer_id=%d %zd %zd", - buffer->id, src_size, dest_size); - - while (src_size > 0 && dest_size > 0) { - decoded = sbc_decode(&this->sbc, - src, src_size, - dest, dest_size, &written); - if (decoded <= 0) { - spa_log_error(this->log, "Decoding error. (%zd)", decoded); - return; + /* Set the header */ + if (buffer->h) { + buffer->h->seq = this->sample_count; + buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); + buffer->h->dts_offset = 0; } - /* update source and dest pointers */ - src_size -= decoded; - src += decoded; - dest_size -= written; - dest += written; + /* get the dest data values */ + data = buffer->buf->datas; + dest = data[0].data; + dest_size = data[0].maxsize; + + /* decode the source data */ + spa_log_debug(this->log, "decoding data for buffer_id=%d %zd %zd", + buffer->id, src_size, dest_size); + while (src_size > 0 && dest_size > 0) { + decoded = sbc_decode(&this->sbc, + src, src_size, + dest, dest_size, &written); + if (decoded <= 0) { + spa_log_error(this->log, "Decoding error. (%zd)", decoded); + return; + } + + /* update source and dest pointers */ + src_size -= decoded; + src += decoded; + dest_size -= written; + dest += written; + } + + /* make sure all data has been decoded */ + spa_assert(src_size <= 0); + + /* set the decoded data */ + data[0].chunk->offset = 0; + data[0].chunk->size = data[0].maxsize - dest_size; + data[0].chunk->stride = port->frame_size; + + /* update the sample count */ + this->sample_count += data[0].chunk->size / port->frame_size; + + /* add the buffer to the queue */ + spa_log_debug(this->log, "data decoded %d successfully for buffer_id=%d", + data[0].chunk->size, buffer->id); + buffer->outstanding = true; + spa_list_append(&port->ready, &buffer->link); } - /* make sure all data has been decoded */ - spa_assert(src_size <= 0); + /* Process a buffer if there is one ready and IO does not have one */ + if (!spa_list_is_empty(&port->ready) && io->status != SPA_STATUS_HAVE_BUFFER) { + /* Get the ready buffer and remove it from the ready list */ + buffer = spa_list_first(&port->ready, struct buffer, link); + spa_list_remove(&buffer->link); - /* set the decoded data */ - data[0].chunk->offset = 0; - data[0].chunk->size = data[0].maxsize - dest_size; - data[0].chunk->stride = port->frame_size; + /* Mark the buffer to be processed */ + io->buffer_id = buffer->id; + io->status = SPA_STATUS_HAVE_BUFFER; - /* update the sample count */ - this->sample_count += data[0].chunk->size / port->frame_size; + /* Add the buffer to the free list */ + spa_list_append(&port->free, &buffer->link); + buffer->outstanding = false; - /* add the buffer to the queue */ - spa_log_debug(this->log, "data decoded %d successfully for buffer_id=%d", - data[0].chunk->size, buffer->id); - spa_list_append(&port->ready, &buffer->link); + /* Set the done status as have buffer */ + io_done_status = SPA_STATUS_HAVE_BUFFER; + } - spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_BUFFER); + spa_node_call_ready(&this->callbacks, io_done_status); } static void a2dp_on_ready_read(struct spa_source *source) @@ -761,10 +774,7 @@ impl_node_port_enum_params(void *object, int seq, return -ENOENT; } - /* TODO: why filer is != NULL when linking it with a2dp-sink? */ - /* if filter is null a2dp-source cannot be linked with a2dp-sink, - * so for now we always pass NULL */ - if (spa_pod_filter(&b, &result.param, param, NULL) < 0) + if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); @@ -964,39 +974,35 @@ static int impl_node_process(void *object) struct impl *this = object; struct port *port; struct spa_io_buffers *io; - struct buffer *b; + struct buffer *buffer; - /* get IO */ spa_return_val_if_fail(this != NULL, -EINVAL); port = &this->port; io = port->io; spa_return_val_if_fail(io != NULL, -EIO); - /* don't do anything if IO does not need a buffer */ - if (io->status != SPA_STATUS_NEED_BUFFER) - return io->status; - - /* Recycle previously played buffer */ - if (io->buffer_id != SPA_ID_INVALID && - io->buffer_id < port->n_buffers) { - spa_log_debug(this->log, "recycling buffer_id=%d", io->buffer_id); - recycle_buffer(this, port, io->buffer_id); - io->buffer_id = SPA_ID_INVALID; - } - - /* Check if we have new buffers in the queue */ - if (spa_list_is_empty(&port->ready)) + /* Return if we already have a buffer */ + if (io->status == SPA_STATUS_HAVE_BUFFER) return SPA_STATUS_HAVE_BUFFER; - /* Pop the new buffer from the queue */ - b = spa_list_first(&port->ready, struct buffer, link); - spa_list_remove(&b->link); + /* Return if there is not buffers ready to be processed */ + if (spa_list_is_empty(&port->ready)) + return io->status; - /* Set the new buffer in IO to be played */ - io->buffer_id = b->id; + /* Get the new buffer from the ready list */ + buffer = spa_list_first(&port->ready, struct buffer, link); + spa_list_remove(&buffer->link); + + /* Set the new buffer in IO */ + io->buffer_id = buffer->id; io->status = SPA_STATUS_HAVE_BUFFER; + /* Add the buffer to the free list */ + spa_list_append(&port->free, &buffer->link); + buffer->outstanding = false; + + /* Notify we have a buffer ready to be processed */ return SPA_STATUS_HAVE_BUFFER; } diff --git a/spa/plugins/bluez5/sco-source.c b/spa/plugins/bluez5/sco-source.c index 225415cb1..245aacd88 100644 --- a/spa/plugins/bluez5/sco-source.c +++ b/spa/plugins/bluez5/sco-source.c @@ -53,6 +53,7 @@ struct props { uint32_t max_latency; }; +#define FILL_FRAMES 2 #define MAX_BUFFERS 32 struct buffer { @@ -295,22 +296,19 @@ static bool read_data(struct impl *this, uint8_t *data, uint32_t size, uint32_t const uint32_t mtu_size = this->transport->read_mtu; uint32_t local_total_read = 0; - /* TODO: For now we assume the size is always a mutliple of mtu_size */ - while (local_total_read < (size - mtu_size)) { + /* Read chunks of mtu_size */ + while (local_total_read <= (size - mtu_size)) { const int bytes_read = read(this->sock_fd, data, mtu_size); - if (bytes_read == 0) { - /* Stop */ - return false; - } else if (bytes_read < 0) { + if (bytes_read < 0) { /* Retry */ if (errno == EINTR) continue; - /* Socked has no data so return total data read */ + /* Socked has no data */ if (errno == EAGAIN || errno == EWOULDBLOCK) goto done; - /* Print error and stop */ + /* Error */ spa_log_error(this->log, "read error: %s", strerror(errno)); return false; } @@ -325,55 +323,81 @@ done: return true; } +static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id) +{ + struct buffer *b = &port->buffers[buffer_id]; + + if (b->outstanding) { + spa_log_trace(this->log, NAME " %p: recycle buffer %u", this, buffer_id); + spa_list_append(&port->free, &b->link); + b->outstanding = false; + } +} + static void sco_on_ready_read(struct spa_source *source) { struct impl *this = source->data; struct port *port = &this->port; + struct spa_io_buffers *io = port->io; + int32_t io_done_status = io->status; struct buffer *buffer; struct spa_data *buffer_data; uint32_t total_read; - /* update the current pts */ - clock_gettime(CLOCK_MONOTONIC, &this->now); + spa_return_if_fail(io != NULL); - /* check if we have a new buffer */ - if (spa_list_is_empty(&port->free)) { - spa_log_warn(this->log, "waiting for buffer"); - return; + /* Read a buffer if there is one free */ + if (!spa_list_is_empty(&port->free)) { + /* Get the free buffer and remove it from the free list */ + buffer = spa_list_first(&port->free, struct buffer, link); + spa_list_remove(&buffer->link); + + buffer_data = &buffer->buf->datas[0]; + spa_assert(buffer_data->data); + + /* Read sco data */ + if (!read_data(this, buffer_data->data, buffer_data->maxsize, &total_read)) { + if (this->source.loop) + spa_loop_remove_source(this->data_loop, &this->source); + return; + } + + /* Append a ready buffer if data could be read */ + if (total_read > 0) { + /* Update the buffer offset, size and stride */ + buffer_data->chunk->offset = 0; + buffer_data->chunk->size = total_read; + buffer_data->chunk->stride = port->frame_size; + + /* Update the sample count */ + this->sample_count += buffer_data->chunk->size / port->frame_size; + + /* Add the buffer to the ready list */ + buffer->outstanding = true; + spa_list_append(&port->ready, &buffer->link); + } } - /* get the buffer data */ - buffer = spa_list_first(&port->free, struct buffer, link); - buffer_data = &buffer->buf->datas[0]; - spa_assert(buffer_data->data); + /* Process a buffer if there is one ready and IO does not have one */ + if (!spa_list_is_empty(&port->ready) && io->status != SPA_STATUS_HAVE_BUFFER) { + /* Get the ready buffer and remove it from the ready list */ + buffer = spa_list_first(&port->ready, struct buffer, link); + spa_list_remove(&buffer->link); - /* read data */ - if (!read_data(this, buffer_data->data, buffer_data->maxsize, &total_read)) - goto stop; - if (total_read == 0) - return; + /* Mark the buffer to be processed */ + io->buffer_id = buffer->id; + io->status = SPA_STATUS_HAVE_BUFFER; - /* update the buffer offset, size and stride */ - buffer_data->chunk->offset = 0; - buffer_data->chunk->size = total_read; - buffer_data->chunk->stride = port->frame_size; + /* Add the buffer to the free list */ + spa_list_append(&port->free, &buffer->link); + buffer->outstanding = false; - /* update the sample count */ - this->sample_count += buffer_data->chunk->size / port->frame_size; + /* Set the done status as have buffer */ + io_done_status = SPA_STATUS_HAVE_BUFFER; + } - /* remove the buffer from the free list and add it to the ready list */ - spa_list_remove(&buffer->link); - buffer->outstanding = true; - spa_list_append(&port->ready, &buffer->link); - - /* Notify we are ready for the next buffer */ - spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_BUFFER); - - return; - -stop: - if (this->source.loop) - spa_loop_remove_source(this->data_loop, &this->source); + /* Notify the current status */ + spa_node_call_ready(&this->callbacks, io_done_status); } static int do_start(struct impl *this) @@ -397,12 +421,12 @@ static int do_start(struct impl *this) return -1; /* Set the write MTU */ - val = this->transport->write_mtu; + val = FILL_FRAMES * this->transport->write_mtu; if (setsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)) < 0) spa_log_warn(this->log, "sco-source %p: SO_SNDBUF %m", this); /* Set the read MTU */ - val = this->transport->read_mtu; + val = FILL_FRAMES * this->transport->read_mtu; if (setsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)) < 0) spa_log_warn(this->log, "sco-source %p: SO_RCVBUF %m", this); @@ -681,10 +705,7 @@ impl_node_port_enum_params(void *object, int seq, return -ENOENT; } - /* TODO: why filer is != NULL when linking it with sco-sink? */ - /* if filter is null sco-source cannot be linked with sco-sink, - * so for now we always pass NULL */ - if (spa_pod_filter(&b, &result.param, param, NULL) < 0) + if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); @@ -847,17 +868,6 @@ impl_node_port_set_io(void *object, return 0; } -static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id) -{ - struct buffer *b = &port->buffers[buffer_id]; - - if (b->outstanding) { - spa_log_trace(this->log, NAME " %p: recycle buffer %u", this, buffer_id); - spa_list_append(&port->free, &b->link); - b->outstanding = false; - } -} - static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) { struct impl *this = object; @@ -884,39 +894,35 @@ static int impl_node_process(void *object) struct impl *this = object; struct port *port; struct spa_io_buffers *io; - struct buffer *b; + struct buffer *buffer; - /* get IO */ spa_return_val_if_fail(this != NULL, -EINVAL); port = &this->port; io = port->io; spa_return_val_if_fail(io != NULL, -EIO); - /* don't do anything if IO does not need a buffer */ - if (io->status != SPA_STATUS_NEED_BUFFER) - return io->status; - - /* Recycle previously played buffer */ - if (io->buffer_id != SPA_ID_INVALID && - io->buffer_id < port->n_buffers) { - spa_log_debug(this->log, "recycling buffer_id=%d", io->buffer_id); - recycle_buffer(this, port, io->buffer_id); - io->buffer_id = SPA_ID_INVALID; - } - - /* Check if we have new buffers in the queue */ - if (spa_list_is_empty(&port->ready)) + /* Return if we already have a buffer */ + if (io->status == SPA_STATUS_HAVE_BUFFER) return SPA_STATUS_HAVE_BUFFER; - /* Pop the new buffer from the queue */ - b = spa_list_first(&port->ready, struct buffer, link); - spa_list_remove(&b->link); + /* Return if there is not buffers ready to be processed */ + if (spa_list_is_empty(&port->ready)) + return io->status; - /* Set the new buffer in IO to be played */ - io->buffer_id = b->id; + /* Get the new buffer from the ready list */ + buffer = spa_list_first(&port->ready, struct buffer, link); + spa_list_remove(&buffer->link); + + /* Set the new buffer in IO */ + io->buffer_id = buffer->id; io->status = SPA_STATUS_HAVE_BUFFER; + /* Add the buffer to the free list */ + spa_list_append(&port->free, &buffer->link); + buffer->outstanding = false; + + /* Notify we have a buffer ready to be processed */ return SPA_STATUS_HAVE_BUFFER; }