module-echo-cancel: Wire up the webrtc echo canceller

Mostly uses the existing infrastructure, but the webrtc canceller has a
fixed blocksize, so we:

  1. Use the canceller blocksize if configured
  2. Accumulate output data in a ringbuffer
  3. Push out the data in the required chunk size
This commit is contained in:
Arun Raghavan 2021-05-28 11:28:03 -04:00
parent 1349d5334d
commit d95870d8d3
9 changed files with 247 additions and 69 deletions

View file

@ -1,6 +1,7 @@
/* PipeWire
*
* Copyright © 2021 Wim Taymans
* © 2021 Arun Raghavan <arun@asymptotic.io>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
@ -97,7 +98,6 @@ struct impl {
struct spa_hook source_listener;
void *rec_buffer[SPA_AUDIO_MAX_CHANNELS];
uint32_t rec_ringsize;
uint32_t rec_bufsize;
struct spa_ringbuffer rec_ring;
struct spa_io_rate_match *rec_rate_match;
@ -108,12 +108,16 @@ struct impl {
struct spa_hook sink_listener;
void *play_buffer[SPA_AUDIO_MAX_CHANNELS];
uint32_t play_ringsize;
uint32_t play_bufsize;
struct spa_ringbuffer play_ring;
struct spa_io_rate_match *play_rate_match;
void *out_buffer[SPA_AUDIO_MAX_CHANNELS];
uint32_t out_ringsize;
struct spa_ringbuffer out_ring;
const struct echo_cancel_info *aec_info;
void *aec;
uint32_t aec_blocksize;
unsigned int capture_ready:1;
unsigned int sink_ready:1;
@ -139,56 +143,46 @@ static void process(struct impl *impl)
{
struct pw_buffer *cout;
struct pw_buffer *pout;
float rec_buf[impl->info.channels][impl->rec_bufsize / sizeof(float)];
float play_buf[impl->info.channels][impl->play_bufsize / sizeof(float)];
float rec_buf[impl->info.channels][impl->aec_blocksize / sizeof(float)];
float play_buf[impl->info.channels][impl->aec_blocksize / sizeof(float)];
float out_buf[impl->info.channels][impl->aec_blocksize / sizeof(float)];
const float *rec[impl->info.channels];
const float *play[impl->info.channels];
float *out[impl->info.channels];
struct spa_data *dd;
uint32_t i, size = 0;
uint32_t rindex, pindex;
uint32_t i, size;
uint32_t rindex, pindex, oindex, avail;
int32_t stride = 0;
if ((cout = pw_stream_dequeue_buffer(impl->source)) == NULL)
pw_log_debug("out of source buffers: %m");
if ((pout = pw_stream_dequeue_buffer(impl->playback)) == NULL)
if ((pout = pw_stream_dequeue_buffer(impl->playback)) == NULL) {
pw_log_debug("out of playback buffers: %m");
if (impl->rec_bufsize != impl->play_bufsize) {
pw_log_warn("mismatched buffer sizes");
goto done;
}
if (cout == NULL || pout == NULL)
goto done;
size = impl->aec_blocksize;
/* First read a block from the playback and capture ring buffers */
spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
for (i = 0; i < impl->info.channels; i++) {
/* captured samples, with echo from sink */
rec[i] = &rec_buf[i][0];
/* echo from sink */
play[i] = &play_buf[i][0];
/* filtered samples, without echo from sink */
out[i] = &out_buf[i][0];
stride = 0;
size = impl->rec_bufsize;
spa_ringbuffer_read_data(&impl->rec_ring, impl->rec_buffer[i],
impl->rec_ringsize, rindex % impl->rec_ringsize,
(void*)rec_buf[i], size);
rec[i] = &rec_buf[i][0];
(void*)rec[i], size);
/* filtered samples, without echo from sink */
dd = &cout->buffer->datas[i];
out[i] = dd->data;
dd->chunk->offset = 0;
dd->chunk->size = size;
dd->chunk->stride = stride;
/* echo from sink */
stride = 0;
size = impl->play_bufsize;
spa_ringbuffer_read_data(&impl->play_ring, impl->play_buffer[i],
impl->play_ringsize, pindex % impl->play_ringsize,
(void *)play_buf[i], size);
play[i] = &play_buf[i][0];
(void *)play[i], size);
/* output to sink, just copy */
dd = &pout->buffer->datas[i];
@ -202,14 +196,64 @@ static void process(struct impl *impl)
spa_ringbuffer_read_update(&impl->rec_ring, rindex + size);
spa_ringbuffer_read_update(&impl->play_ring, pindex + size);
pw_stream_queue_buffer(impl->playback, pout);
/* Now run the canceller */
echo_cancel_run(impl->aec_info, impl->aec, rec, play, out, size / sizeof(float));
done:
if (cout != NULL)
pw_stream_queue_buffer(impl->source, cout);
if (pout != NULL)
pw_stream_queue_buffer(impl->playback, pout);
/* Next, copy over the output to the output ringbuffer */
avail = spa_ringbuffer_get_write_index(&impl->out_ring, &oindex);
if (avail + size > impl->out_ringsize) {
uint32_t rindex, drop;
/* Drop enough so we have size bytes left */
drop = avail + size - impl->out_ringsize;
pw_log_debug("output ringbuffer xrun %d + %u > %u, dropping %u",
avail, size, impl->out_ringsize, drop);
spa_ringbuffer_get_read_index(&impl->out_ring, &rindex);
spa_ringbuffer_read_update(&impl->out_ring, rindex + drop);
avail += drop;
}
for (i = 0; i < impl->info.channels; i++) {
/* captured samples, with echo from sink */
spa_ringbuffer_write_data(&impl->out_ring, impl->out_buffer[i],
impl->out_ringsize, oindex % impl->out_ringsize,
(void *)out[i], size);
}
spa_ringbuffer_write_update(&impl->out_ring, oindex + size);
/* And finally take data from the output ringbuffer and make it
* available on the source */
avail = spa_ringbuffer_get_read_index(&impl->out_ring, &oindex);
while (avail >= size) {
if ((cout = pw_stream_dequeue_buffer(impl->source)) == NULL) {
pw_log_debug("out of source buffers: %m");
break;
}
for (i = 0; i < impl->info.channels; i++) {
dd = &cout->buffer->datas[i];
spa_ringbuffer_read_data(&impl->out_ring, impl->out_buffer[i],
impl->out_ringsize, oindex % impl->out_ringsize,
(void *)dd->data, size);
dd->chunk->offset = 0;
dd->chunk->size = size;
dd->chunk->stride = 0;
}
pw_stream_queue_buffer(impl->source, cout);
oindex += size;
spa_ringbuffer_read_update(&impl->out_ring, oindex);
avail -= size;
}
done:
impl->sink_ready = false;
impl->capture_ready = false;
}
@ -261,11 +305,12 @@ static void capture_process(void *data)
avail += drop;
}
/* If we don't know what size to push yet, keep the block size the same
/* If we don't know what size to push yet, use the canceller blocksize
* if it has a specific requirement, else keep the block size the same
* on input and output or what the resampler needs */
if (impl->rec_bufsize == 0) {
impl->rec_bufsize = SPA_MAX(size, impl->rec_rate_match->size);
pw_log_debug("Setting capture buffer size to %u", impl->rec_bufsize);
if (impl->aec_blocksize == 0) {
impl->aec_blocksize = SPA_MAX(size, impl->rec_rate_match->size);
pw_log_debug("Setting AEC block size to %u", impl->aec_blocksize);
}
for (i = 0; i < impl->info.channels; i++) {
@ -280,7 +325,7 @@ static void capture_process(void *data)
spa_ringbuffer_write_update(&impl->rec_ring, index + size);
if (avail + size >= impl->rec_bufsize) {
if (avail + size >= impl->aec_blocksize) {
impl->capture_ready = true;
if (impl->sink_ready)
process(impl);
@ -415,11 +460,9 @@ static void sink_process(void *data)
avail += drop;
}
/* If we don't know what size to push yet, keep the block size the same
* on input and output or what the resampler needs */
if (impl->play_bufsize == 0) {
impl->play_bufsize = SPA_MAX(size, impl->play_rate_match->size);
pw_log_debug("Setting sink buffer size to %u", impl->play_bufsize);
if (impl->aec_blocksize == 0) {
impl->aec_blocksize = SPA_MAX(size, impl->rec_rate_match->size);
pw_log_debug("Setting AEC block size to %u", impl->aec_blocksize);
}
for (i = 0; i < impl->info.channels; i++) {
@ -434,7 +477,7 @@ static void sink_process(void *data)
spa_ringbuffer_write_update(&impl->play_ring, index + size);
if (avail + size >= impl->play_bufsize) {
if (avail + size >= impl->aec_blocksize) {
impl->sink_ready = true;
if (impl->capture_ready)
process(impl);
@ -477,6 +520,8 @@ static int setup_streams(struct impl *impl)
NULL);
pw_properties_setf(props,
PW_KEY_NODE_GROUP, "echo-cancel-%u", impl->id);
if (impl->aec_info->latency)
pw_properties_set(props, PW_KEY_NODE_LATENCY, impl->aec_info->latency);
impl->capture = pw_stream_new(impl->core,
"echo-cancel capture", props);
@ -512,6 +557,8 @@ static int setup_streams(struct impl *impl)
NULL);
pw_properties_setf(props,
PW_KEY_NODE_GROUP, "echo-cancel-%u", impl->id);
if (impl->aec_info->latency)
pw_properties_set(props, PW_KEY_NODE_LATENCY, impl->aec_info->latency);
impl->playback = pw_stream_new(impl->core,
"echo-cancel playback", props);
@ -563,12 +610,15 @@ static int setup_streams(struct impl *impl)
impl->rec_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000;
impl->play_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000;
impl->out_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000;
for (i = 0; i < impl->info.channels; i++) {
impl->rec_buffer[i] = malloc(impl->rec_ringsize);
impl->play_buffer[i] = malloc(impl->play_ringsize);
impl->out_buffer[i] = malloc(impl->out_ringsize);
}
spa_ringbuffer_init(&impl->rec_ring);
spa_ringbuffer_init(&impl->play_ring);
spa_ringbuffer_init(&impl->out_ring);
return 0;
}
@ -626,6 +676,8 @@ static void impl_destroy(struct impl *impl)
free(impl->rec_buffer[i]);
if (impl->play_buffer[i])
free(impl->play_buffer[i]);
if (impl->out_buffer[i])
free(impl->out_buffer[i]);
}
free(impl);
@ -699,7 +751,7 @@ SPA_EXPORT
int pipewire__module_init(struct pw_impl_module *module, const char *args)
{
struct pw_context *context = pw_impl_module_get_context(module);
struct pw_properties *props;
struct pw_properties *props, *aec_props;
struct impl *impl;
uint32_t id = pw_global_get_id(pw_impl_module_get_global(module));
const char *str;
@ -759,20 +811,37 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (pw_properties_get(impl->sink_props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_set(impl->sink_props, PW_KEY_MEDIA_CLASS, "Audio/Sink");
copy_props(impl, props, PW_KEY_NODE_NAME);
copy_props(impl, props, PW_KEY_NODE_DESCRIPTION);
copy_props(impl, props, PW_KEY_NODE_GROUP);
copy_props(impl, props, PW_KEY_NODE_VIRTUAL);
copy_props(impl, props, PW_KEY_NODE_LATENCY);
if ((str = pw_properties_get(props, "aec.method")) == NULL)
str = "null";
str = "webrtc";
#ifdef HAVE_WEBRTC
if (spa_streq(str, "webrtc"))
impl->aec_info = echo_cancel_webrtc;
else
#endif
impl->aec_info = echo_cancel_null;
impl->aec = echo_cancel_create(impl->aec_info, NULL, impl->info.channels);
if ((str = pw_properties_get(props, "aec.args")) != NULL)
aec_props = pw_properties_new_string(str);
else
aec_props = pw_properties_new(NULL, NULL);
impl->aec = echo_cancel_create(impl->aec_info, aec_props, &impl->info);
pw_properties_free(aec_props);
if (impl->aec_info->latency) {
unsigned int num, denom;
pw_log_info("Setting node latency to %s", impl->aec_info->latency);
pw_properties_set(props, PW_KEY_NODE_LATENCY, impl->aec_info->latency);
sscanf(impl->aec_info->latency, "%u/%u", &num, &denom);
impl->aec_blocksize = sizeof(float) * impl->info.rate * num / denom;
} else {
/* Implementation doesn't care about the block size */
impl->aec_blocksize = 0;
}
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {
@ -790,6 +859,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
copy_props(impl, props, PW_KEY_NODE_NAME);
copy_props(impl, props, PW_KEY_NODE_DESCRIPTION);
copy_props(impl, props, PW_KEY_NODE_GROUP);
copy_props(impl, props, PW_KEY_NODE_VIRTUAL);
copy_props(impl, props, PW_KEY_NODE_LATENCY);
pw_properties_free(props);
pw_proxy_add_listener((struct pw_proxy*)impl->core,