pulse-server: support server config

Extend the server.address property so that you can also specify
an object per server. Add support for configuring some aspects of the
server such as max-clients and backlog.

Most importantly, the pipewire client.access can be configured per
server.

See #1960
This commit is contained in:
Wim Taymans 2022-01-04 14:26:08 +01:00
parent 611591d0fc
commit 22625fb658
3 changed files with 104 additions and 73 deletions

View file

@ -40,6 +40,12 @@ context.modules = [
#"tcp:4713" # IPv4 and IPv6 on all addresses #"tcp:4713" # IPv4 and IPv6 on all addresses
#"tcp:[::]:9999" # IPv6 on all addresses #"tcp:[::]:9999" # IPv6 on all addresses
#"tcp:127.0.0.1:8888" # IPv4 on a single address #"tcp:127.0.0.1:8888" # IPv4 on a single address
#
#{ address = "tcp:4713" # address
# max-clients = 64 # maximume number of clients
# listen-backlog = 32 # backlog in the server listen queue
# client.access = "restricted" # permissions for clients
#}
] ]
#pulse.min.req = 256/48000 # 5ms #pulse.min.req = 256/48000 # 5ms
#pulse.default.req = 960/48000 # 20 milliseconds #pulse.default.req = 960/48000 # 20 milliseconds

View file

@ -361,6 +361,7 @@ on_connect(void *data, int fd, uint32_t mask)
socklen_t length; socklen_t length;
int client_fd, val; int client_fd, val;
struct client *client = NULL; struct client *client = NULL;
const char *client_access = NULL;
pid_t pid; pid_t pid;
length = sizeof(name); length = sizeof(name);
@ -410,6 +411,9 @@ on_connect(void *data, int fd, uint32_t mask)
if (client->routes == NULL) if (client->routes == NULL)
goto error; goto error;
if (server->client_access[0] != '\0')
client_access = server->client_access;
if (server->addr.ss_family == AF_UNIX) { if (server->addr.ss_family == AF_UNIX) {
#ifdef SO_PRIORITY #ifdef SO_PRIORITY
val = 6; val = 6;
@ -418,9 +422,10 @@ on_connect(void *data, int fd, uint32_t mask)
#endif #endif
pid = get_client_pid(client, client_fd); pid = get_client_pid(client, client_fd);
if (pid != 0 && check_flatpak(client, pid) == 1) if (pid != 0 && check_flatpak(client, pid) == 1)
pw_properties_set(client->props, PW_KEY_CLIENT_ACCESS, "flatpak"); client_access = "flatpak";
} }
else if (server->addr.ss_family == AF_INET || server->addr.ss_family == AF_INET6) { else if (server->addr.ss_family == AF_INET || server->addr.ss_family == AF_INET6) {
val = 1; val = 1;
if (setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) < 0) if (setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) < 0)
pw_log_warn("setsockopt(TCP_NODELAY) failed: %m"); pw_log_warn("setsockopt(TCP_NODELAY) failed: %m");
@ -430,9 +435,10 @@ on_connect(void *data, int fd, uint32_t mask)
if (setsockopt(client_fd, IPPROTO_IP, IP_TOS, &val, sizeof(val)) < 0) if (setsockopt(client_fd, IPPROTO_IP, IP_TOS, &val, sizeof(val)) < 0)
pw_log_warn("setsockopt(IP_TOS) failed: %m"); pw_log_warn("setsockopt(IP_TOS) failed: %m");
} }
if (client_access == NULL)
pw_properties_set(client->props, PW_KEY_CLIENT_ACCESS, "restricted"); client_access = "restricted";
} }
pw_properties_set(client->props, PW_KEY_CLIENT_ACCESS, client_access);
return; return;
@ -442,9 +448,9 @@ error:
client_free(client); client_free(client);
} }
static int parse_unix_address(const char *address, struct pw_array *addrs) static int parse_unix_address(const char *address, struct sockaddr_storage *addrs, int len)
{ {
struct sockaddr_un addr = {0}, *s; struct sockaddr_un addr = {0};
int res; int res;
if (address[0] != '/') { if (address[0] != '/') {
@ -469,15 +475,13 @@ static int parse_unix_address(const char *address, struct pw_array *addrs)
return -ENAMETOOLONG; return -ENAMETOOLONG;
} }
s = pw_array_add(addrs, sizeof(struct sockaddr_storage)); if (len < 1)
if (s == NULL) return -ENOSPC;
return -ENOMEM;
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
*s = addr; memcpy(&addrs[0], &addr, sizeof(addr));
return 1;
return 0;
} }
#ifndef SUN_LEN #ifndef SUN_LEN
@ -581,7 +585,7 @@ static int start_unix_server(struct server *server, const struct sockaddr_storag
goto error_close; goto error_close;
} }
if (listen(fd, LISTEN_BACKLOG) < 0) { if (listen(fd, server->listen_backlog) < 0) {
res = -errno; res = -errno;
pw_log_warn("server %p: listen() on '%s' failed: %m", pw_log_warn("server %p: listen() on '%s' failed: %m",
server, addr_un->sun_path); server, addr_un->sun_path);
@ -745,49 +749,43 @@ static int get_ip_address_length(const struct sockaddr_storage *addr)
} }
} }
static int parse_ip_address(const char *address, struct pw_array *addrs) static int parse_ip_address(const char *address, struct sockaddr_storage *addrs, int len)
{ {
char ip[FORMATTED_IP_ADDR_STRLEN]; char ip[FORMATTED_IP_ADDR_STRLEN];
struct sockaddr_storage addr, *s; struct sockaddr_storage addr;
int res; int res;
res = parse_ipv6_address(address, (struct sockaddr_in6 *) &addr); res = parse_ipv6_address(address, (struct sockaddr_in6 *) &addr);
if (res == 0) { if (res == 0) {
s = pw_array_add(addrs, sizeof(*s)); if (len < 1)
if (s == NULL) return -ENOSPC;
return -ENOMEM; addrs[0] = addr;
return 1;
*s = addr;
return 0;
} }
res = parse_ipv4_address(address, (struct sockaddr_in *) &addr); res = parse_ipv4_address(address, (struct sockaddr_in *) &addr);
if (res == 0) { if (res == 0) {
s = pw_array_add(addrs, sizeof(*s)); if (len < 1)
if (s == NULL) return -ENOSPC;
return -ENOMEM; addrs[0] = addr;
return 1;
*s = addr;
return 0;
} }
res = parse_port(address); res = parse_port(address);
if (res < 0) if (res < 0)
return res; return res;
s = pw_array_add(addrs, sizeof(*s) * 2); if (len < 2)
if (s == NULL) return -ENOSPC;
return -ENOMEM;
snprintf(ip, sizeof(ip), "[::]:%d", res); snprintf(ip, sizeof(ip), "[::]:%d", res);
spa_assert_se(parse_ipv6_address(ip, (struct sockaddr_in6 *) &addr) == 0); spa_assert_se(parse_ipv6_address(ip, (struct sockaddr_in6 *) &addr) == 0);
*s++ = addr; addrs[0] = addr;
snprintf(ip, sizeof(ip), "0.0.0.0:%d", res); snprintf(ip, sizeof(ip), "0.0.0.0:%d", res);
spa_assert_se(parse_ipv4_address(ip, (struct sockaddr_in *) &addr) == 0); spa_assert_se(parse_ipv4_address(ip, (struct sockaddr_in *) &addr) == 0);
*s++ = addr; addrs[1] = addr;
return 2;
return 0;
} }
static int start_ip_server(struct server *server, const struct sockaddr_storage *addr) static int start_ip_server(struct server *server, const struct sockaddr_storage *addr)
@ -822,7 +820,7 @@ static int start_ip_server(struct server *server, const struct sockaddr_storage
goto error_close; goto error_close;
} }
if (listen(fd, LISTEN_BACKLOG) < 0) { if (listen(fd, server->listen_backlog) < 0) {
res = -errno; res = -errno;
pw_log_warn("server %p: listen() failed: %m", server); pw_log_warn("server %p: listen() failed: %m", server);
goto error_close; goto error_close;
@ -888,13 +886,13 @@ static int server_start(struct server *server, const struct sockaddr_storage *ad
return res; return res;
} }
static int parse_address(const char *address, struct pw_array *addrs) static int parse_address(const char *address, struct sockaddr_storage *addrs, int len)
{ {
if (spa_strstartswith(address, "tcp:")) if (spa_strstartswith(address, "tcp:"))
return parse_ip_address(address + strlen("tcp:"), addrs); return parse_ip_address(address + strlen("tcp:"), addrs, len);
if (spa_strstartswith(address, "unix:")) if (spa_strstartswith(address, "unix:"))
return parse_unix_address(address + strlen("unix:"), addrs); return parse_unix_address(address + strlen("unix:"), addrs, len);
return -EAFNOSUPPORT; return -EAFNOSUPPORT;
} }
@ -927,11 +925,9 @@ static int format_socket_address(const struct sockaddr_storage *addr, char *buff
int servers_create_and_start(struct impl *impl, const char *addresses, struct pw_array *servers) int servers_create_and_start(struct impl *impl, const char *addresses, struct pw_array *servers)
{ {
struct pw_array addrs = PW_ARRAY_INIT(sizeof(struct sockaddr_storage)); int len, res, count = 0, err = 0; /* store the first error to return when no servers could be created */
const struct sockaddr_storage *addr; const char *v;
char addr_str[FORMATTED_SOCKET_ADDR_STRLEN]; struct spa_json it[3];
int res, count = 0, err = 0; /* store the first error to return when no servers could be created */
struct spa_json it[2];
/* update `err` if it hasn't been set to an errno */ /* update `err` if it hasn't been set to an errno */
#define UPDATE_ERR(e) do { if (err == 0) err = (e); } while (false) #define UPDATE_ERR(e) do { if (err == 0) err = (e); } while (false)
@ -939,52 +935,77 @@ int servers_create_and_start(struct impl *impl, const char *addresses, struct pw
/* collect addresses into an array of `struct sockaddr_storage` */ /* collect addresses into an array of `struct sockaddr_storage` */
spa_json_init(&it[0], addresses, strlen(addresses)); spa_json_init(&it[0], addresses, strlen(addresses));
/* [ <server-spec> ... ] */
if (spa_json_enter_array(&it[0], &it[1]) < 0) if (spa_json_enter_array(&it[0], &it[1]) < 0)
return -EINVAL; return -EINVAL;
while (spa_json_get_string(&it[1], addr_str, sizeof(addr_str)) > 0) { /* a server-spec is either an address or an object */
res = parse_address(addr_str, &addrs); while ((len = spa_json_next(&it[1], &v)) > 0) {
if (res < 0) { char addr_str[FORMATTED_SOCKET_ADDR_STRLEN] = { 0 };
char key[128], client_access[64] = { 0 };
struct sockaddr_storage addr[2];
int i, max_clients = MAX_CLIENTS, listen_backlog = LISTEN_BACKLOG, n_addr;
if (spa_json_is_object(v, len)) {
spa_json_enter(&it[1], &it[2]);
while (spa_json_get_string(&it[2], key, sizeof(key)) > 0) {
if ((len = spa_json_next(&it[2], &v)) <= 0)
break;
if (spa_streq(key, "address")) {
spa_json_parse_stringn(v, len, addr_str, sizeof(addr_str));
} else if (spa_streq(key, "max-clients")) {
spa_json_parse_int(v, len, &max_clients);
} else if (spa_streq(key, "listen-backlog")) {
spa_json_parse_int(v, len, &listen_backlog);
} else if (spa_streq(key, "client.access")) {
spa_json_parse_stringn(v, len, client_access, sizeof(client_access));
}
}
} else {
spa_json_parse_stringn(v, len, addr_str, sizeof(addr_str));
}
n_addr = parse_address(addr_str, addr, 2);
if (n_addr < 0) {
pw_log_warn("pulse-server %p: failed to parse address '%s': %s", pw_log_warn("pulse-server %p: failed to parse address '%s': %s",
impl, addr_str, spa_strerror(res)); impl, addr_str, spa_strerror(n_addr));
UPDATE_ERR(n_addr);
UPDATE_ERR(res);
}
}
/* try to create sockets for each address in the list */
pw_array_for_each (addr, &addrs) {
struct server * const server = server_new(impl);
if (server == NULL) {
UPDATE_ERR(-errno);
continue; continue;
} }
res = server_start(server, addr); /* try to create sockets for each address in the list */
if (res < 0) { for (i = 0; i < n_addr; i++) {
spa_assert_se(format_socket_address(addr, addr_str, sizeof(addr_str)) >= 0); struct server * const server = server_new(impl);
pw_log_warn("pulse-server %p: failed to start server on '%s': %s", if (server == NULL) {
impl, addr_str, spa_strerror(res)); UPDATE_ERR(-errno);
continue;
}
UPDATE_ERR(res); server->max_clients = max_clients;
server_free(server); server->listen_backlog = listen_backlog;
memcpy(server->client_access, client_access, sizeof(client_access));
continue; res = server_start(server, addr);
if (res < 0) {
spa_assert_se(format_socket_address(addr, addr_str, sizeof(addr_str)) >= 0);
pw_log_warn("pulse-server %p: failed to start server on '%s': %s",
impl, addr_str, spa_strerror(res));
UPDATE_ERR(res);
server_free(server);
continue;
}
if (servers != NULL)
pw_array_add_ptr(servers, server);
count += 1;
} }
if (servers != NULL)
pw_array_add_ptr(servers, server);
count += 1;
} }
pw_array_clear(&addrs);
if (count == 0) { if (count == 0) {
UPDATE_ERR(-EINVAL); UPDATE_ERR(-EINVAL);
return err; return err;
} }
return count; return count;
#undef UPDATE_ERR #undef UPDATE_ERR

View file

@ -45,6 +45,10 @@ struct server {
struct spa_source *source; struct spa_source *source;
struct spa_list clients; struct spa_list clients;
uint32_t max_clients;
uint32_t listen_backlog;
char client_access[64];
uint32_t n_clients; uint32_t n_clients;
uint32_t wait_clients; uint32_t wait_clients;
unsigned int activated:1; unsigned int activated:1;