From 1a5de467db9f2715e97f01ebd1921bf92974f664 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 1 Feb 2023 18:31:33 +0100 Subject: [PATCH] module-rtp: support direct clock timestamps Always use the timestamp of the graph clock for RTP packets. Add an option to apply a random or fixed offset. Add a ts-refclk option on the sender to specify a reference clock to use for timestamping. This will activate the direct timestamp mode and signal this in the SDP. Parse ts-refclk and ts-offset from the SDP. Make it possible to match them in rules. Add option to activate the direct-timestamp, where the rate matching is disabled and the timestamps are used directly for writing and reading from the ringbuffer using the graph clock. This makes it possible to set a PTP clock on sink and source and avoid rate matching by using a shared clock. --- src/modules/module-rtp-sink.c | 63 +++++++++++++++++++++++++----- src/modules/module-rtp-source.c | 69 +++++++++++++++++++++++++++++---- 2 files changed, 116 insertions(+), 16 deletions(-) diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index b0c622d1a..a4398ffb3 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -70,6 +70,8 @@ * - `sess.min-ptime = `: minimum packet time in milliseconds, default 2 * - `sess.max-ptime = `: maximum packet time in milliseconds, default 20 * - `sess.name = `: a session name + * - `sess.ts-offset = `: an offset to apply to the timestamp, default -1 = random offset + * - `sess.ts-refclk = `: the name of a reference clock * - `stream.props = {}`: properties to be passed to the stream * * ## General options @@ -149,6 +151,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define DEFAULT_MIN_PTIME 2 #define DEFAULT_MAX_PTIME 20 +#define DEFAULT_TS_OFFSET -1 #define USAGE "sap.ip= " \ "sap.port= " \ @@ -212,6 +215,8 @@ struct impl { struct pw_stream *stream; struct spa_hook stream_listener; + struct spa_io_position *io_position; + unsigned int do_disconnect:1; char *ifname; @@ -220,8 +225,8 @@ struct impl { int mtu; bool ttl; bool mcast_loop; - uint32_t min_ptime; - uint32_t max_ptime; + float min_ptime; + float max_ptime; uint32_t pbytes; struct sockaddr_storage src_addr; @@ -243,8 +248,9 @@ struct impl { uint32_t frame_size; int payload; uint16_t seq; - uint32_t timestamp; uint32_t ssrc; + uint32_t ts_offset; + char ts_refclk[64]; struct spa_ringbuffer ring; uint8_t buffer[BUFFER_SIZE]; @@ -306,13 +312,14 @@ static void flush_packets(struct impl *impl) while (avail >= tosend) { header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->timestamp); + header.timestamp = htonl(impl->ts_offset + index / impl->frame_size); set_iovec(&impl->ring, impl->buffer, BUFFER_SIZE, index & BUFFER_MASK, &iov[1], tosend); + pw_log_trace("sending %d index:%d", tosend, index); n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL); if (n < 0) { switch (errno) { @@ -327,7 +334,6 @@ static void flush_packets(struct impl *impl) } impl->seq++; - impl->timestamp += tosend / impl->frame_size; index += tosend; avail -= tosend; @@ -352,6 +358,11 @@ static void stream_process(void *data) wanted = d[0].chunk->size; filled = spa_ringbuffer_get_write_index(&impl->ring, &index); + if (filled == 0 && impl->io_position) { + index = impl->ring.readindex = impl->ring.writeindex = + impl->io_position->clock.position * impl->frame_size; + } + if (filled + wanted > (int32_t)BUFFER_SIZE) { pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE); @@ -370,6 +381,16 @@ static void stream_process(void *data) flush_packets(impl); } +static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + struct impl *impl = data; + switch (id) { + case SPA_IO_Position: + impl->io_position = area; + break; + } +} + static void on_stream_state_changed(void *d, enum pw_stream_state old, enum pw_stream_state state, const char *error) { @@ -391,6 +412,7 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, static const struct pw_stream_events in_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, + .io_changed = stream_io_changed, .state_changed = on_stream_state_changed, .process = stream_process }; @@ -548,6 +570,7 @@ static void send_sap(struct impl *impl, bool bye) struct sap_header header; struct iovec iov[4]; struct msghdr msg; + struct spa_strbuf buf; spa_zero(header); header.v = 1; @@ -580,7 +603,8 @@ static void send_sap(struct impl *impl, bool bye) if (is_multicast((struct sockaddr*)&impl->dst_addr, impl->dst_len)) snprintf(dst_ttl, sizeof(dst_ttl), "/%d", impl->ttl); - snprintf(buffer, sizeof(buffer), + spa_strbuf_init(&buf, buffer, sizeof(buffer)); + spa_strbuf_append(&buf, "v=0\n" "o=%s %u 0 IN %s %s\n" "s=%s\n" @@ -602,6 +626,16 @@ static void send_sap(struct impl *impl, bool bye) impl->info.rate, impl->info.channels, (impl->pbytes / impl->frame_size) * 1000 / impl->info.rate); + if (impl->ts_refclk[0] != '\0') { + spa_strbuf_append(&buf, + "a=ts-refclk:%s\n" + "a=mediaclk:direct=%u\n", + impl->ts_refclk, + impl->ts_offset); + } else { + spa_strbuf_append(&buf, "a=mediaclk:sender\n"); + } + iov[3].iov_base = buffer; iov[3].iov_len = strlen(buffer); @@ -794,6 +828,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_properties *props = NULL, *stream_props = NULL; uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); uint32_t pid = getpid(), port, min_bytes, max_bytes; + int64_t ts_offset; char addr[64]; const char *str; int res = 0; @@ -873,7 +908,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->payload = 127; impl->seq = rand(); - impl->timestamp = rand(); impl->ssrc = rand(); str = pw_properties_get(props, "local.ifname"); @@ -907,8 +941,19 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); - impl->min_ptime = pw_properties_get_uint32(props, "sess.min-ptime", DEFAULT_MIN_PTIME); - impl->max_ptime = pw_properties_get_uint32(props, "sess.max-ptime", DEFAULT_MAX_PTIME); + ts_offset = pw_properties_get_int64(props, "sess.ts-offset", DEFAULT_TS_OFFSET); + impl->ts_offset = ts_offset < 0 ? rand() : ts_offset; + + str = pw_properties_get(props, "sess.ts-refclk"); + if (str != NULL) + snprintf(impl->ts_refclk, sizeof(impl->ts_refclk), "%s", str); + + str = pw_properties_get(props, "sess.min-ptime"); + if (!spa_atof(str, &impl->min_ptime)) + impl->min_ptime = DEFAULT_MIN_PTIME; + str = pw_properties_get(props, "sess.max-ptime"); + if (!spa_atof(str, &impl->max_ptime)) + impl->max_ptime = DEFAULT_MAX_PTIME; min_bytes = (impl->min_ptime * impl->info.rate / 1000) * impl->frame_size; max_bytes = (impl->max_ptime * impl->info.rate / 1000) * impl->frame_size; diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 21400256d..31acc78e1 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -99,11 +99,14 @@ * #rtp.payload = "127" * #rtp.fmt = "L16/48000/2" * #rtp.session = "PipeWire RTP Stream on fedora" + * #rtp.ts-offset = 0 + * #rtp.ts-refclk = "private" * } * ] * actions = { * create-stream = { * #sess.latency.msec = 100 + * #sess.ts-direct = false * #target.object = "" * } * } @@ -216,6 +219,9 @@ struct sdp_info { const struct format_info *format_info; struct spa_audio_info_raw info; uint32_t stride; + + uint32_t ts_offset; + char refclk[64]; }; struct session { @@ -241,6 +247,7 @@ struct session { uint8_t buffer[BUFFER_SIZE]; struct spa_io_rate_match *rate_match; + struct spa_io_position *position; struct spa_dll dll; uint32_t target_buffer; uint32_t last_packet_size; @@ -248,6 +255,7 @@ struct session { unsigned buffering:1; unsigned first:1; unsigned receiving:1; + unsigned direct_timestamp:1; }; static void stream_destroy(void *d) @@ -275,6 +283,10 @@ static void stream_process(void *data) SPA_MIN(buf->requested * sess->info.stride, d[0].maxsize) : d[0].maxsize; + if (sess->position && sess->direct_timestamp) { + spa_ringbuffer_read_update(&sess->ring, + sess->position->clock.position * sess->info.stride); + } avail = spa_ringbuffer_get_read_index(&sess->ring, &index); target_buffer = sess->target_buffer + sess->last_packet_size / 2; @@ -311,7 +323,7 @@ static void stream_process(void *data) pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, target_buffer, error, corr); - if (sess->rate_match) { + if (sess->rate_match && !sess->direct_timestamp) { SPA_FLAG_SET(sess->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); sess->rate_match->rate = 1.0f / corr; } @@ -340,6 +352,9 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size case SPA_IO_RateMatch: sess->rate_match = area; break; + case SPA_IO_Position: + sess->position = area; + break; } } @@ -395,9 +410,12 @@ on_rtp_io(void *data, int fd, uint32_t mask) filled = spa_ringbuffer_get_write_index(&sess->ring, &index); - timestamp = ntohl(hdr->timestamp); + timestamp = ntohl(hdr->timestamp) - sess->info.ts_offset; expected_index = timestamp * sess->info.stride; + if (sess->direct_timestamp) + expected_index += sess->target_buffer; + if (!sess->have_sync) { pw_log_trace("got rtp, no sync"); sess->ring.readindex = sess->ring.writeindex = @@ -695,6 +713,8 @@ static int session_new(struct impl *impl, struct sdp_info *info) } else { pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Stream"); } + pw_properties_setf(props, "rtp.ts-offset", "%u", info->ts_offset); + pw_properties_set(props, "rtp.ts-refclk", info->refclk); if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) { struct session_info sinfo = { @@ -710,8 +730,10 @@ static int session_new(struct impl *impl, struct sdp_info *info) goto error; } } + session->direct_timestamp = pw_properties_get_bool(props, "sess.ts-direct", false); - pw_log_info("new session %s %s", info->origin, info->session); + pw_log_info("new session %s %s direct:%d", info->origin, info->session, + session->direct_timestamp); sess_latency_msec = pw_properties_get_uint32(props, "sess.latency.msec", impl->sess_latency_msec); @@ -872,7 +894,7 @@ static int parse_sdp_i(struct impl *impl, char *c, struct sdp_info *info) return 0; } -static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info) +static int parse_sdp_a_rtpmap(struct impl *impl, char *c, struct sdp_info *info) { int payload, len, rate, channels; @@ -904,7 +926,7 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info) if (sscanf(c, "%u/%u", &rate, &channels) == 2) { info->info.rate = rate; info->info.channels = channels; - pw_log_info("rate: %d, ch: %d", rate, channels); + pw_log_debug("rate: %d, ch: %d", rate, channels); if (channels == 2) { info->info.position[0] = SPA_AUDIO_CHANNEL_FL; info->info.position[1] = SPA_AUDIO_CHANNEL_FR; @@ -920,6 +942,35 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info) return 0; } +static int parse_sdp_a_mediaclk(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!spa_strstartswith(c, "a=mediaclk:")) + return 0; + + c += strlen("a=mediaclk:"); + + if (spa_strstartswith(c, "direct=")) { + int offset; + c += strlen("direct="); + if (sscanf(c, "%i", &offset) != 1) + return -EINVAL; + info->ts_offset = offset; + } else if (spa_strstartswith(c, "sender")) { + info->ts_offset = 0; + } + return 0; +} + +static int parse_sdp_a_ts_refclk(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!spa_strstartswith(c, "a=ts-refclk:")) + return 0; + + c += strlen("a=ts-refclk:"); + snprintf(info->refclk, sizeof(info->refclk), "%s", c); + return 0; +} + static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info) { char *s = sdp; @@ -944,8 +995,12 @@ static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info) res = parse_sdp_c(impl, s, info); else if (spa_strstartswith(s, "m=")) res = parse_sdp_m(impl, s, info); - else if (spa_strstartswith(s, "a=")) - res = parse_sdp_a(impl, s, info); + else if (spa_strstartswith(s, "a=rtpmap:")) + res = parse_sdp_a_rtpmap(impl, s, info); + else if (spa_strstartswith(s, "a=mediaclk:")) + res = parse_sdp_a_mediaclk(impl, s, info); + else if (spa_strstartswith(s, "a=ts-refclk:")) + res = parse_sdp_a_ts_refclk(impl, s, info); else if (spa_strstartswith(s, "i=")) res = parse_sdp_i(impl, s, info);