module-netjack2: handle driver reconnect

Let the driver keep announcing.
Keep track of when a message was received in the driver. If we don't
receive anything for a while, reconnect.

Don't connect twide in the manager.
This commit is contained in:
Wim Taymans 2023-06-05 13:30:16 +02:00
parent 8db98b114d
commit 04e17a8b1c
2 changed files with 77 additions and 45 deletions

View file

@ -128,7 +128,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_MIDI_PORTS 1 #define DEFAULT_MIDI_PORTS 1
#define FOLLOWER_INIT_TIMEOUT 1 #define FOLLOWER_INIT_TIMEOUT 1
#define FOLLOWER_INIT_RETRY 5 #define FOLLOWER_INIT_RETRY -1
#define MODULE_USAGE "( remote.name=<remote> ) " \ #define MODULE_USAGE "( remote.name=<remote> ) " \
"( driver.mode=<sink|source|duplex> ) " \ "( driver.mode=<sink|source|duplex> ) " \
@ -229,16 +229,18 @@ struct impl {
struct spa_source *setup_socket; struct spa_source *setup_socket;
struct spa_source *socket; struct spa_source *socket;
struct spa_source *timer; struct spa_source *timer;
uint32_t init_retry; int32_t init_retry;
struct netjack2_peer peer; struct netjack2_peer peer;
uint32_t driving; uint32_t driving;
uint32_t received;
unsigned int triggered:1; unsigned int triggered:1;
unsigned int do_disconnect:1; unsigned int do_disconnect:1;
unsigned int done:1; unsigned int done:1;
unsigned int new_xrun:1; unsigned int new_xrun:1;
unsigned int started:1;
}; };
static void reset_volume(struct volume *vol, uint32_t n_volumes) static void reset_volume(struct volume *vol, uint32_t n_volumes)
@ -253,7 +255,11 @@ static void reset_volume(struct volume *vol, uint32_t n_volumes)
static void stream_destroy(void *d) static void stream_destroy(void *d)
{ {
struct stream *s = d; struct stream *s = d;
uint32_t i;
spa_hook_remove(&s->listener); spa_hook_remove(&s->listener);
for (i = 0; i < s->n_ports; i++)
s->ports[i] = NULL;
s->filter = NULL; s->filter = NULL;
} }
@ -545,8 +551,7 @@ static int make_stream(struct stream *s, const char *name)
n_params = 0; n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer)); spa_pod_builder_init(&b, buffer, sizeof(buffer));
s->filter = pw_filter_new(impl->core, name, s->props); s->filter = pw_filter_new(impl->core, name, pw_properties_copy(s->props));
s->props = NULL;
if (s->filter == NULL) if (s->filter == NULL)
return -errno; return -errno;
@ -603,6 +608,7 @@ on_data_io(void *data, int fd, uint32_t mask)
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pw_log_warn("error:%08x", mask); pw_log_warn("error:%08x", mask);
pw_loop_update_io(impl->data_loop->loop, impl->socket, 0);
return; return;
} }
if (mask & SPA_IO_IN) { if (mask & SPA_IO_IN) {
@ -620,6 +626,7 @@ on_data_io(void *data, int fd, uint32_t mask)
impl->pw_xrun++; impl->pw_xrun++;
impl->new_xrun = true; impl->new_xrun = true;
} }
impl->received++;
source_running = impl->source.running; source_running = impl->source.running;
sink_running = impl->sink.running; sink_running = impl->sink.running;
@ -818,7 +825,6 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p
return -EINVAL; return -EINVAL;
} }
update_timer(impl, 0);
pw_loop_update_io(impl->main_loop, impl->setup_socket, 0); pw_loop_update_io(impl->main_loop, impl->setup_socket, 0);
impl->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels; impl->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels;
@ -849,6 +855,12 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p
if ((res = create_filters(impl)) < 0) if ((res = create_filters(impl)) < 0)
return res; return res;
peer->fd = impl->socket->fd;
peer->our_stream = 'r';
peer->other_stream = 's';
peer->send_volume = &impl->sink.volume;
peer->recv_volume = &impl->source.volume;
int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu +
peer->params.period_size * sizeof(float) * peer->params.period_size * sizeof(float) *
SPA_MAX(impl->source.n_ports, impl->sink.n_ports)); SPA_MAX(impl->source.n_ports, impl->sink.n_ports));
@ -859,23 +871,17 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p
if (setsockopt(impl->socket->fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0) if (setsockopt(impl->socket->fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0)
pw_log_warn("setsockopt(SO_SNDBUF) failed: %m"); pw_log_warn("setsockopt(SO_SNDBUF) failed: %m");
peer->fd = impl->socket->fd;
peer->our_stream = 'r';
peer->other_stream = 's';
peer->send_volume = &impl->sink.volume;
peer->recv_volume = &impl->source.volume;
if (connect(impl->socket->fd, (struct sockaddr*)addr, addr_len) < 0) if (connect(impl->socket->fd, (struct sockaddr*)addr, addr_len) < 0)
goto connect_error; goto connect_error;
impl->started = true;
params->packet_id = htonl(NJ2_ID_START_DRIVER);
send(impl->socket->fd, params, sizeof(*params), 0);
impl->done = true; impl->done = true;
pw_loop_update_io(impl->data_loop->loop, impl->socket, SPA_IO_IN); pw_loop_update_io(impl->data_loop->loop, impl->socket, SPA_IO_IN);
params->packet_id = htonl(NJ2_ID_START_DRIVER);
send(impl->socket->fd, params, sizeof(*params), 0);
return 0; return 0;
connect_error: connect_error:
pw_log_error("connect() failed: %m"); pw_log_error("connect() failed: %m");
return -errno; return -errno;
@ -955,18 +961,6 @@ static int send_follower_available(struct impl *impl)
return 0; return 0;
} }
static void on_timer_event(void *data, uint64_t expirations)
{
struct impl *impl = data;
if (--impl->init_retry == 0) {
pw_log_error("timeout in connect");
update_timer(impl, 0);
return;
}
send_follower_available(impl);
}
static int create_netjack2_socket(struct impl *impl) static int create_netjack2_socket(struct impl *impl)
{ {
const char *str; const char *str;
@ -1008,6 +1002,7 @@ static int create_netjack2_socket(struct impl *impl)
close(fd); close(fd);
goto out; goto out;
} }
impl->socket = pw_loop_add_io(impl->data_loop->loop, fd, impl->socket = pw_loop_add_io(impl->data_loop->loop, fd,
0, false, on_data_io, impl); 0, false, on_data_io, impl);
if (impl->socket == NULL) { if (impl->socket == NULL) {
@ -1016,13 +1011,7 @@ static int create_netjack2_socket(struct impl *impl)
goto out; goto out;
} }
impl->init_retry = FOLLOWER_INIT_RETRY; impl->init_retry = -1;
impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl);
if (impl->timer == NULL) {
res = -errno;
pw_log_error("can't create timer source: %m");
goto out;
}
update_timer(impl, FOLLOWER_INIT_TIMEOUT); update_timer(impl, FOLLOWER_INIT_TIMEOUT);
return 0; return 0;
@ -1033,16 +1022,29 @@ out:
static int send_stop_driver(struct impl *impl) static int send_stop_driver(struct impl *impl)
{ {
struct nj2_session_params params; struct nj2_session_params params;
impl->started = false;
if (impl->socket)
pw_loop_update_io(impl->data_loop->loop, impl->socket, 0);
pw_log_info("sending STOP_DRIVER"); pw_log_info("sending STOP_DRIVER");
nj2_session_params_hton(&params, &impl->peer.params); nj2_session_params_hton(&params, &impl->peer.params);
params.packet_id = htonl(NJ2_ID_STOP_DRIVER); params.packet_id = htonl(NJ2_ID_STOP_DRIVER);
sendto(impl->setup_socket->fd, &params, sizeof(params), 0, sendto(impl->setup_socket->fd, &params, sizeof(params), 0,
(struct sockaddr*)&impl->dst_addr, impl->dst_len); (struct sockaddr*)&impl->dst_addr, impl->dst_len);
if (impl->source.filter)
pw_filter_destroy(impl->source.filter);
if (impl->sink.filter)
pw_filter_destroy(impl->sink.filter);
return 0; return 0;
} }
static int destroy_netjack2_socket(struct impl *impl) static int destroy_netjack2_socket(struct impl *impl)
{ {
update_timer(impl, 0);
if (impl->socket) { if (impl->socket) {
pw_loop_destroy_source(impl->data_loop->loop, impl->socket); pw_loop_destroy_source(impl->data_loop->loop, impl->socket);
impl->socket = NULL; impl->socket = NULL;
@ -1052,13 +1054,36 @@ static int destroy_netjack2_socket(struct impl *impl)
pw_loop_destroy_source(impl->main_loop, impl->setup_socket); pw_loop_destroy_source(impl->main_loop, impl->setup_socket);
impl->setup_socket = NULL; impl->setup_socket = NULL;
} }
if (impl->timer) {
pw_loop_destroy_source(impl->main_loop, impl->timer);
impl->timer = NULL;
}
return 0; return 0;
} }
static void restart_netjack2_socket(struct impl *impl)
{
destroy_netjack2_socket(impl);
create_netjack2_socket(impl);
}
static void on_timer_event(void *data, uint64_t expirations)
{
struct impl *impl = data;
if (impl->started) {
if (impl->received == 0) {
pw_log_warn("receive timeout, restarting");
restart_netjack2_socket(impl);
}
impl->received = 0;
}
if (!impl->started) {
if (impl->init_retry > 0 && --impl->init_retry == 0) {
pw_log_error("timeout in connect");
update_timer(impl, 0);
pw_impl_module_schedule_destroy(impl->module);
return;
}
send_follower_available(impl);
}
}
static void core_error(void *data, uint32_t id, int seq, int res, const char *message) static void core_error(void *data, uint32_t id, int seq, int res, const char *message)
{ {
@ -1099,6 +1124,9 @@ static void impl_destroy(struct impl *impl)
if (impl->core && impl->do_disconnect) if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core); pw_core_disconnect(impl->core);
if (impl->timer)
pw_loop_destroy_source(impl->main_loop, impl->timer);
pw_properties_free(impl->sink.props); pw_properties_free(impl->sink.props);
pw_properties_free(impl->source.props); pw_properties_free(impl->source.props);
pw_properties_free(impl->props); pw_properties_free(impl->props);
@ -1299,6 +1327,13 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
&impl->core_listener, &impl->core_listener,
&core_events, impl); &core_events, impl);
impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl);
if (impl->timer == NULL) {
res = -errno;
pw_log_error("can't create timer source: %m");
goto error;
}
if ((res = create_netjack2_socket(impl)) < 0) if ((res = create_netjack2_socket(impl)) < 0)
goto error; goto error;

View file

@ -263,7 +263,11 @@ static void reset_volume(struct volume *vol, uint32_t n_volumes)
static void stream_destroy(void *d) static void stream_destroy(void *d)
{ {
struct stream *s = d; struct stream *s = d;
uint32_t i;
spa_hook_remove(&s->listener); spa_hook_remove(&s->listener);
for (i = 0; i < s->n_ports; i++)
s->ports[i] = NULL;
s->filter = NULL; s->filter = NULL;
} }
@ -941,9 +945,6 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0) if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0)
pw_log_warn("setsockopt(SO_SNDBUF) failed: %m"); pw_log_warn("setsockopt(SO_SNDBUF) failed: %m");
if (connect(fd, (struct sockaddr*)addr, addr_len) < 0)
goto connect_error;
impl->follower_id++; impl->follower_id++;
nj2_session_params_hton(params, &peer->params); nj2_session_params_hton(params, &peer->params);
@ -962,10 +963,6 @@ socket_failed:
res = fd; res = fd;
pw_log_error("can't create socket: %s", spa_strerror(res)); pw_log_error("can't create socket: %s", spa_strerror(res));
goto cleanup; goto cleanup;
connect_error:
res = -errno;
pw_log_error("connect() failed: %m");
goto cleanup;
cleanup: cleanup:
follower_free(follower); follower_free(follower);
return res; return res;