diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index ff0e0f840..59c5ee2a0 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -400,6 +400,28 @@ static struct buffer *get_buffer(struct pw_stream *stream, uint32_t id) return NULL; } +static inline uint32_t update_requested(struct stream *impl) +{ + uint32_t index, id, res = 0; + struct buffer *buffer; + struct spa_io_rate_match *r = impl->rate_match; + + if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) < 1) + return 0; + + id = impl->dequeued.ids[index & MASK_BUFFERS]; + buffer = &impl->buffers[id]; + if (r) { + buffer->this.requested = r->size; + res = r->size > 0 ? 1 : 0; + } else { + buffer->this.requested = impl->quantum; + res = 1; + } + pw_log_trace_fp("%p: update buffer:%u size:%"PRIu64, impl, id, buffer->this.requested); + return res; +} + static int do_call_process(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) @@ -411,9 +433,11 @@ do_call_process(struct spa_loop *loop, return 0; } -static void call_process(struct stream *impl) +static inline void call_process(struct stream *impl) { pw_log_trace_fp("%p: call process rt:%u", impl, impl->process_rt); + if (impl->direction == SPA_DIRECTION_OUTPUT && update_requested(impl) <= 0) + return; if (impl->process_rt) spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0); else @@ -563,28 +587,6 @@ static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struc return 0; } -static inline uint32_t update_requested(struct stream *impl) -{ - uint32_t index, id, res = 0; - struct buffer *buffer; - struct spa_io_rate_match *r = impl->rate_match; - - if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) < 1) - return 0; - - id = impl->dequeued.ids[index & MASK_BUFFERS]; - buffer = &impl->buffers[id]; - if (r) { - buffer->this.requested = r->size; - res = r->size > 0 ? 1 : 0; - } else { - buffer->this.requested = impl->quantum; - res = 1; - } - pw_log_trace_fp("%p: update buffer:%u size:%"PRIu64, impl, id, buffer->this.requested); - return res; -} - static int impl_send_command(void *object, const struct spa_command *command) { struct stream *impl = object; @@ -612,10 +614,8 @@ static int impl_send_command(void *object, const struct spa_command *command) if (impl->direction == SPA_DIRECTION_INPUT) impl->io->status = SPA_STATUS_NEED_DATA; - else if (!impl->process_rt && !impl->driving) { - if (update_requested(impl) > 0) - call_process(impl); - } + else if (!impl->process_rt && !impl->driving) + call_process(impl); stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL); } @@ -1081,8 +1081,7 @@ again: /* we're not draining, not a driver check if we need to get * more buffers */ if (ask_more) { - if (update_requested(impl) > 0) - call_process(impl); + call_process(impl); /* realtime, we can try again now if there is something. * non-realtime, we will have to try in the next round */ if (impl->process_rt && @@ -2352,7 +2351,7 @@ do_trigger_process(struct spa_loop *loop, int res; if (impl->direction == SPA_DIRECTION_OUTPUT) { if (impl->process_rt) - spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0); + call_process(impl); res = impl->node_methods.process(impl); } else { res = SPA_STATUS_NEED_DATA; @@ -2386,11 +2385,9 @@ int pw_stream_trigger_process(struct pw_stream *stream) if (!impl->driving && !impl->trigger) { res = trigger_request_process(impl); } else { - if (impl->direction == SPA_DIRECTION_OUTPUT && - !impl->process_rt) { - pw_loop_invoke(impl->context->main_loop, - do_call_process, 1, NULL, 0, false, impl); - } + if (!impl->process_rt) + call_process(impl); + res = pw_loop_invoke(impl->context->data_loop, do_trigger_process, 1, NULL, 0, false, impl); }