diff --git a/doc/tree.dox b/doc/tree.dox index ecc43d604..f62bc62dd 100644 --- a/doc/tree.dox +++ b/doc/tree.dox @@ -43,6 +43,7 @@ This determines the ordering of items in Doxygen sidebar. \addtogroup pw_protocol \addtogroup pw_resource \addtogroup pw_thread_loop +\addtogroup pw_timer_queue \addtogroup pw_work_queue \} diff --git a/src/pipewire/context.c b/src/pipewire/context.c index 29065ef83..7b6d426ab 100644 --- a/src/pipewire/context.c +++ b/src/pipewire/context.c @@ -582,6 +582,8 @@ void pw_context_destroy(struct pw_context *context) if (context->work_queue) pw_work_queue_destroy(context->work_queue); + if (context->timer_queue) + pw_timer_queue_destroy(context->timer_queue); pw_properties_free(context->properties); pw_properties_free(context->conf); @@ -752,6 +754,14 @@ struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context) return context->work_queue; } +SPA_EXPORT +struct pw_timer_queue *pw_context_get_timer_queue(struct pw_context *context) +{ + if (context->timer_queue == NULL) + context->timer_queue = pw_timer_queue_new(context->main_loop); + return context->timer_queue; +} + SPA_EXPORT struct pw_mempool *pw_context_get_mempool(struct pw_context *context) { diff --git a/src/pipewire/context.h b/src/pipewire/context.h index eef297b4f..61c6662c4 100644 --- a/src/pipewire/context.h +++ b/src/pipewire/context.h @@ -144,6 +144,9 @@ void pw_context_release_loop(struct pw_context *context, struct pw_loop *loop); /** Get the work queue from the context: Since 0.3.26 */ struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context); +/** Get the timer queue from the context: Since 1.6.0 */ +struct pw_timer_queue *pw_context_get_timer_queue(struct pw_context *context); + /** Get the memory pool from the context: Since 0.3.74 */ struct pw_mempool *pw_context_get_mempool(struct pw_context *context); diff --git a/src/pipewire/log.c b/src/pipewire/log.c index c8410a6f5..7e7d6a66d 100644 --- a/src/pipewire/log.c +++ b/src/pipewire/log.c @@ -69,6 +69,7 @@ PW_LOG_TOPIC(log_proxy, "pw.proxy"); PW_LOG_TOPIC(log_resource, "pw.resource"); PW_LOG_TOPIC(log_stream, "pw.stream"); PW_LOG_TOPIC(log_thread_loop, "pw.thread-loop"); +PW_LOG_TOPIC(log_timer_queue, "pw.timer-queue"); PW_LOG_TOPIC(log_work_queue, "pw.work-queue"); PW_LOG_TOPIC(PW_LOG_TOPIC_DEFAULT, "default"); diff --git a/src/pipewire/meson.build b/src/pipewire/meson.build index b19631a92..9769369a1 100644 --- a/src/pipewire/meson.build +++ b/src/pipewire/meson.build @@ -41,6 +41,7 @@ pipewire_headers = [ 'stream.h', 'thread.h', 'thread-loop.h', + 'timer-queue.h', 'type.h', 'utils.h', 'work-queue.h', @@ -78,6 +79,7 @@ pipewire_sources = [ 'stream.c', 'thread.c', 'thread-loop.c', + 'timer-queue.c', 'utils.c', 'work-queue.c', ] diff --git a/src/pipewire/pipewire.h b/src/pipewire/pipewire.h index b3aa58d1a..6f26a38b9 100644 --- a/src/pipewire/pipewire.h +++ b/src/pipewire/pipewire.h @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include diff --git a/src/pipewire/private.h b/src/pipewire/private.h index e886f54e1..b5d609256 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -405,6 +405,7 @@ struct pw_context { struct spa_thread_utils *thread_utils; struct pw_loop *main_loop; /**< main loop for control */ struct pw_work_queue *work_queue; /**< work queue */ + struct pw_timer_queue *timer_queue; /**< timer queue */ struct spa_support support[16]; /**< support for spa plugins */ uint32_t n_support; /**< number of support items */ diff --git a/src/pipewire/timer-queue.c b/src/pipewire/timer-queue.c new file mode 100644 index 000000000..8f2948597 --- /dev/null +++ b/src/pipewire/timer-queue.c @@ -0,0 +1,195 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2025 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include + +#include "timer-queue.h" + +PW_LOG_TOPIC_EXTERN(log_timer_queue); +#define PW_LOG_TOPIC_DEFAULT log_timer_queue + +struct pw_timer_queue { + struct pw_loop *loop; + struct spa_list entries; + struct timespec *next_timeout; + struct spa_source *timer; +}; + +static void rearm_timer(struct pw_timer_queue *queue) +{ + struct timespec *timeout = NULL; + struct pw_timer *timer; + + if (!spa_list_is_empty(&queue->entries)) { + timer = spa_list_first(&queue->entries, struct pw_timer, link); + timeout = &timer->timeout; + } + if (timeout != queue->next_timeout) { + if (timeout) + pw_log_debug("%p: arming with timeout %ld.%09ld", queue, + timeout->tv_sec, timeout->tv_nsec); + else + pw_log_debug("%p: disarming (no entries)", queue); + + queue->next_timeout = timeout; + pw_loop_update_timer(queue->loop, queue->timer, + timeout, NULL, true); + } +} + +static void timer_timeout(void *user_data, uint64_t expirations) +{ + struct pw_timer_queue *queue = user_data; + struct pw_timer *timer; + + pw_log_debug("%p: timeout fired, expirations=%"PRIu64, queue, expirations); + + if (spa_list_is_empty(&queue->entries)) { + pw_log_debug("%p: no entries to process", queue); + return; + } + timer = spa_list_first(&queue->entries, struct pw_timer, link); + if (&timer->timeout != queue->next_timeout) { + /* this can happen when the timer expired but before we could + * dispatch the event, the timer got removed or a new one got + * added. The timer does not match the one we last scheduled + * and we need to wait for the rescheduled timer instead */ + pw_log_debug("%p: timer was rearmed", queue); + return; + } + + pw_log_debug("%p: processing timer %p", queue, timer); + timer->queue = NULL; + spa_list_remove(&timer->link); + + timer->callback(timer->data); + + rearm_timer(queue); +} + +SPA_EXPORT +struct pw_timer_queue *pw_timer_queue_new(struct pw_loop *loop) +{ + struct pw_timer_queue *queue; + int res; + + queue = calloc(1, sizeof(struct pw_timer_queue)); + if (queue == NULL) + return NULL; + + queue->loop = loop; + queue->timer = pw_loop_add_timer(loop, timer_timeout, queue); + if (queue->timer == NULL) { + res = -errno; + goto error_free; + } + + spa_list_init(&queue->entries); + pw_log_debug("%p: initialized", queue); + return queue; + +error_free: + free(queue); + errno = -res; + return NULL; +} + +SPA_EXPORT +void pw_timer_queue_destroy(struct pw_timer_queue *queue) +{ + struct pw_timer *timer; + int count = 0; + + pw_log_debug("%p: clearing", queue); + + if (queue->timer) + pw_loop_destroy_source(queue->loop, queue->timer); + + spa_list_consume(timer, &queue->entries, link) { + timer->queue = NULL; + spa_list_remove(&timer->link); + count++; + } + if (count > 0) + pw_log_debug("%p: cancelled %d entries", queue, count); + + free(queue); +} + +static int timespec_compare(const struct timespec *a, const struct timespec *b) +{ + if (a->tv_sec < b->tv_sec) + return -1; + if (a->tv_sec > b->tv_sec) + return 1; + if (a->tv_nsec < b->tv_nsec) + return -1; + if (a->tv_nsec > b->tv_nsec) + return 1; + return 0; +} + +SPA_EXPORT +int pw_timer_queue_add(struct pw_timer_queue *queue, struct pw_timer *timer, + struct timespec *abs_time, int64_t timeout_ns, + pw_timer_callback callback, void *data) +{ + struct timespec timeout; + struct pw_timer *iter; + + if (timer->queue != NULL) + return -EBUSY; + + if (abs_time == NULL) { + /* Use CLOCK_MONOTONIC to match the timerfd clock used by SPA loop */ + if (clock_gettime(CLOCK_MONOTONIC, &timeout) < 0) + return -errno; + } else { + timeout = *abs_time; + } + if (timeout_ns > 0) { + timeout.tv_sec += timeout_ns / SPA_NSEC_PER_SEC; + timeout.tv_nsec += timeout_ns % SPA_NSEC_PER_SEC; + if (timeout.tv_nsec >= SPA_NSEC_PER_SEC) { + timeout.tv_sec++; + timeout.tv_nsec -= SPA_NSEC_PER_SEC; + } + } + + timer->queue = queue; + timer->timeout = timeout; + timer->callback = callback; + timer->data = data; + + pw_log_debug("%p: adding timer %p with timeout %ld.%09ld", + queue, timer, timeout.tv_sec, timeout.tv_nsec); + + /* Insert timer in sorted order (earliest timeout first) */ + spa_list_for_each(iter, &queue->entries, link) { + if (timespec_compare(&timer->timeout, &iter->timeout) < 0) + break; + } + spa_list_append(&iter->link, &timer->link); + + rearm_timer(queue); + return 0; +} + +SPA_EXPORT +int pw_timer_queue_cancel(struct pw_timer *timer) +{ + struct pw_timer_queue *queue = timer->queue; + + if (queue == NULL) + return 0; + + pw_log_debug("%p: cancelling timer %p", queue, timer); + + timer->queue = NULL; + spa_list_remove(&timer->link); + + rearm_timer(queue); + + return 0; +} diff --git a/src/pipewire/timer-queue.h b/src/pipewire/timer-queue.h new file mode 100644 index 000000000..81dc7b447 --- /dev/null +++ b/src/pipewire/timer-queue.h @@ -0,0 +1,51 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2025 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_TIMER_QUEUE_H +#define PIPEWIRE_TIMER_QUEUE_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** \defgroup pw_timer_queue Timer Queue + * Processing of timer events. + */ + +/** + * \addtogroup pw_timer_queue + * \{ + */ +struct pw_timer_queue; + +#include + +typedef void (*pw_timer_callback) (void *data); + +struct pw_timer { + struct spa_list link; + struct pw_timer_queue *queue; + struct timespec timeout; + pw_timer_callback callback; + void *data; + uint32_t padding[16]; +}; + +struct pw_timer_queue *pw_timer_queue_new(struct pw_loop *loop); +void pw_timer_queue_destroy(struct pw_timer_queue *queue); + +int pw_timer_queue_add(struct pw_timer_queue *queue, struct pw_timer *timer, + struct timespec *abs_time, int64_t timeout_ns, + pw_timer_callback callback, void *data); +int pw_timer_queue_cancel(struct pw_timer *timer); + +/** + * \} + */ + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_TIMER_QUEUE_H */