module-loopback: add target.delay.sec property

Add a target.delay.sec property to module-loopback that uses a
ringbuffer to further delay the signal to the requested value. This
also takes into account the graph delay to get an end-to-end delay.

Add a -d property to pw-loopback to control this.

Implement latency_msec on the pulse module with this new property so
that it behaves similar to pulseaudio.
This commit is contained in:
Wim Taymans 2022-10-20 16:02:28 +02:00
parent 35c9650e0e
commit 24b113e2d3
3 changed files with 137 additions and 15 deletions

View file

@ -35,6 +35,7 @@
#include <spa/utils/result.h>
#include <spa/utils/string.h>
#include <spa/utils/json.h>
#include <spa/utils/ringbuffer.h>
#include <spa/param/profiler.h>
#include <spa/debug/pod.h>
@ -54,6 +55,7 @@
* ## Module Options
*
* - `node.description`: a human readable name for the loopback streams
* - `target.delay.sec`: delay in seconds as float (Since 0.3.60)
* - `capture.props = {}`: properties to be passed to the input stream
* - `playback.props = {}`: properties to be passed to the output stream
*
@ -91,6 +93,7 @@
* { name = libpipewire-module-loopback
* args = {
* node.description = "CM106 Stereo Pair 2"
* #target.delay.sec = 1.5
* capture.props = {
* node.name = "CM106_stereo_pair_2"
* media.class = "Audio/Sink"
@ -128,6 +131,7 @@ static const struct spa_dict_item module_props[] = {
"[ audio.rate=<sample rate> ] "
"[ audio.channels=<number of channels> ] "
"[ audio.position=<channel map> ] "
"[ target.delay.sec=<delay as seconds in float> ] "
"[ capture.props=<properties> ] "
"[ playback.props=<properties> ] " },
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
@ -160,13 +164,21 @@ struct impl {
struct pw_stream *capture;
struct spa_hook capture_listener;
struct spa_audio_info_raw capture_info;
struct spa_latency_info capture_latency;
struct pw_properties *playback_props;
struct pw_stream *playback;
struct spa_hook playback_listener;
struct spa_audio_info_raw playback_info;
struct spa_latency_info playback_latency;
unsigned int do_disconnect:1;
unsigned int recalc_delay:1;
float target_delay;
struct spa_ringbuffer buffer;
uint8_t *buffer_data;
uint32_t buffer_size;
};
static void capture_destroy(void *d)
@ -176,6 +188,28 @@ static void capture_destroy(void *d)
impl->capture = NULL;
}
static void recalculate_delay(struct impl *impl)
{
uint32_t target = impl->capture_info.rate * impl->target_delay, cdelay, pdelay;
uint32_t delay, w;
struct pw_time pwt;
pw_stream_get_time_n(impl->playback, &pwt, sizeof(pwt));
pdelay = pwt.delay;
pw_stream_get_time_n(impl->capture, &pwt, sizeof(pwt));
cdelay = pwt.delay;
delay = target - SPA_MIN(target, pdelay + cdelay);
delay = SPA_MIN(delay, impl->buffer_size / 4);
spa_ringbuffer_get_write_index(&impl->buffer, &w);
spa_ringbuffer_read_update(&impl->buffer, w - (delay * 4));
pw_log_info("target:%d c:%d + p:%d + delay:%d = (%d)",
target, cdelay, pdelay, delay,
cdelay + pdelay + delay);
}
static void capture_process(void *d)
{
struct impl *impl = d;
@ -188,6 +222,11 @@ static void playback_process(void *d)
struct pw_buffer *in, *out;
uint32_t i;
if (impl->recalc_delay) {
recalculate_delay(impl);
impl->recalc_delay = false;
}
if ((in = pw_stream_dequeue_buffer(impl->capture)) == NULL)
pw_log_debug("out of capture buffers: %m");
@ -199,6 +238,7 @@ static void playback_process(void *d)
int32_t stride = 0;
struct spa_data *d;
const void *src[in->buffer->n_datas];
uint32_t r, w, buffer_size;
for (i = 0; i < in->buffer->n_datas; i++) {
uint32_t offs, size;
@ -211,13 +251,33 @@ static void playback_process(void *d)
outsize = SPA_MIN(outsize, size);
stride = SPA_MAX(stride, d->chunk->stride);
}
if (impl->buffer_size > 0) {
buffer_size = impl->buffer_size;
spa_ringbuffer_get_write_index(&impl->buffer, &w);
for (i = 0; i < in->buffer->n_datas; i++) {
void *buffer_data = &impl->buffer_data[i * buffer_size];
spa_ringbuffer_write_data(&impl->buffer,
buffer_data, buffer_size,
w % buffer_size, src[i], outsize);
src[i] = buffer_data;
}
w += outsize;
spa_ringbuffer_write_update(&impl->buffer, w);
spa_ringbuffer_get_read_index(&impl->buffer, &r);
} else {
r = 0;
buffer_size = outsize;
}
for (i = 0; i < out->buffer->n_datas; i++) {
d = &out->buffer->datas[i];
outsize = SPA_MIN(outsize, d->maxsize);
if (i < in->buffer->n_datas)
memcpy(d->data, src[i], outsize);
spa_ringbuffer_read_data(&impl->buffer,
src[i], buffer_size,
r % buffer_size,
d->data, outsize);
else
memset(d->data, 0, outsize);
@ -225,6 +285,10 @@ static void playback_process(void *d)
d->chunk->size = outsize;
d->chunk->stride = stride;
}
if (impl->buffer_size > 0) {
r += outsize;
spa_ringbuffer_read_update(&impl->buffer, r);
}
}
if (in != NULL)
@ -234,7 +298,7 @@ static void playback_process(void *d)
}
static void param_latency_changed(struct impl *impl, const struct spa_pod *param,
struct pw_stream *other)
struct spa_latency_info *info, struct pw_stream *other)
{
struct spa_latency_info latency;
uint8_t buffer[1024];
@ -244,9 +308,13 @@ static void param_latency_changed(struct impl *impl, const struct spa_pod *param
if (spa_latency_parse(param, &latency) < 0)
return;
*info = latency;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
params[0] = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
pw_stream_update_params(other, params, 1);
impl->recalc_delay = true;
}
static void stream_state_changed(void *data, enum pw_stream_state old,
@ -257,6 +325,7 @@ static void stream_state_changed(void *data, enum pw_stream_state old,
case PW_STREAM_STATE_PAUSED:
pw_stream_flush(impl->playback, false);
pw_stream_flush(impl->capture, false);
impl->recalc_delay = true;
break;
case PW_STREAM_STATE_UNCONNECTED:
pw_log_info("module %p: unconnected", impl);
@ -270,13 +339,49 @@ static void stream_state_changed(void *data, enum pw_stream_state old,
}
}
static void recalculate_buffer(struct impl *impl)
{
if (impl->target_delay > 0.0f) {
uint32_t delay = impl->capture_info.rate * impl->target_delay;
void *data;
impl->buffer_size = (delay + (1u<<15)) * 4;
data = realloc(impl->buffer_data, impl->buffer_size * impl->capture_info.channels);
if (data == NULL) {
pw_log_warn("can't allocate delay buffer, delay disabled: %m");
impl->buffer_size = 0;
free(impl->buffer_data);
}
impl->buffer_data = data;
spa_ringbuffer_init(&impl->buffer);
} else {
impl->buffer_size = 0;
free(impl->buffer_data);
impl->buffer_data = NULL;
}
pw_log_info("configured delay:%f buffer:%d", impl->target_delay, impl->buffer_size);
impl->recalc_delay = true;
}
static void capture_param_changed(void *data, uint32_t id, const struct spa_pod *param)
{
struct impl *impl = data;
switch (id) {
case SPA_PARAM_Format:
{
struct spa_audio_info_raw info;
if (param == NULL)
return;
if (spa_format_audio_raw_parse(param, &info) < 0)
return;
impl->capture_info = info;
recalculate_buffer(impl);
break;
}
case SPA_PARAM_Latency:
param_latency_changed(impl, param, impl->playback);
param_latency_changed(impl, param, &impl->capture_latency, impl->playback);
break;
}
}
@ -302,7 +407,7 @@ static void playback_param_changed(void *data, uint32_t id, const struct spa_pod
switch (id) {
case SPA_PARAM_Latency:
param_latency_changed(impl, param, impl->capture);
param_latency_changed(impl, param, &impl->playback_latency, impl->capture);
break;
}
}
@ -539,6 +644,15 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "playback.props")) != NULL)
pw_properties_update_string(impl->playback_props, str, strlen(str));
if ((str = pw_properties_get(props, "target.delay.sec")) != NULL)
spa_atof(str, &impl->target_delay);
if (impl->target_delay > 0.0f &&
pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL)
/* a source and sink (USB) usually have a 1.5 quantum delay, so we use
* a 2 times smaller quantum to compensate */
pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%u",
(unsigned)(impl->target_delay * 48000 / 3), 48000);
copy_props(impl, props, PW_KEY_AUDIO_RATE);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);
copy_props(impl, props, SPA_KEY_AUDIO_POSITION);

View file

@ -47,6 +47,7 @@ struct module_loopback_data {
struct pw_properties *playback_props;
struct spa_audio_info_raw info;
uint32_t latency_msec;
};
static void module_destroy(void *data)
@ -68,6 +69,7 @@ static int module_loopback_load(struct client *client, struct module *module)
FILE *f;
char *args;
size_t size, i;
char val[256];
pw_properties_setf(data->capture_props, PW_KEY_NODE_GROUP, "loopback-%u", module->index);
pw_properties_setf(data->playback_props, PW_KEY_NODE_GROUP, "loopback-%u", module->index);
@ -88,6 +90,10 @@ static int module_loopback_load(struct client *client, struct module *module)
fprintf(f, " ]");
}
}
if (data->latency_msec != 0)
fprintf(f, " target.delay.sec = %s",
spa_json_format_float(val, sizeof(val),
data->latency_msec / 1000.0f));
fprintf(f, " capture.props = {");
pw_properties_serialize_dict(f, &data->capture_props->dict, 0);
fprintf(f, " } playback.props = {");
@ -204,15 +210,8 @@ static int module_loopback_prepare(struct module * const module)
pw_properties_set(props, "remix", NULL);
}
if ((str = pw_properties_get(props, "latency_msec")) != NULL) {
/* Half the latency on each of the playback and capture streams */
pw_properties_setf(capture_props, PW_KEY_NODE_LATENCY, "%s/2000", str);
pw_properties_setf(playback_props, PW_KEY_NODE_LATENCY, "%s/2000", str);
pw_properties_set(props, "latency_msec", NULL);
} else {
pw_properties_set(capture_props, PW_KEY_NODE_LATENCY, "100/1000");
pw_properties_set(playback_props, PW_KEY_NODE_LATENCY, "100/1000");
}
if ((str = pw_properties_get(props, "latency_msec")) != NULL)
d->latency_msec = atoi(str);
if ((str = pw_properties_get(props, "sink_input_properties")) != NULL) {
module_args_add_props(playback_props, str);

View file

@ -57,6 +57,7 @@ struct data {
uint32_t channels;
uint32_t latency;
float delay;
struct pw_properties *capture_props;
struct pw_properties *playback_props;
@ -93,6 +94,7 @@ static void show_help(struct data *data, const char *name, bool error)
" -c, --channels Number of channels (default %d)\n"
" -m, --channel-map Channel map (default '%s')\n"
" -l, --latency Desired latency in ms\n"
" -d, --delay Desired delay in float s\n"
" -C --capture Capture source to connect to\n"
" --capture-props Capture stream properties\n"
" -P --playback Playback sink to connect to\n"
@ -109,7 +111,7 @@ int main(int argc, char *argv[])
struct data data = { 0 };
struct pw_loop *l;
const char *opt_remote = NULL;
char cname[256];
char cname[256], value[256];
char *args;
size_t size;
FILE *f;
@ -121,6 +123,7 @@ int main(int argc, char *argv[])
{ "name", required_argument, NULL, 'n' },
{ "channels", required_argument, NULL, 'c' },
{ "latency", required_argument, NULL, 'l' },
{ "delay", required_argument, NULL, 'd' },
{ "capture", required_argument, NULL, 'C' },
{ "playback", required_argument, NULL, 'P' },
{ "capture-props", required_argument, NULL, 'i' },
@ -146,7 +149,7 @@ int main(int argc, char *argv[])
goto exit;
}
while ((c = getopt_long(argc, argv, "hVr:n:g:c:m:l:C:P:i:o:", long_options, NULL)) != -1) {
while ((c = getopt_long(argc, argv, "hVr:n:g:c:m:l:d:C:P:i:o:", long_options, NULL)) != -1) {
switch (c) {
case 'h':
show_help(&data, argv[0], false);
@ -177,6 +180,9 @@ int main(int argc, char *argv[])
case 'l':
data.latency = atoi(optarg) * DEFAULT_RATE / SPA_MSEC_PER_SEC;
break;
case 'd':
data.delay = atof(optarg);
break;
case 'C':
pw_properties_set(data.capture_props, PW_KEY_NODE_TARGET, optarg);
break;
@ -223,6 +229,9 @@ int main(int argc, char *argv[])
fprintf(f, " remote.name = \"%s\"", opt_remote);
if (data.latency != 0)
fprintf(f, " node.latency = %u/%u", data.latency, DEFAULT_RATE);
if (data.delay != 0.0f)
fprintf(f, " target.delay.sec = %s",
spa_json_format_float(value, sizeof(value), data.delay));
if (data.channels != 0)
fprintf(f, " audio.channels = %u", data.channels);
if (data.opt_channel_map != NULL)