mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
module-echo-cancel: Separate input gathering and output processing
This aggregates buffers from the capture stream and the sink in the corresponding process callbacks, and runs processing based on the desired resampler rate if there is one. We allow the ringbuffer in which we capture buffers to grow to some extent, as that seems to be required for playback to occur smoothly. Whether this is hiding some other underlying problem likely needs investigation.
This commit is contained in:
parent
42196a331d
commit
bbae1adda0
1 changed files with 186 additions and 38 deletions
|
|
@ -56,6 +56,9 @@
|
|||
#include "module-echo-cancel/echo-cancel.h"
|
||||
|
||||
#define NAME "echo-cancel"
|
||||
/* Hopefully this is enough for any combination of AEC engine and resampler
|
||||
* input requirement for rate matching */
|
||||
#define MAX_BUFSIZE_MS 100
|
||||
|
||||
static const struct spa_dict_item module_props[] = {
|
||||
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
|
||||
|
|
@ -92,12 +95,22 @@ struct impl {
|
|||
struct pw_properties *source_props;
|
||||
struct pw_stream *source;
|
||||
struct spa_hook source_listener;
|
||||
void *rec_buffer[SPA_AUDIO_MAX_CHANNELS];
|
||||
uint32_t rec_ringsize;
|
||||
uint32_t rec_bufsize;
|
||||
struct spa_ringbuffer rec_ring;
|
||||
struct spa_io_rate_match *rec_rate_match;
|
||||
|
||||
struct pw_stream *playback;
|
||||
struct spa_hook playback_listener;
|
||||
struct pw_properties *sink_props;
|
||||
struct pw_stream *sink;
|
||||
struct spa_hook sink_listener;
|
||||
void *play_buffer[SPA_AUDIO_MAX_CHANNELS];
|
||||
uint32_t play_ringsize;
|
||||
uint32_t play_bufsize;
|
||||
struct spa_ringbuffer play_ring;
|
||||
struct spa_io_rate_match *play_rate_match;
|
||||
|
||||
const struct echo_cancel_info *aec_info;
|
||||
void *aec;
|
||||
|
|
@ -124,37 +137,43 @@ static void unload_module(struct impl *impl)
|
|||
|
||||
static void process(struct impl *impl)
|
||||
{
|
||||
struct pw_buffer *cin, *cout;
|
||||
struct pw_buffer *pin, *pout;
|
||||
const float *rec[SPA_AUDIO_MAX_CHANNELS];
|
||||
const float *play[SPA_AUDIO_MAX_CHANNELS];
|
||||
float *out[SPA_AUDIO_MAX_CHANNELS];
|
||||
struct spa_data *ds, *dd;
|
||||
struct pw_buffer *cout;
|
||||
struct pw_buffer *pout;
|
||||
float rec_buf[impl->info.channels][impl->rec_bufsize / sizeof(float)];
|
||||
float play_buf[impl->info.channels][impl->play_bufsize / sizeof(float)];
|
||||
const float *rec[impl->info.channels];
|
||||
const float *play[impl->info.channels];
|
||||
float *out[impl->info.channels];
|
||||
struct spa_data *dd;
|
||||
uint32_t i, size = 0;
|
||||
uint32_t rindex, pindex;
|
||||
int32_t stride = 0;
|
||||
|
||||
if ((cin = pw_stream_dequeue_buffer(impl->capture)) == NULL)
|
||||
pw_log_warn("out of capture buffers: %m");
|
||||
|
||||
if ((cout = pw_stream_dequeue_buffer(impl->source)) == NULL)
|
||||
pw_log_warn("out of source buffers: %m");
|
||||
|
||||
if ((pin = pw_stream_dequeue_buffer(impl->sink)) == NULL)
|
||||
pw_log_warn("out of sink buffers: %m");
|
||||
pw_log_debug("out of source buffers: %m");
|
||||
|
||||
if ((pout = pw_stream_dequeue_buffer(impl->playback)) == NULL)
|
||||
pw_log_warn("out of playback buffers: %m");
|
||||
pw_log_debug("out of playback buffers: %m");
|
||||
|
||||
if (cin == NULL || cout == NULL || pin == NULL || pout == NULL)
|
||||
if (impl->rec_bufsize != impl->play_bufsize) {
|
||||
pw_log_warn("mismatched buffer sizes");
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (cout == NULL || pout == NULL)
|
||||
goto done;
|
||||
|
||||
spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
|
||||
spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
|
||||
|
||||
for (i = 0; i < impl->info.channels; i++) {
|
||||
/* captured samples, with echo from sink */
|
||||
ds = &cin->buffer->datas[i];
|
||||
rec[i] = SPA_PTROFF(ds->data, ds->chunk->offset, void);
|
||||
|
||||
size = ds->chunk->size;
|
||||
stride = ds->chunk->stride;
|
||||
stride = 0;
|
||||
size = impl->rec_bufsize;
|
||||
spa_ringbuffer_read_data(&impl->rec_ring, impl->rec_buffer[i],
|
||||
impl->rec_ringsize, rindex % impl->rec_ringsize,
|
||||
(void*)rec_buf[i], size);
|
||||
rec[i] = &rec_buf[i][0];
|
||||
|
||||
/* filtered samples, without echo from sink */
|
||||
dd = &cout->buffer->datas[i];
|
||||
|
|
@ -164,8 +183,12 @@ static void process(struct impl *impl)
|
|||
dd->chunk->stride = stride;
|
||||
|
||||
/* echo from sink */
|
||||
ds = &pin->buffer->datas[i];
|
||||
play[i] = SPA_PTROFF(ds->data, ds->chunk->offset, void);
|
||||
stride = 0;
|
||||
size = impl->play_bufsize;
|
||||
spa_ringbuffer_read_data(&impl->play_ring, impl->play_buffer[i],
|
||||
impl->play_ringsize, pindex % impl->play_ringsize,
|
||||
(void *)play_buf[i], size);
|
||||
play[i] = &play_buf[i][0];
|
||||
|
||||
/* output to sink, just copy */
|
||||
dd = &pout->buffer->datas[i];
|
||||
|
|
@ -175,15 +198,15 @@ static void process(struct impl *impl)
|
|||
dd->chunk->size = size;
|
||||
dd->chunk->stride = stride;
|
||||
}
|
||||
echo_cancel_run(impl->aec_info, impl->aec, rec, play, out, size / sizeof(float));
|
||||
|
||||
spa_ringbuffer_read_update(&impl->rec_ring, rindex + size);
|
||||
spa_ringbuffer_read_update(&impl->play_ring, pindex + size);
|
||||
|
||||
echo_cancel_run(impl->aec_info, impl->aec, rec, play, out, size / sizeof(float));
|
||||
|
||||
done:
|
||||
if (cin != NULL)
|
||||
pw_stream_queue_buffer(impl->capture, cin);
|
||||
if (cout != NULL)
|
||||
pw_stream_queue_buffer(impl->source, cout);
|
||||
if (pin != NULL)
|
||||
pw_stream_queue_buffer(impl->sink, pin);
|
||||
if (pout != NULL)
|
||||
pw_stream_queue_buffer(impl->playback, pout);
|
||||
|
||||
|
|
@ -198,12 +221,64 @@ static void capture_destroy(void *d)
|
|||
impl->capture = NULL;
|
||||
}
|
||||
|
||||
static void capture_process(void *d)
|
||||
static void capture_io_changed(void *data, uint32_t id, void *area, uint32_t size)
|
||||
{
|
||||
struct impl *impl = d;
|
||||
impl->capture_ready = true;
|
||||
if (impl->sink_ready)
|
||||
process(impl);
|
||||
struct impl *impl = data;
|
||||
|
||||
switch (id) {
|
||||
case SPA_IO_RateMatch:
|
||||
impl->rec_rate_match = area;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void capture_process(void *data)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
struct pw_buffer *buf;
|
||||
struct spa_data *d;
|
||||
uint32_t i, index, size;
|
||||
int32_t avail;
|
||||
|
||||
if ((buf = pw_stream_dequeue_buffer(impl->capture)) == NULL) {
|
||||
pw_log_warn("out of capture buffers: %m");
|
||||
return;
|
||||
}
|
||||
|
||||
avail = spa_ringbuffer_get_write_index(&impl->rec_ring, &index);
|
||||
size = buf->buffer->datas[0].chunk->size;
|
||||
if (avail + size >= impl->rec_ringsize) {
|
||||
pw_log_warn("capture ringbuffer xrun %d + %u > %u",
|
||||
avail, size, impl->rec_ringsize);
|
||||
/* FIXME: drop what we overwrite */
|
||||
}
|
||||
|
||||
/* If we don't know what size to push yet, keep the block size the same
|
||||
* on input and output or what the resampler needs */
|
||||
if (impl->rec_bufsize == 0) {
|
||||
impl->rec_bufsize = SPA_MAX(size, impl->rec_rate_match->size);
|
||||
pw_log_debug("Setting capture buffer size to %u", impl->rec_bufsize);
|
||||
}
|
||||
|
||||
for (i = 0; i < impl->info.channels; i++) {
|
||||
/* captured samples, with echo from sink */
|
||||
d = &buf->buffer->datas[i];
|
||||
|
||||
spa_ringbuffer_write_data(&impl->rec_ring, impl->rec_buffer[i],
|
||||
impl->rec_ringsize, index % impl->rec_ringsize,
|
||||
SPA_PTROFF(d->data, d->chunk->offset, void),
|
||||
d->chunk->size);
|
||||
}
|
||||
|
||||
spa_ringbuffer_write_update(&impl->rec_ring, index + size);
|
||||
|
||||
if (avail + size >= impl->rec_bufsize) {
|
||||
impl->capture_ready = true;
|
||||
if (impl->sink_ready)
|
||||
process(impl);
|
||||
}
|
||||
|
||||
pw_stream_queue_buffer(impl->capture, buf);
|
||||
}
|
||||
|
||||
static void input_param_latency_changed(struct impl *impl, const struct spa_pod *param)
|
||||
|
|
@ -238,6 +313,7 @@ static void input_param_changed(void *data, uint32_t id, const struct spa_pod *p
|
|||
static const struct pw_stream_events capture_events = {
|
||||
PW_VERSION_STREAM_EVENTS,
|
||||
.destroy = capture_destroy,
|
||||
.io_changed = capture_io_changed,
|
||||
.process = capture_process,
|
||||
.param_changed = input_param_changed
|
||||
};
|
||||
|
|
@ -291,12 +367,64 @@ static void sink_destroy(void *d)
|
|||
impl->sink = NULL;
|
||||
}
|
||||
|
||||
static void sink_process(void *d)
|
||||
static void sink_io_changed(void *data, uint32_t id, void *area, uint32_t size)
|
||||
{
|
||||
struct impl *impl = d;
|
||||
impl->sink_ready = true;
|
||||
if (impl->capture_ready)
|
||||
process(impl);
|
||||
struct impl *impl = data;
|
||||
|
||||
switch (id) {
|
||||
case SPA_IO_RateMatch:
|
||||
impl->play_rate_match = area;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void sink_process(void *data)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
struct pw_buffer *buf;
|
||||
struct spa_data *d;
|
||||
uint32_t i, index, size;
|
||||
int32_t avail;
|
||||
|
||||
if ((buf = pw_stream_dequeue_buffer(impl->sink)) == NULL) {
|
||||
pw_log_warn("out of sink buffers: %m");
|
||||
return;
|
||||
}
|
||||
|
||||
avail = spa_ringbuffer_get_write_index(&impl->play_ring, &index);
|
||||
size = buf->buffer->datas[0].chunk->size;
|
||||
if (avail + size >= impl->play_ringsize) {
|
||||
pw_log_warn("sink ringbuffer xrun %d + %u > %u",
|
||||
avail, size, impl->play_ringsize);
|
||||
/* FIXME: drop what we overwrite */
|
||||
}
|
||||
|
||||
/* If we don't know what size to push yet, keep the block size the same
|
||||
* on input and output or what the resampler needs */
|
||||
if (impl->play_bufsize == 0) {
|
||||
impl->play_bufsize = SPA_MAX(size, impl->play_rate_match->size);
|
||||
pw_log_debug("Setting sink buffer size to %u", impl->play_bufsize);
|
||||
}
|
||||
|
||||
for (i = 0; i < impl->info.channels; i++) {
|
||||
/* echo from sink */
|
||||
d = &buf->buffer->datas[i];
|
||||
|
||||
spa_ringbuffer_write_data(&impl->play_ring, impl->play_buffer[i],
|
||||
impl->play_ringsize, index % impl->play_ringsize,
|
||||
SPA_PTROFF(d->data, d->chunk->offset, void),
|
||||
d->chunk->size);
|
||||
}
|
||||
|
||||
spa_ringbuffer_write_update(&impl->play_ring, index + size);
|
||||
|
||||
if (avail + size >= impl->play_bufsize) {
|
||||
impl->sink_ready = true;
|
||||
if (impl->capture_ready)
|
||||
process(impl);
|
||||
}
|
||||
|
||||
pw_stream_queue_buffer(impl->sink, buf);
|
||||
}
|
||||
|
||||
static void playback_destroy(void *d)
|
||||
|
|
@ -314,6 +442,7 @@ static const struct pw_stream_events playback_events = {
|
|||
static const struct pw_stream_events sink_events = {
|
||||
PW_VERSION_STREAM_EVENTS,
|
||||
.destroy = sink_destroy,
|
||||
.io_changed = sink_io_changed,
|
||||
.process = sink_process,
|
||||
.param_changed = output_param_changed
|
||||
};
|
||||
|
|
@ -321,7 +450,7 @@ static const struct pw_stream_events sink_events = {
|
|||
static int setup_streams(struct impl *impl)
|
||||
{
|
||||
int res;
|
||||
uint32_t n_params;
|
||||
uint32_t n_params, i;
|
||||
const struct spa_pod *params[1];
|
||||
uint8_t buffer[1024];
|
||||
struct spa_pod_builder b;
|
||||
|
|
@ -416,6 +545,15 @@ static int setup_streams(struct impl *impl)
|
|||
params, n_params)) < 0)
|
||||
return res;
|
||||
|
||||
impl->rec_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000;
|
||||
impl->play_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000;
|
||||
for (i = 0; i < impl->info.channels; i++) {
|
||||
impl->rec_buffer[i] = malloc(impl->rec_ringsize);
|
||||
impl->play_buffer[i] = malloc(impl->play_ringsize);
|
||||
}
|
||||
spa_ringbuffer_init(&impl->rec_ring);
|
||||
spa_ringbuffer_init(&impl->play_ring);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -449,6 +587,7 @@ static const struct pw_proxy_events core_proxy_events = {
|
|||
|
||||
static void impl_destroy(struct impl *impl)
|
||||
{
|
||||
uint32_t i;
|
||||
if (impl->capture)
|
||||
pw_stream_destroy(impl->capture);
|
||||
if (impl->source)
|
||||
|
|
@ -463,7 +602,16 @@ static void impl_destroy(struct impl *impl)
|
|||
echo_cancel_destroy(impl->aec_info, impl->aec);
|
||||
pw_properties_free(impl->source_props);
|
||||
pw_properties_free(impl->sink_props);
|
||||
|
||||
pw_work_queue_cancel(impl->work, impl, SPA_ID_INVALID);
|
||||
|
||||
for (i = 0; i < impl->info.channels; i++) {
|
||||
if (impl->rec_buffer[i])
|
||||
free(impl->rec_buffer[i]);
|
||||
if (impl->play_buffer[i])
|
||||
free(impl->play_buffer[i]);
|
||||
}
|
||||
|
||||
free(impl);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue