support: add support for checking loop context

Add check for running the the loop context and thread.

Add checks in filter and stream to avoid doing things when not run from
the context main-loop because this can crash things when doing IPC from
concurrent threads.
This commit is contained in:
Wim Taymans 2023-03-31 18:40:57 +02:00
parent a6497839bb
commit 74831aa967
7 changed files with 147 additions and 2 deletions

View file

@ -28,7 +28,7 @@ extern "C" {
struct spa_loop { struct spa_interface iface; };
#define SPA_TYPE_INTERFACE_LoopControl SPA_TYPE_INFO_INTERFACE_BASE "LoopControl"
#define SPA_VERSION_LOOP_CONTROL 0
#define SPA_VERSION_LOOP_CONTROL 1
struct spa_loop_control { struct spa_interface iface; };
#define SPA_TYPE_INTERFACE_LoopUtils SPA_TYPE_INFO_INTERFACE_BASE "LoopUtils"
@ -140,7 +140,7 @@ struct spa_loop_control_hooks {
struct spa_loop_control_methods {
/* the version of this structure. This can be used to expand this
* structure in the future */
#define SPA_VERSION_LOOP_CONTROL_METHODS 0
#define SPA_VERSION_LOOP_CONTROL_METHODS 1
uint32_t version;
int (*get_fd) (void *object);
@ -182,6 +182,16 @@ struct spa_loop_control_methods {
* The number of dispatched fds is returned.
*/
int (*iterate) (void *object, int timeout);
/** Check context of the loop
* \param ctrl the control
*
* This function will check if the current thread is currently the
* one that did the enter call. Since version 1:1.
*
* returns 1 on success, 0 or negative errno value on error.
*/
int (*check) (void *object);
};
#define spa_loop_control_method_v(o,method,version,...) \
@ -207,6 +217,7 @@ struct spa_loop_control_methods {
#define spa_loop_control_enter(l) spa_loop_control_method_v(l,enter,0)
#define spa_loop_control_leave(l) spa_loop_control_method_v(l,leave,0)
#define spa_loop_control_iterate(l,...) spa_loop_control_method_r(l,iterate,0,__VA_ARGS__)
#define spa_loop_control_check(l) spa_loop_control_method_r(l,check,1)
typedef void (*spa_source_io_func_t) (void *data, int fd, uint32_t mask);
typedef void (*spa_source_idle_func_t) (void *data);

View file

@ -356,6 +356,13 @@ static void loop_leave(void *object)
}
}
static int loop_check(void *object)
{
struct impl *impl = object;
pthread_t thread_id = pthread_self();
return (impl->thread == 0 || pthread_equal(impl->thread, thread_id)) ? 1 : 0;
}
static inline void free_source(struct source_impl *s)
{
detach_source(&s->source);
@ -826,6 +833,7 @@ static const struct spa_loop_control_methods impl_loop_control = {
.enter = loop_enter,
.leave = loop_leave,
.iterate = loop_iterate,
.check = loop_check,
};
static const struct spa_loop_utils_methods impl_loop_utils = {

View file

@ -1192,6 +1192,8 @@ filter_new(struct pw_context *context, const char *name,
struct match match;
int res;
ensure_loop(context->main_loop, return NULL);
impl = calloc(1, sizeof(struct filter));
if (impl == NULL) {
res = -errno;
@ -1397,6 +1399,8 @@ void pw_filter_destroy(struct pw_filter *filter)
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
struct port *p;
ensure_loop(impl->context->main_loop, return);
pw_log_debug("%p: destroy", filter);
pw_filter_emit_destroy(filter);
@ -1448,6 +1452,9 @@ void pw_filter_add_listener(struct pw_filter *filter,
void *data)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop);
spa_hook_list_append(&filter->listener_list, listener, events, data);
if (events->process && impl->rt_callbacks.funcs == NULL) {
impl->rt_callbacks = SPA_CALLBACKS_INIT(events, data);
@ -1496,6 +1503,8 @@ int pw_filter_update_properties(struct pw_filter *filter, void *port_data, const
struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data);
int changed = 0;
ensure_loop(impl->context->main_loop, return -EIO);
if (port_data) {
changed = pw_properties_update(port->props, dict);
port->info.props = &port->props->dict;
@ -1533,6 +1542,8 @@ pw_filter_connect(struct pw_filter *filter,
uint32_t i;
struct spa_dict_item items[1];
ensure_loop(impl->context->main_loop, return -EIO);
if (filter->proxy != NULL || filter->state != PW_FILTER_STATE_UNCONNECTED)
return -EBUSY;
@ -1626,6 +1637,7 @@ SPA_EXPORT
int pw_filter_disconnect(struct pw_filter *filter)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop, return -EIO);
return filter_disconnect(impl);
}
@ -1708,6 +1720,8 @@ void *pw_filter_add_port(struct pw_filter *filter,
struct port *p;
const char *str;
ensure_loop(impl->context->main_loop, return NULL);
if (props == NULL)
props = pw_properties_new(NULL, NULL);
if (props == NULL)
@ -1769,6 +1783,9 @@ int pw_filter_remove_port(void *port_data)
{
struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data);
struct filter *impl = port->filter;
ensure_loop(impl->context->main_loop, return -EIO);
free_port(impl, port);
return 0;
}
@ -1777,6 +1794,10 @@ SPA_EXPORT
int pw_filter_set_error(struct pw_filter *filter,
int res, const char *error, ...)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop, return -EIO);
if (res < 0) {
va_list args;
char *value;
@ -1807,6 +1828,8 @@ int pw_filter_update_params(struct pw_filter *filter,
struct port *port;
int res;
ensure_loop(impl->context->main_loop, return -EIO);
pw_log_debug("%p: update params", filter);
port = port_data ? SPA_CONTAINER_OF(port_data, struct port, user_data) : NULL;
@ -1826,6 +1849,10 @@ int pw_filter_update_params(struct pw_filter *filter,
SPA_EXPORT
int pw_filter_set_active(struct pw_filter *filter, bool active)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop, return -EIO);
pw_log_debug("%p: active:%d", filter, active);
return 0;
}

View file

@ -9,6 +9,7 @@
#include <spa/utils/result.h>
#include <pipewire/pipewire.h>
#include <pipewire/private.h>
#include <pipewire/loop.h>
#include <pipewire/log.h>
#include <pipewire/type.h>
@ -23,6 +24,9 @@ struct impl {
struct spa_handle *system_handle;
struct spa_handle *loop_handle;
void *user_data;
const struct pw_loop_callbacks *cb;
};
/** \endcond */
@ -140,3 +144,24 @@ void pw_loop_destroy(struct pw_loop *loop)
pw_unload_spa_handle(impl->system_handle);
free(impl);
}
void
pw_loop_set_callbacks(struct pw_loop *loop, const struct pw_loop_callbacks *cb, void *data)
{
struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, this);
impl->user_data = data;
impl->cb = cb;
}
SPA_EXPORT
int pw_loop_check(struct pw_loop *loop)
{
struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, this);
int res;
if (impl->cb && impl->cb->check)
res = impl->cb->check(impl->user_data, loop);
else
res = spa_loop_control_check(loop->control);
return res;
}

View file

@ -374,6 +374,29 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq,
va_end(args);
}
struct pw_loop_callbacks {
#define PW_VERSION_LOOP_CALLBACKS 0
uint32_t version;
int (*check) (void *data, struct pw_loop *loop);
};
void
pw_loop_set_callbacks(struct pw_loop *loop, const struct pw_loop_callbacks *cb, void *data);
int pw_loop_check(struct pw_loop *loop);
#define ensure_loop(loop,...) ({ \
int res = pw_loop_check(loop); \
if (res != 1) { \
pw_log_error("%s called from wrong context, check locking: %s", \
__func__, spa_strerror(res)); \
fprintf(stderr, "*** %s called from wrong context, check locking: %s\n",\
__func__, spa_strerror(res)); \
__VA_ARGS__; \
} \
})
#define pw_context_driver_emit(c,m,v,...) spa_hook_list_call_simple(&c->driver_listener_list, struct pw_context_driver_events, m, v, ##__VA_ARGS__)
#define pw_context_driver_emit_start(c,n) pw_context_driver_emit(c, start, 0, n)
#define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n)

View file

@ -1436,6 +1436,8 @@ stream_new(struct pw_context *context, const char *name,
struct match match;
int res;
ensure_loop(context->main_loop, return NULL);
impl = calloc(1, sizeof(struct stream));
if (impl == NULL) {
res = -errno;
@ -1647,6 +1649,8 @@ void pw_stream_destroy(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct control *c;
ensure_loop(impl->context->main_loop, return);
pw_log_debug("%p: destroy", stream);
pw_stream_emit_destroy(stream);
@ -1701,6 +1705,9 @@ void pw_stream_add_listener(struct pw_stream *stream,
void *data)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop);
spa_hook_list_append(&stream->listener_list, listener, events, data);
if (events->process && impl->rt_callbacks.funcs == NULL) {
@ -1737,6 +1744,8 @@ int pw_stream_update_properties(struct pw_stream *stream, const struct spa_dict
int changed, res = 0;
struct match match;
ensure_loop(impl->context->main_loop, return -EIO);
changed = pw_properties_update(stream->properties, dict);
if (!changed)
return 0;
@ -1846,6 +1855,8 @@ pw_stream_connect(struct pw_stream *stream,
uint32_t i;
int res;
ensure_loop(impl->context->main_loop, return -EIO);
pw_log_debug("%p: connect target:%d", stream, target_id);
if (impl->node != NULL || stream->state != PW_STREAM_STATE_UNCONNECTED)
@ -2075,6 +2086,7 @@ SPA_EXPORT
int pw_stream_disconnect(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop, return -EIO);
return stream_disconnect(impl);
}
@ -2082,6 +2094,10 @@ SPA_EXPORT
int pw_stream_set_error(struct pw_stream *stream,
int res, const char *error, ...)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop, return -EIO);
if (res < 0) {
va_list args;
char *value;
@ -2110,6 +2126,8 @@ int pw_stream_update_params(struct pw_stream *stream,
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int res;
ensure_loop(impl->context->main_loop, return -EIO);
pw_log_debug("%p: update params", stream);
if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0)
return res;
@ -2131,6 +2149,8 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_valu
struct spa_pod *pod;
struct control *c;
ensure_loop(impl->context->main_loop, return -EIO);
if (impl->node == NULL)
return -EIO;
@ -2200,6 +2220,8 @@ int pw_stream_set_active(struct pw_stream *stream, bool active)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop, return -EIO);
pw_log_debug("%p: active:%d", stream, active);
if (impl->node == NULL)

View file

@ -9,6 +9,7 @@
#include <spa/support/thread.h>
#include <spa/utils/result.h>
#include "private.h"
#include "log.h"
#include "thread.h"
#include "thread-loop.h"
@ -85,6 +86,33 @@ static const struct spa_loop_control_hooks impl_hooks = {
.after = impl_after,
};
static int impl_check(void *data, struct pw_loop *loop)
{
struct pw_thread_loop *this = data;
int res;
/* we are in the thread running the loop */
if (spa_loop_control_check(this->loop->control) == 1)
return 1;
/* if lock taken by something else, error */
if ((res = pthread_mutex_trylock(&this->lock)) != 0) {
pw_log_debug("%p: thread:%lu: %s", this, pthread_self(), strerror(res));
return -res;
}
/* we could take the lock, check if we actually locked it somewhere */
res = this->recurse > 0 ? 1 : -EPERM;
if (res < 0)
pw_log_debug("%p: thread:%lu: recurse:%d", this, pthread_self(), this->recurse);
pthread_mutex_unlock(&this->lock);
return res;
}
static const struct pw_loop_callbacks impl_callbacks = {
PW_VERSION_LOOP_CALLBACKS,
.check = impl_check,
};
static void do_stop(void *data, uint64_t count)
{
struct pw_thread_loop *this = data;
@ -144,6 +172,7 @@ static struct pw_thread_loop *loop_new(struct pw_loop *loop,
goto clean_acceptcond;
}
pw_loop_set_callbacks(loop, &impl_callbacks, this);
pw_loop_add_hook(loop, &this->hook, &impl_hooks, this);
return this;