stream: use busy metadata

When we get a buffer from the server, mark it busy, when we queue it
again, mark it unbusy.
Refuse to dequeue a buffer when it is busy.
This commit is contained in:
Wim Taymans 2020-03-02 16:56:21 +01:00
parent b39a2258c4
commit 61dab51425

View file

@ -60,6 +60,7 @@ struct buffer {
#define BUFFER_FLAG_QUEUED (1 << 1)
#define BUFFER_FLAG_ADDED (1 << 2)
uint32_t flags;
struct spa_meta_busy *busy;
};
struct queue {
@ -733,6 +734,7 @@ static int impl_port_use_buffers(void *object,
struct buffer *b = &impl->buffers[i];
b->this.buffer = buffers[i];
b->busy = spa_buffer_find_meta_data(buffers[i], SPA_META_Busy, sizeof(*b->busy));
if (impl->direction == SPA_DIRECTION_OUTPUT) {
pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
@ -740,6 +742,7 @@ static int impl_port_use_buffers(void *object,
}
SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED);
pw_stream_emit_add_buffer(stream, &b->this);
}
@ -790,6 +793,8 @@ static int impl_node_process_input(void *object)
/* push new buffer */
if (push_queue(impl, &impl->dequeued, b) == 0) {
copy_position(impl, impl->dequeued.incount);
if (b->busy)
ATOMIC_INC(b->busy->count);
call_process(impl);
}
}
@ -1415,6 +1420,12 @@ static void add_params(struct stream *impl)
SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))));
add_param(impl, SPA_PARAM_Meta, PARAM_FLAG_LOCKED,
spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Busy),
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_busy))));
}
static int find_format(struct stream *impl, enum pw_direction direction,
@ -1872,6 +1883,15 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
}
pw_log_trace(NAME" %p: dequeue buffer %d", stream, b->id);
if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) {
if (ATOMIC_INC(b->busy->count) > 1) {
ATOMIC_DEC(b->busy->count);
push_queue(impl, &impl->dequeued, b);
pw_log_trace(NAME" %p: buffer busy", stream);
errno = EBUSY;
return NULL;
}
}
return &b->this;
}
@ -1882,6 +1902,9 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this);
int res;
if (b->busy)
ATOMIC_DEC(b->busy->count);
pw_log_trace(NAME" %p: queue buffer %d", stream, b->id);
if ((res = push_queue(impl, &impl->queued, b)) < 0)
return res;