module-rtp: add timer for ck requests

Scale RTP timestamps against the clock, allow some jitter.
Make method to query current RTP timestamps.
This commit is contained in:
Wim Taymans 2023-03-07 09:00:40 +01:00
parent 8e5b9da177
commit c5effbd979
5 changed files with 181 additions and 25 deletions

View file

@ -193,6 +193,8 @@ struct session {
#define SESSION_STATE_ESTABLISHING 3
#define SESSION_STATE_ESTABLISHED 4
int state;
int ck_count;
uint64_t next_time;
uint32_t initiator;
uint32_t remote_ssrc;
@ -230,6 +232,7 @@ struct impl {
unsigned int do_disconnect:1;
struct spa_source *timer;
uint64_t next_time;
struct spa_source *ctrl_source;
struct spa_source *data_source;
@ -270,6 +273,97 @@ static ssize_t send_packet(int fd, struct msghdr *msg)
return n;
}
static uint64_t current_time_ns(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static void set_timeout(struct impl *impl, uint64_t time)
{
struct itimerspec ts;
ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
pw_loop_update_timer(impl->loop, impl->timer, &ts.it_value, &ts.it_interval, true);
impl->next_time = time;
}
static void schedule_timeout(struct impl *impl)
{
struct session *sess;
uint64_t next_time = 0;
spa_list_for_each(sess, &impl->sessions, link) {
if (next_time == 0 ||
(sess->next_time != 0 && sess->next_time < next_time))
next_time = sess->next_time;
}
set_timeout(impl, next_time);
}
static void send_apple_midi_cmd_ck0(struct session *sess)
{
struct impl *impl = sess->impl;
struct iovec iov[3];
struct msghdr msg;
struct rtp_apple_midi_ck hdr;
uint64_t current_time, ts;
spa_zero(hdr);
hdr.cmd = htonl(APPLE_MIDI_CMD_CK);
hdr.ssrc = htonl(sess->ssrc);
current_time = current_time_ns();
ts = current_time / 10000;
hdr.ts1_h = htonl(ts >> 32);
hdr.ts1_l = htonl(ts);
iov[0].iov_base = &hdr;
iov[0].iov_len = sizeof(hdr);
spa_zero(msg);
msg.msg_name = &sess->data_addr;
msg.msg_namelen = sess->data_len;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
send_packet(impl->data_source->fd, &msg);
if (sess->ck_count++ < 8)
sess->next_time = current_time + SPA_NSEC_PER_SEC;
else if (sess->ck_count++ < 16)
sess->next_time = current_time + 2 * SPA_NSEC_PER_SEC;
else
sess->next_time = current_time + 5 * SPA_NSEC_PER_SEC;
}
static void session_update_state(struct session *sess, int state)
{
if (sess->state == state)
return;
pw_log_info("session initiator:%08x state:%d", sess->initiator, state);
sess->state = state;
switch (state) {
case SESSION_STATE_ESTABLISHED:
if (sess->we_initiated) {
sess->ck_count = 0;
send_apple_midi_cmd_ck0(sess);
schedule_timeout(sess->impl);
}
break;
case SESSION_STATE_INIT:
sess->next_time = 0;
schedule_timeout(sess->impl);
break;
default:
break;
}
}
static void send_apple_midi_cmd_in(struct session *sess, bool ctrl)
{
struct impl *impl = sess->impl;
@ -294,12 +388,12 @@ static void send_apple_midi_cmd_in(struct session *sess, bool ctrl)
msg.msg_name = &sess->ctrl_addr;
msg.msg_namelen = sess->ctrl_len;
fd = impl->ctrl_source->fd;
sess->state = SESSION_STATE_SENDING_CTRL_IN;
session_update_state(sess, SESSION_STATE_SENDING_CTRL_IN);
} else {
msg.msg_name = &sess->data_addr;
msg.msg_namelen = sess->data_len;
fd = impl->data_source->fd;
sess->state = SESSION_STATE_SENDING_DATA_IN;
session_update_state(sess, SESSION_STATE_SENDING_DATA_IN);
}
msg.msg_iov = iov;
msg.msg_iovlen = 2;
@ -370,7 +464,7 @@ static void session_stop(struct session *sess)
send_apple_midi_cmd_by(sess, false);
sess->data_ready = false;
}
sess->state = SESSION_STATE_INIT;
session_update_state(sess, SESSION_STATE_INIT);
}
static void send_destroy(void *data)
@ -508,6 +602,9 @@ static struct session *make_session(struct impl *impl, struct pw_properties *pro
pw_properties_setf(props, "rtp.sender-ssrc", "%u", sess->ssrc);
pw_properties_set(props, "rtp.session", sess->name);
if (pw_properties_get(props, PW_KEY_NODE_GROUP) == NULL)
pw_properties_set(props, PW_KEY_NODE_GROUP, impl->session_name);
copy = pw_properties_copy(props);
if (pw_properties_get(props, PW_KEY_MEDIA_CLASS) == NULL) {
@ -524,6 +621,7 @@ static struct session *make_session(struct impl *impl, struct pw_properties *pro
pw_properties_setf(props, PW_KEY_MEDIA_CLASS, "%s/Source", media);
}
}
sess->send = rtp_stream_new(impl->core,
PW_DIRECTION_INPUT, copy,
&send_stream_events, sess);
@ -607,7 +705,7 @@ static void parse_apple_midi_cmd_in(struct impl *impl, bool ctrl, uint8_t *buffe
sess->ctrl_addr = *sa;
sess->ctrl_len = salen;
sess->ctrl_ready = true;
sess->state = SESSION_STATE_ESTABLISHING;
session_update_state(sess, SESSION_STATE_ESTABLISHING);
}
}
else {
@ -620,7 +718,7 @@ static void parse_apple_midi_cmd_in(struct impl *impl, bool ctrl, uint8_t *buffe
sess->data_addr = *sa;
sess->data_len = salen;
sess->data_ready = true;
sess->state = SESSION_STATE_ESTABLISHED;
session_update_state(sess, SESSION_STATE_ESTABLISHED);
}
}
@ -671,7 +769,7 @@ static void parse_apple_midi_cmd_ok(struct impl *impl, bool ctrl, uint8_t *buffe
sess->remote_ssrc = ntohl(hdr->ssrc);
sess->data_ready = true;
if (sess->ctrl_ready)
sess->state = SESSION_STATE_ESTABLISHED;
session_update_state(sess, SESSION_STATE_ESTABLISHED);
}
}
@ -682,9 +780,8 @@ static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffe
struct iovec iov[3];
struct msghdr msg;
struct rtp_apple_midi_ck reply;
struct timespec ts;
struct session *sess;
uint64_t now;
uint64_t now, t1, t2, t3;
uint32_t ssrc = ntohl(hdr->ssrc);
sess = find_session_by_ssrc(impl, ssrc);
@ -695,27 +792,44 @@ static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffe
pw_log_debug("got CK count %d", hdr->count);
clock_gettime(CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_NSEC(&ts);
now = current_time_ns() / 10000;
reply = *hdr;
reply.ssrc = htonl(sess->ssrc);
reply.count++;
iov[0].iov_base = &reply;
iov[0].iov_len = sizeof(reply);
t1 = ((uint64_t)ntohl(hdr->ts1_h) << 32) | ntohl(hdr->ts1_l);
t2 = t3 = 0;
switch (hdr->count) {
case 0:
reply.ts2_h = htonl(now >> 32);
reply.ts2_l = htonl(now);
t2 = now;
break;
case 1:
reply.ts3_h = htonl(now >> 32);
reply.ts3_l = htonl(now);
t2 = ((uint64_t)ntohl(hdr->ts2_h) << 32) | ntohl(hdr->ts2_l);
t3 = now;
break;
case 2:
t3 = ((uint64_t)ntohl(hdr->ts3_h) << 32) | ntohl(hdr->ts3_l);
return;
}
if (hdr->count >= 1) {
int64_t latency, offset;
latency = t3 - t1;
offset = ((t3 + t1) / 2) - t2;
pw_log_info("latency:%f offset:%f", latency / 1e5, offset / 1e5);
if (hdr->count >= 2)
return;
}
reply.ts2_h = htonl(t2 >> 32);
reply.ts2_l = htonl(t2);
reply.ts3_h = htonl(t3 >> 32);
reply.ts3_l = htonl(t3);
spa_zero(msg);
msg.msg_name = sa;
msg.msg_namelen = salen;
@ -744,12 +858,12 @@ static void parse_apple_midi_cmd_by(struct impl *impl, bool ctrl, uint8_t *buffe
pw_log_info("%p: got ctrl BY %08x %u", sess, initiator, sess->data_ready);
sess->ctrl_ready = false;
if (!sess->data_ready)
sess->state = SESSION_STATE_INIT;
session_update_state(sess, SESSION_STATE_INIT);
} else {
pw_log_info("%p: got data BY %08x %u", sess, initiator, sess->ctrl_ready);
sess->data_ready = false;
if (!sess->ctrl_ready)
sess->state = SESSION_STATE_INIT;
session_update_state(sess, SESSION_STATE_INIT);
}
}
@ -1423,6 +1537,24 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
}
}
static void on_timer_event(void *data, uint64_t expirations)
{
struct impl *impl = data;
struct session *sess;
uint64_t current_time = impl->next_time;
pw_log_info("timeout");
spa_list_for_each(sess, &impl->sessions, link) {
if (sess->state != SESSION_STATE_ESTABLISHED)
continue;
if (sess->next_time < current_time)
continue;
send_apple_midi_cmd_ck0(sess);
}
schedule_timeout(impl);
}
static void copy_props(struct impl *impl, struct pw_properties *props, const char *key)
{
const char *str;
@ -1440,6 +1572,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
struct pw_properties *props = NULL, *stream_props = NULL;
uint16_t port;
const char *str;
struct timespec value, interval;
int res = 0;
PW_LOG_TOPIC_INIT(mod_topic);
@ -1564,6 +1697,18 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
&impl->core_listener,
&core_events, impl);
impl->timer = pw_loop_add_timer(impl->loop, on_timer_event, impl);
if (impl->timer == NULL) {
res = -errno;
pw_log_error("can't create timer source: %m");
goto out;
}
value.tv_sec = 0;
value.tv_nsec = 1;
interval.tv_sec = 1;
interval.tv_nsec = 0;
pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false);
if ((res = setup_apple_session(impl)) < 0)
goto out;

View file

@ -151,8 +151,6 @@ struct impl {
struct spa_hook core_listener;
struct spa_hook core_proxy_listener;
struct spa_source *timer;
struct pw_properties *stream_props;
struct rtp_stream *stream;
@ -338,9 +336,6 @@ static void impl_destroy(struct impl *impl)
if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core);
if (impl->timer)
pw_loop_destroy_source(impl->loop, impl->timer);
if (impl->rtp_fd != -1)
close(impl->rtp_fd);

View file

@ -266,13 +266,14 @@ static void process_audio_capture(void *data)
wanted = size / stride;
filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp);
if (SPA_LIKELY(impl->io_position))
timestamp = impl->io_position->clock.position;
else
if (SPA_LIKELY(impl->io_position)) {
uint32_t rate = impl->io_position->clock.rate.denom;
timestamp = impl->io_position->clock.position * impl->rate / rate;
} else
timestamp = expected_timestamp;
if (impl->have_sync) {
if (expected_timestamp != timestamp) {
if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) {
pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp);
impl->have_sync = false;
} else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) {

View file

@ -466,3 +466,16 @@ int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len)
struct impl *impl = (struct impl*)s;
return impl->receive_rtp(impl, buffer, len);
}
uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate)
{
struct impl *impl = (struct impl*)s;
struct spa_io_position *pos = impl->io_position;
if (pos == NULL)
return -EIO;
*rate = impl->rate;
return pos->clock.position * impl->rate *
pos->clock.rate.num / pos->clock.rate.denom;
}

View file

@ -42,6 +42,8 @@ void rtp_stream_destroy(struct rtp_stream *s);
int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len);
uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate);
#ifdef __cplusplus
}