mirror of
https://gitlab.freedesktop.org/wayland/wayland.git
synced 2025-10-29 05:40:16 -04:00
client: Add wl_event_queue for multi-thread dispatching
This introduces wl_event_queue, which is what will make multi-threaded wayland clients possible and useful. The driving use case is that of a GL rendering thread that renders and calls eglSwapBuffer independently of a "main thread" that owns the wl_display and handles input events and everything else. In general, the EGL and GL APIs have a threading model that requires the wayland client library to be usable from several threads. Finally, the current callback model gets into trouble even in a single threaded scenario: if we have to block in eglSwapBuffers, we may end up doing unrelated callbacks from within EGL. The wl_event_queue mechanism lets the application (or middleware such as EGL or toolkits) assign a proxy to an event queue. Only events from objects associated with the queue will be put in the queue, and conversely, events from objects associated with the queue will not be queue up anywhere else. The wl_display struct has a built-in event queue, which is considered the main and default event queue. New proxies are associated with the same queue as the object that created them (either the object that a request with a new-id argument was sent to or the object that sent an event with a new-id argument). A proxy can be moved to a different event queue by calling wl_proxy_set_queue(). A subsystem, such as EGL, will then create its own event queue and associate the objects it expects to receive events from with that queue. If EGL needs to block and wait for a certain event, it can keep dispatching event from its queue until that events comes in. This wont call out to unrelated code with an EGL lock held. Similarly, we don't risk the main thread handling an event from an EGL object and then calling into EGL from a different thread without the lock held.
This commit is contained in:
parent
de961dc1f3
commit
385fe30e8b
2 changed files with 123 additions and 35 deletions
|
|
@ -51,6 +51,8 @@ struct wl_global_listener {
|
|||
struct wl_proxy {
|
||||
struct wl_object object;
|
||||
struct wl_display *display;
|
||||
struct wl_event_queue *queue;
|
||||
int id_deleted;
|
||||
void *user_data;
|
||||
};
|
||||
|
||||
|
|
@ -61,14 +63,21 @@ struct wl_global {
|
|||
struct wl_list link;
|
||||
};
|
||||
|
||||
struct wl_event_queue {
|
||||
struct wl_list event_list;
|
||||
pthread_cond_t cond;
|
||||
};
|
||||
|
||||
struct wl_display {
|
||||
struct wl_proxy proxy;
|
||||
struct wl_connection *connection;
|
||||
int fd;
|
||||
int close_fd;
|
||||
pthread_t display_thread;
|
||||
struct wl_map objects;
|
||||
struct wl_list global_listener_list;
|
||||
struct wl_list global_list;
|
||||
struct wl_event_queue queue;
|
||||
pthread_mutex_t mutex;
|
||||
|
||||
wl_display_global_func_t global_handler;
|
||||
|
|
@ -77,6 +86,48 @@ struct wl_display {
|
|||
|
||||
static int wl_debug = 0;
|
||||
|
||||
static void
|
||||
wl_event_queue_init(struct wl_event_queue *queue)
|
||||
{
|
||||
wl_list_init(&queue->event_list);
|
||||
pthread_cond_init(&queue->cond, NULL);
|
||||
}
|
||||
|
||||
static void
|
||||
wl_event_queue_release(struct wl_event_queue *queue)
|
||||
{
|
||||
struct wl_closure *closure;
|
||||
|
||||
while (!wl_list_empty(&queue->event_list)) {
|
||||
closure = container_of(queue->event_list.next,
|
||||
struct wl_closure, link);
|
||||
wl_list_remove(&closure->link);
|
||||
wl_closure_destroy(closure);
|
||||
}
|
||||
pthread_cond_destroy(&queue->cond);
|
||||
}
|
||||
|
||||
WL_EXPORT void
|
||||
wl_event_queue_destroy(struct wl_event_queue *queue)
|
||||
{
|
||||
wl_event_queue_release(queue);
|
||||
free(queue);
|
||||
}
|
||||
|
||||
WL_EXPORT struct wl_event_queue *
|
||||
wl_display_create_queue(struct wl_display *display)
|
||||
{
|
||||
struct wl_event_queue *queue;
|
||||
|
||||
queue = malloc(sizeof *queue);
|
||||
if (queue == NULL)
|
||||
return NULL;
|
||||
|
||||
wl_event_queue_init(queue);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
WL_EXPORT struct wl_global_listener *
|
||||
wl_display_add_global_listener(struct wl_display *display,
|
||||
wl_display_global_func_t handler, void *data)
|
||||
|
|
@ -120,6 +171,8 @@ wl_proxy_create(struct wl_proxy *factory, const struct wl_interface *interface)
|
|||
proxy->object.interface = interface;
|
||||
proxy->object.implementation = NULL;
|
||||
proxy->display = display;
|
||||
proxy->queue = factory->queue;
|
||||
proxy->id_deleted = 0;
|
||||
|
||||
pthread_mutex_lock(&display->mutex);
|
||||
proxy->object.id = wl_map_insert_new(&display->objects,
|
||||
|
|
@ -144,6 +197,8 @@ wl_proxy_create_for_id(struct wl_proxy *factory,
|
|||
proxy->object.implementation = NULL;
|
||||
proxy->object.id = id;
|
||||
proxy->display = display;
|
||||
proxy->queue = factory->queue;
|
||||
proxy->id_deleted = 0;
|
||||
|
||||
pthread_mutex_lock(&display->mutex);
|
||||
wl_map_insert_at(&display->objects, id, proxy);
|
||||
|
|
@ -157,7 +212,9 @@ wl_proxy_destroy(struct wl_proxy *proxy)
|
|||
{
|
||||
pthread_mutex_lock(&proxy->display->mutex);
|
||||
|
||||
if (proxy->object.id < WL_SERVER_ID_START)
|
||||
if (proxy->id_deleted)
|
||||
wl_map_remove(&proxy->display->objects, proxy->object.id);
|
||||
else if (proxy->object.id < WL_SERVER_ID_START)
|
||||
wl_map_insert_at(&proxy->display->objects,
|
||||
proxy->object.id, WL_ZOMBIE_OBJECT);
|
||||
else
|
||||
|
|
@ -290,7 +347,7 @@ display_handle_delete_id(void *data, struct wl_display *display, uint32_t id)
|
|||
|
||||
proxy = wl_map_lookup(&display->objects, id);
|
||||
if (proxy != WL_ZOMBIE_OBJECT)
|
||||
fprintf(stderr, "server sent delete_id for live object\n");
|
||||
proxy->id_deleted = 1;
|
||||
else
|
||||
wl_map_remove(&display->objects, id);
|
||||
|
||||
|
|
@ -380,6 +437,7 @@ wl_display_connect_to_fd(int fd)
|
|||
wl_map_init(&display->objects);
|
||||
wl_list_init(&display->global_listener_list);
|
||||
wl_list_init(&display->global_list);
|
||||
wl_event_queue_init(&display->queue);
|
||||
|
||||
wl_map_insert_new(&display->objects, WL_MAP_CLIENT_SIDE, NULL);
|
||||
|
||||
|
|
@ -390,6 +448,7 @@ wl_display_connect_to_fd(int fd)
|
|||
display->proxy.display = display;
|
||||
display->proxy.object.implementation = (void(**)(void)) &display_listener;
|
||||
display->proxy.user_data = display;
|
||||
display->proxy.queue = &display->queue;
|
||||
|
||||
display->connection = wl_connection_create(display->fd);
|
||||
if (display->connection == NULL) {
|
||||
|
|
@ -440,6 +499,7 @@ wl_display_disconnect(struct wl_display *display)
|
|||
|
||||
wl_connection_destroy(display->connection);
|
||||
wl_map_release(&display->objects);
|
||||
wl_event_queue_release(&display->queue);
|
||||
wl_list_for_each_safe(global, gnext,
|
||||
&display->global_list, link)
|
||||
wl_global_destroy(global);
|
||||
|
|
@ -486,7 +546,7 @@ wl_display_roundtrip(struct wl_display *display)
|
|||
}
|
||||
|
||||
static int
|
||||
create_proxies(struct wl_display *display, struct wl_closure *closure)
|
||||
create_proxies(struct wl_proxy *sender, struct wl_closure *closure)
|
||||
{
|
||||
struct wl_proxy *proxy;
|
||||
const char *signature;
|
||||
|
|
@ -506,7 +566,7 @@ create_proxies(struct wl_display *display, struct wl_closure *closure)
|
|||
*(void **) closure->args[i] = NULL;
|
||||
break;
|
||||
}
|
||||
proxy = wl_proxy_create_for_id(&display->proxy, id,
|
||||
proxy = wl_proxy_create_for_id(sender, id,
|
||||
closure->message->types[i - 2]);
|
||||
if (proxy == NULL)
|
||||
return -1;
|
||||
|
|
@ -521,7 +581,7 @@ create_proxies(struct wl_display *display, struct wl_closure *closure)
|
|||
}
|
||||
|
||||
static int
|
||||
queue_event(struct wl_display *display, int len, struct wl_list *list)
|
||||
queue_event(struct wl_display *display, int len)
|
||||
{
|
||||
uint32_t p[2], id;
|
||||
int opcode, size;
|
||||
|
|
@ -552,45 +612,50 @@ queue_event(struct wl_display *display, int len, struct wl_list *list)
|
|||
if (wl_debug)
|
||||
wl_closure_print(closure, &proxy->object, false);
|
||||
|
||||
if (closure == NULL || create_proxies(display, closure) < 0) {
|
||||
if (closure == NULL || create_proxies(proxy, closure) < 0) {
|
||||
fprintf(stderr, "Error demarshalling event\n");
|
||||
abort();
|
||||
}
|
||||
|
||||
wl_list_insert(list->prev, &closure->link);
|
||||
if (wl_list_empty(&proxy->queue->event_list))
|
||||
pthread_cond_signal(&proxy->queue->cond);
|
||||
wl_list_insert(proxy->queue->event_list.prev, &closure->link);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
static void
|
||||
dispatch_event(struct wl_display *display, struct wl_closure *closure)
|
||||
dispatch_event(struct wl_display *display, struct wl_event_queue *queue)
|
||||
{
|
||||
struct wl_closure *closure;
|
||||
struct wl_proxy *proxy;
|
||||
uint32_t id;
|
||||
int opcode;
|
||||
|
||||
closure = container_of(queue->event_list.next,
|
||||
struct wl_closure, link);
|
||||
wl_list_remove(&closure->link);
|
||||
id = closure->buffer[0];
|
||||
opcode = closure->buffer[1] & 0xffff;
|
||||
|
||||
pthread_mutex_lock(&display->mutex);
|
||||
proxy = wl_map_lookup(&display->objects, id);
|
||||
|
||||
pthread_mutex_unlock(&display->mutex);
|
||||
|
||||
if (proxy == WL_ZOMBIE_OBJECT)
|
||||
goto skip;
|
||||
|
||||
wl_closure_invoke(closure, &proxy->object,
|
||||
proxy->object.implementation[opcode],
|
||||
proxy->user_data);
|
||||
skip:
|
||||
if (proxy != WL_ZOMBIE_OBJECT)
|
||||
wl_closure_invoke(closure, &proxy->object,
|
||||
proxy->object.implementation[opcode],
|
||||
proxy->user_data);
|
||||
wl_closure_destroy(closure);
|
||||
|
||||
pthread_mutex_lock(&display->mutex);
|
||||
}
|
||||
|
||||
|
||||
WL_EXPORT int
|
||||
wl_display_dispatch(struct wl_display *display)
|
||||
wl_display_dispatch_queue(struct wl_display *display,
|
||||
struct wl_event_queue *queue)
|
||||
{
|
||||
struct wl_list list;
|
||||
struct wl_closure *closure;
|
||||
int len, size;
|
||||
|
||||
|
|
@ -599,29 +664,37 @@ wl_display_dispatch(struct wl_display *display)
|
|||
/* FIXME: Handle flush errors, EAGAIN... */
|
||||
wl_connection_flush(display->connection);
|
||||
|
||||
/* FIXME: Shouldn't always read here... */
|
||||
len = wl_connection_read(display->connection);
|
||||
if (len == -1) {
|
||||
pthread_mutex_unlock(&display->mutex);
|
||||
return -1;
|
||||
if (wl_list_empty(&queue->event_list) &&
|
||||
pthread_equal(display->display_thread, pthread_self())) {
|
||||
len = wl_connection_read(display->connection);
|
||||
if (len == -1) {
|
||||
pthread_mutex_unlock(&display->mutex);
|
||||
return -1;
|
||||
}
|
||||
while (len >= 8) {
|
||||
size = queue_event(display, len);
|
||||
if (size == 0)
|
||||
break;
|
||||
len -= size;
|
||||
}
|
||||
} else if (wl_list_empty(&queue->event_list)) {
|
||||
pthread_cond_wait(&queue->cond, &display->mutex);
|
||||
}
|
||||
|
||||
wl_list_init(&list);
|
||||
while (len >= 8) {
|
||||
size = queue_event(display, len, &list);
|
||||
if (size == 0)
|
||||
break;
|
||||
len -= size;
|
||||
}
|
||||
while (!wl_list_empty(&queue->event_list))
|
||||
dispatch_event(display, queue);
|
||||
|
||||
pthread_mutex_unlock(&display->mutex);
|
||||
|
||||
while (!wl_list_empty(&list)) {
|
||||
closure = container_of(list.next, struct wl_closure, link);
|
||||
dispatch_event(display, closure);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
return len;
|
||||
WL_EXPORT int
|
||||
wl_display_dispatch(struct wl_display *display)
|
||||
{
|
||||
display->display_thread = pthread_self();
|
||||
|
||||
return wl_display_dispatch_queue(display, &display->queue);
|
||||
}
|
||||
|
||||
WL_EXPORT int
|
||||
|
|
@ -687,6 +760,13 @@ wl_proxy_get_id(struct wl_proxy *proxy)
|
|||
return proxy->object.id;
|
||||
}
|
||||
|
||||
|
||||
WL_EXPORT void
|
||||
wl_proxy_set_queue(struct wl_proxy *proxy, struct wl_event_queue *queue)
|
||||
{
|
||||
proxy->queue = queue;
|
||||
}
|
||||
|
||||
WL_EXPORT void
|
||||
wl_log_set_handler_client(wl_log_func_t handler)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -32,6 +32,9 @@ extern "C" {
|
|||
|
||||
struct wl_proxy;
|
||||
struct wl_display;
|
||||
struct wl_event_queue;
|
||||
|
||||
void wl_event_queue_destroy(struct wl_event_queue *queue);
|
||||
|
||||
void wl_proxy_marshal(struct wl_proxy *p, uint32_t opcode, ...);
|
||||
struct wl_proxy *wl_proxy_create(struct wl_proxy *factory,
|
||||
|
|
@ -46,6 +49,7 @@ int wl_proxy_add_listener(struct wl_proxy *proxy,
|
|||
void wl_proxy_set_user_data(struct wl_proxy *proxy, void *user_data);
|
||||
void *wl_proxy_get_user_data(struct wl_proxy *proxy);
|
||||
uint32_t wl_proxy_get_id(struct wl_proxy *proxy);
|
||||
void wl_proxy_set_queue(struct wl_proxy *proxy, struct wl_event_queue *queue);
|
||||
|
||||
void *wl_display_bind(struct wl_display *display,
|
||||
uint32_t name, const struct wl_interface *interface);
|
||||
|
|
@ -74,8 +78,12 @@ struct wl_display *wl_display_connect_to_fd(int fd);
|
|||
void wl_display_disconnect(struct wl_display *display);
|
||||
int wl_display_get_fd(struct wl_display *display);
|
||||
int wl_display_dispatch(struct wl_display *display);
|
||||
int wl_display_dispatch_queue(struct wl_display *display,
|
||||
struct wl_event_queue *queue);
|
||||
|
||||
int wl_display_flush(struct wl_display *display);
|
||||
void wl_display_roundtrip(struct wl_display *display);
|
||||
struct wl_event_queue *wl_display_create_queue(struct wl_display *display);
|
||||
|
||||
struct wl_global_listener;
|
||||
typedef void (*wl_display_global_func_t)(struct wl_display *display,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue