stream: emit events from only one thread

Emitting events using the listener_list can not be done from multiple
threads at the same time.  For this reason, make a copy of the events
with the process event and call it explicitly from the data thread.

See #1122
This commit is contained in:
Wim Taymans 2021-05-28 13:24:10 +02:00
parent a138152bef
commit 934caab1e1
2 changed files with 33 additions and 2 deletions

View file

@ -154,6 +154,8 @@ struct filter {
uint64_t base_pos;
uint32_t clock_id;
struct spa_callbacks rt_callbacks;
unsigned int disconnecting:1;
unsigned int disconnect_core:1;
unsigned int subscribe:1;
@ -1182,13 +1184,27 @@ void pw_filter_destroy(struct pw_filter *filter)
free(impl);
}
static void hook_removed(struct spa_hook *hook)
{
struct filter *impl = hook->priv;
spa_zero(impl->rt_callbacks);
hook->priv = NULL;
hook->removed = NULL;
}
SPA_EXPORT
void pw_filter_add_listener(struct pw_filter *filter,
struct spa_hook *listener,
const struct pw_filter_events *events,
void *data)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
spa_hook_list_append(&filter->listener_list, listener, events, data);
if (events->process && impl->rt_callbacks.funcs == NULL) {
impl->rt_callbacks = SPA_CALLBACKS_INIT(events, data);
listener->removed = hook_removed;
listener->priv = impl;
}
}
SPA_EXPORT

View file

@ -155,6 +155,8 @@ struct stream {
struct spa_latency_info latency;
uint64_t quantum;
struct spa_callbacks rt_callbacks;
unsigned int disconnecting:1;
unsigned int disconnect_core:1;
unsigned int draining:1;
@ -388,10 +390,9 @@ do_call_process(struct spa_loop *loop,
static void call_process(struct stream *impl)
{
struct pw_stream *stream = &impl->this;
pw_log_trace(NAME" %p: call process rt:%u", impl, impl->process_rt);
if (impl->process_rt)
pw_stream_emit_process(stream);
spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0);
else
pw_loop_invoke(impl->context->main_loop,
do_call_process, 1, NULL, 0, false, impl);
@ -1443,13 +1444,27 @@ void pw_stream_destroy(struct pw_stream *stream)
free(impl);
}
static void hook_removed(struct spa_hook *hook)
{
struct stream *impl = hook->priv;
spa_zero(impl->rt_callbacks);
hook->priv = NULL;
hook->removed = NULL;
}
SPA_EXPORT
void pw_stream_add_listener(struct pw_stream *stream,
struct spa_hook *listener,
const struct pw_stream_events *events,
void *data)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
spa_hook_list_append(&stream->listener_list, listener, events, data);
if (events->process && impl->rt_callbacks.funcs == NULL) {
impl->rt_callbacks = SPA_CALLBACKS_INIT(events, data);
listener->removed = hook_removed;
listener->priv = impl;
}
}
SPA_EXPORT