fix a socket leak

Let the loop source free the sockets.
Free the write fd in the stream.
This commit is contained in:
Wim Taymans 2017-08-27 13:14:34 +02:00
parent 577f86be0d
commit 174d34ada6
3 changed files with 31 additions and 39 deletions

View file

@ -65,8 +65,6 @@ struct protocol_data {
struct client { struct client {
struct pw_protocol_client this; struct pw_protocol_client this;
int fd;
struct spa_source *source; struct spa_source *source;
struct pw_protocol_native_connection *connection; struct pw_protocol_native_connection *connection;
@ -80,7 +78,6 @@ struct client {
struct server { struct server {
struct pw_protocol_server this; struct pw_protocol_server this;
int fd;
int fd_lock; int fd_lock;
struct sockaddr_un addr; struct sockaddr_un addr;
char lock_addr[UNIX_PATH_MAX + LOCK_SUFFIXLEN]; char lock_addr[UNIX_PATH_MAX + LOCK_SUFFIXLEN];
@ -93,7 +90,6 @@ struct server {
struct client_data { struct client_data {
struct pw_client *client; struct pw_client *client;
struct spa_hook client_listener; struct spa_hook client_listener;
int fd;
struct spa_source *source; struct spa_source *source;
struct pw_protocol_native_connection *connection; struct pw_protocol_native_connection *connection;
bool busy; bool busy;
@ -217,7 +213,6 @@ static void client_free(void *data)
spa_list_remove(&client->protocol_link); spa_list_remove(&client->protocol_link);
pw_protocol_native_connection_destroy(this->connection); pw_protocol_native_connection_destroy(this->connection);
close(this->fd);
} }
static const struct pw_client_events client_events = { static const struct pw_client_events client_events = {
@ -254,10 +249,8 @@ static struct pw_client *client_new(struct server *s, int fd)
this = pw_client_get_user_data(client); this = pw_client_get_user_data(client);
this->client = client; this->client = client;
this->fd = fd;
this->source = pw_loop_add_io(pw_core_get_main_loop(core), this->source = pw_loop_add_io(pw_core_get_main_loop(core),
this->fd, fd, SPA_IO_ERR | SPA_IO_HUP, true, connection_data, this);
SPA_IO_ERR | SPA_IO_HUP, false, connection_data, this);
if (this->source == NULL) if (this->source == NULL)
goto no_source; goto no_source;
@ -376,27 +369,34 @@ socket_data(void *data, int fd, enum spa_io mask)
static bool add_socket(struct pw_protocol *protocol, struct server *s) static bool add_socket(struct pw_protocol *protocol, struct server *s)
{ {
socklen_t size; socklen_t size;
int fd;
if ((s->fd = socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) if ((fd = socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0)
return false; goto error;
size = offsetof(struct sockaddr_un, sun_path) + strlen(s->addr.sun_path); size = offsetof(struct sockaddr_un, sun_path) + strlen(s->addr.sun_path);
if (bind(s->fd, (struct sockaddr *) &s->addr, size) < 0) { if (bind(fd, (struct sockaddr *) &s->addr, size) < 0) {
pw_log_error("bind() failed with error: %m"); pw_log_error("bind() failed with error: %m");
return false; goto error_close;
} }
if (listen(s->fd, 128) < 0) { if (listen(fd, 128) < 0) {
pw_log_error("listen() failed with error: %m"); pw_log_error("listen() failed with error: %m");
return false; goto error_close;
} }
s->loop = pw_core_get_main_loop(protocol->core); s->loop = pw_core_get_main_loop(protocol->core);
s->source = pw_loop_add_io(s->loop, s->fd, SPA_IO_IN, false, socket_data, s); s->source = pw_loop_add_io(s->loop, fd, SPA_IO_IN, true, socket_data, s);
if (s->source == NULL) if (s->source == NULL)
return false; goto error_close;
return true; return true;
error_close:
close(fd);
error:
return false;
} }
static const char * static const char *
@ -561,11 +561,10 @@ static int impl_connect_fd(struct pw_protocol_client *client, int fd)
&conn_events, &conn_events,
impl); impl);
impl->fd = fd;
impl->source = pw_loop_add_io(remote->core->main_loop, impl->source = pw_loop_add_io(remote->core->main_loop,
fd, fd,
SPA_IO_IN | SPA_IO_HUP | SPA_IO_ERR, SPA_IO_IN | SPA_IO_HUP | SPA_IO_ERR,
false, on_remote_data, impl); true, on_remote_data, impl);
return 0; return 0;
@ -588,10 +587,6 @@ static void impl_disconnect(struct pw_protocol_client *client)
if (impl->connection) if (impl->connection)
pw_protocol_native_connection_destroy(impl->connection); pw_protocol_native_connection_destroy(impl->connection);
impl->connection = NULL; impl->connection = NULL;
if (impl->fd != -1)
close(impl->fd);
impl->fd = -1;
} }
static void impl_destroy(struct pw_protocol_client *client) static void impl_destroy(struct pw_protocol_client *client)
@ -599,6 +594,8 @@ static void impl_destroy(struct pw_protocol_client *client)
struct client *impl = SPA_CONTAINER_OF(client, struct client, this); struct client *impl = SPA_CONTAINER_OF(client, struct client, this);
struct pw_remote *remote = client->remote; struct pw_remote *remote = client->remote;
impl_disconnect(client);
pw_loop_destroy_source(remote->core->main_loop, impl->flush_event); pw_loop_destroy_source(remote->core->main_loop, impl->flush_event);
spa_list_remove(&client->link); spa_list_remove(&client->link);
@ -625,7 +622,6 @@ impl_new_client(struct pw_protocol *protocol,
this->disconnect = impl_disconnect; this->disconnect = impl_disconnect;
this->destroy = impl_destroy; this->destroy = impl_destroy;
impl->fd = -1;
impl->flush_event = pw_loop_add_event(remote->core->main_loop, do_flush_event, impl); impl->flush_event = pw_loop_add_event(remote->core->main_loop, do_flush_event, impl);
spa_list_append(&protocol->client_list, &this->link); spa_list_append(&protocol->client_list, &this->link);
@ -647,11 +643,9 @@ static void destroy_server(struct pw_protocol_server *server)
pw_loop_destroy_source(s->loop, s->source); pw_loop_destroy_source(s->loop, s->source);
if (s->addr.sun_path[0]) if (s->addr.sun_path[0])
unlink(s->addr.sun_path); unlink(s->addr.sun_path);
if (s->fd >= 0)
close(s->fd);
if (s->lock_addr[0]) if (s->lock_addr[0])
unlink(s->lock_addr); unlink(s->lock_addr);
if (s->fd_lock >= 0) if (s->fd_lock != 1)
close(s->fd_lock); close(s->fd_lock);
free(s); free(s);
} }
@ -686,7 +680,6 @@ impl_add_server(struct pw_protocol *protocol,
if ((s = calloc(1, sizeof(struct server))) == NULL) if ((s = calloc(1, sizeof(struct server))) == NULL)
return NULL; return NULL;
s->fd = -1;
s->fd_lock = -1; s->fd_lock = -1;
this = &s->this; this = &s->this;

View file

@ -67,7 +67,6 @@ struct node_data {
struct pw_type *t; struct pw_type *t;
uint32_t node_id; uint32_t node_id;
int rtreadfd;
int rtwritefd; int rtwritefd;
struct spa_source *rtsocket_source; struct spa_source *rtsocket_source;
struct pw_client_node_transport *trans; struct pw_client_node_transport *trans;
@ -323,7 +322,7 @@ static int do_connect(struct pw_remote *remote)
return 0; return 0;
no_proxy: no_proxy:
pw_protocol_client_disconnect (remote->conn); pw_protocol_client_disconnect(remote->conn);
pw_remote_update_state(remote, PW_REMOTE_STATE_ERROR, "can't connect: no memory"); pw_remote_update_state(remote, PW_REMOTE_STATE_ERROR, "can't connect: no memory");
return -1; return -1;
} }
@ -470,7 +469,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
struct pw_client_node_message message; struct pw_client_node_message message;
uint64_t cmd; uint64_t cmd;
read(data->rtreadfd, &cmd, 8); read(fd, &cmd, 8);
while (pw_client_node_transport_next_message(data->trans, &message) == SPA_RESULT_OK) { while (pw_client_node_transport_next_message(data->trans, &message) == SPA_RESULT_OK) {
struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message));
@ -546,10 +545,9 @@ static void client_node_transport(void *object, uint32_t node_id,
spa_list_for_each(port, &data->node->output_ports, link) spa_list_for_each(port, &data->node->output_ports, link)
spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id]); spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id]);
data->rtreadfd = readfd;
data->rtwritefd = writefd; data->rtwritefd = writefd;
data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop, data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop,
data->rtreadfd, readfd,
SPA_IO_ERR | SPA_IO_HUP, SPA_IO_ERR | SPA_IO_HUP,
true, on_rtsocket_condition, proxy); true, on_rtsocket_condition, proxy);
} }

View file

@ -78,7 +78,6 @@ struct stream {
enum pw_stream_mode mode; enum pw_stream_mode mode;
int rtreadfd;
int rtwritefd; int rtwritefd;
struct spa_source *rtsocket_source; struct spa_source *rtsocket_source;
@ -207,6 +206,7 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
this->remote = remote; this->remote = remote;
this->name = strdup(name); this->name = strdup(name);
impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode); impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode);
impl->rtwritefd = -1;
spa_hook_list_init(&this->listener_list); spa_hook_list_init(&this->listener_list);
@ -268,7 +268,11 @@ do_remove_sources(struct spa_loop *loop,
pw_loop_destroy_source(stream->remote->core->data_loop, impl->timeout_source); pw_loop_destroy_source(stream->remote->core->data_loop, impl->timeout_source);
impl->timeout_source = NULL; impl->timeout_source = NULL;
} }
return SPA_RESULT_OK; if (impl->rtwritefd != -1) {
close(impl->rtwritefd);
impl->rtwritefd = -1;
}
return SPA_RESULT_OK;
} }
static void unhandle_socket(struct pw_stream *stream) static void unhandle_socket(struct pw_stream *stream)
@ -355,8 +359,6 @@ void pw_stream_destroy(struct pw_stream *stream)
if (stream->name) if (stream->name)
free(stream->name); free(stream->name);
close(impl->rtwritefd);
free(impl); free(impl);
} }
@ -559,7 +561,7 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask)
struct pw_client_node_message message; struct pw_client_node_message message;
uint64_t cmd; uint64_t cmd;
read(impl->rtreadfd, &cmd, 8); read(fd, &cmd, 8);
while (pw_client_node_transport_next_message(impl->trans, &message) == SPA_RESULT_OK) { while (pw_client_node_transport_next_message(impl->trans, &message) == SPA_RESULT_OK) {
struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message));
@ -574,10 +576,9 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct timespec interval; struct timespec interval;
impl->rtreadfd = rtreadfd;
impl->rtwritefd = rtwritefd; impl->rtwritefd = rtwritefd;
impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop, impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop,
impl->rtreadfd, rtreadfd,
SPA_IO_ERR | SPA_IO_HUP, SPA_IO_ERR | SPA_IO_HUP,
true, on_rtsocket_condition, stream); true, on_rtsocket_condition, stream);