diff --git a/spa/include/spa/support/loop.h b/spa/include/spa/support/loop.h index a72bbd560..0935c6243 100644 --- a/spa/include/spa/support/loop.h +++ b/spa/include/spa/support/loop.h @@ -28,7 +28,7 @@ extern "C" { struct spa_loop { struct spa_interface iface; }; #define SPA_TYPE_INTERFACE_LoopControl SPA_TYPE_INFO_INTERFACE_BASE "LoopControl" -#define SPA_VERSION_LOOP_CONTROL 0 +#define SPA_VERSION_LOOP_CONTROL 1 struct spa_loop_control { struct spa_interface iface; }; #define SPA_TYPE_INTERFACE_LoopUtils SPA_TYPE_INFO_INTERFACE_BASE "LoopUtils" @@ -140,7 +140,7 @@ struct spa_loop_control_hooks { struct spa_loop_control_methods { /* the version of this structure. This can be used to expand this * structure in the future */ -#define SPA_VERSION_LOOP_CONTROL_METHODS 0 +#define SPA_VERSION_LOOP_CONTROL_METHODS 1 uint32_t version; int (*get_fd) (void *object); @@ -182,6 +182,16 @@ struct spa_loop_control_methods { * The number of dispatched fds is returned. */ int (*iterate) (void *object, int timeout); + + /** Check context of the loop + * \param ctrl the control + * + * This function will check if the current thread is currently the + * one that did the enter call. Since version 1:1. + * + * returns 1 on success, 0 or negative errno value on error. + */ + int (*check) (void *object); }; #define spa_loop_control_method_v(o,method,version,...) \ @@ -207,6 +217,7 @@ struct spa_loop_control_methods { #define spa_loop_control_enter(l) spa_loop_control_method_v(l,enter,0) #define spa_loop_control_leave(l) spa_loop_control_method_v(l,leave,0) #define spa_loop_control_iterate(l,...) spa_loop_control_method_r(l,iterate,0,__VA_ARGS__) +#define spa_loop_control_check(l) spa_loop_control_method_r(l,check,1) typedef void (*spa_source_io_func_t) (void *data, int fd, uint32_t mask); typedef void (*spa_source_idle_func_t) (void *data); diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 19f32e0c7..4b2d94eaf 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -356,6 +356,13 @@ static void loop_leave(void *object) } } +static int loop_check(void *object) +{ + struct impl *impl = object; + pthread_t thread_id = pthread_self(); + return (impl->thread == 0 || pthread_equal(impl->thread, thread_id)) ? 1 : 0; +} + static inline void free_source(struct source_impl *s) { detach_source(&s->source); @@ -826,6 +833,7 @@ static const struct spa_loop_control_methods impl_loop_control = { .enter = loop_enter, .leave = loop_leave, .iterate = loop_iterate, + .check = loop_check, }; static const struct spa_loop_utils_methods impl_loop_utils = { diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index 9e9214e7d..5365c8c17 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -1192,6 +1192,8 @@ filter_new(struct pw_context *context, const char *name, struct match match; int res; + ensure_loop(context->main_loop, return NULL); + impl = calloc(1, sizeof(struct filter)); if (impl == NULL) { res = -errno; @@ -1397,6 +1399,8 @@ void pw_filter_destroy(struct pw_filter *filter) struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); struct port *p; + ensure_loop(impl->context->main_loop, return); + pw_log_debug("%p: destroy", filter); pw_filter_emit_destroy(filter); @@ -1448,6 +1452,9 @@ void pw_filter_add_listener(struct pw_filter *filter, void *data) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + + ensure_loop(impl->context->main_loop); + spa_hook_list_append(&filter->listener_list, listener, events, data); if (events->process && impl->rt_callbacks.funcs == NULL) { impl->rt_callbacks = SPA_CALLBACKS_INIT(events, data); @@ -1496,6 +1503,8 @@ int pw_filter_update_properties(struct pw_filter *filter, void *port_data, const struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); int changed = 0; + ensure_loop(impl->context->main_loop, return -EIO); + if (port_data) { changed = pw_properties_update(port->props, dict); port->info.props = &port->props->dict; @@ -1533,6 +1542,8 @@ pw_filter_connect(struct pw_filter *filter, uint32_t i; struct spa_dict_item items[1]; + ensure_loop(impl->context->main_loop, return -EIO); + if (filter->proxy != NULL || filter->state != PW_FILTER_STATE_UNCONNECTED) return -EBUSY; @@ -1626,6 +1637,7 @@ SPA_EXPORT int pw_filter_disconnect(struct pw_filter *filter) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + ensure_loop(impl->context->main_loop, return -EIO); return filter_disconnect(impl); } @@ -1708,6 +1720,8 @@ void *pw_filter_add_port(struct pw_filter *filter, struct port *p; const char *str; + ensure_loop(impl->context->main_loop, return NULL); + if (props == NULL) props = pw_properties_new(NULL, NULL); if (props == NULL) @@ -1769,6 +1783,9 @@ int pw_filter_remove_port(void *port_data) { struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); struct filter *impl = port->filter; + + ensure_loop(impl->context->main_loop, return -EIO); + free_port(impl, port); return 0; } @@ -1777,6 +1794,10 @@ SPA_EXPORT int pw_filter_set_error(struct pw_filter *filter, int res, const char *error, ...) { + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + + ensure_loop(impl->context->main_loop, return -EIO); + if (res < 0) { va_list args; char *value; @@ -1807,6 +1828,8 @@ int pw_filter_update_params(struct pw_filter *filter, struct port *port; int res; + ensure_loop(impl->context->main_loop, return -EIO); + pw_log_debug("%p: update params", filter); port = port_data ? SPA_CONTAINER_OF(port_data, struct port, user_data) : NULL; @@ -1826,6 +1849,10 @@ int pw_filter_update_params(struct pw_filter *filter, SPA_EXPORT int pw_filter_set_active(struct pw_filter *filter, bool active) { + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + + ensure_loop(impl->context->main_loop, return -EIO); + pw_log_debug("%p: active:%d", filter, active); return 0; } diff --git a/src/pipewire/loop.c b/src/pipewire/loop.c index 96aa53706..0d9248e55 100644 --- a/src/pipewire/loop.c +++ b/src/pipewire/loop.c @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,9 @@ struct impl { struct spa_handle *system_handle; struct spa_handle *loop_handle; + + void *user_data; + const struct pw_loop_callbacks *cb; }; /** \endcond */ @@ -140,3 +144,24 @@ void pw_loop_destroy(struct pw_loop *loop) pw_unload_spa_handle(impl->system_handle); free(impl); } + +void +pw_loop_set_callbacks(struct pw_loop *loop, const struct pw_loop_callbacks *cb, void *data) +{ + struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, this); + + impl->user_data = data; + impl->cb = cb; +} + +SPA_EXPORT +int pw_loop_check(struct pw_loop *loop) +{ + struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, this); + int res; + if (impl->cb && impl->cb->check) + res = impl->cb->check(impl->user_data, loop); + else + res = spa_loop_control_check(loop->control); + return res; +} diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 5303c5b77..ec8ce7656 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -374,6 +374,29 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq, va_end(args); } +struct pw_loop_callbacks { +#define PW_VERSION_LOOP_CALLBACKS 0 + uint32_t version; + + int (*check) (void *data, struct pw_loop *loop); +}; + +void +pw_loop_set_callbacks(struct pw_loop *loop, const struct pw_loop_callbacks *cb, void *data); + +int pw_loop_check(struct pw_loop *loop); + +#define ensure_loop(loop,...) ({ \ + int res = pw_loop_check(loop); \ + if (res != 1) { \ + pw_log_error("%s called from wrong context, check locking: %s", \ + __func__, spa_strerror(res)); \ + fprintf(stderr, "*** %s called from wrong context, check locking: %s\n",\ + __func__, spa_strerror(res)); \ + __VA_ARGS__; \ + } \ +}) + #define pw_context_driver_emit(c,m,v,...) spa_hook_list_call_simple(&c->driver_listener_list, struct pw_context_driver_events, m, v, ##__VA_ARGS__) #define pw_context_driver_emit_start(c,n) pw_context_driver_emit(c, start, 0, n) #define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 604254973..fecce8e53 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -1436,6 +1436,8 @@ stream_new(struct pw_context *context, const char *name, struct match match; int res; + ensure_loop(context->main_loop, return NULL); + impl = calloc(1, sizeof(struct stream)); if (impl == NULL) { res = -errno; @@ -1647,6 +1649,8 @@ void pw_stream_destroy(struct pw_stream *stream) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct control *c; + ensure_loop(impl->context->main_loop, return); + pw_log_debug("%p: destroy", stream); pw_stream_emit_destroy(stream); @@ -1701,6 +1705,9 @@ void pw_stream_add_listener(struct pw_stream *stream, void *data) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + + ensure_loop(impl->context->main_loop); + spa_hook_list_append(&stream->listener_list, listener, events, data); if (events->process && impl->rt_callbacks.funcs == NULL) { @@ -1737,6 +1744,8 @@ int pw_stream_update_properties(struct pw_stream *stream, const struct spa_dict int changed, res = 0; struct match match; + ensure_loop(impl->context->main_loop, return -EIO); + changed = pw_properties_update(stream->properties, dict); if (!changed) return 0; @@ -1846,6 +1855,8 @@ pw_stream_connect(struct pw_stream *stream, uint32_t i; int res; + ensure_loop(impl->context->main_loop, return -EIO); + pw_log_debug("%p: connect target:%d", stream, target_id); if (impl->node != NULL || stream->state != PW_STREAM_STATE_UNCONNECTED) @@ -2075,6 +2086,7 @@ SPA_EXPORT int pw_stream_disconnect(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + ensure_loop(impl->context->main_loop, return -EIO); return stream_disconnect(impl); } @@ -2082,6 +2094,10 @@ SPA_EXPORT int pw_stream_set_error(struct pw_stream *stream, int res, const char *error, ...) { + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + + ensure_loop(impl->context->main_loop, return -EIO); + if (res < 0) { va_list args; char *value; @@ -2110,6 +2126,8 @@ int pw_stream_update_params(struct pw_stream *stream, struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); int res; + ensure_loop(impl->context->main_loop, return -EIO); + pw_log_debug("%p: update params", stream); if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0) return res; @@ -2131,6 +2149,8 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_valu struct spa_pod *pod; struct control *c; + ensure_loop(impl->context->main_loop, return -EIO); + if (impl->node == NULL) return -EIO; @@ -2200,6 +2220,8 @@ int pw_stream_set_active(struct pw_stream *stream, bool active) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + ensure_loop(impl->context->main_loop, return -EIO); + pw_log_debug("%p: active:%d", stream, active); if (impl->node == NULL) diff --git a/src/pipewire/thread-loop.c b/src/pipewire/thread-loop.c index 329bd49a7..a9529d3c8 100644 --- a/src/pipewire/thread-loop.c +++ b/src/pipewire/thread-loop.c @@ -9,6 +9,7 @@ #include #include +#include "private.h" #include "log.h" #include "thread.h" #include "thread-loop.h" @@ -85,6 +86,33 @@ static const struct spa_loop_control_hooks impl_hooks = { .after = impl_after, }; +static int impl_check(void *data, struct pw_loop *loop) +{ + struct pw_thread_loop *this = data; + int res; + + /* we are in the thread running the loop */ + if (spa_loop_control_check(this->loop->control) == 1) + return 1; + + /* if lock taken by something else, error */ + if ((res = pthread_mutex_trylock(&this->lock)) != 0) { + pw_log_debug("%p: thread:%lu: %s", this, pthread_self(), strerror(res)); + return -res; + } + /* we could take the lock, check if we actually locked it somewhere */ + res = this->recurse > 0 ? 1 : -EPERM; + if (res < 0) + pw_log_debug("%p: thread:%lu: recurse:%d", this, pthread_self(), this->recurse); + pthread_mutex_unlock(&this->lock); + return res; +} + +static const struct pw_loop_callbacks impl_callbacks = { + PW_VERSION_LOOP_CALLBACKS, + .check = impl_check, +}; + static void do_stop(void *data, uint64_t count) { struct pw_thread_loop *this = data; @@ -144,6 +172,7 @@ static struct pw_thread_loop *loop_new(struct pw_loop *loop, goto clean_acceptcond; } + pw_loop_set_callbacks(loop, &impl_callbacks, this); pw_loop_add_hook(loop, &this->hook, &impl_hooks, this); return this;