module-netjack2: small improvements

Improve node properties
Improve module parameters and docs
Send send and recv buffer sizes.
Stop follower on connection errors.
This commit is contained in:
Wim Taymans 2023-06-02 13:24:49 +02:00
parent 2d253de359
commit 2988d9c831
2 changed files with 127 additions and 89 deletions

View file

@ -113,7 +113,6 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_NET_DSCP 34 /* Default to AES-67 AF41 (34) */
#define MAX_MTU 9000
#define NETWORK_DEFAULT_LATENCY 5
#define NETWORK_MAX_LATENCY 30
#define DEFAULT_CHANNELS 2
@ -127,7 +126,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
"( net.mtu=<MTU to use, default 1500> ) " \
"( net.ttl=<TTL to use, default 1> ) " \
"( net.loop=<loopback, default false> ) " \
"( jack.connect=<bool, autoconnect ports> ) " \
"( netjack2.connect=<bool, autoconnect ports> ) " \
"( midi.ports=<number of midi ports> ) " \
"( audio.channels=<number of channels> ) " \
"( audio.position=<channel map> ) " \
@ -525,8 +524,6 @@ static int netjack2_recv_midi(struct stream *s, struct nj2_packet_header *header
if ((len = recv(follower->socket->fd, follower->recv_buffer, ntohl(header->packet_size), 0)) < 0)
return -errno;
follower->sync.is_last = ntohl(header->is_last);
follower->sync.cycle = ntohl(header->cycle);
follower->sync.num_packets = ntohl(header->num_packets);
@ -546,8 +543,6 @@ static int netjack2_recv_audio(struct stream *s, struct nj2_packet_header *heade
if ((len = recv(follower->socket->fd, follower->recv_buffer, ntohl(header->packet_size), 0)) < 0)
return -errno;
follower->sync.is_last = ntohl(header->is_last);
sub_cycle = ntohl(header->sub_cycle);
active_ports = ntohl(header->active_ports);
@ -578,10 +573,6 @@ static int netjack2_recv_audio(struct stream *s, struct nj2_packet_header *heade
do_volume(dst, (float*)&ap[1], &s->volume, active_port, sub_period_size);
}
}
if (follower->sync.is_last) {
pw_log_trace_fp("got last audio packet");
}
return 0;
}
@ -595,19 +586,14 @@ static int32_t netjack2_sync_wait(struct follower *follower)
if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, MSG_PEEK)) < 0)
goto receive_error;
if (len < (ssize_t)sizeof(*sync))
goto discard;
//nj2_dump_packet_header(sync);
if (strcmp(sync->type, "header") != 0 ||
ntohl(sync->data_type) != 's' ||
ntohl(sync->data_stream) != 'r' ||
ntohl(sync->id) != follower->params.id)
goto discard;
break;
discard:
if (len >= (ssize_t)sizeof(*sync)) {
//nj2_dump_packet_header(sync);
if (strcmp(sync->type, "header") == 0 &&
ntohl(sync->data_type) == 's' &&
ntohl(sync->data_stream) == 'r' &&
ntohl(sync->id) == follower->params.id)
break;
}
if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, 0)) < 0)
goto receive_error;
}
@ -639,9 +625,10 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t
ssize_t len;
uint32_t count = 0;
struct nj2_packet_header *header = (struct nj2_packet_header *)follower->recv_buffer;
int res = 0;
while (!follower->sync.is_last) {
if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, 0)) < 0)
if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, MSG_PEEK)) < 0)
goto receive_error;
if (len < (ssize_t)sizeof(*header))
@ -654,18 +641,23 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t
pw_log_debug("not our packet");
continue;
}
follower->sync.is_last = ntohl(header->is_last);
switch (ntohl(header->data_type)) {
case 'm':
netjack2_recv_midi(s, header, &count, info, n_info);
res = netjack2_recv_midi(s, header, &count, info, n_info);
break;
case 'a':
netjack2_recv_audio(s, header, &count, info, n_info);
res = netjack2_recv_audio(s, header, &count, info, n_info);
break;
case 's':
pw_log_info("missing last data packet");
return 0;
}
if (res < 0) {
pw_log_warn("recv error: %s", spa_strerror(res));
break;
}
}
follower->sync.cycle = ntohl(header->cycle);
return 0;
@ -695,6 +687,39 @@ static void source_process(void *d, struct spa_io_position *position)
netjack2_recv_data(s, info, s->n_ports);
}
static void follower_free(struct follower *follower)
{
struct impl *impl = follower->impl;
spa_list_remove(&follower->link);
if (follower->source.filter)
pw_filter_destroy(follower->source.filter);
if (follower->sink.filter)
pw_filter_destroy(follower->sink.filter);
pw_properties_free(follower->source.props);
pw_properties_free(follower->sink.props);
if (follower->socket)
pw_loop_destroy_source(impl->data_loop->loop, follower->socket);
free(follower);
}
static int
do_stop_follower(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct follower *follower = user_data;
if (follower->source.filter)
pw_filter_set_active(follower->source.filter, false);
if (follower->sink.filter)
pw_filter_set_active(follower->sink.filter, false);
follower_free(follower);
return 0;
}
static void
on_data_io(void *data, int fd, uint32_t mask)
{
@ -703,6 +728,9 @@ on_data_io(void *data, int fd, uint32_t mask)
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pw_log_warn("error:%08x", mask);
pw_loop_destroy_source(impl->data_loop->loop, follower->socket);
follower->socket = NULL;
pw_loop_invoke(impl->main_loop, do_stop_follower, 1, NULL, 0, false, follower);
return;
}
if (mask & SPA_IO_IN) {
@ -946,10 +974,10 @@ static int create_filters(struct follower *follower)
int res = 0;
if (impl->mode & MODE_SINK)
res = make_stream(&follower->sink, "JACK Sink");
res = make_stream(&follower->sink, "NETJACK2 Send");
if (impl->mode & MODE_SOURCE)
res = make_stream(&follower->source, "JACK Source");
res = make_stream(&follower->source, "NETJACK2 Receive");
return res;
}
@ -1002,7 +1030,7 @@ static bool is_multicast(struct sockaddr *sa, socklen_t salen)
return false;
}
static int make_send_socket(struct sockaddr_storage *sa, socklen_t salen,
static int make_data_socket(struct sockaddr_storage *sa, socklen_t salen,
bool loop, int ttl, int dscp, char *ifname)
{
int af, fd, val, res;
@ -1044,7 +1072,7 @@ error:
return res;
}
static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen,
static int make_announce_socket(struct sockaddr_storage *sa, socklen_t salen,
char *ifname)
{
int af, fd, val, res;
@ -1127,26 +1155,6 @@ static const char *get_ip(const struct sockaddr_storage *sa, char *ip, size_t le
return ip;
}
static void follower_free(struct follower *follower)
{
struct impl *impl = follower->impl;
spa_list_remove(&follower->link);
if (follower->source.filter)
pw_filter_destroy(follower->source.filter);
if (follower->sink.filter)
pw_filter_destroy(follower->sink.filter);
pw_properties_free(follower->source.props);
pw_properties_free(follower->sink.props);
if (follower->socket)
pw_loop_destroy_source(impl->data_loop->loop, follower->socket);
free(follower);
}
static int handle_follower_available(struct impl *impl, struct nj2_session_params *params,
struct sockaddr_storage *addr, socklen_t addr_len)
{
@ -1196,8 +1204,10 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
pw_properties_setf(follower->source.props, PW_KEY_NODE_FORCE_RATE, "1/%u", follower->samplerate);
pw_properties_setf(follower->source.props, PW_KEY_NODE_FORCE_QUANTUM, "%u", follower->period_size);
pw_properties_set(follower->source.props, PW_KEY_NODE_DESCRIPTION, params->name);
pw_properties_set(follower->sink.props, PW_KEY_NODE_DESCRIPTION, params->name);
pw_properties_setf(follower->source.props, PW_KEY_NODE_DESCRIPTION, "%s NETJACK2 from %s",
params->name, params->follower_name);
pw_properties_setf(follower->sink.props, PW_KEY_NODE_DESCRIPTION, "%s NETJACK2 to %s",
params->name, params->follower_name);
snprintf(params->driver_name, sizeof(params->driver_name), "%s", pw_get_host_name());
@ -1248,7 +1258,7 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
if ((res = create_filters(follower)) < 0)
goto create_failed;
fd = make_send_socket(addr, addr_len, impl->loop,
fd = make_data_socket(addr, addr_len, impl->loop,
impl->ttl, impl->dscp, NULL);
if (fd < 0)
goto socket_failed;
@ -1260,6 +1270,17 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
pw_log_error("can't create data source: %m");
goto socket_failed;
}
int bufsize = NETWORK_MAX_LATENCY * (follower->params.mtu +
follower->period_size * sizeof(float) *
SPA_MAX(follower->source.n_ports, follower->sink.n_ports));
pw_log_info("send/recv buffer %d", bufsize);
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) < 0)
pw_log_warn("setsockopt(SO_SNDBUF) failed: %m");
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0)
pw_log_warn("setsockopt(SO_SNDBUF) failed: %m");
if (connect(fd, (struct sockaddr*)addr, addr_len) < 0)
goto connect_error;
@ -1367,7 +1388,7 @@ static int create_netjack2_socket(struct impl *impl)
impl->loop = pw_properties_get_bool(impl->props, "net.loop", DEFAULT_NET_LOOP);
impl->dscp = pw_properties_get_uint32(impl->props, "net.dscp", DEFAULT_NET_DSCP);
fd = make_recv_socket(&impl->src_addr, impl->src_len, NULL);
fd = make_announce_socket(&impl->src_addr, impl->src_len, NULL);
if (fd < 0) {
res = fd;
pw_log_error("can't create socket: %s", spa_strerror(res));
@ -1562,6 +1583,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true");
if (pw_properties_get(props, PW_KEY_NODE_NETWORK) == NULL)
pw_properties_set(props, PW_KEY_NODE_NETWORK, "true");
if (pw_properties_get(props, PW_KEY_NODE_LINK_GROUP) == NULL)
pw_properties_set(props, PW_KEY_NODE_LINK_GROUP, "jack-group");
if (pw_properties_get(props, PW_KEY_NODE_ALWAYS_PROCESS) == NULL)
@ -1573,24 +1596,23 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
pw_properties_set(impl->sink_props, PW_KEY_MEDIA_CLASS, "Audio/Sink");
pw_properties_set(impl->sink_props, PW_KEY_NODE_NAME, "jack_manager_send");
pw_properties_set(impl->sink_props, PW_KEY_NODE_DESCRIPTION, "JACK Send");
pw_properties_set(impl->source_props, PW_KEY_MEDIA_CLASS, "Audio/Source");
pw_properties_set(impl->source_props, PW_KEY_NODE_NAME, "jack_manager_recv");
pw_properties_set(impl->source_props, PW_KEY_NODE_DESCRIPTION, "JACK Receive");
if ((str = pw_properties_get(props, "sink.props")) != NULL)
pw_properties_update_string(impl->sink_props, str, strlen(str));
if ((str = pw_properties_get(props, "source.props")) != NULL)
pw_properties_update_string(impl->source_props, str, strlen(str));
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);
copy_props(impl, props, SPA_KEY_AUDIO_POSITION);
copy_props(impl, props, PW_KEY_NODE_ALWAYS_PROCESS);
copy_props(impl, props, PW_KEY_NODE_LINK_GROUP);
copy_props(impl, props, PW_KEY_NODE_VIRTUAL);
copy_props(impl, props, PW_KEY_NODE_NETWORK);
copy_props(impl, props, PW_KEY_NODE_LINK_GROUP);
copy_props(impl, props, PW_KEY_NODE_ALWAYS_PROCESS);
copy_props(impl, props, PW_KEY_NODE_LOCK_QUANTUM);
copy_props(impl, props, PW_KEY_NODE_LOCK_RATE);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);
copy_props(impl, props, SPA_KEY_AUDIO_POSITION);
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {