diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index caa654641..89307ad3d 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -154,6 +154,7 @@ struct stream { unsigned int free_data:1; unsigned int subscribe:1; unsigned int alloc_buffers:1; + unsigned int draining:1; }; static int get_param_index(uint32_t id) @@ -303,6 +304,7 @@ do_call_process(struct spa_loop *loop, static void call_process(struct stream *impl) { + pw_log_trace(NAME" %p: call process", impl); if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) { do_call_process(NULL, false, 1, NULL, 0, impl); } @@ -312,6 +314,24 @@ static void call_process(struct stream *impl) } } +static int +do_call_drained(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + struct pw_stream *stream = &impl->this; + pw_log_trace(NAME" %p: drained", stream); + pw_stream_emit_drained(stream); + impl->draining = false; + return 0; +} + +static void call_drained(struct stream *impl) +{ + pw_loop_invoke(impl->core->main_loop, + do_call_drained, 1, NULL, 0, false, impl); +} + static int impl_set_io(void *object, uint32_t id, void *data, size_t size) { struct stream *impl = object; @@ -338,7 +358,10 @@ static int impl_send_command(void *object, const struct spa_command *command) switch (SPA_NODE_COMMAND_ID(command)) { case SPA_NODE_COMMAND_Pause: + pw_loop_invoke(impl->core->main_loop, + NULL, 0, NULL, 0, false, impl); if (stream->state == PW_STREAM_STATE_STREAMING) { + pw_log_debug(NAME" %p: pause", stream); stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); } @@ -785,15 +808,20 @@ again: io->buffer_id = SPA_ID_INVALID; io->status = SPA_STATUS_NEED_BUFFER; pw_log_trace(NAME" %p: no more buffers %p", stream, io); + if (impl->draining) { + call_drained(impl); + goto exit; + } } } - if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { + if (!impl->draining && !SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { call_process(impl); if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) >= MIN_QUEUED && io->status == SPA_STATUS_NEED_BUFFER) goto again; } +exit: copy_position(impl, impl->queued.outcount); res = io->status; @@ -1491,6 +1519,9 @@ int pw_stream_disconnect(struct pw_stream *stream) pw_log_debug(NAME" %p: disconnect", stream); impl->disconnecting = true; + if (impl->node) + pw_node_set_active(impl->node, false); + if (stream->proxy) pw_proxy_destroy(stream->proxy); @@ -1498,6 +1529,7 @@ int pw_stream_disconnect(struct pw_stream *stream) pw_node_destroy(impl->node); impl->node = NULL; } + return 0; } @@ -1698,12 +1730,20 @@ do_flush(struct spa_loop *loop, return 0; } +static int +do_drain(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + impl->draining = true; + return 0; +} SPA_EXPORT int pw_stream_flush(struct pw_stream *stream, bool drain) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); pw_loop_invoke(impl->core->data_loop, - do_flush, 1, NULL, 0, true, impl); + drain ? do_drain : do_flush, 1, NULL, 0, true, impl); return 0; }