diff --git a/src/modules/module-protocol-pulse/client.c b/src/modules/module-protocol-pulse/client.c index e4be4f1b8..0b74cad32 100644 --- a/src/modules/module-protocol-pulse/client.c +++ b/src/modules/module-protocol-pulse/client.c @@ -177,7 +177,7 @@ void client_free(struct client *client) int client_queue_message(struct client *client, struct message *msg) { struct impl *impl = client->impl; - int res, mask; + int res; if (msg == NULL) return -EINVAL; @@ -198,13 +198,14 @@ int client_queue_message(struct client *client, struct message *msg) msg->offset = 0; spa_list_append(&client->out_messages, &msg->link); - mask = client->source->mask; + uint32_t mask = client->source->mask; if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) { - client->need_flush = true; SPA_FLAG_SET(mask, SPA_IO_OUT); pw_loop_update_io(impl->loop, client->source, mask); } + client->new_msg_since_last_flush = true; + return 0; error: @@ -212,24 +213,20 @@ error: return res; } -int client_flush_messages(struct client *client) +static int client_try_flush_messages(struct client *client) { struct impl *impl = client->impl; - int res; + + pw_log_trace("client %p: flushing", client); spa_assert(!client->disconnect); - while (true) { - struct message *m; + while (!spa_list_is_empty(&client->out_messages)) { + struct message *m = spa_list_first(&client->out_messages, struct message, link); struct descriptor desc; - void *data; + const void *data; size_t size; - if (spa_list_is_empty(&client->out_messages)) - break; - - m = spa_list_first(&client->out_messages, struct message, link); - if (client->out_index < sizeof(desc)) { desc.length = htonl(m->length); desc.channel = htonl(m->channel); @@ -252,18 +249,18 @@ int client_flush_messages(struct client *client) } while (true) { - res = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); - if (res < 0) { - res = -errno; + ssize_t sent = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); + if (sent < 0) { + int res = -errno; if (res == -EINTR) continue; if (res != -EAGAIN && res != -EWOULDBLOCK) - pw_log_warn("client %p: send channel:%d %zu, error %d: %m", + pw_log_warn("client %p: send channel:%u %zu, error %d: %m", client, m->channel, size, res); return res; } - client->out_index += res; + client->out_index += sent; break; } } @@ -271,11 +268,65 @@ int client_flush_messages(struct client *client) return 0; } -int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id) +int client_flush_messages(struct client *client) +{ + client->new_msg_since_last_flush = false; + + int res = client_try_flush_messages(client); + if (res >= 0) { + uint32_t mask = client->source->mask; + + if (SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) { + SPA_FLAG_CLEAR(mask, SPA_IO_OUT); + pw_loop_update_io(client->impl->loop, client->source, mask); + } + } else { + if (res != -EAGAIN && res != -EWOULDBLOCK) + return res; + } + + return 0; +} + +/* returns true if an event with the (mask, event, id) triplet should be dropped because it is redundant */ +static bool client_prune_subscribe_events(struct client *client, uint32_t mask, uint32_t event, uint32_t id) { struct impl *impl = client->impl; - struct message *reply, *m, *t; + struct message *m, *t; + if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW) + return false; + + spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) { + if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT) + continue; + if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK) + continue; + if (m->extra[2] != id) + continue; + + if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) { + /* This object is being removed, hence there is + * point in keeping the old events regarding + * entry in the queue. */ + message_free(impl, m, true, false); + pw_log_debug("client %p: dropped redundant event due to remove event", client); + continue; + } + + if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) { + /* This object has changed. If a "new" or "change" event for + * this object is still in the queue we can exit. */ + pw_log_debug("client %p: dropped redundant event due to change event", client); + return true; + } + } + + return false; +} + +int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id) +{ if (client->disconnect) return -ENOTCONN; @@ -284,37 +335,13 @@ int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t pw_log_debug("client %p: SUBSCRIBE event:%08x id:%u", client, event, id); - if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) != SUBSCRIPTION_EVENT_NEW) { - spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) { - if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT) - continue; - if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK) - continue; - if (m->extra[2] != id) - continue; + if (client_prune_subscribe_events(client, mask, event, id)) + return 0; - if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) { - /* This object is being removed, hence there is - * point in keeping the old events regarding - * entry in the queue. */ - message_free(impl, m, true, false); - pw_log_debug("client %p: dropped redundant event due to remove event", client); - continue; - } - - if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) { - /* This object has changed. If a "new" or "change" event for - * this object is still in the queue we can exit. */ - pw_log_debug("client %p: dropped redundant event due to change event", client); - return 0; - } - } - } - - reply = message_alloc(impl, -1, 0); - reply->extra[0] = COMMAND_SUBSCRIBE_EVENT, - reply->extra[1] = event, - reply->extra[2] = id, + struct message *reply = message_alloc(client->impl, -1, 0); + reply->extra[0] = COMMAND_SUBSCRIBE_EVENT; + reply->extra[1] = event; + reply->extra[2] = id; message_put(reply, TAG_U32, COMMAND_SUBSCRIBE_EVENT, diff --git a/src/modules/module-protocol-pulse/client.h b/src/modules/module-protocol-pulse/client.h index ef16747d1..1dbf1fa97 100644 --- a/src/modules/module-protocol-pulse/client.h +++ b/src/modules/module-protocol-pulse/client.h @@ -96,7 +96,7 @@ struct client { unsigned int disconnect:1; unsigned int disconnecting:1; - unsigned int need_flush:1; + unsigned int new_msg_since_last_flush:1; struct pw_manager_object *prev_default_sink; struct pw_manager_object *prev_default_source; diff --git a/src/modules/module-protocol-pulse/server.c b/src/modules/module-protocol-pulse/server.c index d3e14d2b5..7d5c1f428 100644 --- a/src/modules/module-protocol-pulse/server.c +++ b/src/modules/module-protocol-pulse/server.c @@ -275,7 +275,6 @@ static void on_client_data(void *data, int fd, uint32_t mask) { struct client * const client = data; - struct impl * const impl = client->impl; int res; client->ref++; @@ -302,15 +301,9 @@ on_client_data(void *data, int fd, uint32_t mask) } } - if (mask & SPA_IO_OUT || client->need_flush) { - pw_log_trace("client %p: can write", client); - client->need_flush = false; + if (mask & SPA_IO_OUT || client->new_msg_since_last_flush) { res = client_flush_messages(client); - if (res >= 0) { - int m = client->source->mask; - SPA_FLAG_CLEAR(m, SPA_IO_OUT); - pw_loop_update_io(impl->loop, client->source, m); - } else if (res != -EAGAIN && res != -EWOULDBLOCK) + if (res < 0) goto error; }