diff --git a/src/modules/module-pulse-tunnel.c b/src/modules/module-pulse-tunnel.c index 903c0162c..fe122ff8a 100644 --- a/src/modules/module-pulse-tunnel.c +++ b/src/modules/module-pulse-tunnel.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -85,7 +86,7 @@ static const struct spa_dict_item module_props[] = { #define RINGBUFFER_SIZE (1u << 22) #define RINGBUFFER_MASK (RINGBUFFER_SIZE-1) -#define DEFAULT_LATENCY_MSEC (100) +#define DEFAULT_LATENCY_MSEC (200) struct impl { struct pw_context *context; @@ -108,7 +109,6 @@ struct impl { struct pw_properties *stream_props; struct pw_stream *stream; struct spa_hook stream_listener; - struct spa_io_rate_match *rate_match; struct spa_audio_info_raw info; uint32_t frame_size; @@ -120,6 +120,10 @@ struct impl { pa_context *pa_context; pa_stream *pa_stream; + uint32_t target_buffer; + struct spa_dll dll; + float max_error; + unsigned int do_disconnect:1; }; @@ -189,13 +193,22 @@ static void playback_stream_process(void *d) size = SPA_MIN(bd->chunk->size, RINGBUFFER_SIZE); filled = spa_ringbuffer_get_write_index(&impl->ring, &write_index); + if (filled < 0) { pw_log_warn("%p: underrun write:%u filled:%d", impl, write_index, filled); } else if ((uint32_t)filled + size > RINGBUFFER_SIZE) { - pw_log_debug("%p: overrun write:%u filled:%d size:%u max:%u", + pw_log_warn("%p: overrun write:%u filled:%d size:%u max:%u", impl, write_index, filled, size, RINGBUFFER_SIZE); + } else { + float error, corr; + error = (float)filled - (float)impl->target_buffer; + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); + + corr = spa_dll_update(&impl->dll, error); + pw_stream_set_control(impl->stream, + SPA_PROP_rate, 1, &corr, NULL); } spa_ringbuffer_write_data(&impl->ring, impl->buffer, RINGBUFFER_SIZE, @@ -223,16 +236,16 @@ static void capture_stream_process(void *d) bd = &buf->buffer->datas[0]; - if (impl->rate_match) - req = impl->rate_match->size * impl->frame_size; - else - req = 4096; + if ((req = buf->requested * impl->frame_size) == 0) + req = 4096 * impl->frame_size; avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index); if (avail <= 0) { size = SPA_MIN(bd->maxsize, req); memset(bd->data, 0, size); } else { + float error, corr; + size = SPA_MIN(bd->maxsize, (uint32_t)avail); size = SPA_MIN(size, req); @@ -243,6 +256,13 @@ static void capture_stream_process(void *d) read_index += size; spa_ringbuffer_read_update(&impl->ring, read_index); + + error = (float)impl->target_buffer - (float)avail; + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); + + corr = spa_dll_update(&impl->dll, error); + pw_stream_set_control(impl->stream, + SPA_PROP_rate, 1, &corr, NULL); } bd->chunk->offset = 0; bd->chunk->size = size; @@ -250,16 +270,6 @@ static void capture_stream_process(void *d) pw_stream_queue_buffer(impl->stream, buf); } -static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) -{ - struct impl *impl = data; - switch (id) { - case SPA_IO_RateMatch: - impl->rate_match = area; - break; - } -} - static const struct pw_stream_events playback_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, @@ -271,7 +281,6 @@ static const struct pw_stream_events capture_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = stream_state_changed, - .io_changed = stream_io_changed, .process = capture_stream_process }; @@ -300,8 +309,6 @@ static int create_stream(struct impl *impl) &playback_stream_events, impl); } - impl->frame_size = 2 * 2; - n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); params[n_params++] = spa_format_audio_raw_build(&b, @@ -472,6 +479,7 @@ static int create_pulse_stream(struct impl *impl) char stream_name[1024]; pa_buffer_attr bufferattr; int res = -EIO; + uint32_t latency_bytes; if ((impl->pa_mainloop = pa_threaded_mainloop_new()) == NULL) goto error; @@ -538,19 +546,26 @@ static int create_pulse_stream(struct impl *impl) bufferattr.maxlength = (uint32_t) -1; bufferattr.prebuf = (uint32_t) -1; + latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss); + + /* half in our buffer, half in the network + remote */ + impl->target_buffer = latency_bytes / 2; + if (impl->mode == MODE_CAPTURE) { - bufferattr.fragsize = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss); + bufferattr.fragsize = latency_bytes / 2; res = pa_stream_connect_record(impl->pa_stream, remote_node_target, &bufferattr, + PA_STREAM_DONT_MOVE | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE); } else { - bufferattr.tlength = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss); + bufferattr.tlength = latency_bytes / 2; res = pa_stream_connect_playback(impl->pa_stream, remote_node_target, &bufferattr, + PA_STREAM_DONT_MOVE | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE, @@ -713,6 +728,41 @@ static void parse_audio_info(struct pw_properties *props, struct spa_audio_info_ info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) parse_position(info, str, strlen(str)); + +} + +static int calc_frame_size(struct spa_audio_info_raw *info) +{ + int res = info->channels; + switch (info->format) { + case SPA_AUDIO_FORMAT_U8: + case SPA_AUDIO_FORMAT_S8: + case SPA_AUDIO_FORMAT_ALAW: + case SPA_AUDIO_FORMAT_ULAW: + return res; + case SPA_AUDIO_FORMAT_S16: + case SPA_AUDIO_FORMAT_S16_OE: + case SPA_AUDIO_FORMAT_U16: + return res * 2; + case SPA_AUDIO_FORMAT_S24: + case SPA_AUDIO_FORMAT_S24_OE: + case SPA_AUDIO_FORMAT_U24: + return res * 3; + case SPA_AUDIO_FORMAT_S24_32: + case SPA_AUDIO_FORMAT_S24_32_OE: + case SPA_AUDIO_FORMAT_S32: + case SPA_AUDIO_FORMAT_S32_OE: + case SPA_AUDIO_FORMAT_U32: + case SPA_AUDIO_FORMAT_U32_OE: + case SPA_AUDIO_FORMAT_F32: + case SPA_AUDIO_FORMAT_F32_OE: + return res * 4; + case SPA_AUDIO_FORMAT_F64: + case SPA_AUDIO_FORMAT_F64_OE: + return res * 8; + default: + return 0; + } } static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) @@ -764,6 +814,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) spa_ringbuffer_init(&impl->ring); impl->buffer = calloc(1, RINGBUFFER_SIZE); + spa_dll_init(&impl->dll); if ((str = pw_properties_get(props, "tunnel.mode")) != NULL) { if (spa_streq(str, "capture")) { @@ -779,6 +830,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->latency_msec = pw_properties_get_uint32(props, "pulse.latency", DEFAULT_LATENCY_MSEC); + if (pw_properties_get(props, PW_KEY_NODE_WANT_DRIVER) == NULL) pw_properties_set(props, PW_KEY_NODE_WANT_DRIVER, "true"); if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) @@ -808,6 +860,16 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) parse_audio_info(impl->stream_props, &impl->info); + impl->frame_size = calc_frame_size(&impl->info); + if (impl->frame_size == 0) { + pw_log_error("unsupported audio format:%d channels:%d", + impl->info.format, impl->info.channels); + res = -EINVAL; + goto error; + } + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->info.rate); + impl->max_error = 256.0f * impl->frame_size; + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { str = pw_properties_get(props, PW_KEY_REMOTE_NAME);