From 0ea8a92ea5cd1ff98fa38be55b3802d1891852a4 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 14 Jan 2020 16:37:01 +0100 Subject: [PATCH] core: implement pw_core_set_paused Make it possible to pause emision of events from the core object. This is interesting if we need to wait for completion of some operations on another connection before resuming processing. See #204 --- src/modules/module-protocol-native.c | 207 ++++++++++++++++----------- src/pipewire/core.c | 6 + src/pipewire/core.h | 4 + src/pipewire/protocol.h | 2 + 4 files changed, 133 insertions(+), 86 deletions(-) diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c index 4ec7d6917..afab88c12 100644 --- a/src/modules/module-protocol-native.c +++ b/src/modules/module-protocol-native.c @@ -90,6 +90,7 @@ struct client { unsigned int disconnecting:1; unsigned int flushing:1; + unsigned int paused:1; }; struct server { @@ -121,7 +122,7 @@ struct client_data { struct protocol_compat_v2 compat_v2; }; -static void +static int process_messages(struct client_data *data) { struct pw_protocol_native_connection *conn = data->connection; @@ -175,8 +176,10 @@ process_messages(struct client_data *data) } marshal = pw_resource_get_marshal(resource); - if (marshal == NULL || msg->opcode >= marshal->n_client_methods) + if (marshal == NULL || msg->opcode >= marshal->n_client_methods) { + res = -EINVAL; goto invalid_method; + } demarshal = marshal->server_demarshal; if (!demarshal[msg->opcode].func) { @@ -198,16 +201,16 @@ process_messages(struct client_data *data) if ((res = demarshal[msg->opcode].func(resource, msg)) < 0) goto invalid_message; } + res = 0; done: context->current_client = NULL; - return; + return res; invalid_method: pw_log_error(NAME" %p: invalid method id:%u op:%u", client->protocol, msg->id, msg->opcode); - pw_resource_errorf(resource, -EINVAL, "invalid method id:%u op:%u", + pw_resource_errorf(resource, res, "invalid method id:%u op:%u", msg->id, msg->opcode); - pw_impl_client_destroy(client); goto done; invalid_message: pw_log_error(NAME" %p: invalid message received id:%u op:%u (%s)", @@ -215,11 +218,9 @@ invalid_message: pw_resource_errorf(resource, res, "invalid message received id:%u op:%u (%s)", msg->id, msg->opcode, spa_strerror(res)); spa_debug_pod(0, NULL, (struct spa_pod *)msg->data); - pw_impl_client_destroy(client); goto done; error: pw_log_error(NAME" %p: client error (%s)", client->protocol, spa_strerror(res)); - pw_impl_client_destroy(client); goto done; } @@ -239,7 +240,6 @@ client_busy_changed(void *data, bool busy) if (!busy) process_messages(c); - } static void @@ -250,14 +250,12 @@ connection_data(void *data, int fd, uint32_t mask) int res; if (mask & SPA_IO_HUP) { - pw_log_info(NAME" %p: client %p disconnected", client->protocol, client); - pw_impl_client_destroy(client); - return; + res = -EPIPE; + goto error; } if (mask & SPA_IO_ERR) { - pw_log_error(NAME" %p: client %p error", client->protocol, client); - pw_impl_client_destroy(client); - return; + res = -EIO; + goto error; } if (mask & SPA_IO_OUT) { res = pw_protocol_native_connection_flush(this->connection); @@ -266,15 +264,21 @@ connection_data(void *data, int fd, uint32_t mask) SPA_FLAG_CLEAR(mask, SPA_IO_OUT); pw_loop_update_io(client->context->main_loop, this->source, mask); - } else if (res != EAGAIN) { - pw_log_error("client %p: could not flush: %s", - client, spa_strerror(res)); - pw_impl_client_destroy(client); - return; - } + } else if (res != EAGAIN) + goto error; } - if (mask & SPA_IO_IN) - process_messages(this); + if (mask & SPA_IO_IN) { + if ((res = process_messages(this)) < 0) + goto error; + } + return; +error: + if (res == -EPIPE) + pw_log_info(NAME" %p: client %p disconnected", client->protocol, client); + else + pw_log_error(NAME" %p: client %p error %d (%s)", client->protocol, + client, res, spa_strerror(res)); + pw_impl_client_destroy(client); } static void client_free(void *data) @@ -588,6 +592,77 @@ static int impl_steal_fd(struct pw_protocol_client *client) return fd; } +static int +process_remote(struct client *impl) +{ + const struct pw_protocol_native_message *msg; + struct pw_protocol_native_connection *conn = impl->connection; + struct pw_core *this = impl->this.core; + int res = 0; + + while (!impl->disconnecting && !impl->paused) { + struct pw_proxy *proxy; + const struct pw_protocol_native_demarshal *demarshal; + const struct pw_protocol_marshal *marshal; + + res = pw_protocol_native_connection_get_next(conn, &msg); + if (res < 0) { + if (res == -EAGAIN) + break; + return res; + } + if (res == 0) + break; + + pw_log_trace(NAME" %p: got message %d from %u seq:%d", + this, msg->opcode, msg->id, msg->seq); + + this->recv_seq = msg->seq; + + if (debug_messages) { + fprintf(stderr, "<<<<<<<<< in: id:%d op:%d size:%d seq:%d\n", + msg->id, msg->opcode, msg->size, msg->seq); + spa_debug_pod(0, NULL, (struct spa_pod *)msg->data); + } + + proxy = pw_core_find_proxy(this, msg->id); + if (proxy == NULL || proxy->zombie) { + if (proxy == NULL) + pw_log_error(NAME" %p: could not find proxy %u", this, msg->id); + else + pw_log_debug(NAME" %p: zombie proxy %u", this, msg->id); + + /* FIXME close fds */ + continue; + } + + marshal = pw_proxy_get_marshal(proxy); + if (marshal == NULL || msg->opcode >= marshal->n_server_methods) { + pw_log_error(NAME" %p: invalid method %u for %u (%d)", + this, msg->opcode, msg->id, + marshal ? marshal->n_server_methods : (uint32_t)-1); + continue; + } + + demarshal = marshal->client_demarshal; + if (!demarshal[msg->opcode].func) { + pw_log_error(NAME" %p: function %d not implemented on %u", + this, msg->opcode, msg->id); + continue; + } + proxy->refcount++; + res = demarshal[msg->opcode].func(proxy, msg); + pw_proxy_unref(proxy); + + if (res < 0) { + pw_log_error (NAME" %p: invalid message received %u for %u", + this, msg->opcode, msg->id); + continue; + } + } + return 0; +} + static void on_remote_data(void *data, int fd, uint32_t mask) { @@ -615,69 +690,9 @@ on_remote_data(void *data, int fd, uint32_t mask) } if (mask & SPA_IO_IN) { - const struct pw_protocol_native_message *msg; - - while (!impl->disconnecting) { - struct pw_proxy *proxy; - const struct pw_protocol_native_demarshal *demarshal; - const struct pw_protocol_marshal *marshal; - - res = pw_protocol_native_connection_get_next(conn, &msg); - if (res < 0) { - if (res == -EAGAIN) - break; - goto error; - } - if (res == 0) - break; - - pw_log_trace(NAME" %p: got message %d from %u seq:%d", - this, msg->opcode, msg->id, msg->seq); - - this->recv_seq = msg->seq; - - if (debug_messages) { - fprintf(stderr, "<<<<<<<<< in: id:%d op:%d size:%d seq:%d\n", - msg->id, msg->opcode, msg->size, msg->seq); - spa_debug_pod(0, NULL, (struct spa_pod *)msg->data); - } - - proxy = pw_core_find_proxy(this, msg->id); - if (proxy == NULL || proxy->zombie) { - if (proxy == NULL) - pw_log_error(NAME" %p: could not find proxy %u", this, msg->id); - else - pw_log_debug(NAME" %p: zombie proxy %u", this, msg->id); - - /* FIXME close fds */ - continue; - } - - marshal = pw_proxy_get_marshal(proxy); - if (marshal == NULL || msg->opcode >= marshal->n_server_methods) { - pw_log_error(NAME" %p: invalid method %u for %u (%d)", - this, msg->opcode, msg->id, - marshal ? marshal->n_server_methods : (uint32_t)-1); - continue; - } - - demarshal = marshal->client_demarshal; - if (!demarshal[msg->opcode].func) { - pw_log_error(NAME" %p: function %d not implemented on %u", - this, msg->opcode, msg->id); - continue; - } - proxy->refcount++; - res = demarshal[msg->opcode].func(proxy, msg); - pw_proxy_unref(proxy); - - if (res < 0) { - pw_log_error (NAME" %p: invalid message received %u for %u", - this, msg->opcode, msg->id); - continue; - } - } - } + if ((res = process_remote(impl)) < 0) + goto error; + } return; error: pw_log_error(NAME" %p: got connection error %d (%s)", impl, res, spa_strerror(res)); @@ -688,7 +703,6 @@ error: impl->source = NULL; } - static void on_need_flush(void *data) { struct client *impl = data; @@ -764,6 +778,26 @@ static void impl_destroy(struct pw_protocol_client *client) free(impl); } +static int impl_set_paused(struct pw_protocol_client *client, bool paused) +{ + struct client *impl = SPA_CONTAINER_OF(client, struct client, this); + uint32_t mask; + + if (impl->source == NULL) + return -EIO; + + mask = impl->source->mask; + + impl->paused = paused; + + SPA_FLAG_UPDATE(mask, SPA_IO_IN, !paused); + + pw_log_debug(NAME" %p: paused %d", client->protocol, paused); + pw_loop_update_io(impl->context->main_loop, impl->source, mask); + + return paused ? 0 : process_remote(impl); +} + static int pw_protocol_native_connect_internal(struct pw_protocol_client *client, const struct spa_dict *props, void (*done_callback) (void *data, int res), @@ -856,6 +890,7 @@ impl_new_client(struct pw_protocol *protocol, this->connect_fd = impl_connect_fd; this->disconnect = impl_disconnect; this->destroy = impl_destroy; + this->set_paused = impl_set_paused; spa_list_append(&protocol->client_list, &this->link); diff --git a/src/pipewire/core.c b/src/pipewire/core.c index 5423e7cce..19480d43b 100644 --- a/src/pipewire/core.c +++ b/src/pipewire/core.c @@ -432,6 +432,12 @@ int pw_core_steal_fd(struct pw_core *core) return pw_protocol_client_steal_fd(core->conn); } +SPA_EXPORT +int pw_core_set_paused(struct pw_core *core, bool paused) +{ + return pw_protocol_client_set_paused(core->conn, paused); +} + SPA_EXPORT struct pw_mempool * pw_core_get_mempool(struct pw_core *core) { diff --git a/src/pipewire/core.h b/src/pipewire/core.h index 2dc3241ce..2d2cabc75 100644 --- a/src/pipewire/core.h +++ b/src/pipewire/core.h @@ -528,6 +528,10 @@ pw_context_connect_self(struct pw_context *context, /**< a \ref pw_context to co * will be disconnected after this call. */ int pw_core_steal_fd(struct pw_core *core); +/** Pause or resume the core. When the core is paused, no new events + * will be dispatched until the core is resumed again. */ +int pw_core_set_paused(struct pw_core *core, bool paused); + /** disconnect and destroy a core */ int pw_core_disconnect(struct pw_core *core); diff --git a/src/pipewire/protocol.h b/src/pipewire/protocol.h index acb9133c8..b7b0f0346 100644 --- a/src/pipewire/protocol.h +++ b/src/pipewire/protocol.h @@ -54,6 +54,7 @@ struct pw_protocol_client { int (*steal_fd) (struct pw_protocol_client *client); void (*disconnect) (struct pw_protocol_client *client); void (*destroy) (struct pw_protocol_client *client); + int (*set_paused) (struct pw_protocol_client *client, bool paused); }; #define pw_protocol_client_connect(c,p,cb,d) ((c)->connect(c,p,cb,d)) @@ -61,6 +62,7 @@ struct pw_protocol_client { #define pw_protocol_client_steal_fd(c) ((c)->steal_fd(c)) #define pw_protocol_client_disconnect(c) ((c)->disconnect(c)) #define pw_protocol_client_destroy(c) ((c)->destroy(c)) +#define pw_protocol_client_set_paused(c,p) ((c)->set_paused(c,p)) struct pw_protocol_server { struct spa_list link; /**< link in protocol server_list */