protocol-native: rework resume of busy client

If a client becomes unbusy again, signal a resume event so that the
messages are processed in the next mainloop iteration. This gives the
current iteration time to perform cleanups if needed.

Remove the look hook and always do flushing with an IO_OUT event. Rework
some things so that we can flush right after processing input without
having to go through a loop iteration.

See #298
This commit is contained in:
Wim Taymans 2020-11-19 11:57:36 +01:00
parent 505ae98b0f
commit d2c2edb7ce

View file

@ -106,7 +106,7 @@ struct client {
int ref; int ref;
unsigned int disconnecting:1; unsigned int disconnecting:1;
unsigned int flushing:1; unsigned int need_flush:1;
unsigned int paused:1; unsigned int paused:1;
}; };
@ -125,7 +125,7 @@ struct server {
struct pw_loop *loop; struct pw_loop *loop;
struct spa_source *source; struct spa_source *source;
struct spa_hook hook; struct spa_source *resume;
unsigned int activated:1; unsigned int activated:1;
}; };
@ -134,6 +134,7 @@ struct client_data {
struct spa_hook client_listener; struct spa_hook client_listener;
struct spa_list protocol_link; struct spa_list protocol_link;
struct server *server;
struct spa_source *source; struct spa_source *source;
struct pw_protocol_native_connection *connection; struct pw_protocol_native_connection *connection;
@ -257,6 +258,7 @@ static void
client_busy_changed(void *data, bool busy) client_busy_changed(void *data, bool busy)
{ {
struct client_data *c = data; struct client_data *c = data;
struct server *s = c->server;
struct pw_impl_client *client = c->client; struct pw_impl_client *client = c->client;
uint32_t mask = c->source->mask; uint32_t mask = c->source->mask;
@ -268,7 +270,7 @@ client_busy_changed(void *data, bool busy)
pw_loop_update_io(client->context->main_loop, c->source, mask); pw_loop_update_io(client->context->main_loop, c->source, mask);
if (!busy) if (!busy)
process_messages(c); pw_loop_signal_event(s->loop, s->resume);
} }
static void handle_client_error(struct pw_impl_client *client, int res) static void handle_client_error(struct pw_impl_client *client, int res)
@ -296,20 +298,19 @@ connection_data(void *data, int fd, uint32_t mask)
res = -EIO; res = -EIO;
goto error; goto error;
} }
if (mask & SPA_IO_OUT) {
res = pw_protocol_native_connection_flush(this->connection);
if (res >= 0) {
int mask = this->source->mask;
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
pw_loop_update_io(client->context->main_loop,
this->source, mask);
} else if (res != -EAGAIN)
goto error;
}
if (mask & SPA_IO_IN) { if (mask & SPA_IO_IN) {
if ((res = process_messages(this)) < 0) if ((res = process_messages(this)) < 0)
goto error; goto error;
} }
if (mask & SPA_IO_OUT || this->need_flush) {
this->need_flush = false;
res = pw_protocol_native_connection_flush(this->connection);
if (res >= 0) {
pw_loop_update_io(client->context->main_loop,
this->source, this->source->mask & ~SPA_IO_OUT);
} else if (res != -EAGAIN)
goto error;
}
return; return;
error: error:
handle_client_error(client, res); handle_client_error(client, res);
@ -365,10 +366,25 @@ static void on_start(void *data, uint32_t version)
return; return;
} }
static void on_server_need_flush(void *data)
{
struct client_data *this = data;
struct pw_impl_client *client = this->client;
pw_log_debug("need flush");
this->need_flush = true;
if (this->source && !(this->source->mask & SPA_IO_OUT)) {
pw_loop_update_io(client->context->main_loop,
this->source, this->source->mask | SPA_IO_OUT);
}
}
static const struct pw_protocol_native_connection_events server_conn_events = { static const struct pw_protocol_native_connection_events server_conn_events = {
PW_VERSION_PROTOCOL_NATIVE_CONNECTION_EVENTS, PW_VERSION_PROTOCOL_NATIVE_CONNECTION_EVENTS,
.destroy = on_server_connection_destroy, .destroy = on_server_connection_destroy,
.start = on_start, .start = on_start,
.need_flush = on_server_need_flush,
}; };
static struct client_data *client_new(struct server *s, int fd) static struct client_data *client_new(struct server *s, int fd)
@ -436,6 +452,7 @@ static struct client_data *client_new(struct server *s, int fd)
this = pw_impl_client_get_user_data(client); this = pw_impl_client_get_user_data(client);
spa_list_append(&s->this.client_list, &this->protocol_link); spa_list_append(&s->this.client_list, &this->protocol_link);
this->server = s;
this->client = client; this->client = client;
this->source = pw_loop_add_io(pw_context_get_main_loop(context), this->source = pw_loop_add_io(pw_context_get_main_loop(context),
fd, SPA_IO_ERR | SPA_IO_HUP, true, fd, SPA_IO_ERR | SPA_IO_HUP, true,
@ -774,22 +791,20 @@ on_remote_data(void *data, int fd, uint32_t mask)
res = -EPIPE; res = -EPIPE;
goto error; goto error;
} }
if (mask & SPA_IO_OUT) {
res = pw_protocol_native_connection_flush(conn);
if (res >= 0) {
int mask = impl->source->mask;
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
pw_loop_update_io(loop,
impl->source, mask);
impl->flushing = false;
} else if (res != -EAGAIN)
goto error;
}
if (mask & SPA_IO_IN) { if (mask & SPA_IO_IN) {
if ((res = process_remote(impl)) < 0) if ((res = process_remote(impl)) < 0)
goto error; goto error;
} }
if (mask & SPA_IO_OUT || impl->need_flush) {
impl->need_flush = false;
res = pw_protocol_native_connection_flush(conn);
if (res >= 0) {
pw_loop_update_io(loop, impl->source,
impl->source->mask & ~SPA_IO_OUT);
} else if (res != -EAGAIN)
goto error;
}
done: done:
client_unref(impl); client_unref(impl);
pw_proxy_unref(core_proxy); pw_proxy_unref(core_proxy);
@ -812,23 +827,23 @@ static void on_client_connection_destroy(void *data)
spa_hook_remove(&impl->conn_listener); spa_hook_remove(&impl->conn_listener);
} }
static void on_need_flush(void *data) static void on_client_need_flush(void *data)
{ {
struct client *impl = data; struct client *impl = data;
if (!impl->flushing && impl->source) { pw_log_debug("need flush");
int mask = impl->source->mask; impl->need_flush = true;
impl->flushing = true;
SPA_FLAG_SET(mask, SPA_IO_OUT); if (impl->source && !(impl->source->mask & SPA_IO_OUT)) {
pw_loop_update_io(impl->context->main_loop, pw_loop_update_io(impl->context->main_loop,
impl->source, mask); impl->source, impl->source->mask | SPA_IO_OUT);
} }
} }
static const struct pw_protocol_native_connection_events client_conn_events = { static const struct pw_protocol_native_connection_events client_conn_events = {
PW_VERSION_PROTOCOL_NATIVE_CONNECTION_EVENTS, PW_VERSION_PROTOCOL_NATIVE_CONNECTION_EVENTS,
.destroy = on_client_connection_destroy, .destroy = on_client_connection_destroy,
.need_flush = on_need_flush, .need_flush = on_client_need_flush,
}; };
static int impl_connect_fd(struct pw_protocol_client *client, int fd, bool do_close) static int impl_connect_fd(struct pw_protocol_client *client, int fd, bool do_close)
@ -839,7 +854,6 @@ static int impl_connect_fd(struct pw_protocol_client *client, int fd, bool do_cl
impl->disconnecting = false; impl->disconnecting = false;
pw_protocol_native_connection_set_fd(impl->connection, fd); pw_protocol_native_connection_set_fd(impl->connection, fd);
impl->flushing = true;
impl->source = pw_loop_add_io(impl->context->main_loop, impl->source = pw_loop_add_io(impl->context->main_loop,
fd, fd,
SPA_IO_IN | SPA_IO_OUT | SPA_IO_HUP | SPA_IO_ERR, SPA_IO_IN | SPA_IO_OUT | SPA_IO_HUP | SPA_IO_ERR,
@ -1020,14 +1034,14 @@ static void destroy_server(struct pw_protocol_server *server)
pw_log_debug(NAME" %p: server %p", s->this.protocol, s); pw_log_debug(NAME" %p: server %p", s->this.protocol, s);
spa_list_remove(&server->link); spa_list_remove(&server->link);
spa_hook_remove(&s->hook);
spa_list_for_each_safe(data, tmp, &server->client_list, protocol_link) spa_list_for_each_safe(data, tmp, &server->client_list, protocol_link)
pw_impl_client_destroy(data->client); pw_impl_client_destroy(data->client);
if (s->source) if (s->source)
pw_loop_destroy_source(s->loop, s->source); pw_loop_destroy_source(s->loop, s->source);
if (s->resume)
pw_loop_destroy_source(s->loop, s->resume);
if (s->addr.sun_path[0] && !s->activated) if (s->addr.sun_path[0] && !s->activated)
unlink(s->addr.sun_path); unlink(s->addr.sun_path);
if (s->lock_addr[0]) if (s->lock_addr[0])
@ -1037,29 +1051,23 @@ static void destroy_server(struct pw_protocol_server *server)
free(s); free(s);
} }
static void on_before_hook(void *_data) static void do_resume(void *_data, uint64_t count)
{ {
struct server *server = _data; struct server *server = _data;
struct pw_protocol_server *this = &server->this; struct pw_protocol_server *this = &server->this;
struct client_data *data, *tmp; struct client_data *data, *tmp;
int res; int res;
pw_log_debug("flush");
spa_list_for_each_safe(data, tmp, &this->client_list, protocol_link) { spa_list_for_each_safe(data, tmp, &this->client_list, protocol_link) {
res = pw_protocol_native_connection_flush(data->connection); if ((res = process_messages(data)) < 0)
if (res == -EAGAIN) { goto error;
int mask = data->source->mask; }
SPA_FLAG_SET(mask, SPA_IO_OUT); return;
pw_loop_update_io(data->client->context->main_loop, error:
data->source, mask);
} else if (res < 0)
handle_client_error(data->client, res); handle_client_error(data->client, res);
} }
}
static const struct spa_loop_control_hooks impl_hooks = {
SPA_VERSION_LOOP_CONTROL_HOOKS,
.before = on_before_hook,
};
static const char * static const char *
get_server_name(const struct spa_dict *props) get_server_name(const struct spa_dict *props)
@ -1081,7 +1089,6 @@ create_server(struct pw_protocol *protocol,
const struct spa_dict *props) const struct spa_dict *props)
{ {
struct pw_protocol_server *this; struct pw_protocol_server *this;
struct pw_context *context = protocol->context;
struct server *s; struct server *s;
if ((s = calloc(1, sizeof(struct server))) == NULL) if ((s = calloc(1, sizeof(struct server))) == NULL)
@ -1097,8 +1104,6 @@ create_server(struct pw_protocol *protocol,
spa_list_append(&protocol->server_list, &this->link); spa_list_append(&protocol->server_list, &this->link);
pw_loop_add_hook(pw_context_get_main_loop(context), &s->hook, &impl_hooks, s);
pw_log_debug(NAME" %p: created server %p", protocol, this); pw_log_debug(NAME" %p: created server %p", protocol, this);
return s; return s;
@ -1130,6 +1135,9 @@ impl_add_server(struct pw_protocol *protocol,
if ((res = add_socket(protocol, s)) < 0) if ((res = add_socket(protocol, s)) < 0)
goto error; goto error;
if ((s->resume = pw_loop_add_event(s->loop, do_resume, s)) == NULL)
goto error;
pw_log_info(NAME" %p: Listening on '%s'", protocol, name); pw_log_info(NAME" %p: Listening on '%s'", protocol, name);
return this; return this;