diff --git a/src/modules/module-echo-cancel.c b/src/modules/module-echo-cancel.c index 6bd9a405b..526858b97 100644 --- a/src/modules/module-echo-cancel.c +++ b/src/modules/module-echo-cancel.c @@ -56,6 +56,9 @@ #include "module-echo-cancel/echo-cancel.h" #define NAME "echo-cancel" +/* Hopefully this is enough for any combination of AEC engine and resampler + * input requirement for rate matching */ +#define MAX_BUFSIZE_MS 100 static const struct spa_dict_item module_props[] = { { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, @@ -92,12 +95,22 @@ struct impl { struct pw_properties *source_props; struct pw_stream *source; struct spa_hook source_listener; + void *rec_buffer[SPA_AUDIO_MAX_CHANNELS]; + uint32_t rec_ringsize; + uint32_t rec_bufsize; + struct spa_ringbuffer rec_ring; + struct spa_io_rate_match *rec_rate_match; struct pw_stream *playback; struct spa_hook playback_listener; struct pw_properties *sink_props; struct pw_stream *sink; struct spa_hook sink_listener; + void *play_buffer[SPA_AUDIO_MAX_CHANNELS]; + uint32_t play_ringsize; + uint32_t play_bufsize; + struct spa_ringbuffer play_ring; + struct spa_io_rate_match *play_rate_match; const struct echo_cancel_info *aec_info; void *aec; @@ -124,37 +137,43 @@ static void unload_module(struct impl *impl) static void process(struct impl *impl) { - struct pw_buffer *cin, *cout; - struct pw_buffer *pin, *pout; - const float *rec[SPA_AUDIO_MAX_CHANNELS]; - const float *play[SPA_AUDIO_MAX_CHANNELS]; - float *out[SPA_AUDIO_MAX_CHANNELS]; - struct spa_data *ds, *dd; + struct pw_buffer *cout; + struct pw_buffer *pout; + float rec_buf[impl->info.channels][impl->rec_bufsize / sizeof(float)]; + float play_buf[impl->info.channels][impl->play_bufsize / sizeof(float)]; + const float *rec[impl->info.channels]; + const float *play[impl->info.channels]; + float *out[impl->info.channels]; + struct spa_data *dd; uint32_t i, size = 0; + uint32_t rindex, pindex; int32_t stride = 0; - if ((cin = pw_stream_dequeue_buffer(impl->capture)) == NULL) - pw_log_warn("out of capture buffers: %m"); - if ((cout = pw_stream_dequeue_buffer(impl->source)) == NULL) - pw_log_warn("out of source buffers: %m"); - - if ((pin = pw_stream_dequeue_buffer(impl->sink)) == NULL) - pw_log_warn("out of sink buffers: %m"); + pw_log_debug("out of source buffers: %m"); if ((pout = pw_stream_dequeue_buffer(impl->playback)) == NULL) - pw_log_warn("out of playback buffers: %m"); + pw_log_debug("out of playback buffers: %m"); - if (cin == NULL || cout == NULL || pin == NULL || pout == NULL) + if (impl->rec_bufsize != impl->play_bufsize) { + pw_log_warn("mismatched buffer sizes"); goto done; + } + + if (cout == NULL || pout == NULL) + goto done; + + spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex); + spa_ringbuffer_get_read_index(&impl->play_ring, &pindex); for (i = 0; i < impl->info.channels; i++) { /* captured samples, with echo from sink */ - ds = &cin->buffer->datas[i]; - rec[i] = SPA_PTROFF(ds->data, ds->chunk->offset, void); - - size = ds->chunk->size; - stride = ds->chunk->stride; + stride = 0; + size = impl->rec_bufsize; + spa_ringbuffer_read_data(&impl->rec_ring, impl->rec_buffer[i], + impl->rec_ringsize, rindex % impl->rec_ringsize, + (void*)rec_buf[i], size); + rec[i] = &rec_buf[i][0]; /* filtered samples, without echo from sink */ dd = &cout->buffer->datas[i]; @@ -164,8 +183,12 @@ static void process(struct impl *impl) dd->chunk->stride = stride; /* echo from sink */ - ds = &pin->buffer->datas[i]; - play[i] = SPA_PTROFF(ds->data, ds->chunk->offset, void); + stride = 0; + size = impl->play_bufsize; + spa_ringbuffer_read_data(&impl->play_ring, impl->play_buffer[i], + impl->play_ringsize, pindex % impl->play_ringsize, + (void *)play_buf[i], size); + play[i] = &play_buf[i][0]; /* output to sink, just copy */ dd = &pout->buffer->datas[i]; @@ -175,15 +198,15 @@ static void process(struct impl *impl) dd->chunk->size = size; dd->chunk->stride = stride; } - echo_cancel_run(impl->aec_info, impl->aec, rec, play, out, size / sizeof(float)); + + spa_ringbuffer_read_update(&impl->rec_ring, rindex + size); + spa_ringbuffer_read_update(&impl->play_ring, pindex + size); + + echo_cancel_run(impl->aec_info, impl->aec, rec, play, out, size / sizeof(float)); done: - if (cin != NULL) - pw_stream_queue_buffer(impl->capture, cin); if (cout != NULL) pw_stream_queue_buffer(impl->source, cout); - if (pin != NULL) - pw_stream_queue_buffer(impl->sink, pin); if (pout != NULL) pw_stream_queue_buffer(impl->playback, pout); @@ -198,12 +221,64 @@ static void capture_destroy(void *d) impl->capture = NULL; } -static void capture_process(void *d) +static void capture_io_changed(void *data, uint32_t id, void *area, uint32_t size) { - struct impl *impl = d; - impl->capture_ready = true; - if (impl->sink_ready) - process(impl); + struct impl *impl = data; + + switch (id) { + case SPA_IO_RateMatch: + impl->rec_rate_match = area; + break; + } +} + +static void capture_process(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t i, index, size; + int32_t avail; + + if ((buf = pw_stream_dequeue_buffer(impl->capture)) == NULL) { + pw_log_warn("out of capture buffers: %m"); + return; + } + + avail = spa_ringbuffer_get_write_index(&impl->rec_ring, &index); + size = buf->buffer->datas[0].chunk->size; + if (avail + size >= impl->rec_ringsize) { + pw_log_warn("capture ringbuffer xrun %d + %u > %u", + avail, size, impl->rec_ringsize); + /* FIXME: drop what we overwrite */ + } + + /* If we don't know what size to push yet, keep the block size the same + * on input and output or what the resampler needs */ + if (impl->rec_bufsize == 0) { + impl->rec_bufsize = SPA_MAX(size, impl->rec_rate_match->size); + pw_log_debug("Setting capture buffer size to %u", impl->rec_bufsize); + } + + for (i = 0; i < impl->info.channels; i++) { + /* captured samples, with echo from sink */ + d = &buf->buffer->datas[i]; + + spa_ringbuffer_write_data(&impl->rec_ring, impl->rec_buffer[i], + impl->rec_ringsize, index % impl->rec_ringsize, + SPA_PTROFF(d->data, d->chunk->offset, void), + d->chunk->size); + } + + spa_ringbuffer_write_update(&impl->rec_ring, index + size); + + if (avail + size >= impl->rec_bufsize) { + impl->capture_ready = true; + if (impl->sink_ready) + process(impl); + } + + pw_stream_queue_buffer(impl->capture, buf); } static void input_param_latency_changed(struct impl *impl, const struct spa_pod *param) @@ -238,6 +313,7 @@ static void input_param_changed(void *data, uint32_t id, const struct spa_pod *p static const struct pw_stream_events capture_events = { PW_VERSION_STREAM_EVENTS, .destroy = capture_destroy, + .io_changed = capture_io_changed, .process = capture_process, .param_changed = input_param_changed }; @@ -291,12 +367,64 @@ static void sink_destroy(void *d) impl->sink = NULL; } -static void sink_process(void *d) +static void sink_io_changed(void *data, uint32_t id, void *area, uint32_t size) { - struct impl *impl = d; - impl->sink_ready = true; - if (impl->capture_ready) - process(impl); + struct impl *impl = data; + + switch (id) { + case SPA_IO_RateMatch: + impl->play_rate_match = area; + break; + } +} + +static void sink_process(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t i, index, size; + int32_t avail; + + if ((buf = pw_stream_dequeue_buffer(impl->sink)) == NULL) { + pw_log_warn("out of sink buffers: %m"); + return; + } + + avail = spa_ringbuffer_get_write_index(&impl->play_ring, &index); + size = buf->buffer->datas[0].chunk->size; + if (avail + size >= impl->play_ringsize) { + pw_log_warn("sink ringbuffer xrun %d + %u > %u", + avail, size, impl->play_ringsize); + /* FIXME: drop what we overwrite */ + } + + /* If we don't know what size to push yet, keep the block size the same + * on input and output or what the resampler needs */ + if (impl->play_bufsize == 0) { + impl->play_bufsize = SPA_MAX(size, impl->play_rate_match->size); + pw_log_debug("Setting sink buffer size to %u", impl->play_bufsize); + } + + for (i = 0; i < impl->info.channels; i++) { + /* echo from sink */ + d = &buf->buffer->datas[i]; + + spa_ringbuffer_write_data(&impl->play_ring, impl->play_buffer[i], + impl->play_ringsize, index % impl->play_ringsize, + SPA_PTROFF(d->data, d->chunk->offset, void), + d->chunk->size); + } + + spa_ringbuffer_write_update(&impl->play_ring, index + size); + + if (avail + size >= impl->play_bufsize) { + impl->sink_ready = true; + if (impl->capture_ready) + process(impl); + } + + pw_stream_queue_buffer(impl->sink, buf); } static void playback_destroy(void *d) @@ -314,6 +442,7 @@ static const struct pw_stream_events playback_events = { static const struct pw_stream_events sink_events = { PW_VERSION_STREAM_EVENTS, .destroy = sink_destroy, + .io_changed = sink_io_changed, .process = sink_process, .param_changed = output_param_changed }; @@ -321,7 +450,7 @@ static const struct pw_stream_events sink_events = { static int setup_streams(struct impl *impl) { int res; - uint32_t n_params; + uint32_t n_params, i; const struct spa_pod *params[1]; uint8_t buffer[1024]; struct spa_pod_builder b; @@ -416,6 +545,15 @@ static int setup_streams(struct impl *impl) params, n_params)) < 0) return res; + impl->rec_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000; + impl->play_ringsize = sizeof(float) * MAX_BUFSIZE_MS * impl->info.rate / 1000; + for (i = 0; i < impl->info.channels; i++) { + impl->rec_buffer[i] = malloc(impl->rec_ringsize); + impl->play_buffer[i] = malloc(impl->play_ringsize); + } + spa_ringbuffer_init(&impl->rec_ring); + spa_ringbuffer_init(&impl->play_ring); + return 0; } @@ -449,6 +587,7 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { + uint32_t i; if (impl->capture) pw_stream_destroy(impl->capture); if (impl->source) @@ -463,7 +602,16 @@ static void impl_destroy(struct impl *impl) echo_cancel_destroy(impl->aec_info, impl->aec); pw_properties_free(impl->source_props); pw_properties_free(impl->sink_props); + pw_work_queue_cancel(impl->work, impl, SPA_ID_INVALID); + + for (i = 0; i < impl->info.channels; i++) { + if (impl->rec_buffer[i]) + free(impl->rec_buffer[i]); + if (impl->play_buffer[i]) + free(impl->play_buffer[i]); + } + free(impl); }