From 2af5a90c900cb7b66d441bdcab9d85f4604c0bea Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 13 Nov 2023 10:12:34 +0100 Subject: [PATCH] evl: add fds to pollfd from rt thread When we don't have the thread id yet, don't add the pollfds yet but wait until we do our first wait operation. Use flags for eventfd. We can use this to communicate between all kinds of threads with read/write. Use evl_init() in the init function, don't attach the main loop, just the thread that dos the first poll. --- spa/plugins/support/evl-system.c | 88 +++++++++++++++++++++++++++----- 1 file changed, 76 insertions(+), 12 deletions(-) diff --git a/spa/plugins/support/evl-system.c b/spa/plugins/support/evl-system.c index 336b1080b..a5cb9fd5e 100644 --- a/spa/plugins/support/evl-system.c +++ b/spa/plugins/support/evl-system.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -32,6 +33,7 @@ struct poll_entry { int fd; uint32_t events; void *data; + unsigned attached:1; }; struct impl { @@ -45,6 +47,7 @@ struct impl { uint32_t n_xbuf; int attached; + pthread_t thread; int pid; }; @@ -110,6 +113,21 @@ static int impl_pollfd_create(void *object, int flags) return retval; } +static inline struct poll_entry *find_free(struct impl *impl) +{ + uint32_t i; + for (i = 0; i < impl->n_entries; i++) { + struct poll_entry *e = &impl->entries[i]; + if (e->fd == -1) + return e; + } + if (impl->n_entries == MAX_POLL) { + errno = ENOSPC; + return NULL; + } + return &impl->entries[impl->n_entries++]; +} + static inline struct poll_entry *find_entry(struct impl *impl, int pfd, int fd) { uint32_t i; @@ -121,20 +139,36 @@ static inline struct poll_entry *find_entry(struct impl *impl, int pfd, int fd) return NULL; } +static int attach_entry(struct impl *impl, struct poll_entry *e) +{ + if (!e->attached && e->fd != -1) { + int res; + + res = evl_add_pollfd(e->pfd, e->fd, e->events, evl_nil); + if (res < 0) + return res; + + e->attached = true; + } + return 0; +} + static int impl_pollfd_add(void *object, int pfd, int fd, uint32_t events, void *data) { struct impl *impl = object; struct poll_entry *e; + int res = 0; - if (impl->n_entries == MAX_POLL) - return -ENOSPC; + if ((e = find_free(impl)) == NULL) + return -errno; - e = &impl->entries[impl->n_entries++]; e->pfd = pfd; e->fd = fd; e->events = events; e->data = data; - return evl_add_pollfd(pfd, fd, e->events, evl_nil); + if (impl->attached != 0) + attach_entry(impl, e); + return res; } static int impl_pollfd_mod(void *object, int pfd, int fd, uint32_t events, void *data) @@ -162,6 +196,7 @@ static int impl_pollfd_del(void *object, int pfd, int fd) e->pfd = -1; e->fd = -1; + e->attached = false; return evl_del_pollfd(pfd, fd); } @@ -178,6 +213,12 @@ static int impl_pollfd_wait(void *object, int pfd, if (res < 0) return res; impl->attached = res; + impl->thread = pthread_self(); + + for (i = 0; i < (int)impl->n_entries; i++) { + struct poll_entry *e = &impl->entries[i]; + attach_entry(impl, e); + } } if (timeout == -1) { @@ -259,12 +300,14 @@ static int impl_eventfd_create(void *object, int flags) { struct impl *impl = object; int res, fl; + struct evl_flags flg; - fl = EVL_CLONE_PRIVATE; + fl = EVL_CLONE_PUBLIC; if (flags & SPA_FD_NONBLOCK) fl |= EVL_CLONE_NONBLOCK; - res = evl_create_xbuf(1024, 1024, fl, "xbuf-%d-%p-%d", impl->pid, impl, impl->n_xbuf); + res = evl_create_flags(&flg, EVL_CLOCK_MONOTONIC, 0, fl, + "flags-%d-%p-%d", impl->pid, impl, impl->n_xbuf); if (res < 0) return res; @@ -275,14 +318,35 @@ static int impl_eventfd_create(void *object, int flags) static int impl_eventfd_write(void *object, int fd, uint64_t count) { - if (write(fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) - return -errno; - return 0; + int res; + int flags = count; + struct impl *impl = object; + pthread_t tid = pthread_self(); + + if (impl->thread != tid) + res = write(fd, &flags, sizeof(flags)); + else + res = oob_write(fd, &flags, sizeof(flags)); + + if (res != sizeof(flags)) + res = -errno; + return res; } static int impl_eventfd_read(void *object, int fd, uint64_t *count) { - if (oob_read(fd, count, sizeof(uint64_t)) != sizeof(uint64_t)) + int res; + int flags; + struct impl *impl = object; + pthread_t tid = pthread_self(); + + if (impl->thread != tid) + res = read(fd, &flags, sizeof(flags)); + else + res = oob_read(fd, &flags, sizeof(flags)); + + *count = flags; + if (res != sizeof(flags)) return -errno; return 0; } @@ -400,12 +464,12 @@ impl_init(const struct spa_handle_factory *factory, impl->pid = getpid(); - if ((res = evl_attach_self("evl-system-%d-%p", impl->pid, impl)) < 0) { + if ((res = evl_init()) < 0) { spa_log_error(impl->log, NAME " %p: init failed: %s", impl, spa_strerror(res)); return res; } - spa_log_debug(impl->log, NAME " %p: initialized", impl); + spa_log_info(impl->log, NAME " %p: initialized", impl); return 0; }