stream: improve process callback

When we are running non-rt and we just dequeued a buffer, check if the
queue is empty and we need to call process to queue a new buffer. We can
only do this when there is an empty buffer to dequeue.

Don't try to request a new buffer immediately when we are rate_matching
because we want the peer to first process the newly dequeued buffer and
update the rate match.

This makes pw-midiplay work again. Previously it was only requesting a
buffer every other cycle.
This commit is contained in:
Wim Taymans 2022-07-07 16:17:56 +02:00
parent a293e079d1
commit 43b964ea26

View file

@ -313,7 +313,7 @@ static int update_params(struct stream *impl, uint32_t id,
} }
static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer) static inline int queue_push(struct stream *stream, struct queue *queue, struct buffer *buffer)
{ {
uint32_t index; uint32_t index;
@ -330,7 +330,13 @@ static inline int push_queue(struct stream *stream, struct queue *queue, struct
return 0; return 0;
} }
static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue) static inline bool queue_is_empty(struct stream *stream, struct queue *queue)
{
uint32_t index;
return spa_ringbuffer_get_read_index(&queue->ring, &index) < 1;
}
static inline struct buffer *queue_pop(struct stream *stream, struct queue *queue)
{ {
uint32_t index, id; uint32_t index, id;
struct buffer *buffer; struct buffer *buffer;
@ -788,7 +794,7 @@ static void clear_buffers(struct pw_stream *stream)
if (impl->direction == SPA_DIRECTION_INPUT) { if (impl->direction == SPA_DIRECTION_INPUT) {
struct buffer *b; struct buffer *b;
while ((b = pop_queue(impl, &impl->dequeued))) { while ((b = queue_pop(impl, &impl->dequeued))) {
if (b->busy) if (b->busy)
ATOMIC_DEC(b->busy->count); ATOMIC_DEC(b->busy->count);
} }
@ -927,7 +933,7 @@ static int impl_port_use_buffers(void *object,
if (impl->direction == SPA_DIRECTION_OUTPUT) { if (impl->direction == SPA_DIRECTION_OUTPUT) {
pw_log_trace("%p: recycle buffer %d", stream, b->id); pw_log_trace("%p: recycle buffer %d", stream, b->id);
push_queue(impl, &impl->dequeued, b); queue_push(impl, &impl->dequeued, b);
} }
SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED); SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED);
@ -945,7 +951,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe
struct stream *d = object; struct stream *d = object;
pw_log_trace("%p: recycle buffer %d", d, buffer_id); pw_log_trace("%p: recycle buffer %d", d, buffer_id);
if (buffer_id < d->n_buffers) if (buffer_id < d->n_buffers)
push_queue(d, &d->queued, &d->buffers[buffer_id]); queue_push(d, &d->queued, &d->buffers[buffer_id]);
return 0; return 0;
} }
@ -984,7 +990,7 @@ static int impl_node_process_input(void *object)
if (io->status == SPA_STATUS_HAVE_DATA && if (io->status == SPA_STATUS_HAVE_DATA &&
(b = get_buffer(stream, io->buffer_id)) != NULL) { (b = get_buffer(stream, io->buffer_id)) != NULL) {
/* push new buffer */ /* push new buffer */
if (push_queue(impl, &impl->dequeued, b) == 0) { if (queue_push(impl, &impl->dequeued, b) == 0) {
copy_position(impl, impl->dequeued.incount); copy_position(impl, impl->dequeued.incount);
if (b->busy) if (b->busy)
ATOMIC_INC(b->busy->count); ATOMIC_INC(b->busy->count);
@ -993,7 +999,7 @@ static int impl_node_process_input(void *object)
} }
if (io->status != SPA_STATUS_NEED_DATA) { if (io->status != SPA_STATUS_NEED_DATA) {
/* pop buffer to recycle */ /* pop buffer to recycle */
if ((b = pop_queue(impl, &impl->queued))) { if ((b = queue_pop(impl, &impl->queued))) {
pw_log_trace_fp("%p: recycle buffer %d", stream, b->id); pw_log_trace_fp("%p: recycle buffer %d", stream, b->id);
} else if (io->status == -EPIPE) } else if (io->status == -EPIPE)
return io->status; return io->status;
@ -1013,28 +1019,36 @@ static int impl_node_process_output(void *object)
struct spa_io_buffers *io = impl->io; struct spa_io_buffers *io = impl->io;
struct buffer *b; struct buffer *b;
int res; int res;
uint32_t index; bool ask_more;
bool recycled;
again: again:
pw_log_trace_fp("%p: process out status:%d id:%d", stream, pw_log_trace_fp("%p: process out status:%d id:%d", stream,
io->status, io->buffer_id); io->status, io->buffer_id);
recycled = false; ask_more = false;
if ((res = io->status) != SPA_STATUS_HAVE_DATA) { if ((res = io->status) != SPA_STATUS_HAVE_DATA) {
/* recycle old buffer */ /* recycle old buffer */
if ((b = get_buffer(stream, io->buffer_id)) != NULL) { if ((b = get_buffer(stream, io->buffer_id)) != NULL) {
pw_log_trace_fp("%p: recycle buffer %d", stream, b->id); pw_log_trace_fp("%p: recycle buffer %d", stream, b->id);
push_queue(impl, &impl->dequeued, b); queue_push(impl, &impl->dequeued, b);
recycled = true;
} }
/* pop new buffer */ /* pop new buffer */
if ((b = pop_queue(impl, &impl->queued)) != NULL) { if ((b = queue_pop(impl, &impl->queued)) != NULL) {
impl->drained = false; impl->drained = false;
io->buffer_id = b->id; io->buffer_id = b->id;
res = io->status = SPA_STATUS_HAVE_DATA; res = io->status = SPA_STATUS_HAVE_DATA;
pw_log_trace_fp("%p: pop %d %p", stream, b->id, io); pw_log_trace_fp("%p: pop %d %p", stream, b->id, io);
/* we have a buffer, if we are not rt and don't follow
* any rate matching and there are no more
* buffers queued and there is a buffer to dequeue, ask for
* more buffers so that we have one in the next round.
* If we are using rate matching we need to wait until the
* rate matching node (audioconvert) has been scheduled to
* update the values. */
ask_more = !impl->process_rt && impl->rate_match == NULL &&
queue_is_empty(impl, &impl->queued) &&
!queue_is_empty(impl, &impl->dequeued);
} else if (impl->draining || impl->drained) { } else if (impl->draining || impl->drained) {
impl->draining = true; impl->draining = true;
impl->drained = true; impl->drained = true;
@ -1045,6 +1059,7 @@ again:
io->buffer_id = SPA_ID_INVALID; io->buffer_id = SPA_ID_INVALID;
res = io->status = SPA_STATUS_NEED_DATA; res = io->status = SPA_STATUS_NEED_DATA;
pw_log_trace_fp("%p: no more buffers %p", stream, io); pw_log_trace_fp("%p: no more buffers %p", stream, io);
ask_more = true;
} }
} }
@ -1053,18 +1068,13 @@ again:
if (!impl->draining && !impl->driving) { if (!impl->draining && !impl->driving) {
/* we're not draining, not a driver check if we need to get /* we're not draining, not a driver check if we need to get
* more buffers */ * more buffers */
if (!impl->process_rt && (recycled || res == SPA_STATUS_NEED_DATA)) { if (ask_more) {
/* not realtime and we have a free buffer, trigger process so that we have
* data in the next round. */
if (update_requested(impl) > 0) if (update_requested(impl) > 0)
call_process(impl); call_process(impl);
} else if (res == SPA_STATUS_NEED_DATA) { /* realtime, we can try again now if there is something.
/* realtime and we don't have a buffer, trigger process and try * non-realtime, we will have to try in the next round */
* again when there is something in the queue now */ if (impl->process_rt &&
if (update_requested(impl) > 0) (impl->draining || !queue_is_empty(impl, &impl->queued)))
call_process(impl);
if (impl->draining ||
spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0)
goto again; goto again;
} }
} }
@ -2229,7 +2239,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
struct buffer *b; struct buffer *b;
int res; int res;
if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { if ((b = queue_pop(impl, &impl->dequeued)) == NULL) {
res = -errno; res = -errno;
pw_log_trace_fp("%p: no more buffers: %m", stream); pw_log_trace_fp("%p: no more buffers: %m", stream);
errno = -res; errno = -res;
@ -2240,7 +2250,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) { if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) {
if (ATOMIC_INC(b->busy->count) > 1) { if (ATOMIC_INC(b->busy->count) > 1) {
ATOMIC_DEC(b->busy->count); ATOMIC_DEC(b->busy->count);
push_queue(impl, &impl->dequeued, b); queue_push(impl, &impl->dequeued, b);
pw_log_trace_fp("%p: buffer busy", stream); pw_log_trace_fp("%p: buffer busy", stream);
errno = EBUSY; errno = EBUSY;
return NULL; return NULL;
@ -2260,7 +2270,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
ATOMIC_DEC(b->busy->count); ATOMIC_DEC(b->busy->count);
pw_log_trace_fp("%p: queue buffer %d", stream, b->id); pw_log_trace_fp("%p: queue buffer %d", stream, b->id);
if ((res = push_queue(impl, &impl->queued, b)) < 0) if ((res = queue_push(impl, &impl->queued, b)) < 0)
return res; return res;
if (impl->direction == SPA_DIRECTION_OUTPUT && if (impl->direction == SPA_DIRECTION_OUTPUT &&
@ -2281,9 +2291,9 @@ do_flush(struct spa_loop *loop,
pw_log_trace_fp("%p: flush", impl); pw_log_trace_fp("%p: flush", impl);
do { do {
b = pop_queue(impl, &impl->queued); b = queue_pop(impl, &impl->queued);
if (b != NULL) if (b != NULL)
push_queue(impl, &impl->dequeued, b); queue_push(impl, &impl->dequeued, b);
} }
while (b); while (b);