a2dp-sink: fix playback

Use a simple periodic timeout to pull in samples and write them to the
socket
This commit is contained in:
Wim Taymans 2020-10-16 13:13:04 +02:00
parent 1b7d052098
commit 3be0e7e1c6

View file

@ -59,7 +59,7 @@ struct props {
}; };
#define FILL_FRAMES 2 #define FILL_FRAMES 2
#define MAX_FRAME_COUNT 32 #define MAX_FRAME_COUNT 16
#define MAX_BUFFERS 32 #define MAX_BUFFERS 32
struct buffer { struct buffer {
@ -88,7 +88,6 @@ struct port {
struct spa_list ready; struct spa_list ready;
size_t ready_offset; size_t ready_offset;
unsigned int need_data:1;
}; };
struct impl { struct impl {
@ -122,6 +121,10 @@ struct impl {
struct spa_io_clock *clock; struct spa_io_clock *clock;
struct spa_io_position *position; struct spa_io_position *position;
uint64_t current_time;
uint64_t next_time;
uint64_t last_error;
sbc_t sbc; sbc_t sbc;
int read_size; int read_size;
int write_size; int write_size;
@ -133,23 +136,12 @@ struct impl {
int frame_count; int frame_count;
uint16_t seqnum; uint16_t seqnum;
uint32_t timestamp; uint32_t timestamp;
uint64_t sample_count;
uint8_t tmp_buffer[512]; uint8_t tmp_buffer[512];
int tmp_buffer_used; int tmp_buffer_used;
int min_bitpool; int min_bitpool;
int max_bitpool; int max_bitpool;
uint64_t last_time;
uint64_t last_error;
struct timespec now;
uint64_t start_time;
uint64_t sample_count;
uint64_t sample_time;
uint64_t last_ticks;
uint64_t last_monotonic;
uint64_t underrun;
}; };
#define NAME "a2dp-sink" #define NAME "a2dp-sink"
@ -242,24 +234,25 @@ static int impl_node_enum_params(void *object, int seq,
return 0; return 0;
} }
static int set_timers(struct impl *this) static int set_timeout(struct impl *this, uint64_t time)
{ {
struct itimerspec ts; struct itimerspec ts;
int res; ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC;
ts.it_value.tv_sec = 0;
if (this->following) {
ts.it_value.tv_nsec = 0;
} else {
ts.it_value.tv_nsec = 1;
}
ts.it_interval.tv_sec = 0; ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0; ts.it_interval.tv_nsec = 0;
return spa_system_timerfd_settime(this->data_system,
this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
}
res = spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL); static int set_timers(struct impl *this)
this->source.mask = SPA_IO_IN; {
spa_loop_update_source(this->data_loop, &this->source); struct timespec now;
return res;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
this->next_time = SPA_TIMESPEC_TO_NSEC(&now);
return set_timeout(this, this->following ? 0 : this->next_time);
} }
static int do_reassign_follower(struct spa_loop *loop, static int do_reassign_follower(struct spa_loop *loop,
@ -335,21 +328,6 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
return 0; return 0;
} }
static inline void calc_timeout(size_t target, size_t current,
size_t rate, struct timespec *now,
struct timespec *ts)
{
ts->tv_sec = now->tv_sec;
ts->tv_nsec = now->tv_nsec;
if (target > current)
ts->tv_nsec += ((target - current) * SPA_NSEC_PER_SEC) / rate;
while (ts->tv_nsec >= SPA_NSEC_PER_SEC) {
ts->tv_sec++;
ts->tv_nsec -= SPA_NSEC_PER_SEC;
}
}
static int reset_buffer(struct impl *this) static int reset_buffer(struct impl *this)
{ {
this->buffer_used = sizeof(struct rtp_header) + sizeof(struct rtp_payload); this->buffer_used = sizeof(struct rtp_header) + sizeof(struct rtp_payload);
@ -359,7 +337,7 @@ static int reset_buffer(struct impl *this)
static int send_buffer(struct impl *this) static int send_buffer(struct impl *this)
{ {
int val, written; int written;
struct rtp_header *header; struct rtp_header *header;
struct rtp_payload *payload; struct rtp_payload *payload;
@ -376,14 +354,11 @@ static int send_buffer(struct impl *this)
header->timestamp = htonl(this->timestamp); header->timestamp = htonl(this->timestamp);
header->ssrc = htonl(1); header->ssrc = htonl(1);
ioctl(this->transport->fd, TIOCOUTQ, &val); spa_log_trace(this->log, NAME " %p: send %d %u %u %u",
this, this->frame_count, this->seqnum, this->timestamp, this->buffer_used);
spa_log_trace(this->log, NAME " %p: send %d %u %u %u %"PRIu64" %d", written = send(this->transport->fd, this->buffer, this->buffer_used, MSG_DONTWAIT | MSG_NOSIGNAL);
this, this->frame_count, this->seqnum, this->timestamp, this->buffer_used, spa_log_debug(this->log, NAME " %p: send %d", this, written);
this->sample_time, val);
written = write(this->transport->fd, this->buffer, this->buffer_used);
spa_log_trace(this->log, NAME " %p: send %d", this, written);
if (written < 0) if (written < 0)
return -errno; return -errno;
@ -428,7 +403,6 @@ static int encode_buffer(struct impl *this, const void *data, int size)
return processed; return processed;
this->sample_count += processed / port->frame_size; this->sample_count += processed / port->frame_size;
this->sample_time += processed / port->frame_size;
this->frame_count += processed / this->codesize; this->frame_count += processed / this->codesize;
this->buffer_used += out_encoded; this->buffer_used += out_encoded;
@ -439,7 +413,6 @@ static int encode_buffer(struct impl *this, const void *data, int size)
processed = this->tmp_buffer_used; processed = this->tmp_buffer_used;
this->tmp_buffer_used = 0; this->tmp_buffer_used = 0;
} }
return processed; return processed;
} }
@ -460,34 +433,6 @@ static int flush_buffer(struct impl *this, bool force)
return 0; return 0;
} }
static int fill_socket(struct impl *this, uint64_t now_time)
{
static const uint8_t zero_buffer[1024 * 4] = { 0, };
int frames = 0;
while (frames < FILL_FRAMES) {
int processed, written;
processed = encode_buffer(this, zero_buffer, sizeof(zero_buffer));
if (processed < 0)
return processed;
if (processed == 0)
break;
written = flush_buffer(this, false);
if (written == -EAGAIN)
break;
else if (written < 0)
return written;
else if (written > 0)
frames++;
}
reset_buffer(this);
this->sample_count = this->timestamp;
return 0;
}
static int add_data(struct impl *this, const void *data, int size) static int add_data(struct impl *this, const void *data, int size)
{ {
int processed, total = 0; int processed, total = 0;
@ -521,8 +466,6 @@ static int set_bitpool(struct impl *this, int bitpool)
this->sbc.bitpool = bitpool; this->sbc.bitpool = bitpool;
spa_log_debug(this->log, NAME" %p: set bitpool %d", this, this->sbc.bitpool);
this->codesize = sbc_get_codesize(&this->sbc); this->codesize = sbc_get_codesize(&this->sbc);
/* make sure there's enough space in this->tmp_buffer */ /* make sure there's enough space in this->tmp_buffer */
spa_assert(this->codesize <= 512); spa_assert(this->codesize <= 512);
@ -536,6 +479,9 @@ static int set_bitpool(struct impl *this, int bitpool)
this->write_samples = (this->write_size / this->frame_length) * this->write_samples = (this->write_size / this->frame_length) *
(this->codesize / port->frame_size); (this->codesize / port->frame_size);
spa_log_info(this->log, NAME" %p: set bitpool %d codesize:%u frame_length:%u",
this, this->sbc.bitpool, this->codesize, this->frame_length);
return 0; return 0;
} }
@ -549,13 +495,18 @@ static int increase_bitpool(struct impl *this)
return set_bitpool(this, this->sbc.bitpool + 1); return set_bitpool(this, this->sbc.bitpool + 1);
} }
static void enable_flush(struct impl *this, bool enabled)
{
if (SPA_FLAG_IS_SET(this->flush_source.mask, SPA_IO_OUT) != enabled) {
SPA_FLAG_UPDATE(this->flush_source.mask, SPA_IO_OUT, enabled);
spa_loop_update_source(this->data_loop, &this->flush_source);
}
}
static int flush_data(struct impl *this, uint64_t now_time) static int flush_data(struct impl *this, uint64_t now_time)
{ {
int written; int written;
uint32_t total_frames; uint32_t total_frames;
uint64_t elapsed;
int64_t queued;
struct itimerspec ts;
struct port *port = &this->port; struct port *port = &this->port;
total_frames = 0; total_frames = 0;
@ -587,15 +538,11 @@ again:
if (written > 0 && l1 > 0) if (written > 0 && l1 > 0)
written += add_data(this, src, l1); written += add_data(this, src, l1);
if (written <= 0) { if (written <= 0) {
/* only request new data when the current buffer will be fully processed in the next iteration */
if (port->ready_offset + (this->frame_count * this->codesize) >= d[0].chunk->size)
port->need_data = true;
if (written < 0 && written != -ENOSPC) { if (written < 0 && written != -ENOSPC) {
spa_list_remove(&b->link); spa_list_remove(&b->link);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
this->port.io->buffer_id = b->id; this->port.io->buffer_id = b->id;
spa_log_trace(this->log, NAME " %p: error %s, reuse buffer %u", spa_log_warn(this->log, NAME " %p: error %s, reuse buffer %u",
this, spa_strerror(written), b->id); this, spa_strerror(written), b->id);
spa_node_call_reuse_buffer(&this->callbacks, 0, b->id); spa_node_call_reuse_buffer(&this->callbacks, 0, b->id);
port->ready_offset = 0; port->ready_offset = 0;
@ -621,16 +568,14 @@ again:
spa_log_trace(this->log, NAME " %p: written %u frames", this, total_frames); spa_log_trace(this->log, NAME " %p: written %u frames", this, total_frames);
} }
written = flush_buffer(this, false); written = flush_buffer(this, true);
if (written == -EAGAIN) { if (written == -EAGAIN) {
spa_log_trace(this->log, NAME" %p: delay flush %"PRIu64, this, this->sample_time); spa_log_trace(this->log, NAME" %p: delay flush", this);
if ((this->flush_source.mask & SPA_IO_OUT) == 0) { if (now_time - this->last_error > SPA_NSEC_PER_SEC / 2) {
this->flush_source.mask = SPA_IO_OUT; reduce_bitpool(this);
spa_loop_update_source(this->data_loop, &this->flush_source); this->last_error = now_time;
this->source.mask = 0;
spa_loop_update_source(this->data_loop, &this->source);
return 0;
} }
enable_flush(this, true);
} }
else if (written < 0) { else if (written < 0) {
spa_log_trace(this->log, NAME" %p: error flushing %s", this, spa_log_trace(this->log, NAME" %p: error flushing %s", this,
@ -638,62 +583,14 @@ again:
return written; return written;
} }
else if (written > 0) { else if (written > 0) {
if (now_time - this->last_error > SPA_NSEC_PER_SEC * 3) { if (now_time - this->last_error > SPA_NSEC_PER_SEC) {
increase_bitpool(this); increase_bitpool(this);
this->last_error = now_time; this->last_error = now_time;
} }
if (!spa_list_is_empty(&port->ready)) if (!spa_list_is_empty(&port->ready))
goto again; goto again;
}
this->flush_source.mask = 0; enable_flush(this, false);
spa_loop_update_source(this->data_loop, &this->flush_source);
if (now_time > this->start_time)
elapsed = now_time - this->start_time;
else
elapsed = 0;
elapsed = elapsed * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC;
queued = this->sample_time - elapsed;
spa_log_trace(this->log, NAME" %p: %"PRIu64" %"PRIi64" %"PRIu64" %"PRIu64" %d", this,
now_time, queued, this->sample_time, elapsed, this->write_samples);
if (!this->following) {
if (queued < FILL_FRAMES * this->write_samples) {
queued = (FILL_FRAMES + 1) * this->write_samples;
if (this->sample_time < elapsed) {
this->sample_time = queued;
this->start_time = now_time;
}
if (!spa_list_is_empty(&port->ready) &&
now_time - this->last_error > SPA_NSEC_PER_SEC / 2) {
reduce_bitpool(this);
this->last_error = now_time;
}
}
calc_timeout(queued,
FILL_FRAMES * this->write_samples,
port->current_format.info.raw.rate,
&this->now, &ts.it_value);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
this->source.mask = SPA_IO_IN;
spa_loop_update_source(this->data_loop, &this->source);
if (this->clock) {
this->clock->nsec = now_time;
this->clock->position = this->sample_count;
this->clock->delay = queued;
this->clock->rate_diff = 1.0f;
this->clock->next_nsec = SPA_TIMESPEC_TO_NSEC(&ts.it_value);
}
} else {
this->start_time = now_time;
this->sample_time = 0;
} }
return 0; return 0;
} }
@ -701,60 +598,62 @@ again:
static void a2dp_on_flush(struct spa_source *source) static void a2dp_on_flush(struct spa_source *source)
{ {
struct impl *this = source->data; struct impl *this = source->data;
uint64_t now_time;
spa_log_trace(this->log, NAME" %p: flushing", this); spa_log_trace(this->log, NAME" %p: flushing", this);
if ((source->rmask & SPA_IO_OUT) == 0) { if (!SPA_FLAG_IS_SET(source->rmask, SPA_IO_OUT)) {
spa_log_warn(this->log, NAME" %p: error %d", this, source->rmask); spa_log_warn(this->log, NAME" %p: error %d", this, source->rmask);
if (this->flush_source.loop) if (this->flush_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_source); spa_loop_remove_source(this->data_loop, &this->flush_source);
this->source.mask = 0;
spa_loop_update_source(this->data_loop, &this->source);
return; return;
} }
flush_data(this, this->current_time);
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
now_time = this->now.tv_sec * SPA_NSEC_PER_SEC + this->now.tv_nsec;
flush_data(this, now_time);
} }
static void a2dp_on_timeout(struct spa_source *source) static void a2dp_on_timeout(struct spa_source *source)
{ {
struct impl *this = source->data; struct impl *this = source->data;
struct port *port = &this->port; struct port *port = &this->port;
int err; uint64_t exp, duration;
uint64_t exp, now_time; uint32_t rate;
struct spa_io_buffers *io = port->io; struct spa_io_buffers *io = port->io;
uint64_t prev_time, now_time;
if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0) if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); prev_time = this->current_time;
now_time = SPA_TIMESPEC_TO_NSEC(&this->now); now_time = this->current_time = this->next_time;
spa_log_trace(this->log, NAME" %p: timeout %"PRIu64" %"PRIu64"", this, if (SPA_LIKELY(this->position)) {
now_time, now_time - this->last_time); duration = this->position->clock.duration;
this->last_time = now_time; rate = this->position->clock.rate.denom;
} else {
if (this->start_time == 0) { duration = 1024;
if ((err = fill_socket(this, now_time)) < 0) rate = 48000;
spa_log_error(this->log, "error fill socket %s", spa_strerror(err));
this->start_time = now_time;
} }
if (spa_list_is_empty(&port->ready) || port->need_data) { this->next_time = now_time + duration * SPA_NSEC_PER_SEC / rate;
spa_log_trace(this->log, NAME " %p: %d", this, io->status);
io->status = SPA_STATUS_NEED_DATA; if (SPA_LIKELY(this->clock)) {
this->clock->nsec = now_time;
spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); this->clock->position += duration;
this->clock->position = duration;
this->clock->delay = 0;
this->clock->rate_diff = 1.0f;
this->clock->next_nsec = this->next_time;
} }
flush_data(this, now_time);
spa_log_debug(this->log, NAME" %p: timeout %"PRIu64" %"PRIu64"", this,
now_time, now_time - prev_time);
spa_log_trace(this->log, NAME " %p: %d", this, io->status);
io->status = SPA_STATUS_NEED_DATA;
spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA);
set_timeout(this, this->next_time);
} }
static int init_sbc(struct impl *this) static int init_sbc(struct impl *this)
{ {
struct spa_bt_transport *transport = this->transport; struct spa_bt_transport *transport = this->transport;
@ -975,6 +874,7 @@ static const struct spa_dict_item node_info_items[] = {
{ SPA_KEY_DEVICE_API, "bluez5" }, { SPA_KEY_DEVICE_API, "bluez5" },
{ SPA_KEY_MEDIA_CLASS, "Audio/Sink" }, { SPA_KEY_MEDIA_CLASS, "Audio/Sink" },
{ SPA_KEY_NODE_DRIVER, "true" }, { SPA_KEY_NODE_DRIVER, "true" },
{ SPA_KEY_NODE_LATENCY, "512/48000" },
}; };
static void emit_node_info(struct impl *this, bool full) static void emit_node_info(struct impl *this, bool full)
@ -1344,7 +1244,6 @@ static int impl_node_process(void *object)
struct impl *this = object; struct impl *this = object;
struct port *port; struct port *port;
struct spa_io_buffers *io; struct spa_io_buffers *io;
uint64_t now_time;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
@ -1352,12 +1251,6 @@ static int impl_node_process(void *object)
io = port->io; io = port->io;
spa_return_val_if_fail(io != NULL, -EIO); spa_return_val_if_fail(io != NULL, -EIO);
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
now_time = SPA_TIMESPEC_TO_NSEC(&this->now);
if (!spa_list_is_empty(&port->ready))
flush_data(this, now_time);
if (io->status == SPA_STATUS_HAVE_DATA && io->buffer_id < port->n_buffers) { if (io->status == SPA_STATUS_HAVE_DATA && io->buffer_id < port->n_buffers) {
struct buffer *b = &port->buffers[io->buffer_id]; struct buffer *b = &port->buffers[io->buffer_id];
@ -1371,12 +1264,13 @@ static int impl_node_process(void *object)
spa_list_append(&port->ready, &b->link); spa_list_append(&port->ready, &b->link);
SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT); SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT);
port->need_data = false;
flush_data(this, now_time);
io->buffer_id = SPA_ID_INVALID;
io->status = SPA_STATUS_OK; io->status = SPA_STATUS_OK;
} }
if (!spa_list_is_empty(&port->ready))
flush_data(this, this->current_time);
return SPA_STATUS_HAVE_DATA; return SPA_STATUS_HAVE_DATA;
} }