From 00bb4a936a3dd0ab2668b5c9f2b3ef009a93a38e Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 2 Sep 2025 16:46:03 +0200 Subject: [PATCH] filter: removed QUEUED flag and add DEQUEUED flag Remove the QUEUED flags to check if a buffer is in some queue. Add a new flag to check if a buffer was dequeued by the application. Check if the application only queues buffers with the DEQUEUED flag set. --- src/pipewire/filter.c | 24 +++++++++++++----------- src/pipewire/stream.c | 3 +-- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index a92819710..b78b827ed 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -36,7 +36,7 @@ struct buffer { struct pw_buffer this; uint32_t id; #define BUFFER_FLAG_MAPPED (1 << 0) -#define BUFFER_FLAG_QUEUED (1 << 1) +#define BUFFER_FLAG_DEQUEUED (1 << 1) #define BUFFER_FLAG_ADDED (1 << 2) uint32_t flags; }; @@ -356,11 +356,9 @@ static inline int push_queue(struct port *port, struct queue *queue, struct buff { uint32_t index; - if (SPA_FLAG_IS_SET(buffer->flags, BUFFER_FLAG_QUEUED)) + if (buffer->id >= port->n_buffers) return -EINVAL; - SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED); - spa_ringbuffer_get_write_index(&queue->ring, &index); queue->ids[index & MASK_BUFFERS] = buffer->id; spa_ringbuffer_write_update(&queue->ring, index + 1); @@ -382,7 +380,6 @@ static inline struct buffer *pop_queue(struct port *port, struct queue *queue) spa_ringbuffer_read_update(&queue->ring, index + 1); buffer = &port->buffers[id]; - SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED); return buffer; } @@ -941,6 +938,7 @@ static int impl_port_use_buffers(void *object, pw_log_debug("%p: got buffer id:%d datas:%d mapped size %d", filter, i, buffers[i]->n_datas, size); } + port->n_buffers = n_buffers; for (i = 0; i < n_buffers; i++) { struct buffer *b = &port->buffers[i]; @@ -953,11 +951,9 @@ static int impl_port_use_buffers(void *object, } SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED); + pw_filter_emit_add_buffer(filter, port->user_data, &b->this); } - - port->n_buffers = n_buffers; - return 0; } @@ -970,9 +966,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe return -EINVAL; pw_log_trace("%p: recycle buffer %d", impl, buffer_id); - if (buffer_id < port->n_buffers) - push_queue(port, &port->queued, &port->buffers[buffer_id]); - + push_queue(port, &port->queued, &port->buffers[buffer_id]); return 0; } @@ -2014,6 +2008,7 @@ struct pw_buffer *pw_filter_dequeue_buffer(void *port_data) return NULL; } pw_log_trace_fp("%p: dequeue buffer %d", p->filter, b->id); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_DEQUEUED); return &b->this; } @@ -2023,6 +2018,13 @@ int pw_filter_queue_buffer(void *port_data, struct pw_buffer *buffer) { struct port *p = SPA_CONTAINER_OF(port_data, struct port, user_data); struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this); + + if (!SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_DEQUEUED)) { + pw_log_warn("%p: tried to queue cleared buffer %d", p->filter, b->id); + return -EINVAL; + } + SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_DEQUEUED); + pw_log_trace_fp("%p: queue buffer %d", p->filter, b->id); return push_queue(p, &p->queued, b); } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 75ba9eaee..904e44a67 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -1045,8 +1045,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe { struct stream *d = object; pw_log_trace("%p: recycle buffer %d", d, buffer_id); - if (buffer_id < d->n_buffers) - queue_push(d, &d->queued, &d->buffers[buffer_id]); + queue_push(d, &d->queued, &d->buffers[buffer_id]); return 0; }