sap: only send bye + new SAP when SDP changed

Reorganize some code to separate the creation and sending of the SAP
message.

Check if when the node changed, we have an actual change in the SDP
before we send BYE and the new SAP message. It's possible that nothing
changed, for example when the node simply changed state or an unrelated
property.
This commit is contained in:
Wim Taymans 2025-01-21 13:17:36 +01:00
parent e57a01594e
commit a44afd84ff

View file

@ -155,6 +155,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_TTL 1
#define DEFAULT_LOOP false
#define MAX_SDP 2048
#define USAGE "( local.ifname=<local interface name to use> ) " \
"( sap.ip=<SAP IP address to send announce, default:"DEFAULT_SAP_IP"> ) " \
"( sap.port=<SAP port to send on, default:"SPA_STRINGIFY(DEFAULT_SAP_PORT)"> ) " \
@ -222,6 +224,8 @@ struct session {
struct sdp_info info;
unsigned has_sent_sap:1;
unsigned has_sdp:1;
char sdp[MAX_SDP];
struct pw_properties *props;
@ -653,45 +657,19 @@ static void update_ts_refclk(struct impl *impl)
memcpy(impl->gm_id, gmid, 8);
}
static int send_sap(struct impl *impl, struct session *sess, bool bye)
static int make_sdp(struct impl *impl, struct session *sess, char *buffer, size_t buffer_size)
{
char buffer[2048], src_addr[64], dst_addr[64], dst_ttl[8];
const char *user_name;
struct sockaddr *sa = (struct sockaddr*)&impl->src_addr;
struct sap_header header;
struct iovec iov[4];
struct msghdr msg;
struct spa_strbuf buf;
char src_addr[64], dst_addr[64], dst_ttl[8];
struct sdp_info *sdp = &sess->info;
bool src_ip4, dst_ip4;
bool multicast;
const char *user_name;
struct spa_strbuf buf;
int res;
if (!sess->has_sent_sap && bye)
return 0;
spa_zero(header);
header.v = 1;
header.t = bye;
header.msg_id_hash = sdp->hash;
iov[0].iov_base = &header;
iov[0].iov_len = sizeof(header);
if ((res = pw_net_get_ip(&impl->src_addr, src_addr, sizeof(src_addr), &src_ip4, NULL)) < 0)
return res;
if (src_ip4) {
iov[1].iov_base = &((struct sockaddr_in*) sa)->sin_addr;
iov[1].iov_len = 4U;
} else {
iov[1].iov_base = &((struct sockaddr_in6*) sa)->sin6_addr;
iov[1].iov_len = 16U;
header.a = 1;
}
iov[2].iov_base = SAP_MIME_TYPE;
iov[2].iov_len = sizeof(SAP_MIME_TYPE);
if ((res = pw_net_get_ip(&sdp->dst_addr, dst_addr, sizeof(dst_addr), &dst_ip4, NULL)) < 0)
return res;
@ -704,7 +682,7 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye)
if (multicast)
snprintf(dst_ttl, sizeof(dst_ttl), "/%d", sdp->ttl);
spa_strbuf_init(&buf, buffer, sizeof(buffer));
spa_strbuf_init(&buf, buffer, buffer_size);
/* Don't add any sdp records in between this definition or change the order
it will break compatibility with Dante/AES67 devices. Add new records to
the end. */
@ -792,10 +770,83 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye)
if (impl->extra_attrs_end)
spa_strbuf_append(&buf, "%s", impl->extra_attrs_end);
pw_log_debug("sending SAP for %u %s", sess->node->id, buffer);
return 0;
}
iov[3].iov_base = buffer;
iov[3].iov_len = strlen(buffer);
static int send_sap(struct impl *impl, struct session *sess, bool bye)
{
struct sap_header header;
struct iovec iov[4];
struct msghdr msg;
struct sdp_info *sdp = &sess->info;
int res;
if (!sess->has_sent_sap && bye)
return 0;
if (impl->sap_fd == -1) {
int fd;
char addr[64];
const char *str;
if ((str = pw_properties_get(sess->props, "source.ip")) == NULL) {
if (impl->ifname) {
int fd = socket(impl->sap_addr.ss_family, SOCK_DGRAM, 0);
if (fd >= 0) {
struct ifreq req;
spa_zero(req);
req.ifr_addr.sa_family = impl->sap_addr.ss_family;
snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", impl->ifname);
res = ioctl(fd, SIOCGIFADDR, &req);
if (res < 0)
pw_log_warn("SIOCGIFADDR %s failed: %m", impl->ifname);
str = inet_ntop(req.ifr_addr.sa_family,
&((struct sockaddr_in *)&req.ifr_addr)->sin_addr,
addr, sizeof(addr));
if (str == NULL) {
pw_log_warn("can't parse interface ip: %m");
} else {
pw_log_info("interface %s IP: %s", impl->ifname, str);
}
close(fd);
}
}
if (str == NULL)
str = impl->sap_addr.ss_family == AF_INET ?
DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6;
}
if ((res = pw_net_parse_address(str, 0, &impl->src_addr, &impl->src_len)) < 0) {
pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res));
return res;
}
if ((fd = make_send_socket(&impl->src_addr, impl->src_len,
&impl->sap_addr, impl->sap_len,
impl->mcast_loop, impl->ttl)) < 0)
return fd;
impl->sap_fd = fd;
}
spa_zero(header);
header.v = 1;
header.t = bye;
header.msg_id_hash = sdp->hash;
iov[0].iov_base = &header;
iov[0].iov_len = sizeof(header);
if (impl->src_addr.ss_family == AF_INET) {
iov[1].iov_base = &((struct sockaddr_in*) &impl->src_addr)->sin_addr;
iov[1].iov_len = 4U;
} else {
iov[1].iov_base = &((struct sockaddr_in6*) &impl->src_addr)->sin6_addr;
iov[1].iov_len = 16U;
header.a = 1;
}
iov[2].iov_base = SAP_MIME_TYPE;
iov[2].iov_len = sizeof(SAP_MIME_TYPE);
iov[3].iov_base = sess->sdp;
iov[3].iov_len = strlen(sess->sdp);
msg.msg_name = NULL;
msg.msg_namelen = 0;
@ -805,6 +856,8 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye)
msg.msg_controllen = 0;
msg.msg_flags = 0;
pw_log_debug("sending SAP for %u %s", sess->node->id, sess->sdp);
res = sendmsg(impl->sap_fd, &msg, MSG_NOSIGNAL);
if (res < 0)
res = -errno;
@ -849,101 +902,50 @@ static struct session *session_find(struct impl *impl, const struct sdp_info *in
return NULL;
}
static inline void replace_str(char **dst, const char *val)
{
free(*dst);
*dst = val ? strdup(val) : NULL;
}
static struct session *session_new_announce(struct impl *impl, struct node *node,
struct pw_properties *props)
{
char buffer[MAX_SDP];
struct session *sess = NULL;
struct sdp_info *sdp;
const char *str;
uint32_t port;
int res;
// We want to recreate the session with updated parameters, maybe
if (node->session)
session_free(node->session);
if (impl->n_sessions >= impl->max_sessions) {
pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, impl->max_sessions);
errno = EMFILE;
return NULL;
}
if (impl->sap_fd == -1) {
int fd;
char addr[64];
if ((str = pw_properties_get(props, "source.ip")) == NULL) {
if (impl->ifname) {
int fd = socket(impl->sap_addr.ss_family, SOCK_DGRAM, 0);
if (fd >= 0) {
struct ifreq req;
spa_zero(req);
req.ifr_addr.sa_family = impl->sap_addr.ss_family;
snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", impl->ifname);
res = ioctl(fd, SIOCGIFADDR, &req);
if (res < 0)
pw_log_warn("SIOCGIFADDR %s failed: %m", impl->ifname);
str = inet_ntop(req.ifr_addr.sa_family,
&((struct sockaddr_in *)&req.ifr_addr)->sin_addr,
addr, sizeof(addr));
if (str == NULL) {
pw_log_warn("can't parse interface ip: %m");
} else {
pw_log_info("interface %s IP: %s", impl->ifname, str);
}
close(fd);
}
}
if (str == NULL)
str = impl->sap_addr.ss_family == AF_INET ?
DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6;
}
if ((res = pw_net_parse_address(str, 0, &impl->src_addr, &impl->src_len)) < 0) {
pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res));
sess = node->session;
if (sess == NULL) {
if (impl->n_sessions >= impl->max_sessions) {
pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, impl->max_sessions);
errno = EMFILE;
return NULL;
}
if ((fd = make_send_socket(&impl->src_addr, impl->src_len,
&impl->sap_addr, impl->sap_len,
impl->mcast_loop, impl->ttl)) < 0)
sess = calloc(1, sizeof(struct session));
if (sess == NULL)
return NULL;
impl->sap_fd = fd;
pw_log_info("created new session for node:%u", node->id);
node->session = sess;
sess->node = node;
sess->impl = impl;
sess->announce = true;
spa_list_append(&impl->sessions, &sess->link);
impl->n_sessions++;
}
sess = calloc(1, sizeof(struct session));
if (sess == NULL)
return NULL;
sdp = &sess->info;
sess->announce = true;
sdp->hash = pw_rand32();
if ((str = pw_properties_get(props, "sess.id")) != NULL) {
if (!spa_atou32(str, &sdp->session_id, 10)) {
pw_log_error("Invalid session id: %s (must be a uint32)", str);
goto error_free;
}
sdp->t_ntp = pw_properties_get_uint32(props, "rtp.ntp",
(uint32_t) time(NULL) + 2208988800U + impl->n_sessions);
} else {
sdp->session_id = (uint32_t) time(NULL) + 2208988800U + impl->n_sessions;
sdp->t_ntp = pw_properties_get_uint32(props, "rtp.ntp", sdp->session_id);
}
if ((str = pw_properties_get(props, "sess.version")) != NULL) {
if (!spa_atou32(str, &sdp->session_version, 10)) {
pw_log_error("Invalid session version: %s (must be a uint32)", str);
goto error_free;
}
} else {
sdp->session_version = sdp->t_ntp;
}
pw_properties_free(sess->props);
sess->props = props;
if ((str = pw_properties_get(props, "sess.name")) == NULL)
str = pw_get_host_name();
sdp->session_name = strdup(str);
replace_str(&sdp->session_name, str);
if ((str = pw_properties_get(props, "rtp.destination.port")) == NULL)
goto error_free;
@ -968,18 +970,20 @@ static struct session *session_new_announce(struct impl *impl, struct node *node
if (!spa_atou32(str, &sdp->framecount, 0))
sdp->framecount = 0;
if ((str = pw_properties_get(props, "rtp.media")) != NULL)
sdp->media_type = strdup(str);
if ((str = pw_properties_get(props, "rtp.mime")) != NULL)
sdp->mime_type = strdup(str);
str = pw_properties_get(props, "rtp.media");
replace_str(&sdp->media_type, str);
str = pw_properties_get(props, "rtp.mime");
replace_str(&sdp->mime_type, str);
if ((str = pw_properties_get(props, "rtp.rate")) != NULL)
sdp->rate = atoi(str);
if ((str = pw_properties_get(props, "rtp.channels")) != NULL)
sdp->channels = atoi(str);
if ((str = pw_properties_get(props, "rtp.ts-offset")) != NULL)
sdp->ts_offset = atoi(str);
if ((str = pw_properties_get(props, "rtp.ts-refclk")) != NULL)
sdp->ts_refclk = strdup(str);
str = pw_properties_get(props, "rtp.ts-refclk");
replace_str(&sdp->ts_refclk, str);
sess->ts_refclk_ptp = pw_properties_get_bool(props, "rtp.fetch-ts-refclk", false);
if ((str = pw_properties_get(props, PW_KEY_NODE_CHANNELNAMES)) != NULL) {
struct spa_strbuf buf;
@ -994,15 +998,39 @@ static struct session *session_new_announce(struct impl *impl, struct node *node
spa_strbuf_append(&buf, "%s%s", count++ > 0 ? ", " : "", v);
}
}
make_sdp(impl, sess, buffer, sizeof(buffer));
pw_log_info("created new session for node:%u", node->id);
node->session = sess;
sess->node = node;
/* we had no sdp or something changed */
if (!sess->has_sdp || strcmp(buffer, sess->sdp) != 0) {
/* send bye on the old session */
send_sap(impl, sess, 1);
sess->impl = impl;
spa_list_append(&impl->sessions, &sess->link);
impl->n_sessions++;
/* update the version and hash */
sdp->hash = pw_rand32();
if ((str = pw_properties_get(props, "sess.id")) != NULL) {
if (!spa_atou32(str, &sdp->session_id, 10)) {
pw_log_error("Invalid session id: %s (must be a uint32)", str);
goto error_free;
}
sdp->t_ntp = pw_properties_get_uint32(props, "rtp.ntp",
(uint32_t) time(NULL) + 2208988800U + impl->n_sessions);
} else {
sdp->session_id = (uint32_t) time(NULL) + 2208988800U + impl->n_sessions;
sdp->t_ntp = pw_properties_get_uint32(props, "rtp.ntp", sdp->session_id);
}
if ((str = pw_properties_get(props, "sess.version")) != NULL) {
if (!spa_atou32(str, &sdp->session_version, 10)) {
pw_log_error("Invalid session version: %s (must be a uint32)", str);
goto error_free;
}
} else {
sdp->session_version = sdp->t_ntp;
}
/* make an updated SDP for sending */
make_sdp(impl, sess, sess->sdp, sizeof(sess->sdp));
sess->has_sdp = true;
}
send_sap(impl, sess, 0);
return sess;
@ -1522,7 +1550,7 @@ on_sap_io(void *data, int fd, uint32_t mask)
int res;
if (mask & SPA_IO_IN) {
uint8_t buffer[2048];
uint8_t buffer[MAX_SDP];
ssize_t len;
if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) {
@ -1810,7 +1838,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->extra_attrs_preamble = NULL;
impl->extra_attrs_end = NULL;
char buffer[2048];
char buffer[MAX_SDP];
struct spa_strbuf buf;
if ((str = pw_properties_get(props, "sap.preamble-extra")) != NULL) {
spa_strbuf_init(&buf, buffer, sizeof(buffer));