module-combine-stream: add option to add delays for latency compensation

Add combine.latency-compensate option, which uses delay buffers to
compensate for different latencies in the target streams.
This commit is contained in:
Pauli Virtanen 2023-04-30 21:44:32 +03:00 committed by Wim Taymans
parent e24d7cc8b7
commit cc5da73665

View file

@ -45,6 +45,7 @@
* - `node.name`: a unique name for the stream
* - `node.description`: a human readable name for the stream
* - `combine.mode` = capture | playback | sink | source, default sink
* - `combine.latency-compensate`: use delay buffers to match stream latencies
* - `combine.props = {}`: properties to be passed to the sink/source
* - `stream.props = {}`: properties to be passed to the streams
* - `stream.rules = {}`: rules for matching streams, use create-stream actions
@ -80,6 +81,7 @@
* combine.mode = sink
* node.name = "combine_sink"
* node.description = "My Combine Sink"
* combine.latency-compensate = false
* combine.props = {
* audio.position = [ FL FR ]
* }
@ -119,6 +121,7 @@
* combine.mode = sink
* node.name = "combine_sink_5_1"
* node.description = "My 5.1 Combine Sink"
* combine.latency-compensate = false
* combine.props = {
* audio.position = [ FL FR FC LFE SL SR ]
* }
@ -214,6 +217,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
"( stream.props=<properties> ) " \
"( stream.rules=<properties> ) "
#define DELAYBUF_MAX_SIZE (20 * sizeof(float) * 96000)
static const struct spa_dict_item module_props[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
@ -224,6 +229,7 @@ static const struct spa_dict_item module_props[] = {
struct impl {
struct pw_context *context;
struct pw_loop *main_loop;
struct pw_data_loop *data_loop;
struct pw_properties *props;
@ -244,6 +250,8 @@ struct impl {
struct pw_registry *registry;
struct spa_hook registry_listener;
struct spa_source *update_delay_event;
struct pw_properties *combine_props;
struct pw_stream *combine;
struct spa_hook combine_listener;
@ -259,11 +267,18 @@ struct impl {
struct spa_audio_info_raw info;
unsigned int do_disconnect:1;
unsigned int latency_compensate:1;
struct spa_list streams;
uint32_t n_streams;
};
struct ringbuffer {
void *buf;
uint32_t idx;
uint32_t size;
};
struct stream {
uint32_t id;
@ -278,6 +293,13 @@ struct stream {
struct spa_audio_info_raw info;
uint32_t remap[SPA_AUDIO_MAX_CHANNELS];
uint32_t rate;
void *delaybuf;
struct ringbuffer delay[SPA_AUDIO_MAX_CHANNELS];
int64_t delay_nsec; /* for main loop */
int64_t data_delay_nsec; /* for data loop */
unsigned int ready:1;
unsigned int added:1;
@ -324,6 +346,53 @@ static void parse_audio_info(const struct pw_properties *props, struct spa_audio
parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION));
}
static void ringbuffer_init(struct ringbuffer *r, void *buf, uint32_t size)
{
r->buf = buf;
r->idx = 0;
r->size = size;
}
static void ringbuffer_memcpy(struct ringbuffer *r, void *dst, void *src, uint32_t size)
{
uint32_t avail;
avail = SPA_MIN(size, r->size);
/* buf to dst */
if (dst && avail > 0) {
spa_ringbuffer_read_data(NULL, r->buf, r->size, r->idx, dst, avail);
dst = SPA_PTROFF(dst, avail, void);
}
/* src to dst */
if (size > avail) {
if (dst)
memcpy(dst, src, size - avail);
src = SPA_PTROFF(src, size - avail, void);
}
/* src to buf */
if (avail > 0) {
spa_ringbuffer_write_data(NULL, r->buf, r->size, r->idx, src, avail);
r->idx = (r->idx + avail) % r->size;
}
}
static void ringbuffer_copy(struct ringbuffer *dst, struct ringbuffer *src)
{
uint32_t l0, l1;
if (dst->size == 0 || src->size == 0)
return;
l0 = src->size - src->idx;
l1 = src->idx;
ringbuffer_memcpy(dst, NULL, SPA_PTROFF(src->buf, src->idx, void), l0);
ringbuffer_memcpy(dst, NULL, src->buf, l1);
}
static struct stream *find_stream(struct impl *impl, uint32_t id)
{
struct stream *s;
@ -347,6 +416,17 @@ static void apply_latency_offset(struct spa_latency_info *latency, int64_t offse
latency->max_ns += SPA_MAX(offset, -(int64_t)latency->max_ns);
}
static int64_t get_stream_delay(struct stream *s)
{
struct pw_time t;
if (pw_stream_get_time_n(s->stream, &t, sizeof(t)) < 0 ||
t.rate.denom == 0)
return INT64_MIN;
return t.delay * SPA_NSEC_PER_SEC * t.rate.num / t.rate.denom;
}
static void update_latency(struct impl *impl)
{
struct spa_latency_info latency;
@ -355,13 +435,28 @@ static void update_latency(struct impl *impl)
if (impl->combine == NULL)
return;
spa_latency_info_combine_start(&latency, get_combine_direction(impl));
if (!impl->latency_compensate) {
spa_latency_info_combine_start(&latency, get_combine_direction(impl));
spa_list_for_each(s, &impl->streams, link)
if (s->have_latency)
spa_latency_info_combine(&latency, &s->latency);
spa_list_for_each(s, &impl->streams, link)
if (s->have_latency)
spa_latency_info_combine(&latency, &s->latency);
spa_latency_info_combine_finish(&latency);
spa_latency_info_combine_finish(&latency);
} else {
int64_t max_delay = INT64_MIN;
latency = SPA_LATENCY_INFO(get_combine_direction(impl));
spa_list_for_each(s, &impl->streams, link) {
int64_t delay = get_stream_delay(s);
if (delay > max_delay && s->have_latency) {
latency = s->latency;
max_delay = delay;
}
}
}
apply_latency_offset(&latency, impl->latency_offset);
@ -378,6 +473,121 @@ static void update_latency(struct impl *impl)
}
}
struct replace_delay_info {
struct stream *stream;
void *buf;
struct ringbuffer delay[SPA_AUDIO_MAX_CHANNELS];
};
static int do_replace_delay(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct replace_delay_info *info = user_data;
unsigned int i;
for (i = 0; i < SPA_N_ELEMENTS(info->stream->delay); ++i) {
ringbuffer_copy(&info->delay[i], &info->stream->delay[i]);
info->stream->delay[i] = info->delay[i];
}
SPA_SWAP(info->stream->delaybuf, info->buf);
return 0;
}
static void resize_delay(struct stream *stream, uint32_t size)
{
struct replace_delay_info info;
uint32_t channels = stream->info.channels;
unsigned int i;
size = SPA_MIN(size, DELAYBUF_MAX_SIZE);
for (i = 0; i < channels; ++i)
if (stream->delay[i].size != size)
break;
if (i == channels)
return;
pw_log_info("stream %d latency compensation samples:%u", stream->id,
(unsigned int)(size / sizeof(float)));
spa_zero(info);
info.stream = stream;
if (size > 0)
info.buf = calloc(channels, size);
if (!info.buf)
size = 0;
for (i = 0; i < channels; ++i)
ringbuffer_init(&info.delay[i], SPA_PTROFF(info.buf, i*size, void), size);
pw_data_loop_invoke(stream->impl->data_loop, do_replace_delay, 0, NULL, 0, true, &info);
free(info.buf);
}
static void update_delay(struct impl *impl)
{
struct stream *s;
int64_t max_delay = INT64_MIN;
if (!impl->latency_compensate)
return;
spa_list_for_each(s, &impl->streams, link) {
int64_t delay = get_stream_delay(s);
if (delay != s->delay_nsec && delay != INT64_MIN)
pw_log_debug("stream %d delay:%"PRIi64" ns", s->id, delay);
max_delay = SPA_MAX(max_delay, delay);
s->delay_nsec = delay;
}
spa_list_for_each(s, &impl->streams, link) {
uint32_t size = 0;
if (s->delay_nsec != INT64_MIN) {
int64_t delay = max_delay - s->delay_nsec;
size = delay * s->rate / SPA_NSEC_PER_SEC;
size *= sizeof(float);
}
resize_delay(s, size);
}
update_latency(impl);
}
static void update_delay_event(void *data, uint64_t count)
{
struct impl *impl = data;
/* in main loop */
update_delay(impl);
}
static int do_clear_delaybuf(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct impl *impl = user_data;
struct stream *s;
unsigned int i;
spa_list_for_each(s, &impl->streams, link) {
for (i = 0; i < SPA_N_ELEMENTS(s->delay); ++i)
if (s->delay[i].size)
memset(s->delay[i].buf, 0, s->delay[i].size);
}
return 0;
}
static void clear_delaybuf(struct impl *impl)
{
pw_data_loop_invoke(impl->data_loop, do_clear_delaybuf, 0, NULL, 0, true, impl);
}
static int do_add_stream(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
@ -413,6 +623,8 @@ static void destroy_stream(struct stream *s)
spa_hook_remove(&s->stream_listener);
pw_stream_destroy(s->stream);
}
free(s->delaybuf);
free(s);
}
@ -462,8 +674,24 @@ static void stream_param_changed(void *d, uint32_t id, const struct spa_pod *par
{
struct stream *s = d;
struct spa_latency_info latency;
struct spa_audio_info format = { 0 };
switch (id) {
case SPA_PARAM_Format:
if (!param) {
s->rate = 0;
} else {
if (spa_format_parse(param, &format.media_type, &format.media_subtype) < 0)
break;
if (format.media_type != SPA_MEDIA_TYPE_audio ||
format.media_subtype != SPA_MEDIA_SUBTYPE_raw)
break;
if (spa_format_audio_raw_parse(param, &format.info.raw) < 0)
break;
s->rate = format.info.raw.rate;
}
update_delay(s->impl);
break;
case SPA_PARAM_Latency:
if (!param) {
s->have_latency = false;
@ -602,6 +830,7 @@ static int create_stream(struct stream_info *info)
goto error;
pw_data_loop_invoke(impl->data_loop, do_add_stream, 0, NULL, 0, true, s);
update_delay(impl);
return 0;
error_errno:
@ -673,6 +902,7 @@ static void registry_event_global_remove(void *data, uint32_t id)
return;
destroy_stream(s);
update_delay(impl);
}
static const struct pw_registry_events registry_events = {
@ -698,6 +928,7 @@ static void combine_state_changed(void *d, enum pw_stream_state old,
pw_impl_module_schedule_destroy(impl->module);
break;
case PW_STREAM_STATE_PAUSED:
clear_delaybuf(impl);
impl->combine_id = pw_stream_get_node_id(impl->combine);
pw_log_info("got combine id %d", impl->combine_id);
break;
@ -708,11 +939,27 @@ static void combine_state_changed(void *d, enum pw_stream_state old,
}
}
static bool check_stream_delay(struct stream *s)
{
int64_t delay;
if (!s->impl->latency_compensate)
return false;
delay = get_stream_delay(s);
if (delay == INT64_MIN || delay == s->data_delay_nsec)
return false;
s->data_delay_nsec = delay;
return true;
}
static void combine_input_process(void *d)
{
struct impl *impl = d;
struct pw_buffer *in, *out;
struct stream *s;
bool delay_changed = false;
if ((in = pw_stream_dequeue_buffer(impl->combine)) == NULL) {
pw_log_debug("out of buffers: %m");
@ -725,6 +972,9 @@ static void combine_input_process(void *d)
if (s->stream == NULL)
continue;
if (check_stream_delay(s))
delay_changed = true;
if ((out = pw_stream_dequeue_buffer(s->stream)) == NULL) {
pw_log_warn("out of playback buffers: %m");
goto do_trigger;
@ -746,8 +996,8 @@ static void combine_input_process(void *d)
offs = SPA_MIN(ds->chunk->offset, ds->maxsize);
size = SPA_MIN(ds->chunk->size, ds->maxsize - offs);
memcpy(dd->data,
SPA_PTROFF(ds->data, offs, void), size);
ringbuffer_memcpy(&s->delay[j],
dd->data, SPA_PTROFF(ds->data, offs, void), size);
outsize = SPA_MAX(outsize, size);
stride = SPA_MAX(stride, ds->chunk->stride);
@ -763,6 +1013,12 @@ do_trigger:
pw_stream_trigger_process(s->stream);
}
pw_stream_queue_buffer(impl->combine, in);
/* Update delay if quantum etc. has changed.
* This should be rare enough so that doing it via main loop doesn't matter.
*/
if (impl->latency_compensate && delay_changed)
pw_loop_signal_event(impl->main_loop, impl->update_delay_event);
}
static void combine_output_process(void *d)
@ -770,6 +1026,7 @@ static void combine_output_process(void *d)
struct impl *impl = d;
struct pw_buffer *in, *out;
struct stream *s;
bool delay_changed = false;
if ((out = pw_stream_dequeue_buffer(impl->combine)) == NULL) {
pw_log_debug("out of buffers: %m");
@ -782,6 +1039,9 @@ static void combine_output_process(void *d)
if (s->stream == NULL)
continue;
if (check_stream_delay(s))
delay_changed = true;
if ((in = pw_stream_dequeue_buffer(s->stream)) == NULL) {
pw_log_warn("%p: out of capture buffers: %m", s);
continue;
@ -806,8 +1066,8 @@ static void combine_output_process(void *d)
size = SPA_MIN(ds->chunk->size, ds->maxsize - offs);
size = SPA_MIN(size, dd->maxsize);
memcpy(dd->data,
SPA_PTROFF(ds->data, offs, void), size);
ringbuffer_memcpy(&s->delay[j],
dd->data, SPA_PTROFF(ds->data, offs, void), size);
outsize = SPA_MAX(outsize, size);
stride = SPA_MAX(stride, ds->chunk->stride);
@ -820,6 +1080,9 @@ static void combine_output_process(void *d)
pw_stream_queue_buffer(s->stream, in);
}
pw_stream_queue_buffer(impl->combine, out);
if (impl->latency_compensate && delay_changed)
pw_loop_signal_event(impl->main_loop, impl->update_delay_event);
}
static void combine_param_changed(void *d, uint32_t id, const struct spa_pod *param)
@ -967,6 +1230,9 @@ static void impl_destroy(struct impl *impl)
if (impl->combine)
pw_stream_destroy(impl->combine);
if (impl->update_delay_event)
pw_loop_destroy_source(impl->main_loop, impl->update_delay_event);
if (impl->registry) {
spa_hook_remove(&impl->registry_listener);
pw_proxy_destroy((struct pw_proxy*)impl->registry);
@ -1026,6 +1292,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
return -errno;
pw_log_debug("module %p: new %s", impl, args);
impl->main_loop = pw_context_get_main_loop(context);
impl->data_loop = pw_context_get_data_loop(context);
spa_list_init(&impl->streams);
@ -1062,6 +1329,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
prefix = "sink";
}
if ((str = pw_properties_get(props, "combine.latency-compensate")) != NULL)
impl->latency_compensate = spa_atob(str);
impl->combine_props = pw_properties_new(NULL, NULL);
impl->stream_props = pw_properties_new(NULL, NULL);
if (impl->combine_props == NULL || impl->stream_props == NULL) {
@ -1128,6 +1398,16 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (pw_properties_get(impl->stream_props, PW_KEY_NODE_DONT_RECONNECT) == NULL)
pw_properties_set(impl->stream_props, PW_KEY_NODE_DONT_RECONNECT, "true");
if (impl->latency_compensate) {
impl->update_delay_event = pw_loop_add_event(impl->main_loop,
update_delay_event, impl);
if (impl->update_delay_event == NULL) {
res = -errno;
pw_log_error("can't create event source: %m");
goto error;
}
}
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {
str = pw_properties_get(props, PW_KEY_REMOTE_NAME);