From d95870d8d3ab7fb9b91f0dbd8ae9395b1929434b Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Fri, 28 May 2021 11:28:03 -0400 Subject: [PATCH] module-echo-cancel: Wire up the webrtc echo canceller Mostly uses the existing infrastructure, but the webrtc canceller has a fixed blocksize, so we: 1. Use the canceller blocksize if configured 2. Accumulate output data in a ringbuffer 3. Push out the data in the required chunk size --- .gitlab-ci.yml | 3 +- config.h.meson | 2 + meson.build | 8 + meson_options.txt | 4 + src/modules/meson.build | 17 +- src/modules/module-echo-cancel.c | 183 +++++++++++++------ src/modules/module-echo-cancel/aec-null.c | 5 +- src/modules/module-echo-cancel/aec-webrtc.cc | 85 ++++++++- src/modules/module-echo-cancel/echo-cancel.h | 9 +- 9 files changed, 247 insertions(+), 69 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 02763f606..c8e6f6e4c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -19,7 +19,7 @@ include: .fedora: variables: # Update this tag when you want to trigger a rebuild - FDO_DISTRIBUTION_TAG: '2021-05-10.0' + FDO_DISTRIBUTION_TAG: '2021-06-02.0' FDO_DISTRIBUTION_VERSION: '33' FDO_DISTRIBUTION_PACKAGES: >- alsa-lib-devel @@ -45,6 +45,7 @@ include: SDL2-devel systemd-devel vulkan-loader-devel + webrtc-audio-processing-devel which xmltoman diff --git a/config.h.meson b/config.h.meson index b8ac1fd23..5de311ea5 100644 --- a/config.h.meson +++ b/config.h.meson @@ -486,3 +486,5 @@ #mesondefine PA_ALSA_PROFILE_SETS_DIR #mesondefine HAVE_GSTREAMER_DEVICE_PROVIDER + +#mesondefine HAVE_WEBRTC diff --git a/meson.build b/meson.build index 1308373a8..390ecdea5 100644 --- a/meson.build +++ b/meson.build @@ -344,6 +344,14 @@ if not get_option('gstreamer-device-provider').disabled() cdata.set('HAVE_GSTREAMER_DEVICE_PROVIDER', 1) endif +webrtc_dep = dependency('webrtc-audio-processing', + version : ['>= 0.2', '< 1.0'], + required : get_option('echo-cancel-webrtc')) + +if webrtc_dep.found() + cdata.set('HAVE_WEBRTC', 1) 
+endif + # On FreeBSD, epoll-shim library is required for eventfd() and timerfd() epoll_shim_dep = (build_machine.system() == 'freebsd' ? dependency('epoll-shim', required: true) diff --git a/meson_options.txt b/meson_options.txt index 9bc33fcd8..f47536bc2 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -191,3 +191,7 @@ option('avahi', description: 'Enable code that depends on avahi', type: 'feature', value: 'auto') +option('echo-cancel-webrtc', + description : 'Enable WebRTC-based echo canceller', + type : 'feature', + value : 'auto') diff --git a/src/modules/meson.build b/src/modules/meson.build index 7438295b2..c3a889bb0 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -35,16 +35,25 @@ pipewire_module_filter_chain = shared_library('pipewire-module-filter-chain', dependencies : [mathlib, dl_lib, pipewire_dep], ) +pipewire_module_echo_cancel_sources = [ + 'module-echo-cancel.c', + 'module-echo-cancel/aec-null.c', +] + +if webrtc_dep.found() + pipewire_module_echo_cancel_sources += [ + 'module-echo-cancel/aec-webrtc.cc' + ] +endif + pipewire_module_echo_cancel = shared_library('pipewire-module-echo-cancel', - [ 'module-echo-cancel.c', - 'module-echo-cancel/aec-null.c', - 'module-echo-cancel/aec-webrtc.cc' ], + pipewire_module_echo_cancel_sources, c_args : pipewire_module_c_args, include_directories : [configinc, spa_inc], install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [mathlib, dl_lib, pipewire_dep], + dependencies : [mathlib, dl_lib, pipewire_dep, webrtc_dep], ) pipewire_module_profiler = shared_library('pipewire-module-profiler', diff --git a/src/modules/module-echo-cancel.c b/src/modules/module-echo-cancel.c index 63fe2b532..a344283f4 100644 --- a/src/modules/module-echo-cancel.c +++ b/src/modules/module-echo-cancel.c @@ -1,6 +1,7 @@ /* PipeWire * * Copyright © 2021 Wim Taymans + * © 2021 Arun Raghavan * * Permission is hereby granted, free of charge, to any person obtaining 
a * copy of this software and associated documentation files (the "Software"), @@ -97,7 +98,6 @@ struct impl { struct spa_hook source_listener; void *rec_buffer[SPA_AUDIO_MAX_CHANNELS]; uint32_t rec_ringsize; - uint32_t rec_bufsize; struct spa_ringbuffer rec_ring; struct spa_io_rate_match *rec_rate_match; @@ -108,12 +108,16 @@ struct impl { struct spa_hook sink_listener; void *play_buffer[SPA_AUDIO_MAX_CHANNELS]; uint32_t play_ringsize; - uint32_t play_bufsize; struct spa_ringbuffer play_ring; struct spa_io_rate_match *play_rate_match; + void *out_buffer[SPA_AUDIO_MAX_CHANNELS]; + uint32_t out_ringsize; + struct spa_ringbuffer out_ring; + const struct echo_cancel_info *aec_info; void *aec; + uint32_t aec_blocksize; unsigned int capture_ready:1; unsigned int sink_ready:1; @@ -139,56 +143,46 @@ static void process(struct impl *impl) { struct pw_buffer *cout; struct pw_buffer *pout; - float rec_buf[impl->info.channels][impl->rec_bufsize / sizeof(float)]; - float play_buf[impl->info.channels][impl->play_bufsize / sizeof(float)]; + float rec_buf[impl->info.channels][impl->aec_blocksize / sizeof(float)]; + float play_buf[impl->info.channels][impl->aec_blocksize / sizeof(float)]; + float out_buf[impl->info.channels][impl->aec_blocksize / sizeof(float)]; const float *rec[impl->info.channels]; const float *play[impl->info.channels]; float *out[impl->info.channels]; struct spa_data *dd; - uint32_t i, size = 0; - uint32_t rindex, pindex; + uint32_t i, size; + uint32_t rindex, pindex, oindex, avail; int32_t stride = 0; - if ((cout = pw_stream_dequeue_buffer(impl->source)) == NULL) - pw_log_debug("out of source buffers: %m"); - - if ((pout = pw_stream_dequeue_buffer(impl->playback)) == NULL) + if ((pout = pw_stream_dequeue_buffer(impl->playback)) == NULL) { pw_log_debug("out of playback buffers: %m"); - - if (impl->rec_bufsize != impl->play_bufsize) { - pw_log_warn("mismatched buffer sizes"); goto done; } - if (cout == NULL || pout == NULL) - goto done; + size = 
impl->aec_blocksize; + + /* First read a block from the playback and capture ring buffers */ spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex); spa_ringbuffer_get_read_index(&impl->play_ring, &pindex); for (i = 0; i < impl->info.channels; i++) { /* captured samples, with echo from sink */ + rec[i] = &rec_buf[i][0]; + /* echo from sink */ + play[i] = &play_buf[i][0]; + /* filtered samples, without echo from sink */ + out[i] = &out_buf[i][0]; + stride = 0; - size = impl->rec_bufsize; spa_ringbuffer_read_data(&impl->rec_ring, impl->rec_buffer[i], impl->rec_ringsize, rindex % impl->rec_ringsize, - (void*)rec_buf[i], size); - rec[i] = &rec_buf[i][0]; + (void*)rec[i], size); - /* filtered samples, without echo from sink */ - dd = &cout->buffer->datas[i]; - out[i] = dd->data; - dd->chunk->offset = 0; - dd->chunk->size = size; - dd->chunk->stride = stride; - - /* echo from sink */ stride = 0; - size = impl->play_bufsize; spa_ringbuffer_read_data(&impl->play_ring, impl->play_buffer[i], impl->play_ringsize, pindex % impl->play_ringsize, - (void *)play_buf[i], size); - play[i] = &play_buf[i][0]; + (void *)play[i], size); /* output to sink, just copy */ dd = &pout->buffer->datas[i]; @@ -202,14 +196,64 @@ static void process(struct impl *impl) spa_ringbuffer_read_update(&impl->rec_ring, rindex + size); spa_ringbuffer_read_update(&impl->play_ring, pindex + size); + pw_stream_queue_buffer(impl->playback, pout); + + /* Now run the canceller */ echo_cancel_run(impl->aec_info, impl->aec, rec, play, out, size / sizeof(float)); -done: - if (cout != NULL) - pw_stream_queue_buffer(impl->source, cout); - if (pout != NULL) - pw_stream_queue_buffer(impl->playback, pout); + /* Next, copy over the output to the output ringbuffer */ + avail = spa_ringbuffer_get_write_index(&impl->out_ring, &oindex); + if (avail + size > impl->out_ringsize) { + uint32_t rindex, drop; + /* Drop enough so we have size bytes left */ + drop = avail + size - impl->out_ringsize; + pw_log_debug("output 
ringbuffer xrun %d + %u > %u, dropping %u", + avail, size, impl->out_ringsize, drop); + + spa_ringbuffer_get_read_index(&impl->out_ring, &rindex); + spa_ringbuffer_read_update(&impl->out_ring, rindex + drop); + + avail += drop; + } + + for (i = 0; i < impl->info.channels; i++) { + /* filtered samples, without echo from sink */ + spa_ringbuffer_write_data(&impl->out_ring, impl->out_buffer[i], + impl->out_ringsize, oindex % impl->out_ringsize, + (void *)out[i], size); + } + + spa_ringbuffer_write_update(&impl->out_ring, oindex + size); + + /* And finally take data from the output ringbuffer and make it + * available on the source */ + + avail = spa_ringbuffer_get_read_index(&impl->out_ring, &oindex); + while (avail >= size) { + if ((cout = pw_stream_dequeue_buffer(impl->source)) == NULL) { + pw_log_debug("out of source buffers: %m"); + break; + } + + for (i = 0; i < impl->info.channels; i++) { + dd = &cout->buffer->datas[i]; + spa_ringbuffer_read_data(&impl->out_ring, impl->out_buffer[i], + impl->out_ringsize, oindex % impl->out_ringsize, + (void *)dd->data, size); + dd->chunk->offset = 0; + dd->chunk->size = size; + dd->chunk->stride = 0; + } + + pw_stream_queue_buffer(impl->source, cout); + + oindex += size; + spa_ringbuffer_read_update(&impl->out_ring, oindex); + avail -= size; + } + +done: impl->sink_ready = false; impl->capture_ready = false; } @@ -261,11 +305,12 @@ static void capture_process(void *data) avail += drop; } - /* If we don't know what size to push yet, keep the block size the same + /* If we don't know what size to push yet, use the canceller blocksize + * if it has a specific requirement, else keep the block size the same * on input and output or what the resampler needs */ - if (impl->rec_bufsize == 0) { - impl->rec_bufsize = SPA_MAX(size, impl->rec_rate_match->size); - pw_log_debug("Setting capture buffer size to %u", impl->rec_bufsize); + if (impl->aec_blocksize == 0) { + impl->aec_blocksize = SPA_MAX(size, impl->rec_rate_match->size); + 
pw_log_debug("Setting AEC block size to %u", impl->aec_blocksize); } for (i = 0; i < impl->info.channels; i++) { @@ -280,7 +325,7 @@ static void capture_process(void *data) spa_ringbuffer_write_update(&impl->rec_ring, index + size); - if (avail + size >= impl->rec_bufsize) { + if (avail + size >= impl->aec_blocksize) { impl->capture_ready = true; if (impl->sink_ready) process(impl); @@ -415,11 +460,9 @@ static void sink_process(void *data) avail += drop; } - /* If we don't know what size to push yet, keep the block size the same - * on input and output or what the resampler needs */ - if (impl->play_bufsize == 0) { - impl->play_bufsize = SPA_MAX(size, impl->play_rate_match->size); - pw_log_debug("Setting sink buffer size to %u", impl->play_bufsize); + if (impl->aec_blocksize == 0) { + impl->aec_blocksize = SPA_MAX(size, impl->play_rate_match->size); + pw_log_debug("Setting AEC block size to %u", impl->aec_blocksize); } for (i = 0; i < impl->info.channels; i++) { @@ -434,7 +477,7 @@ static void sink_process(void *data) spa_ringbuffer_write_update(&impl->play_ring, index + size); - if (avail + size >= impl->play_bufsize) { + if (avail + size >= impl->aec_blocksize) { impl->sink_ready = true; if (impl->capture_ready) process(impl); @@ -477,6 +520,8 @@ static int setup_streams(struct impl *impl) NULL); pw_properties_setf(props, PW_KEY_NODE_GROUP, "echo-cancel-%u", impl->id); + if (impl->aec_info->latency) + pw_properties_set(props, PW_KEY_NODE_LATENCY, impl->aec_info->latency); impl->capture = pw_stream_new(impl->core, "echo-cancel capture", props); @@ -512,6 +557,8 @@ static int setup_streams(struct impl *impl) NULL); pw_properties_setf(props, PW_KEY_NODE_GROUP, "echo-cancel-%u", impl->id); + if (impl->aec_info->latency) + pw_properties_set(props, PW_KEY_NODE_LATENCY, impl->aec_info->latency); impl->playback = pw_stream_new(impl->core, "echo-cancel playback", props); @@ -563,12 +610,15 @@ static int setup_streams(struct impl *impl) impl->rec_ringsize = sizeof(float) *
MAX_BUFSIZE_MS * impl->info.rate / 1000; impl->play_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000; + impl->out_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000; for (i = 0; i < impl->info.channels; i++) { impl->rec_buffer[i] = malloc(impl->rec_ringsize); impl->play_buffer[i] = malloc(impl->play_ringsize); + impl->out_buffer[i] = malloc(impl->out_ringsize); } spa_ringbuffer_init(&impl->rec_ring); spa_ringbuffer_init(&impl->play_ring); + spa_ringbuffer_init(&impl->out_ring); return 0; } @@ -626,6 +676,8 @@ static void impl_destroy(struct impl *impl) free(impl->rec_buffer[i]); if (impl->play_buffer[i]) free(impl->play_buffer[i]); + if (impl->out_buffer[i]) + free(impl->out_buffer[i]); } free(impl); @@ -699,7 +751,7 @@ SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { struct pw_context *context = pw_impl_module_get_context(module); - struct pw_properties *props; + struct pw_properties *props, *aec_props; struct impl *impl; uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); const char *str; @@ -759,20 +811,37 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (pw_properties_get(impl->sink_props, PW_KEY_MEDIA_CLASS) == NULL) pw_properties_set(impl->sink_props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); - copy_props(impl, props, PW_KEY_NODE_NAME); - copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); - copy_props(impl, props, PW_KEY_NODE_GROUP); - copy_props(impl, props, PW_KEY_NODE_VIRTUAL); - copy_props(impl, props, PW_KEY_NODE_LATENCY); - if ((str = pw_properties_get(props, "aec.method")) == NULL) - str = "null"; + str = "webrtc"; + +#ifdef HAVE_WEBRTC if (spa_streq(str, "webrtc")) impl->aec_info = echo_cancel_webrtc; else +#endif impl->aec_info = echo_cancel_null; - impl->aec = echo_cancel_create(impl->aec_info, NULL, impl->info.channels); + if ((str = pw_properties_get(props, "aec.args")) != NULL) + aec_props = pw_properties_new_string(str); + else + 
aec_props = pw_properties_new(NULL, NULL); + + impl->aec = echo_cancel_create(impl->aec_info, aec_props, &impl->info); + + pw_properties_free(aec_props); + + if (impl->aec_info->latency) { + unsigned int num = 0, denom = 1; + + pw_log_info("Setting node latency to %s", impl->aec_info->latency); + pw_properties_set(props, PW_KEY_NODE_LATENCY, impl->aec_info->latency); + + if (sscanf(impl->aec_info->latency, "%u/%u", &num, &denom) != 2 || denom == 0) { num = 0; denom = 1; } /* malformed latency: fall back to "no fixed block size" */ + impl->aec_blocksize = sizeof(float) * impl->info.rate * num / denom; + } else { + /* Implementation doesn't care about the block size */ + impl->aec_blocksize = 0; + } impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { @@ -790,6 +859,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) goto error; } + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + pw_properties_free(props); pw_proxy_add_listener((struct pw_proxy*)impl->core, diff --git a/src/modules/module-echo-cancel/aec-null.c b/src/modules/module-echo-cancel/aec-null.c index 26c9f42b3..4ee9ac6d7 100644 --- a/src/modules/module-echo-cancel/aec-null.c +++ b/src/modules/module-echo-cancel/aec-null.c @@ -28,11 +28,11 @@ struct impl { uint32_t channels; }; -static void *null_create(struct spa_dict *info, uint32_t channels) +static void *null_create(const struct pw_properties *args, const struct spa_audio_info_raw *info) { struct impl *impl; impl = calloc(1, sizeof(struct impl)); - impl->channels = channels; + impl->channels = info->channels; return impl; } @@ -53,6 +53,7 @@ static int null_run(void *ec, const float *rec[], const float *play[], float *ou static const struct echo_cancel_info echo_cancel_null_impl = { .name = "null", .info = SPA_DICT_INIT(NULL, 0), + .latency = NULL, .create = null_create, .destroy = null_destroy, diff
--git a/src/modules/module-echo-cancel/aec-webrtc.cc b/src/modules/module-echo-cancel/aec-webrtc.cc index f3035ba33..3c0e958b0 100644 --- a/src/modules/module-echo-cancel/aec-webrtc.cc +++ b/src/modules/module-echo-cancel/aec-webrtc.cc @@ -1,6 +1,7 @@ /* PipeWire * * Copyright © 2021 Wim Taymans + * © 2021 Arun Raghavan * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -24,35 +25,105 @@ #include "echo-cancel.h" +#include + +#include +#include +#include + struct impl { - uint32_t channels; + webrtc::AudioProcessing *apm = NULL; + spa_audio_info_raw info; }; -static void *webrtc_create(struct spa_dict *info, uint32_t channels) +static void *webrtc_create(const struct pw_properties *args, const spa_audio_info_raw *info) { struct impl *impl; + webrtc::AudioProcessing *apm; + webrtc::ProcessingConfig pconfig; + webrtc::Config config; + + apm = webrtc::AudioProcessing::Create(config); + + pconfig = {{ + webrtc::StreamConfig(info->rate, info->channels, false), /* input stream */ + webrtc::StreamConfig(info->rate, info->channels, false), /* output stream */ + webrtc::StreamConfig(info->rate, info->channels, false), /* reverse input stream */ + webrtc::StreamConfig(info->rate, info->channels, false), /* reverse output stream */ + }}; + + if (apm->Initialize(pconfig) != webrtc::AudioProcessing::kNoError) { + pw_log_error("Error initialising webrtc audio processing module"); + goto error; + } + + // TODO: wire up args to control these + apm->high_pass_filter()->Enable(true); + apm->echo_cancellation()->enable_drift_compensation(false); + apm->echo_cancellation()->Enable(true); + apm->noise_suppression()->set_level(webrtc::NoiseSuppression::kHigh); + apm->noise_suppression()->Enable(true); + apm->gain_control()->set_analog_level_limits(0, 255); + // FIXME: can we hook up AGC? 
+ apm->gain_control()->set_mode(webrtc::GainControl::kAdaptiveDigital); + apm->gain_control()->Enable(true); + apm->voice_detection()->Enable(true); + impl = (struct impl *)calloc(1, sizeof(struct impl)); - impl->channels = channels; + impl->info = *info; + + impl->apm = apm; + return impl; + +error: + if (apm) + delete apm; + + return NULL; } static void webrtc_destroy(void *ec) { - free(ec); + struct impl *impl = (struct impl*)ec; + + delete impl->apm; + free(impl); } static int webrtc_run(void *ec, const float *rec[], const float *play[], float *out[], uint32_t n_samples) { struct impl *impl = (struct impl*)ec; - uint32_t i; - for (i = 0; i < impl->channels; i++) - memcpy(out[i], rec[i], n_samples * sizeof(float)); + webrtc::StreamConfig config = + webrtc::StreamConfig(impl->info.rate, impl->info.channels, false); + + if (n_samples * 1000 / impl->info.rate != 10) { + pw_log_error("Buffers must be 10ms in length (currently %u samples)", n_samples); + return -1; + } + + /* FIXME: ProcessReverseStream may change the playback buffer, in which + * case we should use that, if we ever expose the intelligibility + * enhancer */ + if (impl->apm->ProcessReverseStream(play, config, config, (float**)play) != + webrtc::AudioProcessing::kNoError) { + pw_log_error("Processing reverse stream failed"); + } + + impl->apm->set_stream_delay_ms(0); + + if (impl->apm->ProcessStream(rec, config, config, out) != + webrtc::AudioProcessing::kNoError) { + pw_log_error("Processing stream failed"); + } + return 0; } static const struct echo_cancel_info echo_cancel_webrtc_impl = { .name = "webrtc", .info = SPA_DICT_INIT(NULL, 0), + .latency = "480/48000", .create = webrtc_create, .destroy = webrtc_destroy, diff --git a/src/modules/module-echo-cancel/echo-cancel.h b/src/modules/module-echo-cancel/echo-cancel.h index 2b31e9f0e..fe011b962 100644 --- a/src/modules/module-echo-cancel/echo-cancel.h +++ b/src/modules/module-echo-cancel/echo-cancel.h @@ -22,14 +22,19 @@ * DEALINGS IN THE SOFTWARE. 
*/ +#include "config.h" + #include #include +#include + struct echo_cancel_info { const char *name; const struct spa_dict info; + const char *latency; - void *(*create) (struct spa_dict *info, uint32_t channels); + void *(*create) (const struct pw_properties *args, const struct spa_audio_info_raw *info); void (*destroy) (void *ec); int (*run) (void *ec, const float *rec[], const float *play[], float *out[], uint32_t n_samples); @@ -39,5 +44,7 @@ struct echo_cancel_info { #define echo_cancel_destroy(i,...) (i)->destroy(__VA_ARGS__) #define echo_cancel_run(i,...) (i)->run(__VA_ARGS__) +#ifdef HAVE_WEBRTC extern const struct echo_cancel_info *echo_cancel_webrtc; +#endif extern const struct echo_cancel_info *echo_cancel_null;