From 3be0e7e1c6b019b13383c4ed7da9cc182b4c94b1 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 16 Oct 2020 13:13:04 +0200 Subject: [PATCH] a2dp-sink: fix playback Use a simple periodic timeout to pull in samples and write them to the socket --- spa/plugins/bluez5/a2dp-sink.c | 266 ++++++++++----------------------- 1 file changed, 80 insertions(+), 186 deletions(-) diff --git a/spa/plugins/bluez5/a2dp-sink.c b/spa/plugins/bluez5/a2dp-sink.c index 073d68d04..7d416cca1 100644 --- a/spa/plugins/bluez5/a2dp-sink.c +++ b/spa/plugins/bluez5/a2dp-sink.c @@ -59,7 +59,7 @@ struct props { }; #define FILL_FRAMES 2 -#define MAX_FRAME_COUNT 32 +#define MAX_FRAME_COUNT 16 #define MAX_BUFFERS 32 struct buffer { @@ -88,7 +88,6 @@ struct port { struct spa_list ready; size_t ready_offset; - unsigned int need_data:1; }; struct impl { @@ -122,6 +121,10 @@ struct impl { struct spa_io_clock *clock; struct spa_io_position *position; + uint64_t current_time; + uint64_t next_time; + uint64_t last_error; + sbc_t sbc; int read_size; int write_size; @@ -133,23 +136,12 @@ struct impl { int frame_count; uint16_t seqnum; uint32_t timestamp; + uint64_t sample_count; uint8_t tmp_buffer[512]; int tmp_buffer_used; int min_bitpool; int max_bitpool; - - uint64_t last_time; - uint64_t last_error; - - struct timespec now; - uint64_t start_time; - uint64_t sample_count; - uint64_t sample_time; - uint64_t last_ticks; - uint64_t last_monotonic; - - uint64_t underrun; }; #define NAME "a2dp-sink" @@ -242,24 +234,25 @@ static int impl_node_enum_params(void *object, int seq, return 0; } -static int set_timers(struct impl *this) +static int set_timeout(struct impl *this, uint64_t time) { struct itimerspec ts; - int res; - - ts.it_value.tv_sec = 0; - if (this->following) { - ts.it_value.tv_nsec = 0; - } else { - ts.it_value.tv_nsec = 1; - } + ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC; ts.it_interval.tv_sec = 0; ts.it_interval.tv_nsec = 0; + return spa_system_timerfd_settime(this->data_system, + this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); +} - res = spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL); - this->source.mask = SPA_IO_IN; - spa_loop_update_source(this->data_loop, &this->source); - return res; +static int set_timers(struct impl *this) +{ + struct timespec now; + + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); + this->next_time = SPA_TIMESPEC_TO_NSEC(&now); + + return set_timeout(this, this->following ? 0 : this->next_time); } static int do_reassign_follower(struct spa_loop *loop, @@ -335,21 +328,6 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, return 0; } -static inline void calc_timeout(size_t target, size_t current, - size_t rate, struct timespec *now, - struct timespec *ts) -{ - ts->tv_sec = now->tv_sec; - ts->tv_nsec = now->tv_nsec; - if (target > current) - ts->tv_nsec += ((target - current) * SPA_NSEC_PER_SEC) / rate; - - while (ts->tv_nsec >= SPA_NSEC_PER_SEC) { - ts->tv_sec++; - ts->tv_nsec -= SPA_NSEC_PER_SEC; - } -} - static int reset_buffer(struct impl *this) { this->buffer_used = sizeof(struct rtp_header) + sizeof(struct rtp_payload); @@ -359,7 +337,7 @@ static int reset_buffer(struct impl *this) static int send_buffer(struct impl *this) { - int val, written; + int written; struct rtp_header *header; struct rtp_payload *payload; @@ -376,14 +354,11 @@ static int send_buffer(struct impl *this) header->timestamp = htonl(this->timestamp); header->ssrc = htonl(1); - ioctl(this->transport->fd, TIOCOUTQ, &val); + spa_log_trace(this->log, NAME " %p: send %d %u %u %u", + this, this->frame_count, this->seqnum, this->timestamp, this->buffer_used); - spa_log_trace(this->log, NAME " %p: send %d %u %u %u %"PRIu64" %d", - this, this->frame_count, this->seqnum, this->timestamp, this->buffer_used, - this->sample_time, val); - - written = write(this->transport->fd, this->buffer, this->buffer_used); - spa_log_trace(this->log, NAME " %p: send %d", this, written); + written = send(this->transport->fd, this->buffer, this->buffer_used, MSG_DONTWAIT | MSG_NOSIGNAL); + spa_log_debug(this->log, NAME " %p: send %d", this, written); if (written < 0) return -errno; @@ -428,7 +403,6 @@ static int encode_buffer(struct impl *this, const void *data, int size) return processed; this->sample_count += processed / port->frame_size; - this->sample_time += processed / port->frame_size; this->frame_count += processed / this->codesize; this->buffer_used += out_encoded; @@ -439,7 +413,6 @@ static int encode_buffer(struct impl *this, const void *data, int size) processed = this->tmp_buffer_used; this->tmp_buffer_used = 0; } - return processed; } @@ -460,34 +433,6 @@ static int flush_buffer(struct impl *this, bool force) return 0; } -static int fill_socket(struct impl *this, uint64_t now_time) -{ - static const uint8_t zero_buffer[1024 * 4] = { 0, }; - int frames = 0; - - while (frames < FILL_FRAMES) { - int processed, written; - - processed = encode_buffer(this, zero_buffer, sizeof(zero_buffer)); - if (processed < 0) - return processed; - if (processed == 0) - break; - - written = flush_buffer(this, false); - if (written == -EAGAIN) - break; - else if (written < 0) - return written; - else if (written > 0) - frames++; - } - reset_buffer(this); - this->sample_count = this->timestamp; - - return 0; -} - static int add_data(struct impl *this, const void *data, int size) { int processed, total = 0; @@ -521,8 +466,6 @@ static int set_bitpool(struct impl *this, int bitpool) this->sbc.bitpool = bitpool; - spa_log_debug(this->log, NAME" %p: set bitpool %d", this, this->sbc.bitpool); - this->codesize = sbc_get_codesize(&this->sbc); /* make sure there's enough space in this->tmp_buffer */ spa_assert(this->codesize <= 512); @@ -536,6 +479,9 @@ static int set_bitpool(struct impl *this, int bitpool) this->write_samples = (this->write_size / this->frame_length) * (this->codesize / port->frame_size); + spa_log_info(this->log, NAME" %p: set bitpool %d codesize:%u frame_length:%u", + this, this->sbc.bitpool, this->codesize, this->frame_length); + return 0; } @@ -549,13 +495,18 @@ static int increase_bitpool(struct impl *this) return set_bitpool(this, this->sbc.bitpool + 1); } +static void enable_flush(struct impl *this, bool enabled) +{ + if (SPA_FLAG_IS_SET(this->flush_source.mask, SPA_IO_OUT) != enabled) { + SPA_FLAG_UPDATE(this->flush_source.mask, SPA_IO_OUT, enabled); + spa_loop_update_source(this->data_loop, &this->flush_source); + } +} + static int flush_data(struct impl *this, uint64_t now_time) { int written; uint32_t total_frames; - uint64_t elapsed; - int64_t queued; - struct itimerspec ts; struct port *port = &this->port; total_frames = 0; @@ -587,15 +538,11 @@ again: if (written > 0 && l1 > 0) written += add_data(this, src, l1); if (written <= 0) { - /* only request new data when the current buffer will be fully processed in the next iteration */ - if (port->ready_offset + (this->frame_count * this->codesize) >= d[0].chunk->size) - port->need_data = true; - if (written < 0 && written != -ENOSPC) { spa_list_remove(&b->link); SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); this->port.io->buffer_id = b->id; - spa_log_trace(this->log, NAME " %p: error %s, reuse buffer %u", + spa_log_warn(this->log, NAME " %p: error %s, reuse buffer %u", this, spa_strerror(written), b->id); spa_node_call_reuse_buffer(&this->callbacks, 0, b->id); port->ready_offset = 0; @@ -621,16 +568,14 @@ again: spa_log_trace(this->log, NAME " %p: written %u frames", this, total_frames); } - written = flush_buffer(this, false); + written = flush_buffer(this, true); if (written == -EAGAIN) { - spa_log_trace(this->log, NAME" %p: delay flush %"PRIu64, this, this->sample_time); - if ((this->flush_source.mask & SPA_IO_OUT) == 0) { - this->flush_source.mask = SPA_IO_OUT; - spa_loop_update_source(this->data_loop, &this->flush_source); - this->source.mask = 0; - spa_loop_update_source(this->data_loop, &this->source); - return 0; + spa_log_trace(this->log, NAME" %p: delay flush", this); + if (now_time - this->last_error > SPA_NSEC_PER_SEC / 2) { + reduce_bitpool(this); + this->last_error = now_time; } + enable_flush(this, true); } else if (written < 0) { spa_log_trace(this->log, NAME" %p: error flushing %s", this, @@ -638,62 +583,14 @@ again: return written; } else if (written > 0) { - if (now_time - this->last_error > SPA_NSEC_PER_SEC * 3) { + if (now_time - this->last_error > SPA_NSEC_PER_SEC) { increase_bitpool(this); this->last_error = now_time; } if (!spa_list_is_empty(&port->ready)) goto again; - } - this->flush_source.mask = 0; - spa_loop_update_source(this->data_loop, &this->flush_source); - - if (now_time > this->start_time) - elapsed = now_time - this->start_time; - else - elapsed = 0; - - elapsed = elapsed * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC; - - queued = this->sample_time - elapsed; - - spa_log_trace(this->log, NAME" %p: %"PRIu64" %"PRIi64" %"PRIu64" %"PRIu64" %d", this, - now_time, queued, this->sample_time, elapsed, this->write_samples); - - if (!this->following) { - if (queued < FILL_FRAMES * this->write_samples) { - queued = (FILL_FRAMES + 1) * this->write_samples; - if (this->sample_time < elapsed) { - this->sample_time = queued; - this->start_time = now_time; - } - if (!spa_list_is_empty(&port->ready) && - now_time - this->last_error > SPA_NSEC_PER_SEC / 2) { - reduce_bitpool(this); - this->last_error = now_time; - } - } - calc_timeout(queued, - FILL_FRAMES * this->write_samples, - port->current_format.info.raw.rate, - &this->now, &ts.it_value); - ts.it_interval.tv_sec = 0; - ts.it_interval.tv_nsec = 0; - spa_system_timerfd_settime(this->data_system, this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); - this->source.mask = SPA_IO_IN; - spa_loop_update_source(this->data_loop, &this->source); - - if (this->clock) { - this->clock->nsec = now_time; - this->clock->position = this->sample_count; - this->clock->delay = queued; - this->clock->rate_diff = 1.0f; - this->clock->next_nsec = SPA_TIMESPEC_TO_NSEC(&ts.it_value); - } - } else { - this->start_time = now_time; - this->sample_time = 0; + enable_flush(this, false); } return 0; } @@ -701,60 +598,62 @@ again: static void a2dp_on_flush(struct spa_source *source) { struct impl *this = source->data; - uint64_t now_time; spa_log_trace(this->log, NAME" %p: flushing", this); - if ((source->rmask & SPA_IO_OUT) == 0) { + if (!SPA_FLAG_IS_SET(source->rmask, SPA_IO_OUT)) { spa_log_warn(this->log, NAME" %p: error %d", this, source->rmask); if (this->flush_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_source); - this->source.mask = 0; - spa_loop_update_source(this->data_loop, &this->source); return; } - - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); - now_time = this->now.tv_sec * SPA_NSEC_PER_SEC + this->now.tv_nsec; - - flush_data(this, now_time); + flush_data(this, this->current_time); } static void a2dp_on_timeout(struct spa_source *source) { struct impl *this = source->data; struct port *port = &this->port; - int err; - uint64_t exp, now_time; + uint64_t exp, duration; + uint32_t rate; struct spa_io_buffers *io = port->io; + uint64_t prev_time, now_time; if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0) spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); - now_time = SPA_TIMESPEC_TO_NSEC(&this->now); + prev_time = this->current_time; + now_time = this->current_time = this->next_time; - spa_log_trace(this->log, NAME" %p: timeout %"PRIu64" %"PRIu64"", this, - now_time, now_time - this->last_time); - this->last_time = now_time; - - if (this->start_time == 0) { - if ((err = fill_socket(this, now_time)) < 0) - spa_log_error(this->log, "error fill socket %s", spa_strerror(err)); - this->start_time = now_time; + if (SPA_LIKELY(this->position)) { + duration = this->position->clock.duration; + rate = this->position->clock.rate.denom; + } else { + duration = 1024; + rate = 48000; } - if (spa_list_is_empty(&port->ready) || port->need_data) { - spa_log_trace(this->log, NAME " %p: %d", this, io->status); + this->next_time = now_time + duration * SPA_NSEC_PER_SEC / rate; - io->status = SPA_STATUS_NEED_DATA; - - spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); + if (SPA_LIKELY(this->clock)) { + this->clock->nsec = now_time; + this->clock->position += duration; + this->clock->position = duration; + this->clock->delay = 0; + this->clock->rate_diff = 1.0f; + this->clock->next_nsec = this->next_time; } - flush_data(this, now_time); + + spa_log_debug(this->log, NAME" %p: timeout %"PRIu64" %"PRIu64"", this, + now_time, now_time - prev_time); + + spa_log_trace(this->log, NAME " %p: %d", this, io->status); + io->status = SPA_STATUS_NEED_DATA; + spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); + + set_timeout(this, this->next_time); } - static int init_sbc(struct impl *this) { struct spa_bt_transport *transport = this->transport; @@ -975,6 +874,7 @@ static const struct spa_dict_item node_info_items[] = { { SPA_KEY_DEVICE_API, "bluez5" }, { SPA_KEY_MEDIA_CLASS, "Audio/Sink" }, { SPA_KEY_NODE_DRIVER, "true" }, + { SPA_KEY_NODE_LATENCY, "512/48000" }, }; static void emit_node_info(struct impl *this, bool full) @@ -1344,7 +1244,6 @@ static int impl_node_process(void *object) struct impl *this = object; struct port *port; struct spa_io_buffers *io; - uint64_t now_time; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -1352,12 +1251,6 @@ static int impl_node_process(void *object) io = port->io; spa_return_val_if_fail(io != NULL, -EIO); - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); - now_time = SPA_TIMESPEC_TO_NSEC(&this->now); - - if (!spa_list_is_empty(&port->ready)) - flush_data(this, now_time); - if (io->status == SPA_STATUS_HAVE_DATA && io->buffer_id < port->n_buffers) { struct buffer *b = &port->buffers[io->buffer_id]; @@ -1371,12 +1264,13 @@ static int impl_node_process(void *object) spa_list_append(&port->ready, &b->link); SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT); - port->need_data = false; - - flush_data(this, now_time); + io->buffer_id = SPA_ID_INVALID; io->status = SPA_STATUS_OK; } + if (!spa_list_is_empty(&port->ready)) + flush_data(this, this->current_time); + return SPA_STATUS_HAVE_DATA; }