From 3d081215f25f5c8f20941cdb578b22a9b3299923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barnab=C3=A1s=20P=C5=91cze?= Date: Thu, 13 Jan 2022 18:38:00 +0100 Subject: [PATCH] pulse-server: improve message queue pruning Firstly, separate the message dropping logic into its own `drop_from_out_queue()` function. Secondly, do not check earlier messages if the NEW event for a particular object has been reached while processing a REMOVE event for that object. Thirdly, if - while processing a REMOVE event - the corresponding NEW event is found and dropped, drop the REMOVE event as well. See #1840 --- src/modules/module-protocol-pulse/client.c | 45 +++++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/src/modules/module-protocol-pulse/client.c b/src/modules/module-protocol-pulse/client.c index 62e1717ea..1f45f4207 100644 --- a/src/modules/module-protocol-pulse/client.c +++ b/src/modules/module-protocol-pulse/client.c @@ -295,17 +295,28 @@ int client_flush_messages(struct client *client) return 0; } +static bool drop_from_out_queue(struct client *client, struct message *m) +{ + spa_assert(!spa_list_is_empty(&client->out_messages)); + + struct message *first = spa_list_first(&client->out_messages, struct message, link); + if (m == first && client->out_index > 0) + return false; + + message_free(client->impl, m, true, false); + + return true; +} + /* returns true if an event with the (mask, event, id) triplet should be dropped because it is redundant */ static bool client_prune_subscribe_events(struct client *client, uint32_t mask, uint32_t event, uint32_t id) { - struct impl *impl = client->impl; - struct message *m, *t, *first; + struct message *m, *t; if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW) return false; - first = spa_list_first(&client->out_messages, struct message, link); - + /* NOTE: reverse iteration */ spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) { if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT) continue; @@ -319,22 +330,36 @@ static bool client_prune_subscribe_events(struct client *client, uint32_t mask, * point in keeping the old events regarding * entry in the queue. */ - /* if the first message has already been partially sent, do not drop it */ - if (m != first || client->out_index == 0) { - message_free(impl, m, true, false); - pw_log_debug("client %p: dropped redundant event due to remove event", client); + bool is_new = (m->extra[1] & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW; + + if (drop_from_out_queue(client, m)) { + pw_log_debug("client %p: dropped redundant event due to remove event for object %u", + client, id); + + /* if the NEW event for the current object could successfully be dropped, + there is no need to deliver the REMOVE event */ + if (is_new) + goto drop; } + + /* stop if the NEW event for the current object is reached */ + if (is_new) + break; } if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) { /* This object has changed. If a "new" or "change" event for * this object is still in the queue we can exit. */ - pw_log_debug("client %p: dropped redundant event due to change event", client); - return true; + goto drop; } } return false; + +drop: + pw_log_debug("client %p: dropped redundant event for object %u", client, id); + + return true; } int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id)