From 43b964ea266c68d8a0d5e1e51d45179751afdca6 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 7 Jul 2022 16:17:56 +0200 Subject: [PATCH] stream: improve process callback When we are running non-rt and we just dequeued a buffer, check if the queue is empty and we need to call process to queue a new buffer. We can only do this when there is an empty buffer to dequeue. Don't try to request a new buffer immediately when we are rate_matching because we want the peer to first process the newly dequeued buffer and update the rate match. This makes pw-midiplay work again. Previously it was only requesting a buffer every other cycle. --- src/pipewire/stream.c | 66 +++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index b85a796d8..f35de287e 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -313,7 +313,7 @@ static int update_params(struct stream *impl, uint32_t id, } -static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer) +static inline int queue_push(struct stream *stream, struct queue *queue, struct buffer *buffer) { uint32_t index; @@ -330,7 +330,13 @@ static inline int push_queue(struct stream *stream, struct queue *queue, struct return 0; } -static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue) +static inline bool queue_is_empty(struct stream *stream, struct queue *queue) +{ + uint32_t index; + return spa_ringbuffer_get_read_index(&queue->ring, &index) < 1; +} + +static inline struct buffer *queue_pop(struct stream *stream, struct queue *queue) { uint32_t index, id; struct buffer *buffer; @@ -788,7 +794,7 @@ static void clear_buffers(struct pw_stream *stream) if (impl->direction == SPA_DIRECTION_INPUT) { struct buffer *b; - while ((b = pop_queue(impl, &impl->dequeued))) { + while ((b = queue_pop(impl, &impl->dequeued))) { if (b->busy) ATOMIC_DEC(b->busy->count); } @@ -927,7 +933,7 @@ static int impl_port_use_buffers(void *object, if (impl->direction == SPA_DIRECTION_OUTPUT) { pw_log_trace("%p: recycle buffer %d", stream, b->id); - push_queue(impl, &impl->dequeued, b); + queue_push(impl, &impl->dequeued, b); } SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED); @@ -945,7 +951,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe struct stream *d = object; pw_log_trace("%p: recycle buffer %d", d, buffer_id); if (buffer_id < d->n_buffers) - push_queue(d, &d->queued, &d->buffers[buffer_id]); + queue_push(d, &d->queued, &d->buffers[buffer_id]); return 0; } @@ -984,7 +990,7 @@ static int impl_node_process_input(void *object) if (io->status == SPA_STATUS_HAVE_DATA && (b = get_buffer(stream, io->buffer_id)) != NULL) { /* push new buffer */ - if (push_queue(impl, &impl->dequeued, b) == 0) { + if (queue_push(impl, &impl->dequeued, b) == 0) { copy_position(impl, impl->dequeued.incount); if (b->busy) ATOMIC_INC(b->busy->count); @@ -993,7 +999,7 @@ static int impl_node_process_input(void *object) } if (io->status != SPA_STATUS_NEED_DATA) { /* pop buffer to recycle */ - if ((b = pop_queue(impl, &impl->queued))) { + if ((b = queue_pop(impl, &impl->queued))) { pw_log_trace_fp("%p: recycle buffer %d", stream, b->id); } else if (io->status == -EPIPE) return io->status; @@ -1013,28 +1019,36 @@ static int impl_node_process_output(void *object) struct spa_io_buffers *io = impl->io; struct buffer *b; int res; - uint32_t index; - bool recycled; + bool ask_more; again: pw_log_trace_fp("%p: process out status:%d id:%d", stream, io->status, io->buffer_id); - recycled = false; + ask_more = false; if ((res = io->status) != SPA_STATUS_HAVE_DATA) { /* recycle old buffer */ if ((b = get_buffer(stream, io->buffer_id)) != NULL) { pw_log_trace_fp("%p: recycle buffer %d", stream, b->id); - push_queue(impl, &impl->dequeued, b); - recycled = true; + queue_push(impl, &impl->dequeued, b); } /* pop new buffer */ - if ((b = pop_queue(impl, &impl->queued)) != NULL) { + if ((b = queue_pop(impl, &impl->queued)) != NULL) { impl->drained = false; io->buffer_id = b->id; res = io->status = SPA_STATUS_HAVE_DATA; pw_log_trace_fp("%p: pop %d %p", stream, b->id, io); + /* we have a buffer, if we are not rt and don't follow + * any rate matching and there are no more + * buffers queued and there is a buffer to dequeue, ask for + * more buffers so that we have one in the next round. + * If we are using rate matching we need to wait until the + * rate matching node (audioconvert) has been scheduled to + * update the values. */ + ask_more = !impl->process_rt && impl->rate_match == NULL && + queue_is_empty(impl, &impl->queued) && + !queue_is_empty(impl, &impl->dequeued); } else if (impl->draining || impl->drained) { impl->draining = true; impl->drained = true; @@ -1045,6 +1059,7 @@ again: io->buffer_id = SPA_ID_INVALID; res = io->status = SPA_STATUS_NEED_DATA; pw_log_trace_fp("%p: no more buffers %p", stream, io); + ask_more = true; } } @@ -1053,18 +1068,13 @@ again: if (!impl->draining && !impl->driving) { /* we're not draining, not a driver check if we need to get * more buffers */ - if (!impl->process_rt && (recycled || res == SPA_STATUS_NEED_DATA)) { - /* not realtime and we have a free buffer, trigger process so that we have - * data in the next round. */ + if (ask_more) { if (update_requested(impl) > 0) call_process(impl); - } else if (res == SPA_STATUS_NEED_DATA) { - /* realtime and we don't have a buffer, trigger process and try - * again when there is something in the queue now */ - if (update_requested(impl) > 0) - call_process(impl); - if (impl->draining || - spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0) + /* realtime, we can try again now if there is something. + * non-realtime, we will have to try in the next round */ + if (impl->process_rt && + (impl->draining || !queue_is_empty(impl, &impl->queued))) goto again; } } @@ -2229,7 +2239,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) struct buffer *b; int res; - if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { + if ((b = queue_pop(impl, &impl->dequeued)) == NULL) { res = -errno; pw_log_trace_fp("%p: no more buffers: %m", stream); errno = -res; @@ -2240,7 +2250,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) { if (ATOMIC_INC(b->busy->count) > 1) { ATOMIC_DEC(b->busy->count); - push_queue(impl, &impl->dequeued, b); + queue_push(impl, &impl->dequeued, b); pw_log_trace_fp("%p: buffer busy", stream); errno = EBUSY; return NULL; @@ -2260,7 +2270,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) ATOMIC_DEC(b->busy->count); pw_log_trace_fp("%p: queue buffer %d", stream, b->id); - if ((res = push_queue(impl, &impl->queued, b)) < 0) + if ((res = queue_push(impl, &impl->queued, b)) < 0) return res; if (impl->direction == SPA_DIRECTION_OUTPUT && @@ -2281,9 +2291,9 @@ do_flush(struct spa_loop *loop, pw_log_trace_fp("%p: flush", impl); do { - b = pop_queue(impl, &impl->queued); + b = queue_pop(impl, &impl->queued); if (b != NULL) - push_queue(impl, &impl->dequeued, b); + queue_push(impl, &impl->dequeued, b); } while (b);