Merge branch 'master' into 'master'

module-echo-cancel: Sync capture and sink buffers

See merge request pipewire/pipewire!2572
This commit is contained in:
Jonas Holmberg 2025-10-27 07:44:03 +00:00
commit 1227785f25

View file

@ -229,8 +229,10 @@ struct impl {
struct spa_audio_aec *aec;
uint32_t aec_blocksize;
unsigned int capture_ready:1;
unsigned int sink_ready:1;
struct spa_io_position *capture_position;
struct spa_io_position *sink_position;
uint32_t capture_cycle;
uint32_t sink_cycle;
unsigned int do_disconnect:1;
@ -309,11 +311,17 @@ static void process(struct impl *impl)
struct spa_data *dd;
uint32_t i, size;
uint32_t rindex, pindex, oindex, pdindex, avail;
int32_t pavail, pdavail;
size = impl->aec_blocksize;
/* First read a block from the playback and capture ring buffers */
spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
/* First read a block from the capture ring buffer */
avail = spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
while (avail > size) {
/* drop samples from previous graph cycles */
spa_ringbuffer_read_update(&impl->rec_ring, rindex + size);
avail = spa_ringbuffer_get_read_index(&impl->rec_ring, &rindex);
}
for (i = 0; i < impl->rec_info.channels; i++) {
/* captured samples, with echo from sink */
@ -331,19 +339,30 @@ static void process(struct impl *impl)
out[i] = &out_buf[i][0];
}
spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
spa_ringbuffer_get_read_index(&impl->play_delayed_ring, &pdindex);
pavail = spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
pdavail = spa_ringbuffer_get_read_index(&impl->play_delayed_ring, &pdindex);
if (impl->playback != NULL && (pout = pw_stream_dequeue_buffer(impl->playback)) == NULL) {
pw_log_debug("out of playback buffers: %m");
/* playback stream may not yet be in streaming state, drop play
* data to avoid introducing additional playback latency */
spa_ringbuffer_read_update(&impl->play_ring, pindex + size);
spa_ringbuffer_read_update(&impl->play_delayed_ring, pdindex + size);
spa_ringbuffer_read_update(&impl->play_ring, pindex + pavail);
spa_ringbuffer_read_update(&impl->play_delayed_ring, pdindex + pdavail);
goto done;
}
while (pavail > size) {
/* drop samples from previous graph cycles */
spa_ringbuffer_read_update(&impl->play_ring, pindex + size);
pavail = spa_ringbuffer_get_read_index(&impl->play_ring, &pindex);
}
while (pdavail > size) {
/* drop samples from previous graph cycles */
spa_ringbuffer_read_update(&impl->play_delayed_ring, pdindex + size);
pdavail = spa_ringbuffer_get_read_index(&impl->play_delayed_ring, &pdindex);
}
for (i = 0; i < impl->play_info.channels; i++) {
/* echo from sink */
play[i] = &play_buf[i][0];
@ -454,8 +473,8 @@ static void process(struct impl *impl)
}
done:
impl->sink_ready = false;
impl->capture_ready = false;
impl->capture_cycle = 0;
impl->sink_cycle = 0;
}
static void reset_buffers(struct impl *impl)
@ -479,8 +498,8 @@ static void reset_buffers(struct impl *impl)
spa_ringbuffer_get_read_index(&impl->play_ring, &index);
spa_ringbuffer_read_update(&impl->play_ring, index + (sizeof(float) * (impl->buffer_delay)));
impl->sink_ready = false;
impl->capture_ready = false;
impl->capture_cycle = 0;
impl->sink_cycle = 0;
}
static void capture_destroy(void *d)
@ -546,8 +565,11 @@ static void capture_process(void *data)
spa_ringbuffer_write_update(&impl->rec_ring, index + size);
if (avail + size >= impl->aec_blocksize) {
impl->capture_ready = true;
if (impl->sink_ready)
if (impl->capture_position)
impl->capture_cycle = impl->capture_position->clock.cycle;
else
pw_log_warn("no capture position");
if (impl->capture_cycle == impl->sink_cycle)
process(impl);
}
@ -740,12 +762,26 @@ static void input_param_changed(void *data, uint32_t id, const struct spa_pod* p
}
}
static void capture_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
struct impl *impl = data;
switch (id) {
case SPA_IO_Position:
impl->capture_position = area;
break;
default:
break;
}
}
static const struct pw_stream_events capture_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = capture_destroy,
.state_changed = capture_state_changed,
.process = capture_process,
.param_changed = input_param_changed
.param_changed = input_param_changed,
.io_changed = capture_io_changed
};
static void source_destroy(void *d)
@ -930,10 +966,15 @@ static void sink_process(void *data)
SPA_PTROFF(d->data, offs, void), size);
}
spa_ringbuffer_write_update(&impl->play_ring, index + size);
spa_ringbuffer_get_write_index(&impl->play_delayed_ring, &index);
spa_ringbuffer_write_update(&impl->play_delayed_ring, index + size);
if (avail + size >= impl->aec_blocksize) {
impl->sink_ready = true;
if (impl->capture_ready)
if (impl->sink_position)
impl->sink_cycle = impl->sink_position->clock.cycle;
else
pw_log_warn("no sink position");
if (impl->capture_cycle == impl->sink_cycle)
process(impl);
}
@ -955,12 +996,27 @@ static const struct pw_stream_events playback_events = {
.state_changed = playback_state_changed,
.param_changed = output_param_changed
};
static void sink_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
struct impl *impl = data;
switch (id) {
case SPA_IO_Position:
impl->sink_position = area;
break;
default:
break;
}
}
static const struct pw_stream_events sink_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = sink_destroy,
.process = sink_process,
.state_changed = sink_state_changed,
.param_changed = output_param_changed
.param_changed = output_param_changed,
.io_changed = sink_io_changed
};
#define MAX_PARAMS 512u