From 2c1245f8ef34e02ee3d66945fb4de64b17e594ed Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 9 Jun 2017 18:22:57 +0200 Subject: [PATCH] client: add busy state When the client is busy, no methods are allowed on it until it becomes not busy again. We can use this to wait until an async operation completes (access checks) and to serialize the requests from the client. --- pipewire/client/context.c | 4 +- pipewire/client/context.h | 2 - pipewire/client/stream.c | 17 +--- pipewire/modules/module-client-node.c | 10 --- pipewire/modules/module-protocol-native.c | 98 +++++++++++++++-------- pipewire/server/client.c | 9 +++ pipewire/server/client.h | 7 ++ pipewire/server/core.c | 5 ++ 8 files changed, 86 insertions(+), 66 deletions(-) diff --git a/pipewire/client/context.c b/pipewire/client/context.c index bd783fd0a..68b364e8d 100644 --- a/pipewire/client/context.c +++ b/pipewire/client/context.c @@ -119,8 +119,7 @@ static void core_event_done(void *object, uint32_t seq) pw_core_do_sync(this->core_proxy, 1); } else if (seq == 1) { context_set_state(this, PW_CONTEXT_STATE_CONNECTED, NULL); - } else - pw_signal_emit(&this->sync_done, this, seq); + } } static void core_event_error(void *object, uint32_t id, int res, const char *error, ...) @@ -468,7 +467,6 @@ struct pw_context *pw_context_new(struct pw_loop *loop, pw_signal_init(&this->state_changed); pw_signal_init(&this->subscription); pw_signal_init(&this->destroy_signal); - pw_signal_init(&this->sync_done); return this; diff --git a/pipewire/client/context.h b/pipewire/client/context.h index 43a8e65fa..33a31ad86 100644 --- a/pipewire/client/context.h +++ b/pipewire/client/context.h @@ -173,8 +173,6 @@ struct pw_context { /** Signal emited when the context is destroyed */ PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_context *context)); - - PW_SIGNAL(sync_done, (struct pw_listener *listener, struct pw_context *context, int seq)); }; struct pw_context * diff --git a/pipewire/client/stream.c b/pipewire/client/stream.c index a4df972e1..b27e04ea2 100644 --- a/pipewire/client/stream.c +++ b/pipewire/client/stream.c @@ -889,20 +889,6 @@ static void on_node_proxy_destroy(struct pw_listener *listener, struct pw_proxy stream_set_state(this, PW_STREAM_STATE_UNCONNECTED, NULL); } -static void on_node_proxy_sync_done(struct pw_listener *listener, struct pw_context *context, int seq) -{ - struct stream *impl = SPA_CONTAINER_OF(listener, struct stream, node_proxy_sync_done); - struct pw_stream *this = &impl->this; - - if (seq != 2) - return; - - pw_log_info("stream %p: sync done %d", this, seq); - do_node_init(this); - - pw_signal_remove(&impl->node_proxy_sync_done); -} - /** Connect a stream for input or output on \a port_path. * \param stream a \ref pw_stream * \param direction the stream direction @@ -962,8 +948,7 @@ pw_stream_connect(struct pw_stream *stream, "client-node", &stream->properties->dict, impl->node_proxy->id); - pw_signal_add(&stream->context->sync_done, - &impl->node_proxy_sync_done, on_node_proxy_sync_done); + do_node_init(stream); return true; } diff --git a/pipewire/modules/module-client-node.c b/pipewire/modules/module-client-node.c index 61efdcf80..2e753734d 100644 --- a/pipewire/modules/module-client-node.c +++ b/pipewire/modules/module-client-node.c @@ -48,16 +48,6 @@ static struct pw_node *create_node(struct pw_node_factory *factory, if (node == NULL) goto no_mem; -#if 0 - if ((res = pw_client_node_get_fds(node, &readfd, &writefd)) < 0) { - pw_core_notify_error(client->core_resource, - client->core_resource->id, SPA_RESULT_ERROR, "can't get data fds"); - return NULL; - } - pw_client_node_notify_done(node->resource, readfd, writefd); -#endif - pw_core_notify_done(client->core_resource, 2); - return node->node; no_mem: diff --git a/pipewire/modules/module-protocol-native.c b/pipewire/modules/module-protocol-native.c index 67fc9eb38..2667e6acb 100644 --- a/pipewire/modules/module-protocol-native.c +++ b/pipewire/modules/module-protocol-native.c @@ -86,6 +86,7 @@ struct native_client { struct spa_source *source; struct pw_connection *connection; struct pw_listener resource_added; + struct pw_listener busy_changed; }; static void client_destroy(struct native_client *this) @@ -99,6 +100,48 @@ static void client_destroy(struct native_client *this) free(this); } +static void +process_messages(struct native_client *client) +{ + struct pw_connection *conn = client->connection; + uint8_t opcode; + uint32_t id; + uint32_t size; + struct pw_client *c = client->client; + void *message; + + while (pw_connection_get_next(conn, &opcode, &id, &message, &size)) { + struct pw_resource *resource; + const demarshal_func_t *demarshal; + + pw_log_trace("protocol-native %p: got message %d from %u", client->impl, + opcode, id); + + resource = pw_map_lookup(&c->objects, id); + if (resource == NULL) { + pw_log_error("protocol-native %p: unknown resource %u", + client->impl, id); + continue; + } + if (opcode >= resource->iface->n_methods) { + pw_log_error("protocol-native %p: invalid method %u %u", client->impl, + id, opcode); + client_destroy(client); + break; + } + demarshal = resource->iface->methods; + if (!demarshal[opcode] || !demarshal[opcode] (resource, message, size)) { + pw_log_error("protocol-native %p: invalid message received %u %u", + client->impl, id, opcode); + client_destroy(client); + break; + } + if (c->busy) { + break; + } + } +} + static void on_resource_added(struct pw_listener *listener, struct pw_client *client, struct pw_resource *resource) @@ -106,6 +149,23 @@ on_resource_added(struct pw_listener *listener, pw_protocol_native_server_setup(resource); } +static void +on_busy_changed(struct pw_listener *listener, + struct pw_client *client) +{ + struct native_client *c = SPA_CONTAINER_OF(listener, struct native_client, busy_changed); + enum spa_io mask = SPA_IO_ERR | SPA_IO_HUP; + + if (!client->busy) + mask |= SPA_IO_IN; + + pw_loop_update_io(c->impl->core->main_loop->loop, c->source, mask); + + if (!client->busy) + process_messages(c); + +} + static void on_before_iterate(struct pw_listener *listener, struct pw_loop *loop) { struct impl *this = SPA_CONTAINER_OF(listener, struct impl, before_iterate); @@ -120,12 +180,6 @@ connection_data(struct spa_loop_utils *utils, struct spa_source *source, int fd, enum spa_io mask, void *data) { struct native_client *client = data; - struct pw_connection *conn = client->connection; - uint8_t opcode; - uint32_t id; - uint32_t size; - struct pw_client *c = client->client; - void *message; if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pw_log_error("protocol-native %p: got connection error", client->impl); @@ -133,35 +187,8 @@ connection_data(struct spa_loop_utils *utils, return; } - if (mask & SPA_IO_IN) { - while (pw_connection_get_next(conn, &opcode, &id, &message, &size)) { - struct pw_resource *resource; - const demarshal_func_t *demarshal; - - pw_log_trace("protocol-native %p: got message %d from %u", client->impl, - opcode, id); - - resource = pw_map_lookup(&c->objects, id); - if (resource == NULL) { - pw_log_error("protocol-native %p: unknown resource %u", - client->impl, id); - continue; - } - if (opcode >= resource->iface->n_methods) { - pw_log_error("protocol-native %p: invalid method %u %u", client->impl, - id, opcode); - client_destroy(client); - break; - } - demarshal = resource->iface->methods; - if (!demarshal[opcode] || !demarshal[opcode] (resource, message, size)) { - pw_log_error("protocol-native %p: invalid message received %u %u", - client->impl, id, opcode); - client_destroy(client); - break; - } - } - } + if (mask & SPA_IO_IN) + process_messages(client); } static struct native_client *client_new(struct impl *impl, int fd) @@ -206,6 +233,7 @@ static struct native_client *client_new(struct impl *impl, int fd) spa_list_insert(impl->client_list.prev, &this->link); pw_signal_add(&client->resource_added, &this->resource_added, on_resource_added); + pw_signal_add(&client->busy_changed, &this->busy_changed, on_busy_changed); pw_global_bind(impl->core->global, client, 0, 0); return this; diff --git a/pipewire/server/client.c b/pipewire/server/client.c index 7dee08512..2b2f81aeb 100644 --- a/pipewire/server/client.c +++ b/pipewire/server/client.c @@ -94,6 +94,7 @@ struct pw_client *pw_client_new(struct pw_core *core, pw_signal_init(&this->properties_changed); pw_signal_init(&this->resource_added); pw_signal_init(&this->resource_removed); + pw_signal_init(&this->busy_changed); pw_map_init(&this->objects, 0, 32); pw_map_init(&this->types, 0, 32); @@ -180,3 +181,11 @@ void pw_client_update_properties(struct pw_client *client, const struct spa_dict pw_client_notify_info(resource, &client->info); } } + +void pw_client_set_busy(struct pw_client *client, bool busy) +{ + if (client->busy != busy) { + client->busy = busy; + pw_signal_emit(&client->busy_changed, client); + } +} diff --git a/pipewire/server/client.h b/pipewire/server/client.h index e50a17d8e..e08892616 100644 --- a/pipewire/server/client.h +++ b/pipewire/server/client.h @@ -102,6 +102,11 @@ struct pw_client { PW_SIGNAL(resource_removed, (struct pw_listener *listener, struct pw_client *client, struct pw_resource *resource)); + bool busy; + /** Emited when the client starts/stops an async operation that should + * block/resume all methods for this client */ + PW_SIGNAL(busy_changed, (struct pw_listener *listener, struct pw_client *client)); + /** Emited when the client is destroyed */ PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_client *client)); }; @@ -116,6 +121,8 @@ pw_client_destroy(struct pw_client *client); void pw_client_update_properties(struct pw_client *client, const struct spa_dict *dict); +void pw_client_set_busy(struct pw_client *client, bool busy); + #ifdef __cplusplus } #endif diff --git a/pipewire/server/core.c b/pipewire/server/core.c index 71352a3d8..d2b9571e2 100644 --- a/pipewire/server/core.c +++ b/pipewire/server/core.c @@ -146,6 +146,7 @@ static void core_get_registry(void *object, uint32_t new_id) static void *async_create_node_start(struct pw_access_data *data, size_t size) { struct access_create_node *d; + struct pw_client *client = data->resource->client; d = calloc(1, sizeof(struct access_create_node) + size); memcpy(d, data, sizeof(struct access_create_node)); @@ -153,6 +154,9 @@ static void *async_create_node_start(struct pw_access_data *data, size_t size) d->name = strdup(d->name); d->async = true; d->data.user_data = SPA_MEMBER(d, sizeof(struct access_create_node), void); + + pw_client_set_busy(client, true); + return d; } @@ -202,6 +206,7 @@ static void async_create_node_complete(struct pw_access_data *data, int res) resource->id, SPA_RESULT_NO_PERMISSION, "operation not allowed"); done: async_create_node_free(&d->data); + pw_client_set_busy(client, false); return; }