echo-cancel: Plug in WebRTC drift compensation

This adds the ability for echo cancellers to provide their own drift
compensation, and hooks in the appropriate bits to implement this in the
WebRTC canceller.

We do this by introducing an alternative model for the canceller. So
far, the core engine just provided a run() method which was given
blocksize-sized chunks of playback and record samples. The new model has
the engine provide play() and record() methods that can (in theory) be
called by the playback and capture threads. The latter would actually do
the processing required.

In addition to this a set_drift() method may be provided by the
implementation. PA will provide periodic samples of the drift to the
engine. These values need to be aggregated and processed over some time,
since the point values vary quite a bit (but generally fit a linear
regression reasonably accurately). At some point of time, we might move
the actual drift calculation into PA and change the semantics of this
function.

NOTE: This needs further testing before being deemed ready for wider use.
This commit is contained in:
Arun Raghavan 2011-10-07 16:28:11 +05:30
parent 8c0cca7905
commit 23ce9a4f79
3 changed files with 250 additions and 68 deletions

View file

@ -31,6 +31,7 @@
#endif
#include <stdio.h>
#include <math.h>
#include "echo-cancel.h"
@ -107,6 +108,9 @@ static const pa_echo_canceller ec_table[] = {
{
/* WebRTC's audio processing engine */
.init = pa_webrtc_ec_init,
.play = pa_webrtc_ec_play,
.record = pa_webrtc_ec_record,
.set_drift = pa_webrtc_ec_set_drift,
.run = pa_webrtc_ec_run,
.done = pa_webrtc_ec_done,
},
@ -200,6 +204,10 @@ struct userdata {
int64_t recv_counter;
size_t sink_skip;
/* Bytes left over from previous iteration */
size_t sink_rem;
size_t source_rem;
pa_atomic_t request_resync;
pa_time_event *time_event;
@ -650,11 +658,157 @@ static void do_resync(struct userdata *u) {
apply_diff_time(u, diff_time);
}
/* 1. Calculate drift at this point, pass to canceller
* 2. Push out playback samples in blocksize chunks
* 3. Push out capture samples in blocksize chunks
* 4. ???
* 5. Profit
*/
static void do_push_drift_comp(struct userdata *u) {
size_t rlen, plen;
pa_memchunk rchunk, pchunk, cchunk;
uint8_t *rdata, *pdata, *cdata;
float drift;
rlen = pa_memblockq_get_length(u->source_memblockq);
plen = pa_memblockq_get_length(u->sink_memblockq);
/* Estimate snapshot drift as follows:
* pd: amount of data consumed since last time
* rd: amount of data consumed since last time
*
* drift = (pd - rd) / rd;
*
* We calculate pd and rd as the memblockq length less the number of
* samples left from the last iteration (to avoid double counting
* those remainder samples.
*/
drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
u->sink_rem = plen % u->blocksize;
u->source_rem = rlen % u->blocksize;
/* Now let the canceller work its drift compensation magic */
u->ec->set_drift(u->ec, drift);
/* Send in the playback samples first */
while (plen >= u->blocksize) {
pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
pdata = pa_memblock_acquire(pchunk.memblock);
pdata += pchunk.index;
u->ec->play(u->ec, pdata);
pa_memblock_release(pchunk.memblock);
pa_memblockq_drop(u->sink_memblockq, u->blocksize);
pa_memblock_unref(pchunk.memblock);
plen -= u->blocksize;
}
/* And now the capture samples */
while (rlen >= u->blocksize) {
pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
rdata = pa_memblock_acquire(rchunk.memblock);
rdata += rchunk.index;
cchunk.index = 0;
cchunk.length = u->blocksize;
cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
cdata = pa_memblock_acquire(cchunk.memblock);
u->ec->record(u->ec, rdata, cdata);
pa_memblock_release(cchunk.memblock);
pa_memblock_release(rchunk.memblock);
pa_memblock_unref(rchunk.memblock);
pa_source_post(u->source, &cchunk);
pa_memblock_unref(cchunk.memblock);
pa_memblockq_drop(u->source_memblockq, u->blocksize);
rlen -= u->blocksize;
}
}
/* This one's simpler than the drift compensation case -- we just iterate over
* the capture buffer, and pass the canceller blocksize bytes of playback and
* capture data. */
static void do_push(struct userdata *u) {
size_t rlen, plen;
pa_memchunk rchunk, pchunk, cchunk;
uint8_t *rdata, *pdata, *cdata;
int unused;
rlen = pa_memblockq_get_length(u->source_memblockq);
plen = pa_memblockq_get_length(u->sink_memblockq);
while (rlen >= u->blocksize) {
/* take fixed block from recorded samples */
pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
if (plen > u->blocksize) {
if (plen > u->blocksize) {
/* take fixed block from played samples */
pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
rdata = pa_memblock_acquire(rchunk.memblock);
rdata += rchunk.index;
pdata = pa_memblock_acquire(pchunk.memblock);
pdata += pchunk.index;
cchunk.index = 0;
cchunk.length = u->blocksize;
cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
cdata = pa_memblock_acquire(cchunk.memblock);
if (u->save_aec) {
if (u->captured_file)
unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
if (u->played_file)
unused = fwrite(pdata, 1, u->blocksize, u->played_file);
}
/* perform echo cancellation */
u->ec->run(u->ec, rdata, pdata, cdata);
if (u->save_aec) {
if (u->canceled_file)
unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
}
pa_memblock_release(cchunk.memblock);
pa_memblock_release(pchunk.memblock);
pa_memblock_release(rchunk.memblock);
/* drop consumed sink samples */
pa_memblockq_drop(u->sink_memblockq, u->blocksize);
pa_memblock_unref(pchunk.memblock);
pa_memblock_unref(rchunk.memblock);
/* the filtered samples now become the samples from our
* source */
rchunk = cchunk;
plen -= u->blocksize;
}
}
/* forward the (echo-canceled) data to the virtual source */
pa_source_post(u->source, &rchunk);
pa_memblock_unref(rchunk.memblock);
pa_memblockq_drop(u->source_memblockq, u->blocksize);
rlen -= u->blocksize;
}
}
/* Called from input thread context */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
struct userdata *u;
size_t rlen, plen, to_skip;
pa_memchunk rchunk, pchunk;
pa_memchunk rchunk;
pa_source_output_assert_ref(o);
pa_source_output_assert_io_context(o);
@ -727,68 +881,11 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
u->sink_skip -= to_skip;
}
while (rlen >= u->blocksize) {
/* take fixed block from recorded samples */
pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
if (plen > u->blocksize) {
uint8_t *rdata, *pdata, *cdata;
pa_memchunk cchunk;
int unused;
if (plen > u->blocksize) {
/* take fixed block from played samples */
pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
rdata = pa_memblock_acquire(rchunk.memblock);
rdata += rchunk.index;
pdata = pa_memblock_acquire(pchunk.memblock);
pdata += pchunk.index;
cchunk.index = 0;
cchunk.length = u->blocksize;
cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
cdata = pa_memblock_acquire(cchunk.memblock);
if (u->save_aec) {
if (u->captured_file)
unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
if (u->played_file)
unused = fwrite(pdata, 1, u->blocksize, u->played_file);
}
/* perform echo cancellation */
u->ec->run(u->ec, rdata, pdata, cdata);
if (u->save_aec) {
if (u->canceled_file)
unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
}
pa_memblock_release(cchunk.memblock);
pa_memblock_release(pchunk.memblock);
pa_memblock_release(rchunk.memblock);
/* drop consumed sink samples */
pa_memblockq_drop(u->sink_memblockq, u->blocksize);
pa_memblock_unref(pchunk.memblock);
pa_memblock_unref(rchunk.memblock);
/* the filtered samples now become the samples from our
* source */
rchunk = cchunk;
plen -= u->blocksize;
}
}
/* forward the (echo-canceled) data to the virtual source */
pa_source_post(u->source, &rchunk);
pa_memblock_unref(rchunk.memblock);
pa_memblockq_drop(u->source_memblockq, u->blocksize);
rlen -= u->blocksize;
}
/* process and push out samples */
if (u->ec->params.drift_compensation)
do_push_drift_comp(u);
else
do_push(u);
}
/* Called from I/O thread context */
@ -1380,6 +1477,9 @@ static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *sourc
}
u->ec->init = ec_table[ec_method].init;
u->ec->play = ec_table[ec_method].play;
u->ec->record = ec_table[ec_method].record;
u->ec->set_drift = ec_table[ec_method].set_drift;
u->ec->run = ec_table[ec_method].run;
u->ec->done = ec_table[ec_method].done;
@ -1499,6 +1599,9 @@ int pa__init(pa_module*m) {
}
}
if (u->ec->params.drift_compensation)
pa_assert(u->ec->set_drift);
/* Create source */
pa_source_new_data_init(&source_data);
source_data.driver = __FILE__;
@ -1688,8 +1791,14 @@ int pa__init(pa_module*m) {
goto fail;
}
if (u->adjust_time > 0)
if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
else if (u->ec->params.drift_compensation) {
pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
u->adjust_time = 0;
/* Perform resync just once to give the canceller a leg up */
pa_atomic_store(&u->request_resync, 1);
}
if (u->save_aec) {
pa_log("Creating AEC files in /tmp");