From 4a219e81dd2f29147f1a395ede812cca422499d4 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 14 Jun 2017 16:10:07 +0200 Subject: [PATCH] loop: move to plugin Move the loop implementation to a plugin. Organize the hooks in a list so that we can add many. --- pipewire/client/loop.c | 645 ++----------------- pipewire/client/loop.h | 5 +- pipewire/client/pipewire.c | 67 +- pipewire/client/pipewire.h | 8 +- pipewire/client/thread-loop.c | 22 +- pipewire/client/type.c | 2 +- pipewire/modules/module-protocol-native.c | 10 +- spa/include/spa/loop.h | 22 +- spa/include/spa/plugin.h | 13 + spa/plugins/support/logger.c | 8 +- spa/plugins/support/loop.c | 748 ++++++++++++++++++++++ spa/plugins/support/mapper.c | 8 +- spa/plugins/support/meson.build | 1 + spa/plugins/support/plugin.c | 27 +- 14 files changed, 935 insertions(+), 651 deletions(-) create mode 100644 spa/plugins/support/loop.c diff --git a/pipewire/client/loop.c b/pipewire/client/loop.c index 137147b98..03fd14f3c 100644 --- a/pipewire/client/loop.c +++ b/pipewire/client/loop.c @@ -30,8 +30,9 @@ #include #include -#include +#include +#include #include #include @@ -39,616 +40,86 @@ /** \cond */ -struct invoke_item { - size_t item_size; - spa_invoke_func_t func; - uint32_t seq; - size_t size; - void *data; - void *user_data; -}; - struct impl { struct pw_loop this; - struct spa_list source_list; - struct spa_list destroy_list; - - spa_loop_hook_t pre_func; - spa_loop_hook_t post_func; - void *hook_data; - - int epoll_fd; - pthread_t thread; - - struct spa_loop loop; - struct spa_loop_control control; - struct spa_loop_utils utils; - - struct spa_source *event; - - struct spa_ringbuffer buffer; - uint8_t buffer_data[DATAS_SIZE]; -}; - -struct source_impl { - struct spa_source source; - - struct impl *impl; - struct spa_list link; - - bool close; - union { - spa_source_io_func_t io; - spa_source_idle_func_t idle; - spa_source_event_func_t event; - spa_source_timer_func_t timer; - spa_source_signal_func_t signal; - } func; - int signal_number; - bool enabled; + struct spa_handle *handle; }; /** \endcond */ -static inline uint32_t spa_io_to_epoll(enum spa_io mask) -{ - uint32_t events = 0; - - if (mask & SPA_IO_IN) - events |= EPOLLIN; - if (mask & SPA_IO_OUT) - events |= EPOLLOUT; - if (mask & SPA_IO_ERR) - events |= EPOLLERR; - if (mask & SPA_IO_HUP) - events |= EPOLLHUP; - - return events; -} - -static inline enum spa_io spa_epoll_to_io(uint32_t events) -{ - enum spa_io mask = 0; - - if (events & EPOLLIN) - mask |= SPA_IO_IN; - if (events & EPOLLOUT) - mask |= SPA_IO_OUT; - if (events & EPOLLHUP) - mask |= SPA_IO_HUP; - if (events & EPOLLERR) - mask |= SPA_IO_ERR; - - return mask; -} - -static int loop_add_source(struct spa_loop *loop, struct spa_source *source) -{ - struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); - - source->loop = loop; - - if (source->fd != -1) { - struct epoll_event ep; - - spa_zero(ep); - ep.events = spa_io_to_epoll(source->mask); - ep.data.ptr = source; - - if (epoll_ctl(impl->epoll_fd, EPOLL_CTL_ADD, source->fd, &ep) < 0) - return SPA_RESULT_ERRNO; - } - return SPA_RESULT_OK; -} - -static int loop_update_source(struct spa_source *source) -{ - struct spa_loop *loop = source->loop; - struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); - - if (source->fd != -1) { - struct epoll_event ep; - - spa_zero(ep); - ep.events = spa_io_to_epoll(source->mask); - ep.data.ptr = source; - - if (epoll_ctl(impl->epoll_fd, EPOLL_CTL_MOD, source->fd, &ep) < 0) - return SPA_RESULT_ERRNO; - } - return SPA_RESULT_OK; -} - -static void loop_remove_source(struct spa_source *source) -{ - struct spa_loop *loop = source->loop; - struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); - - if (source->fd != -1) - epoll_ctl(impl->epoll_fd, EPOLL_CTL_DEL, source->fd, NULL); - - source->loop = NULL; -} - -static int -loop_invoke(struct spa_loop *loop, - spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data) -{ - struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); - bool in_thread = pthread_equal(impl->thread, pthread_self()); - struct invoke_item *item; - int res; - - if (in_thread) { - res = func(loop, false, seq, size, data, user_data); - } else { - int32_t filled, avail; - uint32_t idx, offset, l0; - - filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); - if (filled < 0 || filled > impl->buffer.size) { - pw_log_warn("data-loop %p: queue xrun %d", impl, filled); - return SPA_RESULT_ERROR; - } - avail = impl->buffer.size - filled; - if (avail < sizeof(struct invoke_item)) { - pw_log_warn("data-loop %p: queue full %d", impl, avail); - return SPA_RESULT_ERROR; - } - offset = idx & impl->buffer.mask; - - l0 = offset + avail; - if (l0 > impl->buffer.size) - l0 = impl->buffer.size - l0; - - item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item); - item->func = func; - item->seq = seq; - item->size = size; - item->user_data = user_data; - - if (l0 > sizeof(struct invoke_item) + size) { - item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); - item->item_size = sizeof(struct invoke_item) + size; - if (l0 < sizeof(struct invoke_item) + item->item_size) - item->item_size = l0; - } else { - item->data = impl->buffer_data; - item->item_size = l0 + 1 + size; - } - memcpy(item->data, data, size); - - spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); - - pw_loop_signal_event(&impl->this, impl->event); - - if (seq != SPA_ID_INVALID) - res = SPA_RESULT_RETURN_ASYNC(seq); - else - res = SPA_RESULT_OK; - } - return res; -} - -static void event_func(struct spa_loop_utils *utils, struct spa_source *source, void *data) -{ - struct impl *impl = data; - uint32_t index; - - while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { - struct invoke_item *item = - SPA_MEMBER(impl->buffer_data, index & impl->buffer.mask, struct invoke_item); - item->func(impl->this.loop, true, item->seq, item->size, item->data, - item->user_data); - spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); - } -} - -static int loop_get_fd(struct spa_loop_control *ctrl) -{ - struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); - - return impl->epoll_fd; -} - -static void -loop_set_hooks(struct spa_loop_control *ctrl, - spa_loop_hook_t pre_func, spa_loop_hook_t post_func, void *data) -{ - struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); - - impl->pre_func = pre_func; - impl->post_func = post_func; - impl->hook_data = data; -} - -static void loop_enter(struct spa_loop_control *ctrl) -{ - struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); - impl->thread = pthread_self(); -} - -static void loop_leave(struct spa_loop_control *ctrl) -{ - struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); - impl->thread = 0; -} - -static int loop_iterate(struct spa_loop_control *ctrl, int timeout) -{ - struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); - struct pw_loop *loop = &impl->this; - struct epoll_event ep[32]; - int i, nfds, save_errno; - struct source_impl *source, *tmp; - - pw_signal_emit(&loop->before_iterate, loop); - - if (SPA_UNLIKELY(impl->pre_func)) - impl->pre_func(ctrl, impl->hook_data); - - if (SPA_UNLIKELY((nfds = epoll_wait(impl->epoll_fd, ep, SPA_N_ELEMENTS(ep), timeout)) < 0)) - save_errno = errno; - - if (SPA_UNLIKELY(impl->post_func)) - impl->post_func(ctrl, impl->hook_data); - - if (SPA_UNLIKELY(nfds < 0)) { - errno = save_errno; - return SPA_RESULT_ERRNO; - } - - /* first we set all the rmasks, then call the callbacks. The reason is that - * some callback might also want to look at other sources it manages and - * can then reset the rmask to suppress the callback */ - for (i = 0; i < nfds; i++) { - struct spa_source *s = ep[i].data.ptr; - s->rmask = spa_epoll_to_io(ep[i].events); - } - for (i = 0; i < nfds; i++) { - struct spa_source *s = ep[i].data.ptr; - if (s->rmask) { - s->func(s); - } - } - spa_list_for_each_safe(source, tmp, &impl->destroy_list, link) - free(source); - - spa_list_init(&impl->destroy_list); - - return SPA_RESULT_OK; -} - -static void source_io_func(struct spa_source *source) -{ - struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); - impl->func.io(&impl->impl->utils, source, source->fd, source->rmask, source->data); -} - -static struct spa_source *loop_add_io(struct spa_loop_utils *utils, - int fd, - enum spa_io mask, - bool close, spa_source_io_func_t func, void *data) -{ - struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); - struct source_impl *source; - - source = calloc(1, sizeof(struct source_impl)); - if (source == NULL) - return NULL; - - source->source.loop = &impl->loop; - source->source.func = source_io_func; - source->source.data = data; - source->source.fd = fd; - source->source.mask = mask; - source->impl = impl; - source->close = close; - source->func.io = func; - - spa_loop_add_source(&impl->loop, &source->source); - - spa_list_insert(&impl->source_list, &source->link); - - return &source->source; -} - -static int loop_update_io(struct spa_source *source, enum spa_io mask) -{ - source->mask = mask; - return spa_loop_update_source(source->loop, source); -} - - -static void source_idle_func(struct spa_source *source) -{ - struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); - impl->func.idle(&impl->impl->utils, source, source->data); -} - -static struct spa_source *loop_add_idle(struct spa_loop_utils *utils, - bool enabled, spa_source_idle_func_t func, void *data) -{ - struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); - struct source_impl *source; - - source = calloc(1, sizeof(struct source_impl)); - if (source == NULL) - return NULL; - - source->source.loop = &impl->loop; - source->source.func = source_idle_func; - source->source.data = data; - source->source.fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - source->impl = impl; - source->close = true; - source->source.mask = SPA_IO_IN; - source->func.idle = func; - - spa_loop_add_source(&impl->loop, &source->source); - - spa_list_insert(&impl->source_list, &source->link); - - if (enabled) - spa_loop_utils_enable_idle(&impl->utils, &source->source, true); - - return &source->source; -} - -static void loop_enable_idle(struct spa_source *source, bool enabled) -{ - struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); - uint64_t count; - - if (enabled && !impl->enabled) { - count = 1; - if (write(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) - pw_log_warn("loop %p: failed to write idle fd: %s", source, - strerror(errno)); - } else if (!enabled && impl->enabled) { - if (read(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) - pw_log_warn("loop %p: failed to read idle fd: %s", source, strerror(errno)); - } - impl->enabled = enabled; -} - -static void source_event_func(struct spa_source *source) -{ - struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); - uint64_t count; - - if (read(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) - pw_log_warn("loop %p: failed to read event fd: %s", source, strerror(errno)); - - impl->func.event(&impl->impl->utils, source, source->data); -} - -static struct spa_source *loop_add_event(struct spa_loop_utils *utils, - spa_source_event_func_t func, void *data) -{ - struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); - struct source_impl *source; - - source = calloc(1, sizeof(struct source_impl)); - if (source == NULL) - return NULL; - - source->source.loop = &impl->loop; - source->source.func = source_event_func; - source->source.data = data; - source->source.fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - source->source.mask = SPA_IO_IN; - source->impl = impl; - source->close = true; - source->func.event = func; - - spa_loop_add_source(&impl->loop, &source->source); - - spa_list_insert(&impl->source_list, &source->link); - - return &source->source; -} - -static void loop_signal_event(struct spa_source *source) -{ - uint64_t count = 1; - - if (write(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) - pw_log_warn("loop %p: failed to write event fd: %s", source, strerror(errno)); -} - -static void source_timer_func(struct spa_source *source) -{ - struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); - uint64_t expires; - - if (read(source->fd, &expires, sizeof(uint64_t)) != sizeof(uint64_t)) - pw_log_warn("loop %p: failed to read timer fd: %s", source, strerror(errno)); - - impl->func.timer(&impl->impl->utils, source, source->data); -} - -static struct spa_source *loop_add_timer(struct spa_loop_utils *utils, - spa_source_timer_func_t func, void *data) -{ - struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); - struct source_impl *source; - - source = calloc(1, sizeof(struct source_impl)); - if (source == NULL) - return NULL; - - source->source.loop = &impl->loop; - source->source.func = source_timer_func; - source->source.data = data; - source->source.fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); - source->source.mask = SPA_IO_IN; - source->impl = impl; - source->close = true; - source->func.timer = func; - - spa_loop_add_source(&impl->loop, &source->source); - - spa_list_insert(&impl->source_list, &source->link); - - return &source->source; -} - -static int -loop_update_timer(struct spa_source *source, - struct timespec *value, struct timespec *interval, bool absolute) -{ - struct itimerspec its; - int flags = 0; - - spa_zero(its); - if (value) { - its.it_value = *value; - } else if (interval) { - its.it_value = *interval; - absolute = true; - } - if (interval) - its.it_interval = *interval; - if (absolute) - flags |= TFD_TIMER_ABSTIME; - - if (timerfd_settime(source->fd, flags, &its, NULL) < 0) - return SPA_RESULT_ERRNO; - - return SPA_RESULT_OK; -} - -static void source_signal_func(struct spa_source *source) -{ - struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); - struct signalfd_siginfo signal_info; - - if (read(source->fd, &signal_info, sizeof(signal_info)) != sizeof(signal_info)) - pw_log_warn("loop %p: failed to read signal fd: %s", source, strerror(errno)); - - impl->func.signal(&impl->impl->utils, source, impl->signal_number, source->data); -} - -static struct spa_source *loop_add_signal(struct spa_loop_utils *utils, - int signal_number, - spa_source_signal_func_t func, void *data) -{ - struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); - struct source_impl *source; - sigset_t mask; - - source = calloc(1, sizeof(struct source_impl)); - if (source == NULL) - return NULL; - - source->source.loop = &impl->loop; - source->source.func = source_signal_func; - source->source.data = data; - sigemptyset(&mask); - sigaddset(&mask, signal_number); - source->source.fd = signalfd(-1, &mask, SFD_CLOEXEC | SFD_NONBLOCK); - sigprocmask(SIG_BLOCK, &mask, NULL); - source->source.mask = SPA_IO_IN; - source->impl = impl; - source->close = true; - source->func.signal = func; - source->signal_number = signal_number; - - spa_loop_add_source(&impl->loop, &source->source); - - spa_list_insert(&impl->source_list, &source->link); - - return &source->source; -} - -static void loop_destroy_source(struct spa_source *source) -{ - struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); - struct impl *loop_impl = SPA_CONTAINER_OF(source->loop, struct impl, loop); - - spa_list_remove(&impl->link); - - spa_loop_remove_source(source->loop, source); - - if (source->fd != -1 && impl->close) - close(source->fd); - - spa_list_insert(&loop_impl->destroy_list, &impl->link); -} - -static const struct spa_loop loop_impl = { - sizeof(struct spa_loop), - loop_add_source, - loop_update_source, - loop_remove_source, - loop_invoke, -}; - -static const struct spa_loop_control loop_control_impl = { - sizeof(struct spa_loop_control), - loop_get_fd, - loop_set_hooks, - loop_enter, - loop_leave, - loop_iterate, -}; - -static const struct spa_loop_utils loop_utils_impl = { - sizeof(struct spa_loop_utils), - loop_add_io, - loop_update_io, - loop_add_idle, - loop_enable_idle, - loop_add_event, - loop_signal_event, - loop_add_timer, - loop_update_timer, - loop_add_signal, - loop_destroy_source, -}; - /** Create a new loop * \returns a newly allocated loop * \memberof pw_loop */ struct pw_loop *pw_loop_new(void) { + int res; struct impl *impl; struct pw_loop *this; + const struct spa_handle_factory *factory; + struct spa_type_map *map; + void *iface; + const struct spa_support *support; + uint32_t n_support; - impl = calloc(1, sizeof(struct impl)); + support = pw_get_support(&n_support); + if (support == NULL) + return NULL; + + map = spa_support_find(support, n_support, SPA_TYPE__TypeMap); + if (map == NULL) + return NULL; + + factory = pw_get_support_factory("loop"); + if (factory == NULL) + return NULL; + + impl = calloc(1, sizeof(struct impl) + factory->size); if (impl == NULL) return NULL; + impl->handle = SPA_MEMBER(impl, sizeof(struct impl), struct spa_handle); + this = &impl->this; - impl->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (impl->epoll_fd == -1) - goto no_epoll; - - spa_list_init(&impl->source_list); - spa_list_init(&impl->destroy_list); - - pw_signal_init(&this->before_iterate); pw_signal_init(&this->destroy_signal); - impl->loop = loop_impl; - this->loop = &impl->loop; + if ((res = spa_handle_factory_init(factory, + impl->handle, + NULL, + support, + n_support)) < 0) { + fprintf(stderr, "can't make factory instance: %d\n", res); + goto failed; + } - impl->control = loop_control_impl; - this->control = &impl->control; + if ((res = spa_handle_get_interface(impl->handle, + spa_type_map_get_id(map, SPA_TYPE__Loop), + &iface)) < 0) { + fprintf(stderr, "can't get %s interface %d\n", SPA_TYPE__Loop, res); + goto failed; + } + this->loop = iface; - impl->utils = loop_utils_impl; - this->utils = &impl->utils; + if ((res = spa_handle_get_interface(impl->handle, + spa_type_map_get_id(map, SPA_TYPE__LoopControl), + &iface)) < 0) { + fprintf(stderr, "can't get %s interface %d\n", SPA_TYPE__LoopControl, res); + goto failed; + } + this->control = iface; - spa_ringbuffer_init(&impl->buffer, DATAS_SIZE); - - impl->event = spa_loop_utils_add_event(&impl->utils, event_func, impl); + if ((res = spa_handle_get_interface(impl->handle, + spa_type_map_get_id(map, SPA_TYPE__LoopUtils), + &iface)) < 0) { + fprintf(stderr, "can't get %s interface %d\n", SPA_TYPE__LoopUtils, res); + goto failed; + } + this->utils = iface; return this; - no_epoll: + failed: free(impl); return NULL; } @@ -660,15 +131,9 @@ struct pw_loop *pw_loop_new(void) void pw_loop_destroy(struct pw_loop *loop) { struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, this); - struct source_impl *source, *tmp; pw_signal_emit(&loop->destroy_signal, loop); - spa_list_for_each_safe(source, tmp, &impl->source_list, link) - loop_destroy_source(&source->source); - spa_list_for_each_safe(source, tmp, &impl->destroy_list, link) - free(source); - - close(impl->epoll_fd); + spa_handle_clear(impl->handle); free(impl); } diff --git a/pipewire/client/loop.h b/pipewire/client/loop.h index 39f82a78b..417b40a5f 100644 --- a/pipewire/client/loop.h +++ b/pipewire/client/loop.h @@ -26,6 +26,7 @@ extern "C" { #include #include + #include /** \class pw_loop @@ -39,8 +40,6 @@ struct pw_loop { struct spa_loop_control *control; /**< loop control */ struct spa_loop_utils *utils; /**< loop utils */ - /** Emited before the loop iteration starts */ - PW_SIGNAL(before_iterate, (struct pw_listener *listener, struct pw_loop *loop)); /** Emited when the loop is destroyed */ PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_loop *loop)); }; @@ -57,7 +56,7 @@ pw_loop_destroy(struct pw_loop *loop); #define pw_loop_invoke(l,...) spa_loop_invoke((l)->loop,__VA_ARGS__) #define pw_loop_get_fd(l) spa_loop_control_get_fd((l)->control) -#define pw_loop_set_hooks(l,...) spa_loop_control_set_hooks((l)->control,__VA_ARGS__) +#define pw_loop_add_hooks(l,...) spa_loop_control_add_hooks((l)->control,__VA_ARGS__) #define pw_loop_enter(l) spa_loop_control_enter((l)->control) #define pw_loop_iterate(l,...) spa_loop_control_iterate((l)->control,__VA_ARGS__) #define pw_loop_leave(l) spa_loop_control_leave((l)->control) diff --git a/pipewire/client/pipewire.c b/pipewire/client/pipewire.c index c1afdc9cd..6fe699fcc 100644 --- a/pipewire/client/pipewire.c +++ b/pipewire/client/pipewire.c @@ -36,17 +36,6 @@ static struct support_info { uint32_t n_support; } support_info; -static void * find_support(struct support_info *info, const char *type) -{ - int i; - - for (i = 0; i < info->n_support; i++) { - if (strcmp(info->support->type, type) == 0) - return info->support->data; - } - return NULL; -} - static bool open_support(const char *lib, struct support_info *info) @@ -73,23 +62,14 @@ load_interface(struct support_info *info, { int res; struct spa_handle *handle; - uint32_t index, type_id; + uint32_t type_id; const struct spa_handle_factory *factory; void *iface; struct spa_type_map *map = NULL; - map = find_support(info, SPA_TYPE__TypeMap); - type_id = map ? spa_type_map_get_id(map, type) : 0; - - for (index = 0;; index++) { - if ((res = info->enum_func(&factory, index)) < 0) { - if (res != SPA_RESULT_ENUM_END) - fprintf(stderr, "can't enumerate factories: %d\n", res); - goto enum_failed; - } - if (strcmp(factory->name, factory_name) == 0) - break; - } + factory = pw_get_support_factory(factory_name); + if (factory == NULL) + goto not_found; handle = calloc(1, factory->size); if ((res = spa_handle_factory_init(factory, @@ -97,6 +77,10 @@ load_interface(struct support_info *info, fprintf(stderr, "can't make factory instance: %d\n", res); goto init_failed; } + + map = pw_get_support_interface(SPA_TYPE__TypeMap); + type_id = map ? spa_type_map_get_id(map, type) : 0; + if ((res = spa_handle_get_interface(handle, type_id, &iface)) < 0) { fprintf(stderr, "can't get %s interface %d\n", type, res); goto interface_failed; @@ -107,7 +91,7 @@ load_interface(struct support_info *info, spa_handle_clear(handle); init_failed: free(handle); - enum_failed: + not_found: return NULL; } @@ -144,11 +128,40 @@ static void configure_support(struct support_info *info) * \param type the interface type * \return the interface or NULL when not configured */ -void *pw_get_support(const char *type) +void *pw_get_support_interface(const char *type) { - return find_support(&support_info, type); + int i; + + for (i = 0; i < support_info.n_support; i++) { + if (strcmp(support_info.support->type, type) == 0) + return support_info.support->data; + } + return NULL; } +const struct spa_handle_factory *pw_get_support_factory(const char *factory_name) +{ + int res; + uint32_t index; + const struct spa_handle_factory *factory; + + for (index = 0;; index++) { + if ((res = support_info.enum_func(&factory, index)) < 0) { + if (res != SPA_RESULT_ENUM_END) + fprintf(stderr, "can't enumerate factories: %d\n", res); + break; + } + if (strcmp(factory->name, factory_name) == 0) + return factory; + } + return NULL; +} + +const struct spa_support *pw_get_support(uint32_t *n_support) +{ + *n_support = support_info.n_support; + return support_info.support; +} /** Initialize PipeWire * diff --git a/pipewire/client/pipewire.h b/pipewire/client/pipewire.h index 93c92b710..cabda2bd4 100644 --- a/pipewire/client/pipewire.h +++ b/pipewire/client/pipewire.h @@ -120,7 +120,13 @@ enum pw_direction pw_direction_reverse(enum pw_direction direction); void * -pw_get_support(const char *type); +pw_get_support_interface(const char *type); + +const struct spa_handle_factory * +pw_get_support_factory(const char *factory_name); + +const struct spa_support * +pw_get_support(uint32_t *n_support); #ifdef __cplusplus } diff --git a/pipewire/client/thread-loop.c b/pipewire/client/thread-loop.c index 646295a35..be0257633 100644 --- a/pipewire/client/thread-loop.c +++ b/pipewire/client/thread-loop.c @@ -35,6 +35,9 @@ struct thread_loop { bool running; pthread_t thread; + struct spa_loop_control_hooks hooks; + const struct spa_loop_control_hooks *old; + struct spa_source *event; int n_waiting; @@ -42,18 +45,24 @@ struct thread_loop { }; /** \endcond */ -static void pre_hook(struct spa_loop_control *ctrl, void *data) +static void before(const struct spa_loop_control_hooks *hooks) { - struct thread_loop *impl = data; + struct thread_loop *impl = SPA_CONTAINER_OF(hooks, struct thread_loop, hooks); pthread_mutex_unlock(&impl->lock); } -static void post_hook(struct spa_loop_control *ctrl, void *data) +static void after(const struct spa_loop_control_hooks *hooks) { - struct thread_loop *impl = data; + struct thread_loop *impl = SPA_CONTAINER_OF(hooks, struct thread_loop, hooks); pthread_mutex_lock(&impl->lock); } +static const struct spa_loop_control_hooks impl_hooks = { + { NULL, }, + before, + after, +}; + static void do_stop(struct spa_loop_utils *utils, struct spa_source *source, void *data) { struct thread_loop *impl = data; @@ -90,7 +99,8 @@ struct pw_thread_loop *pw_thread_loop_new(struct pw_loop *loop, const char *name this->loop = loop; this->name = name ? strdup(name) : NULL; - pw_loop_set_hooks(loop, pre_hook, post_hook, impl); + impl->hooks = impl_hooks; + pw_loop_add_hooks(loop, &impl->hooks); pw_signal_init(&this->destroy_signal); @@ -120,6 +130,8 @@ void pw_thread_loop_destroy(struct pw_thread_loop *loop) pthread_cond_destroy(&impl->cond); pthread_cond_destroy(&impl->accept_cond); + spa_list_remove(&impl->hooks.link); + free(impl); } diff --git a/pipewire/client/type.c b/pipewire/client/type.c index dd0e90756..f2f753ba0 100644 --- a/pipewire/client/type.c +++ b/pipewire/client/type.c @@ -37,7 +37,7 @@ */ void pw_type_init(struct pw_type *type) { - type->map = pw_get_support(SPA_TYPE__TypeMap); + type->map = pw_get_support_interface(SPA_TYPE__TypeMap); type->core = spa_type_map_get_id(type->map, PIPEWIRE_TYPE__Core); type->registry = spa_type_map_get_id(type->map, PIPEWIRE_TYPE__Registry); diff --git a/pipewire/modules/module-protocol-native.c b/pipewire/modules/module-protocol-native.c index 2667e6acb..cde6ca003 100644 --- a/pipewire/modules/module-protocol-native.c +++ b/pipewire/modules/module-protocol-native.c @@ -75,7 +75,7 @@ struct impl { struct spa_list socket_list; struct spa_list client_list; - struct pw_listener before_iterate; + struct spa_loop_control_hooks hooks; }; struct native_client { @@ -166,9 +166,9 @@ on_busy_changed(struct pw_listener *listener, } -static void on_before_iterate(struct pw_listener *listener, struct pw_loop *loop) +static void on_before_hook(const struct spa_loop_control_hooks *hooks) { - struct impl *this = SPA_CONTAINER_OF(listener, struct impl, before_iterate); + struct impl *this = SPA_CONTAINER_OF(hooks, struct impl, hooks); struct native_client *client, *tmp; spa_list_for_each_safe(client, tmp, &this->client_list, link) @@ -430,8 +430,8 @@ static struct impl *pw_protocol_native_new(struct pw_core *core, struct pw_prope if (!add_socket(impl, s)) goto error; - pw_signal_add(&impl->core->main_loop->loop->before_iterate, - &impl->before_iterate, on_before_iterate); + impl->hooks.before = on_before_hook; + pw_loop_add_hooks(impl->core->main_loop->loop, &impl->hooks); return impl; diff --git a/spa/include/spa/loop.h b/spa/include/spa/loop.h index d653c2c05..dcfda7ba0 100644 --- a/spa/include/spa/loop.h +++ b/spa/include/spa/loop.h @@ -37,6 +37,7 @@ struct spa_loop_utils; #define SPA_TYPE_LOOP__DataLoop SPA_TYPE_LOOP_BASE "DataLoop" #include +#include enum spa_io { SPA_IO_IN = (1 << 0), @@ -95,7 +96,14 @@ struct spa_loop { #define spa_loop_remove_source(l,...) (l)->remove_source(__VA_ARGS__) #define spa_loop_invoke(l,...) (l)->invoke((l),__VA_ARGS__) -typedef void (*spa_loop_hook_t) (struct spa_loop_control *ctrl, void *data); +/** Control hooks */ +struct spa_loop_control_hooks { + struct spa_list link; + /** Executed right before waiting for events */ + void (*before) (const struct spa_loop_control_hooks *hooks); + /** Executed right after waiting for events */ + void (*after) (const struct spa_loop_control_hooks *hooks); +}; /** * spa_loop_control: @@ -109,10 +117,12 @@ struct spa_loop_control { int (*get_fd) (struct spa_loop_control *ctrl); - void (*set_hooks) (struct spa_loop_control *ctrl, - spa_loop_hook_t pre_hook, - spa_loop_hook_t post_hook, - void *data); + /** Add a hook + * \param ctrl the control to change + * \param hooks the new hooks + * \param old location to store previous hooks */ + void (*add_hooks) (struct spa_loop_control *ctrl, + struct spa_loop_control_hooks *hooks); void (*enter) (struct spa_loop_control *ctrl); void (*leave) (struct spa_loop_control *ctrl); @@ -121,7 +131,7 @@ struct spa_loop_control { }; #define spa_loop_control_get_fd(l) (l)->get_fd(l) -#define spa_loop_control_set_hooks(l,...) (l)->set_hooks((l),__VA_ARGS__) +#define spa_loop_control_add_hooks(l,...) (l)->add_hooks((l),__VA_ARGS__) #define spa_loop_control_enter(l) (l)->enter(l) #define spa_loop_control_iterate(l,...) (l)->iterate((l),__VA_ARGS__) #define spa_loop_control_leave(l) (l)->leave(l) diff --git a/spa/include/spa/plugin.h b/spa/include/spa/plugin.h index 83ac1143b..f1b61e339 100644 --- a/spa/include/spa/plugin.h +++ b/spa/include/spa/plugin.h @@ -85,6 +85,17 @@ struct spa_support { void *data; }; +static inline void *spa_support_find(const struct spa_support *support, + uint32_t n_support, + const char *type) +{ + uint32_t i; + for (i = 0; i < n_support; i++) + if (strcmp(support->type, type) == 0) + return support->data; + return NULL; +} + #define SPA_SUPPORT_INIT(type,data) (struct spa_support) { (type), (data) } struct spa_handle_factory { @@ -182,6 +193,8 @@ typedef int (*spa_handle_factory_enum_func_t) (const struct spa_handle_factory * */ int spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t index); +void spa_handle_factory_register(const struct spa_handle_factory *factory); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/spa/plugins/support/logger.c b/spa/plugins/support/logger.c index b99b52a00..da2fc8279 100644 --- a/spa/plugins/support/logger.c +++ b/spa/plugins/support/logger.c @@ -255,10 +255,16 @@ impl_enum_interface_info(const struct spa_handle_factory *factory, return SPA_RESULT_OK; } -const struct spa_handle_factory spa_logger_factory = { +static const struct spa_handle_factory logger_factory = { NAME, NULL, sizeof(struct impl), impl_init, impl_enum_interface_info, }; + +static void reg(void) __attribute__ ((constructor)); +static void reg(void) +{ + spa_handle_factory_register(&logger_factory); +} diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c new file mode 100644 index 000000000..248569009 --- /dev/null +++ b/spa/plugins/support/loop.c @@ -0,0 +1,748 @@ +/* PipeWire + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#define NAME "loop" + +#define DATAS_SIZE (4096 * 8) + +/** \cond */ + +struct invoke_item { + size_t item_size; + spa_invoke_func_t func; + uint32_t seq; + size_t size; + void *data; + void *user_data; +}; + +struct type { + uint32_t loop; + uint32_t loop_control; + uint32_t loop_utils; +}; + +static inline void init_type(struct type *type, struct spa_type_map *map) +{ + type->loop = spa_type_map_get_id(map, SPA_TYPE__Loop); + type->loop_control = spa_type_map_get_id(map, SPA_TYPE__LoopControl); + type->loop_utils = spa_type_map_get_id(map, SPA_TYPE__LoopUtils); +} + +struct impl { + struct spa_handle handle; + struct spa_loop loop; + struct spa_loop_control control; + struct spa_loop_utils utils; + + struct spa_log *log; + struct type type; + struct spa_type_map *map; + + struct spa_list source_list; + struct spa_list destroy_list; + struct spa_list hooks_list; + + int epoll_fd; + pthread_t thread; + + struct spa_source *event; + + struct spa_ringbuffer buffer; + uint8_t buffer_data[DATAS_SIZE]; +}; + +struct source_impl { + struct spa_source source; + + struct impl *impl; + struct spa_list link; + + bool close; + union { + spa_source_io_func_t io; + spa_source_idle_func_t idle; + spa_source_event_func_t event; + spa_source_timer_func_t timer; + spa_source_signal_func_t signal; + } func; + int signal_number; + bool enabled; +}; +/** \endcond */ + +static inline uint32_t spa_io_to_epoll(enum spa_io mask) +{ + uint32_t events = 0; + + if (mask & SPA_IO_IN) + events |= EPOLLIN; + if (mask & SPA_IO_OUT) + events |= EPOLLOUT; + if (mask & SPA_IO_ERR) + events |= EPOLLERR; + if (mask & SPA_IO_HUP) + events |= EPOLLHUP; + + return events; +} + +static inline enum spa_io spa_epoll_to_io(uint32_t events) +{ + enum spa_io mask = 0; + + if (events & EPOLLIN) + mask |= SPA_IO_IN; + if (events & EPOLLOUT) + mask |= SPA_IO_OUT; + if (events & EPOLLHUP) + mask |= SPA_IO_HUP; + if (events & EPOLLERR) + mask |= SPA_IO_ERR; + + return mask; +} + +static int loop_add_source(struct spa_loop *loop, struct spa_source *source) +{ + struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); + + source->loop = loop; + + if (source->fd != -1) { + struct epoll_event ep; + + spa_zero(ep); + ep.events = spa_io_to_epoll(source->mask); + ep.data.ptr = source; + + if (epoll_ctl(impl->epoll_fd, EPOLL_CTL_ADD, source->fd, &ep) < 0) + return SPA_RESULT_ERRNO; + } + return SPA_RESULT_OK; +} + +static int loop_update_source(struct spa_source *source) +{ + struct spa_loop *loop = source->loop; + struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); + + if (source->fd != -1) { + struct epoll_event ep; + + spa_zero(ep); + ep.events = spa_io_to_epoll(source->mask); + ep.data.ptr = source; + + if (epoll_ctl(impl->epoll_fd, EPOLL_CTL_MOD, source->fd, &ep) < 0) + return SPA_RESULT_ERRNO; + } + return SPA_RESULT_OK; +} + +static void loop_remove_source(struct spa_source *source) +{ + struct spa_loop *loop = source->loop; + struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); + + if (source->fd != -1) + epoll_ctl(impl->epoll_fd, EPOLL_CTL_DEL, source->fd, NULL); + + source->loop = NULL; +} + +static int +loop_invoke(struct spa_loop *loop, + spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop); + bool in_thread = pthread_equal(impl->thread, pthread_self()); + struct invoke_item *item; + int res; + + if (in_thread) { + res = func(loop, false, seq, size, data, user_data); + } else { + int32_t filled, avail; + uint32_t idx, offset, l0; + + filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); + if (filled < 0 || filled > impl->buffer.size) { + spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled); + return SPA_RESULT_ERROR; + } + avail = impl->buffer.size - filled; + if (avail < sizeof(struct invoke_item)) { + spa_log_warn(impl->log, NAME " %p: queue full %d", impl, avail); + return SPA_RESULT_ERROR; + } + offset = idx & impl->buffer.mask; + + l0 = offset + avail; + if (l0 > impl->buffer.size) + l0 = impl->buffer.size - l0; + + item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item); + item->func = func; + item->seq = seq; + item->size = size; + item->user_data = user_data; + + if (l0 > sizeof(struct invoke_item) + size) { + item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void); + item->item_size = sizeof(struct invoke_item) + size; + if (l0 < sizeof(struct invoke_item) + item->item_size) + item->item_size = l0; + } else { + item->data = impl->buffer_data; + item->item_size = l0 + 1 + size; + } + memcpy(item->data, data, size); + + spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size); + + spa_loop_utils_signal_event(&impl->utils, impl->event); + + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC(seq); + else + res = SPA_RESULT_OK; + } + return res; +} + +static void event_func(struct spa_loop_utils *utils, struct spa_source *source, void *data) +{ + struct impl *impl = data; + uint32_t index; + + while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) { + struct invoke_item *item = + SPA_MEMBER(impl->buffer_data, index & impl->buffer.mask, struct invoke_item); + item->func(&impl->loop, true, item->seq, item->size, item->data, + item->user_data); + spa_ringbuffer_read_update(&impl->buffer, index + item->item_size); + } +} + +static int loop_get_fd(struct spa_loop_control *ctrl) +{ + struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); + + return impl->epoll_fd; +} + +static void +loop_add_hooks(struct spa_loop_control *ctrl, + struct spa_loop_control_hooks *hooks) +{ + struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); + + spa_list_insert(impl->hooks_list.prev, &hooks->link); +} + +static void loop_enter(struct spa_loop_control *ctrl) +{ + struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); + impl->thread = pthread_self(); +} + +static void loop_leave(struct spa_loop_control *ctrl) +{ + struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); + impl->thread = 0; +} + +static int loop_iterate(struct spa_loop_control *ctrl, int timeout) +{ + struct impl *impl = SPA_CONTAINER_OF(ctrl, struct impl, control); + struct epoll_event ep[32]; + int i, nfds, save_errno = 0; + struct source_impl *source, *tmp; + struct spa_loop_control_hooks *hooks, *th; + + spa_list_for_each_safe(hooks, th, &impl->hooks_list, link) + if (hooks->before) + hooks->before(hooks); + + if (SPA_UNLIKELY((nfds = epoll_wait(impl->epoll_fd, ep, SPA_N_ELEMENTS(ep), timeout)) < 0)) + save_errno = errno; + + spa_list_for_each_safe(hooks, th, &impl->hooks_list, link) + if (hooks->after) + hooks->after(hooks); + + if (SPA_UNLIKELY(nfds < 0)) { + errno = save_errno; + return SPA_RESULT_ERRNO; + } + + /* first we set all the rmasks, then call the callbacks. The reason is that + * some callback might also want to look at other sources it manages and + * can then reset the rmask to suppress the callback */ + for (i = 0; i < nfds; i++) { + struct spa_source *s = ep[i].data.ptr; + s->rmask = spa_epoll_to_io(ep[i].events); + } + for (i = 0; i < nfds; i++) { + struct spa_source *s = ep[i].data.ptr; + if (s->rmask) { + s->func(s); + } + } + spa_list_for_each_safe(source, tmp, &impl->destroy_list, link) + free(source); + + spa_list_init(&impl->destroy_list); + + return SPA_RESULT_OK; +} + +static void source_io_func(struct spa_source *source) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + impl->func.io(&impl->impl->utils, source, source->fd, source->rmask, source->data); +} + +static struct spa_source *loop_add_io(struct spa_loop_utils *utils, + int fd, + enum spa_io mask, + bool close, spa_source_io_func_t func, void *data) +{ + struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); + struct source_impl *source; + + source = calloc(1, sizeof(struct source_impl)); + if (source == NULL) + return NULL; + + source->source.loop = &impl->loop; + source->source.func = source_io_func; + source->source.data = data; + source->source.fd = fd; + source->source.mask = mask; + source->impl = impl; + source->close = close; + source->func.io = func; + + spa_loop_add_source(&impl->loop, &source->source); + + spa_list_insert(&impl->source_list, &source->link); + + return &source->source; +} + +static int loop_update_io(struct spa_source *source, enum spa_io mask) +{ + source->mask = mask; + return spa_loop_update_source(source->loop, source); +} + + +static void source_idle_func(struct spa_source *source) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + impl->func.idle(&impl->impl->utils, source, source->data); +} + +static struct spa_source *loop_add_idle(struct spa_loop_utils *utils, + bool enabled, spa_source_idle_func_t func, void *data) +{ + struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); + struct source_impl *source; + + source = calloc(1, sizeof(struct source_impl)); + if (source == NULL) + return NULL; + + source->source.loop = &impl->loop; + source->source.func = source_idle_func; + source->source.data = data; + source->source.fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + source->impl = impl; + source->close = true; + source->source.mask = SPA_IO_IN; + source->func.idle = func; + + spa_loop_add_source(&impl->loop, &source->source); + + spa_list_insert(&impl->source_list, &source->link); + + if (enabled) + spa_loop_utils_enable_idle(&impl->utils, &source->source, true); + + return &source->source; +} + +static void loop_enable_idle(struct spa_source *source, bool enabled) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + uint64_t count; + + if (enabled && !impl->enabled) { + count = 1; + if (write(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) + spa_log_warn(impl->impl->log, NAME " %p: failed to write idle fd: %s", source, + strerror(errno)); + } else if (!enabled && impl->enabled) { + if (read(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) + spa_log_warn(impl->impl->log, NAME " %p: failed to read idle fd: %s", source, strerror(errno)); + } + impl->enabled = enabled; +} + +static void source_event_func(struct spa_source *source) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + uint64_t count; + + if (read(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) + spa_log_warn(impl->impl->log, NAME " %p: failed to read event fd: %s", source, strerror(errno)); + + impl->func.event(&impl->impl->utils, source, source->data); +} + +static struct spa_source *loop_add_event(struct spa_loop_utils *utils, + spa_source_event_func_t func, void *data) +{ + struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); + struct source_impl *source; + + source = calloc(1, sizeof(struct source_impl)); + if (source == NULL) + return NULL; + + source->source.loop = &impl->loop; + source->source.func = source_event_func; + source->source.data = data; + source->source.fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + source->source.mask = SPA_IO_IN; + source->impl = impl; + source->close = true; + source->func.event = func; + + spa_loop_add_source(&impl->loop, &source->source); + + spa_list_insert(&impl->source_list, &source->link); + + return &source->source; +} + +static void loop_signal_event(struct spa_source *source) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + uint64_t count = 1; + + if (write(source->fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) + spa_log_warn(impl->impl->log, NAME " %p: failed to write event fd: %s", source, strerror(errno)); +} + +static void source_timer_func(struct spa_source *source) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + uint64_t expires; + + if (read(source->fd, &expires, sizeof(uint64_t)) != sizeof(uint64_t)) + spa_log_warn(impl->impl->log, NAME " %p: failed to read timer fd: %s", source, strerror(errno)); + + impl->func.timer(&impl->impl->utils, source, source->data); +} + +static struct spa_source *loop_add_timer(struct spa_loop_utils *utils, + spa_source_timer_func_t func, void *data) +{ + struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); + struct source_impl *source; + + source = calloc(1, sizeof(struct source_impl)); + if (source == NULL) + return NULL; + + source->source.loop = &impl->loop; + source->source.func = source_timer_func; + source->source.data = data; + source->source.fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); + source->source.mask = SPA_IO_IN; + source->impl = impl; + source->close = true; + source->func.timer = func; + + spa_loop_add_source(&impl->loop, &source->source); + + spa_list_insert(&impl->source_list, &source->link); + + return &source->source; +} + +static int +loop_update_timer(struct spa_source *source, + struct timespec *value, struct timespec *interval, bool absolute) +{ + struct itimerspec its; + int flags = 0; + + spa_zero(its); + if (value) { + its.it_value = *value; + } else if (interval) { + its.it_value = *interval; + absolute = true; + } + if (interval) + its.it_interval = *interval; + if (absolute) + flags |= TFD_TIMER_ABSTIME; + + if (timerfd_settime(source->fd, flags, &its, NULL) < 0) + return SPA_RESULT_ERRNO; + + return SPA_RESULT_OK; +} + +static void source_signal_func(struct spa_source *source) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + struct signalfd_siginfo signal_info; + + if (read(source->fd, &signal_info, sizeof(signal_info)) != sizeof(signal_info)) + spa_log_warn(impl->impl->log, NAME " %p: failed to read signal fd: %s", source, strerror(errno)); + + impl->func.signal(&impl->impl->utils, source, impl->signal_number, source->data); +} + +static struct spa_source *loop_add_signal(struct spa_loop_utils *utils, + int signal_number, + spa_source_signal_func_t func, void *data) +{ + struct impl *impl = SPA_CONTAINER_OF(utils, struct impl, utils); + struct source_impl *source; + sigset_t mask; + + source = calloc(1, sizeof(struct source_impl)); + if (source == NULL) + return NULL; + + source->source.loop = &impl->loop; + source->source.func = source_signal_func; + source->source.data = data; + sigemptyset(&mask); + sigaddset(&mask, signal_number); + source->source.fd = signalfd(-1, &mask, SFD_CLOEXEC | SFD_NONBLOCK); + sigprocmask(SIG_BLOCK, &mask, NULL); + source->source.mask = SPA_IO_IN; + source->impl = impl; + source->close = true; + source->func.signal = func; + source->signal_number = signal_number; + + spa_loop_add_source(&impl->loop, &source->source); + + spa_list_insert(&impl->source_list, &source->link); + + return &source->source; +} + +static void loop_destroy_source(struct spa_source *source) +{ + struct source_impl *impl = SPA_CONTAINER_OF(source, struct source_impl, source); + struct impl *loop_impl = SPA_CONTAINER_OF(source->loop, struct impl, loop); + + spa_list_remove(&impl->link); + + spa_loop_remove_source(source->loop, source); + + if (source->fd != -1 && impl->close) + close(source->fd); + + spa_list_insert(&loop_impl->destroy_list, &impl->link); +} + +static const struct spa_loop impl_loop = { + sizeof(struct spa_loop), + loop_add_source, + loop_update_source, + loop_remove_source, + loop_invoke, +}; + +static const struct spa_loop_control impl_loop_control = { + sizeof(struct spa_loop_control), + loop_get_fd, + loop_add_hooks, + loop_enter, + loop_leave, + loop_iterate, +}; + +static const struct spa_loop_utils impl_loop_utils = { + sizeof(struct spa_loop_utils), + loop_add_io, + loop_update_io, + loop_add_idle, + loop_enable_idle, + loop_add_event, + loop_signal_event, + loop_add_timer, + loop_update_timer, + loop_add_signal, + loop_destroy_source, +}; + +static int impl_get_interface(struct spa_handle *handle, uint32_t interface_id, void **interface) +{ + struct impl *impl; + + spa_return_val_if_fail(handle != NULL, SPA_RESULT_INVALID_ARGUMENTS); + spa_return_val_if_fail(interface != NULL, SPA_RESULT_INVALID_ARGUMENTS); + + impl = (struct impl *) handle; + + if (interface_id == impl->type.loop) + *interface = &impl->loop; + else if (interface_id == impl->type.loop_control) + *interface = &impl->control; + else if (interface_id == impl->type.loop_utils) + *interface = &impl->utils; + else + return SPA_RESULT_UNKNOWN_INTERFACE; + + return SPA_RESULT_OK; +} + +static int impl_clear(struct spa_handle *handle) +{ + struct impl *impl; + struct source_impl *source, *tmp; + + spa_return_val_if_fail(handle != NULL, SPA_RESULT_INVALID_ARGUMENTS); + + impl = (struct impl *) handle; + + spa_list_for_each_safe(source, tmp, &impl->source_list, link) + loop_destroy_source(&source->source); + spa_list_for_each_safe(source, tmp, &impl->destroy_list, link) + free(source); + + close(impl->epoll_fd); + + return SPA_RESULT_OK; +} + +static int +impl_init(const struct spa_handle_factory *factory, + struct spa_handle *handle, + const struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) +{ + struct impl *impl; + uint32_t i; + + spa_return_val_if_fail(factory != NULL, SPA_RESULT_INVALID_ARGUMENTS); + spa_return_val_if_fail(handle != NULL, SPA_RESULT_INVALID_ARGUMENTS); + + handle->get_interface = impl_get_interface; + handle->clear = impl_clear; + + impl = (struct impl *) handle; + impl->loop = impl_loop; + impl->control = impl_loop_control; + impl->utils = impl_loop_utils; + + for (i = 0; i < n_support; i++) { + if (strcmp(support[i].type, SPA_TYPE__TypeMap) == 0) + impl->map = support[i].data; + else if (strcmp(support[i].type, SPA_TYPE__Log) == 0) + impl->log = support[i].data; + } + if (impl->map == NULL) { + spa_log_error(impl->log, NAME " %p: a type-map is needed", impl); + return SPA_RESULT_ERROR; + } + init_type(&impl->type, impl->map); + + impl->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (impl->epoll_fd == -1) + return SPA_RESULT_ERRNO; + + spa_list_init(&impl->source_list); + spa_list_init(&impl->destroy_list); + spa_list_init(&impl->hooks_list); + + spa_ringbuffer_init(&impl->buffer, DATAS_SIZE); + + impl->event = spa_loop_utils_add_event(&impl->utils, event_func, impl); + + spa_log_info(impl->log, NAME " %p: initialized", impl); + + return SPA_RESULT_OK; +} + +static const struct spa_interface_info impl_interfaces[] = { + {SPA_TYPE__Loop,}, + {SPA_TYPE__LoopControl,}, + {SPA_TYPE__LoopUtils,}, +}; + +static int +impl_enum_interface_info(const struct spa_handle_factory *factory, + const struct spa_interface_info **info, + uint32_t index) +{ + spa_return_val_if_fail(factory != NULL, SPA_RESULT_INVALID_ARGUMENTS); + spa_return_val_if_fail(info != NULL, SPA_RESULT_INVALID_ARGUMENTS); + + if (index >= SPA_N_ELEMENTS(impl_interfaces)) + return SPA_RESULT_ENUM_END; + + *info = &impl_interfaces[index]; + return SPA_RESULT_OK; +} + +static const struct spa_handle_factory loop_factory = { + NAME, + NULL, + sizeof(struct impl), + impl_init, + impl_enum_interface_info, +}; + +static void reg(void) __attribute__ ((constructor)); +static void reg(void) +{ + spa_handle_factory_register(&loop_factory); +} diff --git a/spa/plugins/support/mapper.c b/spa/plugins/support/mapper.c index 74b2a7b7e..492eb4d7c 100644 --- a/spa/plugins/support/mapper.c +++ b/spa/plugins/support/mapper.c @@ -207,10 +207,16 @@ impl_enum_interface_info(const struct spa_handle_factory *factory, return SPA_RESULT_OK; } -const struct spa_handle_factory spa_type_map_factory = { +static const struct spa_handle_factory type_map_factory = { NAME, NULL, sizeof(struct impl), impl_init, impl_enum_interface_info, }; + +static void reg(void) __attribute__ ((constructor)); +static void reg(void) +{ + spa_handle_factory_register(&type_map_factory); +} diff --git a/spa/plugins/support/meson.build b/spa/plugins/support/meson.build index c5097acac..7a2d7734b 100644 --- a/spa/plugins/support/meson.build +++ b/spa/plugins/support/meson.build @@ -1,5 +1,6 @@ spa_support_sources = ['mapper.c', 'logger.c', + 'loop.c', 'plugin.c'] spa_support_lib = shared_library('spa-support', diff --git a/spa/plugins/support/plugin.c b/spa/plugins/support/plugin.c index 891fa0746..3cebdfd1f 100644 --- a/spa/plugins/support/plugin.c +++ b/spa/plugins/support/plugin.c @@ -20,23 +20,28 @@ #include #include -extern const struct spa_handle_factory spa_logger_factory; -extern const struct spa_handle_factory spa_type_map_factory; +#define MAX_FACTORIES 16 + +static const struct spa_handle_factory *factories[MAX_FACTORIES]; +static int n_factories; + +void +spa_handle_factory_register(const struct spa_handle_factory *factory) +{ + if (n_factories < MAX_FACTORIES) + factories[n_factories++] = factory; + else + fprintf(stderr, "too many factories\n"); +} int spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t index) { spa_return_val_if_fail(factory != NULL, SPA_RESULT_INVALID_ARGUMENTS); - switch (index) { - case 0: - *factory = &spa_type_map_factory; - break; - case 1: - *factory = &spa_logger_factory; - break; - default: + if (index >= n_factories) return SPA_RESULT_ENUM_END; - } + + *factory = factories[index]; return SPA_RESULT_OK; }