mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-06 13:30:01 -05:00
module-pulse-tunnel: use dll to keep latency under control
Assign half the latency to the internal ringbuffer and half on the network and remote end. Use a dll to calculate the drift from our target ringbuffer fill level and use the stream rate property to driver the resampler. This should reduce uncontrolled latency over the tunnel. PulseAudio wants us to be a driver will pull requests from the remote side. We would need to provide a clock based on the remote end and also try to follow it when we are not a driver. It would be slightly better because in the normal playback case we would be able to avoid resampling. We might do this eventually.
This commit is contained in:
parent
1bf1497855
commit
da6687e6ec
1 changed files with 84 additions and 22 deletions
|
|
@ -40,6 +40,7 @@
|
||||||
#include <spa/utils/string.h>
|
#include <spa/utils/string.h>
|
||||||
#include <spa/utils/json.h>
|
#include <spa/utils/json.h>
|
||||||
#include <spa/utils/ringbuffer.h>
|
#include <spa/utils/ringbuffer.h>
|
||||||
|
#include <spa/utils/dll.h>
|
||||||
#include <spa/debug/pod.h>
|
#include <spa/debug/pod.h>
|
||||||
#include <spa/pod/builder.h>
|
#include <spa/pod/builder.h>
|
||||||
#include <spa/param/audio/format-utils.h>
|
#include <spa/param/audio/format-utils.h>
|
||||||
|
|
@ -85,7 +86,7 @@ static const struct spa_dict_item module_props[] = {
|
||||||
#define RINGBUFFER_SIZE (1u << 22)
|
#define RINGBUFFER_SIZE (1u << 22)
|
||||||
#define RINGBUFFER_MASK (RINGBUFFER_SIZE-1)
|
#define RINGBUFFER_MASK (RINGBUFFER_SIZE-1)
|
||||||
|
|
||||||
#define DEFAULT_LATENCY_MSEC (100)
|
#define DEFAULT_LATENCY_MSEC (200)
|
||||||
|
|
||||||
struct impl {
|
struct impl {
|
||||||
struct pw_context *context;
|
struct pw_context *context;
|
||||||
|
|
@ -108,7 +109,6 @@ struct impl {
|
||||||
struct pw_properties *stream_props;
|
struct pw_properties *stream_props;
|
||||||
struct pw_stream *stream;
|
struct pw_stream *stream;
|
||||||
struct spa_hook stream_listener;
|
struct spa_hook stream_listener;
|
||||||
struct spa_io_rate_match *rate_match;
|
|
||||||
struct spa_audio_info_raw info;
|
struct spa_audio_info_raw info;
|
||||||
uint32_t frame_size;
|
uint32_t frame_size;
|
||||||
|
|
||||||
|
|
@ -120,6 +120,10 @@ struct impl {
|
||||||
pa_context *pa_context;
|
pa_context *pa_context;
|
||||||
pa_stream *pa_stream;
|
pa_stream *pa_stream;
|
||||||
|
|
||||||
|
uint32_t target_buffer;
|
||||||
|
struct spa_dll dll;
|
||||||
|
float max_error;
|
||||||
|
|
||||||
unsigned int do_disconnect:1;
|
unsigned int do_disconnect:1;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -189,13 +193,22 @@ static void playback_stream_process(void *d)
|
||||||
size = SPA_MIN(bd->chunk->size, RINGBUFFER_SIZE);
|
size = SPA_MIN(bd->chunk->size, RINGBUFFER_SIZE);
|
||||||
|
|
||||||
filled = spa_ringbuffer_get_write_index(&impl->ring, &write_index);
|
filled = spa_ringbuffer_get_write_index(&impl->ring, &write_index);
|
||||||
|
|
||||||
if (filled < 0) {
|
if (filled < 0) {
|
||||||
pw_log_warn("%p: underrun write:%u filled:%d",
|
pw_log_warn("%p: underrun write:%u filled:%d",
|
||||||
impl, write_index, filled);
|
impl, write_index, filled);
|
||||||
} else if ((uint32_t)filled + size > RINGBUFFER_SIZE) {
|
} else if ((uint32_t)filled + size > RINGBUFFER_SIZE) {
|
||||||
pw_log_debug("%p: overrun write:%u filled:%d size:%u max:%u",
|
pw_log_warn("%p: overrun write:%u filled:%d size:%u max:%u",
|
||||||
impl, write_index, filled,
|
impl, write_index, filled,
|
||||||
size, RINGBUFFER_SIZE);
|
size, RINGBUFFER_SIZE);
|
||||||
|
} else {
|
||||||
|
float error, corr;
|
||||||
|
error = (float)filled - (float)impl->target_buffer;
|
||||||
|
error = SPA_CLAMP(error, -impl->max_error, impl->max_error);
|
||||||
|
|
||||||
|
corr = spa_dll_update(&impl->dll, error);
|
||||||
|
pw_stream_set_control(impl->stream,
|
||||||
|
SPA_PROP_rate, 1, &corr, NULL);
|
||||||
}
|
}
|
||||||
spa_ringbuffer_write_data(&impl->ring,
|
spa_ringbuffer_write_data(&impl->ring,
|
||||||
impl->buffer, RINGBUFFER_SIZE,
|
impl->buffer, RINGBUFFER_SIZE,
|
||||||
|
|
@ -223,16 +236,16 @@ static void capture_stream_process(void *d)
|
||||||
|
|
||||||
bd = &buf->buffer->datas[0];
|
bd = &buf->buffer->datas[0];
|
||||||
|
|
||||||
if (impl->rate_match)
|
if ((req = buf->requested * impl->frame_size) == 0)
|
||||||
req = impl->rate_match->size * impl->frame_size;
|
req = 4096 * impl->frame_size;
|
||||||
else
|
|
||||||
req = 4096;
|
|
||||||
|
|
||||||
avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index);
|
avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index);
|
||||||
if (avail <= 0) {
|
if (avail <= 0) {
|
||||||
size = SPA_MIN(bd->maxsize, req);
|
size = SPA_MIN(bd->maxsize, req);
|
||||||
memset(bd->data, 0, size);
|
memset(bd->data, 0, size);
|
||||||
} else {
|
} else {
|
||||||
|
float error, corr;
|
||||||
|
|
||||||
size = SPA_MIN(bd->maxsize, (uint32_t)avail);
|
size = SPA_MIN(bd->maxsize, (uint32_t)avail);
|
||||||
size = SPA_MIN(size, req);
|
size = SPA_MIN(size, req);
|
||||||
|
|
||||||
|
|
@ -243,6 +256,13 @@ static void capture_stream_process(void *d)
|
||||||
|
|
||||||
read_index += size;
|
read_index += size;
|
||||||
spa_ringbuffer_read_update(&impl->ring, read_index);
|
spa_ringbuffer_read_update(&impl->ring, read_index);
|
||||||
|
|
||||||
|
error = (float)impl->target_buffer - (float)avail;
|
||||||
|
error = SPA_CLAMP(error, -impl->max_error, impl->max_error);
|
||||||
|
|
||||||
|
corr = spa_dll_update(&impl->dll, error);
|
||||||
|
pw_stream_set_control(impl->stream,
|
||||||
|
SPA_PROP_rate, 1, &corr, NULL);
|
||||||
}
|
}
|
||||||
bd->chunk->offset = 0;
|
bd->chunk->offset = 0;
|
||||||
bd->chunk->size = size;
|
bd->chunk->size = size;
|
||||||
|
|
@ -250,16 +270,6 @@ static void capture_stream_process(void *d)
|
||||||
pw_stream_queue_buffer(impl->stream, buf);
|
pw_stream_queue_buffer(impl->stream, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size)
|
|
||||||
{
|
|
||||||
struct impl *impl = data;
|
|
||||||
switch (id) {
|
|
||||||
case SPA_IO_RateMatch:
|
|
||||||
impl->rate_match = area;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static const struct pw_stream_events playback_stream_events = {
|
static const struct pw_stream_events playback_stream_events = {
|
||||||
PW_VERSION_STREAM_EVENTS,
|
PW_VERSION_STREAM_EVENTS,
|
||||||
.destroy = stream_destroy,
|
.destroy = stream_destroy,
|
||||||
|
|
@ -271,7 +281,6 @@ static const struct pw_stream_events capture_stream_events = {
|
||||||
PW_VERSION_STREAM_EVENTS,
|
PW_VERSION_STREAM_EVENTS,
|
||||||
.destroy = stream_destroy,
|
.destroy = stream_destroy,
|
||||||
.state_changed = stream_state_changed,
|
.state_changed = stream_state_changed,
|
||||||
.io_changed = stream_io_changed,
|
|
||||||
.process = capture_stream_process
|
.process = capture_stream_process
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -300,8 +309,6 @@ static int create_stream(struct impl *impl)
|
||||||
&playback_stream_events, impl);
|
&playback_stream_events, impl);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl->frame_size = 2 * 2;
|
|
||||||
|
|
||||||
n_params = 0;
|
n_params = 0;
|
||||||
spa_pod_builder_init(&b, buffer, sizeof(buffer));
|
spa_pod_builder_init(&b, buffer, sizeof(buffer));
|
||||||
params[n_params++] = spa_format_audio_raw_build(&b,
|
params[n_params++] = spa_format_audio_raw_build(&b,
|
||||||
|
|
@ -472,6 +479,7 @@ static int create_pulse_stream(struct impl *impl)
|
||||||
char stream_name[1024];
|
char stream_name[1024];
|
||||||
pa_buffer_attr bufferattr;
|
pa_buffer_attr bufferattr;
|
||||||
int res = -EIO;
|
int res = -EIO;
|
||||||
|
uint32_t latency_bytes;
|
||||||
|
|
||||||
if ((impl->pa_mainloop = pa_threaded_mainloop_new()) == NULL)
|
if ((impl->pa_mainloop = pa_threaded_mainloop_new()) == NULL)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
@ -538,19 +546,26 @@ static int create_pulse_stream(struct impl *impl)
|
||||||
bufferattr.maxlength = (uint32_t) -1;
|
bufferattr.maxlength = (uint32_t) -1;
|
||||||
bufferattr.prebuf = (uint32_t) -1;
|
bufferattr.prebuf = (uint32_t) -1;
|
||||||
|
|
||||||
|
latency_bytes = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss);
|
||||||
|
|
||||||
|
/* half in our buffer, half in the network + remote */
|
||||||
|
impl->target_buffer = latency_bytes / 2;
|
||||||
|
|
||||||
if (impl->mode == MODE_CAPTURE) {
|
if (impl->mode == MODE_CAPTURE) {
|
||||||
bufferattr.fragsize = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss);
|
bufferattr.fragsize = latency_bytes / 2;
|
||||||
|
|
||||||
res = pa_stream_connect_record(impl->pa_stream,
|
res = pa_stream_connect_record(impl->pa_stream,
|
||||||
remote_node_target, &bufferattr,
|
remote_node_target, &bufferattr,
|
||||||
|
PA_STREAM_DONT_MOVE |
|
||||||
PA_STREAM_INTERPOLATE_TIMING |
|
PA_STREAM_INTERPOLATE_TIMING |
|
||||||
PA_STREAM_ADJUST_LATENCY |
|
PA_STREAM_ADJUST_LATENCY |
|
||||||
PA_STREAM_AUTO_TIMING_UPDATE);
|
PA_STREAM_AUTO_TIMING_UPDATE);
|
||||||
} else {
|
} else {
|
||||||
bufferattr.tlength = pa_usec_to_bytes(impl->latency_msec * SPA_USEC_PER_MSEC, &ss);
|
bufferattr.tlength = latency_bytes / 2;
|
||||||
|
|
||||||
res = pa_stream_connect_playback(impl->pa_stream,
|
res = pa_stream_connect_playback(impl->pa_stream,
|
||||||
remote_node_target, &bufferattr,
|
remote_node_target, &bufferattr,
|
||||||
|
PA_STREAM_DONT_MOVE |
|
||||||
PA_STREAM_INTERPOLATE_TIMING |
|
PA_STREAM_INTERPOLATE_TIMING |
|
||||||
PA_STREAM_ADJUST_LATENCY |
|
PA_STREAM_ADJUST_LATENCY |
|
||||||
PA_STREAM_AUTO_TIMING_UPDATE,
|
PA_STREAM_AUTO_TIMING_UPDATE,
|
||||||
|
|
@ -713,6 +728,41 @@ static void parse_audio_info(struct pw_properties *props, struct spa_audio_info_
|
||||||
info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels);
|
info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels);
|
||||||
if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL)
|
if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL)
|
||||||
parse_position(info, str, strlen(str));
|
parse_position(info, str, strlen(str));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static int calc_frame_size(struct spa_audio_info_raw *info)
|
||||||
|
{
|
||||||
|
int res = info->channels;
|
||||||
|
switch (info->format) {
|
||||||
|
case SPA_AUDIO_FORMAT_U8:
|
||||||
|
case SPA_AUDIO_FORMAT_S8:
|
||||||
|
case SPA_AUDIO_FORMAT_ALAW:
|
||||||
|
case SPA_AUDIO_FORMAT_ULAW:
|
||||||
|
return res;
|
||||||
|
case SPA_AUDIO_FORMAT_S16:
|
||||||
|
case SPA_AUDIO_FORMAT_S16_OE:
|
||||||
|
case SPA_AUDIO_FORMAT_U16:
|
||||||
|
return res * 2;
|
||||||
|
case SPA_AUDIO_FORMAT_S24:
|
||||||
|
case SPA_AUDIO_FORMAT_S24_OE:
|
||||||
|
case SPA_AUDIO_FORMAT_U24:
|
||||||
|
return res * 3;
|
||||||
|
case SPA_AUDIO_FORMAT_S24_32:
|
||||||
|
case SPA_AUDIO_FORMAT_S24_32_OE:
|
||||||
|
case SPA_AUDIO_FORMAT_S32:
|
||||||
|
case SPA_AUDIO_FORMAT_S32_OE:
|
||||||
|
case SPA_AUDIO_FORMAT_U32:
|
||||||
|
case SPA_AUDIO_FORMAT_U32_OE:
|
||||||
|
case SPA_AUDIO_FORMAT_F32:
|
||||||
|
case SPA_AUDIO_FORMAT_F32_OE:
|
||||||
|
return res * 4;
|
||||||
|
case SPA_AUDIO_FORMAT_F64:
|
||||||
|
case SPA_AUDIO_FORMAT_F64_OE:
|
||||||
|
return res * 8;
|
||||||
|
default:
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void copy_props(struct impl *impl, struct pw_properties *props, const char *key)
|
static void copy_props(struct impl *impl, struct pw_properties *props, const char *key)
|
||||||
|
|
@ -764,6 +814,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
|
||||||
|
|
||||||
spa_ringbuffer_init(&impl->ring);
|
spa_ringbuffer_init(&impl->ring);
|
||||||
impl->buffer = calloc(1, RINGBUFFER_SIZE);
|
impl->buffer = calloc(1, RINGBUFFER_SIZE);
|
||||||
|
spa_dll_init(&impl->dll);
|
||||||
|
|
||||||
if ((str = pw_properties_get(props, "tunnel.mode")) != NULL) {
|
if ((str = pw_properties_get(props, "tunnel.mode")) != NULL) {
|
||||||
if (spa_streq(str, "capture")) {
|
if (spa_streq(str, "capture")) {
|
||||||
|
|
@ -779,6 +830,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
|
||||||
|
|
||||||
impl->latency_msec = pw_properties_get_uint32(props, "pulse.latency", DEFAULT_LATENCY_MSEC);
|
impl->latency_msec = pw_properties_get_uint32(props, "pulse.latency", DEFAULT_LATENCY_MSEC);
|
||||||
|
|
||||||
|
|
||||||
if (pw_properties_get(props, PW_KEY_NODE_WANT_DRIVER) == NULL)
|
if (pw_properties_get(props, PW_KEY_NODE_WANT_DRIVER) == NULL)
|
||||||
pw_properties_set(props, PW_KEY_NODE_WANT_DRIVER, "true");
|
pw_properties_set(props, PW_KEY_NODE_WANT_DRIVER, "true");
|
||||||
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
|
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
|
||||||
|
|
@ -808,6 +860,16 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
|
||||||
|
|
||||||
parse_audio_info(impl->stream_props, &impl->info);
|
parse_audio_info(impl->stream_props, &impl->info);
|
||||||
|
|
||||||
|
impl->frame_size = calc_frame_size(&impl->info);
|
||||||
|
if (impl->frame_size == 0) {
|
||||||
|
pw_log_error("unsupported audio format:%d channels:%d",
|
||||||
|
impl->info.format, impl->info.channels);
|
||||||
|
res = -EINVAL;
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->info.rate);
|
||||||
|
impl->max_error = 256.0f * impl->frame_size;
|
||||||
|
|
||||||
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
|
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
|
||||||
if (impl->core == NULL) {
|
if (impl->core == NULL) {
|
||||||
str = pw_properties_get(props, PW_KEY_REMOTE_NAME);
|
str = pw_properties_get(props, PW_KEY_REMOTE_NAME);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue