filter: removed QUEUED flag and add DEQUEUED flag

Remove the QUEUED flags to check if a buffer is in some queue.

Add a new flag to check if a buffer was dequeued by the application.
Check if the application only queues buffers with the DEQUEUED flag set.
This commit is contained in:
Wim Taymans 2025-09-02 16:46:03 +02:00
parent b6ce585da6
commit 00bb4a936a
2 changed files with 14 additions and 13 deletions

View file

@ -36,7 +36,7 @@ struct buffer {
struct pw_buffer this;
uint32_t id;
#define BUFFER_FLAG_MAPPED (1 << 0)
#define BUFFER_FLAG_QUEUED (1 << 1)
#define BUFFER_FLAG_DEQUEUED (1 << 1)
#define BUFFER_FLAG_ADDED (1 << 2)
uint32_t flags;
};
@ -356,11 +356,9 @@ static inline int push_queue(struct port *port, struct queue *queue, struct buff
{
uint32_t index;
if (SPA_FLAG_IS_SET(buffer->flags, BUFFER_FLAG_QUEUED))
if (buffer->id >= port->n_buffers)
return -EINVAL;
SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED);
spa_ringbuffer_get_write_index(&queue->ring, &index);
queue->ids[index & MASK_BUFFERS] = buffer->id;
spa_ringbuffer_write_update(&queue->ring, index + 1);
@ -382,7 +380,6 @@ static inline struct buffer *pop_queue(struct port *port, struct queue *queue)
spa_ringbuffer_read_update(&queue->ring, index + 1);
buffer = &port->buffers[id];
SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED);
return buffer;
}
@ -941,6 +938,7 @@ static int impl_port_use_buffers(void *object,
pw_log_debug("%p: got buffer id:%d datas:%d mapped size %d", filter, i,
buffers[i]->n_datas, size);
}
port->n_buffers = n_buffers;
for (i = 0; i < n_buffers; i++) {
struct buffer *b = &port->buffers[i];
@ -953,11 +951,9 @@ static int impl_port_use_buffers(void *object,
}
SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED);
pw_filter_emit_add_buffer(filter, port->user_data, &b->this);
}
port->n_buffers = n_buffers;
return 0;
}
@ -970,9 +966,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe
return -EINVAL;
pw_log_trace("%p: recycle buffer %d", impl, buffer_id);
if (buffer_id < port->n_buffers)
push_queue(port, &port->queued, &port->buffers[buffer_id]);
push_queue(port, &port->queued, &port->buffers[buffer_id]);
return 0;
}
@ -2014,6 +2008,7 @@ struct pw_buffer *pw_filter_dequeue_buffer(void *port_data)
return NULL;
}
pw_log_trace_fp("%p: dequeue buffer %d", p->filter, b->id);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_DEQUEUED);
return &b->this;
}
@ -2023,6 +2018,13 @@ int pw_filter_queue_buffer(void *port_data, struct pw_buffer *buffer)
{
struct port *p = SPA_CONTAINER_OF(port_data, struct port, user_data);
struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this);
if (!SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_DEQUEUED)) {
pw_log_warn("%p: tried to queue cleared buffer %d", p->filter, b->id);
return -EINVAL;
}
SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_DEQUEUED);
pw_log_trace_fp("%p: queue buffer %d", p->filter, b->id);
return push_queue(p, &p->queued, b);
}

View file

@ -1045,8 +1045,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe
{
struct stream *d = object;
pw_log_trace("%p: recycle buffer %d", d, buffer_id);
if (buffer_id < d->n_buffers)
queue_push(d, &d->queued, &d->buffers[buffer_id]);
queue_push(d, &d->queued, &d->buffers[buffer_id]);
return 0;
}