diff --git a/spa/plugins/bluez5/decode-buffer.h b/spa/plugins/bluez5/decode-buffer.h index 44bdc15d2..65e69c1ed 100644 --- a/spa/plugins/bluez5/decode-buffer.h +++ b/spa/plugins/bluez5/decode-buffer.h @@ -168,6 +168,11 @@ static inline void *spa_bt_decode_buffer_get_write(struct spa_bt_decode_buffer * return SPA_PTROFF(this->buffer_decoded, this->write_index, void); } +static inline size_t spa_bt_decode_buffer_get_size(struct spa_bt_decode_buffer *this) +{ + return this->write_index - this->read_index; +} + static inline void spa_bt_decode_buffer_write_packet(struct spa_bt_decode_buffer *this, uint32_t size, uint64_t nsec) { int32_t remain; diff --git a/spa/plugins/bluez5/media-source.c b/spa/plugins/bluez5/media-source.c index 6abafe054..ce33761a4 100644 --- a/spa/plugins/bluez5/media-source.c +++ b/spa/plugins/bluez5/media-source.c @@ -51,6 +51,8 @@ struct props { #define MAX_BUFFERS 32 +#define MAX_PLC_PACKETS 16 + struct buffer { uint32_t id; unsigned int outstanding:1; @@ -163,6 +165,9 @@ struct impl { uint64_t now; uint64_t sample_count; + int seqnum; + uint32_t plc_packets; + uint32_t errqueue_count; struct delay_info delay; @@ -451,38 +456,140 @@ again: return size_read; } +static int produce_plc_data(struct impl *this) +{ + struct port *port = &this->port; + uint32_t avail; + int res; + void *buf; + + if (!this->codec->produce_plc) + return -ENOTSUP; + + buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail); + res = this->codec->produce_plc(this->codec_data, buf, avail); + if (res <= 0) + return res; + + spa_bt_decode_buffer_write_packet(&port->buffer, res, 0); + + spa_log_debug(this->log, "%p: produced PLC audio, frames:%u", + this, (unsigned int)(res / port->frame_size)); + + this->plc_packets++; + return res; +} + static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size, - uint8_t *dst, uint32_t dst_size) + uint8_t *dst, uint32_t dst_size, uint32_t *dst_out) { ssize_t processed; size_t written, avail; + size_t src_avail = src_size; + uint16_t seqnum = this->seqnum + 1; + + *dst_out = 0; if ((processed = this->codec->start_decode(this->codec_data, - src, src_size, NULL, NULL)) < 0) + src, src_avail, &seqnum, NULL)) < 0) return processed; - /* TODO: check seqnum and handle PLC */ - src += processed; - src_size -= processed; + src_avail -= processed; + + if (this->seqnum < 0) { + /* first packet */ + } else if (this->codec->stream_pkt && this->seqnum == seqnum) { + /* previous packet continues */ + } else { + uint16_t lost = seqnum - (uint16_t)(this->seqnum + 1); + if (lost) + spa_log_debug(this->log, "%p: lost packets:%u (%u -> %u)", + this, (unsigned int)lost, this->seqnum + 1, seqnum); + + if (this->plc_packets > MAX_PLC_PACKETS || lost > MAX_PLC_PACKETS) { + /* Don't try to compensate for too big skips */ + this->plc_packets = 0; + lost = 0; + } + + if (lost >= this->plc_packets) { + lost -= this->plc_packets; + } else { + /* We already produced PLC audio for this packet. However, this + * only occurs if we are underflowing, so we should retain this + * packet regardless and let rate matching take care of it. + */ + lost = 0; + } + + /* Pad with PLC audio for any missing packets */ + while (lost > 0 && produce_plc_data(this) > 0) + --lost; + + this->plc_packets = 0; + } /* decode */ avail = dst_size; do { written = 0; if ((processed = this->codec->decode(this->codec_data, - src, src_size, dst, avail, &written)) < 0) + src, src_avail, dst, avail, &written)) < 0) return processed; /* update source and dest pointers */ spa_return_val_if_fail (avail > written, -ENOSPC); - src_size -= processed; + src_avail -= processed; src += processed; avail -= written; dst += written; - } while (src_size && (processed || written)); + } while (src_avail && (processed || written) && !this->codec->stream_pkt); - return dst_size - avail; + this->seqnum = seqnum; + + *dst_out = dst_size - avail; + return src_size - src_avail; +} + +static void add_data(struct impl *this, uint8_t *src, uint32_t src_size, uint64_t now) +{ + struct port *port = &this->port; + uint32_t decoded; + + spa_log_trace(this->log, "read socket data size:%d", src_size); + + do { + int32_t consumed; + uint32_t avail; + void *buf; + uint64_t dt; + + buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail); + + consumed = decode_data(this, src, src_size, buf, avail, &decoded); + if (consumed < 0) { + spa_log_debug(this->log, "%p: failed to decode data: %d", this, consumed); + return; + } + + src = SPA_PTROFF(src, consumed, void); + src_size -= consumed; + + /* discard when not started */ + if (this->started) + spa_bt_decode_buffer_write_packet(&port->buffer, decoded, now); + + if (decoded) { + dt = now - this->now; + this->now = now; + spa_log_trace(this->log, "decoded socket data seq:%u size:%d frames:%d dt:%d dms", + (unsigned int)this->seqnum, (int)decoded, (int)decoded/port->frame_size, + (int)(dt / 100000)); + } else { + spa_log_trace(this->log, "no decoded socket data"); + } + } while (this->codec->stream_pkt && src_size && decoded); } static void handle_errqueue(struct impl *this) @@ -506,11 +613,7 @@ static void handle_errqueue(struct impl *this) static void media_on_ready_read(struct spa_source *source) { struct impl *this = source->data; - struct port *port = &this->port; - void *buf; - int32_t size_read, decoded; - uint32_t avail; - uint64_t dt; + int32_t size_read; uint64_t now = 0; /* make sure the source is an input */ @@ -547,32 +650,7 @@ static void media_on_ready_read(struct spa_source *source) this->codec_props_changed = false; } - /* decode to buffer */ - buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail); - spa_log_trace(this->log, "read socket data size:%d, avail:%d", size_read, avail); - decoded = decode_data(this, this->buffer_read, size_read, buf, avail); - if (decoded < 0) { - spa_log_debug(this->log, "failed to decode data: %d", decoded); - return; - } - if (decoded == 0) { - spa_log_trace(this->log, "no decoded socket data"); - return; - } - - /* discard when not started */ - if (!this->started) - return; - - spa_bt_decode_buffer_write_packet(&port->buffer, decoded, now); - - dt = now - this->now; - this->now = now; - - spa_log_trace(this->log, "decoded socket data size:%d frames:%d dt:%d dms", - (int)decoded, (int)decoded/port->frame_size, - (int)(dt / 100000)); - + add_data(this, this->buffer_read, size_read, now); return; stop: @@ -586,11 +664,6 @@ stop: static int media_sco_pull(void *userdata, uint8_t *buffer_read, int size_read, uint64_t now) { struct impl *this = userdata; - struct port *port = &this->port; - void *buf; - int32_t decoded; - uint32_t avail; - uint64_t dt; if (this->transport == NULL) { spa_log_debug(this->log, "no transport, stop reading"); @@ -600,32 +673,7 @@ static int media_sco_pull(void *userdata, uint8_t *buffer_read, int size_read, u if (size_read == 0) return 0; - /* decode to buffer */ - buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail); - spa_log_trace(this->log, "read socket data size:%d, avail:%d", size_read, avail); - decoded = decode_data(this, buffer_read, size_read, buf, avail); - if (decoded < 0) { - spa_log_debug(this->log, "failed to decode data: %d", decoded); - return 0; - } - if (decoded == 0) { - spa_log_trace(this->log, "no decoded socket data"); - return 0; - } - - /* discard when not started */ - if (!this->started) - return 0; - - spa_bt_decode_buffer_write_packet(&port->buffer, decoded, now); - - dt = now - this->now; - this->now = now; - - spa_log_trace(this->log, "decoded socket data size:%d frames:%d dt:%d dms", - (int)decoded, (int)decoded/port->frame_size, - (int)(dt / 100000)); - + add_data(this, buffer_read, size_read, now); return 0; stop: @@ -851,6 +899,8 @@ static int transport_start(struct impl *this) this->sample_count = 0; this->errqueue_count = 0; + this->seqnum = -1; + this->io_error = false; if (this->codec->kind != MEDIA_CODEC_HFP) { @@ -1603,40 +1653,44 @@ static void process_buffering(struct impl *this) struct port *port = &this->port; uint32_t duration; const uint32_t samples = get_samples(this, &duration); + uint32_t data_size = samples * port->frame_size; uint32_t avail; - void *buf; update_target_latency(this); + if (samples > this->quantum_limit) + return; + + /* Produce PLC data if possible to avoid underrun */ + while (spa_bt_decode_buffer_get_size(&port->buffer) < data_size) { + if (produce_plc_data(this) <= 0) + break; + } + spa_bt_decode_buffer_process(&port->buffer, samples, duration, this->position ? this->position->clock.rate_diff : 1.0, this->position ? this->position->clock.next_nsec : 0); setup_matching(this); - buf = spa_bt_decode_buffer_get_read(&port->buffer, &avail); - /* copy data to buffers */ if (!spa_list_is_empty(&port->free)) { struct buffer *buffer; struct spa_data *datas; - uint32_t data_size; + void *buf; buffer = spa_list_first(&port->free, struct buffer, link); datas = buffer->buf->datas; - data_size = samples * port->frame_size; - WARN_ONCE(datas[0].maxsize < data_size && !this->following, this->log, "source buffer too small (%u < %u)", datas[0].maxsize, data_size); data_size = SPA_MIN(data_size, SPA_ROUND_DOWN(datas[0].maxsize, port->frame_size)); + buf = spa_bt_decode_buffer_get_read(&port->buffer, &avail); avail = SPA_MIN(avail, data_size); - spa_bt_decode_buffer_read(&port->buffer, avail); - spa_list_remove(&buffer->link); spa_log_trace(this->log, "dequeue %d", buffer->id); @@ -1653,10 +1707,9 @@ static void process_buffering(struct impl *this) memcpy(datas[0].data, buf, avail); - /* pad with silence - * - * TODO: should do PLC instead - */ + spa_bt_decode_buffer_read(&port->buffer, avail); + + /* Pad with silence, if PLC failed to produce enough */ if (avail < data_size) memset(SPA_PTROFF(datas[0].data, avail, void), 0, data_size - avail);