diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 1983f3543..2371a49b3 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -99,6 +99,7 @@ struct client { struct impl *impl; struct server *server; + int ref; const char *name; struct spa_source *source; @@ -133,8 +134,9 @@ struct client { struct spa_list operations; struct spa_list modules; - struct spa_list samples; + struct spa_list pending_samples; + unsigned int disconnect:1; unsigned int disconnecting:1; unsigned int need_flush:1; }; @@ -228,6 +230,9 @@ struct impl { struct spa_source *source; struct spa_list servers; + struct spa_source *cleanup; + struct spa_list cleanup_clients; + struct pw_map samples; struct spa_list free_messages; @@ -2346,6 +2351,7 @@ static void pending_sample_free(struct pending_sample *ps) spa_list_remove(&ps->link); spa_hook_remove(&ps->listener); sample_play_destroy(ps->play); + ps->client->ref--; } static void sample_play_ready(void *data, uint32_t index) @@ -2447,7 +2453,8 @@ static int do_play_sample(struct client *client, uint32_t command, uint32_t tag, ps->play = play; ps->tag = tag; sample_play_add_listener(play, &ps->listener, &sample_play_events, ps); - spa_list_append(&client->samples, &ps->link); + spa_list_append(&client->pending_samples, &ps->link); + client->ref++; return 0; @@ -4789,28 +4796,48 @@ static int client_free_stream(void *item, void *data) return 0; } +static void client_disconnect(struct client *client) +{ + struct impl *impl = client->impl; + + if (client->disconnect) + return; + + client->disconnect = true; + spa_list_remove(&client->link); + spa_list_append(&impl->cleanup_clients, &client->link); + + pw_map_for_each(&client->streams, client_free_stream, client); + pw_map_clear(&client->streams); + + if (client->source) + pw_loop_destroy_source(impl->loop, client->source); + if (client->manager) + pw_manager_destroy(client->manager); +} + static void client_free(struct client *client) { struct impl *impl = client->impl; struct message *msg; struct module *module, *tmp; - struct pending_sample *p, *t; + struct pending_sample *p; + + pw_log_info(NAME" %p: client %p free", impl, client); + + client_disconnect(client); - pw_log_debug(NAME" %p: client %p free", impl, client); spa_list_remove(&client->link); - pw_map_for_each(&client->streams, client_free_stream, client); - pw_map_clear(&client->streams); - - spa_list_for_each_safe(p, t, &client->samples, link) + spa_list_consume(p, &client->pending_samples, link) pending_sample_free(p); spa_list_for_each_safe(module, tmp, &client->modules, link) unload_module(module); + spa_list_consume(msg, &client->out_messages, link) message_free(impl, msg, true, false); - if (client->manager) - pw_manager_destroy(client->manager); + if (client->core) { client->disconnecting = true; pw_core_disconnect(client->core); @@ -4819,13 +4846,17 @@ static void client_free(struct client *client) pw_properties_free(client->props); if (client->routes) pw_properties_free(client->routes); - if (client->source) - pw_loop_destroy_source(impl->loop, client->source); if (client->cleanup) pw_loop_destroy_source(impl->loop, client->cleanup); free(client); } +static void client_unref(struct client *client) +{ + if (--client->ref == 0) + client_free(client); +} + static int handle_packet(struct client *client, struct message *msg) { struct impl *impl = client->impl; @@ -5026,12 +5057,17 @@ static int client_cleanup_stream(void *item, void *data) static void on_client_cleanup(void *data, uint64_t count) { struct client *client = data; + struct impl *impl = client->impl; struct pending_sample *p, *t; - spa_list_for_each_safe(p, t, &client->samples, link) { + + spa_list_for_each_safe(p, t, &client->pending_samples, link) { if (p->done) pending_sample_free(p); } pw_map_for_each(&client->streams, client_cleanup_stream, client); + + if (client->ref <= 0) + pw_loop_signal_event(impl->loop, impl->cleanup); } static void @@ -5081,7 +5117,8 @@ error: client, client->name, res, spa_strerror(res)); return; } - client_free(client); + client_disconnect(client); + client_unref(client); } static void @@ -5099,6 +5136,7 @@ on_connect(void *data, int fd, uint32_t mask) goto error; client->impl = impl; + client->ref = 1; client->server = server; client->connect_tag = SPA_ID_INVALID; spa_list_append(&server->clients, &client->link); @@ -5106,7 +5144,7 @@ on_connect(void *data, int fd, uint32_t mask) spa_list_init(&client->out_messages); spa_list_init(&client->operations); spa_list_init(&client->modules); - spa_list_init(&client->samples); + spa_list_init(&client->pending_samples); client->props = pw_properties_new( PW_KEY_CLIENT_API, "pipewire-pulse", @@ -5449,12 +5487,18 @@ static int impl_free_sample(void *item, void *data) static void impl_free(struct impl *impl) { struct server *s; + struct client *c; + if (impl->context) spa_hook_remove(&impl->context_listener); + spa_list_consume(c, &impl->cleanup_clients, link) + client_free(c); spa_list_consume(s, &impl->servers, link) server_free(s); pw_map_for_each(&impl->samples, impl_free_sample, impl); pw_map_clear(&impl->samples); + if (impl->cleanup) + pw_loop_destroy_source(impl->loop, impl->cleanup); if (impl->props) pw_properties_free(impl->props); free(impl); @@ -5475,6 +5519,16 @@ static const struct pw_context_events context_events = { .destroy = context_destroy, }; +static void on_server_cleanup(void *data, uint64_t count) +{ + struct impl *impl = data; + struct client *c, *t; + spa_list_for_each_safe(c, t, &impl->cleanup_clients, link) { + if (c->ref <= 0) + client_free(c); + } +} + struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context, struct pw_properties *props, size_t user_data_size) { @@ -5503,10 +5557,17 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context, impl->context = context; impl->loop = pw_context_get_main_loop(context); impl->props = props; + + impl->cleanup = pw_loop_add_event(impl->loop, + on_server_cleanup, impl); + if (impl->cleanup == NULL) + goto error_free; + spa_list_init(&impl->servers); impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; impl->rate_limit.burst = 1; pw_map_init(&impl->samples, 16, 16); + spa_list_init(&impl->cleanup_clients); spa_list_init(&impl->free_messages); pw_context_add_listener(context, &impl->context_listener,