Change filedescriptor API to be thread safe

The update callback for the file descriptors was always a bit awkward and
un-intuitive.  The idea was that whenever the protocol code needed to
write data to the fd it would call the 'update' function.  This function
would adjust the mainloop so that it polls for POLLOUT on the fd so we
can eventually flush the data to the socket.

The problem is that in multi-threaded applications, any thread can issue
a request, which writes data to the output buffer and thus triggers the
update callback.  Thus, we'll be calling out with the display mutex
held and may call from any thread.

The solution is to eliminate the udpate callback and just require that
the application or server flushes all connection buffers before blocking.
This turns out to be a simpler API, although we now require clients to
deal with EAGAIN and non-blocking writes.  It also saves a few syscalls,
since the socket will be writable most of the time and most writes will
complete, so we avoid changing epoll to poll for POLLOUT, then write and
then change it back for each write.
This commit is contained in:
Kristian Høgsberg 2012-10-04 16:54:22 -04:00
parent 0371668dcc
commit 53d24713a3
9 changed files with 150 additions and 251 deletions

View file

@ -1,5 +1,6 @@
/*
* Copyright © 2008 Kristian Høgsberg
* Copyright © 2008-2012 Kristian Høgsberg
* Copyright © 2010-2012 Intel Corporation
*
* Permission to use, copy, modify, distribute, and sell this software and its
* documentation for any purpose is hereby granted without fee, provided that
@ -64,34 +65,16 @@ struct wl_display {
struct wl_connection *connection;
int fd;
int close_fd;
uint32_t mask;
struct wl_map objects;
struct wl_list global_listener_list;
struct wl_list global_list;
wl_display_update_func_t update;
void *update_data;
wl_display_global_func_t global_handler;
void *global_handler_data;
};
static int wl_debug = 0;
static int
connection_update(struct wl_connection *connection,
uint32_t mask, void *data)
{
struct wl_display *display = data;
display->mask = mask;
if (display->update)
return display->update(display->mask,
display->update_data);
return 0;
}
WL_EXPORT struct wl_global_listener *
wl_display_add_global_listener(struct wl_display *display,
wl_display_global_func_t handler, void *data)
@ -387,8 +370,7 @@ wl_display_connect_to_fd(int fd)
display->proxy.object.implementation = (void(**)(void)) &display_listener;
display->proxy.user_data = display;
display->connection = wl_connection_create(display->fd,
connection_update, display);
display->connection = wl_connection_create(display->fd);
if (display->connection == NULL) {
wl_map_release(&display->objects);
close(display->fd);
@ -451,16 +433,8 @@ wl_display_disconnect(struct wl_display *display)
}
WL_EXPORT int
wl_display_get_fd(struct wl_display *display,
wl_display_update_func_t update, void *data)
wl_display_get_fd(struct wl_display *display)
{
display->update = update;
display->update_data = data;
if (display->update)
display->update(display->mask,
display->update_data);
return display->fd;
}
@ -486,9 +460,8 @@ wl_display_roundtrip(struct wl_display *display)
done = 0;
callback = wl_display_sync(display);
wl_callback_add_listener(callback, &sync_listener, &done);
wl_display_flush(display);
while (!done)
wl_display_iterate(display, WL_DISPLAY_READABLE);
wl_display_dispatch(display);
}
static int
@ -563,20 +536,17 @@ handle_event(struct wl_display *display,
wl_closure_destroy(closure);
}
WL_EXPORT void
wl_display_iterate(struct wl_display *display, uint32_t mask)
WL_EXPORT int
wl_display_dispatch(struct wl_display *display)
{
uint32_t p[2], object;
int len, opcode, size;
mask &= display->mask;
if (mask == 0) {
fprintf(stderr,
"wl_display_iterate called with unsolicited flags\n");
return;
}
/* FIXME: Handle flush errors, EAGAIN... */
wl_display_flush(display);
len = wl_connection_data(display->connection, mask);
/* FIXME: Shouldn't always read here... */
len = wl_connection_read(display->connection);
while (len > 0) {
if ((size_t) len < sizeof p)
@ -593,17 +563,13 @@ wl_display_iterate(struct wl_display *display, uint32_t mask)
len -= size;
}
if (len < 0) {
fprintf(stderr, "read error: %m\n");
exit(EXIT_FAILURE);
}
return len;
}
WL_EXPORT void
WL_EXPORT int
wl_display_flush(struct wl_display *display)
{
while (display->mask & WL_DISPLAY_WRITABLE)
wl_display_iterate (display, WL_DISPLAY_WRITABLE);
return wl_connection_flush(display->connection);
}
WL_EXPORT void *