pulse-server: use a work-queue to schedule destroy

Use the context work queue to schedule destroys from callbacks.
This is better because we can pass the destroyed object around and
implement just the action we need to do on it.
This commit is contained in:
Wim Taymans 2021-04-21 12:55:49 +02:00
parent 5041c44937
commit c70a5de526
2 changed files with 28 additions and 51 deletions

View file

@ -79,7 +79,6 @@ struct client {
const char *name;
struct spa_source *source;
struct spa_source *cleanup;
uint32_t version;
@ -220,7 +219,7 @@ struct impl {
struct spa_source *source;
struct spa_list servers;
struct spa_source *cleanup;
struct pw_work_queue *work_queue;
struct spa_list cleanup_clients;
struct pw_map samples;

View file

@ -122,6 +122,7 @@ static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t
#include "module.c"
#include "message-handler.c"
static void client_free(struct client *client);
static void sample_free(struct sample *sample)
{
@ -1368,6 +1369,15 @@ static void stream_control_info(void *data, uint32_t id,
}
}
static void on_stream_cleanup(void *obj, void *data, int res, uint32_t id)
{
struct stream *stream = obj;
struct client *client = stream->client;
stream_free(stream);
if (client->ref <= 0)
client_free(client);
}
static void stream_state_changed(void *data, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
@ -1390,8 +1400,10 @@ static void stream_state_changed(void *data, enum pw_stream_state old,
case PW_STREAM_STATE_STREAMING:
break;
}
if (stream->done)
pw_loop_signal_event(impl->loop, client->cleanup);
if (stream->done) {
pw_work_queue_add(impl->work_queue, stream, 0,
on_stream_cleanup, client);
}
}
static const struct spa_pod *get_buffers_param(struct stream *s,
@ -2653,6 +2665,15 @@ static void sample_play_ready(void *data, uint32_t index)
send_message(client, reply);
}
static void on_sample_done(void *obj, void *data, int res, uint32_t id)
{
struct pending_sample *ps = obj;
struct client *client = ps->client;
pending_sample_free(ps);
if (client->ref <= 0)
client_free(client);
}
static void sample_play_done(void *data, int res)
{
struct pending_sample *ps = data;
@ -2665,7 +2686,8 @@ static void sample_play_done(void *data, int res)
pw_log_info(NAME" %p: PLAY_SAMPLE done tag:%u", client, ps->tag);
ps->done = true;
pw_loop_signal_event(impl->loop, client->cleanup);
pw_work_queue_add(impl->work_queue, ps, 0,
on_sample_done, client);
}
static const struct sample_play_events sample_play_events = {
@ -5359,8 +5381,6 @@ static void client_free(struct client *client)
pw_properties_free(client->props);
if (client->routes)
pw_properties_free(client->routes);
if (client->cleanup)
pw_loop_destroy_source(impl->loop, client->cleanup);
free(client);
}
@ -5565,30 +5585,6 @@ exit:
return res;
}
static int client_cleanup_stream(void *item, void *data)
{
struct stream *s = item;
if (s->done)
stream_free(s);
return 0;
}
static void on_client_cleanup(void *data, uint64_t count)
{
struct client *client = data;
struct impl *impl = client->impl;
struct pending_sample *p, *t;
spa_list_for_each_safe(p, t, &client->pending_samples, link) {
if (p->done)
pending_sample_free(p);
}
pw_map_for_each(&client->streams, client_cleanup_stream, client);
if (client->ref <= 0)
pw_loop_signal_event(impl->loop, impl->cleanup);
}
static void
on_client_data(void *data, int fd, uint32_t mask)
{
@ -5786,11 +5782,6 @@ on_connect(void *data, int fd, uint32_t mask)
if (client->source == NULL)
goto error;
client->cleanup = pw_loop_add_event(impl->loop,
on_client_cleanup, client);
if (client->cleanup == NULL)
goto error;
return;
error:
pw_log_error(NAME" %p: failed to create client: %m", impl);
@ -6140,8 +6131,6 @@ static void impl_free(struct impl *impl)
pw_map_for_each(&impl->modules, impl_free_module, impl);
pw_map_clear(&impl->modules);
if (impl->cleanup != NULL)
pw_loop_destroy_source(impl->loop, impl->cleanup);
pw_properties_free(impl->props);
free(impl);
}
@ -6161,16 +6150,6 @@ static const struct pw_context_events context_events = {
.destroy = context_destroy,
};
static void on_server_cleanup(void *data, uint64_t count)
{
struct impl *impl = data;
struct client *c, *t;
spa_list_for_each_safe(c, t, &impl->cleanup_clients, link) {
if (c->ref <= 0)
client_free(c);
}
}
static int parse_frac(struct pw_properties *props, const char *key, const char *def,
struct spa_fraction *res)
{
@ -6271,9 +6250,8 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
impl->loop = pw_context_get_main_loop(context);
impl->props = props;
impl->cleanup = pw_loop_add_event(impl->loop,
on_server_cleanup, impl);
if (impl->cleanup == NULL)
impl->work_queue = pw_context_get_work_queue(context);
if (impl->work_queue == NULL)
goto error_free;
spa_list_init(&impl->servers);