client: add busy state

When the client is busy, no methods are allowed on it until it
becomes not busy again. We can use this to wait until an async operation
completes (access checks) and to serialize the requests from the client.
This commit is contained in:
Wim Taymans 2017-06-09 18:22:57 +02:00
parent b5e60ad02a
commit 2c1245f8ef
8 changed files with 86 additions and 66 deletions

View file

@ -119,8 +119,7 @@ static void core_event_done(void *object, uint32_t seq)
pw_core_do_sync(this->core_proxy, 1);
} else if (seq == 1) {
context_set_state(this, PW_CONTEXT_STATE_CONNECTED, NULL);
} else
pw_signal_emit(&this->sync_done, this, seq);
}
}
static void core_event_error(void *object, uint32_t id, int res, const char *error, ...)
@ -468,7 +467,6 @@ struct pw_context *pw_context_new(struct pw_loop *loop,
pw_signal_init(&this->state_changed);
pw_signal_init(&this->subscription);
pw_signal_init(&this->destroy_signal);
pw_signal_init(&this->sync_done);
return this;

View file

@ -173,8 +173,6 @@ struct pw_context {
/** Signal emited when the context is destroyed */
PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_context *context));
PW_SIGNAL(sync_done, (struct pw_listener *listener, struct pw_context *context, int seq));
};
struct pw_context *

View file

@ -889,20 +889,6 @@ static void on_node_proxy_destroy(struct pw_listener *listener, struct pw_proxy
stream_set_state(this, PW_STREAM_STATE_UNCONNECTED, NULL);
}
static void on_node_proxy_sync_done(struct pw_listener *listener, struct pw_context *context, int seq)
{
struct stream *impl = SPA_CONTAINER_OF(listener, struct stream, node_proxy_sync_done);
struct pw_stream *this = &impl->this;
if (seq != 2)
return;
pw_log_info("stream %p: sync done %d", this, seq);
do_node_init(this);
pw_signal_remove(&impl->node_proxy_sync_done);
}
/** Connect a stream for input or output on \a port_path.
* \param stream a \ref pw_stream
* \param direction the stream direction
@ -962,8 +948,7 @@ pw_stream_connect(struct pw_stream *stream,
"client-node",
&stream->properties->dict, impl->node_proxy->id);
pw_signal_add(&stream->context->sync_done,
&impl->node_proxy_sync_done, on_node_proxy_sync_done);
do_node_init(stream);
return true;
}

View file

@ -48,16 +48,6 @@ static struct pw_node *create_node(struct pw_node_factory *factory,
if (node == NULL)
goto no_mem;
#if 0
if ((res = pw_client_node_get_fds(node, &readfd, &writefd)) < 0) {
pw_core_notify_error(client->core_resource,
client->core_resource->id, SPA_RESULT_ERROR, "can't get data fds");
return NULL;
}
pw_client_node_notify_done(node->resource, readfd, writefd);
#endif
pw_core_notify_done(client->core_resource, 2);
return node->node;
no_mem:

View file

@ -86,6 +86,7 @@ struct native_client {
struct spa_source *source;
struct pw_connection *connection;
struct pw_listener resource_added;
struct pw_listener busy_changed;
};
static void client_destroy(struct native_client *this)
@ -99,6 +100,48 @@ static void client_destroy(struct native_client *this)
free(this);
}
static void
process_messages(struct native_client *client)
{
struct pw_connection *conn = client->connection;
uint8_t opcode;
uint32_t id;
uint32_t size;
struct pw_client *c = client->client;
void *message;
while (pw_connection_get_next(conn, &opcode, &id, &message, &size)) {
struct pw_resource *resource;
const demarshal_func_t *demarshal;
pw_log_trace("protocol-native %p: got message %d from %u", client->impl,
opcode, id);
resource = pw_map_lookup(&c->objects, id);
if (resource == NULL) {
pw_log_error("protocol-native %p: unknown resource %u",
client->impl, id);
continue;
}
if (opcode >= resource->iface->n_methods) {
pw_log_error("protocol-native %p: invalid method %u %u", client->impl,
id, opcode);
client_destroy(client);
break;
}
demarshal = resource->iface->methods;
if (!demarshal[opcode] || !demarshal[opcode] (resource, message, size)) {
pw_log_error("protocol-native %p: invalid message received %u %u",
client->impl, id, opcode);
client_destroy(client);
break;
}
if (c->busy) {
break;
}
}
}
static void
on_resource_added(struct pw_listener *listener,
struct pw_client *client, struct pw_resource *resource)
@ -106,6 +149,23 @@ on_resource_added(struct pw_listener *listener,
pw_protocol_native_server_setup(resource);
}
static void
on_busy_changed(struct pw_listener *listener,
struct pw_client *client)
{
struct native_client *c = SPA_CONTAINER_OF(listener, struct native_client, busy_changed);
enum spa_io mask = SPA_IO_ERR | SPA_IO_HUP;
if (!client->busy)
mask |= SPA_IO_IN;
pw_loop_update_io(c->impl->core->main_loop->loop, c->source, mask);
if (!client->busy)
process_messages(c);
}
static void on_before_iterate(struct pw_listener *listener, struct pw_loop *loop)
{
struct impl *this = SPA_CONTAINER_OF(listener, struct impl, before_iterate);
@ -120,12 +180,6 @@ connection_data(struct spa_loop_utils *utils,
struct spa_source *source, int fd, enum spa_io mask, void *data)
{
struct native_client *client = data;
struct pw_connection *conn = client->connection;
uint8_t opcode;
uint32_t id;
uint32_t size;
struct pw_client *c = client->client;
void *message;
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pw_log_error("protocol-native %p: got connection error", client->impl);
@ -133,35 +187,8 @@ connection_data(struct spa_loop_utils *utils,
return;
}
if (mask & SPA_IO_IN) {
while (pw_connection_get_next(conn, &opcode, &id, &message, &size)) {
struct pw_resource *resource;
const demarshal_func_t *demarshal;
pw_log_trace("protocol-native %p: got message %d from %u", client->impl,
opcode, id);
resource = pw_map_lookup(&c->objects, id);
if (resource == NULL) {
pw_log_error("protocol-native %p: unknown resource %u",
client->impl, id);
continue;
}
if (opcode >= resource->iface->n_methods) {
pw_log_error("protocol-native %p: invalid method %u %u", client->impl,
id, opcode);
client_destroy(client);
break;
}
demarshal = resource->iface->methods;
if (!demarshal[opcode] || !demarshal[opcode] (resource, message, size)) {
pw_log_error("protocol-native %p: invalid message received %u %u",
client->impl, id, opcode);
client_destroy(client);
break;
}
}
}
if (mask & SPA_IO_IN)
process_messages(client);
}
static struct native_client *client_new(struct impl *impl, int fd)
@ -206,6 +233,7 @@ static struct native_client *client_new(struct impl *impl, int fd)
spa_list_insert(impl->client_list.prev, &this->link);
pw_signal_add(&client->resource_added, &this->resource_added, on_resource_added);
pw_signal_add(&client->busy_changed, &this->busy_changed, on_busy_changed);
pw_global_bind(impl->core->global, client, 0, 0);
return this;

View file

@ -94,6 +94,7 @@ struct pw_client *pw_client_new(struct pw_core *core,
pw_signal_init(&this->properties_changed);
pw_signal_init(&this->resource_added);
pw_signal_init(&this->resource_removed);
pw_signal_init(&this->busy_changed);
pw_map_init(&this->objects, 0, 32);
pw_map_init(&this->types, 0, 32);
@ -180,3 +181,11 @@ void pw_client_update_properties(struct pw_client *client, const struct spa_dict
pw_client_notify_info(resource, &client->info);
}
}
void pw_client_set_busy(struct pw_client *client, bool busy)
{
if (client->busy != busy) {
client->busy = busy;
pw_signal_emit(&client->busy_changed, client);
}
}

View file

@ -102,6 +102,11 @@ struct pw_client {
PW_SIGNAL(resource_removed, (struct pw_listener *listener,
struct pw_client *client, struct pw_resource *resource));
bool busy;
/** Emited when the client starts/stops an async operation that should
* block/resume all methods for this client */
PW_SIGNAL(busy_changed, (struct pw_listener *listener, struct pw_client *client));
/** Emited when the client is destroyed */
PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_client *client));
};
@ -116,6 +121,8 @@ pw_client_destroy(struct pw_client *client);
void
pw_client_update_properties(struct pw_client *client, const struct spa_dict *dict);
void pw_client_set_busy(struct pw_client *client, bool busy);
#ifdef __cplusplus
}
#endif

View file

@ -146,6 +146,7 @@ static void core_get_registry(void *object, uint32_t new_id)
static void *async_create_node_start(struct pw_access_data *data, size_t size)
{
struct access_create_node *d;
struct pw_client *client = data->resource->client;
d = calloc(1, sizeof(struct access_create_node) + size);
memcpy(d, data, sizeof(struct access_create_node));
@ -153,6 +154,9 @@ static void *async_create_node_start(struct pw_access_data *data, size_t size)
d->name = strdup(d->name);
d->async = true;
d->data.user_data = SPA_MEMBER(d, sizeof(struct access_create_node), void);
pw_client_set_busy(client, true);
return d;
}
@ -202,6 +206,7 @@ static void async_create_node_complete(struct pw_access_data *data, int res)
resource->id, SPA_RESULT_NO_PERMISSION, "operation not allowed");
done:
async_create_node_free(&d->data);
pw_client_set_busy(client, false);
return;
}