diff --git a/spa/plugins/bluez5/media-sink.c b/spa/plugins/bluez5/media-sink.c index 80a7e12ad..d07f66788 100644 --- a/spa/plugins/bluez5/media-sink.c +++ b/spa/plugins/bluez5/media-sink.c @@ -62,17 +62,14 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.sink.media" #define DEFAULT_CLOCK_NAME "clock.system.monotonic" struct props { - uint32_t min_latency; - uint32_t max_latency; int64_t latency_offset; char clock_name[64]; }; #define FILL_FRAMES 4 +#define MIN_BUFFERS 2 #define MAX_BUFFERS 32 -#define MIN_LATENCY 128 -#define MAX_LATENCY 8192 -#define BUFFER_SIZE (MAX_LATENCY*8) +#define BUFFER_SIZE (8192*8) struct buffer { uint32_t id; @@ -121,6 +118,8 @@ struct impl { struct spa_hook_list hooks; struct spa_callbacks callbacks; + uint32_t quantum_limit; + uint64_t info_all; struct spa_node_info info; #define IDX_PropInfo 0 @@ -137,6 +136,7 @@ struct impl { unsigned int started:1; unsigned int following:1; unsigned int is_output:1; + unsigned int flush_pending:1; unsigned int is_duplex:1; @@ -152,6 +152,10 @@ struct impl { uint64_t current_time; uint64_t next_time; uint64_t last_error; + uint64_t process_time; + + uint64_t prev_flush_time; + uint64_t next_flush_time; const struct media_codec *codec; bool codec_props_changed; @@ -161,30 +165,23 @@ struct impl { int need_flush; bool fragment; - uint64_t fragment_timeout; uint32_t block_size; uint8_t buffer[BUFFER_SIZE]; uint32_t buffer_used; uint32_t header_size; - uint32_t frame_count; + uint32_t block_count; uint16_t seqnum; uint32_t timestamp; uint64_t sample_count; uint8_t tmp_buffer[BUFFER_SIZE]; uint32_t tmp_buffer_used; uint32_t fd_buffer_size; - - /* Times */ - uint64_t start_time; - uint64_t total_samples; }; -#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0) +#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0) static void reset_props(struct impl *this, struct props *props) { - props->min_latency = MIN_LATENCY; - props->max_latency = MAX_LATENCY; props->latency_offset = 0; strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name)); } @@ -214,24 +211,8 @@ static int impl_node_enum_params(void *object, int seq, switch (id) { case SPA_PARAM_PropInfo: { - struct props *p = &this->props; - switch (result.index) { case 0: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency), - SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX)); - break; - case 1: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency), - SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX)); - break; - case 2: param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_PropInfo, id, SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_latencyOffsetNsec), @@ -240,7 +221,7 @@ static int impl_node_enum_params(void *object, int seq, break; default: enum_codec = true; - index_offset = 3; + index_offset = 1; } break; } @@ -252,8 +233,6 @@ static int impl_node_enum_params(void *object, int seq, case 0: param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_Props, id, - SPA_PROP_minLatency, SPA_POD_Int(p->min_latency), - SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency), SPA_PROP_latencyOffsetNsec, SPA_POD_Long(p->latency_offset)); break; default: @@ -391,8 +370,6 @@ static int apply_props(struct impl *this, const struct spa_pod *param) } else { spa_pod_parse_object(param, SPA_TYPE_OBJECT_Props, NULL, - SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency), - SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency), SPA_PROP_latencyOffsetNsec, SPA_POD_OPT_Long(&new_props.latency_offset)); } @@ -444,7 +421,7 @@ static int reset_buffer(struct impl *this) this->codec_props_changed = false; } this->need_flush = 0; - this->frame_count = 0; + this->block_count = 0; this->fragment = false; this->buffer_used = this->codec->start_encode(this->codec_data, this->buffer, sizeof(this->buffer), @@ -469,20 +446,32 @@ static int get_transport_unused_size(struct impl *this) static int send_buffer(struct impl *this) { int written, unsent; + unsent = get_transport_unused_size(this); if (unsent >= 0) { unsent = this->fd_buffer_size - unsent; this->codec->abr_process(this->codec_data, unsent); } - spa_log_trace(this->log, "%p: send %d %u %u %u %u", - this, this->frame_count, this->block_size, this->seqnum, - this->timestamp, this->buffer_used); - written = send(this->flush_source.fd, this->buffer, this->buffer_used, MSG_DONTWAIT | MSG_NOSIGNAL); - spa_log_trace(this->log, "%p: send %d", this, written); + if (SPA_UNLIKELY(spa_log_level_topic_enabled(this->log, SPA_LOG_TOPIC_DEFAULT, SPA_LOG_LEVEL_TRACE))) { + struct timespec ts; + uint64_t now; + uint64_t dt; + + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &ts); + now = SPA_TIMESPEC_TO_NSEC(&ts); + dt = now - this->prev_flush_time; + this->prev_flush_time = now; + + spa_log_trace(this->log, + "%p: send blocks:%d block:%u seq:%u ts:%u size:%u " + "wrote:%d dt:%"PRIu64, + this, this->block_count, this->block_size, this->seqnum, + this->timestamp, this->buffer_used, written, dt); + } if (written < 0) { spa_log_debug(this->log, "%p: %m", this); @@ -502,7 +491,7 @@ static int encode_buffer(struct impl *this, const void *data, uint32_t size) spa_log_trace(this->log, "%p: encode %d used %d, %d %d %d", this, size, this->buffer_used, port->frame_size, this->block_size, - this->frame_count); + this->block_count); if (this->need_flush) return 0; @@ -530,7 +519,7 @@ static int encode_buffer(struct impl *this, const void *data, uint32_t size) return processed; this->sample_count += processed / port->frame_size; - this->frame_count += processed / this->block_size; + this->block_count += processed / this->block_size; this->buffer_used += out_encoded; spa_log_trace(this->log, "%p: processed %d %zd used %d", @@ -551,7 +540,7 @@ static int encode_fragment(struct impl *this) spa_log_trace(this->log, "%p: encode fragment used %d, %d %d %d", this, this->buffer_used, port->frame_size, this->block_size, - this->frame_count); + this->block_count); if (this->need_flush) return 0; @@ -602,46 +591,41 @@ static int add_data(struct impl *this, const void *data, uint32_t size) return total; } -static void enable_flush(struct impl *this, bool enabled, uint64_t timeout) +static void enable_flush_timer(struct impl *this, bool enabled) { - bool flush_enabled = enabled && (timeout == 0); struct itimerspec ts; - if (SPA_FLAG_IS_SET(this->flush_source.mask, SPA_IO_OUT) != flush_enabled) { - SPA_FLAG_UPDATE(this->flush_source.mask, SPA_IO_OUT, flush_enabled); - spa_loop_update_source(this->data_loop, &this->flush_source); - } - if (!enabled) - timeout = 0; + this->next_flush_time = 0; - ts.it_value.tv_sec = timeout / SPA_NSEC_PER_SEC; - ts.it_value.tv_nsec = timeout % SPA_NSEC_PER_SEC; + ts.it_value.tv_sec = this->next_flush_time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = this->next_flush_time % SPA_NSEC_PER_SEC; ts.it_interval.tv_sec = 0; ts.it_interval.tv_nsec = 0; spa_system_timerfd_settime(this->data_system, - this->flush_timerfd, 0, &ts, NULL); + this->flush_timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); + + this->flush_pending = enabled; } -static uint64_t get_next_bap_timeout(struct impl *this) +static uint32_t get_queued_frames(struct impl *this) { struct port *port = &this->port; - uint64_t playback_time = 0, elapsed_time = 0, next_time = 0; - struct timespec now; - uint64_t now_time; + uint32_t bytes = 0; + struct buffer *b; - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); - now_time = SPA_TIMESPEC_TO_NSEC(&now); - if (this->start_time == 0) - this->start_time = now_time; + spa_list_for_each(b, &port->ready, link) { + struct spa_data *d = b->buf->datas; - playback_time = (this->total_samples * SPA_NSEC_PER_SEC) / port->current_format.info.raw.rate; - if (now_time > this->start_time) - elapsed_time = now_time - this->start_time; - if (elapsed_time < playback_time) - next_time = playback_time - elapsed_time; + bytes += d[0].chunk->size; + } - return next_time; + if (bytes > port->ready_offset) + bytes -= port->ready_offset; + else + bytes = 0; + + return bytes / port->frame_size; } static int flush_data(struct impl *this, uint64_t now_time) @@ -720,13 +704,17 @@ again: port->ready_offset = 0; } total_frames += n_frames; - this->total_samples += n_frames; spa_log_trace(this->log, "%p: written %u frames", this, total_frames); } if (written > 0 && this->buffer_used == this->header_size) { - enable_flush(this, false, 0); + enable_flush_timer(this, false); + return 0; + } + + if (this->flush_pending) { + spa_log_trace(this->log, "%p: wait for flush timer", this); return 0; } @@ -746,109 +734,95 @@ again: * glitch in any case. */ written = this->buffer_used; - reset_buffer(this); } if (written < 0) { spa_log_trace(this->log, "%p: error flushing %s", this, spa_strerror(written)); reset_buffer(this); - enable_flush(this, false, 0); + enable_flush_timer(this, false); return written; } else if (written > 0) { - if (this->codec->bap) { - uint64_t timeout = get_next_bap_timeout(this); + /* + * We cannot write all data we have at once, since this can exceed device + * buffers (esp. for the A2DP low-latency codecs) and socket buffers, so + * flush needs to be delayed. + */ + uint32_t packet_samples = this->block_count * this->block_size + / port->frame_size; + uint64_t packet_time = (uint64_t)packet_samples * SPA_NSEC_PER_SEC + / port->current_format.info.raw.rate; + + if (SPA_LIKELY(this->position)) { + uint32_t frames = get_queued_frames(this); + uint64_t duration_ns; - reset_buffer(this); - if (!spa_list_is_empty(&port->ready)) { - spa_log_debug(this->log, "%p: flush after %d ns", this, (unsigned int)timeout); - if (timeout == 0) - goto again; - else - enable_flush(this, true, timeout); - } else { - enable_flush(this, false, 0); - } - } else { /* - * We cannot write all data we have at once, since this can exceed - * device buffers. We'll want a limited number of "excess" - * samples. This is an issue for the "low-latency" A2DP codecs. - * - * Flushing the rest of the data (if any) is delayed after a timeout, - * selected on an average-rate basis: - * - * npackets = quantum / packet_samples - * write_end_time = npackets * timeout - * max_excess = quantum - sample_rate * write_end_time - * packet_time = packet_samples / sample_rate - * => timeout = (quantum - max_excess)/quantum * packet_time - */ - uint64_t max_excess = 2*256; - uint64_t packet_samples = (uint64_t)this->frame_count * this->block_size / port->frame_size; - uint64_t packet_time = packet_samples * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate; - uint64_t quantum = SPA_LIKELY(this->clock) ? this->clock->duration : 0; - uint64_t timeout = (quantum > max_excess) ? - (packet_time * (quantum - max_excess) / quantum) : 0; + * Flush at the time position of the next buffered sample. + */ + duration_ns = ((uint64_t)this->position->clock.duration * SPA_NSEC_PER_SEC + / this->position->clock.rate.denom); + this->next_flush_time = this->process_time + duration_ns + - ((uint64_t)frames * SPA_NSEC_PER_SEC + / port->current_format.info.raw.rate); - if (this->need_flush == NEED_FLUSH_FRAGMENT) { - reset_buffer(this); - this->fragment = true; - this->fragment_timeout = (packet_samples > 0) ? timeout : this->fragment_timeout; - goto again; - } - if (this->fragment_timeout > 0) { - timeout = this->fragment_timeout; - this->fragment_timeout = 0; - } - - reset_buffer(this); - if (now_time - this->last_error > SPA_NSEC_PER_SEC) { - if (get_transport_unused_size(this) == (int)this->fd_buffer_size) { - spa_log_trace(this->log, "%p: increase bitpool", this); - this->codec->increase_bitpool(this->codec_data); - } - this->last_error = now_time; - } - if (!spa_list_is_empty(&port->ready)) { - spa_log_trace(this->log, "%p: flush after %d ns", this, (int)timeout); - if (timeout == 0) - goto again; - else - enable_flush(this, true, timeout); - } else { - enable_flush(this, false, 0); - } + /* + * We could delay the output by one packet to avoid waiting + * for the next buffer and so make send intervals exactly regular. + * However, this is not needed for A2DP or BAP. The controller + * will do the scheduling for us, and there's also the socket buffer + * in between. + */ +#if 0 + this->next_flush_time += SPA_MIN(packet_time, + duration_ns * (port->n_buffers - 1)); +#endif + } else { + if (this->next_flush_time == 0) + this->next_flush_time = this->process_time; + this->next_flush_time += packet_time; } + + if (this->need_flush == NEED_FLUSH_FRAGMENT) { + reset_buffer(this); + this->fragment = true; + goto again; + } + + if (now_time - this->last_error > SPA_NSEC_PER_SEC) { + if (get_transport_unused_size(this) == (int)this->fd_buffer_size) { + spa_log_trace(this->log, "%p: increase bitpool", this); + this->codec->increase_bitpool(this->codec_data); + } + this->last_error = now_time; + } + + spa_log_trace(this->log, "%p: flush at:%"PRIu64" process:%"PRIu64, this, + this->next_flush_time, this->process_time); + reset_buffer(this); + enable_flush_timer(this, true); } else { /* Don't want to flush yet, or failed to write anything */ spa_log_trace(this->log, "%p: skip flush", this); - enable_flush(this, false, 0); + enable_flush_timer(this, false); } return 0; } -static void media_on_flush(struct spa_source *source) +static void media_on_flush_error(struct spa_source *source) { struct impl *this = source->data; - spa_log_trace(this->log, "%p: flushing", this); + spa_log_trace(this->log, "%p: flush event", this); - if (!SPA_FLAG_IS_SET(source->rmask, SPA_IO_OUT)) { + if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) { spa_log_warn(this->log, "%p: error %d", this, source->rmask); if (this->flush_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_source); return; } - - if (this->transport == NULL) { - enable_flush(this, false, 0); - return; - } - - flush_data(this, this->current_time); } static void media_on_flush_timeout(struct spa_source *source) @@ -862,11 +836,14 @@ static void media_on_flush_timeout(struct spa_source *source) spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); if (this->transport == NULL) { - enable_flush(this, false, 0); + enable_flush_timer(this, false); return; } - flush_data(this, this->current_time); + while (exp-- > 0) { + this->flush_pending = false; + flush_data(this, this->current_time); + } } static void media_on_timeout(struct spa_source *source) @@ -969,8 +946,8 @@ static int do_start(struct impl *this) return -EIO; spa_log_info(this->log, "%p: using %s codec %s, delay:%"PRIi64" ms", this, - this->codec->bap ? "BAP" : "A2DP", this->codec->description, - (int64_t)(spa_bt_transport_get_delay_nsec(this->transport) / SPA_NSEC_PER_MSEC)); + this->codec->bap ? "BAP" : "A2DP", this->codec->description, + (int64_t)(spa_bt_transport_get_delay_nsec(this->transport) / SPA_NSEC_PER_MSEC)); this->seqnum = 0; @@ -1025,11 +1002,13 @@ static int do_start(struct impl *this) this->flush_source.data = this; this->flush_source.fd = this->transport->fd; - this->flush_source.func = media_on_flush; - this->flush_source.mask = 0; + this->flush_source.func = media_on_flush_error; + this->flush_source.mask = SPA_IO_ERR | SPA_IO_HUP; this->flush_source.rmask = 0; spa_loop_add_source(this->data_loop, &this->flush_source); + this->flush_pending = false; + set_timers(this); this->started = true; @@ -1046,9 +1025,6 @@ static int do_remove_source(struct spa_loop *loop, struct impl *this = user_data; struct itimerspec ts; - this->start_time = 0; - this->total_samples = 0; - if (this->source.loop) spa_loop_remove_source(this->data_loop, &this->source); ts.it_value.tv_sec = 0; @@ -1078,7 +1054,7 @@ static int do_stop(struct impl *this) if (!this->started) return 0; - spa_log_trace(this->log, "%p: stop", this); + spa_log_trace(this->log, "%p: stop", this); spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this); @@ -1275,11 +1251,14 @@ impl_node_port_enum_params(void *object, int seq, param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamBuffers, id, - SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 2, MAX_BUFFERS), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int( + MIN_BUFFERS, + MIN_BUFFERS, + MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), - SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( - this->props.min_latency * port->frame_size, - this->props.min_latency * port->frame_size, + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( + this->quantum_limit * port->frame_size, + 16 * port->frame_size, INT32_MAX), SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size)); break; @@ -1539,18 +1518,21 @@ static int impl_node_process(void *object) io->buffer_id = SPA_ID_INVALID; io->status = SPA_STATUS_OK; } - if (!spa_list_is_empty(&port->ready)) { - if (this->following) { - if (this->position) { - this->current_time = this->position->clock.nsec; - } else { - struct timespec now; - spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); - this->current_time = SPA_TIMESPEC_TO_NSEC(&now); - } + + if (this->following) { + if (this->position) { + this->current_time = this->position->clock.nsec; + } else { + struct timespec now; + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); + this->current_time = SPA_TIMESPEC_TO_NSEC(&now); } - if (this->need_flush) - reset_buffer(this); + } + + this->process_time = this->current_time; + + if (!spa_list_is_empty(&port->ready)) { + spa_log_trace(this->log, "%p: flush on process", this); flush_data(this, this->current_time); } @@ -1754,6 +1736,11 @@ impl_init(const struct spa_handle_factory *factory, spa_list_init(&port->ready); + this->quantum_limit = 8192; + + if (info && (str = spa_dict_lookup(info, "clock.quantum-limit"))) + spa_atou32(str, &this->quantum_limit, 0); + if (info && (str = spa_dict_lookup(info, "api.bluez5.a2dp-duplex")) != NULL) this->is_duplex = spa_atob(str);