module-pulse-tunnel: improve error recovery

Track the end-to-end latency of the stream and use that to drive
the resampler.
Hard reset the ringbuffer when under/overflow happens so that we
can recover quickly.

See #2230
This commit is contained in:
Wim Taymans 2022-04-19 09:41:28 +02:00
parent c73c852413
commit cdbdcd6771

View file

@ -120,9 +120,12 @@ struct impl {
pa_context *pa_context; pa_context *pa_context;
pa_stream *pa_stream; pa_stream *pa_stream;
uint32_t target_latency;
uint32_t current_latency;
uint32_t target_buffer; uint32_t target_buffer;
struct spa_dll dll; struct spa_dll dll;
float max_error; float max_error;
unsigned resync:1;
unsigned int do_disconnect:1; unsigned int do_disconnect:1;
}; };
@ -141,8 +144,12 @@ static void cork_stream(struct impl *impl, bool cork)
* played at the time when the sink starts running again. */ * played at the time when the sink starts running again. */
if ((operation = pa_stream_flush(impl->pa_stream, NULL, NULL))) if ((operation = pa_stream_flush(impl->pa_stream, NULL, NULL)))
pa_operation_unref(operation); pa_operation_unref(operation);
spa_ringbuffer_init(&impl->ring); spa_ringbuffer_init(&impl->ring);
} }
if (!cork)
impl->resync = true;
if ((operation = pa_stream_cork(impl->pa_stream, cork, NULL, NULL))) if ((operation = pa_stream_cork(impl->pa_stream, cork, NULL, NULL)))
pa_operation_unref(operation); pa_operation_unref(operation);
@ -197,26 +204,24 @@ static void playback_stream_process(void *d)
if (filled < 0) { if (filled < 0) {
pw_log_warn("%p: underrun write:%u filled:%d", pw_log_warn("%p: underrun write:%u filled:%d",
impl, write_index, filled); impl, write_index, filled);
} else if ((uint32_t)filled + size > RINGBUFFER_SIZE) {
pw_log_warn("%p: overrun write:%u filled:%d + size:%u > max:%u",
impl, write_index, filled,
size, RINGBUFFER_SIZE);
impl->resync = true;
} else { } else {
float error, corr; float error, corr;
if ((uint32_t)filled + size > impl->target_buffer * 2) { error = (float)(impl->current_latency) - (float)impl->target_latency;
pw_log_warn("%p: overrun write:%u filled:%d + size:%u > %u max:%u",
impl, write_index, filled,
size, impl->target_buffer * 2, RINGBUFFER_SIZE);
write_index -= impl->target_buffer;
filled -= impl->target_buffer;
} else {
error = (float)filled - (float)impl->target_buffer;
error = SPA_CLAMP(error, -impl->max_error, impl->max_error); error = SPA_CLAMP(error, -impl->max_error, impl->max_error);
pw_log_debug("filled:%u target:%u error:%f corr:%f", filled,
impl->target_buffer, error, corr);
corr = spa_dll_update(&impl->dll, error); corr = spa_dll_update(&impl->dll, error);
pw_log_info("filled:%u target:%u error:%f corr:%f %u %u", filled,
impl->target_buffer, error, corr,
impl->current_latency, impl->target_latency);
pw_stream_set_control(impl->stream, pw_stream_set_control(impl->stream,
SPA_PROP_rate, 1, &corr, NULL); SPA_PROP_rate, 1, &corr, NULL);
} }
}
spa_ringbuffer_write_data(&impl->ring, spa_ringbuffer_write_data(&impl->ring,
impl->buffer, RINGBUFFER_SIZE, impl->buffer, RINGBUFFER_SIZE,
write_index & RINGBUFFER_MASK, write_index & RINGBUFFER_MASK,
@ -234,7 +239,7 @@ static void capture_stream_process(void *d)
struct pw_buffer *buf; struct pw_buffer *buf;
struct spa_data *bd; struct spa_data *bd;
int32_t avail; int32_t avail;
uint32_t size, req, read_index; uint32_t size, req, index;
if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) {
pw_log_debug("out of buffers: %m"); pw_log_debug("out of buffers: %m");
@ -246,42 +251,41 @@ static void capture_stream_process(void *d)
if ((req = buf->requested * impl->frame_size) == 0) if ((req = buf->requested * impl->frame_size) == 0)
req = 4096 * impl->frame_size; req = 4096 * impl->frame_size;
avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index);
if (avail <= 0) {
size = SPA_MIN(bd->maxsize, req); size = SPA_MIN(bd->maxsize, req);
avail = spa_ringbuffer_get_read_index(&impl->ring, &index);
if (avail < (int32_t)size) {
memset(bd->data, 0, size); memset(bd->data, 0, size);
} else { } else {
float error, corr; float error, corr;
if (avail > (int32_t)impl->target_buffer * 2) { if (avail > (int32_t)RINGBUFFER_SIZE) {
avail -= impl->target_buffer; avail = impl->target_buffer;
read_index += impl->target_buffer; index += avail - impl->target_buffer;
} else { } else {
error = (float)impl->target_buffer - (float)avail; error = (float)(impl->current_latency) - (float)impl->target_latency;
error = SPA_CLAMP(error, -impl->max_error, impl->max_error); error = SPA_CLAMP(error, -impl->max_error, impl->max_error);
corr = spa_dll_update(&impl->dll, error); corr = spa_dll_update(&impl->dll, error);
pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, pw_log_info("avail:%u target:%u error:%f corr:%f %u %u", avail,
impl->target_buffer, error, corr); impl->target_buffer, error, corr,
impl->current_latency, impl->target_latency);
pw_stream_set_control(impl->stream, pw_stream_set_control(impl->stream,
SPA_PROP_rate, 1, &corr, NULL); SPA_PROP_rate, 1, &corr, NULL);
} }
size = SPA_MIN(bd->maxsize, (uint32_t)avail);
size = SPA_MIN(size, req);
spa_ringbuffer_read_data(&impl->ring, spa_ringbuffer_read_data(&impl->ring,
impl->buffer, RINGBUFFER_SIZE, impl->buffer, RINGBUFFER_SIZE,
read_index & RINGBUFFER_MASK, index & RINGBUFFER_MASK,
bd->data, size); bd->data, size);
read_index += size; index += size;
spa_ringbuffer_read_update(&impl->ring, read_index); spa_ringbuffer_read_update(&impl->ring, index);
} }
bd->chunk->offset = 0; bd->chunk->offset = 0;
bd->chunk->size = size; bd->chunk->size = size;
bd->chunk->stride = impl->frame_size;
pw_stream_queue_buffer(impl->stream, buf); pw_stream_queue_buffer(impl->stream, buf);
} }
@ -385,16 +389,18 @@ static void stream_read_request_cb(pa_stream *s, size_t length, void *userdata)
{ {
struct impl *impl = userdata; struct impl *impl = userdata;
int32_t filled; int32_t filled;
uint32_t write_index; uint32_t index;
pa_usec_t latency;
int negative;
filled = spa_ringbuffer_get_write_index(&impl->ring, &write_index); filled = spa_ringbuffer_get_write_index(&impl->ring, &index);
if (filled < 0) { if (filled < 0) {
pw_log_warn("%p: underrun write:%u filled:%d", pw_log_warn("%p: underrun write:%u filled:%d",
impl, write_index, filled); impl, index, filled);
} else if (filled + length > RINGBUFFER_SIZE) { } else if (filled + length > RINGBUFFER_SIZE) {
pw_log_warn("%p: overrun write:%u filled:%d", pw_log_warn("%p: overrun write:%u filled:%d",
impl, write_index, filled); impl, index, filled);
} }
while (length > 0) { while (length > 0) {
const void *p; const void *p;
@ -415,26 +421,45 @@ static void stream_read_request_cb(pa_stream *s, size_t length, void *userdata)
spa_ringbuffer_write_data(&impl->ring, spa_ringbuffer_write_data(&impl->ring,
impl->buffer, RINGBUFFER_SIZE, impl->buffer, RINGBUFFER_SIZE,
write_index & RINGBUFFER_MASK, index & RINGBUFFER_MASK,
p ? p : impl->empty, to_write); p ? p : impl->empty, to_write);
write_index += to_write; index += to_write;
p = p ? SPA_PTROFF(p, to_write, void) : NULL; p = p ? SPA_PTROFF(p, to_write, void) : NULL;
nbytes -= to_write; nbytes -= to_write;
length -= to_write; length -= to_write;
filled += to_write;
} }
pa_stream_drop(impl->pa_stream); pa_stream_drop(impl->pa_stream);
} }
spa_ringbuffer_write_update(&impl->ring, write_index);
pa_stream_get_latency(impl->pa_stream, &latency, &negative);
impl->current_latency = latency * impl->info.rate / SPA_USEC_PER_SEC;
impl->current_latency += filled / impl->frame_size;
spa_ringbuffer_write_update(&impl->ring, index);
} }
static void stream_write_request_cb(pa_stream *s, size_t length, void *userdata) static void stream_write_request_cb(pa_stream *s, size_t length, void *userdata)
{ {
struct impl *impl = userdata; struct impl *impl = userdata;
int32_t avail; int32_t avail;
uint32_t read_index, len, offset, l0, l1; uint32_t index, len, offset, l0, l1;
pa_usec_t latency;
int negative;
avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index); if (impl->resync) {
impl->resync = false;
avail = length + impl->target_buffer;
spa_ringbuffer_get_write_index(&impl->ring, &index);
index -= avail;
} else {
avail = spa_ringbuffer_get_read_index(&impl->ring, &index);
}
pa_stream_get_latency(impl->pa_stream, &latency, &negative);
impl->current_latency = latency * impl->info.rate / SPA_USEC_PER_SEC;
impl->current_latency += avail / impl->frame_size;
while (avail < (int32_t)length) { while (avail < (int32_t)length) {
/* send silence for the data we don't have */ /* send silence for the data we don't have */
@ -447,7 +472,7 @@ static void stream_write_request_cb(pa_stream *s, size_t length, void *userdata)
if (length > 0 && avail >= (int32_t)length) { if (length > 0 && avail >= (int32_t)length) {
/* always send as much as is requested */ /* always send as much as is requested */
len = length; len = length;
offset = read_index & RINGBUFFER_MASK; offset = index & RINGBUFFER_MASK;
l0 = SPA_MIN(len, RINGBUFFER_SIZE - offset); l0 = SPA_MIN(len, RINGBUFFER_SIZE - offset);
l1 = len - l0; l1 = len - l0;
@ -460,10 +485,22 @@ static void stream_write_request_cb(pa_stream *s, size_t length, void *userdata)
impl->buffer, l1, impl->buffer, l1,
NULL, 0, PA_SEEK_RELATIVE); NULL, 0, PA_SEEK_RELATIVE);
} }
read_index += len; index += len;
spa_ringbuffer_read_update(&impl->ring, read_index); spa_ringbuffer_read_update(&impl->ring, index);
} }
} }
static void stream_underflow_cb(pa_stream *s, void *userdata)
{
struct impl *impl = userdata;
pw_log_info("underflow");
impl->resync = true;
}
static void stream_overflow_cb(pa_stream *s, void *userdata)
{
struct impl *impl = userdata;
pw_log_info("underflow");
impl->resync = true;
}
static void stream_latency_update_cb(pa_stream *s, void *userdata) static void stream_latency_update_cb(pa_stream *s, void *userdata)
{ {
@ -553,6 +590,8 @@ static int create_pulse_stream(struct impl *impl)
pa_stream_set_state_callback(impl->pa_stream, stream_state_cb, impl); pa_stream_set_state_callback(impl->pa_stream, stream_state_cb, impl);
pa_stream_set_read_callback(impl->pa_stream, stream_read_request_cb, impl); pa_stream_set_read_callback(impl->pa_stream, stream_read_request_cb, impl);
pa_stream_set_write_callback(impl->pa_stream, stream_write_request_cb, impl); pa_stream_set_write_callback(impl->pa_stream, stream_write_request_cb, impl);
pa_stream_set_underflow_callback(impl->pa_stream, stream_underflow_cb, impl);
pa_stream_set_overflow_callback(impl->pa_stream, stream_overflow_cb, impl);
pa_stream_set_latency_update_callback(impl->pa_stream, stream_latency_update_cb, impl); pa_stream_set_latency_update_callback(impl->pa_stream, stream_latency_update_cb, impl);
remote_node_target = pw_properties_get(impl->props, PW_KEY_NODE_TARGET); remote_node_target = pw_properties_get(impl->props, PW_KEY_NODE_TARGET);
@ -564,6 +603,8 @@ static int create_pulse_stream(struct impl *impl)
latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss); latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss);
impl->target_latency = latency_bytes / impl->frame_size;
/* half in our buffer, half in the network + remote */ /* half in our buffer, half in the network + remote */
impl->target_buffer = latency_bytes / 2; impl->target_buffer = latency_bytes / 2;
@ -884,7 +925,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error; goto error;
} }
spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->info.rate); spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->info.rate);
impl->max_error = 256.0f * impl->frame_size; impl->max_error = 256.0f;
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) { if (impl->core == NULL) {