From 04e17a8b1cadd4123137ecceb66537f73ac3a158 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 5 Jun 2023 13:30:16 +0200 Subject: [PATCH] module-netjack2: handle driver reconnect Let the driver keep announcing. Keep track of when a message was received in the driver. If we don't receive anything for a while, reconnect. Don't connect twide in the manager. --- src/modules/module-netjack2-driver.c | 111 +++++++++++++++++--------- src/modules/module-netjack2-manager.c | 11 +-- 2 files changed, 77 insertions(+), 45 deletions(-) diff --git a/src/modules/module-netjack2-driver.c b/src/modules/module-netjack2-driver.c index 4d9ea83db..4718b78ec 100644 --- a/src/modules/module-netjack2-driver.c +++ b/src/modules/module-netjack2-driver.c @@ -128,7 +128,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define DEFAULT_MIDI_PORTS 1 #define FOLLOWER_INIT_TIMEOUT 1 -#define FOLLOWER_INIT_RETRY 5 +#define FOLLOWER_INIT_RETRY -1 #define MODULE_USAGE "( remote.name= ) " \ "( driver.mode= ) " \ @@ -229,16 +229,18 @@ struct impl { struct spa_source *setup_socket; struct spa_source *socket; struct spa_source *timer; - uint32_t init_retry; + int32_t init_retry; struct netjack2_peer peer; uint32_t driving; + uint32_t received; unsigned int triggered:1; unsigned int do_disconnect:1; unsigned int done:1; unsigned int new_xrun:1; + unsigned int started:1; }; static void reset_volume(struct volume *vol, uint32_t n_volumes) @@ -253,7 +255,11 @@ static void reset_volume(struct volume *vol, uint32_t n_volumes) static void stream_destroy(void *d) { struct stream *s = d; + uint32_t i; + spa_hook_remove(&s->listener); + for (i = 0; i < s->n_ports; i++) + s->ports[i] = NULL; s->filter = NULL; } @@ -545,8 +551,7 @@ static int make_stream(struct stream *s, const char *name) n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); - s->filter = pw_filter_new(impl->core, name, s->props); - s->props = NULL; + s->filter = pw_filter_new(impl->core, name, pw_properties_copy(s->props)); if (s->filter == NULL) return -errno; @@ -603,6 +608,7 @@ on_data_io(void *data, int fd, uint32_t mask) if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pw_log_warn("error:%08x", mask); + pw_loop_update_io(impl->data_loop->loop, impl->socket, 0); return; } if (mask & SPA_IO_IN) { @@ -620,6 +626,7 @@ on_data_io(void *data, int fd, uint32_t mask) impl->pw_xrun++; impl->new_xrun = true; } + impl->received++; source_running = impl->source.running; sink_running = impl->sink.running; @@ -818,7 +825,6 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p return -EINVAL; } - update_timer(impl, 0); pw_loop_update_io(impl->main_loop, impl->setup_socket, 0); impl->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels; @@ -849,6 +855,12 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p if ((res = create_filters(impl)) < 0) return res; + peer->fd = impl->socket->fd; + peer->our_stream = 'r'; + peer->other_stream = 's'; + peer->send_volume = &impl->sink.volume; + peer->recv_volume = &impl->source.volume; + int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + peer->params.period_size * sizeof(float) * SPA_MAX(impl->source.n_ports, impl->sink.n_ports)); @@ -859,23 +871,17 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p if (setsockopt(impl->socket->fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0) pw_log_warn("setsockopt(SO_SNDBUF) failed: %m"); - peer->fd = impl->socket->fd; - peer->our_stream = 'r'; - peer->other_stream = 's'; - peer->send_volume = &impl->sink.volume; - peer->recv_volume = &impl->source.volume; - if (connect(impl->socket->fd, (struct sockaddr*)addr, addr_len) < 0) goto connect_error; + impl->started = true; + params->packet_id = htonl(NJ2_ID_START_DRIVER); + send(impl->socket->fd, params, sizeof(*params), 0); + impl->done = true; pw_loop_update_io(impl->data_loop->loop, impl->socket, SPA_IO_IN); - params->packet_id = htonl(NJ2_ID_START_DRIVER); - send(impl->socket->fd, params, sizeof(*params), 0); - return 0; - connect_error: pw_log_error("connect() failed: %m"); return -errno; @@ -955,18 +961,6 @@ static int send_follower_available(struct impl *impl) return 0; } -static void on_timer_event(void *data, uint64_t expirations) -{ - struct impl *impl = data; - - if (--impl->init_retry == 0) { - pw_log_error("timeout in connect"); - update_timer(impl, 0); - return; - } - send_follower_available(impl); -} - static int create_netjack2_socket(struct impl *impl) { const char *str; @@ -1008,6 +1002,7 @@ static int create_netjack2_socket(struct impl *impl) close(fd); goto out; } + impl->socket = pw_loop_add_io(impl->data_loop->loop, fd, 0, false, on_data_io, impl); if (impl->socket == NULL) { @@ -1016,13 +1011,7 @@ static int create_netjack2_socket(struct impl *impl) goto out; } - impl->init_retry = FOLLOWER_INIT_RETRY; - impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl); - if (impl->timer == NULL) { - res = -errno; - pw_log_error("can't create timer source: %m"); - goto out; - } + impl->init_retry = -1; update_timer(impl, FOLLOWER_INIT_TIMEOUT); return 0; @@ -1033,16 +1022,29 @@ out: static int send_stop_driver(struct impl *impl) { struct nj2_session_params params; + + impl->started = false; + if (impl->socket) + pw_loop_update_io(impl->data_loop->loop, impl->socket, 0); + pw_log_info("sending STOP_DRIVER"); nj2_session_params_hton(¶ms, &impl->peer.params); params.packet_id = htonl(NJ2_ID_STOP_DRIVER); sendto(impl->setup_socket->fd, ¶ms, sizeof(params), 0, (struct sockaddr*)&impl->dst_addr, impl->dst_len); + + if (impl->source.filter) + pw_filter_destroy(impl->source.filter); + if (impl->sink.filter) + pw_filter_destroy(impl->sink.filter); + return 0; } static int destroy_netjack2_socket(struct impl *impl) { + update_timer(impl, 0); + if (impl->socket) { pw_loop_destroy_source(impl->data_loop->loop, impl->socket); impl->socket = NULL; @@ -1052,13 +1054,36 @@ static int destroy_netjack2_socket(struct impl *impl) pw_loop_destroy_source(impl->main_loop, impl->setup_socket); impl->setup_socket = NULL; } - if (impl->timer) { - pw_loop_destroy_source(impl->main_loop, impl->timer); - impl->timer = NULL; - } return 0; } +static void restart_netjack2_socket(struct impl *impl) +{ + destroy_netjack2_socket(impl); + create_netjack2_socket(impl); +} + +static void on_timer_event(void *data, uint64_t expirations) +{ + struct impl *impl = data; + + if (impl->started) { + if (impl->received == 0) { + pw_log_warn("receive timeout, restarting"); + restart_netjack2_socket(impl); + } + impl->received = 0; + } + if (!impl->started) { + if (impl->init_retry > 0 && --impl->init_retry == 0) { + pw_log_error("timeout in connect"); + update_timer(impl, 0); + pw_impl_module_schedule_destroy(impl->module); + return; + } + send_follower_available(impl); + } +} static void core_error(void *data, uint32_t id, int seq, int res, const char *message) { @@ -1099,6 +1124,9 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); + if (impl->timer) + pw_loop_destroy_source(impl->main_loop, impl->timer); + pw_properties_free(impl->sink.props); pw_properties_free(impl->source.props); pw_properties_free(impl->props); @@ -1299,6 +1327,13 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); + impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl); + if (impl->timer == NULL) { + res = -errno; + pw_log_error("can't create timer source: %m"); + goto error; + } + if ((res = create_netjack2_socket(impl)) < 0) goto error; diff --git a/src/modules/module-netjack2-manager.c b/src/modules/module-netjack2-manager.c index 8fe49cdee..dbd6f5aec 100644 --- a/src/modules/module-netjack2-manager.c +++ b/src/modules/module-netjack2-manager.c @@ -263,7 +263,11 @@ static void reset_volume(struct volume *vol, uint32_t n_volumes) static void stream_destroy(void *d) { struct stream *s = d; + uint32_t i; + spa_hook_remove(&s->listener); + for (i = 0; i < s->n_ports; i++) + s->ports[i] = NULL; s->filter = NULL; } @@ -941,9 +945,6 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0) pw_log_warn("setsockopt(SO_SNDBUF) failed: %m"); - if (connect(fd, (struct sockaddr*)addr, addr_len) < 0) - goto connect_error; - impl->follower_id++; nj2_session_params_hton(params, &peer->params); @@ -962,10 +963,6 @@ socket_failed: res = fd; pw_log_error("can't create socket: %s", spa_strerror(res)); goto cleanup; -connect_error: - res = -errno; - pw_log_error("connect() failed: %m"); - goto cleanup; cleanup: follower_free(follower); return res;