diff --git a/src/modules/module-pulse-tunnel.c b/src/modules/module-pulse-tunnel.c index 2d930ba8a..da2f6ed37 100644 --- a/src/modules/module-pulse-tunnel.c +++ b/src/modules/module-pulse-tunnel.c @@ -120,9 +120,12 @@ struct impl { pa_context *pa_context; pa_stream *pa_stream; + uint32_t target_latency; + uint32_t current_latency; uint32_t target_buffer; struct spa_dll dll; float max_error; + unsigned resync:1; unsigned int do_disconnect:1; }; @@ -141,8 +144,12 @@ static void cork_stream(struct impl *impl, bool cork) * played at the time when the sink starts running again. */ if ((operation = pa_stream_flush(impl->pa_stream, NULL, NULL))) pa_operation_unref(operation); + spa_ringbuffer_init(&impl->ring); } + if (!cork) + impl->resync = true; + if ((operation = pa_stream_cork(impl->pa_stream, cork, NULL, NULL))) pa_operation_unref(operation); @@ -197,25 +204,23 @@ static void playback_stream_process(void *d) if (filled < 0) { pw_log_warn("%p: underrun write:%u filled:%d", impl, write_index, filled); + } else if ((uint32_t)filled + size > RINGBUFFER_SIZE) { + pw_log_warn("%p: overrun write:%u filled:%d + size:%u > max:%u", + impl, write_index, filled, + size, RINGBUFFER_SIZE); + impl->resync = true; } else { float error, corr; - if ((uint32_t)filled + size > impl->target_buffer * 2) { - pw_log_warn("%p: overrun write:%u filled:%d + size:%u > %u max:%u", - impl, write_index, filled, - size, impl->target_buffer * 2, RINGBUFFER_SIZE); - write_index -= impl->target_buffer; - filled -= impl->target_buffer; - } else { - error = (float)filled - (float)impl->target_buffer; - error = SPA_CLAMP(error, -impl->max_error, impl->max_error); + error = (float)(impl->current_latency) - (float)impl->target_latency; + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); - pw_log_debug("filled:%u target:%u error:%f corr:%f", filled, - impl->target_buffer, error, corr); - corr = spa_dll_update(&impl->dll, error); - pw_stream_set_control(impl->stream, - SPA_PROP_rate, 1, &corr, NULL); - } + corr = spa_dll_update(&impl->dll, error); + pw_log_info("filled:%u target:%u error:%f corr:%f %u %u", filled, + impl->target_buffer, error, corr, + impl->current_latency, impl->target_latency); + pw_stream_set_control(impl->stream, + SPA_PROP_rate, 1, &corr, NULL); } spa_ringbuffer_write_data(&impl->ring, impl->buffer, RINGBUFFER_SIZE, @@ -234,7 +239,7 @@ static void capture_stream_process(void *d) struct pw_buffer *buf; struct spa_data *bd; int32_t avail; - uint32_t size, req, read_index; + uint32_t size, req, index; if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_debug("out of buffers: %m"); @@ -246,42 +251,41 @@ static void capture_stream_process(void *d) if ((req = buf->requested * impl->frame_size) == 0) req = 4096 * impl->frame_size; - avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index); - if (avail <= 0) { - size = SPA_MIN(bd->maxsize, req); + size = SPA_MIN(bd->maxsize, req); + + avail = spa_ringbuffer_get_read_index(&impl->ring, &index); + if (avail < (int32_t)size) { memset(bd->data, 0, size); } else { float error, corr; - if (avail > (int32_t)impl->target_buffer * 2) { - avail -= impl->target_buffer; - read_index += impl->target_buffer; + if (avail > (int32_t)RINGBUFFER_SIZE) { + avail = impl->target_buffer; + index += avail - impl->target_buffer; } else { - error = (float)impl->target_buffer - (float)avail; + error = (float)(impl->current_latency) - (float)impl->target_latency; error = SPA_CLAMP(error, -impl->max_error, impl->max_error); corr = spa_dll_update(&impl->dll, error); - pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, - impl->target_buffer, error, corr); + pw_log_info("avail:%u target:%u error:%f corr:%f %u %u", avail, + impl->target_buffer, error, corr, + impl->current_latency, impl->target_latency); pw_stream_set_control(impl->stream, SPA_PROP_rate, 1, &corr, NULL); } - size = SPA_MIN(bd->maxsize, (uint32_t)avail); - size = SPA_MIN(size, req); - spa_ringbuffer_read_data(&impl->ring, impl->buffer, RINGBUFFER_SIZE, - read_index & RINGBUFFER_MASK, + index & RINGBUFFER_MASK, bd->data, size); - read_index += size; - spa_ringbuffer_read_update(&impl->ring, read_index); - + index += size; + spa_ringbuffer_read_update(&impl->ring, index); } bd->chunk->offset = 0; bd->chunk->size = size; + bd->chunk->stride = impl->frame_size; pw_stream_queue_buffer(impl->stream, buf); } @@ -385,16 +389,18 @@ static void stream_read_request_cb(pa_stream *s, size_t length, void *userdata) { struct impl *impl = userdata; int32_t filled; - uint32_t write_index; + uint32_t index; + pa_usec_t latency; + int negative; - filled = spa_ringbuffer_get_write_index(&impl->ring, &write_index); + filled = spa_ringbuffer_get_write_index(&impl->ring, &index); if (filled < 0) { pw_log_warn("%p: underrun write:%u filled:%d", - impl, write_index, filled); + impl, index, filled); } else if (filled + length > RINGBUFFER_SIZE) { pw_log_warn("%p: overrun write:%u filled:%d", - impl, write_index, filled); + impl, index, filled); } while (length > 0) { const void *p; @@ -415,26 +421,45 @@ static void stream_read_request_cb(pa_stream *s, size_t length, void *userdata) spa_ringbuffer_write_data(&impl->ring, impl->buffer, RINGBUFFER_SIZE, - write_index & RINGBUFFER_MASK, + index & RINGBUFFER_MASK, p ? p : impl->empty, to_write); - write_index += to_write; + index += to_write; p = p ? SPA_PTROFF(p, to_write, void) : NULL; nbytes -= to_write; length -= to_write; + filled += to_write; } pa_stream_drop(impl->pa_stream); } - spa_ringbuffer_write_update(&impl->ring, write_index); + + pa_stream_get_latency(impl->pa_stream, &latency, &negative); + impl->current_latency = latency * impl->info.rate / SPA_USEC_PER_SEC; + impl->current_latency += filled / impl->frame_size; + + spa_ringbuffer_write_update(&impl->ring, index); } static void stream_write_request_cb(pa_stream *s, size_t length, void *userdata) { struct impl *impl = userdata; int32_t avail; - uint32_t read_index, len, offset, l0, l1; + uint32_t index, len, offset, l0, l1; + pa_usec_t latency; + int negative; - avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index); + if (impl->resync) { + impl->resync = false; + avail = length + impl->target_buffer; + spa_ringbuffer_get_write_index(&impl->ring, &index); + index -= avail; + } else { + avail = spa_ringbuffer_get_read_index(&impl->ring, &index); + } + + pa_stream_get_latency(impl->pa_stream, &latency, &negative); + impl->current_latency = latency * impl->info.rate / SPA_USEC_PER_SEC; + impl->current_latency += avail / impl->frame_size; while (avail < (int32_t)length) { /* send silence for the data we don't have */ @@ -447,7 +472,7 @@ static void stream_write_request_cb(pa_stream *s, size_t length, void *userdata) if (length > 0 && avail >= (int32_t)length) { /* always send as much as is requested */ len = length; - offset = read_index & RINGBUFFER_MASK; + offset = index & RINGBUFFER_MASK; l0 = SPA_MIN(len, RINGBUFFER_SIZE - offset); l1 = len - l0; @@ -460,10 +485,22 @@ static void stream_write_request_cb(pa_stream *s, size_t length, void *userdata) impl->buffer, l1, NULL, 0, PA_SEEK_RELATIVE); } - read_index += len; - spa_ringbuffer_read_update(&impl->ring, read_index); + index += len; + spa_ringbuffer_read_update(&impl->ring, index); } } +static void stream_underflow_cb(pa_stream *s, void *userdata) +{ + struct impl *impl = userdata; + pw_log_info("underflow"); + impl->resync = true; +} +static void stream_overflow_cb(pa_stream *s, void *userdata) +{ + struct impl *impl = userdata; + pw_log_info("underflow"); + impl->resync = true; +} static void stream_latency_update_cb(pa_stream *s, void *userdata) { @@ -553,6 +590,8 @@ static int create_pulse_stream(struct impl *impl) pa_stream_set_state_callback(impl->pa_stream, stream_state_cb, impl); pa_stream_set_read_callback(impl->pa_stream, stream_read_request_cb, impl); pa_stream_set_write_callback(impl->pa_stream, stream_write_request_cb, impl); + pa_stream_set_underflow_callback(impl->pa_stream, stream_underflow_cb, impl); + pa_stream_set_overflow_callback(impl->pa_stream, stream_overflow_cb, impl); pa_stream_set_latency_update_callback(impl->pa_stream, stream_latency_update_cb, impl); remote_node_target = pw_properties_get(impl->props, PW_KEY_NODE_TARGET); @@ -564,6 +603,8 @@ static int create_pulse_stream(struct impl *impl) latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss); + impl->target_latency = latency_bytes / impl->frame_size; + /* half in our buffer, half in the network + remote */ impl->target_buffer = latency_bytes / 2; @@ -884,7 +925,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) goto error; } spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->info.rate); - impl->max_error = 256.0f * impl->frame_size; + impl->max_error = 256.0f; impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) {