pulse-server: use the new timer-queue for timeouts

Use the timer queue for scheduling stream and object data timeouts.

This avoids allocating timerfds for these timeouts and the timer queue
can handle many timeouts more efficiently.
This commit is contained in:
Wim Taymans 2025-09-18 13:55:43 +02:00
parent 38cb14d39d
commit ca713c08ee
5 changed files with 14 additions and 32 deletions

View file

@ -59,6 +59,7 @@ struct impl {
struct spa_hook_list hooks;
struct spa_list servers;
struct pw_timer_queue *timer_queue;
struct pw_work_queue *work_queue;
struct spa_list cleanup_clients;

View file

@ -27,6 +27,7 @@ struct manager {
struct pw_manager this;
struct pw_loop *loop;
struct pw_timer_queue *timer_queue;
struct spa_hook core_listener;
struct spa_hook registry_listener;
@ -48,7 +49,7 @@ struct object_data {
struct object *object;
const char *key;
size_t size;
struct spa_source *timer;
struct pw_timer timer;
};
struct object {
@ -178,10 +179,7 @@ static void object_reset_params(struct object *o)
static void object_data_free(struct object_data *d)
{
spa_list_remove(&d->link);
if (d->timer) {
pw_loop_destroy_source(d->object->manager->loop, d->timer);
d->timer = NULL;
}
pw_timer_queue_cancel(&d->timer);
free(d);
}
@ -752,6 +750,7 @@ struct pw_manager *pw_manager_new(struct pw_core *core)
context = pw_core_get_context(core);
m->loop = pw_context_get_main_loop(context);
m->timer_queue = pw_context_get_timer_queue(context);
spa_hook_list_init(&m->hooks);
@ -888,7 +887,7 @@ done:
return SPA_PTROFF(d, sizeof(struct object_data), void);
}
static void object_data_timeout(void *data, uint64_t count)
static void object_data_timeout(void *data)
{
struct object_data *d = data;
struct object *o = d->object;
@ -897,11 +896,6 @@ static void object_data_timeout(void *data, uint64_t count)
pw_log_debug("manager:%p object id:%d data '%s' lifetime ends",
m, o->this.id, d->key);
if (d->timer) {
pw_loop_destroy_source(m->loop, d->timer);
d->timer = NULL;
}
manager_emit_object_data_timeout(m, &o->this, d->key);
}
@ -911,7 +905,6 @@ void *pw_manager_object_add_temporary_data(struct pw_manager_object *obj, const
struct object *o = SPA_CONTAINER_OF(obj, struct object, this);
struct object_data *d;
void *data;
struct timespec timeout = {0}, interval = {0};
data = pw_manager_object_add_data(obj, key, size);
if (data == NULL)
@ -919,14 +912,8 @@ void *pw_manager_object_add_temporary_data(struct pw_manager_object *obj, const
d = SPA_PTROFF(data, -sizeof(struct object_data), void);
if (d->timer == NULL)
d->timer = pw_loop_add_timer(o->manager->loop, object_data_timeout, d);
if (d->timer == NULL)
return NULL;
timeout.tv_sec = lifetime_nsec / SPA_NSEC_PER_SEC;
timeout.tv_nsec = lifetime_nsec % SPA_NSEC_PER_SEC;
pw_loop_update_timer(o->manager->loop, d->timer, &timeout, &interval, false);
pw_timer_queue_add(o->manager->timer_queue, &d->timer, NULL,
lifetime_nsec, object_data_timeout, d);
return data;
}

View file

@ -5524,6 +5524,7 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
impl->main_loop = pw_context_get_main_loop(context);
impl->work_queue = pw_context_get_work_queue(context);
impl->timer_queue = pw_context_get_timer_queue(context);
if (props == NULL)
props = pw_properties_new(NULL, NULL);

View file

@ -40,7 +40,7 @@ static int parse_frac(struct pw_properties *props, const char *key,
return 0;
}
static void create_stream_timeout(void *user_data, uint64_t expirations)
static void create_stream_timeout(void *user_data)
{
struct stream *stream = user_data;
@ -52,9 +52,6 @@ static void create_stream_timeout(void *user_data, uint64_t expirations)
stream->killed = false;
stream_free(stream);
} else {
pw_loop_destroy_source(stream->impl->main_loop, stream->timer);
stream->timer = NULL;
}
}
@ -109,9 +106,8 @@ struct stream *stream_new(struct client *client, enum stream_type type, uint32_t
/* Time out if we don't get a link and can't send a reply to create in 35s. Client will time out in
* 30s and clean up its stream anyway. */
struct timespec create_timeout = { .tv_sec = 35, .tv_nsec = 0 };
stream->timer = pw_loop_add_timer(stream->impl->main_loop, create_stream_timeout, stream);
pw_loop_update_timer(stream->impl->main_loop, stream->timer, &create_timeout, NULL, false);
pw_timer_queue_add(stream->impl->timer_queue, &stream->timer, NULL,
35 * SPA_NSEC_PER_SEC, create_stream_timeout, stream);
return stream;
@ -130,10 +126,7 @@ void stream_free(struct stream *stream)
pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel);
if (stream->timer) {
pw_loop_destroy_source(stream->impl->main_loop, stream->timer);
stream->timer = NULL;
}
pw_timer_queue_cancel(&stream->timer);
if (stream->drain_tag)
reply_error(client, -1, stream->drain_tag, -ENOENT);

View file

@ -50,7 +50,7 @@ struct stream {
struct pw_stream *stream;
struct spa_hook stream_listener;
struct spa_source *timer;
struct pw_timer timer;
struct spa_io_position *position;
struct spa_ringbuffer ring;