diff --git a/src/wayland-client-core.h b/src/wayland-client-core.h index ce91a6f6..c19622f8 100644 --- a/src/wayland-client-core.h +++ b/src/wayland-client-core.h @@ -234,6 +234,12 @@ wl_display_disconnect(struct wl_display *display); int wl_display_get_fd(struct wl_display *display); +int +wl_display_get_stop_fd(struct wl_display *display); + +int +wl_display_stop(struct wl_display *display, int wait_readers); + int wl_display_dispatch(struct wl_display *display); diff --git a/src/wayland-client.c b/src/wayland-client.c index 63ce0d01..17827121 100644 --- a/src/wayland-client.c +++ b/src/wayland-client.c @@ -41,6 +41,7 @@ #include #include #include +#include #include "wayland-util.h" #include "wayland-os.h" @@ -107,6 +108,9 @@ struct wl_display { int reader_count; uint32_t read_serial; pthread_cond_t reader_cond; + int stopfd; + bool stopped; + pthread_cond_t reader_done; }; /** \endcond */ @@ -131,6 +135,7 @@ display_wakeup_threads(struct wl_display *display) ++display->read_serial; pthread_cond_broadcast(&display->reader_cond); + pthread_cond_broadcast(&display->reader_done); } /** @@ -1148,6 +1153,9 @@ wl_display_connect_to_fd(int fd) } display->fd = fd; + display->stopped = false; + display->stopfd = eventfd(0, EFD_CLOEXEC); + pthread_cond_init(&display->reader_done, NULL); wl_map_init(&display->objects, WL_MAP_CLIENT_SIDE); wl_event_queue_init(&display->default_queue, display); wl_event_queue_init(&display->display_queue, display); @@ -1278,6 +1286,7 @@ wl_display_disconnect(struct wl_display *display) pthread_mutex_destroy(&display->mutex); pthread_cond_destroy(&display->reader_cond); close(display->fd); + close(display->stopfd); free(display); } @@ -1298,6 +1307,76 @@ wl_display_get_fd(struct wl_display *display) return display->fd; } +/** Get a display context's stop file descriptor + * + * \param display The display context object + * \return Display object stop file descriptor + * + * Return the stop file descriptor associated with a display so it can be + * integrated into the client's main loop. This should be used to break + * out of the poll loop early. + * + * If this fd has POLLIN in it's revents, you should call + * wl_display_cancel_read() then stop all usage of display. + * + * \memberof wl_display + */ +WL_EXPORT int +wl_display_get_stop_fd(struct wl_display *display) +{ + return display->stopfd; +} + +/** Stop display event processing + * + * \param display The display context object + * \param wait_readers Whether to wait for readers to finish + * \return Display object stop file descriptor + * + * Signal to current readers that they should immediately stop + * processing events. This can be used to safely force a + * mediated shutdown. + * + * If a thread is polling on the stop fd it will receive an event + * and should cancel it's read and exit its processing loop. + * If a thread is waiting for an event reader to finish it will + * be woken up and should exit the wl_display_read_events() call. + * + * If wait_readers is non-zero this call will block until all + * readers have indicated that they have stopped processing. + * + * Once this call returns it is up to the calling process to + * ensure that new reader threads are not created to start processing + * the display events again. At this point it can be assumed to be + * safe to disconnect the display. + * + * \memberof wl_display + */ +WL_EXPORT int +wl_display_stop(struct wl_display *display, int wait_readers) +{ + int ret = 0; + + pthread_mutex_lock(&display->mutex); + + display->stopped = true; + if (display->stopfd >= 0) { + uint64_t stopval = 1; + if (write(display->stopfd, &stopval, sizeof(stopval)) != sizeof(stopval)) + wl_log("Unable to wake up any pollers\n"); + } + display_wakeup_threads(display); + + if (wait_readers) { + while (display->reader_count) + pthread_cond_wait(&display->reader_done, &display->mutex); + } + + pthread_mutex_unlock(&display->mutex); + + return ret; +} + static void sync_callback(void *data, struct wl_callback *callback, uint32_t serial) { @@ -1577,7 +1656,7 @@ dispatch_event(struct wl_display *display, struct wl_event_queue *queue) static int read_events(struct wl_display *display) { - int total, rem, size; + int total, rem, size, ret = 0; uint32_t serial; display->reader_count--; @@ -1589,25 +1668,29 @@ read_events(struct wl_display *display) * the reader_count dropped to 0 */ display_wakeup_threads(display); - return 0; + ret = 0; + goto exit; } display_fatal_error(display, errno); - return -1; + ret = -1; + goto exit; } else if (total == 0) { /* The compositor has closed the socket. This * should be considered an error so we'll fake * an errno */ errno = EPIPE; display_fatal_error(display, errno); - return -1; + ret = -1; + goto exit; } for (rem = total; rem >= 8; rem -= size) { size = queue_event(display, rem); if (size == -1) { display_fatal_error(display, errno); - return -1; + ret = -1; + goto exit; } else if (size == 0) { break; } @@ -1622,11 +1705,19 @@ read_events(struct wl_display *display) if (display->last_error) { errno = display->last_error; - return -1; + ret = -1; + goto exit; + } + if (display->stopped) { + errno = ECANCELED; + ret = -1; + goto exit; } } - return 0; +exit: + pthread_cond_broadcast(&display->reader_done); + return ret; } static void @@ -1635,6 +1726,8 @@ cancel_read(struct wl_display *display) display->reader_count--; if (display->reader_count == 0) display_wakeup_threads(display); + else + pthread_cond_broadcast(&display->reader_done); } /** Read events from display file descriptor @@ -1688,6 +1781,8 @@ wl_display_read_events(struct wl_display *display) } ret = read_events(display); + if (ret == ECANCELED) + cancel_read(display); pthread_mutex_unlock(&display->mutex); @@ -1843,12 +1938,18 @@ static int wl_display_poll(struct wl_display *display, short int events) { int ret; - struct pollfd pfd[1]; + struct pollfd pfd[2]; pfd[0].fd = display->fd; pfd[0].events = events; + pfd[1].fd = display->stopfd; + pfd[1].events = POLLIN; do { - ret = poll(pfd, 1, -1); + ret = poll(pfd, 2, -1); + if (pfd[1].revents & POLLIN) { + errno = ECANCELED; + return -1; + } } while (ret == -1 && errno == EINTR); return ret; diff --git a/tests/display-test.c b/tests/display-test.c index 3db7c95a..1e8cd661 100644 --- a/tests/display-test.c +++ b/tests/display-test.c @@ -1629,3 +1629,156 @@ TEST(global_remove) display_destroy(d); } + +static void * +thread_prepare_and_read_nocheck(void *data) +{ + struct client *c = data; + + register_reading(c->wl_display); + + c->display_stopped = 1; + + wl_display_read_events(c->wl_display); + wl_display_dispatch_pending(c->wl_display); + + pthread_exit(NULL); +} + +static void +threading_stop_readers(void) +{ + struct client *c = client_connect(); + pthread_t th1, th2, th3; + + register_reading(c->wl_display); + + th1 = create_thread(c, thread_prepare_and_read_nocheck); + th2 = create_thread(c, thread_prepare_and_read_nocheck); + th3 = create_thread(c, thread_prepare_and_read_nocheck); + + test_set_timeout(3); + wl_display_cancel_read(c->wl_display); + wl_display_stop(c->wl_display, 1); + pthread_join(th1, NULL); + pthread_join(th2, NULL); + pthread_join(th3, NULL); + + client_disconnect(c); +} + +TEST(threading_stop_readers_tst) +{ + struct display *d = display_create(); + client_create_noarg(d, threading_stop_readers); + + display_run(d); + + display_destroy(d); +} + +static void * +thread_prepare_and_poll(void *data) +{ + struct client *c = data; + int ret = 0; + struct pollfd pfd[2]; + + register_reading(c->wl_display); + + c->display_stopped = 1; + + pfd[0].fd = wl_display_get_fd(c->wl_display); + pfd[0].events = POLLIN; + pfd[1].fd = wl_display_get_stop_fd(c->wl_display); + pfd[1].events = POLLIN; + + do { + ret = poll(pfd, 2, -1); + if (pfd[1].revents & POLLIN) { + wl_display_cancel_read(c->wl_display); + break; + } + } while (ret == -1 && errno == EINTR); + + pthread_exit(NULL); +} + +static void +threading_stop_pollers(void) +{ + struct client *c = client_connect(); + pthread_t th1, th2, th3; + + register_reading(c->wl_display); + + th1 = create_thread(c, thread_prepare_and_poll); + th2 = create_thread(c, thread_prepare_and_poll); + th3 = create_thread(c, thread_prepare_and_poll); + + test_set_timeout(3); + wl_display_cancel_read(c->wl_display); + wl_display_stop(c->wl_display, 1); + pthread_join(th1, NULL); + pthread_join(th2, NULL); + pthread_join(th3, NULL); + + client_disconnect(c); +} + +TEST(threading_stop_pollers_tst) +{ + struct display *d = display_create(); + client_create_noarg(d, threading_stop_pollers); + + display_run(d); + + display_destroy(d); +} + +static void * +thread_dispatch_loop(void *data) +{ + struct client *c = data; + int ret = 0; + + c->display_stopped = 1; + + do { + ret = wl_display_dispatch(c->wl_display); + } while (ret == 0); + + pthread_exit(NULL); +} + +static void +threading_stop_dispatch_loop(void) +{ + struct client *c = client_connect(); + pthread_t th1, th2, th3; + + register_reading(c->wl_display); + + th1 = create_thread(c, thread_dispatch_loop); + th2 = create_thread(c, thread_dispatch_loop); + th3 = create_thread(c, thread_dispatch_loop); + + test_set_timeout(3); + wl_display_cancel_read(c->wl_display); + wl_display_stop(c->wl_display, 1); + pthread_join(th1, NULL); + pthread_join(th2, NULL); + pthread_join(th3, NULL); + + client_disconnect(c); +} + +TEST(threading_stop_dispatch_loop_tst) +{ + struct display *d = display_create(); + client_create_noarg(d, threading_stop_dispatch_loop); + + display_run(d); + + display_destroy(d); +}