diff --git a/pipewire-alsa/alsa-plugins/pcm_pipewire.c b/pipewire-alsa/alsa-plugins/pcm_pipewire.c index c76f931d3..ab9bad88f 100644 --- a/pipewire-alsa/alsa-plugins/pcm_pipewire.c +++ b/pipewire-alsa/alsa-plugins/pcm_pipewire.c @@ -60,13 +60,15 @@ typedef struct { uint32_t target; int fd; - bool activated; /* PipeWire is activated? */ - bool error; + unsigned int activated:1; /* PipeWire is activated? */ + unsigned int error:1; + unsigned int drained:1; + unsigned int draining:1; - unsigned int num_ports; - unsigned int hw_ptr; - unsigned int sample_bits; + snd_pcm_uframes_t hw_ptr; + snd_pcm_uframes_t boundary; snd_pcm_uframes_t min_avail; + unsigned int sample_bits; struct spa_system *system; struct pw_thread_loop *main_loop; @@ -91,7 +93,10 @@ static int pcm_poll_block_check(snd_pcm_ioplug_t *io) snd_pcm_sframes_t avail; snd_pcm_pipewire_t *pw = io->private_data; - if (io->state == SND_PCM_STATE_RUNNING || + if (io->state == SND_PCM_STATE_DRAINING) { + spa_system_eventfd_read(pw->system, io->poll_fd, &val); + return 0; + } else if (io->state == SND_PCM_STATE_RUNNING || (io->state == SND_PCM_STATE_PREPARED && io->stream == SND_PCM_STREAM_CAPTURE)) { avail = snd_pcm_avail_update(io->pcm); if (avail >= 0 && avail < (snd_pcm_sframes_t)pw->min_avail) { @@ -99,7 +104,6 @@ static int pcm_poll_block_check(snd_pcm_ioplug_t *io) return 1; } } - return 0; } @@ -112,22 +116,25 @@ static inline int pcm_poll_unblock_check(snd_pcm_ioplug_t *io) static void snd_pcm_pipewire_free(snd_pcm_pipewire_t *pw) { - if (pw) { - if (pw->main_loop) - pw_thread_loop_stop(pw->main_loop); - if (pw->context) - pw_context_destroy(pw->context); - if (pw->fd >= 0) - spa_system_close(pw->system, pw->fd); - if (pw->main_loop) - pw_thread_loop_destroy(pw->main_loop); - free(pw); - } + if (pw == NULL) + return; + + pw_log_debug(NAME" %p:", pw); + if (pw->main_loop) + pw_thread_loop_stop(pw->main_loop); + if (pw->context) + pw_context_destroy(pw->context); + if (pw->fd >= 0) + spa_system_close(pw->system, pw->fd); + if (pw->main_loop) + pw_thread_loop_destroy(pw->main_loop); + free(pw); } static int snd_pcm_pipewire_close(snd_pcm_ioplug_t *io) { snd_pcm_pipewire_t *pw = io->private_data; + pw_log_debug(NAME" %p:", pw); snd_pcm_pipewire_free(pw); return 0; } @@ -153,15 +160,19 @@ static int snd_pcm_pipewire_poll_revents(snd_pcm_ioplug_t *io, static snd_pcm_sframes_t snd_pcm_pipewire_pointer(snd_pcm_ioplug_t *io) { snd_pcm_pipewire_t *pw = io->private_data; + snd_pcm_sframes_t hw_ptr = pw->hw_ptr; if (pw->error) return -EBADFD; - return pw->hw_ptr; + if (pw->draining && !pw->drained) + hw_ptr = hw_ptr > 1 ? hw_ptr - 1 : (snd_pcm_sframes_t)(pw->boundary - 1); + + return hw_ptr; } static int -snd_pcm_pipewire_process_playback(snd_pcm_pipewire_t *pw, struct pw_buffer *b) +snd_pcm_pipewire_process_playback(snd_pcm_pipewire_t *pw, struct pw_buffer *b, snd_pcm_uframes_t *hw_avail) { snd_pcm_ioplug_t *io = &pw->io; const snd_pcm_channel_area_t *areas; @@ -187,6 +198,7 @@ snd_pcm_pipewire_process_playback(snd_pcm_pipewire_t *pw, struct pw_buffer *b) index = 0; avail = maxsize - filled; avail = SPA_MIN(avail, pw->min_avail * bpf); + avail = SPA_MIN(avail, *hw_avail * bpf); do { offset = index % maxsize; @@ -215,7 +227,7 @@ snd_pcm_pipewire_process_playback(snd_pcm_pipewire_t *pw, struct pw_buffer *b) xfer = 0; while (xfer < nframes) { snd_pcm_uframes_t frames = nframes - xfer; - snd_pcm_uframes_t offset = pw->hw_ptr; + snd_pcm_uframes_t offset = pw->hw_ptr % io->buffer_size; snd_pcm_uframes_t cont = io->buffer_size - offset; if (cont < frames) @@ -226,11 +238,11 @@ snd_pcm_pipewire_process_playback(snd_pcm_pipewire_t *pw, struct pw_buffer *b) io->channels, frames, io->format); pw->hw_ptr += frames; - pw->hw_ptr %= io->buffer_size; + if (pw->hw_ptr > pw->boundary) + pw->hw_ptr -= pw->boundary; xfer += frames; } - - pcm_poll_unblock_check(io); /* unblock socket for polling if needed */ + *hw_avail -= xfer; done: index += nbytes; @@ -245,7 +257,7 @@ snd_pcm_pipewire_process_playback(snd_pcm_pipewire_t *pw, struct pw_buffer *b) } static int -snd_pcm_pipewire_process_record(snd_pcm_pipewire_t *pw, struct pw_buffer *b) +snd_pcm_pipewire_process_record(snd_pcm_pipewire_t *pw, struct pw_buffer *b, snd_pcm_uframes_t *hw_avail) { snd_pcm_ioplug_t *io = &pw->io; const snd_pcm_channel_area_t *areas; @@ -265,7 +277,7 @@ snd_pcm_pipewire_process_record(snd_pcm_pipewire_t *pw, struct pw_buffer *b) d = b->buffer->datas; maxsize = d[0].chunk->size; - avail = maxsize; + avail = SPA_MIN(maxsize, *hw_avail * bpf); index = d[0].chunk->offset; do { @@ -288,7 +300,7 @@ snd_pcm_pipewire_process_record(snd_pcm_pipewire_t *pw, struct pw_buffer *b) xfer = 0; while (xfer < nframes) { snd_pcm_uframes_t frames = nframes - xfer; - snd_pcm_uframes_t offset = pw->hw_ptr; + snd_pcm_uframes_t offset = pw->hw_ptr % io->buffer_size; snd_pcm_uframes_t cont = io->buffer_size - offset; if (cont < frames) @@ -299,12 +311,11 @@ snd_pcm_pipewire_process_record(snd_pcm_pipewire_t *pw, struct pw_buffer *b) io->channels, frames, io->format); pw->hw_ptr += frames; - pw->hw_ptr %= io->buffer_size; + if (pw->hw_ptr > pw->boundary) + pw->hw_ptr -= pw->boundary; xfer += frames; } - - pcm_poll_unblock_check(io); /* unblock socket for polling if needed */ - + *hw_avail -= xfer; avail -= nbytes; index += nbytes; } while (avail > 0); @@ -350,23 +361,47 @@ static void on_stream_process(void *data) snd_pcm_pipewire_t *pw = data; snd_pcm_ioplug_t *io = &pw->io; struct pw_buffer *b; + snd_pcm_uframes_t hw_avail; + + hw_avail = snd_pcm_ioplug_hw_avail(io, pw->hw_ptr, io->appl_ptr); + + if (pw->drained) { + pcm_poll_unblock_check(io); /* unblock socket for polling if needed */ + return; + } b = pw_stream_dequeue_buffer(pw->stream); if (b == NULL) return; if (io->stream == SND_PCM_STREAM_PLAYBACK) - snd_pcm_pipewire_process_playback(pw, b); + snd_pcm_pipewire_process_playback(pw, b, &hw_avail); else - snd_pcm_pipewire_process_record(pw, b); + snd_pcm_pipewire_process_record(pw, b, &hw_avail); pw_stream_queue_buffer(pw->stream, b); + + if (io->state == SND_PCM_STATE_DRAINING && !pw->draining && hw_avail == 0) { + pw_stream_flush(pw->stream, true); + pw->draining = true; + pw->drained = false; + } + pcm_poll_unblock_check(io); /* unblock socket for polling if needed */ +} + +static void on_stream_drained(void *data) +{ + snd_pcm_pipewire_t *pw = data; + pw->drained = true; + pw_log_debug(NAME" %p: drained", pw); + pw_thread_loop_signal(pw->main_loop, false); } static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .param_changed = on_stream_param_changed, .process = on_stream_process, + .drained = on_stream_drained, }; static int snd_pcm_pipewire_prepare(snd_pcm_ioplug_t *io) @@ -383,10 +418,13 @@ static int snd_pcm_pipewire_prepare(snd_pcm_ioplug_t *io) pw_thread_loop_lock(pw->main_loop); snd_pcm_sw_params_alloca(&swparams); - if ((res = snd_pcm_sw_params_current(io->pcm, swparams)) == 0) + if ((res = snd_pcm_sw_params_current(io->pcm, swparams)) == 0) { snd_pcm_sw_params_get_avail_min(swparams, &pw->min_avail); - else + snd_pcm_sw_params_get_boundary(swparams, &pw->boundary); + } else { pw->min_avail = io->period_size; + pw->boundary = io->buffer_size; + } min_period = (MIN_PERIOD * io->rate / 48000); pw->min_avail = SPA_MAX(pw->min_avail, min_period); @@ -447,6 +485,7 @@ static int snd_pcm_pipewire_start(snd_pcm_ioplug_t *io) snd_pcm_pipewire_t *pw = io->private_data; pw_thread_loop_lock(pw->main_loop); + pw_log_debug(NAME" %p:", pw); if (!pw->activated && pw->stream != NULL) { pw_stream_set_active(pw->stream, true); pw->activated = true; @@ -459,6 +498,9 @@ static int snd_pcm_pipewire_stop(snd_pcm_ioplug_t *io) { snd_pcm_pipewire_t *pw = io->private_data; + pw_log_debug(NAME" %p:", pw); + pcm_poll_unblock_check(io); + pw_thread_loop_lock(pw->main_loop); if (pw->activated && pw->stream != NULL) { pw_stream_set_active(pw->stream, false);