diff --git a/pipewire/client/context.c b/pipewire/client/context.c index bd783fd0a..68b364e8d 100644 --- a/pipewire/client/context.c +++ b/pipewire/client/context.c @@ -119,8 +119,7 @@ static void core_event_done(void *object, uint32_t seq) pw_core_do_sync(this->core_proxy, 1); } else if (seq == 1) { context_set_state(this, PW_CONTEXT_STATE_CONNECTED, NULL); - } else - pw_signal_emit(&this->sync_done, this, seq); + } } static void core_event_error(void *object, uint32_t id, int res, const char *error, ...) @@ -468,7 +467,6 @@ struct pw_context *pw_context_new(struct pw_loop *loop, pw_signal_init(&this->state_changed); pw_signal_init(&this->subscription); pw_signal_init(&this->destroy_signal); - pw_signal_init(&this->sync_done); return this; diff --git a/pipewire/client/context.h b/pipewire/client/context.h index 43a8e65fa..33a31ad86 100644 --- a/pipewire/client/context.h +++ b/pipewire/client/context.h @@ -173,8 +173,6 @@ struct pw_context { /** Signal emited when the context is destroyed */ PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_context *context)); - - PW_SIGNAL(sync_done, (struct pw_listener *listener, struct pw_context *context, int seq)); }; struct pw_context * diff --git a/pipewire/client/stream.c b/pipewire/client/stream.c index a4df972e1..b27e04ea2 100644 --- a/pipewire/client/stream.c +++ b/pipewire/client/stream.c @@ -889,20 +889,6 @@ static void on_node_proxy_destroy(struct pw_listener *listener, struct pw_proxy stream_set_state(this, PW_STREAM_STATE_UNCONNECTED, NULL); } -static void on_node_proxy_sync_done(struct pw_listener *listener, struct pw_context *context, int seq) -{ - struct stream *impl = SPA_CONTAINER_OF(listener, struct stream, node_proxy_sync_done); - struct pw_stream *this = &impl->this; - - if (seq != 2) - return; - - pw_log_info("stream %p: sync done %d", this, seq); - do_node_init(this); - - pw_signal_remove(&impl->node_proxy_sync_done); -} - /** Connect a stream for input or output on \a port_path. * \param stream a \ref pw_stream * \param direction the stream direction @@ -962,8 +948,7 @@ pw_stream_connect(struct pw_stream *stream, "client-node", &stream->properties->dict, impl->node_proxy->id); - pw_signal_add(&stream->context->sync_done, - &impl->node_proxy_sync_done, on_node_proxy_sync_done); + do_node_init(stream); return true; } diff --git a/pipewire/modules/module-client-node.c b/pipewire/modules/module-client-node.c index 61efdcf80..2e753734d 100644 --- a/pipewire/modules/module-client-node.c +++ b/pipewire/modules/module-client-node.c @@ -48,16 +48,6 @@ static struct pw_node *create_node(struct pw_node_factory *factory, if (node == NULL) goto no_mem; -#if 0 - if ((res = pw_client_node_get_fds(node, &readfd, &writefd)) < 0) { - pw_core_notify_error(client->core_resource, - client->core_resource->id, SPA_RESULT_ERROR, "can't get data fds"); - return NULL; - } - pw_client_node_notify_done(node->resource, readfd, writefd); -#endif - pw_core_notify_done(client->core_resource, 2); - return node->node; no_mem: diff --git a/pipewire/modules/module-protocol-native.c b/pipewire/modules/module-protocol-native.c index 67fc9eb38..2667e6acb 100644 --- a/pipewire/modules/module-protocol-native.c +++ b/pipewire/modules/module-protocol-native.c @@ -86,6 +86,7 @@ struct native_client { struct spa_source *source; struct pw_connection *connection; struct pw_listener resource_added; + struct pw_listener busy_changed; }; static void client_destroy(struct native_client *this) @@ -99,6 +100,48 @@ static void client_destroy(struct native_client *this) free(this); } +static void +process_messages(struct native_client *client) +{ + struct pw_connection *conn = client->connection; + uint8_t opcode; + uint32_t id; + uint32_t size; + struct pw_client *c = client->client; + void *message; + + while (pw_connection_get_next(conn, &opcode, &id, &message, &size)) { + struct pw_resource *resource; + const demarshal_func_t *demarshal; + + pw_log_trace("protocol-native %p: got message %d from %u", client->impl, + opcode, id); + + resource = pw_map_lookup(&c->objects, id); + if (resource == NULL) { + pw_log_error("protocol-native %p: unknown resource %u", + client->impl, id); + continue; + } + if (opcode >= resource->iface->n_methods) { + pw_log_error("protocol-native %p: invalid method %u %u", client->impl, + id, opcode); + client_destroy(client); + break; + } + demarshal = resource->iface->methods; + if (!demarshal[opcode] || !demarshal[opcode] (resource, message, size)) { + pw_log_error("protocol-native %p: invalid message received %u %u", + client->impl, id, opcode); + client_destroy(client); + break; + } + if (c->busy) { + break; + } + } +} + static void on_resource_added(struct pw_listener *listener, struct pw_client *client, struct pw_resource *resource) @@ -106,6 +149,23 @@ on_resource_added(struct pw_listener *listener, pw_protocol_native_server_setup(resource); } +static void +on_busy_changed(struct pw_listener *listener, + struct pw_client *client) +{ + struct native_client *c = SPA_CONTAINER_OF(listener, struct native_client, busy_changed); + enum spa_io mask = SPA_IO_ERR | SPA_IO_HUP; + + if (!client->busy) + mask |= SPA_IO_IN; + + pw_loop_update_io(c->impl->core->main_loop->loop, c->source, mask); + + if (!client->busy) + process_messages(c); + +} + static void on_before_iterate(struct pw_listener *listener, struct pw_loop *loop) { struct impl *this = SPA_CONTAINER_OF(listener, struct impl, before_iterate); @@ -120,12 +180,6 @@ connection_data(struct spa_loop_utils *utils, struct spa_source *source, int fd, enum spa_io mask, void *data) { struct native_client *client = data; - struct pw_connection *conn = client->connection; - uint8_t opcode; - uint32_t id; - uint32_t size; - struct pw_client *c = client->client; - void *message; if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pw_log_error("protocol-native %p: got connection error", client->impl); @@ -133,35 +187,8 @@ connection_data(struct spa_loop_utils *utils, return; } - if (mask & SPA_IO_IN) { - while (pw_connection_get_next(conn, &opcode, &id, &message, &size)) { - struct pw_resource *resource; - const demarshal_func_t *demarshal; - - pw_log_trace("protocol-native %p: got message %d from %u", client->impl, - opcode, id); - - resource = pw_map_lookup(&c->objects, id); - if (resource == NULL) { - pw_log_error("protocol-native %p: unknown resource %u", - client->impl, id); - continue; - } - if (opcode >= resource->iface->n_methods) { - pw_log_error("protocol-native %p: invalid method %u %u", client->impl, - id, opcode); - client_destroy(client); - break; - } - demarshal = resource->iface->methods; - if (!demarshal[opcode] || !demarshal[opcode] (resource, message, size)) { - pw_log_error("protocol-native %p: invalid message received %u %u", - client->impl, id, opcode); - client_destroy(client); - break; - } - } - } + if (mask & SPA_IO_IN) + process_messages(client); } static struct native_client *client_new(struct impl *impl, int fd) @@ -206,6 +233,7 @@ static struct native_client *client_new(struct impl *impl, int fd) spa_list_insert(impl->client_list.prev, &this->link); pw_signal_add(&client->resource_added, &this->resource_added, on_resource_added); + pw_signal_add(&client->busy_changed, &this->busy_changed, on_busy_changed); pw_global_bind(impl->core->global, client, 0, 0); return this; diff --git a/pipewire/server/client.c b/pipewire/server/client.c index 7dee08512..2b2f81aeb 100644 --- a/pipewire/server/client.c +++ b/pipewire/server/client.c @@ -94,6 +94,7 @@ struct pw_client *pw_client_new(struct pw_core *core, pw_signal_init(&this->properties_changed); pw_signal_init(&this->resource_added); pw_signal_init(&this->resource_removed); + pw_signal_init(&this->busy_changed); pw_map_init(&this->objects, 0, 32); pw_map_init(&this->types, 0, 32); @@ -180,3 +181,11 @@ void pw_client_update_properties(struct pw_client *client, const struct spa_dict pw_client_notify_info(resource, &client->info); } } + +void pw_client_set_busy(struct pw_client *client, bool busy) +{ + if (client->busy != busy) { + client->busy = busy; + pw_signal_emit(&client->busy_changed, client); + } +} diff --git a/pipewire/server/client.h b/pipewire/server/client.h index e50a17d8e..e08892616 100644 --- a/pipewire/server/client.h +++ b/pipewire/server/client.h @@ -102,6 +102,11 @@ struct pw_client { PW_SIGNAL(resource_removed, (struct pw_listener *listener, struct pw_client *client, struct pw_resource *resource)); + bool busy; + /** Emited when the client starts/stops an async operation that should + * block/resume all methods for this client */ + PW_SIGNAL(busy_changed, (struct pw_listener *listener, struct pw_client *client)); + /** Emited when the client is destroyed */ PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_client *client)); }; @@ -116,6 +121,8 @@ pw_client_destroy(struct pw_client *client); void pw_client_update_properties(struct pw_client *client, const struct spa_dict *dict); +void pw_client_set_busy(struct pw_client *client, bool busy); + #ifdef __cplusplus } #endif diff --git a/pipewire/server/core.c b/pipewire/server/core.c index 71352a3d8..d2b9571e2 100644 --- a/pipewire/server/core.c +++ b/pipewire/server/core.c @@ -146,6 +146,7 @@ static void core_get_registry(void *object, uint32_t new_id) static void *async_create_node_start(struct pw_access_data *data, size_t size) { struct access_create_node *d; + struct pw_client *client = data->resource->client; d = calloc(1, sizeof(struct access_create_node) + size); memcpy(d, data, sizeof(struct access_create_node)); @@ -153,6 +154,9 @@ static void *async_create_node_start(struct pw_access_data *data, size_t size) d->name = strdup(d->name); d->async = true; d->data.user_data = SPA_MEMBER(d, sizeof(struct access_create_node), void); + + pw_client_set_busy(client, true); + return d; } @@ -202,6 +206,7 @@ static void async_create_node_complete(struct pw_access_data *data, int res) resource->id, SPA_RESULT_NO_PERMISSION, "operation not allowed"); done: async_create_node_free(&d->data); + pw_client_set_busy(client, false); return; }