pulse-server: support network connections

This commit is contained in:
Wim Taymans 2020-10-08 17:00:24 +02:00
parent 177e897a88
commit d01586bfa5

View file

@ -226,8 +226,7 @@ static int flush_messages(struct client *client)
data = m->data + idx; data = m->data + idx;
size = m->length - idx; size = m->length - idx;
} else { } else {
spa_list_remove(&m->link); message_free(client, m, false);
spa_list_append(&client->free_messages, &m->link);
client->out_index = 0; client->out_index = 0;
continue; continue;
} }
@ -2309,6 +2308,7 @@ static void client_free(struct client *client)
pw_log_info(NAME" %p: client %p free", impl, client); pw_log_info(NAME" %p: client %p free", impl, client);
spa_list_remove(&client->link); spa_list_remove(&client->link);
pw_map_clear(&client->streams); pw_map_clear(&client->streams);
spa_list_consume(msg, &client->free_messages, link) spa_list_consume(msg, &client->free_messages, link)
message_free(client, msg, true); message_free(client, msg, true);
spa_list_consume(msg, &client->out_messages, link) spa_list_consume(msg, &client->out_messages, link)
@ -2424,7 +2424,7 @@ static int do_read(struct client *client)
if (client->message == NULL) { if (client->message == NULL) {
res = -EIO; res = -EIO;
goto error; goto exit;
} }
data = SPA_MEMBER(client->message->data, idx, void); data = SPA_MEMBER(client->message->data, idx, void);
size = client->message->length - idx; size = client->message->length - idx;
@ -2434,7 +2434,7 @@ static int do_read(struct client *client)
if (errno == EINTR) if (errno == EINTR)
continue; continue;
res = -errno; res = -errno;
goto error; goto exit;
} }
client->in_index += r; client->in_index += r;
break; break;
@ -2446,7 +2446,7 @@ static int do_read(struct client *client)
flags = ntohl(client->desc.flags); flags = ntohl(client->desc.flags);
if ((flags & FLAG_SHMMASK) != 0) { if ((flags & FLAG_SHMMASK) != 0) {
res = -ENOTSUP; res = -ENOTSUP;
goto error; goto exit;
} }
length = ntohl(client->desc.length); length = ntohl(client->desc.length);
@ -2454,7 +2454,7 @@ static int do_read(struct client *client)
pw_log_warn(NAME" %p: Received invalid frame size: %u", pw_log_warn(NAME" %p: Received invalid frame size: %u",
impl, length); impl, length);
res = -EPROTO; res = -EPROTO;
goto error; goto exit;
} }
channel = ntohl(client->desc.channel); channel = ntohl(client->desc.channel);
if (channel == (uint32_t) -1) { if (channel == (uint32_t) -1) {
@ -2462,8 +2462,12 @@ static int do_read(struct client *client)
pw_log_warn(NAME" %p: Received packet frame with invalid " pw_log_warn(NAME" %p: Received packet frame with invalid "
"flags value.", impl); "flags value.", impl);
res = -EPROTO; res = -EPROTO;
goto error; goto exit;
} }
} else {
} }
if (client->message) if (client->message)
message_free(client, client->message, false); message_free(client, client->message, false);
@ -2480,7 +2484,7 @@ static int do_read(struct client *client)
else else
res = handle_memblock(client, msg); res = handle_memblock(client, msg);
} }
error: exit:
return res; return res;
} }
@ -2610,22 +2614,14 @@ static void server_free(struct server *server)
free(server); free(server);
} }
static struct server *create_local_server(struct impl *impl, const char *name) static int make_local_socket(struct impl *impl, const char *name)
{ {
const char *runtime_dir; const char *runtime_dir;
struct server *server;
socklen_t size;
struct sockaddr_un addr; struct sockaddr_un addr;
socklen_t size;
int name_size, fd, res; int name_size, fd, res;
struct stat socket_stat; struct stat socket_stat;
server = calloc(1, sizeof(struct server));
if (server == NULL)
return NULL;
server->impl = impl;
spa_list_init(&server->clients);
runtime_dir = get_runtime_dir(); runtime_dir = get_runtime_dir();
addr.sun_family = AF_LOCAL; addr.sun_family = AF_LOCAL;
@ -2664,16 +2660,110 @@ static struct server *create_local_server(struct impl *impl, const char *name)
pw_log_error(NAME" %p: listen() failed with error: %m", impl); pw_log_error(NAME" %p: listen() failed with error: %m", impl);
goto error_close; goto error_close;
} }
pw_log_info(NAME" listening on unix:%s", addr.sun_path);
return fd;
error_close:
close(fd);
error:
return res;
}
static int make_inet_socket(struct impl *impl, uint32_t address, uint16_t port)
{
struct sockaddr_in addr;
int res, fd;
if ((fd = socket(PF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) {
res = -errno;
goto error;
}
spa_zero(addr);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(address);
if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
res = -errno;
pw_log_error(NAME" %p: bind() failed with error: %m", impl);
goto error_close;
}
if (listen(fd, 5) < 0) {
res = -errno;
pw_log_error(NAME" %p: listen() failed with error: %m", impl);
goto error_close;
}
pw_log_info(NAME" listening on tcp:%08x:%u", address, port);
return fd;
error_close:
close(fd);
error:
return res;
}
static struct server *create_local_server(struct impl *impl, const char *name)
{
struct server *server;
int fd, res;
server = calloc(1, sizeof(struct server));
if (server == NULL)
return NULL;
server->impl = impl;
spa_list_init(&server->clients);
fd = make_local_socket(impl, name);
if (fd < 0) {
res = fd;
goto error;
}
server->source = pw_loop_add_io(impl->loop, fd, SPA_IO_IN, true, on_connect, server); server->source = pw_loop_add_io(impl->loop, fd, SPA_IO_IN, true, on_connect, server);
if (server->source == NULL) { if (server->source == NULL) {
res = -errno; res = -errno;
pw_log_error(NAME" %p: can't create server source: %m", impl); pw_log_error(NAME" %p: can't create server source: %m", impl);
goto error_close; goto error_close;
} }
pw_log_info(NAME" listening on %s", addr.sun_path);
return server; return server;
error_close:
close(fd);
error:
server_free(server);
errno = -res;
return NULL;
}
static struct server *create_inet_server(struct impl *impl, uint32_t address, uint16_t port)
{
int fd, res;
struct server *server;
server = calloc(1, sizeof(struct server));
if (server == NULL)
return NULL;
server->impl = impl;
spa_list_init(&server->clients);
fd = make_inet_socket(impl, address, port);
if (fd < 0) {
res = fd;
goto error;
}
server->source = pw_loop_add_io(impl->loop, fd, SPA_IO_IN, true, on_connect, server);
if (server->source == NULL) {
res = -errno;
pw_log_error(NAME" %p: can't create server source: %m", impl);
goto error_close;
}
return server;
error_close: error_close:
close(fd); close(fd);
error: error:
@ -2683,6 +2773,7 @@ error:
} }
static void impl_free(struct impl *impl) static void impl_free(struct impl *impl)
{ {
struct server *s; struct server *s;
@ -2698,7 +2789,6 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
{ {
struct impl *impl; struct impl *impl;
const char *str; const char *str;
struct server *server;
int res; int res;
impl = calloc(1, sizeof(struct impl) + user_data_size); impl = calloc(1, sizeof(struct impl) + user_data_size);
@ -2759,11 +2849,38 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
if (str == NULL) if (str == NULL)
str = PW_PROTOCOL_PULSE_DEFAULT_SOCKET; str = PW_PROTOCOL_PULSE_DEFAULT_SOCKET;
server = create_local_server(impl, str); if (create_local_server(impl, str) == NULL) {
if (server == NULL) {
res = -errno; res = -errno;
goto error; goto error;
} }
str = NULL;
if (props != NULL)
str = pw_properties_get(props, "tcp.listen");
if (str != NULL) {
uint32_t address = INADDR_ANY;
uint16_t port;
const char *col = strchr(str, ':');
if (col) {
struct in_addr ipv4;
port = atoi(col+1);
if (inet_pton(AF_INET, str, &ipv4) > 0)
address = ntohl(ipv4.s_addr);
else
address = INADDR_ANY;
} else {
address = INADDR_ANY;
port = atoi(str);
}
if (port == 0)
port = PW_PROTOCOL_PULSE_DEFAULT_PORT;
if (create_inet_server(impl, address, port) == NULL) {
res = -errno;
goto error;
}
}
return (struct pw_protocol_pulse*)impl; return (struct pw_protocol_pulse*)impl;
error: error: