core: implement pw_core_set_paused

Make it possible to pause emision of events from the core object.
This is interesting if we need to wait for completion of some operations
on another connection before resuming processing.

See #204
This commit is contained in:
Wim Taymans 2020-01-14 16:37:01 +01:00
parent 6e9d3bdb36
commit 0ea8a92ea5
4 changed files with 133 additions and 86 deletions

View file

@ -90,6 +90,7 @@ struct client {
unsigned int disconnecting:1;
unsigned int flushing:1;
unsigned int paused:1;
};
struct server {
@ -121,7 +122,7 @@ struct client_data {
struct protocol_compat_v2 compat_v2;
};
static void
static int
process_messages(struct client_data *data)
{
struct pw_protocol_native_connection *conn = data->connection;
@ -175,8 +176,10 @@ process_messages(struct client_data *data)
}
marshal = pw_resource_get_marshal(resource);
if (marshal == NULL || msg->opcode >= marshal->n_client_methods)
if (marshal == NULL || msg->opcode >= marshal->n_client_methods) {
res = -EINVAL;
goto invalid_method;
}
demarshal = marshal->server_demarshal;
if (!demarshal[msg->opcode].func) {
@ -198,16 +201,16 @@ process_messages(struct client_data *data)
if ((res = demarshal[msg->opcode].func(resource, msg)) < 0)
goto invalid_message;
}
res = 0;
done:
context->current_client = NULL;
return;
return res;
invalid_method:
pw_log_error(NAME" %p: invalid method id:%u op:%u",
client->protocol, msg->id, msg->opcode);
pw_resource_errorf(resource, -EINVAL, "invalid method id:%u op:%u",
pw_resource_errorf(resource, res, "invalid method id:%u op:%u",
msg->id, msg->opcode);
pw_impl_client_destroy(client);
goto done;
invalid_message:
pw_log_error(NAME" %p: invalid message received id:%u op:%u (%s)",
@ -215,11 +218,9 @@ invalid_message:
pw_resource_errorf(resource, res, "invalid message received id:%u op:%u (%s)",
msg->id, msg->opcode, spa_strerror(res));
spa_debug_pod(0, NULL, (struct spa_pod *)msg->data);
pw_impl_client_destroy(client);
goto done;
error:
pw_log_error(NAME" %p: client error (%s)", client->protocol, spa_strerror(res));
pw_impl_client_destroy(client);
goto done;
}
@ -239,7 +240,6 @@ client_busy_changed(void *data, bool busy)
if (!busy)
process_messages(c);
}
static void
@ -250,14 +250,12 @@ connection_data(void *data, int fd, uint32_t mask)
int res;
if (mask & SPA_IO_HUP) {
pw_log_info(NAME" %p: client %p disconnected", client->protocol, client);
pw_impl_client_destroy(client);
return;
res = -EPIPE;
goto error;
}
if (mask & SPA_IO_ERR) {
pw_log_error(NAME" %p: client %p error", client->protocol, client);
pw_impl_client_destroy(client);
return;
res = -EIO;
goto error;
}
if (mask & SPA_IO_OUT) {
res = pw_protocol_native_connection_flush(this->connection);
@ -266,15 +264,21 @@ connection_data(void *data, int fd, uint32_t mask)
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
pw_loop_update_io(client->context->main_loop,
this->source, mask);
} else if (res != EAGAIN) {
pw_log_error("client %p: could not flush: %s",
client, spa_strerror(res));
pw_impl_client_destroy(client);
} else if (res != EAGAIN)
goto error;
}
if (mask & SPA_IO_IN) {
if ((res = process_messages(this)) < 0)
goto error;
}
return;
}
}
if (mask & SPA_IO_IN)
process_messages(this);
error:
if (res == -EPIPE)
pw_log_info(NAME" %p: client %p disconnected", client->protocol, client);
else
pw_log_error(NAME" %p: client %p error %d (%s)", client->protocol,
client, res, spa_strerror(res));
pw_impl_client_destroy(client);
}
static void client_free(void *data)
@ -588,36 +592,15 @@ static int impl_steal_fd(struct pw_protocol_client *client)
return fd;
}
static void
on_remote_data(void *data, int fd, uint32_t mask)
static int
process_remote(struct client *impl)
{
struct client *impl = data;
struct pw_core *this = impl->this.core;
struct pw_protocol_native_connection *conn = impl->connection;
struct pw_context *context = pw_core_get_context(this);
struct pw_loop *loop = pw_context_get_main_loop(context);
int res;
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
res = -EPIPE;
goto error;
}
if (mask & SPA_IO_OUT) {
res = pw_protocol_native_connection_flush(conn);
if (res >= 0) {
int mask = impl->source->mask;
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
pw_loop_update_io(loop,
impl->source, mask);
impl->flushing = false;
} else if (res != EAGAIN)
goto error;
}
if (mask & SPA_IO_IN) {
const struct pw_protocol_native_message *msg;
struct pw_protocol_native_connection *conn = impl->connection;
struct pw_core *this = impl->this.core;
int res = 0;
while (!impl->disconnecting) {
while (!impl->disconnecting && !impl->paused) {
struct pw_proxy *proxy;
const struct pw_protocol_native_demarshal *demarshal;
const struct pw_protocol_marshal *marshal;
@ -626,7 +609,7 @@ on_remote_data(void *data, int fd, uint32_t mask)
if (res < 0) {
if (res == -EAGAIN)
break;
goto error;
return res;
}
if (res == 0)
break;
@ -677,6 +660,38 @@ on_remote_data(void *data, int fd, uint32_t mask)
continue;
}
}
return 0;
}
static void
on_remote_data(void *data, int fd, uint32_t mask)
{
struct client *impl = data;
struct pw_core *this = impl->this.core;
struct pw_protocol_native_connection *conn = impl->connection;
struct pw_context *context = pw_core_get_context(this);
struct pw_loop *loop = pw_context_get_main_loop(context);
int res;
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
res = -EPIPE;
goto error;
}
if (mask & SPA_IO_OUT) {
res = pw_protocol_native_connection_flush(conn);
if (res >= 0) {
int mask = impl->source->mask;
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
pw_loop_update_io(loop,
impl->source, mask);
impl->flushing = false;
} else if (res != EAGAIN)
goto error;
}
if (mask & SPA_IO_IN) {
if ((res = process_remote(impl)) < 0)
goto error;
}
return;
error:
@ -688,7 +703,6 @@ error:
impl->source = NULL;
}
static void on_need_flush(void *data)
{
struct client *impl = data;
@ -764,6 +778,26 @@ static void impl_destroy(struct pw_protocol_client *client)
free(impl);
}
static int impl_set_paused(struct pw_protocol_client *client, bool paused)
{
struct client *impl = SPA_CONTAINER_OF(client, struct client, this);
uint32_t mask;
if (impl->source == NULL)
return -EIO;
mask = impl->source->mask;
impl->paused = paused;
SPA_FLAG_UPDATE(mask, SPA_IO_IN, !paused);
pw_log_debug(NAME" %p: paused %d", client->protocol, paused);
pw_loop_update_io(impl->context->main_loop, impl->source, mask);
return paused ? 0 : process_remote(impl);
}
static int pw_protocol_native_connect_internal(struct pw_protocol_client *client,
const struct spa_dict *props,
void (*done_callback) (void *data, int res),
@ -856,6 +890,7 @@ impl_new_client(struct pw_protocol *protocol,
this->connect_fd = impl_connect_fd;
this->disconnect = impl_disconnect;
this->destroy = impl_destroy;
this->set_paused = impl_set_paused;
spa_list_append(&protocol->client_list, &this->link);

View file

@ -432,6 +432,12 @@ int pw_core_steal_fd(struct pw_core *core)
return pw_protocol_client_steal_fd(core->conn);
}
SPA_EXPORT
int pw_core_set_paused(struct pw_core *core, bool paused)
{
return pw_protocol_client_set_paused(core->conn, paused);
}
SPA_EXPORT
struct pw_mempool * pw_core_get_mempool(struct pw_core *core)
{

View file

@ -528,6 +528,10 @@ pw_context_connect_self(struct pw_context *context, /**< a \ref pw_context to co
* will be disconnected after this call. */
int pw_core_steal_fd(struct pw_core *core);
/** Pause or resume the core. When the core is paused, no new events
* will be dispatched until the core is resumed again. */
int pw_core_set_paused(struct pw_core *core, bool paused);
/** disconnect and destroy a core */
int pw_core_disconnect(struct pw_core *core);

View file

@ -54,6 +54,7 @@ struct pw_protocol_client {
int (*steal_fd) (struct pw_protocol_client *client);
void (*disconnect) (struct pw_protocol_client *client);
void (*destroy) (struct pw_protocol_client *client);
int (*set_paused) (struct pw_protocol_client *client, bool paused);
};
#define pw_protocol_client_connect(c,p,cb,d) ((c)->connect(c,p,cb,d))
@ -61,6 +62,7 @@ struct pw_protocol_client {
#define pw_protocol_client_steal_fd(c) ((c)->steal_fd(c))
#define pw_protocol_client_disconnect(c) ((c)->disconnect(c))
#define pw_protocol_client_destroy(c) ((c)->destroy(c))
#define pw_protocol_client_set_paused(c,p) ((c)->set_paused(c,p))
struct pw_protocol_server {
struct spa_list link; /**< link in protocol server_list */