diff --git a/src/modules/module-pipe-tunnel.c b/src/modules/module-pipe-tunnel.c index c95edbfc3..3f9aaa86f 100644 --- a/src/modules/module-pipe-tunnel.c +++ b/src/modules/module-pipe-tunnel.c @@ -141,6 +141,7 @@ static const struct spa_dict_item module_props[] = { struct impl { struct pw_context *context; + struct pw_loop *main_loop; struct pw_loop *data_loop; #define MODE_PLAYBACK 0 @@ -174,6 +175,8 @@ struct impl { unsigned int do_disconnect:1; unsigned int driving:1; unsigned int have_sync:1; + unsigned int may_pause:1; + unsigned int paused:1; struct spa_ringbuffer ring; void *buffer; @@ -262,7 +265,7 @@ static void stream_state_changed(void *d, enum pw_stream_state old, break; case PW_STREAM_STATE_PAUSED: if (impl->direction == PW_DIRECTION_OUTPUT) { - pw_loop_update_io(impl->data_loop, impl->socket, 0); + pw_loop_update_io(impl->data_loop, impl->socket, impl->paused ? SPA_IO_IN : 0); set_timeout(impl, 0); } break; @@ -281,6 +284,26 @@ static void stream_state_changed(void *d, enum pw_stream_state old, } } +static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, const void *data, + size_t size, void *user_data) +{ + struct impl *impl = user_data; + const bool *paused = data; + pw_log_info("set paused: %d", *paused); + impl->paused = *paused; + pw_stream_set_active(impl->stream, !*paused); + return 0; +} + +static void pause_stream(struct impl *impl, bool paused) +{ + if (!impl->may_pause) + return; + if (impl->direction == PW_DIRECTION_INPUT) + pw_loop_update_io(impl->data_loop, impl->socket, paused ? SPA_IO_OUT : 0); + pw_loop_invoke(impl->main_loop, do_pause, 1, &paused, sizeof(bool), false, impl); +} + static void playback_stream_process(void *data) { struct impl *impl = data; @@ -308,8 +331,8 @@ static void playback_stream_process(void *data) continue; } else if (errno == EAGAIN || errno == EWOULDBLOCK) { /* Don't continue writing */ - pw_log_debug("pipe (%s) overrun: %m", - impl->filename); + pw_log_debug("pipe (%s) overrun: %m", impl->filename); + pause_stream(impl, true); break; } else { pw_log_warn("Failed to write to pipe (%s): %m", @@ -370,8 +393,10 @@ static void capture_stream_process(void *data) if (avail < (int32_t)size) { memset(bd->data, 0, size); - if (avail > 0) + if (avail >= 0) { pw_log_warn("underrun %d < %u", avail, size); + pause_stream(impl, true); + } impl->have_sync = false; } if (avail > (int32_t)RINGBUFFER_SIZE) { @@ -456,6 +481,8 @@ static int create_stream(struct impl *impl) params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &impl->info); + impl->paused = false; + if ((res = pw_stream_connect(impl->stream, impl->direction, PW_ID_ANY, @@ -550,6 +577,8 @@ static void on_pipe_io(void *data, int fd, uint32_t mask) pw_loop_update_io(impl->data_loop, impl->socket, 0); return; } + if (impl->paused) + pause_stream(impl, false); if (mask & SPA_IO_IN) handle_pipe_read(impl); } @@ -839,6 +868,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->module = module; impl->context = context; + impl->main_loop = pw_context_get_main_loop(context); data_loop = pw_context_get_data_loop(context); impl->data_loop = pw_data_loop_get_loop(data_loop); @@ -848,16 +878,20 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (spa_streq(str, "capture")) { impl->mode = MODE_CAPTURE; impl->direction = PW_DIRECTION_INPUT; + impl->may_pause = true; } else if (spa_streq(str, "playback")) { impl->mode = MODE_PLAYBACK; impl->direction = PW_DIRECTION_OUTPUT; + impl->may_pause = true; }else if (spa_streq(str, "sink")) { impl->mode = MODE_SINK; impl->direction = PW_DIRECTION_INPUT; + impl->may_pause = false; media_class = "Audio/Sink"; } else if (spa_streq(str, "source")) { impl->mode = MODE_SOURCE; impl->direction = PW_DIRECTION_OUTPUT; + impl->may_pause = false; media_class = "Audio/Source"; } else { pw_log_error("invalid tunnel.mode '%s'", str);