pulse-server: client: restructure message handling

Move all I/O event source modifications into client.c.
This commit is contained in:
Barnabás Pőcze 2021-11-11 12:36:43 +01:00 committed by Wim Taymans
parent 5ef9deae83
commit 4678ea06a0
3 changed files with 80 additions and 60 deletions

View file

@ -177,7 +177,7 @@ void client_free(struct client *client)
int client_queue_message(struct client *client, struct message *msg) int client_queue_message(struct client *client, struct message *msg)
{ {
struct impl *impl = client->impl; struct impl *impl = client->impl;
int res, mask; int res;
if (msg == NULL) if (msg == NULL)
return -EINVAL; return -EINVAL;
@ -198,13 +198,14 @@ int client_queue_message(struct client *client, struct message *msg)
msg->offset = 0; msg->offset = 0;
spa_list_append(&client->out_messages, &msg->link); spa_list_append(&client->out_messages, &msg->link);
mask = client->source->mask; uint32_t mask = client->source->mask;
if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) { if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
client->need_flush = true;
SPA_FLAG_SET(mask, SPA_IO_OUT); SPA_FLAG_SET(mask, SPA_IO_OUT);
pw_loop_update_io(impl->loop, client->source, mask); pw_loop_update_io(impl->loop, client->source, mask);
} }
client->new_msg_since_last_flush = true;
return 0; return 0;
error: error:
@ -212,24 +213,20 @@ error:
return res; return res;
} }
int client_flush_messages(struct client *client) static int client_try_flush_messages(struct client *client)
{ {
struct impl *impl = client->impl; struct impl *impl = client->impl;
int res;
pw_log_trace("client %p: flushing", client);
spa_assert(!client->disconnect); spa_assert(!client->disconnect);
while (true) { while (!spa_list_is_empty(&client->out_messages)) {
struct message *m; struct message *m = spa_list_first(&client->out_messages, struct message, link);
struct descriptor desc; struct descriptor desc;
void *data; const void *data;
size_t size; size_t size;
if (spa_list_is_empty(&client->out_messages))
break;
m = spa_list_first(&client->out_messages, struct message, link);
if (client->out_index < sizeof(desc)) { if (client->out_index < sizeof(desc)) {
desc.length = htonl(m->length); desc.length = htonl(m->length);
desc.channel = htonl(m->channel); desc.channel = htonl(m->channel);
@ -252,18 +249,18 @@ int client_flush_messages(struct client *client)
} }
while (true) { while (true) {
res = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); ssize_t sent = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT);
if (res < 0) { if (sent < 0) {
res = -errno; int res = -errno;
if (res == -EINTR) if (res == -EINTR)
continue; continue;
if (res != -EAGAIN && res != -EWOULDBLOCK) if (res != -EAGAIN && res != -EWOULDBLOCK)
pw_log_warn("client %p: send channel:%d %zu, error %d: %m", pw_log_warn("client %p: send channel:%u %zu, error %d: %m",
client, m->channel, size, res); client, m->channel, size, res);
return res; return res;
} }
client->out_index += res; client->out_index += sent;
break; break;
} }
} }
@ -271,20 +268,35 @@ int client_flush_messages(struct client *client)
return 0; return 0;
} }
int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id) int client_flush_messages(struct client *client)
{
client->new_msg_since_last_flush = false;
int res = client_try_flush_messages(client);
if (res >= 0) {
uint32_t mask = client->source->mask;
if (SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
pw_loop_update_io(client->impl->loop, client->source, mask);
}
} else {
if (res != -EAGAIN && res != -EWOULDBLOCK)
return res;
}
return 0;
}
/* returns true if an event with the (mask, event, id) triplet should be dropped because it is redundant */
static bool client_prune_subscribe_events(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
{ {
struct impl *impl = client->impl; struct impl *impl = client->impl;
struct message *reply, *m, *t; struct message *m, *t;
if (client->disconnect) if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW)
return -ENOTCONN; return false;
if (!(client->subscribed & mask))
return 0;
pw_log_debug("client %p: SUBSCRIBE event:%08x id:%u", client, event, id);
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) != SUBSCRIPTION_EVENT_NEW) {
spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) { spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) {
if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT) if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT)
continue; continue;
@ -306,15 +318,30 @@ int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t
/* This object has changed. If a "new" or "change" event for /* This object has changed. If a "new" or "change" event for
* this object is still in the queue we can exit. */ * this object is still in the queue we can exit. */
pw_log_debug("client %p: dropped redundant event due to change event", client); pw_log_debug("client %p: dropped redundant event due to change event", client);
return 0; return true;
}
} }
} }
reply = message_alloc(impl, -1, 0); return false;
reply->extra[0] = COMMAND_SUBSCRIBE_EVENT, }
reply->extra[1] = event,
reply->extra[2] = id, int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
{
if (client->disconnect)
return -ENOTCONN;
if (!(client->subscribed & mask))
return 0;
pw_log_debug("client %p: SUBSCRIBE event:%08x id:%u", client, event, id);
if (client_prune_subscribe_events(client, mask, event, id))
return 0;
struct message *reply = message_alloc(client->impl, -1, 0);
reply->extra[0] = COMMAND_SUBSCRIBE_EVENT;
reply->extra[1] = event;
reply->extra[2] = id;
message_put(reply, message_put(reply,
TAG_U32, COMMAND_SUBSCRIBE_EVENT, TAG_U32, COMMAND_SUBSCRIBE_EVENT,

View file

@ -96,7 +96,7 @@ struct client {
unsigned int disconnect:1; unsigned int disconnect:1;
unsigned int disconnecting:1; unsigned int disconnecting:1;
unsigned int need_flush:1; unsigned int new_msg_since_last_flush:1;
struct pw_manager_object *prev_default_sink; struct pw_manager_object *prev_default_sink;
struct pw_manager_object *prev_default_source; struct pw_manager_object *prev_default_source;

View file

@ -275,7 +275,6 @@ static void
on_client_data(void *data, int fd, uint32_t mask) on_client_data(void *data, int fd, uint32_t mask)
{ {
struct client * const client = data; struct client * const client = data;
struct impl * const impl = client->impl;
int res; int res;
client->ref++; client->ref++;
@ -302,15 +301,9 @@ on_client_data(void *data, int fd, uint32_t mask)
} }
} }
if (mask & SPA_IO_OUT || client->need_flush) { if (mask & SPA_IO_OUT || client->new_msg_since_last_flush) {
pw_log_trace("client %p: can write", client);
client->need_flush = false;
res = client_flush_messages(client); res = client_flush_messages(client);
if (res >= 0) { if (res < 0)
int m = client->source->mask;
SPA_FLAG_CLEAR(m, SPA_IO_OUT);
pw_loop_update_io(impl->loop, client->source, m);
} else if (res != -EAGAIN && res != -EWOULDBLOCK)
goto error; goto error;
} }