diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index dc935d770..21bf643e1 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -547,6 +547,22 @@ static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struc return 0; } +static inline uint32_t update_requested(struct stream *impl) +{ + uint32_t index, id; + struct buffer *buffer; + struct spa_io_rate_match *r = impl->rate_match; + + if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) < 1) + return 0; + + id = impl->dequeued.ids[index & MASK_BUFFERS]; + buffer = &impl->buffers[id]; + buffer->this.requested = r ? r->size : 0; + pw_log_trace_fp("%p: update buffer:%u size:%u", impl, id, r->size); + return 1; +} + static int impl_send_command(void *object, const struct spa_command *command) { struct stream *impl = object; @@ -574,8 +590,10 @@ static int impl_send_command(void *object, const struct spa_command *command) if (impl->direction == SPA_DIRECTION_INPUT) impl->io->status = SPA_STATUS_NEED_DATA; - else if (!impl->process_rt && !impl->driving) - call_process(impl); + else if (!impl->process_rt && !impl->driving) { + if (update_requested(impl) > 0) + call_process(impl); + } stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL); } @@ -1029,12 +1047,13 @@ again: if (!impl->process_rt && (recycled || res == SPA_STATUS_NEED_DATA)) { /* not realtime and we have a free buffer, trigger process so that we have * data in the next round. */ - if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) > 0) + if (update_requested(impl) > 0) call_process(impl); } else if (res == SPA_STATUS_NEED_DATA) { /* realtime and we don't have a buffer, trigger process and try * again when there is something in the queue now */ - call_process(impl); + if (update_requested(impl) > 0) + call_process(impl); if (impl->draining || spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0) goto again; diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index 9edc605d7..c4833766a 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -173,11 +173,20 @@ enum pw_stream_state { PW_STREAM_STATE_STREAMING = 3 /**< streaming */ }; +/** a buffer structure obtained from pw_stream_dequeue_buffer(). The size of this + * structure can grow as more field are added in the future */ struct pw_buffer { struct spa_buffer *buffer; /**< the spa buffer */ void *user_data; /**< user data attached to the buffer */ uint64_t size; /**< This field is set by the user and the sum of - * all queued buffer is returned in the time info */ + * all queued buffer is returned in the time info. + * For audio, it is advised to use the number of + * samples in the buffer for this field. */ + uint64_t requested; /**< For playback streams, this field contains the + * suggested amount of data to provide. For audio + * streams this will be the amount of samples + * required by the resampler. This field is 0 + * when no suggestion is provided. Since 0.3.49 */ }; struct pw_stream_control { @@ -191,7 +200,47 @@ struct pw_stream_control { uint32_t max_values; /**< max values that can be set on this control */ }; -/** A time structure */ +/** A time structure. + * + * Use pw_stream_get_time() to get an updated time snapshot of the stream. + * The time snapshot can give information about the time in the driver of the + * graph, the delay to the edge of the graph and the internal queuing in the + * stream. + * + * pw_time.ticks gives a monotonic increasing counter of the time in the graph + * driver. I can be used to generate a timetime to schedule samples as well + * as detect discontinuities in the timeline caused by xruns. + * + * The total delay of data in a stream is the sum of the queued data (not yet + * processed data) and the delay to the edge of the graph, usually a playback + * or capture device. + * + * pw_time.delay is expressed as pw_time.ticks, the time domain of the graph. This + * value, and pw_time.ticks, were captured at pw_time.now and can be extrapolated + * to the current time like this: + * + * struct timespec ts; + * clock_gettime(CLOCK_MONOTONIC, &ts); + * int64_t diff = SPA_TIMESPEC_TO_NSEC(&ts) - pw_time.now; + * int64_t elapsed = (pw_time.rate.denom * diff) / (pw_time.rate.num * SPA_NSEC_PER_SEC); + * + * pw_time.queued is expressed in the time domain of the stream, or the format + * that is used for the buffers of this stream. The value depends on the specific + * format used by the application and how the application manages the pw_buffer.size + * field. For audio applications, it is recommended to use the number of samples in + * the buffer as the pw_buffer.size field to get meaningful pw_time.queued values. + * + * For an audio playback stream, if you were to queue a buffer, the total delay + * in milliseconds for the first sample in the newly queued buffer to arrive in the + * sink can be calculated as: + * + * (pw_time.queued * 1000 / stream.samplerate) + + * ((pw_time.delay - elapsed) * 1000 * pw_time.rate.num / pw_time.rate.denom) + * + * The current extrapolated time (in ms) in the source or sink can be calculated as: + * + * (pw_time.ticks + elapsed) * 1000 * pw_time.rate.num / pw_time.rate.denom + */ struct pw_time { int64_t now; /**< the monotonic time in nanoseconds. This is the time * when this time report was updated. It is usually @@ -202,21 +251,24 @@ struct pw_time { struct spa_fraction rate; /**< the rate of \a ticks and delay. This is usually * expressed in 1/. */ uint64_t ticks; /**< the ticks at \a now. This is the current time that - * the remote end is reading/writing. */ + * the remote end is reading/writing. This is monotonicaly + * increasing. */ int64_t delay; /**< delay to device. This is the time it will take for - * the next sample in the stream to be presented by + * the next output sample of the stream to be presented by * the playback device or the time a sample traveled * from the capture device. This delay includes the * delay introduced by all filters on the path between * the stream and the device. The delay is normally * constant in a graph and can change when the topology - * of the graph or the quantum changes. */ + * of the graph or the quantum changes. This delay does + * not include the delay caused by queued buffers. */ uint64_t queued; /**< data queued in the stream, this is the sum * of the size fields in the pw_buffer that are * currently queued and, for audio streams, the extra - * data queued in the resampler. For audio streams, it - * is thus highly recommended to use the buffer size - * field as the sample count in the buffer. */ + * number of samples queued in the resampler. For audio + * streams, it is thus highly recommended to use the + * pw_buffer size field as the number of samples in + * the buffer. */ }; #include diff --git a/src/tests/test-stream.c b/src/tests/test-stream.c index cb54eb291..7880968fc 100644 --- a/src/tests/test-stream.c +++ b/src/tests/test-stream.c @@ -67,7 +67,7 @@ static void test_abi(void) TEST_FUNC(ev, test, trigger_done); #if defined(__x86_64__) && defined(__LP64__) - spa_assert_se(sizeof(struct pw_buffer) == 24); + spa_assert_se(sizeof(struct pw_buffer) == 32); spa_assert_se(sizeof(struct pw_time) == 40); #else fprintf(stderr, "%zd\n", sizeof(struct pw_buffer));