diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c index ca3208414..4eef16c4d 100644 --- a/src/modules/module-protocol-native.c +++ b/src/modules/module-protocol-native.c @@ -65,8 +65,6 @@ struct protocol_data { struct client { struct pw_protocol_client this; - int fd; - struct spa_source *source; struct pw_protocol_native_connection *connection; @@ -80,7 +78,6 @@ struct client { struct server { struct pw_protocol_server this; - int fd; int fd_lock; struct sockaddr_un addr; char lock_addr[UNIX_PATH_MAX + LOCK_SUFFIXLEN]; @@ -93,7 +90,6 @@ struct server { struct client_data { struct pw_client *client; struct spa_hook client_listener; - int fd; struct spa_source *source; struct pw_protocol_native_connection *connection; bool busy; @@ -217,7 +213,6 @@ static void client_free(void *data) spa_list_remove(&client->protocol_link); pw_protocol_native_connection_destroy(this->connection); - close(this->fd); } static const struct pw_client_events client_events = { @@ -254,10 +249,8 @@ static struct pw_client *client_new(struct server *s, int fd) this = pw_client_get_user_data(client); this->client = client; - this->fd = fd; this->source = pw_loop_add_io(pw_core_get_main_loop(core), - this->fd, - SPA_IO_ERR | SPA_IO_HUP, false, connection_data, this); + fd, SPA_IO_ERR | SPA_IO_HUP, true, connection_data, this); if (this->source == NULL) goto no_source; @@ -376,27 +369,34 @@ socket_data(void *data, int fd, enum spa_io mask) static bool add_socket(struct pw_protocol *protocol, struct server *s) { socklen_t size; + int fd; - if ((s->fd = socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) - return false; + if ((fd = socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) + goto error; size = offsetof(struct sockaddr_un, sun_path) + strlen(s->addr.sun_path); - if (bind(s->fd, (struct sockaddr *) &s->addr, size) < 0) { + if (bind(fd, (struct sockaddr *) &s->addr, size) < 0) { pw_log_error("bind() failed with error: %m"); - return false; + goto error_close; } - if (listen(s->fd, 128) < 0) { + if (listen(fd, 128) < 0) { pw_log_error("listen() failed with error: %m"); - return false; + goto error_close; } s->loop = pw_core_get_main_loop(protocol->core); - s->source = pw_loop_add_io(s->loop, s->fd, SPA_IO_IN, false, socket_data, s); + s->source = pw_loop_add_io(s->loop, fd, SPA_IO_IN, true, socket_data, s); if (s->source == NULL) - return false; + goto error_close; return true; + + error_close: + close(fd); + error: + return false; + } static const char * @@ -561,11 +561,10 @@ static int impl_connect_fd(struct pw_protocol_client *client, int fd) &conn_events, impl); - impl->fd = fd; impl->source = pw_loop_add_io(remote->core->main_loop, fd, SPA_IO_IN | SPA_IO_HUP | SPA_IO_ERR, - false, on_remote_data, impl); + true, on_remote_data, impl); return 0; @@ -588,10 +587,6 @@ static void impl_disconnect(struct pw_protocol_client *client) if (impl->connection) pw_protocol_native_connection_destroy(impl->connection); impl->connection = NULL; - - if (impl->fd != -1) - close(impl->fd); - impl->fd = -1; } static void impl_destroy(struct pw_protocol_client *client) @@ -599,6 +594,8 @@ static void impl_destroy(struct pw_protocol_client *client) struct client *impl = SPA_CONTAINER_OF(client, struct client, this); struct pw_remote *remote = client->remote; + impl_disconnect(client); + pw_loop_destroy_source(remote->core->main_loop, impl->flush_event); spa_list_remove(&client->link); @@ -625,7 +622,6 @@ impl_new_client(struct pw_protocol *protocol, this->disconnect = impl_disconnect; this->destroy = impl_destroy; - impl->fd = -1; impl->flush_event = pw_loop_add_event(remote->core->main_loop, do_flush_event, impl); spa_list_append(&protocol->client_list, &this->link); @@ -647,11 +643,9 @@ static void destroy_server(struct pw_protocol_server *server) pw_loop_destroy_source(s->loop, s->source); if (s->addr.sun_path[0]) unlink(s->addr.sun_path); - if (s->fd >= 0) - close(s->fd); if (s->lock_addr[0]) unlink(s->lock_addr); - if (s->fd_lock >= 0) + if (s->fd_lock != 1) close(s->fd_lock); free(s); } @@ -686,7 +680,6 @@ impl_add_server(struct pw_protocol *protocol, if ((s = calloc(1, sizeof(struct server))) == NULL) return NULL; - s->fd = -1; s->fd_lock = -1; this = &s->this; diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 3a00a7ce2..52eedfb48 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -67,7 +67,6 @@ struct node_data { struct pw_type *t; uint32_t node_id; - int rtreadfd; int rtwritefd; struct spa_source *rtsocket_source; struct pw_client_node_transport *trans; @@ -323,7 +322,7 @@ static int do_connect(struct pw_remote *remote) return 0; no_proxy: - pw_protocol_client_disconnect (remote->conn); + pw_protocol_client_disconnect(remote->conn); pw_remote_update_state(remote, PW_REMOTE_STATE_ERROR, "can't connect: no memory"); return -1; } @@ -470,7 +469,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) struct pw_client_node_message message; uint64_t cmd; - read(data->rtreadfd, &cmd, 8); + read(fd, &cmd, 8); while (pw_client_node_transport_next_message(data->trans, &message) == SPA_RESULT_OK) { struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); @@ -546,10 +545,9 @@ static void client_node_transport(void *object, uint32_t node_id, spa_list_for_each(port, &data->node->output_ports, link) spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id]); - data->rtreadfd = readfd; data->rtwritefd = writefd; data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop, - data->rtreadfd, + readfd, SPA_IO_ERR | SPA_IO_HUP, true, on_rtsocket_condition, proxy); } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index dea18eb46..76d14cbd0 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -78,7 +78,6 @@ struct stream { enum pw_stream_mode mode; - int rtreadfd; int rtwritefd; struct spa_source *rtsocket_source; @@ -207,6 +206,7 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, this->remote = remote; this->name = strdup(name); impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode); + impl->rtwritefd = -1; spa_hook_list_init(&this->listener_list); @@ -268,7 +268,11 @@ do_remove_sources(struct spa_loop *loop, pw_loop_destroy_source(stream->remote->core->data_loop, impl->timeout_source); impl->timeout_source = NULL; } - return SPA_RESULT_OK; + if (impl->rtwritefd != -1) { + close(impl->rtwritefd); + impl->rtwritefd = -1; + } + return SPA_RESULT_OK; } static void unhandle_socket(struct pw_stream *stream) @@ -355,8 +359,6 @@ void pw_stream_destroy(struct pw_stream *stream) if (stream->name) free(stream->name); - close(impl->rtwritefd); - free(impl); } @@ -559,7 +561,7 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask) struct pw_client_node_message message; uint64_t cmd; - read(impl->rtreadfd, &cmd, 8); + read(fd, &cmd, 8); while (pw_client_node_transport_next_message(impl->trans, &message) == SPA_RESULT_OK) { struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); @@ -574,10 +576,9 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct timespec interval; - impl->rtreadfd = rtreadfd; impl->rtwritefd = rtwritefd; impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop, - impl->rtreadfd, + rtreadfd, SPA_IO_ERR | SPA_IO_HUP, true, on_rtsocket_condition, stream);