diff --git a/spa/include/spa/graph/graph.h b/spa/include/spa/graph/graph.h index 79adb20c1..561c3a4c6 100644 --- a/spa/include/spa/graph/graph.h +++ b/spa/include/spa/graph/graph.h @@ -233,10 +233,10 @@ static inline void spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node) { + spa_debug("node %p add to graph %p", node, graph); node->graph = graph; spa_list_append(&graph->nodes, &node->link); spa_graph_link_add(node, graph->state, &node->graph_link); - spa_debug("node %p add to graph %p", node, graph); } static inline void spa_graph_node_remove(struct spa_graph_node *node) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index d4d8da122..e0c59d951 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -1223,3 +1223,32 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) return call_trigger(impl); } + +static int +do_flush(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + struct buffer *b; + + pw_log_trace("stream %p: flush", impl); + do { + b = pop_queue(impl, &impl->queued); + if (b != NULL) + push_queue(impl, &impl->dequeued, b); + } + while (b); + + impl->time.queued = impl->queued.outcount = impl->dequeued.incount = + impl->dequeued.outcount = impl->queued.incount; + + return 0; +} + +int pw_stream_flush(struct pw_stream *stream, bool drain) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + pw_loop_invoke(impl->core->data_loop, + do_flush, 1, NULL, 0, true, impl); + return 0; +} diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index 20461ab5b..c71977f35 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -173,7 +173,8 @@ struct pw_buffer { * returned in the time info. */ }; -/** Events for a stream */ +/** Events for a stream. These events are always called from the mainloop + * unless explicitly documented otherwise. */ struct pw_stream_events { #define PW_VERSION_STREAM_EVENTS 0 uint32_t version; @@ -197,6 +198,9 @@ struct pw_stream_events { * mainloop but can also be called directly from the realtime data * thread if the user is prepared to deal with this. */ void (*process) (void *data); + + /** The stream is drained */ + void (*drained) (void *data); }; /** Convert a stream state to a readable string \memberof pw_stream */ @@ -307,9 +311,6 @@ int pw_stream_set_control(struct pw_stream *stream, const char *name, float valu /** Get a control value */ int pw_stream_get_control(struct pw_stream *stream, const char *name, float *value); -/** Activate or deactivate the stream \memberof pw_stream */ -int pw_stream_set_active(struct pw_stream *stream, bool active); - /** A time structure \memberof pw_stream */ struct pw_time { int64_t now; /**< the monotonic time */ @@ -333,6 +334,12 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream); /** Submit a buffer for playback or recycle a buffer for capture. */ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer); +/** Activate or deactivate the stream \memberof pw_stream */ +int pw_stream_set_active(struct pw_stream *stream, bool active); + +/** Flush a stream. When \a drain is true, the drained callback will + * be called when all data is played or recorded */ +int pw_stream_flush(struct pw_stream *stream, bool drain); #ifdef __cplusplus }