module-echo-cancel: Sync capture and sink buffers

Call process() when capture and sink ringbuffers contain data from the
same graph cycle and only process the latest block from them to avoid
adding latency that can accumulate if one of the streams gets more than
one buffer before the other gets its first buffer when starting up.
This commit is contained in:
Jonas Holmberg 2025-10-22 13:50:24 +02:00
parent c6d0b364ab
commit 614186a590

View file

@ -229,8 +229,10 @@ struct impl {
struct spa_audio_aec *aec;
uint32_t aec_blocksize;
unsigned int capture_ready:1;
unsigned int sink_ready:1;
struct spa_io_position *capture_position;
struct spa_io_position *sink_position;
uint32_t capture_cycle;
uint32_t sink_cycle;
unsigned int do_disconnect:1;
@ -309,11 +311,17 @@ static void process(struct impl *impl)
struct spa_data *dd;
uint32_t i, size;
uint32_t rindex, pindex, oindex, pdindex, avail;
int32_t pavail, pdavail;
size = impl->aec_blocksize;
/* First read a block from the playback and capture ring buffers */
spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
/* First read a block from the capture ring buffer */
avail = spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
while (avail > size) {
/* drop samples from previous graph cycles */
spa_ringbuffer_read_update(&impl->rec_ring, rindex + size);
avail = spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
}
for (i = 0; i < impl->rec_info.channels; i++) {
/* captured samples, with echo from sink */
@ -331,19 +339,30 @@ static void process(struct impl *impl)
out[i] = &out_buf[i][0];
}
spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
spa_ringbuffer_get_read_index(&impl->play_delayed_ring, &pdindex);
pavail = spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
pdavail = spa_ringbuffer_get_read_index(&impl->play_delayed_ring, &pdindex);
if (impl->playback != NULL && (pout = pw_stream_dequeue_buffer(impl->playback)) == NULL) {
pw_log_debug("out of playback buffers: %m");
/* playback stream may not yet be in streaming state, drop play
* data to avoid introducing additional playback latency */
spa_ringbuffer_read_update(&impl->play_ring, pindex + size);
spa_ringbuffer_read_update(&impl->play_delayed_ring, pdindex + size);
spa_ringbuffer_read_update(&impl->play_ring, pindex + pavail);
spa_ringbuffer_read_update(&impl->play_delayed_ring, pdindex + pdavail);
goto done;
}
while (pavail > size) {
/* drop samples from previous graph cycles */
spa_ringbuffer_read_update(&impl->play_ring, pindex + size);
pavail = spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
}
while (pdavail > size) {
/* drop samples from previous graph cycles */
spa_ringbuffer_read_update(&impl->play_delayed_ring, pdindex + size);
pdavail = spa_ringbuffer_get_read_index(&impl->play_delayed_ring, &pdindex);
}
for (i = 0; i < impl->play_info.channels; i++) {
/* echo from sink */
play[i] = &play_buf[i][0];
@ -454,8 +473,8 @@ static void process(struct impl *impl)
}
done:
impl->sink_ready = false;
impl->capture_ready = false;
impl->capture_cycle = 0;
impl->sink_cycle = 0;
}
static void reset_buffers(struct impl *impl)
@ -479,8 +498,8 @@ static void reset_buffers(struct impl *impl)
spa_ringbuffer_get_read_index(&impl->play_ring, &index);
spa_ringbuffer_read_update(&impl->play_ring, index + (sizeof(float) * (impl->buffer_delay)));
impl->sink_ready = false;
impl->capture_ready = false;
impl->capture_cycle = 0;
impl->sink_cycle = 0;
}
static void capture_destroy(void *d)
@ -546,8 +565,11 @@ static void capture_process(void *data)
spa_ringbuffer_write_update(&impl->rec_ring, index + size);
if (avail + size >= impl->aec_blocksize) {
impl->capture_ready = true;
if (impl->sink_ready)
if (impl->capture_position)
impl->capture_cycle = impl->capture_position->clock.cycle;
else
pw_log_warn("no capture position");
if (impl->capture_cycle == impl->sink_cycle)
process(impl);
}
@ -740,12 +762,26 @@ static void input_param_changed(void *data, uint32_t id, const struct spa_pod* p
}
}
static void capture_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
struct impl *impl = data;
switch (id) {
case SPA_IO_Position:
impl->capture_position = area;
break;
default:
break;
}
}
static const struct pw_stream_events capture_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = capture_destroy,
.state_changed = capture_state_changed,
.process = capture_process,
.param_changed = input_param_changed
.param_changed = input_param_changed,
.io_changed = capture_io_changed
};
static void source_destroy(void *d)
@ -930,10 +966,15 @@ static void sink_process(void *data)
SPA_PTROFF(d->data, offs, void), size);
}
spa_ringbuffer_write_update(&impl->play_ring, index + size);
spa_ringbuffer_get_write_index(&impl->play_delayed_ring, &index);
spa_ringbuffer_write_update(&impl->play_delayed_ring, index + size);
if (avail + size >= impl->aec_blocksize) {
impl->sink_ready = true;
if (impl->capture_ready)
if (impl->sink_position)
impl->sink_cycle = impl->sink_position->clock.cycle;
else
pw_log_warn("no sink position");
if (impl->capture_cycle == impl->sink_cycle)
process(impl);
}
@ -955,12 +996,27 @@ static const struct pw_stream_events playback_events = {
.state_changed = playback_state_changed,
.param_changed = output_param_changed
};
static void sink_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
struct impl *impl = data;
switch (id) {
case SPA_IO_Position:
impl->sink_position = area;
break;
default:
break;
}
}
static const struct pw_stream_events sink_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = sink_destroy,
.process = sink_process,
.state_changed = sink_state_changed,
.param_changed = output_param_changed
.param_changed = output_param_changed,
.io_changed = sink_io_changed
};
#define MAX_PARAMS 512u