diff --git a/src/modules/module-protocol-pulse/internal.h b/src/modules/module-protocol-pulse/internal.h index 903d13f54..2646c14be 100644 --- a/src/modules/module-protocol-pulse/internal.h +++ b/src/modules/module-protocol-pulse/internal.h @@ -79,7 +79,6 @@ struct client { const char *name; struct spa_source *source; - struct spa_source *cleanup; uint32_t version; @@ -220,7 +219,7 @@ struct impl { struct spa_source *source; struct spa_list servers; - struct spa_source *cleanup; + struct pw_work_queue *work_queue; struct spa_list cleanup_clients; struct pw_map samples; diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 02df1eae1..fe413ef0a 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -122,6 +122,7 @@ static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t #include "module.c" #include "message-handler.c" +static void client_free(struct client *client); static void sample_free(struct sample *sample) { @@ -1368,6 +1369,15 @@ static void stream_control_info(void *data, uint32_t id, } } +static void on_stream_cleanup(void *obj, void *data, int res, uint32_t id) +{ + struct stream *stream = obj; + struct client *client = stream->client; + stream_free(stream); + if (client->ref <= 0) + client_free(client); +} + static void stream_state_changed(void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error) { @@ -1390,8 +1400,10 @@ static void stream_state_changed(void *data, enum pw_stream_state old, case PW_STREAM_STATE_STREAMING: break; } - if (stream->done) - pw_loop_signal_event(impl->loop, client->cleanup); + if (stream->done) { + pw_work_queue_add(impl->work_queue, stream, 0, + on_stream_cleanup, client); + } } static const struct spa_pod *get_buffers_param(struct stream *s, @@ -2653,6 +2665,15 @@ static void sample_play_ready(void *data, uint32_t index) send_message(client, reply); } +static void on_sample_done(void *obj, void *data, int res, uint32_t id) +{ + struct pending_sample *ps = obj; + struct client *client = ps->client; + pending_sample_free(ps); + if (client->ref <= 0) + client_free(client); +} + static void sample_play_done(void *data, int res) { struct pending_sample *ps = data; @@ -2665,7 +2686,8 @@ static void sample_play_done(void *data, int res) pw_log_info(NAME" %p: PLAY_SAMPLE done tag:%u", client, ps->tag); ps->done = true; - pw_loop_signal_event(impl->loop, client->cleanup); + pw_work_queue_add(impl->work_queue, ps, 0, + on_sample_done, client); } static const struct sample_play_events sample_play_events = { @@ -5359,8 +5381,6 @@ static void client_free(struct client *client) pw_properties_free(client->props); if (client->routes) pw_properties_free(client->routes); - if (client->cleanup) - pw_loop_destroy_source(impl->loop, client->cleanup); free(client); } @@ -5565,30 +5585,6 @@ exit: return res; } -static int client_cleanup_stream(void *item, void *data) -{ - struct stream *s = item; - if (s->done) - stream_free(s); - return 0; -} - -static void on_client_cleanup(void *data, uint64_t count) -{ - struct client *client = data; - struct impl *impl = client->impl; - struct pending_sample *p, *t; - - spa_list_for_each_safe(p, t, &client->pending_samples, link) { - if (p->done) - pending_sample_free(p); - } - pw_map_for_each(&client->streams, client_cleanup_stream, client); - - if (client->ref <= 0) - pw_loop_signal_event(impl->loop, impl->cleanup); -} - static void on_client_data(void *data, int fd, uint32_t mask) { @@ -5786,11 +5782,6 @@ on_connect(void *data, int fd, uint32_t mask) if (client->source == NULL) goto error; - client->cleanup = pw_loop_add_event(impl->loop, - on_client_cleanup, client); - if (client->cleanup == NULL) - goto error; - return; error: pw_log_error(NAME" %p: failed to create client: %m", impl); @@ -6140,8 +6131,6 @@ static void impl_free(struct impl *impl) pw_map_for_each(&impl->modules, impl_free_module, impl); pw_map_clear(&impl->modules); - if (impl->cleanup != NULL) - pw_loop_destroy_source(impl->loop, impl->cleanup); pw_properties_free(impl->props); free(impl); } @@ -6161,16 +6150,6 @@ static const struct pw_context_events context_events = { .destroy = context_destroy, }; -static void on_server_cleanup(void *data, uint64_t count) -{ - struct impl *impl = data; - struct client *c, *t; - spa_list_for_each_safe(c, t, &impl->cleanup_clients, link) { - if (c->ref <= 0) - client_free(c); - } -} - static int parse_frac(struct pw_properties *props, const char *key, const char *def, struct spa_fraction *res) { @@ -6271,9 +6250,8 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context, impl->loop = pw_context_get_main_loop(context); impl->props = props; - impl->cleanup = pw_loop_add_event(impl->loop, - on_server_cleanup, impl); - if (impl->cleanup == NULL) + impl->work_queue = pw_context_get_work_queue(context); + if (impl->work_queue == NULL) goto error_free; spa_list_init(&impl->servers);