module-rtp: add receive to ringbuffer

This commit is contained in:
Wim Taymans 2022-10-04 22:48:26 +02:00
parent 6f1e96bb59
commit f20959d62f

View file

@ -118,6 +118,8 @@ struct impl {
}; };
struct sdp_info { struct sdp_info {
uint16_t hash;
char origin[128]; char origin[128];
char session[256]; char session[256];
@ -177,7 +179,7 @@ static void playback_process(void *data)
avail = spa_ringbuffer_get_read_index(&sess->ring, &index); avail = spa_ringbuffer_get_read_index(&sess->ring, &index);
if (avail < wanted) { if (avail < wanted) {
pw_log_debug("capture underrun %d < %d", avail, wanted); pw_log_warn("capture underrun %d < %d", avail, wanted);
memset(d[0].data, 0, wanted); memset(d[0].data, 0, wanted);
} else { } else {
spa_ringbuffer_read_data(&sess->ring, spa_ringbuffer_read_data(&sess->ring,
@ -222,6 +224,115 @@ static const struct pw_stream_events out_stream_events = {
.process = playback_process .process = playback_process
}; };
static void
on_rtp_io(void *data, int fd, uint32_t mask)
{
struct session *sess = data;
if (mask & SPA_IO_IN) {
uint8_t buffer[2048], *payload;
ssize_t len, hlen;
struct rtp_header *hdr;
uint32_t index;
int32_t filled;
pw_log_info("got rtp");
if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) {
pw_log_warn("recv error: %m");
return;
}
if (len < 12)
return;
hdr = (struct rtp_header*)buffer;
if (hdr->v != 2)
return;
hlen = 12 + hdr->cc * 4;
if (hlen > len)
return;
len -= hlen;
payload = &buffer[hlen];
filled = spa_ringbuffer_get_write_index(&sess->ring, &index);
if (filled + len > BUFFER_SIZE) {
pw_log_warn("capture overrun");
} else {
spa_ringbuffer_write_data(&sess->ring,
sess->buffer,
BUFFER_SIZE,
index & BUFFER_MASK,
payload, len);
index += len;
spa_ringbuffer_write_update(&sess->ring, index);
}
}
}
static int make_multicast_socket(const struct sockaddr* sa, socklen_t salen)
{
int af, fd, val, res;
af = sa->sa_family;
if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) {
pw_log_error("socket failed: %m");
return -errno;
}
#ifdef SO_TIMESTAMP
val = 1;
if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &val, sizeof(val)) < 0) {
res = -errno;
pw_log_error("setsockopt failed: %m");
goto error;
}
#endif
val = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
res = -errno;
pw_log_error("setsockopt failed: %m");
goto error;
}
res = 0;
if (af == AF_INET) {
static const uint32_t ipv4_mcast_mask = 0xe0000000;
const struct sockaddr_in *sa4 = (struct sockaddr_in*)sa;
if ((ntohl(sa4->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
struct ip_mreq mr4;
memset(&mr4, 0, sizeof(mr4));
mr4.imr_multiaddr = sa4->sin_addr;
res = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
}
} else if (af == AF_INET6) {
const struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)sa;
if (sa6->sin6_addr.s6_addr[0] == 0xff) {
struct ipv6_mreq mr6;
memset(&mr6, 0, sizeof(mr6));
mr6.ipv6mr_multiaddr = sa6->sin6_addr;
res = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
}
} else {
res = -EINVAL;
goto error;
}
if (res < 0) {
res = -errno;
pw_log_error("join mcast failed: %m");
goto error;
}
if (bind(fd, sa, salen) < 0) {
res = -errno;
pw_log_warn("bind() failed: %m");
goto error;
}
return fd;
error:
return res;
}
static int session_new(struct impl *impl, struct sdp_info *sdp) static int session_new(struct impl *impl, struct sdp_info *sdp)
{ {
struct session *session; struct session *session;
@ -230,7 +341,7 @@ static int session_new(struct impl *impl, struct sdp_info *sdp)
uint32_t n_params; uint32_t n_params;
uint8_t buffer[1024]; uint8_t buffer[1024];
struct pw_properties *props; struct pw_properties *props;
int res; int res, fd;
session = calloc(1, sizeof(struct session)); session = calloc(1, sizeof(struct session));
if (session == NULL) if (session == NULL)
@ -268,6 +379,16 @@ static int session_new(struct impl *impl, struct sdp_info *sdp)
params, n_params)) < 0) params, n_params)) < 0)
return res; return res;
if ((fd = make_multicast_socket((const struct sockaddr *)&sdp->sa, sdp->salen)) < 0)
return fd;
session->source = pw_loop_add_io(impl->loop, fd,
SPA_IO_IN, true, on_rtp_io, session);
if (session->source == NULL)
return -errno;
spa_list_append(&impl->sessions, &session->link);
return 0; return 0;
} }
@ -276,9 +397,22 @@ static void session_free(struct session *sess)
spa_list_remove(&sess->link); spa_list_remove(&sess->link);
if (sess->playback) if (sess->playback)
pw_stream_destroy(sess->playback); pw_stream_destroy(sess->playback);
if (sess->source)
pw_loop_destroy_source(sess->impl->loop, sess->source);
free(sess); free(sess);
} }
static struct session *session_find(struct impl *impl, struct sdp_info *info)
{
struct session *sess;
spa_list_for_each(sess, &impl->sessions, link) {
if (info->hash == sess->info.hash &&
spa_streq(info->origin, sess->info.origin))
return sess;
}
return NULL;
}
static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info) static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info)
{ {
int res; int res;
@ -287,7 +421,7 @@ static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info)
if (spa_strstartswith(c, "c=IN IP4 ")) { if (spa_strstartswith(c, "c=IN IP4 ")) {
struct sockaddr_in *sa = (struct sockaddr_in*) &info->sa; struct sockaddr_in *sa = (struct sockaddr_in*) &info->sa;
c += sizeof("c=IN IP4 "); c += strlen("c=IN IP4 ");
if (inet_pton(AF_INET, c, &sa->sin_addr) <= 0) { if (inet_pton(AF_INET, c, &sa->sin_addr) <= 0) {
res = -errno; res = -errno;
pw_log_warn("inet_pton(%s) failed: %m", c); pw_log_warn("inet_pton(%s) failed: %m", c);
@ -299,7 +433,7 @@ static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info)
else if (spa_strstartswith(c, "c=IN IP6 ")) { else if (spa_strstartswith(c, "c=IN IP6 ")) {
struct sockaddr_in6 *sa = (struct sockaddr_in6*) &info->sa; struct sockaddr_in6 *sa = (struct sockaddr_in6*) &info->sa;
c += sizeof("c=IN IP6 "); c += strlen("c=IN IP6 ");
if (inet_pton(AF_INET6, c, &sa->sin6_addr) <= 0) { if (inet_pton(AF_INET6, c, &sa->sin6_addr) <= 0) {
res = -errno; res = -errno;
pw_log_warn("inet_pton(%s) failed: %m", c); pw_log_warn("inet_pton(%s) failed: %m", c);
@ -324,7 +458,7 @@ static int parse_sdp_m(struct impl *impl, char *c, struct sdp_info *info)
if (!spa_strstartswith(c, "m=audio ")) if (!spa_strstartswith(c, "m=audio "))
return -EINVAL; return -EINVAL;
c += sizeof("m=audio "); c += strlen("m=audio ");
if (sscanf(c, "%i RTP/AVP %i", &port, &payload) != 2) if (sscanf(c, "%i RTP/AVP %i", &port, &payload) != 2)
return -EINVAL; return -EINVAL;
@ -347,7 +481,7 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info)
if (!spa_strstartswith(c, "a=rtpmap:")) if (!spa_strstartswith(c, "a=rtpmap:"))
return 0; return 0;
c += sizeof("a=rtpmap:"); c += strlen("a=rtpmap:");
if (sscanf(c, "%i %n", &payload, &len) != 1) if (sscanf(c, "%i %n", &payload, &len) != 1)
return -EINVAL; return -EINVAL;
@ -417,6 +551,11 @@ static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info)
while (isspace(*s)) while (isspace(*s))
s++; s++;
} }
if (((struct sockaddr*) &info->sa)->sa_family == AF_INET)
((struct sockaddr_in*) &info->sa)->sin_port = htons(info->port);
else
((struct sockaddr_in6*) &info->sa)->sin6_port = htons(info->port);
return 0; return 0;
too_short: too_short:
pw_log_warn("SDP: line starting with `%.6s...' too short", s); pw_log_warn("SDP: line starting with `%.6s...' too short", s);
@ -425,6 +564,7 @@ invalid_version:
pw_log_warn("SDP: invalid first version line `%*s'", (int)l, s); pw_log_warn("SDP: invalid first version line `%*s'", (int)l, s);
return -EINVAL; return -EINVAL;
error: error:
pw_log_warn("SDP: error: %s", spa_strerror(res));
return res; return res;
} }
@ -433,6 +573,7 @@ static int parse_sap(struct impl *impl, void *data, size_t len)
struct sap_header *header; struct sap_header *header;
char *mime, *sdp; char *mime, *sdp;
struct sdp_info info; struct sdp_info info;
struct session *sess;
int res; int res;
if (len < 8) if (len < 8)
@ -443,17 +584,26 @@ static int parse_sap(struct impl *impl, void *data, size_t len)
return -EINVAL; return -EINVAL;
mime = SPA_PTROFF(data, 8, char); mime = SPA_PTROFF(data, 8, char);
if (spa_strstartswith(mime, "v=0")) if (spa_strstartswith(mime, "v=0")) {
sdp = mime; sdp = mime;
else if (spa_streq(mime, "application/sdp")) mime = "application/sdp";
} else if (spa_streq(mime, "application/sdp"))
sdp = SPA_PTROFF(mime, strlen(mime)+1, char); sdp = SPA_PTROFF(mime, strlen(mime)+1, char);
else else
return -EINVAL; return -EINVAL;
pw_log_info("got sap: %s", mime);
spa_zero(info); spa_zero(info);
if ((res = parse_sdp(impl, sdp, &info)) < 0) if ((res = parse_sdp(impl, sdp, &info)) < 0)
return res; return res;
sess = session_find(impl, &info);
if (sess == NULL) {
session_new(impl, &info);
} else {
}
return res; return res;
} }
@ -466,7 +616,6 @@ on_sap_io(void *data, int fd, uint32_t mask)
uint8_t buffer[2048]; uint8_t buffer[2048];
ssize_t len; ssize_t len;
pw_log_info("got sap");
if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) { if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) {
pw_log_warn("recv error: %m"); pw_log_warn("recv error: %m");
return; return;
@ -485,72 +634,23 @@ static int start_sap_listener(struct impl *impl)
struct sockaddr_in6 sa6; struct sockaddr_in6 sa6;
struct sockaddr *sa; struct sockaddr *sa;
socklen_t salen; socklen_t salen;
int af, fd, val, res; int fd, res;
if (inet_pton(AF_INET, impl->local_ip, &sa4.sin_addr) > 0) { if (inet_pton(AF_INET, impl->local_ip, &sa4.sin_addr) > 0) {
af = sa4.sin_family = AF_INET; sa4.sin_family = AF_INET;
sa4.sin_port = htons(impl->local_port); sa4.sin_port = htons(impl->local_port);
sa = (struct sockaddr*) &sa4; sa = (struct sockaddr*) &sa4;
salen = sizeof(sa4); salen = sizeof(sa4);
} else if (inet_pton(AF_INET6, impl->local_ip, &sa6.sin6_addr) > 0) { } else if (inet_pton(AF_INET6, impl->local_ip, &sa6.sin6_addr) > 0) {
af = sa6.sin6_family = AF_INET6; sa6.sin6_family = AF_INET6;
sa6.sin6_port = htons(impl->local_port); sa6.sin6_port = htons(impl->local_port);
sa = (struct sockaddr*) &sa6; sa = (struct sockaddr*) &sa6;
salen = sizeof(sa6); salen = sizeof(sa6);
} else } else
return -EINVAL; return -EINVAL;
if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { if ((fd = make_multicast_socket(sa, salen)) < 0)
pw_log_error("socket failed: %m"); return fd;
return -errno;
}
#ifdef SO_TIMESTAMP
val = 1;
if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &val, sizeof(val)) < 0) {
res = -errno;
pw_log_error("setsockopt failed: %m");
goto error;
}
#endif
val = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
res = -errno;
pw_log_error("setsockopt failed: %m");
goto error;
}
res = 0;
if (af == AF_INET) {
static const uint32_t ipv4_mcast_mask = 0xe0000000;
if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) &
ipv4_mcast_mask) == ipv4_mcast_mask) {
struct ip_mreq mr4;
memset(&mr4, 0, sizeof(mr4));
mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
res = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
}
} else if (af == AF_INET6) {
if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
struct ipv6_mreq mr6;
memset(&mr6, 0, sizeof(mr6));
mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
res = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
}
} else {
res = -EINVAL;
goto error;
}
if (res < 0) {
res = -errno;
pw_log_error("join mcast failed: %m");
goto error;
}
if (bind(fd, sa, salen) < 0) {
res = -errno;
pw_log_warn("bind() failed: %m");
goto error;
}
pw_log_info("starting SAP listener"); pw_log_info("starting SAP listener");
impl->sap_source = pw_loop_add_io(impl->loop, fd, impl->sap_source = pw_loop_add_io(impl->loop, fd,