From fb63bb3c5cecc13f78e3f1c058fd03012096ee72 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 12 Jun 2023 17:56:20 +0200 Subject: [PATCH] module-netjack2: start follower after START message --- src/modules/module-netjack2-manager.c | 79 ++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/src/modules/module-netjack2-manager.c b/src/modules/module-netjack2-manager.c index e4ee1d900..b054b6d4c 100644 --- a/src/modules/module-netjack2-manager.c +++ b/src/modules/module-netjack2-manager.c @@ -187,6 +187,7 @@ struct stream { uint32_t active_midi_ports; unsigned int running:1; + unsigned int ready:1; }; struct follower { @@ -210,12 +211,14 @@ struct follower { uint32_t pw_xrun; uint32_t nj2_xrun; + struct spa_source *setup_socket; struct spa_source *socket; struct netjack2_peer peer; unsigned int done:1; unsigned int new_xrun:1; + unsigned int started:1; }; struct impl { @@ -371,6 +374,8 @@ static void follower_free(struct follower *follower) if (follower->socket) pw_loop_destroy_source(impl->data_loop->loop, follower->socket); + if (follower->setup_socket) + pw_loop_destroy_source(impl->main_loop, follower->setup_socket); netjack2_cleanup(&follower->peer); free(follower); @@ -381,6 +386,7 @@ do_stop_follower(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct follower *follower = user_data; + follower->started = false; if (follower->source.filter) pw_filter_set_active(follower->source.filter, false); if (follower->sink.filter) @@ -389,6 +395,63 @@ do_stop_follower(struct spa_loop *loop, return 0; } +static int start_follower(struct follower *follower) +{ + struct impl *impl = follower->impl; + pw_log_info("start follower %s", follower->peer.params.name); + follower->started = true; + if (follower->source.filter && follower->source.ready) + pw_filter_set_active(follower->source.filter, true); + if (follower->sink.filter && follower->sink.ready) + pw_filter_set_active(follower->sink.filter, true); + pw_loop_update_io(impl->main_loop, follower->setup_socket, 0); + return 0; +} + +static void +on_setup_io(void *data, int fd, uint32_t mask) +{ + struct follower *follower = data; + struct impl *impl = follower->impl; + + if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + pw_log_warn("error:%08x", mask); + pw_loop_destroy_source(impl->main_loop, follower->setup_socket); + follower->setup_socket = NULL; + return; + } + if (mask & SPA_IO_IN) { + ssize_t len; + struct nj2_session_params params; + + if ((len = recv(fd, ¶ms, sizeof(params), 0)) < 0) + goto receive_error; + + if (len < (int)sizeof(params)) + goto short_packet; + + if (strcmp(params.type, "params") != 0) + goto wrong_type; + + switch(ntohl(params.packet_id)) { + case NJ2_ID_START_DRIVER: + start_follower(follower); + break; + } + } + return; + +receive_error: + pw_log_warn("recv error: %m"); + return; +short_packet: + pw_log_warn("short packet received"); + return; +wrong_type: + pw_log_warn("wrong packet type received"); + return; +} + static void on_data_io(void *data, int fd, uint32_t mask) { @@ -581,7 +644,9 @@ static void stream_param_changed(void *data, void *port_data, uint32_t id, case SPA_PARAM_PortConfig: pw_log_debug("PortConfig"); make_stream_ports(s); - pw_filter_set_active(s->filter, true); + s->ready = true; + if (s->follower->started) + pw_filter_set_active(s->filter, true); break; case SPA_PARAM_Props: pw_log_debug("Props"); @@ -942,8 +1007,16 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param if (fd < 0) goto socket_failed; + follower->setup_socket = pw_loop_add_io(impl->main_loop, fd, + 0, true, on_setup_io, follower); + if (follower->setup_socket == NULL) { + res = -errno; + pw_log_error("can't create setup source: %m"); + goto socket_failed; + } + follower->socket = pw_loop_add_io(impl->data_loop->loop, fd, - SPA_IO_IN, true, on_data_io, follower); + 0, false, on_data_io, follower); if (follower->socket == NULL) { res = -errno; pw_log_error("can't create data source: %m"); @@ -968,6 +1041,8 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param impl->follower_id++; + pw_loop_update_io(impl->main_loop, follower->setup_socket, SPA_IO_IN); + nj2_session_params_hton(params, &peer->params); params->packet_id = htonl(NJ2_ID_FOLLOWER_SETUP);