From c5effbd979c90fdd2cb7f086a9de2045729e17f2 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 7 Mar 2023 09:00:40 +0100 Subject: [PATCH] module-rtp: add timer for ck requests Scale RTP timestamps against the clock, allow some jitter. Make method to query current RTP timestamps. --- src/modules/module-rtp-session.c | 177 ++++++++++++++++++++++++++++--- src/modules/module-rtp-sink.c | 5 - src/modules/module-rtp/audio.c | 9 +- src/modules/module-rtp/stream.c | 13 +++ src/modules/module-rtp/stream.h | 2 + 5 files changed, 181 insertions(+), 25 deletions(-) diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index 254c842e7..35e143b48 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -193,6 +193,8 @@ struct session { #define SESSION_STATE_ESTABLISHING 3 #define SESSION_STATE_ESTABLISHED 4 int state; + int ck_count; + uint64_t next_time; uint32_t initiator; uint32_t remote_ssrc; @@ -230,6 +232,7 @@ struct impl { unsigned int do_disconnect:1; struct spa_source *timer; + uint64_t next_time; struct spa_source *ctrl_source; struct spa_source *data_source; @@ -270,6 +273,97 @@ static ssize_t send_packet(int fd, struct msghdr *msg) return n; } +static uint64_t current_time_ns(void) +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return SPA_TIMESPEC_TO_NSEC(&ts); +} + +static void set_timeout(struct impl *impl, uint64_t time) +{ + struct itimerspec ts; + ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + pw_loop_update_timer(impl->loop, impl->timer, &ts.it_value, &ts.it_interval, true); + impl->next_time = time; +} + +static void schedule_timeout(struct impl *impl) +{ + struct session *sess; + uint64_t next_time = 0; + spa_list_for_each(sess, &impl->sessions, link) { + if (next_time == 0 || + (sess->next_time != 0 && sess->next_time < next_time)) + next_time = sess->next_time; + } + set_timeout(impl, next_time); +} + +static void send_apple_midi_cmd_ck0(struct session *sess) +{ + struct impl *impl = sess->impl; + struct iovec iov[3]; + struct msghdr msg; + struct rtp_apple_midi_ck hdr; + uint64_t current_time, ts; + + spa_zero(hdr); + hdr.cmd = htonl(APPLE_MIDI_CMD_CK); + hdr.ssrc = htonl(sess->ssrc); + + current_time = current_time_ns(); + ts = current_time / 10000; + hdr.ts1_h = htonl(ts >> 32); + hdr.ts1_l = htonl(ts); + + iov[0].iov_base = &hdr; + iov[0].iov_len = sizeof(hdr); + + spa_zero(msg); + msg.msg_name = &sess->data_addr; + msg.msg_namelen = sess->data_len; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + send_packet(impl->data_source->fd, &msg); + + if (sess->ck_count++ < 8) + sess->next_time = current_time + SPA_NSEC_PER_SEC; + else if (sess->ck_count++ < 16) + sess->next_time = current_time + 2 * SPA_NSEC_PER_SEC; + else + sess->next_time = current_time + 5 * SPA_NSEC_PER_SEC; +} + +static void session_update_state(struct session *sess, int state) +{ + if (sess->state == state) + return; + + pw_log_info("session initiator:%08x state:%d", sess->initiator, state); + + sess->state = state; + switch (state) { + case SESSION_STATE_ESTABLISHED: + if (sess->we_initiated) { + sess->ck_count = 0; + send_apple_midi_cmd_ck0(sess); + schedule_timeout(sess->impl); + } + break; + case SESSION_STATE_INIT: + sess->next_time = 0; + schedule_timeout(sess->impl); + break; + default: + break; + } +} + static void send_apple_midi_cmd_in(struct session *sess, bool ctrl) { struct impl *impl = sess->impl; @@ -294,12 +388,12 @@ static void send_apple_midi_cmd_in(struct session *sess, bool ctrl) msg.msg_name = &sess->ctrl_addr; msg.msg_namelen = sess->ctrl_len; fd = impl->ctrl_source->fd; - sess->state = SESSION_STATE_SENDING_CTRL_IN; + session_update_state(sess, SESSION_STATE_SENDING_CTRL_IN); } else { msg.msg_name = &sess->data_addr; msg.msg_namelen = sess->data_len; fd = impl->data_source->fd; - sess->state = SESSION_STATE_SENDING_DATA_IN; + session_update_state(sess, SESSION_STATE_SENDING_DATA_IN); } msg.msg_iov = iov; msg.msg_iovlen = 2; @@ -370,7 +464,7 @@ static void session_stop(struct session *sess) send_apple_midi_cmd_by(sess, false); sess->data_ready = false; } - sess->state = SESSION_STATE_INIT; + session_update_state(sess, SESSION_STATE_INIT); } static void send_destroy(void *data) @@ -508,6 +602,9 @@ static struct session *make_session(struct impl *impl, struct pw_properties *pro pw_properties_setf(props, "rtp.sender-ssrc", "%u", sess->ssrc); pw_properties_set(props, "rtp.session", sess->name); + if (pw_properties_get(props, PW_KEY_NODE_GROUP) == NULL) + pw_properties_set(props, PW_KEY_NODE_GROUP, impl->session_name); + copy = pw_properties_copy(props); if (pw_properties_get(props, PW_KEY_MEDIA_CLASS) == NULL) { @@ -524,6 +621,7 @@ static struct session *make_session(struct impl *impl, struct pw_properties *pro pw_properties_setf(props, PW_KEY_MEDIA_CLASS, "%s/Source", media); } } + sess->send = rtp_stream_new(impl->core, PW_DIRECTION_INPUT, copy, &send_stream_events, sess); @@ -607,7 +705,7 @@ static void parse_apple_midi_cmd_in(struct impl *impl, bool ctrl, uint8_t *buffe sess->ctrl_addr = *sa; sess->ctrl_len = salen; sess->ctrl_ready = true; - sess->state = SESSION_STATE_ESTABLISHING; + session_update_state(sess, SESSION_STATE_ESTABLISHING); } } else { @@ -620,7 +718,7 @@ static void parse_apple_midi_cmd_in(struct impl *impl, bool ctrl, uint8_t *buffe sess->data_addr = *sa; sess->data_len = salen; sess->data_ready = true; - sess->state = SESSION_STATE_ESTABLISHED; + session_update_state(sess, SESSION_STATE_ESTABLISHED); } } @@ -671,7 +769,7 @@ static void parse_apple_midi_cmd_ok(struct impl *impl, bool ctrl, uint8_t *buffe sess->remote_ssrc = ntohl(hdr->ssrc); sess->data_ready = true; if (sess->ctrl_ready) - sess->state = SESSION_STATE_ESTABLISHED; + session_update_state(sess, SESSION_STATE_ESTABLISHED); } } @@ -682,9 +780,8 @@ static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffe struct iovec iov[3]; struct msghdr msg; struct rtp_apple_midi_ck reply; - struct timespec ts; struct session *sess; - uint64_t now; + uint64_t now, t1, t2, t3; uint32_t ssrc = ntohl(hdr->ssrc); sess = find_session_by_ssrc(impl, ssrc); @@ -695,27 +792,44 @@ static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffe pw_log_debug("got CK count %d", hdr->count); - clock_gettime(CLOCK_MONOTONIC, &ts); - now = SPA_TIMESPEC_TO_NSEC(&ts); + now = current_time_ns() / 10000; reply = *hdr; reply.ssrc = htonl(sess->ssrc); reply.count++; iov[0].iov_base = &reply; iov[0].iov_len = sizeof(reply); + t1 = ((uint64_t)ntohl(hdr->ts1_h) << 32) | ntohl(hdr->ts1_l); + t2 = t3 = 0; + switch (hdr->count) { case 0: - reply.ts2_h = htonl(now >> 32); - reply.ts2_l = htonl(now); + t2 = now; break; case 1: - reply.ts3_h = htonl(now >> 32); - reply.ts3_l = htonl(now); + t2 = ((uint64_t)ntohl(hdr->ts2_h) << 32) | ntohl(hdr->ts2_l); + t3 = now; break; case 2: + t3 = ((uint64_t)ntohl(hdr->ts3_h) << 32) | ntohl(hdr->ts3_l); return; } + if (hdr->count >= 1) { + int64_t latency, offset; + latency = t3 - t1; + offset = ((t3 + t1) / 2) - t2; + + pw_log_info("latency:%f offset:%f", latency / 1e5, offset / 1e5); + if (hdr->count >= 2) + return; + } + + reply.ts2_h = htonl(t2 >> 32); + reply.ts2_l = htonl(t2); + reply.ts3_h = htonl(t3 >> 32); + reply.ts3_l = htonl(t3); + spa_zero(msg); msg.msg_name = sa; msg.msg_namelen = salen; @@ -744,12 +858,12 @@ static void parse_apple_midi_cmd_by(struct impl *impl, bool ctrl, uint8_t *buffe pw_log_info("%p: got ctrl BY %08x %u", sess, initiator, sess->data_ready); sess->ctrl_ready = false; if (!sess->data_ready) - sess->state = SESSION_STATE_INIT; + session_update_state(sess, SESSION_STATE_INIT); } else { pw_log_info("%p: got data BY %08x %u", sess, initiator, sess->ctrl_ready); sess->data_ready = false; if (!sess->ctrl_ready) - sess->state = SESSION_STATE_INIT; + session_update_state(sess, SESSION_STATE_INIT); } } @@ -1423,6 +1537,24 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda } } +static void on_timer_event(void *data, uint64_t expirations) +{ + struct impl *impl = data; + struct session *sess; + uint64_t current_time = impl->next_time; + + pw_log_info("timeout"); + spa_list_for_each(sess, &impl->sessions, link) { + if (sess->state != SESSION_STATE_ESTABLISHED) + continue; + if (sess->next_time < current_time) + continue; + + send_apple_midi_cmd_ck0(sess); + } + schedule_timeout(impl); +} + static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) { const char *str; @@ -1440,6 +1572,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_properties *props = NULL, *stream_props = NULL; uint16_t port; const char *str; + struct timespec value, interval; int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -1564,6 +1697,18 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); + impl->timer = pw_loop_add_timer(impl->loop, on_timer_event, impl); + if (impl->timer == NULL) { + res = -errno; + pw_log_error("can't create timer source: %m"); + goto out; + } + value.tv_sec = 0; + value.tv_nsec = 1; + interval.tv_sec = 1; + interval.tv_nsec = 0; + pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); + if ((res = setup_apple_session(impl)) < 0) goto out; diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index b409e243e..ab3d40d93 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -151,8 +151,6 @@ struct impl { struct spa_hook core_listener; struct spa_hook core_proxy_listener; - struct spa_source *timer; - struct pw_properties *stream_props; struct rtp_stream *stream; @@ -338,9 +336,6 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->timer) - pw_loop_destroy_source(impl->loop, impl->timer); - if (impl->rtp_fd != -1) close(impl->rtp_fd); diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 2fd5f79bc..3612ad470 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -266,13 +266,14 @@ static void process_audio_capture(void *data) wanted = size / stride; filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp); - if (SPA_LIKELY(impl->io_position)) - timestamp = impl->io_position->clock.position; - else + if (SPA_LIKELY(impl->io_position)) { + uint32_t rate = impl->io_position->clock.rate.denom; + timestamp = impl->io_position->clock.position * impl->rate / rate; + } else timestamp = expected_timestamp; if (impl->have_sync) { - if (expected_timestamp != timestamp) { + if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) { pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); impl->have_sync = false; } else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) { diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 1e32e00a4..1a251e465 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -466,3 +466,16 @@ int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) struct impl *impl = (struct impl*)s; return impl->receive_rtp(impl, buffer, len); } + +uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate) +{ + struct impl *impl = (struct impl*)s; + struct spa_io_position *pos = impl->io_position; + + if (pos == NULL) + return -EIO; + + *rate = impl->rate; + return pos->clock.position * impl->rate * + pos->clock.rate.num / pos->clock.rate.denom; +} diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 8cb137aee..1af20b1d4 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -42,6 +42,8 @@ void rtp_stream_destroy(struct rtp_stream *s); int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len); +uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate); + #ifdef __cplusplus }