diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c index 4ec7d6917..afab88c12 100644 --- a/src/modules/module-protocol-native.c +++ b/src/modules/module-protocol-native.c @@ -90,6 +90,7 @@ struct client { unsigned int disconnecting:1; unsigned int flushing:1; + unsigned int paused:1; }; struct server { @@ -121,7 +122,7 @@ struct client_data { struct protocol_compat_v2 compat_v2; }; -static void +static int process_messages(struct client_data *data) { struct pw_protocol_native_connection *conn = data->connection; @@ -175,8 +176,10 @@ process_messages(struct client_data *data) } marshal = pw_resource_get_marshal(resource); - if (marshal == NULL || msg->opcode >= marshal->n_client_methods) + if (marshal == NULL || msg->opcode >= marshal->n_client_methods) { + res = -EINVAL; goto invalid_method; + } demarshal = marshal->server_demarshal; if (!demarshal[msg->opcode].func) { @@ -198,16 +201,16 @@ process_messages(struct client_data *data) if ((res = demarshal[msg->opcode].func(resource, msg)) < 0) goto invalid_message; } + res = 0; done: context->current_client = NULL; - return; + return res; invalid_method: pw_log_error(NAME" %p: invalid method id:%u op:%u", client->protocol, msg->id, msg->opcode); - pw_resource_errorf(resource, -EINVAL, "invalid method id:%u op:%u", + pw_resource_errorf(resource, res, "invalid method id:%u op:%u", msg->id, msg->opcode); - pw_impl_client_destroy(client); goto done; invalid_message: pw_log_error(NAME" %p: invalid message received id:%u op:%u (%s)", @@ -215,11 +218,9 @@ invalid_message: pw_resource_errorf(resource, res, "invalid message received id:%u op:%u (%s)", msg->id, msg->opcode, spa_strerror(res)); spa_debug_pod(0, NULL, (struct spa_pod *)msg->data); - pw_impl_client_destroy(client); goto done; error: pw_log_error(NAME" %p: client error (%s)", client->protocol, spa_strerror(res)); - pw_impl_client_destroy(client); goto done; } @@ -239,7 +240,6 @@ client_busy_changed(void *data, bool busy) if (!busy) process_messages(c); - } static void @@ -250,14 +250,12 @@ connection_data(void *data, int fd, uint32_t mask) int res; if (mask & SPA_IO_HUP) { - pw_log_info(NAME" %p: client %p disconnected", client->protocol, client); - pw_impl_client_destroy(client); - return; + res = -EPIPE; + goto error; } if (mask & SPA_IO_ERR) { - pw_log_error(NAME" %p: client %p error", client->protocol, client); - pw_impl_client_destroy(client); - return; + res = -EIO; + goto error; } if (mask & SPA_IO_OUT) { res = pw_protocol_native_connection_flush(this->connection); @@ -266,15 +264,21 @@ connection_data(void *data, int fd, uint32_t mask) SPA_FLAG_CLEAR(mask, SPA_IO_OUT); pw_loop_update_io(client->context->main_loop, this->source, mask); - } else if (res != EAGAIN) { - pw_log_error("client %p: could not flush: %s", - client, spa_strerror(res)); - pw_impl_client_destroy(client); - return; - } + } else if (res != EAGAIN) + goto error; } - if (mask & SPA_IO_IN) - process_messages(this); + if (mask & SPA_IO_IN) { + if ((res = process_messages(this)) < 0) + goto error; + } + return; +error: + if (res == -EPIPE) + pw_log_info(NAME" %p: client %p disconnected", client->protocol, client); + else + pw_log_error(NAME" %p: client %p error %d (%s)", client->protocol, + client, res, spa_strerror(res)); + pw_impl_client_destroy(client); } static void client_free(void *data) @@ -588,6 +592,77 @@ static int impl_steal_fd(struct pw_protocol_client *client) return fd; } +static int +process_remote(struct client *impl) +{ + const struct pw_protocol_native_message *msg; + struct pw_protocol_native_connection *conn = impl->connection; + struct pw_core *this = impl->this.core; + int res = 0; + + while (!impl->disconnecting && !impl->paused) { + struct pw_proxy *proxy; + const struct pw_protocol_native_demarshal *demarshal; + const struct pw_protocol_marshal *marshal; + + res = pw_protocol_native_connection_get_next(conn, &msg); + if (res < 0) { + if (res == -EAGAIN) + break; + return res; + } + if (res == 0) + break; + + pw_log_trace(NAME" %p: got message %d from %u seq:%d", + this, msg->opcode, msg->id, msg->seq); + + this->recv_seq = msg->seq; + + if (debug_messages) { + fprintf(stderr, "<<<<<<<<< in: id:%d op:%d size:%d seq:%d\n", + msg->id, msg->opcode, msg->size, msg->seq); + spa_debug_pod(0, NULL, (struct spa_pod *)msg->data); + } + + proxy = pw_core_find_proxy(this, msg->id); + if (proxy == NULL || proxy->zombie) { + if (proxy == NULL) + pw_log_error(NAME" %p: could not find proxy %u", this, msg->id); + else + pw_log_debug(NAME" %p: zombie proxy %u", this, msg->id); + + /* FIXME close fds */ + continue; + } + + marshal = pw_proxy_get_marshal(proxy); + if (marshal == NULL || msg->opcode >= marshal->n_server_methods) { + pw_log_error(NAME" %p: invalid method %u for %u (%d)", + this, msg->opcode, msg->id, + marshal ? marshal->n_server_methods : (uint32_t)-1); + continue; + } + + demarshal = marshal->client_demarshal; + if (!demarshal[msg->opcode].func) { + pw_log_error(NAME" %p: function %d not implemented on %u", + this, msg->opcode, msg->id); + continue; + } + proxy->refcount++; + res = demarshal[msg->opcode].func(proxy, msg); + pw_proxy_unref(proxy); + + if (res < 0) { + pw_log_error (NAME" %p: invalid message received %u for %u", + this, msg->opcode, msg->id); + continue; + } + } + return 0; +} + static void on_remote_data(void *data, int fd, uint32_t mask) { @@ -615,69 +690,9 @@ on_remote_data(void *data, int fd, uint32_t mask) } if (mask & SPA_IO_IN) { - const struct pw_protocol_native_message *msg; - - while (!impl->disconnecting) { - struct pw_proxy *proxy; - const struct pw_protocol_native_demarshal *demarshal; - const struct pw_protocol_marshal *marshal; - - res = pw_protocol_native_connection_get_next(conn, &msg); - if (res < 0) { - if (res == -EAGAIN) - break; - goto error; - } - if (res == 0) - break; - - pw_log_trace(NAME" %p: got message %d from %u seq:%d", - this, msg->opcode, msg->id, msg->seq); - - this->recv_seq = msg->seq; - - if (debug_messages) { - fprintf(stderr, "<<<<<<<<< in: id:%d op:%d size:%d seq:%d\n", - msg->id, msg->opcode, msg->size, msg->seq); - spa_debug_pod(0, NULL, (struct spa_pod *)msg->data); - } - - proxy = pw_core_find_proxy(this, msg->id); - if (proxy == NULL || proxy->zombie) { - if (proxy == NULL) - pw_log_error(NAME" %p: could not find proxy %u", this, msg->id); - else - pw_log_debug(NAME" %p: zombie proxy %u", this, msg->id); - - /* FIXME close fds */ - continue; - } - - marshal = pw_proxy_get_marshal(proxy); - if (marshal == NULL || msg->opcode >= marshal->n_server_methods) { - pw_log_error(NAME" %p: invalid method %u for %u (%d)", - this, msg->opcode, msg->id, - marshal ? marshal->n_server_methods : (uint32_t)-1); - continue; - } - - demarshal = marshal->client_demarshal; - if (!demarshal[msg->opcode].func) { - pw_log_error(NAME" %p: function %d not implemented on %u", - this, msg->opcode, msg->id); - continue; - } - proxy->refcount++; - res = demarshal[msg->opcode].func(proxy, msg); - pw_proxy_unref(proxy); - - if (res < 0) { - pw_log_error (NAME" %p: invalid message received %u for %u", - this, msg->opcode, msg->id); - continue; - } - } - } + if ((res = process_remote(impl)) < 0) + goto error; + } return; error: pw_log_error(NAME" %p: got connection error %d (%s)", impl, res, spa_strerror(res)); @@ -688,7 +703,6 @@ error: impl->source = NULL; } - static void on_need_flush(void *data) { struct client *impl = data; @@ -764,6 +778,26 @@ static void impl_destroy(struct pw_protocol_client *client) free(impl); } +static int impl_set_paused(struct pw_protocol_client *client, bool paused) +{ + struct client *impl = SPA_CONTAINER_OF(client, struct client, this); + uint32_t mask; + + if (impl->source == NULL) + return -EIO; + + mask = impl->source->mask; + + impl->paused = paused; + + SPA_FLAG_UPDATE(mask, SPA_IO_IN, !paused); + + pw_log_debug(NAME" %p: paused %d", client->protocol, paused); + pw_loop_update_io(impl->context->main_loop, impl->source, mask); + + return paused ? 0 : process_remote(impl); +} + static int pw_protocol_native_connect_internal(struct pw_protocol_client *client, const struct spa_dict *props, void (*done_callback) (void *data, int res), @@ -856,6 +890,7 @@ impl_new_client(struct pw_protocol *protocol, this->connect_fd = impl_connect_fd; this->disconnect = impl_disconnect; this->destroy = impl_destroy; + this->set_paused = impl_set_paused; spa_list_append(&protocol->client_list, &this->link); diff --git a/src/pipewire/core.c b/src/pipewire/core.c index 5423e7cce..19480d43b 100644 --- a/src/pipewire/core.c +++ b/src/pipewire/core.c @@ -432,6 +432,12 @@ int pw_core_steal_fd(struct pw_core *core) return pw_protocol_client_steal_fd(core->conn); } +SPA_EXPORT +int pw_core_set_paused(struct pw_core *core, bool paused) +{ + return pw_protocol_client_set_paused(core->conn, paused); +} + SPA_EXPORT struct pw_mempool * pw_core_get_mempool(struct pw_core *core) { diff --git a/src/pipewire/core.h b/src/pipewire/core.h index 2dc3241ce..2d2cabc75 100644 --- a/src/pipewire/core.h +++ b/src/pipewire/core.h @@ -528,6 +528,10 @@ pw_context_connect_self(struct pw_context *context, /**< a \ref pw_context to co * will be disconnected after this call. */ int pw_core_steal_fd(struct pw_core *core); +/** Pause or resume the core. When the core is paused, no new events + * will be dispatched until the core is resumed again. */ +int pw_core_set_paused(struct pw_core *core, bool paused); + /** disconnect and destroy a core */ int pw_core_disconnect(struct pw_core *core); diff --git a/src/pipewire/protocol.h b/src/pipewire/protocol.h index acb9133c8..b7b0f0346 100644 --- a/src/pipewire/protocol.h +++ b/src/pipewire/protocol.h @@ -54,6 +54,7 @@ struct pw_protocol_client { int (*steal_fd) (struct pw_protocol_client *client); void (*disconnect) (struct pw_protocol_client *client); void (*destroy) (struct pw_protocol_client *client); + int (*set_paused) (struct pw_protocol_client *client, bool paused); }; #define pw_protocol_client_connect(c,p,cb,d) ((c)->connect(c,p,cb,d)) @@ -61,6 +62,7 @@ struct pw_protocol_client { #define pw_protocol_client_steal_fd(c) ((c)->steal_fd(c)) #define pw_protocol_client_disconnect(c) ((c)->disconnect(c)) #define pw_protocol_client_destroy(c) ((c)->destroy(c)) +#define pw_protocol_client_set_paused(c,p) ((c)->set_paused(c,p)) struct pw_protocol_server { struct spa_list link; /**< link in protocol server_list */