client: Add mediated reader stopping for display proxy

Add display API for negotiated display event reader's early exits.

Closes #28

This patch allows display event readers to exit their polling and
processing loops early based on when something calls wl_display_stop.

TODO: There is still a potential issue of new threads starting
after wl_display_stop has returned, which would need a more
significant change to give the display API control of the
display pointer, and check whether it is nulled before dereferencing
throughout the current API.

Signed-off-by: Robert Beckett <bob.beckett@collabora.com>
This commit is contained in:
Robert Beckett 2019-04-16 17:12:06 +01:00
parent 6d44973710
commit 35f1689aa1
3 changed files with 269 additions and 9 deletions

View file

@ -216,6 +216,12 @@ wl_display_disconnect(struct wl_display *display);
int int
wl_display_get_fd(struct wl_display *display); wl_display_get_fd(struct wl_display *display);
int
wl_display_get_stop_fd(struct wl_display *display);
int
wl_display_stop(struct wl_display *display, int wait_readers);
int int
wl_display_dispatch(struct wl_display *display); wl_display_dispatch(struct wl_display *display);

View file

@ -41,6 +41,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <sys/eventfd.h>
#include "wayland-util.h" #include "wayland-util.h"
#include "wayland-os.h" #include "wayland-os.h"
@ -107,6 +108,9 @@ struct wl_display {
int reader_count; int reader_count;
uint32_t read_serial; uint32_t read_serial;
pthread_cond_t reader_cond; pthread_cond_t reader_cond;
int stopfd;
bool stopped;
pthread_cond_t reader_done;
}; };
/** \endcond */ /** \endcond */
@ -131,6 +135,7 @@ display_wakeup_threads(struct wl_display *display)
++display->read_serial; ++display->read_serial;
pthread_cond_broadcast(&display->reader_cond); pthread_cond_broadcast(&display->reader_cond);
pthread_cond_broadcast(&display->reader_done);
} }
/** /**
@ -1050,6 +1055,9 @@ wl_display_connect_to_fd(int fd)
} }
display->fd = fd; display->fd = fd;
display->stopped = false;
display->stopfd = eventfd(0, EFD_CLOEXEC);
pthread_cond_init(&display->reader_done, NULL);
wl_map_init(&display->objects, WL_MAP_CLIENT_SIDE); wl_map_init(&display->objects, WL_MAP_CLIENT_SIDE);
wl_event_queue_init(&display->default_queue, display); wl_event_queue_init(&display->default_queue, display);
wl_event_queue_init(&display->display_queue, display); wl_event_queue_init(&display->display_queue, display);
@ -1173,6 +1181,7 @@ wl_display_disconnect(struct wl_display *display)
pthread_mutex_destroy(&display->mutex); pthread_mutex_destroy(&display->mutex);
pthread_cond_destroy(&display->reader_cond); pthread_cond_destroy(&display->reader_cond);
close(display->fd); close(display->fd);
close(display->stopfd);
free(display); free(display);
} }
@ -1193,6 +1202,76 @@ wl_display_get_fd(struct wl_display *display)
return display->fd; return display->fd;
} }
/** Get a display context's stop file descriptor
*
* \param display The display context object
* \return Display object stop file descriptor
*
* Return the stop file descriptor associated with a display so it can be
* integrated into the client's main loop. This should be used to break
* out of the poll loop early.
*
* If this fd has POLLIN in it's revents, you should call
* wl_display_cancel_read() then stop all usage of display.
*
* \memberof wl_display
*/
WL_EXPORT int
wl_display_get_stop_fd(struct wl_display *display)
{
return display->stopfd;
}
/** Stop display event processing
*
* \param display The display context object
* \param wait_readers Whether to wait for readers to finish
* \return Display object stop file descriptor
*
* Signal to current readers that they should immediately stop
* processing events. This can be used to safely force a
* mediated shutdown.
*
* If a thread is polling on the stop fd it will receive an event
* and should cancel it's read and exit its processing loop.
* If a thread is waiting for an event reader to finish it will
* be woken up and should exit the wl_display_read_events() call.
*
* If wait_readers is non-zero this call will block until all
* readers have indicated that they have stopped processing.
*
* Once this call returns it is up to the calling process to
* ensure that new reader threads are not created to start processing
* the display events again. At this point it can be assumed to be
* safe to disconnect the display.
*
* \memberof wl_display
*/
WL_EXPORT int
wl_display_stop(struct wl_display *display, int wait_readers)
{
int ret = 0;
pthread_mutex_lock(&display->mutex);
display->stopped = true;
if (display->stopfd >= 0) {
uint64_t stopval = 1;
if (write(display->stopfd, &stopval, sizeof(stopval)) != sizeof(stopval))
wl_log("Unable to wake up any pollers\n");
}
display_wakeup_threads(display);
if (wait_readers) {
while (display->reader_count)
pthread_cond_wait(&display->reader_done, &display->mutex);
}
pthread_mutex_unlock(&display->mutex);
return ret;
}
static void static void
sync_callback(void *data, struct wl_callback *callback, uint32_t serial) sync_callback(void *data, struct wl_callback *callback, uint32_t serial)
{ {
@ -1454,7 +1533,7 @@ dispatch_event(struct wl_display *display, struct wl_event_queue *queue)
static int static int
read_events(struct wl_display *display) read_events(struct wl_display *display)
{ {
int total, rem, size; int total, rem, size, ret = 0;
uint32_t serial; uint32_t serial;
display->reader_count--; display->reader_count--;
@ -1466,25 +1545,29 @@ read_events(struct wl_display *display)
* the reader_count dropped to 0 */ * the reader_count dropped to 0 */
display_wakeup_threads(display); display_wakeup_threads(display);
return 0; ret = 0;
goto exit;
} }
display_fatal_error(display, errno); display_fatal_error(display, errno);
return -1; ret = -1;
goto exit;
} else if (total == 0) { } else if (total == 0) {
/* The compositor has closed the socket. This /* The compositor has closed the socket. This
* should be considered an error so we'll fake * should be considered an error so we'll fake
* an errno */ * an errno */
errno = EPIPE; errno = EPIPE;
display_fatal_error(display, errno); display_fatal_error(display, errno);
return -1; ret = -1;
goto exit;
} }
for (rem = total; rem >= 8; rem -= size) { for (rem = total; rem >= 8; rem -= size) {
size = queue_event(display, rem); size = queue_event(display, rem);
if (size == -1) { if (size == -1) {
display_fatal_error(display, errno); display_fatal_error(display, errno);
return -1; ret = -1;
goto exit;
} else if (size == 0) { } else if (size == 0) {
break; break;
} }
@ -1499,11 +1582,19 @@ read_events(struct wl_display *display)
if (display->last_error) { if (display->last_error) {
errno = display->last_error; errno = display->last_error;
return -1; ret = -1;
goto exit;
}
if (display->stopped) {
errno = ECANCELED;
ret = -1;
goto exit;
} }
} }
return 0; exit:
pthread_cond_broadcast(&display->reader_done);
return ret;
} }
static void static void
@ -1512,6 +1603,8 @@ cancel_read(struct wl_display *display)
display->reader_count--; display->reader_count--;
if (display->reader_count == 0) if (display->reader_count == 0)
display_wakeup_threads(display); display_wakeup_threads(display);
else
pthread_cond_broadcast(&display->reader_done);
} }
/** Read events from display file descriptor /** Read events from display file descriptor
@ -1565,6 +1658,8 @@ wl_display_read_events(struct wl_display *display)
} }
ret = read_events(display); ret = read_events(display);
if (ret == ECANCELED)
cancel_read(display);
pthread_mutex_unlock(&display->mutex); pthread_mutex_unlock(&display->mutex);
@ -1720,12 +1815,18 @@ static int
wl_display_poll(struct wl_display *display, short int events) wl_display_poll(struct wl_display *display, short int events)
{ {
int ret; int ret;
struct pollfd pfd[1]; struct pollfd pfd[2];
pfd[0].fd = display->fd; pfd[0].fd = display->fd;
pfd[0].events = events; pfd[0].events = events;
pfd[1].fd = display->stopfd;
pfd[1].events = POLLIN;
do { do {
ret = poll(pfd, 1, -1); ret = poll(pfd, 2, -1);
if (pfd[1].revents & POLLIN) {
errno = ECANCELED;
return -1;
}
} while (ret == -1 && errno == EINTR); } while (ret == -1 && errno == EINTR);
return ret; return ret;

View file

@ -1629,3 +1629,156 @@ TEST(global_remove)
display_destroy(d); display_destroy(d);
} }
static void *
thread_prepare_and_read_nocheck(void *data)
{
struct client *c = data;
register_reading(c->wl_display);
c->display_stopped = 1;
wl_display_read_events(c->wl_display);
wl_display_dispatch_pending(c->wl_display);
pthread_exit(NULL);
}
static void
threading_stop_readers(void)
{
struct client *c = client_connect();
pthread_t th1, th2, th3;
register_reading(c->wl_display);
th1 = create_thread(c, thread_prepare_and_read_nocheck);
th2 = create_thread(c, thread_prepare_and_read_nocheck);
th3 = create_thread(c, thread_prepare_and_read_nocheck);
test_set_timeout(3);
wl_display_cancel_read(c->wl_display);
wl_display_stop(c->wl_display, 1);
pthread_join(th1, NULL);
pthread_join(th2, NULL);
pthread_join(th3, NULL);
client_disconnect(c);
}
TEST(threading_stop_readers_tst)
{
struct display *d = display_create();
client_create_noarg(d, threading_stop_readers);
display_run(d);
display_destroy(d);
}
static void *
thread_prepare_and_poll(void *data)
{
struct client *c = data;
int ret = 0;
struct pollfd pfd[2];
register_reading(c->wl_display);
c->display_stopped = 1;
pfd[0].fd = wl_display_get_fd(c->wl_display);
pfd[0].events = POLLIN;
pfd[1].fd = wl_display_get_stop_fd(c->wl_display);
pfd[1].events = POLLIN;
do {
ret = poll(pfd, 2, -1);
if (pfd[1].revents & POLLIN) {
wl_display_cancel_read(c->wl_display);
break;
}
} while (ret == -1 && errno == EINTR);
pthread_exit(NULL);
}
static void
threading_stop_pollers(void)
{
struct client *c = client_connect();
pthread_t th1, th2, th3;
register_reading(c->wl_display);
th1 = create_thread(c, thread_prepare_and_poll);
th2 = create_thread(c, thread_prepare_and_poll);
th3 = create_thread(c, thread_prepare_and_poll);
test_set_timeout(3);
wl_display_cancel_read(c->wl_display);
wl_display_stop(c->wl_display, 1);
pthread_join(th1, NULL);
pthread_join(th2, NULL);
pthread_join(th3, NULL);
client_disconnect(c);
}
TEST(threading_stop_pollers_tst)
{
struct display *d = display_create();
client_create_noarg(d, threading_stop_pollers);
display_run(d);
display_destroy(d);
}
static void *
thread_dispatch_loop(void *data)
{
struct client *c = data;
int ret = 0;
c->display_stopped = 1;
do {
ret = wl_display_dispatch(c->wl_display);
} while (ret == 0);
pthread_exit(NULL);
}
static void
threading_stop_dispatch_loop(void)
{
struct client *c = client_connect();
pthread_t th1, th2, th3;
register_reading(c->wl_display);
th1 = create_thread(c, thread_dispatch_loop);
th2 = create_thread(c, thread_dispatch_loop);
th3 = create_thread(c, thread_dispatch_loop);
test_set_timeout(3);
wl_display_cancel_read(c->wl_display);
wl_display_stop(c->wl_display, 1);
pthread_join(th1, NULL);
pthread_join(th2, NULL);
pthread_join(th3, NULL);
client_disconnect(c);
}
TEST(threading_stop_dispatch_loop_tst)
{
struct display *d = display_create();
client_create_noarg(d, threading_stop_dispatch_loop);
display_run(d);
display_destroy(d);
}