From 9dd5bab5358574a5ad10c7d73d03c77c9615274a Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 8 Jun 2023 16:38:53 +0200 Subject: [PATCH] module-netjack2: add MIDI send and receive --- src/modules/module-netjack2-driver.c | 62 ++++++---- src/modules/module-netjack2-manager.c | 61 +++++---- src/modules/module-netjack2/packets.h | 32 ++++- src/modules/module-netjack2/peer.c | 171 ++++++++++++++++++++------ 4 files changed, 239 insertions(+), 87 deletions(-) diff --git a/src/modules/module-netjack2-driver.c b/src/modules/module-netjack2-driver.c index 4718b78ec..1a7048095 100644 --- a/src/modules/module-netjack2-driver.c +++ b/src/modules/module-netjack2-driver.c @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -284,11 +283,34 @@ static void stream_state_changed(void *d, enum pw_filter_state old, } } +static inline void set_info(struct stream *s, uint32_t nframes, + struct data_info *midi, uint32_t *n_midi, + struct data_info *audio, uint32_t *n_audio) +{ + uint32_t i, n_m, n_a; + n_m = n_a = 0; + for (i = 0; i < s->n_ports; i++) { + struct port *p = s->ports[i]; + void *data = p ? pw_filter_get_dsp_buffer(p, nframes) : NULL; + if (p && p->is_midi) { + midi[n_m].data = data; + midi[n_m].id = i; + midi[n_m++].filled = false; + } else if (data != NULL) { + audio[n_a].data = data; + audio[n_a].id = i; + audio[n_a++].filled = false; + } + } + *n_midi = n_m; + *n_audio = n_a; +} + static void sink_process(void *d, struct spa_io_position *position) { struct stream *s = d; struct impl *impl = s->impl; - uint32_t i, nframes = position->clock.duration; + uint32_t nframes = position->clock.duration; struct data_info midi[s->n_ports]; struct data_info audio[s->n_ports]; uint32_t n_midi, n_audio; @@ -298,18 +320,7 @@ static void sink_process(void *d, struct spa_io_position *position) return; } - n_midi = n_audio = 0; - for (i = 0; i < s->n_ports; i++) { - struct port *p = s->ports[i]; - void *data = p ? pw_filter_get_dsp_buffer(p, nframes) : NULL; - if (p && p->is_midi) { - midi[n_midi].data = data; - midi[n_midi++].id = i; - } else if (data != NULL) { - audio[n_audio].data = data; - audio[n_audio++].id = i; - } - } + set_info(s, nframes, midi, &n_midi, audio, &n_audio); netjack2_send_data(&impl->peer, nframes, midi, n_midi, audio, n_audio); @@ -322,8 +333,10 @@ static void source_process(void *d, struct spa_io_position *position) { struct stream *s = d; struct impl *impl = s->impl; - uint32_t i, n_samples = position->clock.duration; - struct data_info info[s->n_ports]; + uint32_t nframes = position->clock.duration; + struct data_info midi[s->n_ports]; + struct data_info audio[s->n_ports]; + uint32_t n_midi, n_audio; if (impl->driving == MODE_SOURCE && !impl->triggered) { pw_log_trace_fp("done %"PRIu64, impl->frame_time); @@ -332,13 +345,9 @@ static void source_process(void *d, struct spa_io_position *position) } impl->triggered = false; - for (i = 0; i < s->n_ports; i++) { - struct port *p = s->ports[i]; - info[i].data = p ? pw_filter_get_dsp_buffer(p, n_samples) : NULL; - info[i].id = i; - info[i].filled = false; - } - netjack2_recv_data(&impl->peer, info, s->n_ports); + set_info(s, nframes, midi, &n_midi, audio, &n_audio); + + netjack2_recv_data(&impl->peer, midi, n_midi, audio, n_audio); } static void stream_io_changed(void *data, void *port_data, uint32_t id, void *area, uint32_t size) @@ -655,7 +664,7 @@ on_data_io(void *data, int fd, uint32_t mask) c->target_duration = c->duration; } if (!source_running) - netjack2_recv_data(&impl->peer, NULL, 0); + netjack2_recv_data(&impl->peer, NULL, 0, NULL, 0); if (impl->mode & MODE_SOURCE && source_running) { impl->done = false; @@ -860,6 +869,9 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p peer->other_stream = 's'; peer->send_volume = &impl->sink.volume; peer->recv_volume = &impl->source.volume; + peer->buffer_size = peer->params.period_size * sizeof(float) * + SPA_MAX(peer->params.send_midi_channels, peer->params.recv_midi_channels); + peer->buffer = calloc(1, peer->buffer_size); int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + peer->params.period_size * sizeof(float) * @@ -1037,6 +1049,8 @@ static int send_stop_driver(struct impl *impl) pw_filter_destroy(impl->source.filter); if (impl->sink.filter) pw_filter_destroy(impl->sink.filter); + free(impl->peer.buffer); + impl->peer.buffer = NULL; return 0; } diff --git a/src/modules/module-netjack2-manager.c b/src/modules/module-netjack2-manager.c index dbd6f5aec..9dffa293b 100644 --- a/src/modules/module-netjack2-manager.c +++ b/src/modules/module-netjack2-manager.c @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -292,27 +291,40 @@ static void stream_state_changed(void *d, enum pw_filter_state old, } } -static void sink_process(void *d, struct spa_io_position *position) +static inline void set_info(struct stream *s, uint32_t nframes, + struct data_info *midi, uint32_t *n_midi, + struct data_info *audio, uint32_t *n_audio) { - struct stream *s = d; - struct follower *follower = s->follower; - uint32_t i, nframes = position->clock.duration; - struct data_info midi[s->n_ports]; - struct data_info audio[s->n_ports]; - uint32_t n_midi, n_audio; - - n_midi = n_audio = 0; + uint32_t i, n_m, n_a; + n_m = n_a = 0; for (i = 0; i < s->n_ports; i++) { struct port *p = s->ports[i]; void *data = p ? pw_filter_get_dsp_buffer(p, nframes) : NULL; if (p && p->is_midi) { - midi[n_midi].data = data; - midi[n_midi++].id = i; + midi[n_m].data = data; + midi[n_m].id = i; + midi[n_m++].filled = false; } else if (data != NULL) { - audio[n_audio].data = data; - audio[n_audio++].id = i; + audio[n_a].data = data; + audio[n_a].id = i; + audio[n_a++].filled = false; } } + *n_midi = n_m; + *n_audio = n_a; +} + +static void sink_process(void *d, struct spa_io_position *position) +{ + struct stream *s = d; + struct follower *follower = s->follower; + uint32_t nframes = position->clock.duration; + struct data_info midi[s->n_ports]; + struct data_info audio[s->n_ports]; + uint32_t n_midi, n_audio; + + set_info(s, nframes, midi, &n_midi, audio, &n_audio); + follower->peer.cycle++; netjack2_send_data(&follower->peer, nframes, midi, n_midi, audio, n_audio); @@ -323,17 +335,15 @@ static void source_process(void *d, struct spa_io_position *position) { struct stream *s = d; struct follower *follower = s->follower; - uint32_t i, n_samples = position->clock.duration; - struct data_info info[s->n_ports]; + uint32_t nframes = position->clock.duration; + struct data_info midi[s->n_ports]; + struct data_info audio[s->n_ports]; + uint32_t n_midi, n_audio; + + set_info(s, nframes, midi, &n_midi, audio, &n_audio); - for (i = 0; i < s->n_ports; i++) { - struct port *p = s->ports[i]; - info[i].data = p ? pw_filter_get_dsp_buffer(p, n_samples) : NULL; - info[i].id = i; - info[i].filled = false; - } netjack2_manager_sync_wait(&follower->peer); - netjack2_recv_data(&follower->peer, info, s->n_ports); + netjack2_recv_data(&follower->peer, midi, n_midi, audio, n_audio); } static void follower_free(struct follower *follower) @@ -353,6 +363,8 @@ static void follower_free(struct follower *follower) if (follower->socket) pw_loop_destroy_source(impl->data_loop->loop, follower->socket); + free(follower->peer.buffer); + free(follower); } @@ -934,6 +946,9 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param peer->other_stream = 'r'; peer->send_volume = &follower->sink.volume; peer->recv_volume = &follower->source.volume; + peer->buffer_size = peer->params.period_size * sizeof(float) * + SPA_MAX(peer->params.send_midi_channels, peer->params.recv_midi_channels); + peer->buffer = calloc(1, peer->buffer_size); int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + follower->period_size * sizeof(float) * diff --git a/src/modules/module-netjack2/packets.h b/src/modules/module-netjack2/packets.h index 90ad7641c..4943e941a 100644 --- a/src/modules/module-netjack2/packets.h +++ b/src/modules/module-netjack2/packets.h @@ -142,27 +142,51 @@ static inline void nj2_dump_packet_header(struct nj2_packet_header *header) pw_log_info("Is Last: %u", ntohl(header->is_last)); } +#define MIDI_INLINE_MAX 4 + struct nj2_midi_event { uint32_t time; /**< Sample index at which event is valid */ uint32_t size; /**< Number of bytes of data in the event */ union { - uint32_t offset; /**< offset in buffer */ - uint8_t buffer[4]; /**< Raw inline data */ + uint32_t offset; /**< offset in buffer */ + uint8_t buffer[MIDI_INLINE_MAX]; /**< Raw inline data */ }; }; struct nj2_midi_buffer { #define MIDI_BUFFER_MAGIC 0x900df00d uint32_t magic; - int32_t buffer_size; + uint32_t buffer_size; uint32_t nframes; - int32_t write_pos; + uint32_t write_pos; uint32_t event_count; uint32_t lost_events; struct nj2_midi_event event[1]; }; +static inline void nj2_midi_buffer_hton(struct nj2_midi_buffer *net, + const struct nj2_midi_buffer *host) +{ + net->magic = htonl(host->magic); + net->buffer_size = htonl(host->buffer_size); + net->nframes = htonl(host->nframes); + net->write_pos = htonl(host->write_pos); + net->event_count = htonl(host->event_count); + net->lost_events = htonl(host->lost_events); +} + +static inline void nj2_midi_buffer_ntoh(struct nj2_midi_buffer *host, + const struct nj2_midi_buffer *net) +{ + host->magic = ntohl(net->magic); + host->buffer_size = ntohl(net->buffer_size); + host->nframes = ntohl(net->nframes); + host->write_pos = ntohl(net->write_pos); + host->event_count = ntohl(net->event_count); + host->lost_events = ntohl(net->lost_events); +} + #ifdef __cplusplus } #endif diff --git a/src/modules/module-netjack2/peer.c b/src/modules/module-netjack2/peer.c index 11a8bbbba..cdc2fd597 100644 --- a/src/modules/module-netjack2/peer.c +++ b/src/modules/module-netjack2/peer.c @@ -32,9 +32,18 @@ struct netjack2_peer { struct volume *send_volume; struct volume *recv_volume; + void *buffer; + uint32_t buffer_size; + unsigned fix_midi:1; }; +struct data_info { + uint32_t id; + void *data; + bool filled; +}; + static inline void fix_midi_event(uint8_t *data, size_t size) { /* fixup NoteOn with vel 0 */ @@ -44,12 +53,21 @@ static inline void fix_midi_event(uint8_t *data, size_t size) } } -static inline void midi_to_netjack2(struct netjack2_peer *peer, float *dst, - float *src, uint32_t n_samples) +static void midi_to_netjack2(struct netjack2_peer *peer, + struct nj2_midi_buffer *buf, float *src, uint32_t n_samples) { struct spa_pod *pod; struct spa_pod_sequence *seq; struct spa_pod_control *c; + struct nj2_midi_event *ev; + uint32_t free_size; + + buf->magic = MIDI_BUFFER_MAGIC; + buf->buffer_size = 8192 * sizeof(float); + buf->nframes = n_samples; + buf->write_pos = 0; + buf->event_count = 0; + buf->lost_events = 0; if (src == NULL) return; @@ -62,44 +80,75 @@ static inline void midi_to_netjack2(struct netjack2_peer *peer, float *dst, seq = (struct spa_pod_sequence*)pod; + free_size = buf->buffer_size - sizeof(*buf); + SPA_POD_SEQUENCE_FOREACH(seq, c) { switch(c->type) { case SPA_CONTROL_Midi: { uint8_t *data = SPA_POD_BODY(&c->value); size_t size = SPA_POD_BODY_SIZE(&c->value); + void *ptr; + if (c->offset >= n_samples || + size >= free_size) { + buf->lost_events++; + continue; + } if (peer->fix_midi) fix_midi_event(data, size); + ev = &buf->event[buf->event_count]; + ev->time = c->offset; + ev->size = size; + if (size <= MIDI_INLINE_MAX) { + ptr = ev->buffer; + } else { + buf->write_pos += size; + ev->offset = buf->buffer_size - 1 - buf->write_pos; + free_size -= size; + ptr = SPA_PTROFF(buf, ev->offset, void); + } + memcpy(ptr, data, size); + buf->event_count++; + free_size -= sizeof(*ev); break; } default: break; } } + if (buf->write_pos > 0) + memmove(SPA_PTROFF(buf, sizeof(*buf) + buf->event_count * sizeof(struct nj2_midi_event), void), + SPA_PTROFF(buf, buf->buffer_size - buf->write_pos, void), + buf->write_pos); } -static inline void netjack2_to_midi(float *dst, float *src, uint32_t size) +static inline void netjack2_to_midi(float *dst, uint32_t size, struct nj2_midi_buffer *buf) { struct spa_pod_builder b = { 0, }; - uint32_t i, count; + uint32_t i; struct spa_pod_frame f; - count = 0; spa_pod_builder_init(&b, dst, size); spa_pod_builder_push_sequence(&b, &f, 0); - for (i = 0; i < count; i++) { + for (i = 0; buf != NULL && i < buf->event_count; i++) { + struct nj2_midi_event *ev = &buf->event[i]; + void *data; + + if (ev->size <= MIDI_INLINE_MAX) + data = ev->buffer; + else if (ev->offset > buf->write_pos) + data = SPA_PTROFF(buf, ev->offset - buf->write_pos, void); + else + continue; + + spa_pod_builder_control(&b, ev->time, SPA_CONTROL_Midi); + spa_pod_builder_bytes(&b, data, ev->size); } spa_pod_builder_pop(&b, &f); } -struct data_info { - void *data; - uint32_t id; - bool filled; -}; - static int netjack2_send_sync(struct netjack2_peer *peer, uint32_t nframes) { struct nj2_packet_header header; @@ -138,19 +187,31 @@ static int netjack2_send_midi(struct netjack2_peer *peer, uint32_t nframes, { struct nj2_packet_header header; uint8_t buffer[peer->params.mtu]; - uint32_t i, num_packets, active_ports, data_size, res1, res2, max_size; - int32_t *ap; - - if (peer->params.send_midi_channels <= 0) - return 0; + uint32_t i, num_packets, active_ports, data_size, res1, res2; + uint32_t max_size; active_ports = peer->params.send_midi_channels; - ap = SPA_PTROFF(buffer, sizeof(header), int32_t); + if (active_ports <= 0) + return 0; - data_size = active_ports * sizeof(struct nj2_midi_buffer); - memset(ap, 0, data_size); + data_size = 0; + for (i = 0; i < active_ports; i++) { + struct nj2_midi_buffer *mbuf; + void *data = (i < n_info && info) ? info[i].data : NULL; - max_size = PACKET_AVAILABLE_SIZE(peer->params.mtu); + mbuf = SPA_PTROFF(peer->buffer, data_size, struct nj2_midi_buffer); + midi_to_netjack2(peer, mbuf, data, nframes); + + data_size += sizeof(*mbuf) + + mbuf->event_count * sizeof(struct nj2_midi_event) + + mbuf->write_pos; + + nj2_midi_buffer_hton(mbuf, mbuf); + } + + /* Note: jack2 calculates the packet max_size and num packets with + * different values... */ + max_size = peer->params.mtu - sizeof(header); res1 = data_size % max_size; res2 = data_size / max_size; @@ -163,22 +224,28 @@ static int netjack2_send_midi(struct netjack2_peer *peer, uint32_t nframes, header.cycle = htonl(peer->cycle); header.active_ports = htonl(active_ports); header.num_packets = htonl(num_packets); + header.frames = htonl(nframes); for (i = 0; i < num_packets; i++) { uint32_t is_last = ((i == num_packets - 1) && peer->params.send_audio_channels == 0) ? 1 : 0; - uint32_t packet_size = sizeof(header) + data_size; + uint32_t size = data_size - i * max_size; + uint32_t copy_size = SPA_MIN(size, max_size); + uint32_t packet_size = sizeof(header) + copy_size; header.sub_cycle = htonl(i); header.is_last = htonl(is_last); header.packet_size = htonl(packet_size); memcpy(buffer, &header, sizeof(header)); + memcpy(SPA_PTROFF(buffer, sizeof(header), void), + SPA_PTROFF(peer->buffer, i * max_size, void), + copy_size); send(peer->fd, buffer, packet_size, 0); //nj2_dump_packet_header(&header); } return 0; } -static int netjack2_send_audio(struct netjack2_peer *peer, uint32_t frames, +static int netjack2_send_audio(struct netjack2_peer *peer, uint32_t nframes, struct data_info *info, uint32_t n_info) { struct nj2_packet_header header; @@ -192,15 +259,15 @@ static int netjack2_send_audio(struct netjack2_peer *peer, uint32_t frames, active_ports = n_info; if (active_ports == 0) { - sub_period_size = frames; + sub_period_size = nframes; } else { uint32_t max_size = PACKET_AVAILABLE_SIZE(peer->params.mtu); uint32_t period = (uint32_t) powf(2.f, (uint32_t) (logf((float)max_size / (active_ports * sizeof(float))) / logf(2.f))); - sub_period_size = SPA_MIN(period, frames); + sub_period_size = SPA_MIN(period, nframes); } sub_period_bytes = sub_period_size * sizeof(float) + sizeof(int32_t); - num_packets = frames / sub_period_size; + num_packets = nframes / sub_period_size; strcpy(header.type, "header"); header.data_type = htonl('a'); @@ -209,6 +276,7 @@ static int netjack2_send_audio(struct netjack2_peer *peer, uint32_t frames, header.cycle = htonl(peer->cycle); header.active_ports = htonl(active_ports); header.num_packets = htonl(num_packets); + header.frames = htonl(nframes); for (i = 0; i < num_packets; i++) { uint32_t is_last = (i == num_packets - 1) ? 1 : 0; @@ -323,17 +391,42 @@ static int netjack2_recv_midi(struct netjack2_peer *peer, struct nj2_packet_head struct data_info *info, uint32_t n_info) { ssize_t len; + uint32_t i, active_ports, sub_cycle, max_size, offset; uint32_t packet_size = SPA_MIN(ntohl(header->packet_size), peer->params.mtu); - uint8_t buffer[packet_size]; + uint8_t buffer[packet_size], *p = buffer; + + //nj2_dump_packet_header(header); if ((len = recv(peer->fd, buffer, packet_size, 0)) < 0) return -errno; - peer->sync.cycle = ntohl(header->cycle); + active_ports = peer->params.send_midi_channels; + sub_cycle = ntohl(header->sub_cycle); peer->sync.num_packets = ntohl(header->num_packets); + max_size = peer->params.mtu - sizeof(*header); + offset = max_size * sub_cycle; - if (++(*count) == peer->sync.num_packets) { - pw_log_trace_fp("got last midi packet"); + p += sizeof(*header); + len -= sizeof(*header); + + if (offset + len < peer->buffer_size) + memcpy(SPA_PTROFF(peer->buffer, offset, void), p, len); + + if (++(*count) < peer->sync.num_packets) + return 0; + + p = (uint8_t*)peer->buffer; + for (i = 0; i < active_ports; i++) { + struct nj2_midi_buffer *mbuf = (struct nj2_midi_buffer *)p; + nj2_midi_buffer_ntoh(mbuf, mbuf); + + if (i < n_info && info[i].data != NULL) { + netjack2_to_midi(info[i].data, peer->params.period_size * sizeof(float), mbuf); + info[i].filled = true; + } + p += sizeof(*mbuf) + + mbuf->event_count * sizeof(struct nj2_midi_event) + + mbuf->write_pos; } return 0; } @@ -383,7 +476,9 @@ static int netjack2_recv_audio(struct netjack2_peer *peer, struct nj2_packet_hea return 0; } -static int netjack2_recv_data(struct netjack2_peer *peer, struct data_info *info, uint32_t n_info) +static int netjack2_recv_data(struct netjack2_peer *peer, + struct data_info *midi, uint32_t n_midi, + struct data_info *audio, uint32_t n_audio) { ssize_t len; uint32_t i, count = 0; @@ -408,10 +503,10 @@ static int netjack2_recv_data(struct netjack2_peer *peer, struct data_info *info switch (ntohl(header.data_type)) { case 'm': - netjack2_recv_midi(peer, &header, &count, info, n_info); + netjack2_recv_midi(peer, &header, &count, midi, n_midi); break; case 'a': - netjack2_recv_audio(peer, &header, &count, info, n_info); + netjack2_recv_audio(peer, &header, &count, audio, n_audio); break; case 's': pw_log_info("missing last data packet"); @@ -419,9 +514,13 @@ static int netjack2_recv_data(struct netjack2_peer *peer, struct data_info *info break; } } - for (i = 0; i < n_info; i++) { - if (!info[i].filled && info[i].data != NULL) - memset(info[i].data, 0, peer->sync.frames * sizeof(float)); + for (i = 0; i < n_audio; i++) { + if (!audio[i].filled && audio[i].data != NULL) + memset(audio[i].data, 0, peer->sync.frames * sizeof(float)); + } + for (i = 0; i < n_midi; i++) { + if (!midi[i].filled && midi[i].data != NULL) + netjack2_to_midi(midi[i].data, peer->params.period_size * sizeof(float), NULL); } peer->sync.cycle = ntohl(header.cycle); return 0;