pulse-server: don't destroy clients playing a sample

When a sample is playing a cached sample and it disconnects we should
wait until all the pending samples are finished otherwise we stop
their playback and get a chopped or broken sample.

See #595
This commit is contained in:
Wim Taymans 2021-01-21 11:53:06 +01:00
parent dcc5779cd6
commit d9af97899c

View file

@ -99,6 +99,7 @@ struct client {
struct impl *impl; struct impl *impl;
struct server *server; struct server *server;
int ref;
const char *name; const char *name;
struct spa_source *source; struct spa_source *source;
@ -133,8 +134,9 @@ struct client {
struct spa_list operations; struct spa_list operations;
struct spa_list modules; struct spa_list modules;
struct spa_list samples; struct spa_list pending_samples;
unsigned int disconnect:1;
unsigned int disconnecting:1; unsigned int disconnecting:1;
unsigned int need_flush:1; unsigned int need_flush:1;
}; };
@ -228,6 +230,9 @@ struct impl {
struct spa_source *source; struct spa_source *source;
struct spa_list servers; struct spa_list servers;
struct spa_source *cleanup;
struct spa_list cleanup_clients;
struct pw_map samples; struct pw_map samples;
struct spa_list free_messages; struct spa_list free_messages;
@ -2346,6 +2351,7 @@ static void pending_sample_free(struct pending_sample *ps)
spa_list_remove(&ps->link); spa_list_remove(&ps->link);
spa_hook_remove(&ps->listener); spa_hook_remove(&ps->listener);
sample_play_destroy(ps->play); sample_play_destroy(ps->play);
ps->client->ref--;
} }
static void sample_play_ready(void *data, uint32_t index) static void sample_play_ready(void *data, uint32_t index)
@ -2447,7 +2453,8 @@ static int do_play_sample(struct client *client, uint32_t command, uint32_t tag,
ps->play = play; ps->play = play;
ps->tag = tag; ps->tag = tag;
sample_play_add_listener(play, &ps->listener, &sample_play_events, ps); sample_play_add_listener(play, &ps->listener, &sample_play_events, ps);
spa_list_append(&client->samples, &ps->link); spa_list_append(&client->pending_samples, &ps->link);
client->ref++;
return 0; return 0;
@ -4789,28 +4796,48 @@ static int client_free_stream(void *item, void *data)
return 0; return 0;
} }
static void client_disconnect(struct client *client)
{
struct impl *impl = client->impl;
if (client->disconnect)
return;
client->disconnect = true;
spa_list_remove(&client->link);
spa_list_append(&impl->cleanup_clients, &client->link);
pw_map_for_each(&client->streams, client_free_stream, client);
pw_map_clear(&client->streams);
if (client->source)
pw_loop_destroy_source(impl->loop, client->source);
if (client->manager)
pw_manager_destroy(client->manager);
}
static void client_free(struct client *client) static void client_free(struct client *client)
{ {
struct impl *impl = client->impl; struct impl *impl = client->impl;
struct message *msg; struct message *msg;
struct module *module, *tmp; struct module *module, *tmp;
struct pending_sample *p, *t; struct pending_sample *p;
pw_log_info(NAME" %p: client %p free", impl, client);
client_disconnect(client);
pw_log_debug(NAME" %p: client %p free", impl, client);
spa_list_remove(&client->link); spa_list_remove(&client->link);
pw_map_for_each(&client->streams, client_free_stream, client); spa_list_consume(p, &client->pending_samples, link)
pw_map_clear(&client->streams);
spa_list_for_each_safe(p, t, &client->samples, link)
pending_sample_free(p); pending_sample_free(p);
spa_list_for_each_safe(module, tmp, &client->modules, link) spa_list_for_each_safe(module, tmp, &client->modules, link)
unload_module(module); unload_module(module);
spa_list_consume(msg, &client->out_messages, link) spa_list_consume(msg, &client->out_messages, link)
message_free(impl, msg, true, false); message_free(impl, msg, true, false);
if (client->manager)
pw_manager_destroy(client->manager);
if (client->core) { if (client->core) {
client->disconnecting = true; client->disconnecting = true;
pw_core_disconnect(client->core); pw_core_disconnect(client->core);
@ -4819,13 +4846,17 @@ static void client_free(struct client *client)
pw_properties_free(client->props); pw_properties_free(client->props);
if (client->routes) if (client->routes)
pw_properties_free(client->routes); pw_properties_free(client->routes);
if (client->source)
pw_loop_destroy_source(impl->loop, client->source);
if (client->cleanup) if (client->cleanup)
pw_loop_destroy_source(impl->loop, client->cleanup); pw_loop_destroy_source(impl->loop, client->cleanup);
free(client); free(client);
} }
static void client_unref(struct client *client)
{
if (--client->ref == 0)
client_free(client);
}
static int handle_packet(struct client *client, struct message *msg) static int handle_packet(struct client *client, struct message *msg)
{ {
struct impl *impl = client->impl; struct impl *impl = client->impl;
@ -5026,12 +5057,17 @@ static int client_cleanup_stream(void *item, void *data)
static void on_client_cleanup(void *data, uint64_t count) static void on_client_cleanup(void *data, uint64_t count)
{ {
struct client *client = data; struct client *client = data;
struct impl *impl = client->impl;
struct pending_sample *p, *t; struct pending_sample *p, *t;
spa_list_for_each_safe(p, t, &client->samples, link) {
spa_list_for_each_safe(p, t, &client->pending_samples, link) {
if (p->done) if (p->done)
pending_sample_free(p); pending_sample_free(p);
} }
pw_map_for_each(&client->streams, client_cleanup_stream, client); pw_map_for_each(&client->streams, client_cleanup_stream, client);
if (client->ref <= 0)
pw_loop_signal_event(impl->loop, impl->cleanup);
} }
static void static void
@ -5081,7 +5117,8 @@ error:
client, client->name, res, spa_strerror(res)); client, client->name, res, spa_strerror(res));
return; return;
} }
client_free(client); client_disconnect(client);
client_unref(client);
} }
static void static void
@ -5099,6 +5136,7 @@ on_connect(void *data, int fd, uint32_t mask)
goto error; goto error;
client->impl = impl; client->impl = impl;
client->ref = 1;
client->server = server; client->server = server;
client->connect_tag = SPA_ID_INVALID; client->connect_tag = SPA_ID_INVALID;
spa_list_append(&server->clients, &client->link); spa_list_append(&server->clients, &client->link);
@ -5106,7 +5144,7 @@ on_connect(void *data, int fd, uint32_t mask)
spa_list_init(&client->out_messages); spa_list_init(&client->out_messages);
spa_list_init(&client->operations); spa_list_init(&client->operations);
spa_list_init(&client->modules); spa_list_init(&client->modules);
spa_list_init(&client->samples); spa_list_init(&client->pending_samples);
client->props = pw_properties_new( client->props = pw_properties_new(
PW_KEY_CLIENT_API, "pipewire-pulse", PW_KEY_CLIENT_API, "pipewire-pulse",
@ -5449,12 +5487,18 @@ static int impl_free_sample(void *item, void *data)
static void impl_free(struct impl *impl) static void impl_free(struct impl *impl)
{ {
struct server *s; struct server *s;
struct client *c;
if (impl->context) if (impl->context)
spa_hook_remove(&impl->context_listener); spa_hook_remove(&impl->context_listener);
spa_list_consume(c, &impl->cleanup_clients, link)
client_free(c);
spa_list_consume(s, &impl->servers, link) spa_list_consume(s, &impl->servers, link)
server_free(s); server_free(s);
pw_map_for_each(&impl->samples, impl_free_sample, impl); pw_map_for_each(&impl->samples, impl_free_sample, impl);
pw_map_clear(&impl->samples); pw_map_clear(&impl->samples);
if (impl->cleanup)
pw_loop_destroy_source(impl->loop, impl->cleanup);
if (impl->props) if (impl->props)
pw_properties_free(impl->props); pw_properties_free(impl->props);
free(impl); free(impl);
@ -5475,6 +5519,16 @@ static const struct pw_context_events context_events = {
.destroy = context_destroy, .destroy = context_destroy,
}; };
static void on_server_cleanup(void *data, uint64_t count)
{
struct impl *impl = data;
struct client *c, *t;
spa_list_for_each_safe(c, t, &impl->cleanup_clients, link) {
if (c->ref <= 0)
client_free(c);
}
}
struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context, struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
struct pw_properties *props, size_t user_data_size) struct pw_properties *props, size_t user_data_size)
{ {
@ -5503,10 +5557,17 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
impl->context = context; impl->context = context;
impl->loop = pw_context_get_main_loop(context); impl->loop = pw_context_get_main_loop(context);
impl->props = props; impl->props = props;
impl->cleanup = pw_loop_add_event(impl->loop,
on_server_cleanup, impl);
if (impl->cleanup == NULL)
goto error_free;
spa_list_init(&impl->servers); spa_list_init(&impl->servers);
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1; impl->rate_limit.burst = 1;
pw_map_init(&impl->samples, 16, 16); pw_map_init(&impl->samples, 16, 16);
spa_list_init(&impl->cleanup_clients);
spa_list_init(&impl->free_messages); spa_list_init(&impl->free_messages);
pw_context_add_listener(context, &impl->context_listener, pw_context_add_listener(context, &impl->context_listener,