diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c index 829fc6c06..ee80cf12b 100644 --- a/src/modules/module-protocol-native.c +++ b/src/modules/module-protocol-native.c @@ -106,7 +106,7 @@ struct client { int ref; unsigned int disconnecting:1; - unsigned int flushing:1; + unsigned int need_flush:1; unsigned int paused:1; }; @@ -125,7 +125,7 @@ struct server { struct pw_loop *loop; struct spa_source *source; - struct spa_hook hook; + struct spa_source *resume; unsigned int activated:1; }; @@ -134,6 +134,7 @@ struct client_data { struct spa_hook client_listener; struct spa_list protocol_link; + struct server *server; struct spa_source *source; struct pw_protocol_native_connection *connection; @@ -257,6 +258,7 @@ static void client_busy_changed(void *data, bool busy) { struct client_data *c = data; + struct server *s = c->server; struct pw_impl_client *client = c->client; uint32_t mask = c->source->mask; @@ -268,7 +270,7 @@ client_busy_changed(void *data, bool busy) pw_loop_update_io(client->context->main_loop, c->source, mask); if (!busy) - process_messages(c); + pw_loop_signal_event(s->loop, s->resume); } static void handle_client_error(struct pw_impl_client *client, int res) @@ -296,20 +298,19 @@ connection_data(void *data, int fd, uint32_t mask) res = -EIO; goto error; } - if (mask & SPA_IO_OUT) { - res = pw_protocol_native_connection_flush(this->connection); - if (res >= 0) { - int mask = this->source->mask; - SPA_FLAG_CLEAR(mask, SPA_IO_OUT); - pw_loop_update_io(client->context->main_loop, - this->source, mask); - } else if (res != -EAGAIN) - goto error; - } if (mask & SPA_IO_IN) { if ((res = process_messages(this)) < 0) goto error; } + if (mask & SPA_IO_OUT || this->need_flush) { + this->need_flush = false; + res = pw_protocol_native_connection_flush(this->connection); + if (res >= 0) { + pw_loop_update_io(client->context->main_loop, + this->source, this->source->mask & ~SPA_IO_OUT); + } else if (res != -EAGAIN) + goto error; + } return; error: handle_client_error(client, res); @@ -365,10 +366,25 @@ static void on_start(void *data, uint32_t version) return; } +static void on_server_need_flush(void *data) +{ + struct client_data *this = data; + struct pw_impl_client *client = this->client; + + pw_log_debug("need flush"); + this->need_flush = true; + + if (this->source && !(this->source->mask & SPA_IO_OUT)) { + pw_loop_update_io(client->context->main_loop, + this->source, this->source->mask | SPA_IO_OUT); + } +} + static const struct pw_protocol_native_connection_events server_conn_events = { PW_VERSION_PROTOCOL_NATIVE_CONNECTION_EVENTS, .destroy = on_server_connection_destroy, .start = on_start, + .need_flush = on_server_need_flush, }; static struct client_data *client_new(struct server *s, int fd) @@ -436,6 +452,7 @@ static struct client_data *client_new(struct server *s, int fd) this = pw_impl_client_get_user_data(client); spa_list_append(&s->this.client_list, &this->protocol_link); + this->server = s; this->client = client; this->source = pw_loop_add_io(pw_context_get_main_loop(context), fd, SPA_IO_ERR | SPA_IO_HUP, true, @@ -774,22 +791,20 @@ on_remote_data(void *data, int fd, uint32_t mask) res = -EPIPE; goto error; } - if (mask & SPA_IO_OUT) { - res = pw_protocol_native_connection_flush(conn); - if (res >= 0) { - int mask = impl->source->mask; - SPA_FLAG_CLEAR(mask, SPA_IO_OUT); - pw_loop_update_io(loop, - impl->source, mask); - impl->flushing = false; - } else if (res != -EAGAIN) - goto error; - } - if (mask & SPA_IO_IN) { if ((res = process_remote(impl)) < 0) goto error; } + if (mask & SPA_IO_OUT || impl->need_flush) { + impl->need_flush = false; + res = pw_protocol_native_connection_flush(conn); + if (res >= 0) { + pw_loop_update_io(loop, impl->source, + impl->source->mask & ~SPA_IO_OUT); + } else if (res != -EAGAIN) + goto error; + } + done: client_unref(impl); pw_proxy_unref(core_proxy); @@ -812,23 +827,23 @@ static void on_client_connection_destroy(void *data) spa_hook_remove(&impl->conn_listener); } -static void on_need_flush(void *data) +static void on_client_need_flush(void *data) { struct client *impl = data; - if (!impl->flushing && impl->source) { - int mask = impl->source->mask; - impl->flushing = true; - SPA_FLAG_SET(mask, SPA_IO_OUT); + pw_log_debug("need flush"); + impl->need_flush = true; + + if (impl->source && !(impl->source->mask & SPA_IO_OUT)) { pw_loop_update_io(impl->context->main_loop, - impl->source, mask); + impl->source, impl->source->mask | SPA_IO_OUT); } } static const struct pw_protocol_native_connection_events client_conn_events = { PW_VERSION_PROTOCOL_NATIVE_CONNECTION_EVENTS, .destroy = on_client_connection_destroy, - .need_flush = on_need_flush, + .need_flush = on_client_need_flush, }; static int impl_connect_fd(struct pw_protocol_client *client, int fd, bool do_close) @@ -839,7 +854,6 @@ static int impl_connect_fd(struct pw_protocol_client *client, int fd, bool do_cl impl->disconnecting = false; pw_protocol_native_connection_set_fd(impl->connection, fd); - impl->flushing = true; impl->source = pw_loop_add_io(impl->context->main_loop, fd, SPA_IO_IN | SPA_IO_OUT | SPA_IO_HUP | SPA_IO_ERR, @@ -1020,14 +1034,14 @@ static void destroy_server(struct pw_protocol_server *server) pw_log_debug(NAME" %p: server %p", s->this.protocol, s); spa_list_remove(&server->link); - spa_hook_remove(&s->hook); spa_list_for_each_safe(data, tmp, &server->client_list, protocol_link) pw_impl_client_destroy(data->client); if (s->source) pw_loop_destroy_source(s->loop, s->source); - + if (s->resume) + pw_loop_destroy_source(s->loop, s->resume); if (s->addr.sun_path[0] && !s->activated) unlink(s->addr.sun_path); if (s->lock_addr[0]) @@ -1037,29 +1051,23 @@ static void destroy_server(struct pw_protocol_server *server) free(s); } -static void on_before_hook(void *_data) +static void do_resume(void *_data, uint64_t count) { struct server *server = _data; struct pw_protocol_server *this = &server->this; struct client_data *data, *tmp; int res; - spa_list_for_each_safe(data, tmp, &this->client_list, protocol_link) { - res = pw_protocol_native_connection_flush(data->connection); - if (res == -EAGAIN) { - int mask = data->source->mask; - SPA_FLAG_SET(mask, SPA_IO_OUT); - pw_loop_update_io(data->client->context->main_loop, - data->source, mask); - } else if (res < 0) - handle_client_error(data->client, res); - } -} + pw_log_debug("flush"); -static const struct spa_loop_control_hooks impl_hooks = { - SPA_VERSION_LOOP_CONTROL_HOOKS, - .before = on_before_hook, -}; + spa_list_for_each_safe(data, tmp, &this->client_list, protocol_link) { + if ((res = process_messages(data)) < 0) + goto error; + } + return; +error: + handle_client_error(data->client, res); +} static const char * get_server_name(const struct spa_dict *props) @@ -1081,7 +1089,6 @@ create_server(struct pw_protocol *protocol, const struct spa_dict *props) { struct pw_protocol_server *this; - struct pw_context *context = protocol->context; struct server *s; if ((s = calloc(1, sizeof(struct server))) == NULL) @@ -1097,8 +1104,6 @@ create_server(struct pw_protocol *protocol, spa_list_append(&protocol->server_list, &this->link); - pw_loop_add_hook(pw_context_get_main_loop(context), &s->hook, &impl_hooks, s); - pw_log_debug(NAME" %p: created server %p", protocol, this); return s; @@ -1130,6 +1135,9 @@ impl_add_server(struct pw_protocol *protocol, if ((res = add_socket(protocol, s)) < 0) goto error; + if ((s->resume = pw_loop_add_event(s->loop, do_resume, s)) == NULL) + goto error; + pw_log_info(NAME" %p: Listening on '%s'", protocol, name); return this;