protocol: improve flushing

Use the IO_OUT flag to schedule flushing instead of a flush_event.

Handle EGAIN and wait for IO_OUT to try again.

Fixes #111
This commit is contained in:
Wim Taymans 2019-10-01 12:53:56 +02:00
parent 3d48ba8394
commit cc8e992cd1
2 changed files with 62 additions and 39 deletions

View file

@ -82,9 +82,8 @@ struct client {
struct pw_protocol_native_connection *connection;
struct spa_hook conn_listener;
struct spa_source *flush_event;
unsigned int disconnecting:1;
unsigned int flush_signaled:1;
unsigned int flushing:1;
};
struct server {
@ -106,6 +105,7 @@ struct client_data {
struct spa_source *source;
struct pw_protocol_native_connection *connection;
unsigned int busy:1;
unsigned int need_flush:1;
};
static void
@ -200,12 +200,14 @@ client_busy_changed(void *data, bool busy)
{
struct client_data *c = data;
struct pw_client *client = c->client;
uint32_t mask = SPA_IO_ERR | SPA_IO_HUP;
uint32_t mask = c->source->mask;
c->busy = busy;
if (!busy)
mask |= SPA_IO_IN;
if (busy)
SPA_FLAG_UNSET(mask, SPA_IO_IN);
else
SPA_FLAG_SET(mask, SPA_IO_IN);
pw_log_debug(NAME" %p: busy changed %d", client->protocol, busy);
pw_loop_update_io(client->core->main_loop, c->source, mask);
@ -220,13 +222,32 @@ connection_data(void *data, int fd, uint32_t mask)
{
struct client_data *this = data;
struct pw_client *client = this->client;
int res;
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
if (mask & SPA_IO_HUP) {
pw_log_info(NAME" %p: client %p disconnected", client->protocol, client);
pw_client_destroy(client);
return;
}
if (mask & SPA_IO_ERR) {
pw_log_error(NAME" %p: client %p error", client->protocol, client);
pw_client_destroy(client);
return;
}
if (mask & SPA_IO_OUT) {
res = pw_protocol_native_connection_flush(this->connection);
if (res >= 0) {
int mask = this->source->mask;
SPA_FLAG_UNSET(mask, SPA_IO_OUT);
pw_loop_update_io(client->protocol->core->main_loop,
this->source, mask);
} else if (res != EAGAIN) {
pw_log_error("client %p: could not flush: %s",
client, spa_strerror(res));
pw_client_destroy(client);
return;
}
}
if (mask & SPA_IO_IN)
process_messages(this);
}
@ -296,7 +317,8 @@ static struct pw_client *client_new(struct server *s, int fd)
this->client = client;
this->source = pw_loop_add_io(pw_core_get_main_loop(core),
fd, SPA_IO_ERR | SPA_IO_HUP, true, connection_data, this);
fd, SPA_IO_ERR | SPA_IO_HUP, true,
connection_data, this);
if (this->source == NULL)
goto cleanup_client;
@ -408,7 +430,7 @@ socket_data(void *data, int fd, uint32_t mask)
if (!client->busy)
pw_loop_update_io(client->protocol->core->main_loop,
c->source, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
c->source, c->source->mask | SPA_IO_IN);
}
static int add_socket(struct pw_protocol *protocol, struct server *s)
@ -514,6 +536,17 @@ on_remote_data(void *data, int fd, uint32_t mask)
res = -EPIPE;
goto error;
}
if (mask & SPA_IO_OUT) {
res = pw_protocol_native_connection_flush(conn);
if (res >= 0) {
int mask = impl->source->mask;
SPA_FLAG_UNSET(mask, SPA_IO_OUT);
pw_loop_update_io(core->main_loop,
impl->source, mask);
impl->flushing = false;
} else if (res != EAGAIN)
goto error;
}
if (mask & SPA_IO_IN) {
const struct pw_protocol_native_message *msg;
@ -588,23 +621,17 @@ error:
}
static void do_flush_event(void *data, uint64_t count)
{
struct client *impl = data;
impl->flush_signaled = false;
if (impl->connection)
if (pw_protocol_native_connection_flush(impl->connection) < 0)
impl->this.disconnect(&impl->this);
}
static void on_need_flush(void *data)
{
struct client *impl = data;
struct pw_remote *remote = impl->this.remote;
if (!impl->flush_signaled) {
impl->flush_signaled = true;
pw_loop_signal_event(remote->core->main_loop, impl->flush_event);
if (!impl->flushing) {
int mask = impl->source->mask;
impl->flushing = true;
SPA_FLAG_SET(mask, SPA_IO_OUT);
pw_loop_update_io(remote->core->main_loop,
impl->source, mask);
}
}
@ -669,12 +696,9 @@ static void impl_disconnect(struct pw_protocol_client *client)
static void impl_destroy(struct pw_protocol_client *client)
{
struct client *impl = SPA_CONTAINER_OF(client, struct client, this);
struct pw_remote *remote = client->remote;
impl_disconnect(client);
pw_loop_destroy_source(remote->core->main_loop, impl->flush_event);
spa_list_remove(&client->link);
free(impl);
}
@ -687,7 +711,6 @@ impl_new_client(struct pw_protocol *protocol,
struct client *impl;
struct pw_protocol_client *this;
const char *str = NULL;
int res;
if ((impl = calloc(1, sizeof(struct client))) == NULL)
return NULL;
@ -711,20 +734,9 @@ impl_new_client(struct pw_protocol *protocol,
this->disconnect = impl_disconnect;
this->destroy = impl_destroy;
impl->flush_event = pw_loop_add_event(remote->core->main_loop, do_flush_event, impl);
if (impl->flush_event == NULL) {
res = -errno;
goto error_cleanup;
}
spa_list_append(&protocol->client_list, &this->link);
return this;
error_cleanup:
free(impl);
errno = -res;
return NULL;
}
static void destroy_server(struct pw_protocol_server *server)
@ -757,10 +769,23 @@ static void on_before_hook(void *_data)
struct pw_protocol_server *this = &server->this;
struct pw_client *client, *tmp;
struct client_data *data;
int res;
spa_list_for_each_safe(client, tmp, &this->client_list, protocol_link) {
data = client->user_data;
pw_protocol_native_connection_flush(data->connection);
res = pw_protocol_native_connection_flush(data->connection);
if (res == -EAGAIN) {
int mask = data->source->mask;
SPA_FLAG_SET(mask, SPA_IO_OUT);
pw_loop_update_io(client->protocol->core->main_loop,
data->source, mask);
} else if (res < 0) {
pw_log_warn("client %p: could not flush: %s",
data->client, spa_strerror(res));
pw_client_destroy(client);
}
}
}

View file

@ -502,8 +502,6 @@ int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *co
continue;
else {
res = -errno;
pw_log_error("could not sendmsg on fd:%d n_fds:%d: %s",
conn->fd, n_fds, spa_strerror(res));
goto exit;
}
}