bluetooth: prepare to redo transport writeout scheduling

Bluetooth SCO is synchronous stream, make our writes more uniformly paced.
To do this, first separate writing to socket from rendering a frame.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/507>
This commit is contained in:
Igor V. Kovalenko 2021-02-21 11:41:52 +03:00 committed by PulseAudio Marge Bot
parent 976fc1d099
commit 030dc8b968

View file

@ -255,9 +255,9 @@ static void connect_ports(struct userdata *u, void *new_data, pa_direction_t dir
} }
} }
static void bt_prepare_encoder_buffer(struct userdata *u) static bool bt_prepare_encoder_buffer(struct userdata *u)
{ {
size_t size; size_t encoded_size, reserved_size;
pa_assert(u); pa_assert(u);
pa_assert(u->bt_codec); pa_assert(u->bt_codec);
@ -267,17 +267,30 @@ static void bt_prepare_encoder_buffer(struct userdata *u)
* Reserve space for 2 encoded frames to cover that. * Reserve space for 2 encoded frames to cover that.
* *
* Note for A2DP codecs it is expected that size of encoded frame is less * Note for A2DP codecs it is expected that size of encoded frame is less
* than write link MTU therefore each encoded frame is sent out completely. * than write link MTU. Therefore each encoded frame is sent out completely
* and there is no used space in encoder buffer before next encoder call.
*/ */
if (u->bt_codec->get_encoded_block_size) if (u->bt_codec->get_encoded_block_size)
size = 2 * u->bt_codec->get_encoded_block_size(u->encoder_info, u->write_block_size); encoded_size = u->bt_codec->get_encoded_block_size(u->encoder_info, u->write_block_size);
else else
size = 2 * u->write_block_size; encoded_size = u->write_block_size;
if (u->encoder_buffer_size < size) { reserved_size = 2 * encoded_size;
u->encoder_buffer = pa_xrealloc(u->encoder_buffer, size);
u->encoder_buffer_size = size; if (u->encoder_buffer_size < reserved_size) {
u->encoder_buffer = pa_xrealloc(u->encoder_buffer, reserved_size);
u->encoder_buffer_size = reserved_size;
if (u->encoder_buffer_used > reserved_size) {
u->encoder_buffer_used = 0;
}
} }
/* Report if there is still not enough space for new block */
if (u->encoder_buffer_size < u->encoder_buffer_used + encoded_size)
return false;
return true;
} }
/* Run from IO thread */ /* Run from IO thread */
@ -286,20 +299,28 @@ static int bt_write_buffer(struct userdata *u) {
pa_assert(u); pa_assert(u);
pa_assert(u->transport); pa_assert(u->transport);
pa_assert(u->bt_codec);
written = u->transport->write(u->transport, u->stream_fd, u->encoder_buffer, u->encoder_buffer_used, u->write_link_mtu); written = u->transport->write(u->transport, u->stream_fd, u->encoder_buffer, u->encoder_buffer_used, u->write_link_mtu);
if (written < 0) if (written > 0) {
/* calculate remainder */
u->encoder_buffer_used -= written;
/* move any remainder back to start of u->encoder_buffer */
if (u->encoder_buffer_used)
memmove(u->encoder_buffer, u->encoder_buffer + written, u->encoder_buffer_used);
return 1;
} else if (written == 0) {
/* Not enough data in encoder buffer */
return 0;
} else {
/* Reset encoder sequence number and buffer positions */
u->bt_codec->reset(u->encoder_info);
u->encoder_buffer_used = 0;
return -1; return -1;
}
/* calculate remainder */
u->encoder_buffer_used -= written;
/* move any remainder back to start of u->encoder_buffer */
if (u->encoder_buffer_used)
memmove(u->encoder_buffer, u->encoder_buffer + written, u->encoder_buffer_used);
return 1;
} }
/* Run from IO thread */ /* Run from IO thread */
@ -314,17 +335,21 @@ static int bt_process_render(struct userdata *u) {
pa_assert(u->sink); pa_assert(u->sink);
pa_assert(u->bt_codec); pa_assert(u->bt_codec);
if (!bt_prepare_encoder_buffer(u))
return false;
/* First, render some data */ /* First, render some data */
if (!u->write_memchunk.memblock) if (!u->write_memchunk.memblock)
pa_sink_render_full(u->sink, u->write_block_size, &u->write_memchunk); pa_sink_render_full(u->sink, u->write_block_size, &u->write_memchunk);
pa_assert(u->write_memchunk.length == u->write_block_size); pa_assert(u->write_memchunk.length == u->write_block_size);
bt_prepare_encoder_buffer(u);
ptr = (const uint8_t *) pa_memblock_acquire_chunk(&u->write_memchunk); ptr = (const uint8_t *) pa_memblock_acquire_chunk(&u->write_memchunk);
length = u->bt_codec->encode_buffer(u->encoder_info, u->write_index / pa_frame_size(&u->encoder_sample_spec), ptr, u->write_memchunk.length, u->encoder_buffer + u->encoder_buffer_used, u->encoder_buffer_size - u->encoder_buffer_used, &processed); length = u->bt_codec->encode_buffer(u->encoder_info, u->write_index / pa_frame_size(&u->encoder_sample_spec),
ptr, u->write_memchunk.length,
u->encoder_buffer + u->encoder_buffer_used, u->encoder_buffer_size - u->encoder_buffer_used,
&processed);
pa_memblock_release(u->write_memchunk.memblock); pa_memblock_release(u->write_memchunk.memblock);
@ -340,14 +365,7 @@ static int bt_process_render(struct userdata *u) {
if (PA_LIKELY(length)) { if (PA_LIKELY(length)) {
u->encoder_buffer_used += length; u->encoder_buffer_used += length;
ret = 1;
ret = bt_write_buffer(u);
if (ret < 0) {
/* Reset encoder sequence number and buffer positions */
u->bt_codec->reset(u->encoder_info);
u->encoder_buffer_used = 0;
}
} else } else
ret = 0; ret = 0;
@ -1299,21 +1317,20 @@ static int init_profile(struct userdata *u) {
return r; return r;
} }
static int write_block(struct userdata *u) { static int bt_render_block(struct userdata *u) {
int n_written; int n_rendered;
if (u->write_index <= 0) if (u->write_index <= 0)
u->started_at = pa_rtclock_now(); u->started_at = pa_rtclock_now();
n_written = bt_process_render(u); n_rendered = bt_process_render(u);
if (n_written < 0) if (n_rendered < 0)
n_written = -1; n_rendered = -1;
return n_written; return n_rendered;
} }
/* I/O thread function */ /* I/O thread function */
static void thread_func(void *userdata) { static void thread_func(void *userdata) {
struct userdata *u = userdata; struct userdata *u = userdata;
@ -1412,28 +1429,39 @@ static void thread_func(void *userdata) {
* for the sink */ * for the sink */
if (have_source) { if (have_source) {
if (writable && blocks_to_write > 0) { /* If the stream is writable, send some data if necessary */
if (writable) {
int result; int result;
if ((result = write_block(u)) < 0) if (blocks_to_write > 0) {
result = bt_render_block(u);
if (result < 0)
goto fail;
blocks_to_write -= result;
}
result = bt_write_buffer(u);
if (result < 0)
goto fail; goto fail;
blocks_to_write -= result; if (result)
/* writable controls whether we set POLLOUT when polling - we set it to
* false to enable POLLOUT. If there are more blocks to write, we want to
* be woken up immediately when the socket becomes writable. If there
* aren't currently any more blocks to write, then we'll have to wait
* until we've received more data, so in that case we only want to set
* POLLIN. Note that when we are woken up the next time, POLLOUT won't be
* set in revents even if the socket has meanwhile become writable, which
* may seem bad, but in that case we'll set POLLOUT in the subsequent
* poll, and the poll will return immediately, so our writes won't be
* delayed. */
if (blocks_to_write > 0)
writable = false; writable = false;
} }
/* writable controls whether we set POLLOUT when polling - we set it to
* false to enable POLLOUT. If there are more blocks to write, we want to
* be woken up immediately when the socket becomes writable. If there
* aren't currently any more blocks to write, then we'll have to wait
* until we've received more data, so in that case we only want to set
* POLLIN. Note that when we are woken up the next time, POLLOUT won't be
* set in revents even if the socket has meanwhile become writable, which
* may seem bad, but in that case we'll set POLLOUT in the subsequent
* poll, and the poll will return immediately, so our writes won't be
* delayed. */
if (blocks_to_write > 0)
writable = false;
/* There is no source, we have to use the system clock for timing */ /* There is no source, we have to use the system clock for timing */
} else { } else {
bool have_written = false; bool have_written = false;
@ -1494,16 +1522,25 @@ static void thread_func(void *userdata) {
} }
/* If the stream is writable, send some data if necessary */ /* If the stream is writable, send some data if necessary */
if (writable && blocks_to_write > 0) { if (writable) {
int result; int result;
if ((result = write_block(u)) < 0) if (blocks_to_write > 0) {
int result = bt_render_block(u);
if (result < 0)
goto fail;
blocks_to_write -= result;
}
result = bt_write_buffer(u);
if (result < 0)
goto fail; goto fail;
blocks_to_write -= result; if (result) {
writable = false; writable = false;
if (result)
have_written = true; have_written = true;
}
} }
/* If nothing was written during this iteration, either the stream /* If nothing was written during this iteration, either the stream
@ -1521,7 +1558,7 @@ static void thread_func(void *userdata) {
/* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
if ((get_profile_direction(u->profile) & PA_DIRECTION_OUTPUT) && u->write_memchunk.memblock == NULL) { if ((get_profile_direction(u->profile) & PA_DIRECTION_OUTPUT) && u->write_memchunk.memblock == NULL) {
/* write_block() is keeping up with input, try increasing bitrate */ /* bt_write_buffer() is keeping up with input, try increasing bitrate */
if (u->bt_codec->increase_encoder_bitrate if (u->bt_codec->increase_encoder_bitrate
&& pa_timeval_age(&tv_last_output_rate_change) >= u->device->output_rate_refresh_interval_ms * PA_USEC_PER_MSEC) { && pa_timeval_age(&tv_last_output_rate_change) >= u->device->output_rate_refresh_interval_ms * PA_USEC_PER_MSEC) {
size_t new_write_block_size = u->bt_codec->increase_encoder_bitrate(u->encoder_info, u->write_link_mtu); size_t new_write_block_size = u->bt_codec->increase_encoder_bitrate(u->encoder_info, u->write_link_mtu);