modules: add pause support for pipe-tunnel

Streams are allowed to pause/resume when the pipe xruns.
Wait for IO_IN or IO_OUT when the ringbuffer xruns to resume playback or
capture.

Fixes #3197
This commit is contained in:
Wim Taymans 2023-11-15 12:52:10 +01:00
parent 179fbb51fc
commit 63385cedd0

View file

@ -141,6 +141,7 @@ static const struct spa_dict_item module_props[] = {
struct impl { struct impl {
struct pw_context *context; struct pw_context *context;
struct pw_loop *main_loop;
struct pw_loop *data_loop; struct pw_loop *data_loop;
#define MODE_PLAYBACK 0 #define MODE_PLAYBACK 0
@ -174,6 +175,8 @@ struct impl {
unsigned int do_disconnect:1; unsigned int do_disconnect:1;
unsigned int driving:1; unsigned int driving:1;
unsigned int have_sync:1; unsigned int have_sync:1;
unsigned int may_pause:1;
unsigned int paused:1;
struct spa_ringbuffer ring; struct spa_ringbuffer ring;
void *buffer; void *buffer;
@ -262,7 +265,7 @@ static void stream_state_changed(void *d, enum pw_stream_state old,
break; break;
case PW_STREAM_STATE_PAUSED: case PW_STREAM_STATE_PAUSED:
if (impl->direction == PW_DIRECTION_OUTPUT) { if (impl->direction == PW_DIRECTION_OUTPUT) {
pw_loop_update_io(impl->data_loop, impl->socket, 0); pw_loop_update_io(impl->data_loop, impl->socket, impl->paused ? SPA_IO_IN : 0);
set_timeout(impl, 0); set_timeout(impl, 0);
} }
break; break;
@ -281,6 +284,26 @@ static void stream_state_changed(void *d, enum pw_stream_state old,
} }
} }
static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, const void *data,
size_t size, void *user_data)
{
struct impl *impl = user_data;
const bool *paused = data;
pw_log_info("set paused: %d", *paused);
impl->paused = *paused;
pw_stream_set_active(impl->stream, !*paused);
return 0;
}
static void pause_stream(struct impl *impl, bool paused)
{
if (!impl->may_pause)
return;
if (impl->direction == PW_DIRECTION_INPUT)
pw_loop_update_io(impl->data_loop, impl->socket, paused ? SPA_IO_OUT : 0);
pw_loop_invoke(impl->main_loop, do_pause, 1, &paused, sizeof(bool), false, impl);
}
static void playback_stream_process(void *data) static void playback_stream_process(void *data)
{ {
struct impl *impl = data; struct impl *impl = data;
@ -308,8 +331,8 @@ static void playback_stream_process(void *data)
continue; continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) { } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Don't continue writing */ /* Don't continue writing */
pw_log_debug("pipe (%s) overrun: %m", pw_log_debug("pipe (%s) overrun: %m", impl->filename);
impl->filename); pause_stream(impl, true);
break; break;
} else { } else {
pw_log_warn("Failed to write to pipe (%s): %m", pw_log_warn("Failed to write to pipe (%s): %m",
@ -370,8 +393,10 @@ static void capture_stream_process(void *data)
if (avail < (int32_t)size) { if (avail < (int32_t)size) {
memset(bd->data, 0, size); memset(bd->data, 0, size);
if (avail > 0) if (avail >= 0) {
pw_log_warn("underrun %d < %u", avail, size); pw_log_warn("underrun %d < %u", avail, size);
pause_stream(impl, true);
}
impl->have_sync = false; impl->have_sync = false;
} }
if (avail > (int32_t)RINGBUFFER_SIZE) { if (avail > (int32_t)RINGBUFFER_SIZE) {
@ -456,6 +481,8 @@ static int create_stream(struct impl *impl)
params[n_params++] = spa_format_audio_raw_build(&b, params[n_params++] = spa_format_audio_raw_build(&b,
SPA_PARAM_EnumFormat, &impl->info); SPA_PARAM_EnumFormat, &impl->info);
impl->paused = false;
if ((res = pw_stream_connect(impl->stream, if ((res = pw_stream_connect(impl->stream,
impl->direction, impl->direction,
PW_ID_ANY, PW_ID_ANY,
@ -550,6 +577,8 @@ static void on_pipe_io(void *data, int fd, uint32_t mask)
pw_loop_update_io(impl->data_loop, impl->socket, 0); pw_loop_update_io(impl->data_loop, impl->socket, 0);
return; return;
} }
if (impl->paused)
pause_stream(impl, false);
if (mask & SPA_IO_IN) if (mask & SPA_IO_IN)
handle_pipe_read(impl); handle_pipe_read(impl);
} }
@ -839,6 +868,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module; impl->module = module;
impl->context = context; impl->context = context;
impl->main_loop = pw_context_get_main_loop(context);
data_loop = pw_context_get_data_loop(context); data_loop = pw_context_get_data_loop(context);
impl->data_loop = pw_data_loop_get_loop(data_loop); impl->data_loop = pw_data_loop_get_loop(data_loop);
@ -848,16 +878,20 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (spa_streq(str, "capture")) { if (spa_streq(str, "capture")) {
impl->mode = MODE_CAPTURE; impl->mode = MODE_CAPTURE;
impl->direction = PW_DIRECTION_INPUT; impl->direction = PW_DIRECTION_INPUT;
impl->may_pause = true;
} else if (spa_streq(str, "playback")) { } else if (spa_streq(str, "playback")) {
impl->mode = MODE_PLAYBACK; impl->mode = MODE_PLAYBACK;
impl->direction = PW_DIRECTION_OUTPUT; impl->direction = PW_DIRECTION_OUTPUT;
impl->may_pause = true;
}else if (spa_streq(str, "sink")) { }else if (spa_streq(str, "sink")) {
impl->mode = MODE_SINK; impl->mode = MODE_SINK;
impl->direction = PW_DIRECTION_INPUT; impl->direction = PW_DIRECTION_INPUT;
impl->may_pause = false;
media_class = "Audio/Sink"; media_class = "Audio/Sink";
} else if (spa_streq(str, "source")) { } else if (spa_streq(str, "source")) {
impl->mode = MODE_SOURCE; impl->mode = MODE_SOURCE;
impl->direction = PW_DIRECTION_OUTPUT; impl->direction = PW_DIRECTION_OUTPUT;
impl->may_pause = false;
media_class = "Audio/Source"; media_class = "Audio/Source";
} else { } else {
pw_log_error("invalid tunnel.mode '%s'", str); pw_log_error("invalid tunnel.mode '%s'", str);