diff --git a/src/modules/echo-cancel/echo-cancel.h b/src/modules/echo-cancel/echo-cancel.h index 19e13505a..799631b0b 100644 --- a/src/modules/echo-cancel/echo-cancel.h +++ b/src/modules/echo-cancel/echo-cancel.h @@ -60,11 +60,16 @@ struct pa_echo_canceller_params { #endif /* each canceller-specific structure goes here */ } priv; + + /* Set this if canceller can do drift compensation. Also see set_drift() + * below */ + pa_bool_t drift_compensation; }; typedef struct pa_echo_canceller pa_echo_canceller; struct pa_echo_canceller { + /* Initialise canceller engine. */ pa_bool_t (*init) (pa_core *c, pa_echo_canceller *ec, pa_sample_spec *source_ss, @@ -73,9 +78,36 @@ struct pa_echo_canceller { pa_channel_map *sink_map, uint32_t *blocksize, const char *args); + + /* You should have only one of play()+record() or run() set. The first + * works under the assumption that you'll handle buffering and matching up + * samples yourself. If you set run(), module-echo-cancel will handle + * synchronising the playback and record streams. */ + + /* Feed the engine 'blocksize' playback bytes.. */ + void (*play) (pa_echo_canceller *ec, const uint8_t *play); + /* Feed the engine 'blocksize' record bytes. blocksize processed bytes are + * returned in out. */ + void (*record) (pa_echo_canceller *ec, const uint8_t *rec, uint8_t *out); + /* Feed the engine blocksize playback and record streams, with a reasonable + * effort at keeping the two in sync. blocksize processed bytes are + * returned in out. */ void (*run) (pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out); + + /* Optional callback to set the drift, expressed as the ratio of the + * difference in number of playback and capture samples to the number of + * capture samples, for some instant of time. This is used only if the + * canceller signals that it supports drift compensation, and is called + * before record(). The actual implementation needs to derive drift based + * on point samples -- the individual values are not accurate enough to use + * as-is. */ + /* NOTE: the semantics of this function might change in the future. */ + void (*set_drift) (pa_echo_canceller *ec, float drift); + + /* Free up resources. */ void (*done) (pa_echo_canceller *ec); + /* Structure with common and engine-specific canceller parameters. */ pa_echo_canceller_params params; }; @@ -102,6 +134,9 @@ pa_bool_t pa_webrtc_ec_init(pa_core *c, pa_echo_canceller *ec, pa_sample_spec *source_ss, pa_channel_map *source_map, pa_sample_spec *sink_ss, pa_channel_map *sink_map, uint32_t *blocksize, const char *args); +void pa_webrtc_ec_play(pa_echo_canceller *ec, const uint8_t *play); +void pa_webrtc_ec_record(pa_echo_canceller *ec, const uint8_t *rec, uint8_t *out); +void pa_webrtc_ec_set_drift(pa_echo_canceller *ec, float drift); void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out); void pa_webrtc_ec_done(pa_echo_canceller *ec); PA_C_DECL_END diff --git a/src/modules/echo-cancel/module-echo-cancel.c b/src/modules/echo-cancel/module-echo-cancel.c index 7360b270d..05d3bd4f6 100644 --- a/src/modules/echo-cancel/module-echo-cancel.c +++ b/src/modules/echo-cancel/module-echo-cancel.c @@ -31,6 +31,7 @@ #endif #include +#include #include "echo-cancel.h" @@ -107,6 +108,9 @@ static const pa_echo_canceller ec_table[] = { { /* WebRTC's audio processing engine */ .init = pa_webrtc_ec_init, + .play = pa_webrtc_ec_play, + .record = pa_webrtc_ec_record, + .set_drift = pa_webrtc_ec_set_drift, .run = pa_webrtc_ec_run, .done = pa_webrtc_ec_done, }, @@ -200,6 +204,10 @@ struct userdata { int64_t recv_counter; size_t sink_skip; + /* Bytes left over from previous iteration */ + size_t sink_rem; + size_t source_rem; + pa_atomic_t request_resync; pa_time_event *time_event; @@ -650,11 +658,157 @@ static void do_resync(struct userdata *u) { apply_diff_time(u, diff_time); } +/* 1. Calculate drift at this point, pass to canceller + * 2. Push out playback samples in blocksize chunks + * 3. Push out capture samples in blocksize chunks + * 4. ??? + * 5. Profit + */ +static void do_push_drift_comp(struct userdata *u) { + size_t rlen, plen; + pa_memchunk rchunk, pchunk, cchunk; + uint8_t *rdata, *pdata, *cdata; + float drift; + + rlen = pa_memblockq_get_length(u->source_memblockq); + plen = pa_memblockq_get_length(u->sink_memblockq); + + /* Estimate snapshot drift as follows: + * pd: amount of data consumed since last time + * rd: amount of data consumed since last time + * + * drift = (pd - rd) / rd; + * + * We calculate pd and rd as the memblockq length less the number of + * samples left from the last iteration (to avoid double counting + * those remainder samples. + */ + drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem)); + u->sink_rem = plen % u->blocksize; + u->source_rem = rlen % u->blocksize; + + /* Now let the canceller work its drift compensation magic */ + u->ec->set_drift(u->ec, drift); + + /* Send in the playback samples first */ + while (plen >= u->blocksize) { + pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk); + pdata = pa_memblock_acquire(pchunk.memblock); + pdata += pchunk.index; + + u->ec->play(u->ec, pdata); + + pa_memblock_release(pchunk.memblock); + pa_memblockq_drop(u->sink_memblockq, u->blocksize); + pa_memblock_unref(pchunk.memblock); + + plen -= u->blocksize; + } + + /* And now the capture samples */ + while (rlen >= u->blocksize) { + pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk); + + rdata = pa_memblock_acquire(rchunk.memblock); + rdata += rchunk.index; + + cchunk.index = 0; + cchunk.length = u->blocksize; + cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length); + cdata = pa_memblock_acquire(cchunk.memblock); + + u->ec->record(u->ec, rdata, cdata); + + pa_memblock_release(cchunk.memblock); + pa_memblock_release(rchunk.memblock); + + pa_memblock_unref(rchunk.memblock); + + pa_source_post(u->source, &cchunk); + pa_memblock_unref(cchunk.memblock); + + pa_memblockq_drop(u->source_memblockq, u->blocksize); + rlen -= u->blocksize; + } +} + +/* This one's simpler than the drift compensation case -- we just iterate over + * the capture buffer, and pass the canceller blocksize bytes of playback and + * capture data. */ +static void do_push(struct userdata *u) { + size_t rlen, plen; + pa_memchunk rchunk, pchunk, cchunk; + uint8_t *rdata, *pdata, *cdata; + int unused; + + rlen = pa_memblockq_get_length(u->source_memblockq); + plen = pa_memblockq_get_length(u->sink_memblockq); + + while (rlen >= u->blocksize) { + /* take fixed block from recorded samples */ + pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk); + + if (plen > u->blocksize) { + if (plen > u->blocksize) { + /* take fixed block from played samples */ + pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk); + + rdata = pa_memblock_acquire(rchunk.memblock); + rdata += rchunk.index; + pdata = pa_memblock_acquire(pchunk.memblock); + pdata += pchunk.index; + + cchunk.index = 0; + cchunk.length = u->blocksize; + cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length); + cdata = pa_memblock_acquire(cchunk.memblock); + + if (u->save_aec) { + if (u->captured_file) + unused = fwrite(rdata, 1, u->blocksize, u->captured_file); + if (u->played_file) + unused = fwrite(pdata, 1, u->blocksize, u->played_file); + } + + /* perform echo cancellation */ + u->ec->run(u->ec, rdata, pdata, cdata); + + if (u->save_aec) { + if (u->canceled_file) + unused = fwrite(cdata, 1, u->blocksize, u->canceled_file); + } + + pa_memblock_release(cchunk.memblock); + pa_memblock_release(pchunk.memblock); + pa_memblock_release(rchunk.memblock); + + /* drop consumed sink samples */ + pa_memblockq_drop(u->sink_memblockq, u->blocksize); + pa_memblock_unref(pchunk.memblock); + + pa_memblock_unref(rchunk.memblock); + /* the filtered samples now become the samples from our + * source */ + rchunk = cchunk; + + plen -= u->blocksize; + } + } + + /* forward the (echo-canceled) data to the virtual source */ + pa_source_post(u->source, &rchunk); + pa_memblock_unref(rchunk.memblock); + + pa_memblockq_drop(u->source_memblockq, u->blocksize); + rlen -= u->blocksize; + } +} + /* Called from input thread context */ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { struct userdata *u; size_t rlen, plen, to_skip; - pa_memchunk rchunk, pchunk; + pa_memchunk rchunk; pa_source_output_assert_ref(o); pa_source_output_assert_io_context(o); @@ -727,68 +881,11 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) u->sink_skip -= to_skip; } - while (rlen >= u->blocksize) { - /* take fixed block from recorded samples */ - pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk); - - if (plen > u->blocksize) { - uint8_t *rdata, *pdata, *cdata; - pa_memchunk cchunk; - int unused; - - if (plen > u->blocksize) { - /* take fixed block from played samples */ - pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk); - - rdata = pa_memblock_acquire(rchunk.memblock); - rdata += rchunk.index; - pdata = pa_memblock_acquire(pchunk.memblock); - pdata += pchunk.index; - - cchunk.index = 0; - cchunk.length = u->blocksize; - cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length); - cdata = pa_memblock_acquire(cchunk.memblock); - - if (u->save_aec) { - if (u->captured_file) - unused = fwrite(rdata, 1, u->blocksize, u->captured_file); - if (u->played_file) - unused = fwrite(pdata, 1, u->blocksize, u->played_file); - } - - /* perform echo cancellation */ - u->ec->run(u->ec, rdata, pdata, cdata); - - if (u->save_aec) { - if (u->canceled_file) - unused = fwrite(cdata, 1, u->blocksize, u->canceled_file); - } - - pa_memblock_release(cchunk.memblock); - pa_memblock_release(pchunk.memblock); - pa_memblock_release(rchunk.memblock); - - /* drop consumed sink samples */ - pa_memblockq_drop(u->sink_memblockq, u->blocksize); - pa_memblock_unref(pchunk.memblock); - - pa_memblock_unref(rchunk.memblock); - /* the filtered samples now become the samples from our - * source */ - rchunk = cchunk; - - plen -= u->blocksize; - } - } - - /* forward the (echo-canceled) data to the virtual source */ - pa_source_post(u->source, &rchunk); - pa_memblock_unref(rchunk.memblock); - - pa_memblockq_drop(u->source_memblockq, u->blocksize); - rlen -= u->blocksize; - } + /* process and push out samples */ + if (u->ec->params.drift_compensation) + do_push_drift_comp(u); + else + do_push(u); } /* Called from I/O thread context */ @@ -1380,6 +1477,9 @@ static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *sourc } u->ec->init = ec_table[ec_method].init; + u->ec->play = ec_table[ec_method].play; + u->ec->record = ec_table[ec_method].record; + u->ec->set_drift = ec_table[ec_method].set_drift; u->ec->run = ec_table[ec_method].run; u->ec->done = ec_table[ec_method].done; @@ -1499,6 +1599,9 @@ int pa__init(pa_module*m) { } } + if (u->ec->params.drift_compensation) + pa_assert(u->ec->set_drift); + /* Create source */ pa_source_new_data_init(&source_data); source_data.driver = __FILE__; @@ -1688,8 +1791,14 @@ int pa__init(pa_module*m) { goto fail; } - if (u->adjust_time > 0) + if (u->adjust_time > 0 && !u->ec->params.drift_compensation) u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u); + else if (u->ec->params.drift_compensation) { + pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled"); + u->adjust_time = 0; + /* Perform resync just once to give the canceller a leg up */ + pa_atomic_store(&u->request_resync, 1); + } if (u->save_aec) { pa_log("Creating AEC files in /tmp"); diff --git a/src/modules/echo-cancel/webrtc.cc b/src/modules/echo-cancel/webrtc.cc index c53e96303..f84555b49 100644 --- a/src/modules/echo-cancel/webrtc.cc +++ b/src/modules/echo-cancel/webrtc.cc @@ -47,6 +47,7 @@ PA_C_DECL_END #define DEFAULT_MOBILE FALSE #define DEFAULT_ROUTING_MODE "speakerphone" #define DEFAULT_COMFORT_NOISE TRUE +#define DEFAULT_DRIFT_COMPENSATION FALSE static const char* const valid_modargs[] = { "high_pass_filter", @@ -56,6 +57,7 @@ static const char* const valid_modargs[] = { "mobile", "routing_mode", "comfort_noise", + "drift_compensation", NULL }; @@ -125,7 +127,18 @@ pa_bool_t pa_webrtc_ec_init(pa_core *c, pa_echo_canceller *ec, goto fail; } + ec->params.drift_compensation = DEFAULT_DRIFT_COMPENSATION; + if (pa_modargs_get_value_boolean(ma, "drift_compensation", &ec->params.drift_compensation) < 0) { + pa_log("Failed to parse drift_compensation value"); + goto fail; + } + if (mobile) { + if (ec->params.drift_compensation) { + pa_log("Can't use drift_compensation in mobile mode"); + goto fail; + } + if ((rm = routing_mode_from_string(pa_modargs_get_value(ma, "routing_mode", DEFAULT_ROUTING_MODE))) < 0) { pa_log("Failed to parse routing_mode value"); goto fail; @@ -160,7 +173,13 @@ pa_bool_t pa_webrtc_ec_init(pa_core *c, pa_echo_canceller *ec, apm->high_pass_filter()->Enable(true); if (!mobile) { - apm->echo_cancellation()->enable_drift_compensation(false); + if (ec->params.drift_compensation) { + apm->echo_cancellation()->set_device_sample_rate_hz(source_ss->rate); + apm->echo_cancellation()->enable_drift_compensation(true); + } else { + apm->echo_cancellation()->enable_drift_compensation(false); + } + apm->echo_cancellation()->Enable(true); } else { apm->echo_control_mobile()->set_routing_mode(static_cast(rm)); @@ -204,9 +223,9 @@ fail: return FALSE; } -void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out) { +void pa_webrtc_ec_play(pa_echo_canceller *ec, const uint8_t *play) { webrtc::AudioProcessing *apm = (webrtc::AudioProcessing*)ec->params.priv.webrtc.apm; - webrtc::AudioFrame play_frame, out_frame; + webrtc::AudioFrame play_frame; const pa_sample_spec *ss = &ec->params.priv.webrtc.sample_spec; play_frame._audioChannel = ss->channels; @@ -214,18 +233,37 @@ void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t * play_frame._payloadDataLengthInSamples = ec->params.priv.webrtc.blocksize / pa_frame_size(ss); memcpy(play_frame._payloadData, play, ec->params.priv.webrtc.blocksize); + apm->AnalyzeReverseStream(&play_frame); +} + +void pa_webrtc_ec_record(pa_echo_canceller *ec, const uint8_t *rec, uint8_t *out) { + webrtc::AudioProcessing *apm = (webrtc::AudioProcessing*)ec->params.priv.webrtc.apm; + webrtc::AudioFrame out_frame; + const pa_sample_spec *ss = &ec->params.priv.webrtc.sample_spec; + out_frame._audioChannel = ss->channels; out_frame._frequencyInHz = ss->rate; out_frame._payloadDataLengthInSamples = ec->params.priv.webrtc.blocksize / pa_frame_size(ss); memcpy(out_frame._payloadData, rec, ec->params.priv.webrtc.blocksize); - apm->AnalyzeReverseStream(&play_frame); apm->set_stream_delay_ms(0); apm->ProcessStream(&out_frame); memcpy(out, out_frame._payloadData, ec->params.priv.webrtc.blocksize); } +void pa_webrtc_ec_set_drift(pa_echo_canceller *ec, float drift) { + webrtc::AudioProcessing *apm = (webrtc::AudioProcessing*)ec->params.priv.webrtc.apm; + const pa_sample_spec *ss = &ec->params.priv.webrtc.sample_spec; + + apm->echo_cancellation()->set_stream_drift_samples(drift * ec->params.priv.webrtc.blocksize / pa_frame_size(ss)); +} + +void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out) { + pa_webrtc_ec_play(ec, play); + pa_webrtc_ec_record(ec, rec, out); +} + void pa_webrtc_ec_done(pa_echo_canceller *ec) { if (ec->params.priv.webrtc.apm) { webrtc::AudioProcessing::Destroy((webrtc::AudioProcessing*)ec->params.priv.webrtc.apm);