pulse-server: keep global free message pool

This commit is contained in:
Wim Taymans 2021-01-19 11:06:00 +01:00
parent 778bc45558
commit 89641785c3

View file

@ -128,7 +128,6 @@ struct client {
struct message *message; struct message *message;
struct pw_map streams; struct pw_map streams;
struct spa_list free_messages;
struct spa_list out_messages; struct spa_list out_messages;
struct spa_list operations; struct spa_list operations;
@ -231,6 +230,7 @@ struct impl {
struct pw_map samples; struct pw_map samples;
struct spa_list free_messages;
struct stats stat; struct stats stat;
}; };
@ -275,7 +275,7 @@ struct command {
}; };
static const struct command commands[COMMAND_MAX]; static const struct command commands[COMMAND_MAX];
static void message_free(struct client *client, struct message *msg, bool dequeue, bool destroy) static void message_free(struct impl *impl, struct message *msg, bool dequeue, bool destroy)
{ {
if (dequeue) if (dequeue)
spa_list_remove(&msg->link); spa_list_remove(&msg->link);
@ -287,23 +287,23 @@ static void message_free(struct client *client, struct message *msg, bool dequeu
free(msg); free(msg);
} else { } else {
pw_log_trace("recycle message %p", msg); pw_log_trace("recycle message %p", msg);
spa_list_append(&client->free_messages, &msg->link); spa_list_append(&impl->free_messages, &msg->link);
} }
} }
static struct message *message_alloc(struct client *client, uint32_t channel, uint32_t size) static struct message *message_alloc(struct impl *impl, uint32_t channel, uint32_t size)
{ {
struct message *msg = NULL; struct message *msg = NULL;
if (!spa_list_is_empty(&client->free_messages)) { if (!spa_list_is_empty(&impl->free_messages)) {
msg = spa_list_first(&client->free_messages, struct message, link); msg = spa_list_first(&impl->free_messages, struct message, link);
spa_list_remove(&msg->link); spa_list_remove(&msg->link);
pw_log_trace("using recycled message %p", msg); pw_log_trace("using recycled message %p", msg);
} }
if (msg == NULL) { if (msg == NULL) {
msg = calloc(1, sizeof(struct message)); msg = calloc(1, sizeof(struct message));
pw_log_trace("new message %p", msg); pw_log_trace("new message %p", msg);
msg->stat = &client->impl->stat; msg->stat = &impl->stat;
msg->stat->n_allocated++; msg->stat->n_allocated++;
msg->stat->n_accumulated++; msg->stat->n_accumulated++;
} }
@ -319,6 +319,7 @@ static struct message *message_alloc(struct client *client, uint32_t channel, ui
static int flush_messages(struct client *client) static int flush_messages(struct client *client)
{ {
struct impl *impl = client->impl;
int res; int res;
while (true) { while (true) {
@ -347,7 +348,7 @@ static int flush_messages(struct client *client)
} else { } else {
if (debug_messages && m->channel == SPA_ID_INVALID) if (debug_messages && m->channel == SPA_ID_INVALID)
message_dump(SPA_LOG_LEVEL_INFO, m); message_dump(SPA_LOG_LEVEL_INFO, m);
message_free(client, m, true, false); message_free(impl, m, true, false);
client->out_index = 0; client->out_index = 0;
continue; continue;
} }
@ -395,14 +396,15 @@ static int send_message(struct client *client, struct message *m)
} }
return 0; return 0;
error: error:
message_free(client, m, false, false); message_free(impl, m, false, false);
return res; return res;
} }
static struct message *reply_new(struct client *client, uint32_t tag) static struct message *reply_new(struct client *client, uint32_t tag)
{ {
struct impl *impl = client->impl;
struct message *reply; struct message *reply;
reply = message_alloc(client, -1, 0); reply = message_alloc(impl, -1, 0);
pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); pw_log_debug(NAME" %p: REPLY tag:%u", client, tag);
message_put(reply, message_put(reply,
TAG_U32, COMMAND_REPLY, TAG_U32, COMMAND_REPLY,
@ -419,6 +421,7 @@ static int reply_simple_ack(struct client *client, uint32_t tag)
static int reply_error(struct client *client, uint32_t command, uint32_t tag, int res) static int reply_error(struct client *client, uint32_t command, uint32_t tag, int res)
{ {
struct impl *impl = client->impl;
struct message *reply; struct message *reply;
uint32_t error = res_to_err(res); uint32_t error = res_to_err(res);
const char *name; const char *name;
@ -432,7 +435,7 @@ static int reply_error(struct client *client, uint32_t command, uint32_t tag, in
NAME" %p: [%s] ERROR command:%d (%s) tag:%u error:%u (%s)", NAME" %p: [%s] ERROR command:%d (%s) tag:%u error:%u (%s)",
client, client->name, command, name, tag, error, spa_strerror(res)); client, client->name, command, name, tag, error, spa_strerror(res));
reply = message_alloc(client, -1, 0); reply = message_alloc(impl, -1, 0);
message_put(reply, message_put(reply,
TAG_U32, COMMAND_ERROR, TAG_U32, COMMAND_ERROR,
TAG_U32, tag, TAG_U32, tag,
@ -454,7 +457,7 @@ static int send_underflow(struct stream *stream, int64_t offset)
client, client->name, stream->channel, offset); client, client->name, stream->channel, offset);
} }
reply = message_alloc(client, -1, 0); reply = message_alloc(impl, -1, 0);
message_put(reply, message_put(reply,
TAG_U32, COMMAND_UNDERFLOW, TAG_U32, COMMAND_UNDERFLOW,
TAG_U32, -1, TAG_U32, -1,
@ -470,6 +473,7 @@ static int send_underflow(struct stream *stream, int64_t offset)
static int send_subscribe_event(struct client *client, uint32_t event, uint32_t id) static int send_subscribe_event(struct client *client, uint32_t event, uint32_t id)
{ {
struct impl *impl = client->impl;
struct message *reply, *m, *t; struct message *reply, *m, *t;
pw_log_debug(NAME" %p: SUBSCRIBE event:%08x id:%u", client, event, id); pw_log_debug(NAME" %p: SUBSCRIBE event:%08x id:%u", client, event, id);
@ -487,7 +491,7 @@ static int send_subscribe_event(struct client *client, uint32_t event, uint32_t
/* This object is being removed, hence there is no /* This object is being removed, hence there is no
* point in keeping the old events regarding this * point in keeping the old events regarding this
* entry in the queue. */ * entry in the queue. */
message_free(client, m, true, false); message_free(impl, m, true, false);
pw_log_debug("Dropped redundant event due to remove event."); pw_log_debug("Dropped redundant event due to remove event.");
continue; continue;
} }
@ -500,7 +504,7 @@ static int send_subscribe_event(struct client *client, uint32_t event, uint32_t
} }
} }
reply = message_alloc(client, -1, 0); reply = message_alloc(impl, -1, 0);
reply->extra[0] = COMMAND_SUBSCRIBE_EVENT, reply->extra[0] = COMMAND_SUBSCRIBE_EVENT,
reply->extra[1] = event, reply->extra[1] = event,
reply->extra[2] = id, reply->extra[2] = id,
@ -528,12 +532,13 @@ static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t
static int send_overflow(struct stream *stream) static int send_overflow(struct stream *stream)
{ {
struct client *client = stream->client; struct client *client = stream->client;
struct impl *impl = client->impl;
struct message *reply; struct message *reply;
pw_log_warn(NAME" %p: [%s] OVERFLOW channel:%u", client, pw_log_warn(NAME" %p: [%s] OVERFLOW channel:%u", client,
client->name, stream->channel); client->name, stream->channel);
reply = message_alloc(client, -1, 0); reply = message_alloc(impl, -1, 0);
message_put(reply, message_put(reply,
TAG_U32, COMMAND_OVERFLOW, TAG_U32, COMMAND_OVERFLOW,
TAG_U32, -1, TAG_U32, -1,
@ -545,6 +550,7 @@ static int send_overflow(struct stream *stream)
static int send_stream_killed(struct stream *stream) static int send_stream_killed(struct stream *stream)
{ {
struct client *client = stream->client; struct client *client = stream->client;
struct impl *impl = client->impl;
struct message *reply; struct message *reply;
uint32_t command; uint32_t command;
@ -558,7 +564,7 @@ static int send_stream_killed(struct stream *stream)
if (client->version < 23) if (client->version < 23)
return 0; return 0;
reply = message_alloc(client, -1, 0); reply = message_alloc(impl, -1, 0);
message_put(reply, message_put(reply,
TAG_U32, command, TAG_U32, command,
TAG_U32, -1, TAG_U32, -1,
@ -570,11 +576,12 @@ static int send_stream_killed(struct stream *stream)
static int send_stream_started(struct stream *stream) static int send_stream_started(struct stream *stream)
{ {
struct client *client = stream->client; struct client *client = stream->client;
struct impl *impl = client->impl;
struct message *reply; struct message *reply;
pw_log_debug(NAME" %p: STARTED channel:%u", client, stream->channel); pw_log_debug(NAME" %p: STARTED channel:%u", client, stream->channel);
reply = message_alloc(client, -1, 0); reply = message_alloc(impl, -1, 0);
message_put(reply, message_put(reply,
TAG_U32, COMMAND_STARTED, TAG_U32, COMMAND_STARTED,
TAG_U32, -1, TAG_U32, -1,
@ -933,6 +940,7 @@ static uint32_t stream_pop_missing(struct stream *stream)
static int send_command_request(struct stream *stream) static int send_command_request(struct stream *stream)
{ {
struct client *client = stream->client; struct client *client = stream->client;
struct impl *impl = client->impl;
struct message *msg; struct message *msg;
uint32_t size; uint32_t size;
@ -942,7 +950,7 @@ static int send_command_request(struct stream *stream)
if (size == 0) if (size == 0)
return 0; return 0;
msg = message_alloc(client, -1, 0); msg = message_alloc(impl, -1, 0);
message_put(msg, message_put(msg,
TAG_U32, COMMAND_REQUEST, TAG_U32, COMMAND_REQUEST,
TAG_U32, -1, TAG_U32, -1,
@ -1361,6 +1369,7 @@ do_process_done(struct spa_loop *loop,
{ {
struct stream *stream = user_data; struct stream *stream = user_data;
struct client *client = stream->client; struct client *client = stream->client;
struct impl *impl = client->impl;
const struct process_data *pd = data; const struct process_data *pd = data;
uint32_t index; uint32_t index;
int32_t avail; int32_t avail;
@ -1413,7 +1422,7 @@ do_process_done(struct spa_loop *loop,
index = stream->write_index - avail; index = stream->write_index - avail;
} }
msg = message_alloc(client, stream->channel, avail); msg = message_alloc(impl, stream->channel, avail);
if (msg == NULL) if (msg == NULL)
return -errno; return -errno;
@ -3971,7 +3980,7 @@ error_invalid:
goto error; goto error;
error: error:
if (reply) if (reply)
message_free(client, reply, false, false); message_free(impl, reply, false, false);
return res; return res;
} }
@ -4042,7 +4051,7 @@ static int do_get_sample_info(struct client *client, uint32_t command, uint32_t
error: error:
if (reply) if (reply)
message_free(client, reply, false, false); message_free(impl, reply, false, false);
return res; return res;
} }
@ -4759,10 +4768,8 @@ static void client_free(struct client *client)
spa_list_for_each_safe(module, tmp, &client->modules, link) spa_list_for_each_safe(module, tmp, &client->modules, link)
unload_module(module); unload_module(module);
spa_list_consume(msg, &client->free_messages, link)
message_free(client, msg, true, true);
spa_list_consume(msg, &client->out_messages, link) spa_list_consume(msg, &client->out_messages, link)
message_free(client, msg, true, true); message_free(impl, msg, true, false);
if (client->manager) if (client->manager)
pw_manager_destroy(client->manager); pw_manager_destroy(client->manager);
if (client->core) { if (client->core) {
@ -4814,7 +4821,7 @@ static int handle_packet(struct client *client, struct message *msg)
res = commands[command].run(client, command, tag, msg); res = commands[command].run(client, command, tag, msg);
finish: finish:
message_free(client, msg, false, false); message_free(impl, msg, false, false);
if (res < 0) if (res < 0)
reply_error(client, command, tag, res); reply_error(client, command, tag, res);
return 0; return 0;
@ -4883,7 +4890,7 @@ static int handle_memblock(struct client *client, struct message *msg)
spa_ringbuffer_write_update(&stream->ring, stream->write_index); spa_ringbuffer_write_update(&stream->ring, stream->write_index);
stream->requested -= msg->length; stream->requested -= msg->length;
finish: finish:
message_free(client, msg, false, false); message_free(impl, msg, false, false);
return res; return res;
} }
@ -4951,8 +4958,8 @@ static int do_read(struct client *client)
} }
} }
if (client->message) if (client->message)
message_free(client, client->message, false, false); message_free(impl, client->message, false, false);
client->message = message_alloc(client, channel, length); client->message = message_alloc(impl, channel, length);
} else if (client->message && } else if (client->message &&
client->in_index >= client->message->length + sizeof(client->desc)) { client->in_index >= client->message->length + sizeof(client->desc)) {
struct message *msg = client->message; struct message *msg = client->message;
@ -5057,7 +5064,6 @@ on_connect(void *data, int fd, uint32_t mask)
client->connect_tag = SPA_ID_INVALID; client->connect_tag = SPA_ID_INVALID;
spa_list_append(&server->clients, &client->link); spa_list_append(&server->clients, &client->link);
pw_map_init(&client->streams, 16, 16); pw_map_init(&client->streams, 16, 16);
spa_list_init(&client->free_messages);
spa_list_init(&client->out_messages); spa_list_init(&client->out_messages);
spa_list_init(&client->operations); spa_list_init(&client->operations);
spa_list_init(&client->modules); spa_list_init(&client->modules);
@ -5462,6 +5468,7 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1; impl->rate_limit.burst = 1;
pw_map_init(&impl->samples, 16, 16); pw_map_init(&impl->samples, 16, 16);
spa_list_init(&impl->free_messages);
pw_context_add_listener(context, &impl->context_listener, pw_context_add_listener(context, &impl->context_listener,
&context_events, impl); &context_events, impl);
@ -5495,5 +5502,8 @@ void *pw_protocol_pulse_get_user_data(struct pw_protocol_pulse *pulse)
void pw_protocol_pulse_destroy(struct pw_protocol_pulse *pulse) void pw_protocol_pulse_destroy(struct pw_protocol_pulse *pulse)
{ {
struct impl *impl = (struct impl*)pulse; struct impl *impl = (struct impl*)pulse;
struct message *msg;
spa_list_consume(msg, &impl->free_messages, link)
message_free(impl, msg, true, true);
impl_free(impl); impl_free(impl);
} }