From 23fe46b698d1fc628976bc29f3aa3604f089a551 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 16 Aug 2019 15:14:23 +0200 Subject: [PATCH] stream: implement drain Set the drained flag. If the server calls us and we're out of buffers and the drained flag is set, emit the drained signal. Deactivate the node on disconnect. Make sure we have no more pending process callbacks in the invoke queue before when we pause. --- src/pipewire/stream.c | 44 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index caa654641..89307ad3d 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -154,6 +154,7 @@ struct stream { unsigned int free_data:1; unsigned int subscribe:1; unsigned int alloc_buffers:1; + unsigned int draining:1; }; static int get_param_index(uint32_t id) @@ -303,6 +304,7 @@ do_call_process(struct spa_loop *loop, static void call_process(struct stream *impl) { + pw_log_trace(NAME" %p: call process", impl); if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) { do_call_process(NULL, false, 1, NULL, 0, impl); } @@ -312,6 +314,24 @@ static void call_process(struct stream *impl) } } +static int +do_call_drained(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + struct pw_stream *stream = &impl->this; + pw_log_trace(NAME" %p: drained", stream); + pw_stream_emit_drained(stream); + impl->draining = false; + return 0; +} + +static void call_drained(struct stream *impl) +{ + pw_loop_invoke(impl->core->main_loop, + do_call_drained, 1, NULL, 0, false, impl); +} + static int impl_set_io(void *object, uint32_t id, void *data, size_t size) { struct stream *impl = object; @@ -338,7 +358,10 @@ static int impl_send_command(void *object, const struct spa_command *command) switch (SPA_NODE_COMMAND_ID(command)) { case SPA_NODE_COMMAND_Pause: + pw_loop_invoke(impl->core->main_loop, + NULL, 0, NULL, 0, false, impl); if (stream->state == PW_STREAM_STATE_STREAMING) { + pw_log_debug(NAME" %p: pause", stream); stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); } @@ -785,15 +808,20 @@ again: io->buffer_id = SPA_ID_INVALID; io->status = SPA_STATUS_NEED_BUFFER; pw_log_trace(NAME" %p: no more buffers %p", stream, io); + if (impl->draining) { + call_drained(impl); + goto exit; + } } } - if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { + if (!impl->draining && !SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { call_process(impl); if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) >= MIN_QUEUED && io->status == SPA_STATUS_NEED_BUFFER) goto again; } +exit: copy_position(impl, impl->queued.outcount); res = io->status; @@ -1491,6 +1519,9 @@ int pw_stream_disconnect(struct pw_stream *stream) pw_log_debug(NAME" %p: disconnect", stream); impl->disconnecting = true; + if (impl->node) + pw_node_set_active(impl->node, false); + if (stream->proxy) pw_proxy_destroy(stream->proxy); @@ -1498,6 +1529,7 @@ int pw_stream_disconnect(struct pw_stream *stream) pw_node_destroy(impl->node); impl->node = NULL; } + return 0; } @@ -1698,12 +1730,20 @@ do_flush(struct spa_loop *loop, return 0; } +static int +do_drain(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + impl->draining = true; + return 0; +} SPA_EXPORT int pw_stream_flush(struct pw_stream *stream, bool drain) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); pw_loop_invoke(impl->core->data_loop, - do_flush, 1, NULL, 0, true, impl); + drain ? do_drain : do_flush, 1, NULL, 0, true, impl); return 0; }