diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index cea3b733e..37416aba9 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -166,6 +166,7 @@ struct stream { unsigned int warn_mlock:1; unsigned int process_rt:1; unsigned int driving:1; + unsigned int using_drive:1; }; static int get_param_index(uint32_t id) @@ -2054,7 +2055,8 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) return res; if (impl->direction == SPA_DIRECTION_OUTPUT && - impl->driving) { + impl->driving && !impl->using_drive) { + pw_log_debug("use pw_stream_drive() to drive the stream."); res = pw_loop_invoke(impl->context->data_loop, do_trigger, 1, NULL, 0, false, impl); } @@ -2103,3 +2105,40 @@ int pw_stream_flush(struct pw_stream *stream, bool drain) &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Flush)); return 0; } + +static int +do_drive(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + int res; + if (impl->direction == SPA_DIRECTION_OUTPUT) { + if (impl->process_rt) + spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0); + res = impl->node_methods.process(impl); + } else { + res = SPA_STATUS_NEED_DATA; + } + return spa_node_call_ready(&impl->callbacks, res); +} + +SPA_EXPORT +int pw_stream_drive(struct pw_stream *stream) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + int res = 0; + + pw_log_trace(NAME" %p", impl); + + impl->using_drive = true; + if (impl->driving) { + if (impl->direction == SPA_DIRECTION_OUTPUT && + !impl->process_rt) { + pw_loop_invoke(impl->context->main_loop, + do_call_process, 1, NULL, 0, false, impl); + } + res = pw_loop_invoke(impl->context->data_loop, + do_drive, 1, NULL, 0, false, impl); + } + return res; +} diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index 720b4cff6..c788521fb 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -363,6 +363,10 @@ int pw_stream_set_active(struct pw_stream *stream, bool active); * be called when all data is played or recorded */ int pw_stream_flush(struct pw_stream *stream, bool drain); +/** Start a push/pull on the stream. The graph will be started and + * process will be called. Since 0.3.34 */ +int pw_stream_drive(struct pw_stream *stream); + /** * \} */