bluez5-device: Rewrite of thread function, reduce send buffer size for a2dp sink

The rewrite of the thread function does not change functionality much,
most of it is only cleanup, minor bug fixing  and documentation work.

This patch also changes the send buffer size for a2dp sink to avoid lags
after temporary connection drops, following the proof-of-concept patch
posted by Dmitry Kalyanov.

Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746

Additionally the patch changes the fixed latency for HSP playback from 125
to 25 ms. Tests showed that this produces better audio sync, which is
expected as HSP should have smaller latency than A2DP.
This commit is contained in:
Georg Chini 2018-05-09 07:27:58 +02:00
parent dc65a03454
commit 192c3aaef8

View file

@ -56,9 +56,8 @@ PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE("path=<device object path>" PA_MODULE_USAGE("path=<device object path>"
"autodetect_mtu=<boolean>"); "autodetect_mtu=<boolean>");
#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC) #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC) #define FIXED_LATENCY_PLAYBACK_SCO (25 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC) #define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC)
#define FIXED_LATENCY_RECORD_SCO (25 * PA_USEC_PER_MSEC) #define FIXED_LATENCY_RECORD_SCO (25 * PA_USEC_PER_MSEC)
@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
return ret; return ret;
} }
static void update_buffer_size(struct userdata *u) {
int old_bufsize;
socklen_t len = sizeof(int);
int ret;
ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
if (ret == -1) {
pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
} else {
int new_bufsize;
/* Set send buffer size as small as possible. The minimum value is 1024 according to the
* socket man page. The data is written to the socket in chunks of write_block_size, so
* there should at least be room for two chunks in the buffer. Generally, write_block_size
* is larger than 512. If not, use the next multiple of write_block_size which is larger
* than 1024. */
new_bufsize = 2 * u->write_block_size;
if (new_bufsize < 1024)
new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
/* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
* returns the doubled value. */
if (new_bufsize != old_bufsize / 2) {
ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
if (ret == -1)
pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
else
pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
}
}
}
/* Run from I/O thread */ /* Run from I/O thread */
static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
struct sbc_info *sbc_info; struct sbc_info *sbc_info;
@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
pa_sink_set_fixed_latency_within_thread(u->sink, pa_sink_set_fixed_latency_within_thread(u->sink,
FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
/* If there is still data in the memchunk, we have to discard it
* because the write_block_size may have changed. */
if (u->write_memchunk.memblock) {
pa_memblock_unref(u->write_memchunk.memblock);
pa_memchunk_reset(&u->write_memchunk);
}
update_buffer_size(u);
} }
/* Run from I/O thread */ /* Run from I/O thread */
@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
pa_log_debug("Stream properly set up, we're ready to roll!"); pa_log_debug("Stream properly set up, we're ready to roll!");
if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
a2dp_set_bitpool(u, u->sbc_info.max_bitpool); a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
update_buffer_size(u);
}
u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
@ -1068,12 +1110,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
switch (code) { switch (code) {
case PA_SINK_MESSAGE_GET_LATENCY: { case PA_SINK_MESSAGE_GET_LATENCY: {
int64_t wi, ri; int64_t wi = 0, ri = 0;
if (u->read_smoother) { if (u->read_smoother) {
ri = pa_smoother_get(u->read_smoother, pa_rtclock_now()); ri = pa_smoother_get(u->read_smoother, pa_rtclock_now());
wi = pa_bytes_to_usec(u->write_index + u->write_block_size, &u->sample_spec); wi = pa_bytes_to_usec(u->write_index + u->write_block_size, &u->sample_spec);
} else { } else if (u->started_at) {
ri = pa_rtclock_now() - u->started_at; ri = pa_rtclock_now() - u->started_at;
wi = pa_bytes_to_usec(u->write_index, &u->sample_spec); wi = pa_bytes_to_usec(u->write_index, &u->sample_spec);
} }
@ -1415,12 +1457,32 @@ static int init_profile(struct userdata *u) {
return r; return r;
} }
static int write_block(struct userdata *u) {
int n_written;
if (u->write_index <= 0)
u->started_at = pa_rtclock_now();
if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
if ((n_written = a2dp_process_render(u)) < 0)
return -1;
} else {
if ((n_written = sco_process_render(u)) < 0)
return -1;
}
if (n_written == 0)
pa_log_debug("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");
return n_written;
}
/* I/O thread function */ /* I/O thread function */
static void thread_func(void *userdata) { static void thread_func(void *userdata) {
struct userdata *u = userdata; struct userdata *u = userdata;
unsigned do_write = 0; unsigned blocks_to_write = 0;
unsigned pending_read_bytes = 0; unsigned bytes_to_write = 0;
bool writable = false;
pa_assert(u); pa_assert(u);
pa_assert(u->transport); pa_assert(u->transport);
@ -1440,9 +1502,13 @@ static void thread_func(void *userdata) {
struct pollfd *pollfd; struct pollfd *pollfd;
int ret; int ret;
bool disable_timer = true; bool disable_timer = true;
bool writable = false;
bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL; pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
/* Check for stream error or close */
if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) { if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
pa_log_info("FD error: %s%s%s%s", pa_log_info("FD error: %s%s%s%s",
pollfd->revents & POLLERR ? "POLLERR " :"", pollfd->revents & POLLERR ? "POLLERR " :"",
@ -1453,147 +1519,185 @@ static void thread_func(void *userdata) {
if (pollfd->revents & POLLHUP) { if (pollfd->revents & POLLHUP) {
pollfd = NULL; pollfd = NULL;
teardown_stream(u); teardown_stream(u);
do_write = 0; blocks_to_write = 0;
pending_read_bytes = 0; bytes_to_write = 0;
writable = false;
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
} else } else
goto fail; goto fail;
} }
if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) { /* If there is a pollfd, the stream is set up and we need to do something */
if (pollfd) {
/* We should send two blocks to the device before we expect /* Handle source if present */
* a response. */ if (have_source) {
if (u->write_index == 0 && u->read_index <= 0) /* We should send two blocks to the device before we expect a response. */
do_write = 2; if (u->write_index == 0 && u->read_index <= 0)
blocks_to_write = 2;
if (pollfd && (pollfd->revents & POLLIN)) { /* If we got woken up by POLLIN let's do some reading */
int n_read; if (pollfd->revents & POLLIN) {
int n_read;
if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE) if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
n_read = a2dp_process_push(u); n_read = a2dp_process_push(u);
else else
n_read = sco_process_push(u); n_read = sco_process_push(u);
if (n_read < 0) if (n_read < 0)
goto fail; goto fail;
if (n_read > 0) { if (n_read > 0) {
/* We just read something, so we are supposed to write something, too */ /* We just read something, so we are supposed to write something, too */
pending_read_bytes += n_read; bytes_to_write += n_read;
do_write += pending_read_bytes / u->write_block_size; blocks_to_write += bytes_to_write / u->write_block_size;
pending_read_bytes = pending_read_bytes % u->write_block_size; bytes_to_write = bytes_to_write % u->write_block_size;
}
} }
} }
}
if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) { /* Handle sink if present */
if (have_sink) {
if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) /* Process rewinds */
pa_sink_process_rewind(u->sink, 0); if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
pa_sink_process_rewind(u->sink, 0);
if (pollfd) { /* Test if the stream is writable */
if (pollfd->revents & POLLOUT) if (pollfd->revents & POLLOUT)
writable = true; writable = true;
if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) { /* If we have a source, we let the source determine the timing
pa_usec_t time_passed; * for the sink */
pa_usec_t audio_sent; if (have_source) {
/* Hmm, there is no input stream we could synchronize if (writable && blocks_to_write > 0) {
* to. So let's do things by time */ int result;
time_passed = pa_rtclock_now() - u->started_at; if ((result = write_block(u)) < 0)
audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec); goto fail;
blocks_to_write -= result;
/* writable controls whether we set POLLOUT when polling - we set it to
* false to enable POLLOUT. If there are more blocks to write, we want to
* be woken up immediately when the socket becomes writable. If there
* aren't currently any more blocks to write, then we'll have to wait
* until we've received more data, so in that case we only want to set
* POLLIN. Note that when we are woken up the next time, POLLOUT won't be
* set in revents even if the socket has meanwhile become writable, which
* may seem bad, but in that case we'll set POLLOUT in the subsequent
* poll, and the poll will return immediately, so our writes won't be
* delayed. */
if (blocks_to_write > 0)
writable = false;
}
/* There is no source, we have to use the system clock for timing */
} else {
bool have_written = false;
pa_usec_t time_passed = 0;
pa_usec_t audio_sent = 0;
if (u->started_at) {
time_passed = pa_rtclock_now() - u->started_at;
audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
}
/* A new block needs to be sent. */
if (audio_sent <= time_passed) { if (audio_sent <= time_passed) {
pa_usec_t audio_to_send = time_passed - audio_sent; size_t bytes_to_send = pa_usec_to_bytes(time_passed - audio_sent, &u->sample_spec);
/* Never try to catch up for more than 100ms */ /* There are more than two blocks that need to be written. It seems that
if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) { * the socket has not been accepting data fast enough (could be due to
pa_usec_t skip_usec; * hiccups in the wireless transmission). We need to discard everything
* older than two block sizes to keep the latency from growing. */
if (bytes_to_send > 2 * u->write_block_size) {
uint64_t skip_bytes; uint64_t skip_bytes;
pa_memchunk tmp;
size_t mempool_max_block_size = pa_mempool_block_size_max(u->core->mempool);
pa_usec_t skip_usec;
skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC; skip_bytes = bytes_to_send - 2 * u->write_block_size;
skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec); skip_usec = pa_bytes_to_usec(skip_bytes, &u->sample_spec);
if (skip_bytes > 0) { pa_log_debug("Skipping %llu us (= %llu bytes) in audio stream",
pa_memchunk tmp; (unsigned long long) skip_usec,
(unsigned long long) skip_bytes);
pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", while (skip_bytes > 0) {
(unsigned long long) skip_usec, size_t bytes_to_render;
(unsigned long long) skip_bytes);
pa_sink_render_full(u->sink, skip_bytes, &tmp); if (skip_bytes > mempool_max_block_size)
bytes_to_render = mempool_max_block_size;
else
bytes_to_render = skip_bytes;
pa_sink_render_full(u->sink, bytes_to_render, &tmp);
pa_memblock_unref(tmp.memblock); pa_memblock_unref(tmp.memblock);
u->write_index += skip_bytes; u->write_index += bytes_to_render;
skip_bytes -= bytes_to_render;
if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
a2dp_reduce_bitpool(u);
} }
if (u->write_index > 0 && u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
a2dp_reduce_bitpool(u);
} }
do_write = 1; blocks_to_write = 1;
pending_read_bytes = 0;
}
}
if (writable && do_write > 0) {
int n_written;
if (u->write_index <= 0)
u->started_at = pa_rtclock_now();
if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
if ((n_written = a2dp_process_render(u)) < 0)
goto fail;
} else {
if ((n_written = sco_process_render(u)) < 0)
goto fail;
} }
if (n_written == 0) /* If the stream is writable, send some data if necessary */
pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); if (writable && blocks_to_write > 0) {
int result;
do_write -= n_written; if ((result = write_block(u)) < 0)
writable = false; goto fail;
}
if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) { blocks_to_write -= result;
pa_usec_t sleep_for; writable = false;
pa_usec_t time_passed, next_write_at; if (result)
have_written = true;
}
if (writable) { /* If nothing was written during this iteration, either the stream
/* Hmm, there is no input stream we could synchronize * is not writable or there was no write pending. Set up a timer that
* to. So let's estimate when we need to wake up the latest */ * will wake up the thread when the next data needs to be written. */
time_passed = pa_rtclock_now() - u->started_at; if (!have_written) {
next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); pa_usec_t sleep_for;
sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0; pa_usec_t next_write_at;
/* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
} else
/* drop stream every 500 ms */
sleep_for = PA_USEC_PER_MSEC * 500;
pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for); if (writable) {
disable_timer = false; /* There was no write pending on this iteration of the loop.
* Let's estimate when we need to wake up next */
next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
/* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
} else
/* We could not write because the stream was not ready. Let's try
* again in 500 ms and drop audio if we still can't write. The
* thread will also be woken up when we can write again. */
sleep_for = PA_USEC_PER_MSEC * 500;
pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for);
disable_timer = false;
}
} }
} }
/* Set events to wake up the thread */
pollfd->events = (short) (((have_sink && !writable) ? POLLOUT : 0) | (have_source ? POLLIN : 0));
} }
if (disable_timer) if (disable_timer)
pa_rtpoll_set_timer_disabled(u->rtpoll); pa_rtpoll_set_timer_disabled(u->rtpoll);
/* Hmm, nothing to do. Let's sleep */
if (pollfd)
pollfd->events = (short) (((u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state) && !writable) ? POLLOUT : 0) |
(u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state) ? POLLIN : 0));
if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) { if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
pa_log_debug("pa_rtpoll_run failed with: %d", ret); pa_log_debug("pa_rtpoll_run failed with: %d", ret);
goto fail; goto fail;
} }
if (ret == 0) { if (ret == 0) {
pa_log_debug("IO thread shutdown requested, stopping cleanly"); pa_log_debug("IO thread shutdown requested, stopping cleanly");
transport_release(u); transport_release(u);