diff --git a/src/modules/module-netjack2-driver.c b/src/modules/module-netjack2-driver.c index 5584bb47e..8ba873b18 100644 --- a/src/modules/module-netjack2-driver.c +++ b/src/modules/module-netjack2-driver.c @@ -393,6 +393,7 @@ receive_error: struct data_info { void *data; uint32_t id; + bool filled; }; static int netjack2_send_sync(struct stream *s, uint32_t nframes) @@ -565,8 +566,6 @@ static int netjack2_recv_midi(struct stream *s, struct nj2_packet_header *header if ((len = recv(impl->socket->fd, impl->recv_buffer, ntohl(header->packet_size), 0)) < 0) return -errno; - impl->sync.is_last = ntohl(header->is_last); - impl->sync.cycle = ntohl(header->cycle); impl->sync.num_packets = ntohl(header->num_packets); @@ -586,8 +585,6 @@ static int netjack2_recv_audio(struct stream *s, struct nj2_packet_header *heade if ((len = recv(impl->socket->fd, impl->recv_buffer, ntohl(header->packet_size), 0)) < 0) return -errno; - impl->sync.is_last = ntohl(header->is_last); - sub_cycle = ntohl(header->sub_cycle); active_ports = ntohl(header->active_ports); @@ -616,12 +613,9 @@ static int netjack2_recv_audio(struct stream *s, struct nj2_packet_header *heade sub_cycle * sub_period_size * sizeof(float), float); do_volume(dst, (float*)&ap[1], &s->volume, active_port, sub_period_size); + info[active_port].filled = impl->sync.is_last; } } - if (impl->sync.is_last) { - pw_log_trace_fp("got last audio packet"); - } - return 0; } @@ -629,7 +623,7 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t { struct impl *impl = s->impl; ssize_t len; - uint32_t count = 0; + uint32_t i, count = 0; struct nj2_packet_header *header = (struct nj2_packet_header *)impl->recv_buffer; while (!impl->sync.is_last) { @@ -647,6 +641,8 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t continue; } + impl->sync.is_last = ntohl(header->is_last); + switch (ntohl(header->data_type)) { case 'm': netjack2_recv_midi(s, header, &count, info, n_info); @@ -656,9 +652,14 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t break; case 's': pw_log_info("missing last data packet"); - return 0; + impl->sync.is_last = true; + break; } } + for (i = 0; i < s->n_ports; i++) { + if (!info[i].filled && info[i].data != NULL) + memset(info[i].data, 0, impl->params.period_size * sizeof(float)); + } impl->sync.cycle = ntohl(header->cycle); return 0; @@ -685,8 +686,8 @@ static void source_process(void *d, struct spa_io_position *position) struct port *p = s->ports[i]; info[i].data = p ? pw_filter_get_dsp_buffer(p, n_samples) : NULL; info[i].id = i; + info[i].filled = false; } - netjack2_recv_data(s, info, s->n_ports); } diff --git a/src/modules/module-netjack2-manager.c b/src/modules/module-netjack2-manager.c index 4ab275f5c..325fd44bf 100644 --- a/src/modules/module-netjack2-manager.c +++ b/src/modules/module-netjack2-manager.c @@ -361,6 +361,7 @@ static void stream_state_changed(void *d, enum pw_filter_state old, struct data_info { void *data; uint32_t id; + bool filled; }; static int netjack2_send_sync(struct stream *s, uint32_t nframes) @@ -571,6 +572,7 @@ static int netjack2_recv_audio(struct stream *s, struct nj2_packet_header *heade sub_cycle * sub_period_size * sizeof(float), float); do_volume(dst, (float*)&ap[1], &s->volume, active_port, sub_period_size); + info[active_port].filled = follower->sync.is_last; } } return 0; @@ -606,6 +608,7 @@ static int32_t netjack2_sync_wait(struct follower *follower) offset = follower->cycle - follower->sync.cycle; if (offset < (int32_t)follower->params.network_latency) { pw_log_info("sync offset %d %d %d", follower->cycle, follower->sync.cycle, offset); + follower->sync.is_last = true; return 0; } else { if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, 0)) < 0) @@ -623,10 +626,12 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t { struct follower *follower = s->follower; ssize_t len; - uint32_t count = 0; + uint32_t i, count = 0; struct nj2_packet_header *header = (struct nj2_packet_header *)follower->recv_buffer; int res = 0; + netjack2_sync_wait(follower); + while (!follower->sync.is_last) { if ((len = recv(follower->socket->fd, follower->recv_buffer, follower->params.mtu, MSG_PEEK)) < 0) goto receive_error; @@ -652,13 +657,18 @@ static int netjack2_recv_data(struct stream *s, struct data_info *info, uint32_t break; case 's': pw_log_info("missing last data packet"); - return 0; + follower->sync.is_last = true; + break; } if (res < 0) { pw_log_warn("recv error: %s", spa_strerror(res)); break; } } + for (i = 0; i < n_info; i++) { + if (!info[i].filled && info[i].data != NULL) + memset(info[i].data, 0, follower->params.period_size * sizeof(float)); + } follower->sync.cycle = ntohl(header->cycle); return 0; @@ -672,19 +682,14 @@ static void source_process(void *d, struct spa_io_position *position) struct stream *s = d; uint32_t i, n_samples = position->clock.duration; struct data_info info[s->n_ports]; - uint32_t sync; - - sync = netjack2_sync_wait(s->follower); for (i = 0; i < s->n_ports; i++) { struct port *p = s->ports[i]; info[i].data = p ? pw_filter_get_dsp_buffer(p, n_samples) : NULL; info[i].id = i; - if (sync == 0 && info[i].data != NULL) - memset(info[i].data, 0, n_samples * sizeof(float)); + info[i].filled = false; } - if (sync > 0) - netjack2_recv_data(s, info, s->n_ports); + netjack2_recv_data(s, info, s->n_ports); } static void follower_free(struct follower *follower)