pulse-server: improve message queue pruning

Firstly, separate the message dropping logic into
its own `drop_from_out_queue()` function.

Secondly, do not check earlier messages if the NEW
event for a particular object has been reached while
processing a REMOVE event for that object.

Thirdly, if - while processing a REMOVE event -
the corresponding NEW event is found and dropped,
drop the REMOVE event as well.

See #1840
This commit is contained in:
Barnabás Pőcze 2022-01-13 18:38:00 +01:00 committed by Wim Taymans
parent 8f9bd42d93
commit 3d081215f2

View file

@ -295,17 +295,28 @@ int client_flush_messages(struct client *client)
return 0;
}
static bool drop_from_out_queue(struct client *client, struct message *m)
{
spa_assert(!spa_list_is_empty(&client->out_messages));
struct message *first = spa_list_first(&client->out_messages, struct message, link);
if (m == first && client->out_index > 0)
return false;
message_free(client->impl, m, true, false);
return true;
}
/* returns true if an event with the (mask, event, id) triplet should be dropped because it is redundant */
static bool client_prune_subscribe_events(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
{
struct impl *impl = client->impl;
struct message *m, *t, *first;
struct message *m, *t;
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW)
return false;
first = spa_list_first(&client->out_messages, struct message, link);
/* NOTE: reverse iteration */
spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) {
if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT)
continue;
@ -319,22 +330,36 @@ static bool client_prune_subscribe_events(struct client *client, uint32_t mask,
* point in keeping the old events regarding
* entry in the queue. */
/* if the first message has already been partially sent, do not drop it */
if (m != first || client->out_index == 0) {
message_free(impl, m, true, false);
pw_log_debug("client %p: dropped redundant event due to remove event", client);
bool is_new = (m->extra[1] & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW;
if (drop_from_out_queue(client, m)) {
pw_log_debug("client %p: dropped redundant event due to remove event for object %u",
client, id);
/* if the NEW event for the current object could successfully be dropped,
there is no need to deliver the REMOVE event */
if (is_new)
goto drop;
}
/* stop if the NEW event for the current object is reached */
if (is_new)
break;
}
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) {
/* This object has changed. If a "new" or "change" event for
* this object is still in the queue we can exit. */
pw_log_debug("client %p: dropped redundant event due to change event", client);
return true;
goto drop;
}
}
return false;
drop:
pw_log_debug("client %p: dropped redundant event for object %u", client, id);
return true;
}
int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id)