diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 52e516fa7..3c9322af2 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -128,7 +128,6 @@ struct client { struct message *message; struct pw_map streams; - struct spa_list free_messages; struct spa_list out_messages; struct spa_list operations; @@ -231,6 +230,7 @@ struct impl { struct pw_map samples; + struct spa_list free_messages; struct stats stat; }; @@ -275,7 +275,7 @@ struct command { }; static const struct command commands[COMMAND_MAX]; -static void message_free(struct client *client, struct message *msg, bool dequeue, bool destroy) +static void message_free(struct impl *impl, struct message *msg, bool dequeue, bool destroy) { if (dequeue) spa_list_remove(&msg->link); @@ -287,23 +287,23 @@ static void message_free(struct client *client, struct message *msg, bool dequeu free(msg); } else { pw_log_trace("recycle message %p", msg); - spa_list_append(&client->free_messages, &msg->link); + spa_list_append(&impl->free_messages, &msg->link); } } -static struct message *message_alloc(struct client *client, uint32_t channel, uint32_t size) +static struct message *message_alloc(struct impl *impl, uint32_t channel, uint32_t size) { struct message *msg = NULL; - if (!spa_list_is_empty(&client->free_messages)) { - msg = spa_list_first(&client->free_messages, struct message, link); + if (!spa_list_is_empty(&impl->free_messages)) { + msg = spa_list_first(&impl->free_messages, struct message, link); spa_list_remove(&msg->link); pw_log_trace("using recycled message %p", msg); } if (msg == NULL) { msg = calloc(1, sizeof(struct message)); pw_log_trace("new message %p", msg); - msg->stat = &client->impl->stat; + msg->stat = &impl->stat; msg->stat->n_allocated++; msg->stat->n_accumulated++; } @@ -319,6 +319,7 @@ static struct message *message_alloc(struct client *client, uint32_t channel, ui static int flush_messages(struct client *client) { + struct impl *impl = client->impl; int res; while (true) { @@ -347,7 +348,7 @@ static int flush_messages(struct client *client) } else { if (debug_messages && m->channel == SPA_ID_INVALID) message_dump(SPA_LOG_LEVEL_INFO, m); - message_free(client, m, true, false); + message_free(impl, m, true, false); client->out_index = 0; continue; } @@ -395,14 +396,15 @@ static int send_message(struct client *client, struct message *m) } return 0; error: - message_free(client, m, false, false); + message_free(impl, m, false, false); return res; } static struct message *reply_new(struct client *client, uint32_t tag) { + struct impl *impl = client->impl; struct message *reply; - reply = message_alloc(client, -1, 0); + reply = message_alloc(impl, -1, 0); pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); message_put(reply, TAG_U32, COMMAND_REPLY, @@ -419,6 +421,7 @@ static int reply_simple_ack(struct client *client, uint32_t tag) static int reply_error(struct client *client, uint32_t command, uint32_t tag, int res) { + struct impl *impl = client->impl; struct message *reply; uint32_t error = res_to_err(res); const char *name; @@ -432,7 +435,7 @@ static int reply_error(struct client *client, uint32_t command, uint32_t tag, in NAME" %p: [%s] ERROR command:%d (%s) tag:%u error:%u (%s)", client, client->name, command, name, tag, error, spa_strerror(res)); - reply = message_alloc(client, -1, 0); + reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, COMMAND_ERROR, TAG_U32, tag, @@ -454,7 +457,7 @@ static int send_underflow(struct stream *stream, int64_t offset) client, client->name, stream->channel, offset); } - reply = message_alloc(client, -1, 0); + reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, COMMAND_UNDERFLOW, TAG_U32, -1, @@ -470,6 +473,7 @@ static int send_underflow(struct stream *stream, int64_t offset) static int send_subscribe_event(struct client *client, uint32_t event, uint32_t id) { + struct impl *impl = client->impl; struct message *reply, *m, *t; pw_log_debug(NAME" %p: SUBSCRIBE event:%08x id:%u", client, event, id); @@ -487,7 +491,7 @@ static int send_subscribe_event(struct client *client, uint32_t event, uint32_t /* This object is being removed, hence there is no * point in keeping the old events regarding this * entry in the queue. */ - message_free(client, m, true, false); + message_free(impl, m, true, false); pw_log_debug("Dropped redundant event due to remove event."); continue; } @@ -500,7 +504,7 @@ static int send_subscribe_event(struct client *client, uint32_t event, uint32_t } } - reply = message_alloc(client, -1, 0); + reply = message_alloc(impl, -1, 0); reply->extra[0] = COMMAND_SUBSCRIBE_EVENT, reply->extra[1] = event, reply->extra[2] = id, @@ -528,12 +532,13 @@ static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t static int send_overflow(struct stream *stream) { struct client *client = stream->client; + struct impl *impl = client->impl; struct message *reply; pw_log_warn(NAME" %p: [%s] OVERFLOW channel:%u", client, client->name, stream->channel); - reply = message_alloc(client, -1, 0); + reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, COMMAND_OVERFLOW, TAG_U32, -1, @@ -545,6 +550,7 @@ static int send_overflow(struct stream *stream) static int send_stream_killed(struct stream *stream) { struct client *client = stream->client; + struct impl *impl = client->impl; struct message *reply; uint32_t command; @@ -558,7 +564,7 @@ static int send_stream_killed(struct stream *stream) if (client->version < 23) return 0; - reply = message_alloc(client, -1, 0); + reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, command, TAG_U32, -1, @@ -570,11 +576,12 @@ static int send_stream_killed(struct stream *stream) static int send_stream_started(struct stream *stream) { struct client *client = stream->client; + struct impl *impl = client->impl; struct message *reply; pw_log_debug(NAME" %p: STARTED channel:%u", client, stream->channel); - reply = message_alloc(client, -1, 0); + reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, COMMAND_STARTED, TAG_U32, -1, @@ -933,6 +940,7 @@ static uint32_t stream_pop_missing(struct stream *stream) static int send_command_request(struct stream *stream) { struct client *client = stream->client; + struct impl *impl = client->impl; struct message *msg; uint32_t size; @@ -942,7 +950,7 @@ static int send_command_request(struct stream *stream) if (size == 0) return 0; - msg = message_alloc(client, -1, 0); + msg = message_alloc(impl, -1, 0); message_put(msg, TAG_U32, COMMAND_REQUEST, TAG_U32, -1, @@ -1361,6 +1369,7 @@ do_process_done(struct spa_loop *loop, { struct stream *stream = user_data; struct client *client = stream->client; + struct impl *impl = client->impl; const struct process_data *pd = data; uint32_t index; int32_t avail; @@ -1413,7 +1422,7 @@ do_process_done(struct spa_loop *loop, index = stream->write_index - avail; } - msg = message_alloc(client, stream->channel, avail); + msg = message_alloc(impl, stream->channel, avail); if (msg == NULL) return -errno; @@ -3971,7 +3980,7 @@ error_invalid: goto error; error: if (reply) - message_free(client, reply, false, false); + message_free(impl, reply, false, false); return res; } @@ -4042,7 +4051,7 @@ static int do_get_sample_info(struct client *client, uint32_t command, uint32_t error: if (reply) - message_free(client, reply, false, false); + message_free(impl, reply, false, false); return res; } @@ -4759,10 +4768,8 @@ static void client_free(struct client *client) spa_list_for_each_safe(module, tmp, &client->modules, link) unload_module(module); - spa_list_consume(msg, &client->free_messages, link) - message_free(client, msg, true, true); spa_list_consume(msg, &client->out_messages, link) - message_free(client, msg, true, true); + message_free(impl, msg, true, false); if (client->manager) pw_manager_destroy(client->manager); if (client->core) { @@ -4814,7 +4821,7 @@ static int handle_packet(struct client *client, struct message *msg) res = commands[command].run(client, command, tag, msg); finish: - message_free(client, msg, false, false); + message_free(impl, msg, false, false); if (res < 0) reply_error(client, command, tag, res); return 0; @@ -4883,7 +4890,7 @@ static int handle_memblock(struct client *client, struct message *msg) spa_ringbuffer_write_update(&stream->ring, stream->write_index); stream->requested -= msg->length; finish: - message_free(client, msg, false, false); + message_free(impl, msg, false, false); return res; } @@ -4951,8 +4958,8 @@ static int do_read(struct client *client) } } if (client->message) - message_free(client, client->message, false, false); - client->message = message_alloc(client, channel, length); + message_free(impl, client->message, false, false); + client->message = message_alloc(impl, channel, length); } else if (client->message && client->in_index >= client->message->length + sizeof(client->desc)) { struct message *msg = client->message; @@ -5057,7 +5064,6 @@ on_connect(void *data, int fd, uint32_t mask) client->connect_tag = SPA_ID_INVALID; spa_list_append(&server->clients, &client->link); pw_map_init(&client->streams, 16, 16); - spa_list_init(&client->free_messages); spa_list_init(&client->out_messages); spa_list_init(&client->operations); spa_list_init(&client->modules); @@ -5462,6 +5468,7 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context, impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; impl->rate_limit.burst = 1; pw_map_init(&impl->samples, 16, 16); + spa_list_init(&impl->free_messages); pw_context_add_listener(context, &impl->context_listener, &context_events, impl); @@ -5495,5 +5502,8 @@ void *pw_protocol_pulse_get_user_data(struct pw_protocol_pulse *pulse) void pw_protocol_pulse_destroy(struct pw_protocol_pulse *pulse) { struct impl *impl = (struct impl*)pulse; + struct message *msg; + spa_list_consume(msg, &impl->free_messages, link) + message_free(impl, msg, true, true); impl_free(impl); }