module-netjack2: small improvements

This commit is contained in:
Wim Taymans 2023-06-01 18:33:23 +02:00
parent 52bc7451fd
commit f8aa18c88b
2 changed files with 86 additions and 79 deletions

View file

@ -33,7 +33,6 @@
#include <pipewire/impl.h>
#include <pipewire/i18n.h>
#include <pipewire/private.h>
#include <pipewire/thread.h>
#include "module-netjack2/packets.h"
@ -49,7 +48,7 @@
*
* ## Module Options
*
* - `driver.mode`: the tunnel mode, sink|source|duplex, default duplex
* - `driver.mode`: the driver mode, sink|source|duplex, default duplex
* - `jack.client-name`: the name of the JACK client.
* - `local.ifname = <str>`: interface name to use
* - `net.ip =<str>`: multicast IP address, default "225.3.19.154"
@ -82,7 +81,7 @@
*
*\code{.unparsed}
* context.modules = [
* { name = libpipewire-module-jack-tunnel
* { name = libpipewire-module-netjack2-driver
* args = {
* #driver.mode = duplex
* #jack.client-name = PipeWire
@ -125,8 +124,6 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_POSITION "[ FL FR ]"
#define DEFAULT_MIDI_PORTS 1
#define NETWORK_PROTOCOL 8
#define FOLLOWER_INIT_TIMEOUT 1
#define FOLLOWER_INIT_RETRY 5
@ -201,8 +198,12 @@ struct impl {
uint32_t mode;
struct pw_properties *props;
struct pw_impl_module *module;
bool loop;
int ttl;
int dscp;
int mtu;
struct pw_impl_module *module;
struct spa_hook module_listener;
struct pw_core *core;
@ -220,8 +221,6 @@ struct impl {
uint32_t pw_xrun;
uint32_t nj2_xrun;
pthread_t thread;
struct sockaddr_storage dst_addr;
socklen_t dst_len;
struct sockaddr_storage src_addr;
@ -368,11 +367,13 @@ static int32_t netjack2_sync_wait(struct impl *impl)
if (len < (ssize_t)sizeof(*sync))
continue;
//nj2_dump_packet_header(sync);
if (strcmp(sync->type, "header") != 0)
continue;
if (ntohl(sync->data_type) != 's')
continue;
if (ntohl(sync->data_stream) != 's')
if (ntohl(sync->data_type) != 's' ||
ntohl(sync->data_stream) != 's' ||
ntohl(sync->id) != impl->params.id)
continue;
break;
}
@ -636,12 +637,13 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t
if (len < (ssize_t)sizeof(*header))
goto receive_error;
//nj2_dump_packet_header(header);
if (ntohl(header->data_stream) != 's' ||
ntohl(header->id) != impl->params.id) {
pw_log_debug("not our packet");
continue;
}
//nj2_dump_packet_header(header);
switch (ntohl(header->data_type)) {
case 'm':
@ -1088,7 +1090,6 @@ static int make_socket(struct sockaddr_storage *src, socklen_t src_len,
if (setsockopt(fd, IPPROTO_IP, IP_TOS, &val, sizeof(val)) < 0)
pw_log_warn("setsockopt(IP_TOS) failed: %m");
}
if (bind(fd, (struct sockaddr*)src, src_len) < 0) {
res = -errno;
pw_log_error("bind() failed: %m");
@ -1110,7 +1111,7 @@ error:
return res;
}
static int get_ip(const struct sockaddr_storage *sa, char *ip, size_t len)
static const char *get_ip(const struct sockaddr_storage *sa, char *ip, size_t len)
{
if (sa->ss_family == AF_INET) {
struct sockaddr_in *in = (struct sockaddr_in*)sa;
@ -1119,8 +1120,8 @@ static int get_ip(const struct sockaddr_storage *sa, char *ip, size_t len)
struct sockaddr_in6 *in = (struct sockaddr_in6*)sa;
inet_ntop(sa->ss_family, &in->sin6_addr, ip, len);
} else
return -EIO;
return 0;
snprintf(ip, len, "invalid ip");
return ip;
}
static void update_timer(struct impl *impl, uint64_t timeout)
@ -1159,6 +1160,17 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p
impl->params.follower_sync_mode = ntohl(params->follower_sync_mode);
impl->params.network_latency = ntohl(params->network_latency);
if (impl->params.send_audio_channels < 0 ||
impl->params.recv_audio_channels < 0 ||
impl->params.send_midi_channels < 0 ||
impl->params.recv_midi_channels < 0 ||
impl->params.sample_rate == 0 ||
impl->params.period_size == 0 ||
impl->params.sample_encoder != NJ2_ENCODER_FLOAT) {
pw_log_warn("invalid follower setup");
return -EINVAL;
}
update_timer(impl, 0);
pw_loop_update_io(impl->main_loop, impl->setup_socket, 0);
@ -1194,62 +1206,76 @@ connect_error:
return -errno;
}
static int handle_packet(struct impl *impl, void *data, int len,
struct sockaddr_storage *addr, socklen_t addr_len)
{
struct nj2_session_params *params;
if (len < (int)sizeof(struct nj2_session_params))
goto short_packet;
params = data;
if (strcmp(params->type, impl->params.type) != 0) {
pw_log_info("wrong packet type '%s' != '%s'",
params->type, impl->params.type);
return -EINVAL;
}
if (ntohl(params->packet_id) != NJ2_ID_FOLLOWER_SETUP) {
pw_log_info("wrong packet id %d != %d",
htonl(params->packet_id), NJ2_ID_FOLLOWER_SETUP);
return -EINVAL;
}
return handle_follower_setup(impl, data, addr, addr_len);
short_packet:
pw_log_warn("short packet received");
return -EIO;
}
static void
on_socket_io(void *data, int fd, uint32_t mask)
{
struct impl *impl = data;
ssize_t len;
uint8_t buffer[2048];
struct sockaddr_storage addr;
socklen_t addr_len;
if (mask & SPA_IO_IN) {
if ((len = recvfrom(fd, buffer, sizeof(buffer), 0,
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
ssize_t len;
struct nj2_session_params params;
if ((len = recvfrom(fd, &params, sizeof(params), 0,
(struct sockaddr *)&addr, &addr_len)) < 0)
goto receive_error;
handle_packet(impl, (void *)buffer, len, &addr, addr_len);
if (len < (int)sizeof(struct nj2_session_params))
goto short_packet;
if (strcmp(params.type, "params") != 0)
goto wrong_type;
switch(ntohl(params.packet_id)) {
case NJ2_ID_FOLLOWER_SETUP:
handle_follower_setup(impl, &params, &addr, addr_len);
break;
}
}
return;
receive_error:
pw_log_warn("recv error: %m");
return;
short_packet:
pw_log_warn("short packet received");
return;
wrong_type:
pw_log_warn("wrong packet type received");
return;
}
static int send_follower_available(struct impl *impl)
{
char buffer[256];
struct nj2_session_params params;
const char *client_name;
pw_loop_update_io(impl->main_loop, impl->setup_socket, SPA_IO_IN);
pw_log_info("sending AVAILABLE");
impl->params.packet_id = htonl(NJ2_ID_FOLLOWER_AVAILABLE);
sendto(impl->setup_socket->fd, &impl->params, sizeof(impl->params), 0,
pw_log_info("sending AVAILABLE to %s", get_ip(&impl->dst_addr, buffer, sizeof(buffer)));
client_name = pw_properties_get(impl->props, "jack.client-name");
if (client_name == NULL)
client_name = DEFAULT_CLIENT_NAME;
spa_zero(params);
strcpy(params.type, "params");
params.version = htonl(NJ2_NETWORK_PROTOCOL);
params.packet_id = htonl(NJ2_ID_FOLLOWER_AVAILABLE);
snprintf(params.name, sizeof(params.name), "%s", client_name);
snprintf(params.follower_name, sizeof(params.follower_name), "%s", pw_get_host_name());
params.mtu = htonl(impl->mtu);
params.transport_sync = htonl(0);
params.send_audio_channels = htonl(-1);
params.recv_audio_channels = htonl(-1);
params.send_midi_channels = htonl(-1);
params.recv_midi_channels = htonl(-1);
params.sample_encoder = htonl(NJ2_ENCODER_FLOAT);
params.follower_sync_mode = htonl(1);
params.network_latency = htonl(NETWORK_DEFAULT_LATENCY);
sendto(impl->setup_socket->fd, &params, sizeof(params), 0,
(struct sockaddr*)&impl->dst_addr, impl->dst_len);
return 0;
}
@ -1268,11 +1294,9 @@ static void on_timer_event(void *data, uint64_t expirations)
static int create_netjack2_socket(struct impl *impl)
{
const char *client_name, *str;
const char *str;
uint32_t port;
int fd, res;
bool loop;
int ttl, dscp, mtu;
port = pw_properties_get_uint32(impl->props, "net.port", 0);
if (port == 0)
@ -1288,13 +1312,13 @@ static int create_netjack2_socket(struct impl *impl)
goto out;
}
mtu = pw_properties_get_uint32(impl->props, "net.mtu", DEFAULT_NET_MTU);
ttl = pw_properties_get_uint32(impl->props, "net.ttl", DEFAULT_NET_TTL);
loop = pw_properties_get_bool(impl->props, "net.loop", DEFAULT_NET_LOOP);
dscp = pw_properties_get_uint32(impl->props, "net.dscp", DEFAULT_NET_DSCP);
impl->mtu = pw_properties_get_uint32(impl->props, "net.mtu", DEFAULT_NET_MTU);
impl->ttl = pw_properties_get_uint32(impl->props, "net.ttl", DEFAULT_NET_TTL);
impl->loop = pw_properties_get_bool(impl->props, "net.loop", DEFAULT_NET_LOOP);
impl->dscp = pw_properties_get_uint32(impl->props, "net.dscp", DEFAULT_NET_DSCP);
fd = make_socket(&impl->src_addr, impl->src_len,
&impl->dst_addr, impl->dst_len, loop, ttl, dscp);
&impl->dst_addr, impl->dst_len, impl->loop, impl->ttl, impl->dscp);
if (fd < 0) {
res = -errno;
pw_log_error("can't create socket: %s", spa_strerror(res));
@ -1326,24 +1350,6 @@ static int create_netjack2_socket(struct impl *impl)
}
update_timer(impl, FOLLOWER_INIT_TIMEOUT);
client_name = pw_properties_get(impl->props, "jack.client-name");
if (client_name == NULL)
client_name = DEFAULT_CLIENT_NAME;
strcpy(impl->params.type, "params");
impl->params.version = htonl(NETWORK_PROTOCOL);
snprintf(impl->params.name, sizeof(impl->params.name), "%s", client_name);
impl->params.mtu = htonl(mtu);
impl->params.sample_encoder = htonl(NJ2_ENCODER_FLOAT);
get_ip(&impl->src_addr, impl->params.follower_name, sizeof(impl->params.follower_name));
impl->params.transport_sync = htonl(0);
impl->params.network_latency = htonl(NETWORK_DEFAULT_LATENCY);
impl->params.follower_sync_mode = htonl(1);
impl->params.send_audio_channels = htonl(-1);
impl->params.recv_audio_channels = htonl(-1);
impl->params.send_midi_channels = htonl(-1);
impl->params.recv_midi_channels = htonl(-1);
return 0;
out:
return res;
@ -1535,7 +1541,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->sink.direction = PW_DIRECTION_INPUT;
impl->mode = MODE_DUPLEX;
if ((str = pw_properties_get(props, "tunnel.mode")) != NULL) {
if ((str = pw_properties_get(props, "driver.mode")) != NULL) {
if (spa_streq(str, "source")) {
impl->mode = MODE_SOURCE;
} else if (spa_streq(str, "sink")) {
@ -1543,7 +1549,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
} else if (spa_streq(str, "duplex")) {
impl->mode = MODE_DUPLEX;
} else {
pw_log_error("invalid tunnel.mode '%s'", str);
pw_log_error("invalid driver.mode '%s'", str);
res = -EINVAL;
goto error;
}

View file

@ -14,6 +14,7 @@ extern "C" {
struct nj2_session_params {
char type[8]; /* packet type ('param') */
#define NJ2_NETWORK_PROTOCOL 8
uint32_t version; /* version */
#define NJ2_ID_FOLLOWER_AVAILABLE 0 /* a follower is available */
#define NJ2_ID_FOLLOWER_SETUP 1 /* follower configuration */