From 61dab51425533fd7594ebfc844e0acedce8c12b3 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 2 Mar 2020 16:56:21 +0100 Subject: [PATCH] stream: use busy metadata When we get a buffer from the server, mark it busy, when we queue it again, mark it unbusy. Refuse to dequeue a buffer when it is busy. --- src/pipewire/stream.c | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index e0d861eb0..ad3af4825 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -60,6 +60,7 @@ struct buffer { #define BUFFER_FLAG_QUEUED (1 << 1) #define BUFFER_FLAG_ADDED (1 << 2) uint32_t flags; + struct spa_meta_busy *busy; }; struct queue { @@ -733,6 +734,7 @@ static int impl_port_use_buffers(void *object, struct buffer *b = &impl->buffers[i]; b->this.buffer = buffers[i]; + b->busy = spa_buffer_find_meta_data(buffers[i], SPA_META_Busy, sizeof(*b->busy)); if (impl->direction == SPA_DIRECTION_OUTPUT) { pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id); @@ -740,6 +742,7 @@ static int impl_port_use_buffers(void *object, } SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED); + pw_stream_emit_add_buffer(stream, &b->this); } @@ -790,6 +793,8 @@ static int impl_node_process_input(void *object) /* push new buffer */ if (push_queue(impl, &impl->dequeued, b) == 0) { copy_position(impl, impl->dequeued.incount); + if (b->busy) + ATOMIC_INC(b->busy->count); call_process(impl); } } @@ -1415,6 +1420,12 @@ static void add_params(struct stream *impl) SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)))); + + add_param(impl, SPA_PARAM_Meta, PARAM_FLAG_LOCKED, + spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, + SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Busy), + SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_busy)))); } static int find_format(struct stream *impl, enum pw_direction direction, @@ -1872,6 +1883,15 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) } pw_log_trace(NAME" %p: dequeue buffer %d", stream, b->id); + if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) { + if (ATOMIC_INC(b->busy->count) > 1) { + ATOMIC_DEC(b->busy->count); + push_queue(impl, &impl->dequeued, b); + pw_log_trace(NAME" %p: buffer busy", stream); + errno = EBUSY; + return NULL; + } + } return &b->this; } @@ -1882,6 +1902,9 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this); int res; + if (b->busy) + ATOMIC_DEC(b->busy->count); + pw_log_trace(NAME" %p: queue buffer %d", stream, b->id); if ((res = push_queue(impl, &impl->queued, b)) < 0) return res;