diff --git a/spa/plugins/bluez5/media-sink.c b/spa/plugins/bluez5/media-sink.c index febe81a1e..e77d42a63 100644 --- a/spa/plugins/bluez5/media-sink.c +++ b/spa/plugins/bluez5/media-sink.c @@ -39,6 +39,7 @@ #include "rtp.h" #include "media-codecs.h" #include "rate-control.h" +#include "iso-io.h" static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.sink.media"); #undef SPA_LOG_TOPIC_DEFAULT @@ -144,7 +145,8 @@ struct impl { uint64_t next_time; uint64_t last_error; uint64_t process_time; - uint64_t process_position; + uint64_t process_duration; + uint64_t process_rate; uint64_t prev_flush_time; uint64_t next_flush_time; @@ -157,6 +159,8 @@ struct impl { int need_flush; bool fragment; + bool resync; + bool have_iso_packet; uint32_t block_size; uint8_t buffer[BUFFER_SIZE]; uint32_t buffer_used; @@ -168,10 +172,6 @@ struct impl { uint8_t tmp_buffer[BUFFER_SIZE]; uint32_t tmp_buffer_used; uint32_t fd_buffer_size; - -#ifdef HAVE_BLUETOOTH_BAP - struct bt_iso_qos qos; -#endif }; #define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0) @@ -284,52 +284,75 @@ static int set_timers(struct impl *this) return set_timeout(this, this->following ? 0 : this->next_time); } -static int do_reassign_follower(struct spa_loop *loop, +static inline bool is_following(struct impl *this) +{ + return this->position && this->clock && this->position->clock.id != this->clock->id; +} + +struct reassign_io_info { + struct impl *this; + struct spa_io_position *position; + struct spa_io_clock *clock; +}; + +static int do_reassign_io(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { - struct impl *this = user_data; - set_timers(this); - return 0; -} + struct reassign_io_info *info = user_data; + struct impl *this = info->this; + bool following; -static inline bool is_following(struct impl *this) -{ - return this->position && this->clock && this->position->clock.id != this->clock->id; + if (this->position != info->position || this->clock != info->clock) + this->resync = true; + + this->position = info->position; + this->clock = info->clock; + + following = is_following(this); + + if (following != this->following) { + spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following); + this->following = following; + set_timers(this); + } + + return 0; } static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) { struct impl *this = object; - bool following; + struct reassign_io_info info = { .this = this, .position = this->position, .clock = this->clock }; spa_return_val_if_fail(this != NULL, -EINVAL); switch (id) { case SPA_IO_Clock: - this->clock = data; - if (this->clock != NULL) { - spa_scnprintf(this->clock->name, - sizeof(this->clock->name), + info.clock = data; + if (info.clock != NULL) { + spa_scnprintf(info.clock->name, + sizeof(info.clock->name), "%s", this->props.clock_name); } break; case SPA_IO_Position: - this->position = data; + info.position = data; break; default: return -ENOENT; } - following = is_following(this); - if (this->started && following != this->following) { - spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following); - this->following = following; - spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this); + if (this->started) { + spa_loop_invoke(this->data_loop, do_reassign_io, 0, NULL, 0, true, &info); + } else { + this->clock = info.clock; + this->position = info.position; } + return 0; } @@ -433,35 +456,36 @@ static uint32_t get_queued_frames(struct impl *this) return bytes / port->frame_size; } -static uint64_t get_reference_time(struct impl *this, uint64_t *duration_ns) +static uint64_t get_reference_time(struct impl *this, uint64_t *duration_ns_ret) { struct port *port = &this->port; + uint64_t t, duration_ns; - spa_assert(this->position); + if (!this->process_rate || !this->process_duration) { + if (this->position) { + this->process_duration = this->position->clock.duration; + this->process_rate = this->position->clock.rate.denom; + } else { + this->process_duration = 1024; + this->process_rate = 48000; + } + } + + duration_ns = ((uint64_t)this->process_duration * SPA_NSEC_PER_SEC / this->process_rate); + if (duration_ns_ret) + *duration_ns_ret = duration_ns; /* Time at the first sample in the current packet. */ - *duration_ns = ((uint64_t)this->position->clock.duration * SPA_NSEC_PER_SEC - / this->position->clock.rate.denom); - return this->process_time + *duration_ns - - ((uint64_t)get_queued_frames(this) * SPA_NSEC_PER_SEC - / port->current_format.info.raw.rate); -} + t = this->process_time + duration_ns; + t -= ((uint64_t)get_queued_frames(this) * SPA_NSEC_PER_SEC + / port->current_format.info.raw.rate); -static uint64_t get_reference_position(struct impl *this) -{ - struct port *port = &this->port; - uint64_t position; + /* Account for resampling delay */ + if (port->rate_match && this->clock && SPA_FLAG_IS_SET(port->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE)) + t -= (uint64_t)port->rate_match->delay * SPA_NSEC_PER_SEC + / this->clock->rate.denom; - /* Sample position at the first sample in the current packet. - * If resampling, may be rounded down by one sample. - */ - - if (!this->position) - return this->sample_count; - - position = this->process_position * port->current_format.info.raw.rate / - this->position->clock.rate.denom; - return position - get_queued_frames(this); + return t; } static int reset_buffer(struct impl *this) @@ -474,7 +498,8 @@ static int reset_buffer(struct impl *this) this->need_flush = 0; this->block_count = 0; this->fragment = false; - this->timestamp = this->codec->bap ? get_reference_position(this) : this->sample_count; + this->timestamp = this->codec->bap ? (get_reference_time(this, NULL) / SPA_NSEC_PER_USEC) + : this->sample_count; this->buffer_used = this->codec->start_encode(this->codec_data, this->buffer, sizeof(this->buffer), ++this->seqnum, this->timestamp); @@ -675,79 +700,6 @@ static void enable_flush_timer(struct impl *this, bool enabled) this->flush_pending = enabled; } -#ifdef HAVE_BLUETOOTH_BAP -static void sync_iso_frame_start(struct impl *this) -{ - struct port *port = &this->port; - uint64_t position; - uint32_t interval_frames; - uint32_t req; - - if (!this->codec->bap || !this->qos.out.interval || !this->position) - return; - - /* Synchronize packet start sample position to a multiple of the ISO interval. - * - * This ensures that different nodes in the graph create packets containing audio - * aligned at commensurate ISO intervals. This will then also align their flush - * reference times. - * - * The ISO interval generally consists of an integer number of frames, so we - * should do this calculation in frames. - */ - position = get_reference_position(this); - interval_frames = (uint64_t)port->current_format.info.raw.rate * this->qos.out.interval - / SPA_USEC_PER_SEC; - - /* Skip frames: generally, this should only occur once when the node starts. */ - req = position % interval_frames; - - if (this->position->clock.rate.denom != port->current_format.info.raw.rate) { - /* if resampling, the count may be rounded down by one */ - if (req == interval_frames - 1) - req = 0; - } - if (req > 0) - req = interval_frames - req; - - if (req > 0) { - spa_log_debug(this->log, "node %p: ISO sync %"PRIu64"->%"PRIu64": skipping %d frames", - this, position, SPA_ROUND_UP(position, interval_frames), req); - } - while (req > 0 && !spa_list_is_empty(&port->ready)) { - struct buffer *b; - struct spa_data *d; - uint32_t avail; - - b = spa_list_first(&port->ready, struct buffer, link); - d = b->buf->datas; - - avail = d[0].chunk->size - port->ready_offset; - avail /= port->frame_size; - - avail = SPA_MIN(avail, req); - port->ready_offset += avail * port->frame_size; - req -= avail; - - if (port->ready_offset >= d[0].chunk->size) { - spa_list_remove(&b->link); - SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); - spa_log_trace(this->log, "%p: reuse buffer %u", this, b->id); - this->port.io->buffer_id = b->id; - - spa_node_call_reuse_buffer(&this->callbacks, 0, b->id); - port->ready_offset = 0; - } - - spa_log_trace(this->log, "%p: skipped %u frames", this, avail); - } -} -#else -static void sync_iso_frame_start(struct impl *this) -{ -} -#endif - static int flush_data(struct impl *this, uint64_t now_time) { int written; @@ -757,10 +709,11 @@ static int flush_data(struct impl *this, uint64_t now_time) spa_assert(this->transport_started); - if (this->transport == NULL || !this->flush_source.loop || !this->flush_timer_source.loop) { - /* I/O in error state */ + /* I/O in error state? */ + if (this->transport == NULL || !this->flush_source.loop) + return -EIO; + if (!this->flush_timer_source.loop && !this->transport->iso_io) return -EIO; - } total_frames = 0; again: @@ -831,6 +784,27 @@ again: spa_log_trace(this->log, "%p: written %u frames", this, total_frames); } + if (this->transport->iso_io) { + struct spa_bt_iso_io *iso_io = this->transport->iso_io; + + if (this->need_flush && !this->have_iso_packet) { + size_t avail = SPA_MIN(this->buffer_used, sizeof(iso_io->buf)); + + spa_log_trace(this->log, "%p: ISO put fd:%d size:%u sn:%u ts:%u now:%"PRIu64, + this, this->transport->fd, (unsigned)avail, + (unsigned)this->seqnum, (unsigned)this->timestamp, + iso_io->now); + + memcpy(iso_io->buf, this->buffer, avail); + iso_io->size = avail; + iso_io->timestamp = this->timestamp; + this->have_iso_packet = true; + + reset_buffer(this); + } + return 0; + } + if (this->flush_pending) { spa_log_trace(this->log, "%p: wait for flush timer", this); return 0; @@ -880,8 +854,6 @@ again: uint64_t packet_time = (uint64_t)packet_samples * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate; - sync_iso_frame_start(this); - if (SPA_LIKELY(this->position)) { uint64_t duration_ns; @@ -942,6 +914,98 @@ again: return 0; } +static void drop_frames(struct impl *this, uint32_t req) +{ + struct port *port = &this->port; + + while (req > 0 && !spa_list_is_empty(&port->ready)) { + struct buffer *b; + struct spa_data *d; + uint32_t avail; + + b = spa_list_first(&port->ready, struct buffer, link); + d = b->buf->datas; + + avail = d[0].chunk->size - port->ready_offset; + avail /= port->frame_size; + + avail = SPA_MIN(avail, req); + port->ready_offset += avail * port->frame_size; + req -= avail; + + if (port->ready_offset >= d[0].chunk->size) { + spa_list_remove(&b->link); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + spa_log_trace(this->log, "%p: reuse buffer %u", this, b->id); + this->port.io->buffer_id = b->id; + + spa_node_call_reuse_buffer(&this->callbacks, 0, b->id); + port->ready_offset = 0; + } + + spa_log_trace(this->log, "%p: skipped %u frames", this, avail); + } +} + +static void media_iso_pull(struct spa_bt_iso_io *iso_io) +{ + struct impl *this = iso_io->user_data; + struct port *port = &this->port; + const double period = 0.1 * SPA_NSEC_PER_SEC; + uint64_t duration_ns; + double value, target, err; + + this->have_iso_packet = false; + + if (this->resync || !this->position) { + spa_bt_rate_control_init(&port->ratectl, 0); + goto done; + } + + /* + * Rate match sample position so that the graph is 3/2 ISO interval + * ahead of the time instant we have to send data. + * + * Being 1 ISO interval ahead is unavoidable otherwise we underrun, + * and the 1/2 is safety margin for the graph to deliver data + * in time. + * + * This is then the part of the TX latency on PipeWire side. There is + * another part of TX latency on kernel/controller side before the + * controller starts processing the packet. + */ + + value = (int64_t)iso_io->now - (int64_t)get_reference_time(this, &duration_ns); + target = iso_io->duration * 3/2; + err = value - target; + + if (err > iso_io->duration) { + uint32_t req = err * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC; + + spa_log_debug(this->log, "%p: ISO sync reset frames:%u", this, (unsigned int)req); + + spa_bt_rate_control_init(&port->ratectl, 0); + drop_frames(this, req); + } else if (-err > iso_io->duration) { + uint32_t req = -err * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC; + + spa_log_debug(this->log, "%p: ISO sync skip flush frames:%u", this, (unsigned int)req); + return; + } else { + spa_bt_rate_control_update(&port->ratectl, err, 0, + iso_io->duration, period, RATE_CTL_DIFF_MAX); + spa_log_trace(this->log, "%p: ISO sync err:%+.3f value:%.3f target:%.3f (ms) corr:%g", + this, + port->ratectl.avg / SPA_NSEC_PER_MSEC, + value / SPA_NSEC_PER_MSEC, + target / SPA_NSEC_PER_MSEC, + port->ratectl.corr); + } + +done: + flush_data(this, this->current_time); +} + static void media_on_flush_error(struct spa_source *source) { struct impl *this = source->data; @@ -955,6 +1019,8 @@ static void media_on_flush_error(struct spa_source *source) enable_flush_timer(this, false); if (this->flush_timer_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_timer_source); + if (this->transport && this->transport->iso_io) + spa_bt_iso_io_set_cb(this->transport->iso_io, NULL, NULL); return; } } @@ -1048,6 +1114,15 @@ static void media_on_timeout(struct spa_source *source) set_timeout(this, this->next_time); } +static int do_start_iso_io(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct impl *this = user_data; + + spa_bt_iso_io_set_cb(this->transport->iso_io, media_iso_pull, this); + return 0; +} + static int transport_start(struct impl *this) { int val, size; @@ -1120,26 +1195,18 @@ static int transport_start(struct impl *this) if (setsockopt(this->transport->fd, SOL_SOCKET, SO_PRIORITY, &val, sizeof(val)) < 0) spa_log_warn(this->log, "SO_PRIORITY failed: %m"); -#ifdef HAVE_BLUETOOTH_BAP - if (this->codec->bap) { - len = sizeof(this->qos); - if (getsockopt(this->transport->fd, SOL_BLUETOOTH, BT_ISO_QOS, &this->qos, &len) < 0) { - memset(&this->qos, 0, sizeof(this->qos)); - spa_log_warn(this->log, "BT_ISO_QOS failed: %m"); - } - } -#endif - reset_buffer(this); spa_bt_rate_control_init(&port->ratectl, 0); - this->flush_timer_source.data = this; - this->flush_timer_source.fd = this->flush_timerfd; - this->flush_timer_source.func = media_on_flush_timeout; - this->flush_timer_source.mask = SPA_IO_IN; - this->flush_timer_source.rmask = 0; - spa_loop_add_source(this->data_loop, &this->flush_timer_source); + if (!this->transport->iso_io) { + this->flush_timer_source.data = this; + this->flush_timer_source.fd = this->flush_timerfd; + this->flush_timer_source.func = media_on_flush_timeout; + this->flush_timer_source.mask = SPA_IO_IN; + this->flush_timer_source.rmask = 0; + spa_loop_add_source(this->data_loop, &this->flush_timer_source); + } this->flush_source.data = this; this->flush_source.fd = this->transport->fd; @@ -1148,10 +1215,15 @@ static int transport_start(struct impl *this) this->flush_source.rmask = 0; spa_loop_add_source(this->data_loop, &this->flush_source); + this->resync = true; + this->flush_pending = false; this->transport_started = true; + if (this->transport->iso_io) + spa_loop_invoke(this->data_loop, do_start_iso_io, 0, NULL, 0, true, this); + return 0; } @@ -1224,6 +1296,9 @@ static int do_remove_transport_source(struct spa_loop *loop, spa_loop_remove_source(this->data_loop, &this->flush_timer_source); enable_flush_timer(this, false); + if (this->transport->iso_io) + spa_bt_iso_io_set_cb(this->transport->iso_io, NULL, NULL); + return 0; } @@ -1635,7 +1710,7 @@ impl_node_port_use_buffers(void *object, spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); port = &this->port; - spa_log_debug(this->log, "use buffers %d", n_buffers); + spa_log_debug(this->log, "%p: use buffers %d", this, n_buffers); clear_buffers(this, port); @@ -1703,6 +1778,7 @@ static int impl_node_process(void *object) struct impl *this = object; struct port *port; struct spa_io_buffers *io; + int res; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -1746,22 +1822,23 @@ static int impl_node_process(void *object) } } - if (this->position) - this->process_position = this->position->clock.position; + if (this->position) { + this->process_duration = this->position->clock.duration; + this->process_rate = this->position->clock.rate.denom; + } else { + this->process_duration = 1024; + this->process_rate = 48000; + } this->process_time = this->current_time; + this->resync = false; setup_matching(this); - if (!spa_list_is_empty(&port->ready)) { - int res; - spa_log_trace(this->log, "%p: flush on process", this); - if ((res = flush_data(this, this->current_time)) < 0) { - io->status = res; - return SPA_STATUS_STOPPED; - } - } else { - spa_log_trace(this->log, "%p: no flush on process", this); + spa_log_trace(this->log, "%p: on process time:%"PRIu64, this, this->process_time); + if ((res = flush_data(this, this->current_time)) < 0) { + io->status = res; + return SPA_STATUS_STOPPED; } return SPA_STATUS_HAVE_DATA;