From cc82715325aacb52f9912735580053c3e7c5002f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Sat, 3 Jun 2023 07:14:58 +0200 Subject: [PATCH] module-netjack2: refactor Move common code to a separate file. --- src/modules/module-netjack2-driver.c | 445 +++-------------------- src/modules/module-netjack2-manager.c | 484 +++----------------------- src/modules/module-netjack2/packets.h | 42 +++ src/modules/module-netjack2/peer.c | 432 +++++++++++++++++++++++ 4 files changed, 563 insertions(+), 840 deletions(-) create mode 100644 src/modules/module-netjack2/peer.c diff --git a/src/modules/module-netjack2-driver.c b/src/modules/module-netjack2-driver.c index 242235990..69b4e942c 100644 --- a/src/modules/module-netjack2-driver.c +++ b/src/modules/module-netjack2-driver.c @@ -35,6 +35,7 @@ #include #include "module-netjack2/packets.h" +#include "module-netjack2/peer.c" #ifndef IPTOS_DSCP #define IPTOS_DSCP_MASK 0xfc @@ -161,12 +162,6 @@ struct port { unsigned int is_midi:1; }; -struct volume { - bool mute; - uint32_t n_volumes; - float volumes[SPA_AUDIO_MAX_CHANNELS]; -}; - struct stream { struct impl *impl; @@ -235,10 +230,7 @@ struct impl { struct spa_source *timer; uint32_t init_retry; - struct nj2_session_params params; - struct nj2_packet_header sync; - uint8_t send_buffer[MAX_MTU]; - uint8_t recv_buffer[MAX_MTU]; + struct netjack2_peer peer; uint32_t driving; @@ -246,7 +238,6 @@ struct impl { unsigned int do_disconnect:1; unsigned int done:1; unsigned int new_xrun:1; - unsigned int fix_midi:1; }; static void reset_volume(struct volume *vol, uint32_t n_volumes) @@ -258,79 +249,6 @@ static void reset_volume(struct volume *vol, uint32_t n_volumes) vol->volumes[i] = 1.0f; } -static inline void do_volume(float *dst, const float *src, struct volume *vol, uint32_t ch, uint32_t n_samples) -{ - float v = vol->mute ? 0.0f : vol->volumes[ch]; - - if (v == 0.0f || src == NULL) - memset(dst, 0, n_samples * sizeof(float)); - else if (v == 1.0f) - memcpy(dst, src, n_samples * sizeof(float)); - else { - uint32_t i; - for (i = 0; i < n_samples; i++) - dst[i] = src[i] * v; - } -} - -static inline void fix_midi_event(uint8_t *data, size_t size) -{ - /* fixup NoteOn with vel 0 */ - if (size > 2 && (data[0] & 0xF0) == 0x90 && data[2] == 0x00) { - data[0] = 0x80 + (data[0] & 0x0F); - data[2] = 0x40; - } -} - -static void midi_to_netjack2(struct impl *impl, float *dst, float *src, uint32_t n_samples) -{ - struct spa_pod *pod; - struct spa_pod_sequence *seq; - struct spa_pod_control *c; - - if (src == NULL) - return; - - if ((pod = spa_pod_from_data(src, n_samples * sizeof(float), 0, n_samples * sizeof(float))) == NULL) - return; - if (!spa_pod_is_sequence(pod)) - return; - - seq = (struct spa_pod_sequence*)pod; - - SPA_POD_SEQUENCE_FOREACH(seq, c) { - switch(c->type) { - case SPA_CONTROL_Midi: - { - uint8_t *data = SPA_POD_BODY(&c->value); - size_t size = SPA_POD_BODY_SIZE(&c->value); - - if (impl->fix_midi) - fix_midi_event(data, size); - - break; - } - default: - break; - } - } -} - -static void netjack2_to_midi(float *dst, float *src, uint32_t size) -{ - struct spa_pod_builder b = { 0, }; - uint32_t i, count; - struct spa_pod_frame f; - - count = 0; - - spa_pod_builder_init(&b, dst, size); - spa_pod_builder_push_sequence(&b, &f, 0); - for (i = 0; i < count; i++) { - } - spa_pod_builder_pop(&b, &f); -} - static void stream_destroy(void *d) { struct stream *s = d; @@ -359,170 +277,6 @@ static void stream_state_changed(void *d, enum pw_filter_state old, } } -static int32_t netjack2_sync_wait(struct impl *impl) -{ - struct nj2_packet_header *sync = (struct nj2_packet_header *)impl->recv_buffer; - ssize_t len; - - while (true) { - if ((len = recv(impl->socket->fd, impl->recv_buffer, impl->params.mtu, 0)) < 0) - goto receive_error; - - if (len >= (ssize_t)sizeof(*sync)) { - //nj2_dump_packet_header(sync); - - if (strcmp(sync->type, "header") == 0 && - ntohl(sync->data_type) == 's' && - ntohl(sync->data_stream) == 's' && - ntohl(sync->id) == impl->params.id) - break; - } - } - impl->sync.is_last = ntohl(sync->is_last); - impl->sync.frames = ntohl(sync->frames); - if (impl->sync.frames == -1) - impl->sync.frames = impl->params.period_size; - - return impl->sync.frames; - -receive_error: - pw_log_warn("recv error: %m"); - return 0; -} - -struct data_info { - void *data; - uint32_t id; - bool filled; -}; - -static int netjack2_send_sync(struct stream *s, uint32_t nframes) -{ - struct impl *impl = s->impl; - struct nj2_packet_header *header = (struct nj2_packet_header *)impl->send_buffer; - uint32_t i, packet_size, active_ports, is_last; - int32_t *p; - - /* we always listen on all ports */ - active_ports = impl->params.send_audio_channels; - packet_size = sizeof(*header) + active_ports * sizeof(int32_t); - is_last = impl->params.recv_midi_channels == 0 && - impl->params.recv_audio_channels == 0 ? 1 : 0; - - header->active_ports = htonl(active_ports); - header->frames = htonl(nframes); - header->cycle = htonl(impl->sync.cycle); - header->sub_cycle = 0; - header->data_type = htonl('s'); - header->is_last = htonl(is_last); - header->packet_size = htonl(packet_size); - p = SPA_PTROFF(header, sizeof(*header), int32_t); - for (i = 0; i < active_ports; i++) - p[i] = htonl(i); - send(impl->socket->fd, header, packet_size, 0); - return 0; -} - -static int netjack2_send_midi(struct stream *s, uint32_t nframes, - struct data_info *info, uint32_t n_info) -{ - struct impl *impl = s->impl; - struct nj2_packet_header *header = (struct nj2_packet_header *)impl->send_buffer; - uint32_t i, num_packets, active_ports, data_size, res1, res2, max_size; - int32_t *ap; - - if (impl->params.recv_midi_channels <= 0) - return 0; - - active_ports = impl->params.recv_midi_channels; - ap = SPA_PTROFF(header, sizeof(*header), int32_t); - - data_size = active_ports * sizeof(struct nj2_midi_buffer); - memset(ap, 0, data_size); - - max_size = PACKET_AVAILABLE_SIZE(impl->params.mtu); - - res1 = data_size % max_size; - res2 = data_size / max_size; - num_packets = (res1) ? res2 + 1 : res2; - - header->data_type = htonl('m'); - header->active_ports = htonl(active_ports); - header->num_packets = htonl(num_packets); - - for (i = 0; i < num_packets; i++) { - uint32_t is_last = ((i == num_packets - 1) && impl->params.recv_audio_channels == 0) ? 1 : 0; - uint32_t packet_size = sizeof(*header) + data_size; - - header->sub_cycle = htonl(i); - header->is_last = htonl(is_last); - header->packet_size = htonl(packet_size); - send(impl->socket->fd, header, packet_size, 0); - //nj2_dump_packet_header(header); - } - return 0; -} - -static int netjack2_send_audio(struct stream *s, uint32_t frames, struct data_info *info, uint32_t n_info) -{ - struct impl *impl = s->impl; - struct nj2_packet_header *header = (struct nj2_packet_header *)impl->send_buffer; - uint32_t i, j, active_ports, num_packets; - uint32_t sub_period_size, sub_period_bytes; - - if (impl->params.recv_audio_channels <= 0) - return 0; - - active_ports = n_info; - - if (active_ports == 0) { - sub_period_size = frames; - } else { - uint32_t max_size = PACKET_AVAILABLE_SIZE(impl->params.mtu); - uint32_t period = (uint32_t) powf(2.f, (uint32_t) (logf((float)max_size / - (active_ports * sizeof(float))) / logf(2.f))); - sub_period_size = SPA_MIN(period, frames); - } - sub_period_bytes = sub_period_size * sizeof(float) + sizeof(int32_t); - num_packets = frames / sub_period_size; - - header->data_type = htonl('a'); - header->active_ports = htonl(active_ports); - header->num_packets = htonl(num_packets); - - for (i = 0; i < num_packets; i++) { - uint32_t is_last = (i == num_packets - 1) ? 1 : 0; - uint32_t packet_size = sizeof(*header) + active_ports * sub_period_bytes; - int32_t *ap = SPA_PTROFF(header, sizeof(*header), int32_t); - float *src; - - for (j = 0; j < active_ports; j++) { - ap[0] = htonl(info[j].id); - - src = SPA_PTROFF(info[j].data, i * sub_period_size * sizeof(float), float); - do_volume((float*)&ap[1], src, &s->volume, info[j].id, sub_period_size); - - ap = SPA_PTROFF(ap, sub_period_bytes, int32_t); - } - header->sub_cycle = htonl(i); - header->is_last = htonl(is_last); - header->packet_size = htonl(packet_size); - send(impl->socket->fd, header, packet_size, 0); - //nj2_dump_packet_header(header); - } - return 0; -} - -static int netjack2_send_data(struct stream *s, uint32_t nframes, - struct data_info *midi, uint32_t n_midi, - struct data_info *audio, uint32_t n_audio) -{ - netjack2_send_sync(s, nframes); - netjack2_send_midi(s, nframes, midi, n_midi); - netjack2_send_audio(s, nframes, audio, n_audio); - return 0; -} - static void sink_process(void *d, struct spa_io_position *position) { struct stream *s = d; @@ -550,124 +304,13 @@ static void sink_process(void *d, struct spa_io_position *position) } } - netjack2_send_data(s, nframes, midi, n_midi, audio, n_audio); + netjack2_send_data(&impl->peer, nframes, midi, n_midi, audio, n_audio); pw_log_trace_fp("done %"PRIu64, impl->frame_time); if (impl->driving == MODE_SINK) impl->done = true; } -static int netjack2_recv_midi(struct stream *s, struct nj2_packet_header *header, uint32_t *count, - struct data_info *info, uint32_t n_info) -{ - struct impl *impl = s->impl; - ssize_t len; - - if ((len = recv(impl->socket->fd, impl->recv_buffer, ntohl(header->packet_size), 0)) < 0) - return -errno; - - impl->sync.cycle = ntohl(header->cycle); - impl->sync.num_packets = ntohl(header->num_packets); - - if (++(*count) == impl->sync.num_packets) { - pw_log_trace_fp("got last midi packet"); - } - return 0; -} - -static int netjack2_recv_audio(struct stream *s, struct nj2_packet_header *header, uint32_t *count, - struct data_info *info, uint32_t n_info) -{ - struct impl *impl = s->impl; - ssize_t len; - uint32_t i, sub_cycle, sub_period_size, sub_period_bytes, active_ports; - - if ((len = recv(impl->socket->fd, impl->recv_buffer, ntohl(header->packet_size), 0)) < 0) - return -errno; - - sub_cycle = ntohl(header->sub_cycle); - active_ports = ntohl(header->active_ports); - - if (active_ports == 0) { - sub_period_size = impl->sync.frames; - } else { - uint32_t max_size = PACKET_AVAILABLE_SIZE(impl->params.mtu); - uint32_t period = (uint32_t) powf(2.f, (uint32_t) (logf((float)max_size / - (active_ports * sizeof(float))) / logf(2.f))); - sub_period_size = SPA_MIN(period, (uint32_t)impl->sync.frames); - } - sub_period_bytes = sub_period_size * sizeof(float) + sizeof(int32_t); - - for (i = 0; i < active_ports; i++) { - int32_t *ap = SPA_PTROFF(header, sizeof(*header) + i * sub_period_bytes, int32_t); - uint32_t active_port = ntohl(ap[0]); - void *data; - - pw_log_trace_fp("%u/%u %u %u", active_port, n_info, - sub_cycle, sub_period_size); - if (active_port >= n_info) - continue; - - if ((data = info[active_port].data) != NULL) { - float *dst = SPA_PTROFF(data, - sub_cycle * sub_period_size * sizeof(float), - float); - do_volume(dst, (float*)&ap[1], &s->volume, active_port, sub_period_size); - info[active_port].filled = impl->sync.is_last; - } - } - return 0; -} - -static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t n_info) -{ - struct impl *impl = s->impl; - ssize_t len; - uint32_t i, count = 0; - struct nj2_packet_header *header = (struct nj2_packet_header *)impl->recv_buffer; - - while (!impl->sync.is_last) { - if ((len = recv(impl->socket->fd, impl->recv_buffer, impl->params.mtu, MSG_PEEK)) < 0) - goto receive_error; - - if (len < (ssize_t)sizeof(*header)) - goto receive_error; - - //nj2_dump_packet_header(header); - - if (ntohl(header->data_stream) != 's' || - ntohl(header->id) != impl->params.id) { - pw_log_debug("not our packet"); - continue; - } - - impl->sync.is_last = ntohl(header->is_last); - - switch (ntohl(header->data_type)) { - case 'm': - netjack2_recv_midi(s, header, &count, info, n_info); - break; - case 'a': - netjack2_recv_audio(s, header, &count, info, n_info); - break; - case 's': - pw_log_info("missing last data packet"); - impl->sync.is_last = true; - break; - } - } - for (i = 0; i < s->n_ports; i++) { - if (!info[i].filled && info[i].data != NULL) - memset(info[i].data, 0, impl->sync.frames * sizeof(float)); - } - impl->sync.cycle = ntohl(header->cycle); - return 0; - -receive_error: - pw_log_warn("recv error: %m"); - return -errno; -} - static void source_process(void *d, struct spa_io_position *position) { struct stream *s = d; @@ -688,7 +331,7 @@ static void source_process(void *d, struct spa_io_position *position) info[i].id = i; info[i].filled = false; } - netjack2_recv_data(s, info, s->n_ports); + netjack2_recv_data(&impl->peer, info, s->n_ports); } static void stream_io_changed(void *data, void *port_data, uint32_t id, void *area, uint32_t size) @@ -953,7 +596,7 @@ on_data_io(void *data, int fd, uint32_t mask) uint32_t nframes; uint64_t nsec; - nframes = netjack2_sync_wait(impl); + nframes = netjack2_driver_sync_wait(&impl->peer); if (nframes == 0) return; @@ -991,7 +634,7 @@ on_data_io(void *data, int fd, uint32_t mask) c->target_duration = c->duration; } if (!source_running) - netjack2_recv_data(&impl->sink, NULL, 0); + netjack2_recv_data(&impl->peer, NULL, 0); if (impl->mode & MODE_SOURCE && source_running) { impl->done = false; @@ -1008,7 +651,7 @@ on_data_io(void *data, int fd, uint32_t mask) impl->done = true; } if (!sink_running) - netjack2_send_data(&impl->sink, nframes, NULL, 0, NULL, 0); + netjack2_send_data(&impl->peer, nframes, NULL, 0, NULL, 0); } } @@ -1141,35 +784,22 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p struct sockaddr_storage *addr, socklen_t addr_len) { int res; - struct nj2_packet_header *header; + struct netjack2_peer *peer = &impl->peer; pw_log_info("got follower setup"); nj2_dump_session_params(params); - impl->params = *params; - impl->params.version = ntohl(params->version); - impl->params.packet_id = ntohl(params->packet_id); - impl->params.mtu = ntohl(params->mtu); - impl->params.id = ntohl(params->id); - impl->params.transport_sync = ntohl(params->transport_sync); - impl->params.send_audio_channels = ntohl(params->send_audio_channels); - impl->params.recv_audio_channels = ntohl(params->recv_audio_channels); - impl->params.send_midi_channels = ntohl(params->send_midi_channels); - impl->params.recv_midi_channels = ntohl(params->recv_midi_channels); - impl->params.sample_rate = ntohl(params->sample_rate); - impl->params.period_size = ntohl(params->period_size); - impl->params.sample_encoder = ntohl(params->sample_encoder); - impl->params.kbps = ntohl(params->kbps); - impl->params.follower_sync_mode = ntohl(params->follower_sync_mode); - impl->params.network_latency = ntohl(params->network_latency); + nj2_session_params_ntoh(&peer->params, params); + SPA_SWAP(peer->params.send_audio_channels, peer->params.recv_audio_channels); + SPA_SWAP(peer->params.send_midi_channels, peer->params.recv_midi_channels); - if (impl->params.send_audio_channels < 0 || - impl->params.recv_audio_channels < 0 || - impl->params.send_midi_channels < 0 || - impl->params.recv_midi_channels < 0 || - impl->params.sample_rate == 0 || - impl->params.period_size == 0 || - impl->params.sample_encoder != NJ2_ENCODER_FLOAT) { + if (peer->params.send_audio_channels < 0 || + peer->params.recv_audio_channels < 0 || + peer->params.send_midi_channels < 0 || + peer->params.recv_midi_channels < 0 || + peer->params.sample_rate == 0 || + peer->params.period_size == 0 || + peer->params.sample_encoder != NJ2_ENCODER_FLOAT) { pw_log_warn("invalid follower setup"); return -EINVAL; } @@ -1177,29 +807,24 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p update_timer(impl, 0); pw_loop_update_io(impl->main_loop, impl->setup_socket, 0); - impl->source.n_ports = impl->params.send_audio_channels + impl->params.send_midi_channels; - impl->source.info.rate = impl->params.sample_rate; - impl->source.info.channels = impl->params.send_audio_channels; - impl->sink.n_ports = impl->params.recv_audio_channels + impl->params.recv_midi_channels; - impl->sink.info.rate = impl->params.sample_rate; - impl->sink.info.channels = impl->params.recv_audio_channels; - impl->samplerate = impl->params.sample_rate; - - header = (struct nj2_packet_header *)impl->send_buffer; - snprintf(header->type, sizeof(header->type), "header"); - header->data_stream = htonl('r'); - header->id = params->id; + impl->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels; + impl->source.info.rate = peer->params.sample_rate; + impl->source.info.channels = peer->params.send_audio_channels; + impl->sink.n_ports = peer->params.recv_audio_channels + peer->params.recv_midi_channels; + impl->sink.info.rate = peer->params.sample_rate; + impl->sink.info.channels = peer->params.recv_audio_channels; + impl->samplerate = peer->params.sample_rate; pw_properties_setf(impl->sink.props, PW_KEY_NODE_DESCRIPTION, "NETJACK2 to %s", - impl->params.driver_name); + peer->params.driver_name); pw_properties_setf(impl->source.props, PW_KEY_NODE_DESCRIPTION, "NETJACK2 from %s", - impl->params.driver_name); + peer->params.driver_name); if ((res = create_filters(impl)) < 0) return res; - int bufsize = NETWORK_MAX_LATENCY * (impl->params.mtu + - impl->params.period_size * sizeof(float) * + int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + + peer->params.period_size * sizeof(float) * SPA_MAX(impl->source.n_ports, impl->sink.n_ports)); pw_log_info("send/recv buffer %d", bufsize); @@ -1208,6 +833,12 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p if (setsockopt(impl->socket->fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0) pw_log_warn("setsockopt(SO_SNDBUF) failed: %m"); + peer->fd = impl->socket->fd; + peer->our_stream = 'r'; + peer->other_stream = 's'; + peer->send_volume = &impl->sink.volume; + peer->recv_volume = &impl->source.volume; + if (connect(impl->socket->fd, (struct sockaddr*)addr, addr_len) < 0) goto connect_error; @@ -1375,9 +1006,11 @@ out: static int send_stop_driver(struct impl *impl) { + struct nj2_session_params params; pw_log_info("sending STOP_DRIVER"); - impl->params.packet_id = htonl(NJ2_ID_STOP_DRIVER); - sendto(impl->setup_socket->fd, &impl->params, sizeof(impl->params), 0, + nj2_session_params_hton(¶ms, &impl->peer.params); + params.packet_id = htonl(NJ2_ID_STOP_DRIVER); + sendto(impl->setup_socket->fd, ¶ms, sizeof(params), 0, (struct sockaddr*)&impl->dst_addr, impl->dst_len); return 0; } diff --git a/src/modules/module-netjack2-manager.c b/src/modules/module-netjack2-manager.c index a719e016f..2bcbbdf8d 100644 --- a/src/modules/module-netjack2-manager.c +++ b/src/modules/module-netjack2-manager.c @@ -37,6 +37,8 @@ #include "module-netjack2/packets.h" +#include "module-netjack2/peer.c" + #ifndef IPTOS_DSCP #define IPTOS_DSCP_MASK 0xfc #define IPTOS_DSCP(x) ((x) & IPTOS_DSCP_MASK) @@ -159,12 +161,6 @@ struct port { unsigned int is_midi:1; }; -struct volume { - bool mute; - uint32_t n_volumes; - float volumes[SPA_AUDIO_MAX_CHANNELS]; -}; - struct stream { struct impl *impl; struct follower *follower; @@ -211,14 +207,10 @@ struct follower { struct spa_source *socket; - struct nj2_session_params params; - struct nj2_packet_header sync; - uint8_t send_buffer[MAX_MTU]; - uint8_t recv_buffer[MAX_MTU]; + struct netjack2_peer peer; unsigned int done:1; unsigned int new_xrun:1; - unsigned int fix_midi:1; }; struct impl { @@ -268,79 +260,6 @@ static void reset_volume(struct volume *vol, uint32_t n_volumes) vol->volumes[i] = 1.0f; } -static inline void do_volume(float *dst, const float *src, struct volume *vol, uint32_t ch, uint32_t n_samples) -{ - float v = vol->mute ? 0.0f : vol->volumes[ch]; - - if (v == 0.0f || src == NULL) - memset(dst, 0, n_samples * sizeof(float)); - else if (v == 1.0f) - memcpy(dst, src, n_samples * sizeof(float)); - else { - uint32_t i; - for (i = 0; i < n_samples; i++) - dst[i] = src[i] * v; - } -} - -static inline void fix_midi_event(uint8_t *data, size_t size) -{ - /* fixup NoteOn with vel 0 */ - if (size > 2 && (data[0] & 0xF0) == 0x90 && data[2] == 0x00) { - data[0] = 0x80 + (data[0] & 0x0F); - data[2] = 0x40; - } -} - -static void midi_to_netjack2(struct follower *follower, float *dst, float *src, uint32_t n_samples) -{ - struct spa_pod *pod; - struct spa_pod_sequence *seq; - struct spa_pod_control *c; - - if (src == NULL) - return; - - if ((pod = spa_pod_from_data(src, n_samples * sizeof(float), 0, n_samples * sizeof(float))) == NULL) - return; - if (!spa_pod_is_sequence(pod)) - return; - - seq = (struct spa_pod_sequence*)pod; - - SPA_POD_SEQUENCE_FOREACH(seq, c) { - switch(c->type) { - case SPA_CONTROL_Midi: - { - uint8_t *data = SPA_POD_BODY(&c->value); - size_t size = SPA_POD_BODY_SIZE(&c->value); - - if (follower->fix_midi) - fix_midi_event(data, size); - - break; - } - default: - break; - } - } -} - -static void netjack2_to_midi(float *dst, float *src, uint32_t size) -{ - struct spa_pod_builder b = { 0, }; - uint32_t i, count; - struct spa_pod_frame f; - - count = 0; - - spa_pod_builder_init(&b, dst, size); - spa_pod_builder_push_sequence(&b, &f, 0); - for (i = 0; i < count; i++) { - } - spa_pod_builder_pop(&b, &f); -} - static void stream_destroy(void *d) { struct stream *s = d; @@ -369,142 +288,10 @@ static void stream_state_changed(void *d, enum pw_filter_state old, } } -struct data_info { - void *data; - uint32_t id; - bool filled; -}; - -static int netjack2_send_sync(struct stream *s, uint32_t nframes) -{ - struct follower *follower = s->follower; - struct nj2_packet_header *header = (struct nj2_packet_header *)follower->send_buffer; - uint32_t i, packet_size, active_ports, is_last; - int32_t *p; - - /* we always listen on all ports */ - active_ports = follower->params.recv_audio_channels; - packet_size = sizeof(*header) + active_ports * sizeof(int32_t); - is_last = follower->params.send_midi_channels == 0 && - follower->params.send_audio_channels == 0 ? 1 : 0; - - header->active_ports = htonl(active_ports); - header->frames = htonl(nframes); - header->cycle = htonl(follower->cycle++); - header->sub_cycle = 0; - header->data_type = htonl('s'); - header->is_last = htonl(is_last); - header->packet_size = htonl(packet_size); - p = SPA_PTROFF(header, sizeof(*header), int32_t); - for (i = 0; i < active_ports; i++) - p[i] = htonl(i); - send(follower->socket->fd, header, packet_size, 0); - return 0; -} - -static int netjack2_send_midi(struct stream *s, uint32_t nframes, - struct data_info *info, uint32_t n_info) -{ - struct follower *follower = s->follower; - struct nj2_packet_header *header = (struct nj2_packet_header *)follower->send_buffer; - uint32_t i, num_packets, active_ports, data_size, res1, res2, max_size; - int32_t *ap; - - if (follower->params.send_midi_channels <= 0) - return 0; - - active_ports = follower->params.send_midi_channels; - ap = SPA_PTROFF(header, sizeof(*header), int32_t); - - data_size = active_ports * sizeof(struct nj2_midi_buffer); - memset(ap, 0, data_size); - - max_size = PACKET_AVAILABLE_SIZE(follower->params.mtu); - - res1 = data_size % max_size; - res2 = data_size / max_size; - num_packets = (res1) ? res2 + 1 : res2; - - header->data_type = htonl('m'); - header->active_ports = htonl(active_ports); - header->num_packets = htonl(num_packets); - - for (i = 0; i < num_packets; i++) { - uint32_t is_last = ((i == num_packets - 1) && follower->params.recv_audio_channels == 0) ? 1 : 0; - uint32_t packet_size = sizeof(*header) + data_size; - - header->sub_cycle = htonl(i); - header->is_last = htonl(is_last); - header->packet_size = htonl(packet_size); - send(follower->socket->fd, header, packet_size, 0); - //nj2_dump_packet_header(header); - } - return 0; -} - -static int netjack2_send_audio(struct stream *s, uint32_t frames, struct data_info *info, uint32_t n_info) -{ - struct follower *follower = s->follower; - struct nj2_packet_header *header = (struct nj2_packet_header *)follower->send_buffer; - uint32_t i, j, active_ports, num_packets; - uint32_t sub_period_size, sub_period_bytes; - - if (follower->params.send_audio_channels <= 0) - return 0; - - active_ports = n_info; - - if (active_ports == 0) { - sub_period_size = frames; - } else { - uint32_t max_size = PACKET_AVAILABLE_SIZE(follower->params.mtu); - uint32_t period = (uint32_t) powf(2.f, (uint32_t) (logf((float)max_size / - (active_ports * sizeof(float))) / logf(2.f))); - sub_period_size = SPA_MIN(period, frames); - } - sub_period_bytes = sub_period_size * sizeof(float) + sizeof(int32_t); - num_packets = frames / sub_period_size; - - header->data_type = htonl('a'); - header->active_ports = htonl(active_ports); - header->num_packets = htonl(num_packets); - - for (i = 0; i < num_packets; i++) { - uint32_t is_last = (i == num_packets - 1) ? 1 : 0; - uint32_t packet_size = sizeof(*header) + active_ports * sub_period_bytes; - int32_t *ap = SPA_PTROFF(header, sizeof(*header), int32_t); - float *src; - - for (j = 0; j < active_ports; j++) { - ap[0] = htonl(info[j].id); - - src = SPA_PTROFF(info[j].data, i * sub_period_size * sizeof(float), float); - do_volume((float*)&ap[1], src, &s->volume, info[j].id, sub_period_size); - - ap = SPA_PTROFF(ap, sub_period_bytes, int32_t); - } - header->sub_cycle = htonl(i); - header->is_last = htonl(is_last); - header->packet_size = htonl(packet_size); - send(follower->socket->fd, header, packet_size, 0); - //nj2_dump_packet_header(header); - } - return 0; -} - -static int netjack2_send_data(struct stream *s, uint32_t nframes, - struct data_info *midi, uint32_t n_midi, - struct data_info *audio, uint32_t n_audio) -{ - netjack2_send_sync(s, nframes); - netjack2_send_midi(s, nframes, midi, n_midi); - netjack2_send_audio(s, nframes, audio, n_audio); - return 0; -} - static void sink_process(void *d, struct spa_io_position *position) { struct stream *s = d; + struct follower *follower = s->follower; uint32_t i, nframes = position->clock.duration; struct data_info midi[s->n_ports]; struct data_info audio[s->n_ports]; @@ -522,175 +309,16 @@ static void sink_process(void *d, struct spa_io_position *position) audio[n_audio++].id = i; } } - netjack2_send_data(s, nframes, midi, n_midi, audio, n_audio); + follower->peer.cycle++; + netjack2_send_data(&follower->peer, nframes, midi, n_midi, audio, n_audio); - pw_loop_update_io(s->impl->data_loop->loop, s->follower->socket, SPA_IO_IN); -} - -static int netjack2_recv_midi(struct stream *s, struct nj2_packet_header *header, uint32_t *count, - struct data_info *info, uint32_t n_info) -{ - struct follower *follower = s->follower; - ssize_t len; - - if ((len = recv(follower->socket->fd, follower->recv_buffer, ntohl(header->packet_size), 0)) < 0) - return -errno; - - follower->sync.cycle = ntohl(header->cycle); - follower->sync.num_packets = ntohl(header->num_packets); - - if (++(*count) == follower->sync.num_packets) { - pw_log_trace_fp("got last midi packet"); - } - return 0; -} - -static int netjack2_recv_audio(struct stream *s, struct nj2_packet_header *header, uint32_t *count, - struct data_info *info, uint32_t n_info) -{ - struct follower *follower = s->follower; - ssize_t len; - uint32_t i, sub_cycle, sub_period_size, sub_period_bytes, active_ports; - - if ((len = recv(follower->socket->fd, follower->recv_buffer, ntohl(header->packet_size), 0)) < 0) - return -errno; - - sub_cycle = ntohl(header->sub_cycle); - active_ports = ntohl(header->active_ports); - - if (active_ports == 0) { - sub_period_size = follower->sync.frames; - } else { - uint32_t max_size = PACKET_AVAILABLE_SIZE(follower->params.mtu); - uint32_t period = (uint32_t) powf(2.f, (uint32_t) (logf((float)max_size / - (active_ports * sizeof(float))) / logf(2.f))); - sub_period_size = SPA_MIN(period, (uint32_t)follower->sync.frames); - } - sub_period_bytes = sub_period_size * sizeof(float) + sizeof(int32_t); - - for (i = 0; i < active_ports; i++) { - int32_t *ap = SPA_PTROFF(header, sizeof(*header) + i * sub_period_bytes, int32_t); - uint32_t active_port = ntohl(ap[0]); - void *data; - - pw_log_trace_fp("%u/%u %u %u", active_port, n_info, - sub_cycle, sub_period_size); - if (active_port >= n_info) - continue; - - if ((data = info[active_port].data) != NULL) { - float *dst = SPA_PTROFF(data, - sub_cycle * sub_period_size * sizeof(float), - float); - do_volume(dst, (float*)&ap[1], &s->volume, active_port, sub_period_size); - info[active_port].filled = follower->sync.is_last; - } - } - return 0; -} - -static int32_t netjack2_sync_wait(struct follower *follower) -{ - struct nj2_packet_header *sync = (struct nj2_packet_header *)follower->recv_buffer; - ssize_t len; - int32_t offset; - - while (true) { - if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, MSG_PEEK)) < 0) - goto receive_error; - - if (len >= (ssize_t)sizeof(*sync)) { - //nj2_dump_packet_header(sync); - if (strcmp(sync->type, "header") == 0 && - ntohl(sync->data_type) == 's' && - ntohl(sync->data_stream) == 'r' && - ntohl(sync->id) == follower->params.id) - break; - } - if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, 0)) < 0) - goto receive_error; - } - follower->sync.cycle = ntohl(sync->cycle); - follower->sync.is_last = ntohl(sync->is_last); - follower->sync.frames = ntohl(sync->frames); - if (follower->sync.frames == -1) - follower->sync.frames = follower->params.period_size; - - offset = follower->cycle - follower->sync.cycle; - if (offset < (int32_t)follower->params.network_latency) { - pw_log_info("sync offset %d %d %d", follower->cycle, follower->sync.cycle, offset); - follower->sync.is_last = true; - return 0; - } else { - if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, 0)) < 0) - goto receive_error; - } - return follower->sync.frames; - -receive_error: - pw_log_warn("recv error: %m"); - return 0; -} - - -static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t n_info) -{ - struct follower *follower = s->follower; - ssize_t len; - uint32_t i, count = 0; - struct nj2_packet_header *header = (struct nj2_packet_header *)follower->recv_buffer; - int res = 0; - - netjack2_sync_wait(follower); - - while (!follower->sync.is_last) { - if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, MSG_PEEK)) < 0) - goto receive_error; - - if (len < (ssize_t)sizeof(*header)) - goto receive_error; - - //nj2_dump_packet_header(header); - - if (ntohl(header->data_stream) != 'r' || - ntohl(header->id) != follower->params.id) { - pw_log_debug("not our packet"); - continue; - } - follower->sync.is_last = ntohl(header->is_last); - - switch (ntohl(header->data_type)) { - case 'm': - res = netjack2_recv_midi(s, header, &count, info, n_info); - break; - case 'a': - res = netjack2_recv_audio(s, header, &count, info, n_info); - break; - case 's': - pw_log_info("missing last data packet"); - follower->sync.is_last = true; - break; - } - if (res < 0) { - pw_log_warn("recv error: %s", spa_strerror(res)); - break; - } - } - for (i = 0; i < n_info; i++) { - if (!info[i].filled && info[i].data != NULL) - memset(info[i].data, 0, follower->sync.frames * sizeof(float)); - } - follower->sync.cycle = ntohl(header->cycle); - return 0; - -receive_error: - pw_log_warn("recv error: %m"); - return -errno; + pw_loop_update_io(s->impl->data_loop->loop, follower->socket, SPA_IO_IN); } static void source_process(void *d, struct spa_io_position *position) { struct stream *s = d; + struct follower *follower = s->follower; uint32_t i, n_samples = position->clock.duration; struct data_info info[s->n_ports]; @@ -700,7 +328,8 @@ static void source_process(void *d, struct spa_io_position *position) info[i].id = i; info[i].filled = false; } - netjack2_recv_data(s, info, s->n_ports); + netjack2_manager_sync_wait(&follower->peer); + netjack2_recv_data(&follower->peer, info, s->n_ports); } static void follower_free(struct follower *follower) @@ -1175,9 +804,9 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param struct sockaddr_storage *addr, socklen_t addr_len) { int res, fd; - struct nj2_packet_header *header; struct follower *follower; char buffer[256]; + struct netjack2_peer *peer; pw_log_info("got follower available"); nj2_dump_session_params(params); @@ -1195,6 +824,8 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param follower->id = impl->follower_id; spa_list_append(&impl->follower_list, &follower->link); + peer = &follower->peer; + follower->source.impl = impl; follower->source.follower = follower; follower->source.direction = PW_DIRECTION_OUTPUT; @@ -1215,48 +846,45 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param follower->samplerate = impl->samplerate; follower->period_size = impl->period_size; - pw_properties_setf(follower->sink.props, PW_KEY_NODE_FORCE_RATE, "1/%u", follower->samplerate); - pw_properties_setf(follower->sink.props, PW_KEY_NODE_FORCE_QUANTUM, "%u", follower->period_size); - pw_properties_setf(follower->source.props, PW_KEY_NODE_FORCE_RATE, "1/%u", follower->samplerate); - pw_properties_setf(follower->source.props, PW_KEY_NODE_FORCE_QUANTUM, "%u", follower->period_size); + pw_properties_setf(follower->sink.props, PW_KEY_NODE_FORCE_RATE, + "1/%u", follower->samplerate); + pw_properties_setf(follower->sink.props, PW_KEY_NODE_FORCE_QUANTUM, + "%u", follower->period_size); + pw_properties_setf(follower->source.props, PW_KEY_NODE_FORCE_RATE, + "1/%u", follower->samplerate); + pw_properties_setf(follower->source.props, PW_KEY_NODE_FORCE_QUANTUM, + "%u", follower->period_size); + + nj2_session_params_ntoh(&peer->params, params); pw_properties_setf(follower->source.props, PW_KEY_NODE_DESCRIPTION, "%s NETJACK2 from %s", params->name, params->follower_name); pw_properties_setf(follower->sink.props, PW_KEY_NODE_DESCRIPTION, "%s NETJACK2 to %s", params->name, params->follower_name); + peer->params.mtu = impl->mtu; + peer->params.id = follower->id; snprintf(params->driver_name, sizeof(params->driver_name), "%s", pw_get_host_name()); + peer->params.sample_rate = follower->samplerate; + peer->params.period_size = follower->period_size; + peer->params.sample_encoder = NJ2_ENCODER_FLOAT; + peer->params.kbps = 0; - follower->params = *params; - follower->params.mtu = impl->mtu; - follower->params.id = follower->id; - follower->params.transport_sync = ntohl(params->transport_sync); - follower->params.send_audio_channels = ntohl(params->send_audio_channels); - follower->params.recv_audio_channels = ntohl(params->recv_audio_channels); - follower->params.send_midi_channels = ntohl(params->send_midi_channels); - follower->params.recv_midi_channels = ntohl(params->recv_midi_channels); - follower->params.sample_rate = follower->samplerate; - follower->params.period_size = follower->period_size; - follower->params.sample_encoder = NJ2_ENCODER_FLOAT; - follower->params.kbps = 0; - follower->params.follower_sync_mode = ntohl(params->follower_sync_mode); - follower->params.network_latency = ntohl(params->network_latency); + if (peer->params.send_audio_channels < 0) + peer->params.send_audio_channels = follower->sink.info.channels; + if (peer->params.recv_audio_channels < 0) + peer->params.recv_audio_channels = follower->source.info.channels; + if (peer->params.send_midi_channels < 0) + peer->params.send_midi_channels = follower->sink.n_midi; + if (peer->params.recv_midi_channels < 0) + peer->params.recv_midi_channels = follower->source.n_midi; - if (follower->params.send_audio_channels < 0) - follower->params.send_audio_channels = follower->sink.info.channels; - if (follower->params.recv_audio_channels < 0) - follower->params.recv_audio_channels = follower->source.info.channels; - if (follower->params.send_midi_channels < 0) - follower->params.send_midi_channels = follower->sink.n_midi; - if (follower->params.recv_midi_channels < 0) - follower->params.recv_midi_channels = follower->source.n_midi; - - follower->source.n_ports = follower->params.send_audio_channels + follower->params.send_midi_channels; - follower->source.info.rate = follower->params.sample_rate; - follower->source.info.channels = follower->params.send_audio_channels; - follower->sink.n_ports = follower->params.recv_audio_channels + follower->params.recv_midi_channels; - follower->sink.info.rate = follower->params.sample_rate; - follower->sink.info.channels = follower->params.recv_audio_channels; + follower->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels; + follower->source.info.rate = peer->params.sample_rate; + follower->source.info.channels = peer->params.send_audio_channels; + follower->sink.n_ports = peer->params.recv_audio_channels + peer->params.recv_midi_channels; + follower->sink.info.rate = peer->params.sample_rate; + follower->sink.info.channels = peer->params.recv_audio_channels; follower->source.n_ports = follower->source.n_midi + follower->source.info.channels; follower->sink.n_ports = follower->sink.n_midi + follower->sink.info.channels; @@ -1266,11 +894,6 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param goto cleanup; } - header = (struct nj2_packet_header *)follower->send_buffer; - snprintf(header->type, sizeof(header->type), "header"); - header->data_stream = htonl('s'); - header->id = params->id; - if ((res = create_filters(follower)) < 0) goto create_failed; @@ -1286,8 +909,13 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param pw_log_error("can't create data source: %m"); goto socket_failed; } + peer->fd = fd; + peer->our_stream = 's'; + peer->other_stream = 'r'; + peer->send_volume = &follower->sink.volume; + peer->recv_volume = &follower->source.volume; - int bufsize = NETWORK_MAX_LATENCY * (follower->params.mtu + + int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + follower->period_size * sizeof(float) * SPA_MAX(follower->source.n_ports, follower->sink.n_ports)); @@ -1302,20 +930,8 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param impl->follower_id++; + nj2_session_params_hton(params, &peer->params); params->packet_id = htonl(NJ2_ID_FOLLOWER_SETUP); - params->mtu = htonl(follower->params.mtu); - params->id = htonl(follower->params.id); - params->transport_sync = htonl(follower->params.transport_sync); - params->send_audio_channels = htonl(follower->params.send_audio_channels); - params->recv_audio_channels = htonl(follower->params.recv_audio_channels); - params->send_midi_channels = htonl(follower->params.send_midi_channels); - params->recv_midi_channels = htonl(follower->params.recv_midi_channels); - params->sample_rate = htonl(follower->params.sample_rate); - params->period_size = htonl(follower->params.period_size); - params->sample_encoder = htonl(follower->params.sample_encoder); - params->kbps = htonl(follower->params.kbps); - params->follower_sync_mode = htonl(follower->params.follower_sync_mode); - params->network_latency = htonl(follower->params.network_latency); pw_log_info("sending follower setup to %s", get_ip(addr, buffer, sizeof(buffer))); nj2_dump_session_params(params); diff --git a/src/modules/module-netjack2/packets.h b/src/modules/module-netjack2/packets.h index 8057b8d3d..90ad7641c 100644 --- a/src/modules/module-netjack2/packets.h +++ b/src/modules/module-netjack2/packets.h @@ -67,6 +67,48 @@ static inline void nj2_dump_session_params(struct nj2_session_params *params) pw_log_info("Latency: %u", ntohl(params->network_latency)); } +static inline void nj2_session_params_ntoh(struct nj2_session_params *host, + const struct nj2_session_params *net) +{ + memcpy(host, net, sizeof(*host)); + host->version = ntohl(net->version); + host->packet_id = ntohl(net->packet_id); + host->mtu = ntohl(net->mtu); + host->id = ntohl(net->id); + host->transport_sync = ntohl(net->transport_sync); + host->send_audio_channels = ntohl(net->send_audio_channels); + host->recv_audio_channels = ntohl(net->recv_audio_channels); + host->send_midi_channels = ntohl(net->send_midi_channels); + host->recv_midi_channels = ntohl(net->recv_midi_channels); + host->sample_rate = ntohl(net->sample_rate); + host->period_size = ntohl(net->period_size); + host->sample_encoder = ntohl(net->sample_encoder); + host->kbps = ntohl(net->kbps); + host->follower_sync_mode = ntohl(net->follower_sync_mode); + host->network_latency = ntohl(net->network_latency); +} + +static inline void nj2_session_params_hton(struct nj2_session_params *net, + const struct nj2_session_params *host) +{ + memcpy(net, host, sizeof(*net)); + net->version = htonl(host->version); + net->packet_id = htonl(host->packet_id); + net->mtu = htonl(host->mtu); + net->id = htonl(host->id); + net->transport_sync = htonl(host->transport_sync); + net->send_audio_channels = htonl(host->send_audio_channels); + net->recv_audio_channels = htonl(host->recv_audio_channels); + net->send_midi_channels = htonl(host->send_midi_channels); + net->recv_midi_channels = htonl(host->recv_midi_channels); + net->sample_rate = htonl(host->sample_rate); + net->period_size = htonl(host->period_size); + net->sample_encoder = htonl(host->sample_encoder); + net->kbps = htonl(host->kbps); + net->follower_sync_mode = htonl(host->follower_sync_mode); + net->network_latency = htonl(host->network_latency); +} + struct nj2_packet_header { char type[8]; /* packet type ('headr') */ uint32_t data_type; /* 'a' for audio, 'm' for midi and 's' for sync */ diff --git a/src/modules/module-netjack2/peer.c b/src/modules/module-netjack2/peer.c new file mode 100644 index 000000000..11a8bbbba --- /dev/null +++ b/src/modules/module-netjack2/peer.c @@ -0,0 +1,432 @@ + +struct volume { + bool mute; + uint32_t n_volumes; + float volumes[SPA_AUDIO_MAX_CHANNELS]; +}; + +static inline void do_volume(float *dst, const float *src, struct volume *vol, uint32_t ch, uint32_t n_samples) +{ + float v = vol->mute ? 0.0f : vol->volumes[ch]; + + if (v == 0.0f || src == NULL) + memset(dst, 0, n_samples * sizeof(float)); + else if (v == 1.0f) + memcpy(dst, src, n_samples * sizeof(float)); + else { + uint32_t i; + for (i = 0; i < n_samples; i++) + dst[i] = src[i] * v; + } +} + +struct netjack2_peer { + int fd; + + uint32_t our_stream; + uint32_t other_stream; + struct nj2_session_params params; + struct nj2_packet_header sync; + uint32_t cycle; + + struct volume *send_volume; + struct volume *recv_volume; + + unsigned fix_midi:1; +}; + +static inline void fix_midi_event(uint8_t *data, size_t size) +{ + /* fixup NoteOn with vel 0 */ + if (size > 2 && (data[0] & 0xF0) == 0x90 && data[2] == 0x00) { + data[0] = 0x80 + (data[0] & 0x0F); + data[2] = 0x40; + } +} + +static inline void midi_to_netjack2(struct netjack2_peer *peer, float *dst, + float *src, uint32_t n_samples) +{ + struct spa_pod *pod; + struct spa_pod_sequence *seq; + struct spa_pod_control *c; + + if (src == NULL) + return; + + if ((pod = spa_pod_from_data(src, n_samples * sizeof(float), + 0, n_samples * sizeof(float))) == NULL) + return; + if (!spa_pod_is_sequence(pod)) + return; + + seq = (struct spa_pod_sequence*)pod; + + SPA_POD_SEQUENCE_FOREACH(seq, c) { + switch(c->type) { + case SPA_CONTROL_Midi: + { + uint8_t *data = SPA_POD_BODY(&c->value); + size_t size = SPA_POD_BODY_SIZE(&c->value); + + if (peer->fix_midi) + fix_midi_event(data, size); + + break; + } + default: + break; + } + } +} + +static inline void netjack2_to_midi(float *dst, float *src, uint32_t size) +{ + struct spa_pod_builder b = { 0, }; + uint32_t i, count; + struct spa_pod_frame f; + + count = 0; + spa_pod_builder_init(&b, dst, size); + spa_pod_builder_push_sequence(&b, &f, 0); + for (i = 0; i < count; i++) { + } + spa_pod_builder_pop(&b, &f); +} + +struct data_info { + void *data; + uint32_t id; + bool filled; +}; + +static int netjack2_send_sync(struct netjack2_peer *peer, uint32_t nframes) +{ + struct nj2_packet_header header; + uint8_t buffer[peer->params.mtu]; + uint32_t i, packet_size, active_ports, is_last; + int32_t *p; + + /* we always listen on all ports */ + active_ports = peer->params.recv_audio_channels; + packet_size = sizeof(header) + active_ports * sizeof(int32_t); + is_last = peer->params.send_midi_channels == 0 && + peer->params.send_audio_channels == 0 ? 1 : 0; + + strcpy(header.type, "header"); + header.data_type = htonl('s'); + header.data_stream = htonl(peer->our_stream); + header.id = htonl(peer->params.id); + header.num_packets = 0; + header.packet_size = htonl(packet_size); + header.active_ports = htonl(active_ports); + header.cycle = htonl(peer->cycle); + header.sub_cycle = 0; + header.frames = htonl(nframes); + header.is_last = htonl(is_last); + + memcpy(buffer, &header, sizeof(header)); + p = SPA_PTROFF(buffer, sizeof(header), int32_t); + for (i = 0; i < active_ports; i++) + p[i] = htonl(i); + send(peer->fd, buffer, packet_size, 0); + return 0; +} + +static int netjack2_send_midi(struct netjack2_peer *peer, uint32_t nframes, + struct data_info *info, uint32_t n_info) +{ + struct nj2_packet_header header; + uint8_t buffer[peer->params.mtu]; + uint32_t i, num_packets, active_ports, data_size, res1, res2, max_size; + int32_t *ap; + + if (peer->params.send_midi_channels <= 0) + return 0; + + active_ports = peer->params.send_midi_channels; + ap = SPA_PTROFF(buffer, sizeof(header), int32_t); + + data_size = active_ports * sizeof(struct nj2_midi_buffer); + memset(ap, 0, data_size); + + max_size = PACKET_AVAILABLE_SIZE(peer->params.mtu); + + res1 = data_size % max_size; + res2 = data_size / max_size; + num_packets = (res1) ? res2 + 1 : res2; + + strcpy(header.type, "header"); + header.data_type = htonl('m'); + header.data_stream = htonl(peer->our_stream); + header.id = htonl(peer->params.id); + header.cycle = htonl(peer->cycle); + header.active_ports = htonl(active_ports); + header.num_packets = htonl(num_packets); + + for (i = 0; i < num_packets; i++) { + uint32_t is_last = ((i == num_packets - 1) && peer->params.send_audio_channels == 0) ? 1 : 0; + uint32_t packet_size = sizeof(header) + data_size; + + header.sub_cycle = htonl(i); + header.is_last = htonl(is_last); + header.packet_size = htonl(packet_size); + memcpy(buffer, &header, sizeof(header)); + send(peer->fd, buffer, packet_size, 0); + //nj2_dump_packet_header(&header); + } + return 0; +} + +static int netjack2_send_audio(struct netjack2_peer *peer, uint32_t frames, + struct data_info *info, uint32_t n_info) +{ + struct nj2_packet_header header; + uint8_t buffer[peer->params.mtu]; + uint32_t i, j, active_ports, num_packets; + uint32_t sub_period_size, sub_period_bytes; + + if (peer->params.send_audio_channels <= 0) + return 0; + + active_ports = n_info; + + if (active_ports == 0) { + sub_period_size = frames; + } else { + uint32_t max_size = PACKET_AVAILABLE_SIZE(peer->params.mtu); + uint32_t period = (uint32_t) powf(2.f, (uint32_t) (logf((float)max_size / + (active_ports * sizeof(float))) / logf(2.f))); + sub_period_size = SPA_MIN(period, frames); + } + sub_period_bytes = sub_period_size * sizeof(float) + sizeof(int32_t); + num_packets = frames / sub_period_size; + + strcpy(header.type, "header"); + header.data_type = htonl('a'); + header.data_stream = htonl(peer->our_stream); + header.id = htonl(peer->params.id); + header.cycle = htonl(peer->cycle); + header.active_ports = htonl(active_ports); + header.num_packets = htonl(num_packets); + + for (i = 0; i < num_packets; i++) { + uint32_t is_last = (i == num_packets - 1) ? 1 : 0; + uint32_t packet_size = sizeof(header) + active_ports * sub_period_bytes; + int32_t *ap = SPA_PTROFF(buffer, sizeof(header), int32_t); + float *src; + + for (j = 0; j < active_ports; j++) { + ap[0] = htonl(info[j].id); + + src = SPA_PTROFF(info[j].data, i * sub_period_size * sizeof(float), float); + do_volume((float*)&ap[1], src, peer->send_volume, info[j].id, sub_period_size); + + ap = SPA_PTROFF(ap, sub_period_bytes, int32_t); + } + header.sub_cycle = htonl(i); + header.is_last = htonl(is_last); + header.packet_size = htonl(packet_size); + memcpy(buffer, &header, sizeof(header)); + send(peer->fd, buffer, packet_size, 0); + //nj2_dump_packet_header(&header); + } + return 0; +} + +static int netjack2_send_data(struct netjack2_peer *peer, uint32_t nframes, + struct data_info *midi, uint32_t n_midi, + struct data_info *audio, uint32_t n_audio) +{ + netjack2_send_sync(peer, nframes); + netjack2_send_midi(peer, nframes, midi, n_midi); + netjack2_send_audio(peer, nframes, audio, n_audio); + return 0; +} + +static inline int32_t netjack2_driver_sync_wait(struct netjack2_peer *peer) +{ + struct nj2_packet_header sync; + ssize_t len; + + while (true) { + if ((len = recv(peer->fd, &sync, sizeof(sync), 0)) < 0) + goto receive_error; + + if (len >= (ssize_t)sizeof(sync)) { + //nj2_dump_packet_header(&sync); + + if (strcmp(sync.type, "header") == 0 && + ntohl(sync.data_type) == 's' && + ntohl(sync.data_stream) == peer->other_stream && + ntohl(sync.id) == peer->params.id) + break; + } + } + peer->sync.is_last = ntohl(sync.is_last); + peer->sync.frames = ntohl(sync.frames); + if (peer->sync.frames == -1) + peer->sync.frames = peer->params.period_size; + + return peer->sync.frames; + +receive_error: + pw_log_warn("recv error: %m"); + return 0; +} + +static inline int32_t netjack2_manager_sync_wait(struct netjack2_peer *peer) +{ + struct nj2_packet_header sync; + ssize_t len; + int32_t offset; + + while (true) { + if ((len = recv(peer->fd, &sync, sizeof(sync), MSG_PEEK)) < 0) + goto receive_error; + + if (len >= (ssize_t)sizeof(sync)) { + //nj2_dump_packet_header(sync); + + if (strcmp(sync.type, "header") == 0 && + ntohl(sync.data_type) == 's' && + ntohl(sync.data_stream) == peer->other_stream && + ntohl(sync.id) == peer->params.id) + break; + } + if ((len = recv(peer->fd, &sync, sizeof(sync), 0)) < 0) + goto receive_error; + } + peer->sync.cycle = ntohl(sync.cycle); + peer->sync.is_last = ntohl(sync.is_last); + peer->sync.frames = ntohl(sync.frames); + if (peer->sync.frames == -1) + peer->sync.frames = peer->params.period_size; + + offset = peer->cycle - peer->sync.cycle; + if (offset < (int32_t)peer->params.network_latency) { + pw_log_info("sync offset %d %d %d", peer->cycle, peer->sync.cycle, offset); + peer->sync.is_last = true; + return 0; + } else { + if ((len = recv(peer->fd, &sync, sizeof(sync), 0)) < 0) + goto receive_error; + } + return peer->sync.frames; + +receive_error: + pw_log_warn("recv error: %m"); + return 0; +} + +static int netjack2_recv_midi(struct netjack2_peer *peer, struct nj2_packet_header *header, uint32_t *count, + struct data_info *info, uint32_t n_info) +{ + ssize_t len; + uint32_t packet_size = SPA_MIN(ntohl(header->packet_size), peer->params.mtu); + uint8_t buffer[packet_size]; + + if ((len = recv(peer->fd, buffer, packet_size, 0)) < 0) + return -errno; + + peer->sync.cycle = ntohl(header->cycle); + peer->sync.num_packets = ntohl(header->num_packets); + + if (++(*count) == peer->sync.num_packets) { + pw_log_trace_fp("got last midi packet"); + } + return 0; +} + +static int netjack2_recv_audio(struct netjack2_peer *peer, struct nj2_packet_header *header, uint32_t *count, + struct data_info *info, uint32_t n_info) +{ + ssize_t len; + uint32_t i, sub_cycle, sub_period_size, sub_period_bytes, active_ports; + uint32_t packet_size = SPA_MIN(ntohl(header->packet_size), peer->params.mtu); + uint8_t buffer[packet_size]; + + if ((len = recv(peer->fd, buffer, packet_size, 0)) < 0) + return -errno; + + sub_cycle = ntohl(header->sub_cycle); + active_ports = ntohl(header->active_ports); + + if (active_ports == 0) { + sub_period_size = peer->sync.frames; + } else { + uint32_t max_size = PACKET_AVAILABLE_SIZE(peer->params.mtu); + uint32_t period = (uint32_t) powf(2.f, (uint32_t) (logf((float)max_size / + (active_ports * sizeof(float))) / logf(2.f))); + sub_period_size = SPA_MIN(period, (uint32_t)peer->sync.frames); + } + sub_period_bytes = sub_period_size * sizeof(float) + sizeof(int32_t); + + for (i = 0; i < active_ports; i++) { + int32_t *ap = SPA_PTROFF(buffer, sizeof(*header) + i * sub_period_bytes, int32_t); + uint32_t active_port = ntohl(ap[0]); + void *data; + + pw_log_trace_fp("%u/%u %u %u", active_port, n_info, + sub_cycle, sub_period_size); + if (active_port >= n_info) + continue; + + if ((data = info[active_port].data) != NULL) { + float *dst = SPA_PTROFF(data, + sub_cycle * sub_period_size * sizeof(float), + float); + do_volume(dst, (float*)&ap[1], peer->recv_volume, active_port, sub_period_size); + info[active_port].filled = peer->sync.is_last; + } + } + return 0; +} + +static int netjack2_recv_data(struct netjack2_peer *peer, struct data_info *info, uint32_t n_info) +{ + ssize_t len; + uint32_t i, count = 0; + struct nj2_packet_header header; + + while (!peer->sync.is_last) { + if ((len = recv(peer->fd, &header, sizeof(header), MSG_PEEK)) < 0) + goto receive_error; + + if (len < (ssize_t)sizeof(header)) + goto receive_error; + + //nj2_dump_packet_header(&header); + + if (ntohl(header.data_stream) != peer->other_stream || + ntohl(header.id) != peer->params.id) { + pw_log_debug("not our packet"); + continue; + } + + peer->sync.is_last = ntohl(header.is_last); + + switch (ntohl(header.data_type)) { + case 'm': + netjack2_recv_midi(peer, &header, &count, info, n_info); + break; + case 'a': + netjack2_recv_audio(peer, &header, &count, info, n_info); + break; + case 's': + pw_log_info("missing last data packet"); + peer->sync.is_last = true; + break; + } + } + for (i = 0; i < n_info; i++) { + if (!info[i].filled && info[i].data != NULL) + memset(info[i].data, 0, peer->sync.frames * sizeof(float)); + } + peer->sync.cycle = ntohl(header.cycle); + return 0; + +receive_error: + pw_log_warn("recv error: %m"); + return -errno; +}