module-rtp: support direct clock timestamps

Always use the timestamp of the graph clock for RTP packets. Add an
option to apply a random or fixed offset.
Add a ts-refclk option on the sender to specify a reference clock to
use for timestamping. This will activate the direct timestamp mode and
signal this in the SDP.

Parse ts-refclk and ts-offset from the SDP. Make it possible to match
them in rules. Add option to activate the direct-timestamp, where the
rate matching is disabled and the timestamps are used directly for
writing and reading from the ringbuffer using the graph clock.

This makes it possible to set a PTP clock on sink and source and avoid
rate matching by using a shared clock.
This commit is contained in:
Wim Taymans 2023-02-01 18:31:33 +01:00
parent 206df03c27
commit 1a5de467db
2 changed files with 116 additions and 16 deletions

View file

@ -70,6 +70,8 @@
* - `sess.min-ptime = <int>`: minimum packet time in milliseconds, default 2
* - `sess.max-ptime = <int>`: maximum packet time in milliseconds, default 20
* - `sess.name = <str>`: a session name
* - `sess.ts-offset = <int>`: an offset to apply to the timestamp, default -1 = random offset
* - `sess.ts-refclk = <string>`: the name of a reference clock
* - `stream.props = {}`: properties to be passed to the stream
*
* ## General options
@ -149,6 +151,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_MIN_PTIME 2
#define DEFAULT_MAX_PTIME 20
#define DEFAULT_TS_OFFSET -1
#define USAGE "sap.ip=<SAP IP address to send announce, default:"DEFAULT_SAP_IP"> " \
"sap.port=<SAP port to send on, default:"SPA_STRINGIFY(DEFAULT_SAP_PORT)"> " \
@ -212,6 +215,8 @@ struct impl {
struct pw_stream *stream;
struct spa_hook stream_listener;
struct spa_io_position *io_position;
unsigned int do_disconnect:1;
char *ifname;
@ -220,8 +225,8 @@ struct impl {
int mtu;
bool ttl;
bool mcast_loop;
uint32_t min_ptime;
uint32_t max_ptime;
float min_ptime;
float max_ptime;
uint32_t pbytes;
struct sockaddr_storage src_addr;
@ -243,8 +248,9 @@ struct impl {
uint32_t frame_size;
int payload;
uint16_t seq;
uint32_t timestamp;
uint32_t ssrc;
uint32_t ts_offset;
char ts_refclk[64];
struct spa_ringbuffer ring;
uint8_t buffer[BUFFER_SIZE];
@ -306,13 +312,14 @@ static void flush_packets(struct impl *impl)
while (avail >= tosend) {
header.sequence_number = htons(impl->seq);
header.timestamp = htonl(impl->timestamp);
header.timestamp = htonl(impl->ts_offset + index / impl->frame_size);
set_iovec(&impl->ring,
impl->buffer, BUFFER_SIZE,
index & BUFFER_MASK,
&iov[1], tosend);
pw_log_trace("sending %d index:%d", tosend, index);
n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL);
if (n < 0) {
switch (errno) {
@ -327,7 +334,6 @@ static void flush_packets(struct impl *impl)
}
impl->seq++;
impl->timestamp += tosend / impl->frame_size;
index += tosend;
avail -= tosend;
@ -352,6 +358,11 @@ static void stream_process(void *data)
wanted = d[0].chunk->size;
filled = spa_ringbuffer_get_write_index(&impl->ring, &index);
if (filled == 0 && impl->io_position) {
index = impl->ring.readindex = impl->ring.writeindex =
impl->io_position->clock.position * impl->frame_size;
}
if (filled + wanted > (int32_t)BUFFER_SIZE) {
pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE);
@ -370,6 +381,16 @@ static void stream_process(void *data)
flush_packets(impl);
}
static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
struct impl *impl = data;
switch (id) {
case SPA_IO_Position:
impl->io_position = area;
break;
}
}
static void on_stream_state_changed(void *d, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
@ -391,6 +412,7 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old,
static const struct pw_stream_events in_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.io_changed = stream_io_changed,
.state_changed = on_stream_state_changed,
.process = stream_process
};
@ -548,6 +570,7 @@ static void send_sap(struct impl *impl, bool bye)
struct sap_header header;
struct iovec iov[4];
struct msghdr msg;
struct spa_strbuf buf;
spa_zero(header);
header.v = 1;
@ -580,7 +603,8 @@ static void send_sap(struct impl *impl, bool bye)
if (is_multicast((struct sockaddr*)&impl->dst_addr, impl->dst_len))
snprintf(dst_ttl, sizeof(dst_ttl), "/%d", impl->ttl);
snprintf(buffer, sizeof(buffer),
spa_strbuf_init(&buf, buffer, sizeof(buffer));
spa_strbuf_append(&buf,
"v=0\n"
"o=%s %u 0 IN %s %s\n"
"s=%s\n"
@ -602,6 +626,16 @@ static void send_sap(struct impl *impl, bool bye)
impl->info.rate, impl->info.channels,
(impl->pbytes / impl->frame_size) * 1000 / impl->info.rate);
if (impl->ts_refclk[0] != '\0') {
spa_strbuf_append(&buf,
"a=ts-refclk:%s\n"
"a=mediaclk:direct=%u\n",
impl->ts_refclk,
impl->ts_offset);
} else {
spa_strbuf_append(&buf, "a=mediaclk:sender\n");
}
iov[3].iov_base = buffer;
iov[3].iov_len = strlen(buffer);
@ -794,6 +828,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
struct pw_properties *props = NULL, *stream_props = NULL;
uint32_t id = pw_global_get_id(pw_impl_module_get_global(module));
uint32_t pid = getpid(), port, min_bytes, max_bytes;
int64_t ts_offset;
char addr[64];
const char *str;
int res = 0;
@ -873,7 +908,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->payload = 127;
impl->seq = rand();
impl->timestamp = rand();
impl->ssrc = rand();
str = pw_properties_get(props, "local.ifname");
@ -907,8 +941,19 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL);
impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP);
impl->min_ptime = pw_properties_get_uint32(props, "sess.min-ptime", DEFAULT_MIN_PTIME);
impl->max_ptime = pw_properties_get_uint32(props, "sess.max-ptime", DEFAULT_MAX_PTIME);
ts_offset = pw_properties_get_int64(props, "sess.ts-offset", DEFAULT_TS_OFFSET);
impl->ts_offset = ts_offset < 0 ? rand() : ts_offset;
str = pw_properties_get(props, "sess.ts-refclk");
if (str != NULL)
snprintf(impl->ts_refclk, sizeof(impl->ts_refclk), "%s", str);
str = pw_properties_get(props, "sess.min-ptime");
if (!spa_atof(str, &impl->min_ptime))
impl->min_ptime = DEFAULT_MIN_PTIME;
str = pw_properties_get(props, "sess.max-ptime");
if (!spa_atof(str, &impl->max_ptime))
impl->max_ptime = DEFAULT_MAX_PTIME;
min_bytes = (impl->min_ptime * impl->info.rate / 1000) * impl->frame_size;
max_bytes = (impl->max_ptime * impl->info.rate / 1000) * impl->frame_size;

View file

@ -99,11 +99,14 @@
* #rtp.payload = "127"
* #rtp.fmt = "L16/48000/2"
* #rtp.session = "PipeWire RTP Stream on fedora"
* #rtp.ts-offset = 0
* #rtp.ts-refclk = "private"
* }
* ]
* actions = {
* create-stream = {
* #sess.latency.msec = 100
* #sess.ts-direct = false
* #target.object = ""
* }
* }
@ -216,6 +219,9 @@ struct sdp_info {
const struct format_info *format_info;
struct spa_audio_info_raw info;
uint32_t stride;
uint32_t ts_offset;
char refclk[64];
};
struct session {
@ -241,6 +247,7 @@ struct session {
uint8_t buffer[BUFFER_SIZE];
struct spa_io_rate_match *rate_match;
struct spa_io_position *position;
struct spa_dll dll;
uint32_t target_buffer;
uint32_t last_packet_size;
@ -248,6 +255,7 @@ struct session {
unsigned buffering:1;
unsigned first:1;
unsigned receiving:1;
unsigned direct_timestamp:1;
};
static void stream_destroy(void *d)
@ -275,6 +283,10 @@ static void stream_process(void *data)
SPA_MIN(buf->requested * sess->info.stride, d[0].maxsize)
: d[0].maxsize;
if (sess->position && sess->direct_timestamp) {
spa_ringbuffer_read_update(&sess->ring,
sess->position->clock.position * sess->info.stride);
}
avail = spa_ringbuffer_get_read_index(&sess->ring, &index);
target_buffer = sess->target_buffer + sess->last_packet_size / 2;
@ -311,7 +323,7 @@ static void stream_process(void *data)
pw_log_debug("avail:%u target:%u error:%f corr:%f", avail,
target_buffer, error, corr);
if (sess->rate_match) {
if (sess->rate_match && !sess->direct_timestamp) {
SPA_FLAG_SET(sess->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE);
sess->rate_match->rate = 1.0f / corr;
}
@ -340,6 +352,9 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size
case SPA_IO_RateMatch:
sess->rate_match = area;
break;
case SPA_IO_Position:
sess->position = area;
break;
}
}
@ -395,9 +410,12 @@ on_rtp_io(void *data, int fd, uint32_t mask)
filled = spa_ringbuffer_get_write_index(&sess->ring, &index);
timestamp = ntohl(hdr->timestamp);
timestamp = ntohl(hdr->timestamp) - sess->info.ts_offset;
expected_index = timestamp * sess->info.stride;
if (sess->direct_timestamp)
expected_index += sess->target_buffer;
if (!sess->have_sync) {
pw_log_trace("got rtp, no sync");
sess->ring.readindex = sess->ring.writeindex =
@ -695,6 +713,8 @@ static int session_new(struct impl *impl, struct sdp_info *info)
} else {
pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Stream");
}
pw_properties_setf(props, "rtp.ts-offset", "%u", info->ts_offset);
pw_properties_set(props, "rtp.ts-refclk", info->refclk);
if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) {
struct session_info sinfo = {
@ -710,8 +730,10 @@ static int session_new(struct impl *impl, struct sdp_info *info)
goto error;
}
}
session->direct_timestamp = pw_properties_get_bool(props, "sess.ts-direct", false);
pw_log_info("new session %s %s", info->origin, info->session);
pw_log_info("new session %s %s direct:%d", info->origin, info->session,
session->direct_timestamp);
sess_latency_msec = pw_properties_get_uint32(props,
"sess.latency.msec", impl->sess_latency_msec);
@ -872,7 +894,7 @@ static int parse_sdp_i(struct impl *impl, char *c, struct sdp_info *info)
return 0;
}
static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info)
static int parse_sdp_a_rtpmap(struct impl *impl, char *c, struct sdp_info *info)
{
int payload, len, rate, channels;
@ -904,7 +926,7 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info)
if (sscanf(c, "%u/%u", &rate, &channels) == 2) {
info->info.rate = rate;
info->info.channels = channels;
pw_log_info("rate: %d, ch: %d", rate, channels);
pw_log_debug("rate: %d, ch: %d", rate, channels);
if (channels == 2) {
info->info.position[0] = SPA_AUDIO_CHANNEL_FL;
info->info.position[1] = SPA_AUDIO_CHANNEL_FR;
@ -920,6 +942,35 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info)
return 0;
}
static int parse_sdp_a_mediaclk(struct impl *impl, char *c, struct sdp_info *info)
{
if (!spa_strstartswith(c, "a=mediaclk:"))
return 0;
c += strlen("a=mediaclk:");
if (spa_strstartswith(c, "direct=")) {
int offset;
c += strlen("direct=");
if (sscanf(c, "%i", &offset) != 1)
return -EINVAL;
info->ts_offset = offset;
} else if (spa_strstartswith(c, "sender")) {
info->ts_offset = 0;
}
return 0;
}
static int parse_sdp_a_ts_refclk(struct impl *impl, char *c, struct sdp_info *info)
{
if (!spa_strstartswith(c, "a=ts-refclk:"))
return 0;
c += strlen("a=ts-refclk:");
snprintf(info->refclk, sizeof(info->refclk), "%s", c);
return 0;
}
static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info)
{
char *s = sdp;
@ -944,8 +995,12 @@ static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info)
res = parse_sdp_c(impl, s, info);
else if (spa_strstartswith(s, "m="))
res = parse_sdp_m(impl, s, info);
else if (spa_strstartswith(s, "a="))
res = parse_sdp_a(impl, s, info);
else if (spa_strstartswith(s, "a=rtpmap:"))
res = parse_sdp_a_rtpmap(impl, s, info);
else if (spa_strstartswith(s, "a=mediaclk:"))
res = parse_sdp_a_mediaclk(impl, s, info);
else if (spa_strstartswith(s, "a=ts-refclk:"))
res = parse_sdp_a_ts_refclk(impl, s, info);
else if (spa_strstartswith(s, "i="))
res = parse_sdp_i(impl, s, info);