diff --git a/spa/plugins/bluez5/a2dp-source.c b/spa/plugins/bluez5/a2dp-source.c index e5618ee27..4fdffc2b8 100644 --- a/spa/plugins/bluez5/a2dp-source.c +++ b/spa/plugins/bluez5/a2dp-source.c @@ -60,18 +60,16 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.source.a2dp #undef SPA_LOG_TOPIC_DEFAULT #define SPA_LOG_TOPIC_DEFAULT &log_topic +#include "decode-buffer.h" + #define DEFAULT_CLOCK_NAME "clock.system.monotonic" struct props { - uint32_t min_latency; - uint32_t max_latency; char clock_name[64]; }; #define FILL_FRAMES 2 #define MAX_BUFFERS 32 -#define MIN_LATENCY 512 -#define MAX_LATENCY 1024 struct buffer { uint32_t id; @@ -89,6 +87,7 @@ struct port { uint64_t info_all; struct spa_port_info info; struct spa_io_buffers *io; + struct spa_io_rate_match *rate_match; struct spa_latency_info latency; #define IDX_EnumFormat 0 #define IDX_Meta 1 @@ -105,8 +104,7 @@ struct port { struct spa_list free; struct spa_list ready; - struct buffer *current_buffer; - uint32_t ready_offset; + struct spa_bt_decode_buffer buffer; }; struct impl { @@ -120,6 +118,8 @@ struct impl { struct spa_hook_list hooks; struct spa_callbacks callbacks; + uint32_t quantum_limit; + uint64_t info_all; struct spa_node_info info; #define IDX_PropInfo 0 @@ -137,6 +137,8 @@ struct impl { unsigned int started:1; unsigned int transport_acquired:1; unsigned int following:1; + unsigned int matching:1; + unsigned int resampling:1; unsigned int is_input:1; unsigned int is_duplex:1; @@ -144,9 +146,15 @@ struct impl { int fd; struct spa_source source; + struct spa_source timer_source; + int timerfd; + struct spa_io_clock *clock; struct spa_io_position *position; + uint64_t current_time; + uint64_t next_time; + const struct a2dp_codec *codec; bool codec_props_changed; void *codec_props; @@ -154,10 +162,8 @@ struct impl { struct spa_audio_info codec_format; uint8_t buffer_read[4096]; - uint8_t buffer_decoded[65536]; struct timespec now; uint64_t sample_count; - uint64_t skip_count; int duplex_timerfd; uint64_t duplex_timeout; @@ -165,13 +171,8 @@ struct impl { #define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0) -static const uint32_t default_min_latency = MIN_LATENCY; -static const uint32_t default_max_latency = MAX_LATENCY; - static void reset_props(struct props *props) { - props->min_latency = default_min_latency; - props->max_latency = default_max_latency; strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name)); } @@ -200,43 +201,19 @@ static int impl_node_enum_params(void *object, int seq, switch (id) { case SPA_PARAM_PropInfo: { - struct props *p = &this->props; - switch (result.index) { - case 0: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency), - SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX)); - break; - case 1: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency), - SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX)); - break; default: enum_codec = true; - index_offset = 2; + index_offset = 0; } break; } case SPA_PARAM_Props: { - struct props *p = &this->props; - switch (result.index) { - case 0: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_Props, id, - SPA_PROP_minLatency, SPA_POD_Int(p->min_latency), - SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency)); - break; default: enum_codec = true; - index_offset = 1; + index_offset = 0; } break; } @@ -267,13 +244,38 @@ static int impl_node_enum_params(void *object, int seq, return 0; } -static int do_reassing_follower(struct spa_loop *loop, +static int set_timeout(struct impl *this, uint64_t time) +{ + struct itimerspec ts; + ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + return spa_system_timerfd_settime(this->data_system, + this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); +} + +static int set_timers(struct impl *this) +{ + struct timespec now; + + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); + this->next_time = SPA_TIMESPEC_TO_NSEC(&now); + + return set_timeout(this, this->following ? 0 : this->next_time); +} + +static int do_reassign_follower(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { + struct impl *this = user_data; + struct port *port = &this->port; + + spa_bt_decode_buffer_recover(&port->buffer); return 0; } @@ -309,7 +311,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) if (this->started && following != this->following) { spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following); this->following = following; - spa_loop_invoke(this->data_loop, do_reassing_follower, 0, NULL, 0, true, this); + spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this); } return 0; } @@ -324,10 +326,7 @@ static int apply_props(struct impl *this, const struct spa_pod *param) if (param == NULL) { reset_props(&new_props); } else { - spa_pod_parse_object(param, - SPA_TYPE_OBJECT_Props, NULL, - SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency), - SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency)); + /* noop */ } changed = (memcmp(&new_props, &this->props, sizeof(struct props)) != 0); @@ -372,7 +371,6 @@ static void reset_buffers(struct port *port) spa_list_init(&port->free); spa_list_init(&port->ready); - port->current_buffer = NULL; for (i = 0; i < port->n_buffers; i++) { struct buffer *b = &port->buffers[i]; @@ -449,30 +447,15 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size, return dst_size - avail; } -static void skip_ready_buffers(struct impl *this) -{ - struct port *port = &this->port; - - /* Move all buffers from ready to free */ - while (!spa_list_is_empty(&port->ready)) { - struct buffer *b; - b = spa_list_first(&port->ready, struct buffer, link); - spa_list_remove(&b->link); - spa_list_append(&port->free, &b->link); - spa_assert(!b->outstanding); - this->skip_count += b->buf->datas[0].chunk->size / port->frame_size; - } -} - static void a2dp_on_ready_read(struct spa_source *source) { struct impl *this = source->data; struct port *port = &this->port; - struct spa_io_buffers *io = port->io; - int32_t size_read, decoded, avail; - struct spa_data *datas; - struct buffer *buffer; - uint32_t min_data; + struct timespec now; + void *buf; + int32_t size_read, decoded; + uint32_t avail; + uint64_t dt; /* make sure the source is an input */ if ((source->rmask & SPA_IO_IN) == 0) { @@ -486,9 +469,6 @@ static void a2dp_on_ready_read(struct spa_source *source) spa_log_trace(this->log, "socket poll"); - /* update the current pts */ - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); - /* read */ size_read = read_data (this); if (size_read == 0) @@ -497,7 +477,9 @@ static void a2dp_on_ready_read(struct spa_source *source) spa_log_error(this->log, "failed to read data: %s", spa_strerror(size_read)); goto stop; } - spa_log_trace(this->log, "read socket data %d", size_read); + + /* update the current pts */ + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); if (this->codec_props_changed && this->codec_props && this->codec->update_props) { @@ -505,111 +487,33 @@ static void a2dp_on_ready_read(struct spa_source *source) this->codec_props_changed = false; } - /* decode */ - decoded = decode_data(this, this->buffer_read, size_read, - this->buffer_decoded, sizeof (this->buffer_decoded)); + /* decode to buffer */ + buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail); + spa_log_trace(this->log, "read socket data size:%d, avail:%d", size_read, avail); + decoded = decode_data(this, this->buffer_read, size_read, buf, avail); if (decoded < 0) { spa_log_debug(this->log, "failed to decode data: %d", decoded); return; } - if (decoded == 0) + if (decoded == 0) { + spa_log_trace(this->log, "no decoded socket data"); return; - - spa_log_trace(this->log, "decoded socket data %d", decoded); + } /* discard when not started */ if (!this->started) return; - /* get buffer */ - if (!port->current_buffer) { - if (spa_list_is_empty(&port->free)) { - /* xrun, skip ahead */ - skip_ready_buffers(this); - this->skip_count += decoded / port->frame_size; - this->sample_count += decoded / port->frame_size; - return; - } - if (this->skip_count > 0) { - spa_log_info(this->log, "%p: xrun, skipped %"PRIu64" usec", - this, (uint64_t)(this->skip_count * SPA_USEC_PER_SEC / port->current_format.info.raw.rate)); - this->skip_count = 0; - } + spa_bt_decode_buffer_write_packet(&port->buffer, decoded); - buffer = spa_list_first(&port->free, struct buffer, link); - spa_list_remove(&buffer->link); + dt = SPA_TIMESPEC_TO_NSEC(&this->now); + this->now = now; + dt = SPA_TIMESPEC_TO_NSEC(&this->now) - dt; - port->current_buffer = buffer; - port->ready_offset = 0; - spa_log_trace(this->log, "dequeue %d", buffer->id); + spa_log_trace(this->log, "decoded socket data size:%d frames:%d dt:%d dms", + (int)decoded, (int)decoded/port->frame_size, + (int)(dt / 100000)); - if (buffer->h) { - buffer->h->seq = this->sample_count; - buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); - buffer->h->dts_offset = 0; - } - } else { - buffer = port->current_buffer; - } - datas = buffer->buf->datas; - - /* copy data into buffer */ - avail = SPA_MIN(decoded, (int32_t)(datas[0].maxsize - port->ready_offset)); - if (avail < decoded) - spa_log_warn(this->log, "buffer too small (%d > %d)", decoded, avail); - memcpy ((uint8_t *)datas[0].data + port->ready_offset, this->buffer_decoded, avail); - port->ready_offset += avail; - this->sample_count += decoded / port->frame_size; - - /* send buffer if full */ - min_data = SPA_MIN(this->props.min_latency * port->frame_size, datas[0].maxsize / 2); - if (port->ready_offset >= min_data) { - uint64_t sample_count; - - datas[0].chunk->offset = 0; - datas[0].chunk->size = port->ready_offset; - datas[0].chunk->stride = port->frame_size; - - sample_count = datas[0].chunk->size / port->frame_size; - - spa_log_trace(this->log, "queue %d", buffer->id); - spa_list_append(&port->ready, &buffer->link); - port->current_buffer = NULL; - - if (!this->following && this->clock) { - this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now); - this->clock->duration = sample_count * this->clock->rate.denom / port->current_format.info.raw.rate; - this->clock->position = this->sample_count * this->clock->rate.denom / port->current_format.info.raw.rate; - this->clock->delay = 0; - this->clock->rate_diff = 1.0f; - this->clock->next_nsec = this->clock->nsec + (uint64_t)sample_count * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate; - } - } - - /* done if there are no buffers ready */ - if (spa_list_is_empty(&port->ready)) - return; - - if (this->following) - return; - - /* process the buffer if IO does not have any */ - if (io != NULL && io->status != SPA_STATUS_HAVE_DATA) { - struct buffer *b; - - if (io->buffer_id < port->n_buffers) - recycle_buffer(this, port, io->buffer_id); - - b = spa_list_first(&port->ready, struct buffer, link); - spa_list_remove(&b->link); - b->outstanding = true; - - io->buffer_id = b->id; - io->status = SPA_STATUS_HAVE_DATA; - } - - /* notify ready */ - spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); return; stop: @@ -641,6 +545,75 @@ static void a2dp_on_duplex_timeout(struct spa_source *source) a2dp_on_ready_read(source); } +static int setup_matching(struct impl *this) +{ + struct port *port = &this->port; + + if (this->position && port->rate_match) { + port->rate_match->rate = 1 / port->buffer.corr; + + this->matching = this->following; + this->resampling = this->matching || + (port->current_format.info.raw.rate != this->position->clock.rate.denom); + } else { + this->matching = false; + this->resampling = false; + } + + if (port->rate_match) + SPA_FLAG_UPDATE(port->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE, this->matching); + + return 0; +} + +static void a2dp_on_timeout(struct spa_source *source) +{ + struct impl *this = source->data; + struct port *port = &this->port; + uint64_t exp, duration; + uint32_t rate; + struct spa_io_buffers *io = port->io; + uint64_t prev_time, now_time; + + if (this->transport == NULL) + return; + + if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0) + spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); + + prev_time = this->current_time; + now_time = this->current_time = this->next_time; + + spa_log_trace(this->log, "%p: timer %"PRIu64" %"PRIu64"", this, + now_time, now_time - prev_time); + + if (SPA_LIKELY(this->position)) { + duration = this->position->clock.duration; + rate = this->position->clock.rate.denom; + } else { + duration = 1024; + rate = 48000; + } + + setup_matching(this); + + this->next_time = now_time + duration * SPA_NSEC_PER_SEC / port->buffer.corr / rate; + + if (SPA_LIKELY(this->clock)) { + this->clock->nsec = now_time; + this->clock->position += duration; + this->clock->duration = duration; + this->clock->rate_diff = port->buffer.corr; + this->clock->next_nsec = this->next_time; + } + + spa_log_trace(this->log, "%p: %d", this, io->status); + io->status = SPA_STATUS_HAVE_DATA; + spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); + + set_timeout(this, this->next_time); +} + static int transport_start(struct impl *this) { int res, val; @@ -683,7 +656,13 @@ static int transport_start(struct impl *this) if (setsockopt(this->transport->fd, SOL_SOCKET, SO_PRIORITY, &val, sizeof(val)) < 0) spa_log_warn(this->log, "SO_PRIORITY failed: %m"); - reset_buffers(&this->port); + reset_buffers(port); + + spa_bt_decode_buffer_clear(&port->buffer); + if ((res = spa_bt_decode_buffer_init(&port->buffer, this->log, + port->frame_size, port->current_format.info.raw.rate, + this->quantum_limit, this->quantum_limit)) < 0) + return res; this->fd = this->transport->fd; @@ -716,8 +695,18 @@ static int transport_start(struct impl *this) set_duplex_timeout(this, this->duplex_timeout); } + this->timer_source.data = this; + this->timer_source.fd = this->timerfd; + this->timer_source.func = a2dp_on_timeout; + this->timer_source.mask = SPA_IO_IN; + this->timer_source.rmask = 0; + spa_loop_add_source(this->data_loop, &this->timer_source); + this->sample_count = 0; - this->skip_count = 0; + + setup_matching(this); + + set_timers(this); return 0; } @@ -753,6 +742,7 @@ static int do_remove_source(struct spa_loop *loop, void *user_data) { struct impl *this = user_data; + struct itimerspec ts; spa_log_debug(this->log, "%p: remove source", this); @@ -761,11 +751,20 @@ static int do_remove_source(struct spa_loop *loop, if (this->source.loop) spa_loop_remove_source(this->data_loop, &this->source); + if (this->timer_source.loop) + spa_loop_remove_source(this->data_loop, &this->timer_source); + ts.it_value.tv_sec = 0; + ts.it_value.tv_nsec = 0; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL); + return 0; } static int transport_stop(struct impl *this) { + struct port *port = &this->port; int res; spa_log_debug(this->log, "%p: transport stop", this); @@ -783,6 +782,8 @@ static int transport_stop(struct impl *this) this->codec->deinit(this->codec_data); this->codec_data = NULL; + spa_bt_decode_buffer_clear(&port->buffer); + return res; } @@ -836,24 +837,20 @@ static int impl_node_send_command(void *object, const struct spa_command *comman static void emit_node_info(struct impl *this, bool full) { - char latency[64] = SPA_STRINGIFY(MIN_LATENCY)"/48000"; uint64_t old = full ? this->info.change_mask : 0; struct spa_dict_item node_info_items[] = { { SPA_KEY_DEVICE_API, "bluez5" }, { SPA_KEY_MEDIA_CLASS, this->is_input ? "Audio/Source" : "Stream/Output/Audio" }, - { SPA_KEY_NODE_LATENCY, latency }, + { SPA_KEY_NODE_LATENCY, this->is_input ? "" : "512/48000" }, { "media.name", ((this->transport && this->transport->device->name) ? - this->transport->device->name : "A2DP") }, - { SPA_KEY_NODE_DRIVER, this->is_input ? "true" : "false" }, + this->transport->device->name : "A2DP") }, + { SPA_KEY_NODE_DRIVER, this->is_input ? "true" : "false" }, }; if (full) this->info.change_mask = this->info_all; if (this->info.change_mask) { - if (this->transport && this->port.have_format) - snprintf(latency, sizeof(latency), "%d/%d", (int)this->props.min_latency, - (int)this->port.current_format.info.raw.rate); this->info.props = &SPA_DICT_INIT_ARRAY(node_info_items); spa_node_emit_info(&this->hooks, &this->info); this->info.change_mask = old; @@ -991,11 +988,11 @@ impl_node_port_enum_params(void *object, int seq, param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamBuffers, id, - SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( - this->props.max_latency * port->frame_size, - this->props.min_latency * port->frame_size, + this->quantum_limit * port->frame_size, + 16 * port->frame_size, INT32_MAX), SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size)); break; @@ -1021,6 +1018,12 @@ impl_node_port_enum_params(void *object, int seq, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); break; + case 1: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match))); + break; default: return 0; } @@ -1059,7 +1062,6 @@ static int clear_buffers(struct impl *this, struct port *port) spa_list_init(&port->ready); port->n_buffers = 0; } - port->current_buffer = NULL; return 0; } @@ -1217,6 +1219,9 @@ impl_node_port_set_io(void *object, case SPA_IO_Buffers: port->io = data; break; + case SPA_IO_RateMatch: + port->rate_match = data; + break; default: return -ENOENT; } @@ -1244,6 +1249,88 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t return 0; } +static uint32_t get_samples(struct impl *this, uint32_t *duration) +{ + struct port *port = &this->port; + uint32_t samples; + + if (SPA_LIKELY(port->rate_match) && this->resampling) { + samples = port->rate_match->size; + } else { + if (SPA_LIKELY(this->position)) + samples = this->position->clock.duration * port->current_format.info.raw.rate + / this->position->clock.rate.denom; + else + samples = 1024; + } + + if (SPA_LIKELY(this->position)) + *duration = this->position->clock.duration * port->current_format.info.raw.rate + / this->position->clock.rate.denom; + else if (SPA_LIKELY(this->clock)) + *duration = this->clock->duration * port->current_format.info.raw.rate + / this->clock->rate.denom; + else + *duration = 1024 * port->current_format.info.raw.rate / 48000; + + return samples; +} + +static void process_buffering(struct impl *this) +{ + struct port *port = &this->port; + uint32_t duration; + const uint32_t samples = get_samples(this, &duration); + uint32_t avail; + void *buf; + + spa_bt_decode_buffer_process(&port->buffer, samples, duration); + + setup_matching(this); + + buf = spa_bt_decode_buffer_get_read(&port->buffer, &avail); + + /* copy data to buffers */ + if (!spa_list_is_empty(&port->free) && avail > 0) { + struct buffer *buffer; + struct spa_data *datas; + uint32_t data_size; + + data_size = samples * port->frame_size; + + avail = SPA_MIN(avail, data_size); + + spa_bt_decode_buffer_read(&port->buffer, avail); + + buffer = spa_list_first(&port->free, struct buffer, link); + spa_list_remove(&buffer->link); + + spa_log_trace(this->log, "dequeue %d", buffer->id); + + if (buffer->h) { + buffer->h->seq = this->sample_count; + buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); + buffer->h->dts_offset = 0; + } + + datas = buffer->buf->datas; + + spa_assert(datas[0].maxsize >= data_size); + + datas[0].chunk->offset = 0; + datas[0].chunk->size = avail; + datas[0].chunk->stride = port->frame_size; + + memcpy(datas[0].data, buf, avail); + + this->sample_count += avail / port->frame_size; + + /* ready buffer if full */ + spa_log_trace(this->log, "queue %d frames:%d", buffer->id, (int)avail / port->frame_size); + spa_list_append(&port->ready, &buffer->link); + } +} + static int impl_node_process(void *object) { struct impl *this = object; @@ -1269,6 +1356,9 @@ static int impl_node_process(void *object) io->buffer_id = SPA_ID_INVALID; } + /* Handle buffering delay */ + process_buffering(this); + /* Return if there are no buffers ready to be processed */ if (spa_list_is_empty(&port->ready)) return SPA_STATUS_OK; @@ -1350,16 +1440,19 @@ static int impl_get_interface(struct spa_handle *handle, const char *type, void static int impl_clear(struct spa_handle *handle) { struct impl *this = (struct impl *) handle; + struct port *port = &this->port; if (this->codec_data) this->codec->deinit(this->codec_data); if (this->codec_props && this->codec->clear_props) this->codec->clear_props(this->codec_props); if (this->transport) spa_hook_remove(&this->transport_listener); + spa_system_close(this->data_system, this->timerfd); if (this->duplex_timerfd >= 0) { spa_system_close(this->data_system, this->duplex_timerfd); this->duplex_timerfd = -1; } + spa_bt_decode_buffer_clear(&port->buffer); return 0; } @@ -1451,7 +1544,11 @@ impl_init(const struct spa_handle_factory *factory, spa_list_init(&port->ready); spa_list_init(&port->free); + this->quantum_limit = 8192; + if (info != NULL) { + if (info && (str = spa_dict_lookup(info, "clock.quantum-limit"))) + spa_atou32(str, &this->quantum_limit, 0); if ((str = spa_dict_lookup(info, SPA_KEY_API_BLUEZ5_TRANSPORT)) != NULL) sscanf(str, "pointer:%p", &this->transport); if ((str = spa_dict_lookup(info, "bluez5.a2dp-source-role")) != NULL) @@ -1486,6 +1583,9 @@ impl_init(const struct spa_handle_factory *factory, spa_bt_transport_add_listener(this->transport, &this->transport_listener, &transport_events, this); + this->timerfd = spa_system_timerfd_create(this->data_system, + CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + if (this->is_duplex) { this->duplex_timerfd = spa_system_timerfd_create(this->data_system, CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); diff --git a/spa/plugins/bluez5/decode-buffer.h b/spa/plugins/bluez5/decode-buffer.h new file mode 100644 index 000000000..f2536fbf9 --- /dev/null +++ b/spa/plugins/bluez5/decode-buffer.h @@ -0,0 +1,381 @@ +/* Spa Bluez5 decode buffer + * + * Copyright © 2022 Pauli Virtanen + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +/** + * \file decode-buffer.h Buffering for Bluetooth sources + * + * A linear buffer, which is compacted when it gets half full. + * + * Also contains buffering logic, which calculates a rate correction + * factor to maintain the buffer level at the target value. + * + * Consider typical packet intervals with nominal frame duration + * of 10ms: + * + * ... 5ms | 5ms | 20ms | 5ms | 5ms | 20ms ... + * + * ... 3ms | 3ms | 4ms | 30ms | 3ms | 3ms | 4ms | 30ms ... + * + * plus random jitter; 10ms nominal may occasionally have 20+ms interval. + * The regular timer cycle cannot be aligned with this, so process() + * may occur at any time. + * + * The buffer level is the difference between the number of samples in + * buffer immediately after receiving a packet, and the samples consumed + * before receiving the next packet. + * + * The buffer level indicates how much any packet can be delayed without + * underrun. If it is positive, there are no underruns. + * + * The rate correction aims to maintain the average level at a safety margin. + */ + +#ifndef SPA_BLUEZ5_DECODE_BUFFER_H +#define SPA_BLUEZ5_DECODE_BUFFER_H + +#include +#include +#include +#include + +#define BUFFERING_LONG_MSEC 60000 +#define BUFFERING_SHORT_MSEC 1000 +#define BUFFERING_DLL_BW 0.03 +#define BUFFERING_RATE_DIFF_MAX 0.005 + +/** + * Safety margin. + * + * The spike is the long-window maximum difference + * between minimum and average buffer level. + */ +#define BUFFERING_TARGET(spike,packet_size) \ + SPA_CLAMP((spike)*3/2, (packet_size), 6*(packet_size)) + +/** Windowed min/max */ +struct spa_bt_ptp +{ + union { + int32_t min; + int32_t mins[4]; + }; + union { + int32_t max; + int32_t maxs[4]; + }; + uint32_t pos; + uint32_t period; +}; + +struct spa_bt_decode_buffer +{ + struct spa_log *log; + + uint32_t frame_size; + uint32_t rate; + + uint8_t *buffer_decoded; + uint32_t buffer_size; + uint32_t buffer_reserve; + uint32_t write_index; + uint32_t read_index; + + struct spa_bt_ptp spike; /**< spikes (long window) */ + struct spa_bt_ptp packet_size; /**< packet size (short window) */ + + int32_t target; + int32_t level; + double level_avg; + + struct spa_dll dll; + double corr; + + uint32_t prev_consumed; + uint32_t prev_avail; + uint32_t prev_duration; + uint32_t underrun; + uint32_t pos; + + uint8_t received:1; + uint8_t buffering:1; +}; + +static void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period) +{ + size_t i; + + spa_zero(*p); + for (i = 0; i < SPA_N_ELEMENTS(p->mins); ++i) { + p->mins[i] = INT32_MAX; + p->maxs[i] = INT32_MIN; + } + p->period = period; +} + +static void spa_bt_ptp_update(struct spa_bt_ptp *p, int32_t value, uint32_t duration) +{ + const size_t n = SPA_N_ELEMENTS(p->mins); + size_t i; + + for (i = 0; i < n; ++i) { + p->mins[i] = SPA_MIN(p->mins[i], value); + p->maxs[i] = SPA_MAX(p->maxs[i], value); + } + + p->pos += duration; + if (p->pos >= p->period / (n - 1)) { + p->pos = 0; + for (i = 1; i < SPA_N_ELEMENTS(p->mins); ++i) { + p->mins[i-1] = p->mins[i]; + p->maxs[i-1] = p->maxs[i]; + } + p->mins[n-1] = INT32_MAX; + p->maxs[n-1] = INT32_MIN; + } +} + +static int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct spa_log *log, + uint32_t frame_size, uint32_t rate, uint32_t quantum_limit, uint32_t reserve) +{ + spa_zero(*this); + this->frame_size = frame_size; + this->rate = rate; + this->log = log; + this->buffer_reserve = this->frame_size * reserve; + this->buffer_size = this->frame_size * quantum_limit * 2; + this->buffer_size += this->buffer_reserve; + this->corr = 1.0; + this->buffering = true; + + spa_dll_init(&this->dll); + + spa_bt_ptp_init(&this->spike, (uint64_t)this->rate * BUFFERING_LONG_MSEC / 1000); + spa_bt_ptp_init(&this->packet_size, (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000); + + if ((this->buffer_decoded = malloc(this->buffer_size)) == NULL) { + this->buffer_size = 0; + return -ENOMEM; + } + return 0; +} + +static void spa_bt_decode_buffer_clear(struct spa_bt_decode_buffer *this) +{ + free(this->buffer_decoded); + spa_zero(*this); +} + +static void spa_bt_decode_buffer_compact(struct spa_bt_decode_buffer *this) +{ + uint32_t avail; + + spa_assert(this->read_index <= this->write_index); + + if (this->read_index == this->write_index) { + this->read_index = 0; + this->write_index = 0; + goto done; + } + + if (this->write_index > this->read_index + this->buffer_size - this->buffer_reserve) { + /* Drop data to keep buffer reserve free */ + spa_log_info(this->log, "%p buffer overrun: dropping data", this); + this->read_index = this->write_index + this->buffer_reserve - this->buffer_size; + } + + if (this->write_index < (this->buffer_size - this->buffer_reserve) / 2 + || this->read_index == 0) + goto done; + + avail = this->write_index - this->read_index; + spa_memmove(this->buffer_decoded, + SPA_PTROFF(this->buffer_decoded, this->read_index, void), + avail); + this->read_index = 0; + this->write_index = avail; + +done: + spa_assert(this->buffer_size - this->write_index >= this->buffer_reserve); +} + +static void *spa_bt_decode_buffer_get_write(struct spa_bt_decode_buffer *this, uint32_t *avail) +{ + spa_bt_decode_buffer_compact(this); + spa_assert(this->buffer_size >= this->write_index); + *avail = this->buffer_size - this->write_index; + return SPA_PTROFF(this->buffer_decoded, this->write_index, void); +} + +static void spa_bt_decode_buffer_write_packet(struct spa_bt_decode_buffer *this, uint32_t size) +{ + spa_assert(size % this->frame_size == 0); + this->write_index += size; + this->received = true; + spa_bt_ptp_update(&this->packet_size, size / this->frame_size, size / this->frame_size); +} + +static void *spa_bt_decode_buffer_get_read(struct spa_bt_decode_buffer *this, uint32_t *avail) +{ + spa_assert(this->write_index >= this->read_index); + if (!this->buffering) + *avail = this->write_index - this->read_index; + else + *avail = 0; + return SPA_PTROFF(this->buffer_decoded, this->read_index, void); +} + +static void spa_bt_decode_buffer_read(struct spa_bt_decode_buffer *this, uint32_t size) +{ + spa_assert(size % this->frame_size == 0); + this->read_index += size; +} + +static void spa_bt_decode_buffer_recover(struct spa_bt_decode_buffer *this) +{ + int32_t size = (this->write_index - this->read_index) / this->frame_size; + + this->prev_avail = size * this->frame_size; + this->prev_consumed = this->prev_duration; + this->level = (int32_t)this->prev_avail/this->frame_size + - (int32_t)this->prev_duration; + this->level_avg = this->level; + this->target = this->level; + this->corr = 1.0; + + spa_dll_init(&this->dll); +} + +static void spa_bt_decode_buffer_process(struct spa_bt_decode_buffer *this, uint32_t samples, uint32_t duration) +{ + const uint32_t data_size = samples * this->frame_size; + const int32_t max_level = SPA_MAX(8 * this->packet_size.max, (int32_t)duration); + uint32_t avail; + + if (SPA_UNLIKELY(duration != this->prev_duration)) { + this->prev_duration = duration; + spa_bt_decode_buffer_recover(this); + } + + if (SPA_UNLIKELY(this->buffering)) { + int32_t size = (this->write_index - this->read_index) / this->frame_size; + + this->corr = 1.0; + + spa_log_trace(this->log, "%p buffering size:%d", this, (int)size); + + if (this->received && + this->packet_size.max > 0 && + size >= SPA_MAX(3*this->packet_size.max, (int32_t)duration)) + this->buffering = false; + else + return; + + spa_bt_decode_buffer_recover(this); + } + + if (SPA_UNLIKELY(this->dll.bw == 0.0)) { + spa_log_trace(this->log, "%p dll reset duration:%d rate:%d", this, + (int)duration, (int)this->rate); + spa_dll_set_bw(&this->dll, BUFFERING_DLL_BW, duration, (uint64_t)this->rate); + } + + spa_bt_decode_buffer_get_read(this, &avail); + + if (this->received) { + const uint32_t avg_period = (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000; + int32_t level, target; + + /* Track buffer level */ + level = (int32_t)(this->prev_avail/this->frame_size) - (int32_t)this->prev_consumed; + level = SPA_MAX(level, -max_level); + this->prev_consumed = SPA_MIN(this->prev_consumed, avg_period); + + this->level_avg = ((double)this->prev_consumed*level + + ((double)avg_period - this->prev_consumed)*this->level_avg) / avg_period; + spa_bt_ptp_update(&this->spike, this->level_avg - level, this->prev_consumed); + + /* Update target level */ + target = BUFFERING_TARGET(this->spike.max, this->packet_size.max); + + if (level > SPA_MAX(4 * target, 2*(int32_t)duration) && + avail > data_size) { + /* Lagging too much: drop data */ + uint32_t size = SPA_MIN(avail - data_size, + (level - target*5/2) * this->frame_size); + + spa_bt_decode_buffer_read(this, size); + spa_log_trace(this->log, "%p overrun samples:%d level:%d target:%d", + this, (int)size/this->frame_size, + (int)level, (int)target); + + spa_bt_decode_buffer_recover(this); + } + + this->pos += this->prev_consumed; + if (this->pos > this->rate) { + spa_log_debug(this->log, + "%p avg:%d target:%d level:%d buffer:%d spike:%d corr:%f", + this, + (int)this->level_avg, + (int)target, + (int)level, + (int)(avail / this->frame_size), + (int)this->spike.max, + (double)this->corr); + this->pos = 0; + } + + spa_bt_decode_buffer_get_read(this, &avail); + + this->prev_consumed = 0; + this->prev_avail = avail; + this->underrun = 0; + this->received = false; + this->level = level; + this->target = target; + } + + this->corr = spa_dll_update(&this->dll, this->target - this->level); + + if (SPA_ABS(this->corr - 1.0) > BUFFERING_RATE_DIFF_MAX) { + spa_log_trace(this->log, "%p too big rate difference: clamp + reset", this); + spa_dll_init(&this->dll); + this->corr = SPA_CLAMP(this->corr, 1.0 - BUFFERING_RATE_DIFF_MAX, + 1.0 + BUFFERING_RATE_DIFF_MAX); + } + + if (avail < data_size) { + spa_log_trace(this->log, "%p underrun samples:%d", this, + (data_size - avail) / this->frame_size); + this->underrun += samples; + if (this->underrun >= SPA_MIN((uint32_t)max_level, this->buffer_size / this->frame_size)) { + this->buffering = true; + spa_log_debug(this->log, "%p underrun too much: start buffering", this); + } + } + + this->prev_consumed += samples; +} + +#endif