diff --git a/spa/plugins/bluez5/media-sink.c b/spa/plugins/bluez5/media-sink.c index 5bc1811dd..3020edfd9 100644 --- a/spa/plugins/bluez5/media-sink.c +++ b/spa/plugins/bluez5/media-sink.c @@ -208,6 +208,8 @@ struct impl { struct spa_bt_asha *asha; struct spa_list asha_link; + + struct spa_bt_latency tx_latency; }; #define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0) @@ -672,30 +674,41 @@ static int setup_matching(struct impl *this) return 0; } -static int get_transport_unused_size(struct impl *this) +static int get_transport_unsent_size(struct impl *this) { int res, value; - res = ioctl(this->flush_source.fd, TIOCOUTQ, &value); - if (res < 0) { - spa_log_error(this->log, "%p: ioctl fail: %m", this); - return -errno; + + if (this->tx_latency.enabled) { + res = 0; + value = this->tx_latency.unsent; + } else { + res = ioctl(this->flush_source.fd, TIOCOUTQ, &value); + if (res < 0) { + spa_log_error(this->log, "%p: ioctl fail: %m", this); + return -errno; + } + if ((unsigned int)value > this->fd_buffer_size) + return -EIO; + value = this->fd_buffer_size - value; } - spa_log_trace(this->log, "%p: fd unused buffer size:%d/%d", this, value, this->fd_buffer_size); + + spa_log_trace(this->log, "%p: fd unsent size:%d/%d", this, value, this->fd_buffer_size); return value; } static int send_buffer(struct impl *this) { int written, unsent; + struct timespec ts_pre; - unsent = get_transport_unused_size(this); - if (unsent >= 0) { - unsent = this->fd_buffer_size - unsent; + unsent = get_transport_unsent_size(this); + if (unsent >= 0) this->codec->abr_process(this->codec_data, unsent); - } - written = send(this->flush_source.fd, this->buffer, - this->buffer_used, MSG_DONTWAIT | MSG_NOSIGNAL); + spa_system_clock_gettime(this->data_system, CLOCK_REALTIME, &ts_pre); + + written = spa_bt_send(this->flush_source.fd, this->buffer, this->buffer_used, + &this->tx_latency, SPA_TIMESPEC_TO_NSEC(&ts_pre)); if (SPA_UNLIKELY(spa_log_level_topic_enabled(this->log, SPA_LOG_TOPIC_DEFAULT, SPA_LOG_LEVEL_TRACE))) { struct timespec ts; @@ -855,7 +868,7 @@ static int flush_data(struct impl *this, uint64_t now_time) bool is_asha = this->codec->asha; uint32_t total_frames; int written; - int unused_buffer; + int unsent_buffer; spa_assert(this->transport_started); @@ -994,11 +1007,10 @@ again: } /* - * Get socket queue size before writing to it. - * This should be the same as buffer size to increase bitpool - * Bitpool shouldn't be increased when data is left over in the buffer + * Get packet queue size before writing to it. This should be zero to increase + * bitpool. Bitpool shouldn't be increased when there is unsent data. */ - unused_buffer = get_transport_unused_size(this); + unsent_buffer = get_transport_unsent_size(this); written = flush_buffer(this); @@ -1075,7 +1087,7 @@ again: } if (now_time - this->last_error > SPA_NSEC_PER_SEC) { - if (unused_buffer == (int)this->fd_buffer_size) { + if (unsent_buffer == 0) { int res = this->codec->increase_bitpool(this->codec_data); spa_log_debug(this->log, "%p: increase bitpool: %i", this, res); @@ -1209,9 +1221,13 @@ static void media_on_flush_error(struct spa_source *source) if (source->rmask & SPA_IO_ERR) { /* TX timestamp info? */ - if (this->transport && this->transport->iso_io) + if (this->transport && this->transport->iso_io) { if (spa_bt_iso_io_recv_errqueue(this->transport->iso_io) == 0) return; + } else { + if (spa_bt_latency_recv_errqueue(&this->tx_latency, this->flush_source.fd, this->log) == 0) + return; + } /* Otherwise: actual error */ } @@ -1220,8 +1236,10 @@ static void media_on_flush_error(struct spa_source *source) if (source->rmask & (SPA_IO_HUP | SPA_IO_ERR)) { spa_log_warn(this->log, "%p: error %d", this, source->rmask); - if (this->flush_source.loop) + if (this->flush_source.loop) { + spa_bt_latency_flush(&this->tx_latency, this->flush_source.fd, this->log); spa_loop_remove_source(this->data_loop, &this->flush_source); + } enable_flush_timer(this, false); if (this->flush_timer_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_timer_source); @@ -1510,6 +1528,8 @@ static int transport_start(struct impl *this) this->flush_timer_source.mask = SPA_IO_IN; this->flush_timer_source.rmask = 0; spa_loop_add_source(this->data_loop, &this->flush_timer_source); + + spa_bt_latency_init(&this->tx_latency, this->transport, LATENCY_PERIOD, this->log); } if (!is_asha) { @@ -1626,8 +1646,11 @@ static int do_remove_transport_source(struct spa_loop *loop, this->transport_started = false; - if (this->flush_source.loop) + if (this->flush_source.loop) { + spa_bt_latency_flush(&this->tx_latency, this->flush_source.fd, this->log); spa_loop_remove_source(this->data_loop, &this->flush_source); + } + if (this->flush_timer_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_timer_source); if (this->codec->asha) {