timer-queue: add a new timer queue helper

This allows you to schedule timeouts. It keeps a sorted list of
timeouts and uses just 1 timerfd to schedule the head of the timeout
list.
This commit is contained in:
Wim Taymans 2025-09-18 13:52:51 +02:00 committed by Arun Raghavan
parent ca51f72900
commit f2a11fadf4
9 changed files with 265 additions and 0 deletions

View file

@ -43,6 +43,7 @@ This determines the ordering of items in Doxygen sidebar.
\addtogroup pw_protocol
\addtogroup pw_resource
\addtogroup pw_thread_loop
\addtogroup pw_timer_queue
\addtogroup pw_work_queue
\}

View file

@ -580,6 +580,8 @@ void pw_context_destroy(struct pw_context *context)
if (context->work_queue)
pw_work_queue_destroy(context->work_queue);
if (context->timer_queue)
pw_timer_queue_destroy(context->timer_queue);
pw_properties_free(context->properties);
pw_properties_free(context->conf);
@ -750,6 +752,14 @@ struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context)
return context->work_queue;
}
SPA_EXPORT
struct pw_timer_queue *pw_context_get_timer_queue(struct pw_context *context)
{
if (context->timer_queue == NULL)
context->timer_queue = pw_timer_queue_new(context->main_loop);
return context->timer_queue;
}
SPA_EXPORT
struct pw_mempool *pw_context_get_mempool(struct pw_context *context)
{

View file

@ -144,6 +144,9 @@ void pw_context_release_loop(struct pw_context *context, struct pw_loop *loop);
/** Get the work queue from the context: Since 0.3.26 */
struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context);
/** Get the timer queue from the context: Since 1.6.0 */
struct pw_timer_queue *pw_context_get_timer_queue(struct pw_context *context);
/** Get the memory pool from the context: Since 0.3.74 */
struct pw_mempool *pw_context_get_mempool(struct pw_context *context);

View file

@ -69,6 +69,7 @@ PW_LOG_TOPIC(log_proxy, "pw.proxy");
PW_LOG_TOPIC(log_resource, "pw.resource");
PW_LOG_TOPIC(log_stream, "pw.stream");
PW_LOG_TOPIC(log_thread_loop, "pw.thread-loop");
PW_LOG_TOPIC(log_timer_queue, "pw.timer-queue");
PW_LOG_TOPIC(log_work_queue, "pw.work-queue");
PW_LOG_TOPIC(PW_LOG_TOPIC_DEFAULT, "default");

View file

@ -41,6 +41,7 @@ pipewire_headers = [
'stream.h',
'thread.h',
'thread-loop.h',
'timer-queue.h',
'type.h',
'utils.h',
'work-queue.h',
@ -78,6 +79,7 @@ pipewire_sources = [
'stream.c',
'thread.c',
'thread-loop.c',
'timer-queue.c',
'utils.c',
'work-queue.c',
]

View file

@ -37,6 +37,7 @@ extern "C" {
#include <pipewire/filter.h>
#include <pipewire/thread-loop.h>
#include <pipewire/data-loop.h>
#include <pipewire/timer-queue.h>
#include <pipewire/type.h>
#include <pipewire/utils.h>
#include <pipewire/version.h>

View file

@ -420,6 +420,7 @@ struct pw_context {
struct spa_thread_utils *thread_utils;
struct pw_loop *main_loop; /**< main loop for control */
struct pw_work_queue *work_queue; /**< work queue */
struct pw_timer_queue *timer_queue; /**< timer queue */
struct spa_support support[16]; /**< support for spa plugins */
uint32_t n_support; /**< number of support items */

195
src/pipewire/timer-queue.c Normal file
View file

@ -0,0 +1,195 @@
/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2025 Wim Taymans */
/* SPDX-License-Identifier: MIT */
#include <pipewire/log.h>
#include "timer-queue.h"
PW_LOG_TOPIC_EXTERN(log_timer_queue);
#define PW_LOG_TOPIC_DEFAULT log_timer_queue
struct pw_timer_queue {
struct pw_loop *loop;
struct spa_list entries;
struct timespec *next_timeout;
struct spa_source *timer;
};
static void rearm_timer(struct pw_timer_queue *queue)
{
struct timespec *timeout = NULL;
struct pw_timer *timer;
if (!spa_list_is_empty(&queue->entries)) {
timer = spa_list_first(&queue->entries, struct pw_timer, link);
timeout = &timer->timeout;
}
if (timeout != queue->next_timeout) {
if (timeout)
pw_log_debug("%p: arming with timeout %ld.%09ld", queue,
timeout->tv_sec, timeout->tv_nsec);
else
pw_log_debug("%p: disarming (no entries)", queue);
queue->next_timeout = timeout;
pw_loop_update_timer(queue->loop, queue->timer,
timeout, NULL, true);
}
}
static void timer_timeout(void *user_data, uint64_t expirations)
{
struct pw_timer_queue *queue = user_data;
struct pw_timer *timer;
pw_log_debug("%p: timeout fired, expirations=%"PRIu64, queue, expirations);
if (spa_list_is_empty(&queue->entries)) {
pw_log_debug("%p: no entries to process", queue);
return;
}
timer = spa_list_first(&queue->entries, struct pw_timer, link);
if (&timer->timeout != queue->next_timeout) {
/* this can happen when the timer expired but before we could
* dispatch the event, the timer got removed or a new one got
* added. The timer does not match the one we last scheduled
* and we need to wait for the rescheduled timer instead */
pw_log_debug("%p: timer was rearmed", queue);
return;
}
pw_log_debug("%p: processing timer %p", queue, timer);
timer->queue = NULL;
spa_list_remove(&timer->link);
timer->callback(timer->data);
rearm_timer(queue);
}
SPA_EXPORT
struct pw_timer_queue *pw_timer_queue_new(struct pw_loop *loop)
{
struct pw_timer_queue *queue;
int res;
queue = calloc(1, sizeof(struct pw_timer_queue));
if (queue == NULL)
return NULL;
queue->loop = loop;
queue->timer = pw_loop_add_timer(loop, timer_timeout, queue);
if (queue->timer == NULL) {
res = -errno;
goto error_free;
}
spa_list_init(&queue->entries);
pw_log_debug("%p: initialized", queue);
return queue;
error_free:
free(queue);
errno = -res;
return NULL;
}
SPA_EXPORT
void pw_timer_queue_destroy(struct pw_timer_queue *queue)
{
struct pw_timer *timer;
int count = 0;
pw_log_debug("%p: clearing", queue);
if (queue->timer)
pw_loop_destroy_source(queue->loop, queue->timer);
spa_list_consume(timer, &queue->entries, link) {
timer->queue = NULL;
spa_list_remove(&timer->link);
count++;
}
if (count > 0)
pw_log_debug("%p: cancelled %d entries", queue, count);
free(queue);
}
static int timespec_compare(const struct timespec *a, const struct timespec *b)
{
if (a->tv_sec < b->tv_sec)
return -1;
if (a->tv_sec > b->tv_sec)
return 1;
if (a->tv_nsec < b->tv_nsec)
return -1;
if (a->tv_nsec > b->tv_nsec)
return 1;
return 0;
}
SPA_EXPORT
int pw_timer_queue_add(struct pw_timer_queue *queue, struct pw_timer *timer,
struct timespec *abs_time, int64_t timeout_ns,
pw_timer_callback callback, void *data)
{
struct timespec timeout;
struct pw_timer *iter;
if (timer->queue != NULL)
return -EBUSY;
if (abs_time == NULL) {
/* Use CLOCK_MONOTONIC to match the timerfd clock used by SPA loop */
if (clock_gettime(CLOCK_MONOTONIC, &timeout) < 0)
return -errno;
} else {
timeout = *abs_time;
}
if (timeout_ns > 0) {
timeout.tv_sec += timeout_ns / SPA_NSEC_PER_SEC;
timeout.tv_nsec += timeout_ns % SPA_NSEC_PER_SEC;
if (timeout.tv_nsec >= SPA_NSEC_PER_SEC) {
timeout.tv_sec++;
timeout.tv_nsec -= SPA_NSEC_PER_SEC;
}
}
timer->queue = queue;
timer->timeout = timeout;
timer->callback = callback;
timer->data = data;
pw_log_debug("%p: adding timer %p with timeout %ld.%09ld",
queue, timer, timeout.tv_sec, timeout.tv_nsec);
/* Insert timer in sorted order (earliest timeout first) */
spa_list_for_each(iter, &queue->entries, link) {
if (timespec_compare(&timer->timeout, &iter->timeout) < 0)
break;
}
spa_list_append(&iter->link, &timer->link);
rearm_timer(queue);
return 0;
}
SPA_EXPORT
int pw_timer_queue_cancel(struct pw_timer *timer)
{
struct pw_timer_queue *queue = timer->queue;
if (queue == NULL)
return 0;
pw_log_debug("%p: cancelling timer %p", queue, timer);
timer->queue = NULL;
spa_list_remove(&timer->link);
rearm_timer(queue);
return 0;
}

View file

@ -0,0 +1,51 @@
/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2025 Wim Taymans */
/* SPDX-License-Identifier: MIT */
#ifndef PIPEWIRE_TIMER_QUEUE_H
#define PIPEWIRE_TIMER_QUEUE_H
#ifdef __cplusplus
extern "C" {
#endif
/** \defgroup pw_timer_queue Timer Queue
* Processing of timer events.
*/
/**
* \addtogroup pw_timer_queue
* \{
*/
struct pw_timer_queue;
#include <pipewire/loop.h>
typedef void (*pw_timer_callback) (void *data);
struct pw_timer {
struct spa_list link;
struct pw_timer_queue *queue;
struct timespec timeout;
pw_timer_callback callback;
void *data;
uint32_t padding[16];
};
struct pw_timer_queue *pw_timer_queue_new(struct pw_loop *loop);
void pw_timer_queue_destroy(struct pw_timer_queue *queue);
int pw_timer_queue_add(struct pw_timer_queue *queue, struct pw_timer *timer,
struct timespec *abs_time, int64_t timeout_ns,
pw_timer_callback callback, void *data);
int pw_timer_queue_cancel(struct pw_timer *timer);
/**
* \}
*/
#ifdef __cplusplus
}
#endif
#endif /* PIPEWIRE_TIMER_QUEUE_H */