diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index d505ba2ea..8ffd27307 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -17,38 +17,33 @@ #include #include -#include -#include #include -#include #include #include #include -#include - -/** \page page_module_rtp_sap PipeWire Module: Announce and receive RTP streams +/** \page page_module_rtp_sap PipeWire Module: Announce and create RTP streams * - * The `rtp-sap` module announces RTP stream with the sess.sap.announce property - * set to true. + * The `rtp-sap` module announces RTP streams that match the rules with the + * announce-stream action. * - * It will also create source rtp streams that are announced with SAP when they - * match the pattern. + * It will create source RTP streams that are announced with SAP when they + * match the rule with the create-stream action. * * ## Module Options * * Options specific to the behavior of this module * + * - `local.ifname = `: interface name to use * - `sap.ip = `: IP address of the SAP messages, default "224.0.0.56" * - `sap.port = `: port of the SAP messages, default 9875 + * - `sap.cleanup.sec = `: cleanup interval in seconds, default 90 seconds * - `source.ip =`: source IP address, default "0.0.0.0" - * - `destination.ip =`: destination IP address, default "224.0.0.56" - * - `destination.port =`: destination port, default random beteen 46000 and 47024 - * - `local.ifname = `: interface name to use * - `net.ttl = `: TTL to use, default 1 * - `net.loop = `: loopback multicast, default false + * - `stream.rules` = : match rules, use create-stream and announce-stream actions * * ## General options * @@ -61,16 +56,13 @@ * context.modules = [ * { name = libpipewire-module-rtp-sap * args = { + * #local.ifname = "eth0" * #sap.ip = "224.0.0.56" * #sap.port = 9875 + * #sap.cleanup.sec = 5 * #source.ip = "0.0.0.0" - * #local.ifname = "eth0" * #net.ttl = 1 * #net.loop = false - * stream.props = { - * #media.class = "Audio/Source" - * #node.name = "rtp-source" - * } * stream.rules = [ * { matches = [ * # any of the items in matches needs to match, if one does, @@ -82,6 +74,25 @@ * #rtp.session = "PipeWire RTP Stream on fedora" * #rtp.ts-offset = 0 * #rtp.ts-refclk = "private" + * sess.sap.announce = true + * } + * ] + * actions = { + * announce-stream = { + * #sess.latency.msec = 100 + * #sess.ts-direct = false + * #target.object = "" + * } + * } + * } + * { matches = [ + * { # all keys must match the value. ~ in value starts regex. + * #rtp.origin = "wim 3883629975 0 IN IP4 0.0.0.0" + * #rtp.payload = "127" + * #rtp.fmt = "L16/48000/2" + * #rtp.session = "PipeWire RTP Stream on fedora" + * #rtp.ts-offset = 0 + * #rtp.ts-refclk = "private" * } * ] * actions = { @@ -94,9 +105,8 @@ * } * ] * } - * } - *} - *] + * } + * ] *\endcode * * \since 0.3.67 @@ -107,58 +117,73 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic +#define MAX_SESSIONS 64 + +#define DEFAULT_CLEANUP_SEC 90 #define SAP_INTERVAL_SEC 5 #define SAP_MIME_TYPE "application/sdp" #define DEFAULT_SAP_IP "224.0.0.56" #define DEFAULT_SAP_PORT 9875 -#define DEFAULT_PORT 46000 #define DEFAULT_SOURCE_IP "0.0.0.0" -#define DEFAULT_DESTINATION_IP "224.0.0.56" #define DEFAULT_TTL 1 #define DEFAULT_LOOP false -#define USAGE "sap.ip= " \ +#define USAGE "local.ifname= " \ + "sap.ip= " \ "sap.port= " \ + "sap.cleanup.sec= " \ "source.ip= " \ - "destination.ip= " \ - "local.ifname= " \ "net.ttl= " \ - "net.loop= " + "net.loop= " \ + "stream.rules=, use announce-stream and create-stream actions " static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, - { PW_KEY_MODULE_DESCRIPTION, "RTP Sink" }, + { PW_KEY_MODULE_DESCRIPTION, "RTP SAP announce/listen" }, { PW_KEY_MODULE_USAGE, USAGE }, { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; -struct session { - struct spa_list link; - - struct impl *impl; - struct node *node; - - uint16_t msg_id_hash; +struct sdp_info { + uint16_t hash; uint32_t ntp; - uint32_t ts_offset; - char *ts_refclk; - + char *origin; + char *session_name; char *media_type; char *mime_type; - char *session_name; - int payload; - uint32_t rate; - uint32_t channels; - float ptime; + char channelmap[512]; uint16_t dst_port; struct sockaddr_storage dst_addr; socklen_t dst_len; uint16_t ttl; + uint16_t port; + uint8_t payload; + + uint32_t rate; + uint32_t channels; + + float ptime; + + uint32_t ts_offset; + char *ts_refclk; +}; + +struct session { + struct spa_list link; + + bool announce; + uint64_t timestamp; + + struct impl *impl; + struct node *node; + + struct sdp_info info; + unsigned has_sent_sap:1; struct pw_properties *props; @@ -190,14 +215,13 @@ struct impl { struct pw_core *core; struct spa_hook core_listener; struct spa_hook core_proxy_listener; + unsigned int do_disconnect:1; struct pw_registry *registry; struct spa_hook registry_listener; struct spa_source *timer; - unsigned int do_disconnect:1; - char *ifname; bool ttl; bool mcast_loop; @@ -208,12 +232,74 @@ struct impl { uint16_t sap_port; struct sockaddr_storage sap_addr; socklen_t sap_len; - int sap_fd; + struct spa_source *sap_source; + uint32_t cleanup_interval; + uint32_t n_sessions; struct spa_list sessions; }; +struct format_info { + uint32_t media_subtype; + uint32_t format; + uint32_t size; + const char *mime; + const char *media_type; + const char *format_str; +}; + +static const struct format_info audio_format_info[] = { + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_U8, 1, "L8", "audio", "U8" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio", "ALAW" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio", "ULAW" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio", "S16BE" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio", "S16LE" }, + { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "midi", NULL }, +}; + +static const struct format_info *find_audio_format_info(const char *mime) +{ + SPA_FOR_EACH_ELEMENT_VAR(audio_format_info, f) + if (spa_streq(f->mime, mime)) + return f; + return NULL; +} + +static void send_sap(struct impl *impl, struct session *sess, bool bye); + + +static void clear_sdp_info(struct sdp_info *info) +{ + free(info->origin); + free(info->session_name); + free(info->media_type); + free(info->mime_type); + free(info->ts_refclk); + spa_zero(*info); +} + +static void session_touch(struct session *sess) +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + sess->timestamp = SPA_TIMESPEC_TO_NSEC(&ts); +} + +static void session_free(struct session *sess) +{ + struct impl *impl = sess->impl; + + if (sess->impl) { + send_sap(impl, sess, 1); + spa_list_remove(&sess->link); + impl->n_sessions++; + } + pw_properties_free(sess->props); + clear_sdp_info(&sess->info); + free(sess); +} + static int parse_address(const char *address, uint16_t port, struct sockaddr_storage *addr, socklen_t *len) { @@ -249,15 +335,60 @@ static bool is_multicast(struct sockaddr *sa, socklen_t salen) static int make_socket(struct sockaddr_storage *src, socklen_t src_len, struct sockaddr_storage *dst, socklen_t dst_len, - bool loop, int ttl) + bool loop, int ttl, char *ifname) { int af, fd, val, res; + struct ifreq req; af = src->ss_family; if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { pw_log_error("socket failed: %m"); return -errno; } + val = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) { + res = -errno; + pw_log_error("setsockopt failed: %m"); + goto error; + } + spa_zero(req); + if (ifname) { + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", ifname); + res = ioctl(fd, SIOCGIFINDEX, &req); + if (res < 0) + pw_log_warn("SIOCGIFINDEX %s failed: %m", ifname); + } + res = 0; + if (is_multicast((struct sockaddr*)dst, dst_len)) { + val = loop; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)) < 0) + pw_log_warn("setsockopt(IP_MULTICAST_LOOP) failed: %m"); + + val = ttl; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &val, sizeof(val)) < 0) + pw_log_warn("setsockopt(IP_MULTICAST_TTL) failed: %m"); + + if (af == AF_INET) { + struct sockaddr_in *sa4 = (struct sockaddr_in*)dst; + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = sa4->sin_addr; + mr4.imr_ifindex = req.ifr_ifindex; + res = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4)); + } else if (af == AF_INET6) { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)dst; + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = sa6->sin6_addr; + mr6.ipv6mr_interface = req.ifr_ifindex; + res = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6)); + } + if (res < 0) { + res = -errno; + pw_log_error("join mcast failed: %m"); + goto error; + } + } if (bind(fd, (struct sockaddr*)src, src_len) < 0) { res = -errno; pw_log_error("bind() failed: %m"); @@ -268,15 +399,6 @@ static int make_socket(struct sockaddr_storage *src, socklen_t src_len, pw_log_error("connect() failed: %m"); goto error; } - if (is_multicast((struct sockaddr*)dst, dst_len)) { - val = loop; - if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)) < 0) - pw_log_warn("setsockopt(IP_MULTICAST_LOOP) failed: %m"); - - val = ttl; - if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &val, sizeof(val)) < 0) - pw_log_warn("setsockopt(IP_MULTICAST_TTL) failed: %m"); - } return fd; error: close(fd); @@ -304,14 +426,17 @@ static void send_sap(struct impl *impl, struct session *sess, bool bye) struct iovec iov[4]; struct msghdr msg; struct spa_strbuf buf; + struct sdp_info *sdp = &sess->info; + if (!sess->announce) + return; if (!sess->has_sent_sap && bye) return; spa_zero(header); header.v = 1; header.t = bye; - header.msg_id_hash = sess->msg_id_hash; + header.msg_id_hash = sdp->hash; iov[0].iov_base = &header; iov[0].iov_len = sizeof(header); @@ -330,14 +455,14 @@ static void send_sap(struct impl *impl, struct session *sess, bool bye) iov[2].iov_len = sizeof(SAP_MIME_TYPE); get_ip(&impl->src_addr, src_addr, sizeof(src_addr)); - get_ip(&sess->dst_addr, dst_addr, sizeof(dst_addr)); + get_ip(&sdp->dst_addr, dst_addr, sizeof(dst_addr)); if ((user_name = pw_get_user_name()) == NULL) user_name = "-"; spa_zero(dst_ttl); - if (is_multicast((struct sockaddr*)&sess->dst_addr, sess->dst_len)) - snprintf(dst_ttl, sizeof(dst_ttl), "/%d", sess->ttl); + if (is_multicast((struct sockaddr*)&sdp->dst_addr, sdp->dst_len)) + snprintf(dst_ttl, sizeof(dst_ttl), "/%d", sdp->ttl); spa_strbuf_init(&buf, buffer, sizeof(buffer)); spa_strbuf_append(&buf, @@ -349,40 +474,47 @@ static void send_sap(struct impl *impl, struct session *sess, bool bye) "a=recvonly\n" "a=tool:PipeWire %s\n" "a=type:broadcast\n", - user_name, sess->ntp, af, src_addr, - sess->session_name, + user_name, sdp->ntp, af, src_addr, + sdp->session_name, af, dst_addr, dst_ttl, - sess->ntp, + sdp->ntp, pw_get_library_version()); spa_strbuf_append(&buf, "m=%s %u RTP/AVP %i\n", - sess->media_type, - sess->dst_port, sess->payload); + sdp->media_type, + sdp->dst_port, sdp->payload); - if (sess->channels) { + if (sdp->channels) { spa_strbuf_append(&buf, "a=rtpmap:%i %s/%u/%u\n", - sess->payload, sess->mime_type, - sess->rate, sess->channels); + sdp->payload, sdp->mime_type, + sdp->rate, sdp->channels); + if (sdp->channelmap[0] != 0) { + spa_strbuf_append(&buf, + "i=%d channels: %s\n", sdp->channels, + sdp->channelmap); + } } else { spa_strbuf_append(&buf, "a=rtpmap:%i %s/%u\n", - sess->payload, sess->mime_type, sess->rate); + sdp->payload, sdp->mime_type, sdp->rate); } - if (sess->ptime != 0) + if (sdp->ptime != 0) spa_strbuf_append(&buf, - "a=ptime:%f\n", sess->ptime); + "a=ptime:%f\n", sdp->ptime); - if (sess->ts_refclk != NULL) { + if (sdp->ts_refclk != NULL) { spa_strbuf_append(&buf, "a=ts-refclk:%s\n" "a=mediaclk:direct=%u\n", - sess->ts_refclk, - sess->ts_offset); + sdp->ts_refclk, + sdp->ts_offset); } else { spa_strbuf_append(&buf, "a=mediaclk:sender\n"); } + pw_log_debug("sending SAP for %u", sess->node->id); + iov[3].iov_base = buffer; iov[3].iov_len = strlen(buffer); @@ -404,8 +536,558 @@ static void on_timer_event(void *data, uint64_t expirations) struct impl *impl = data; struct session *sess; - spa_list_for_each(sess, &impl->sessions, link) { + spa_list_for_each(sess, &impl->sessions, link) send_sap(impl, sess, 0); +} + +static struct session *session_find(struct impl *impl, const struct sdp_info *info) +{ + struct session *sess; + spa_list_for_each(sess, &impl->sessions, link) { + if (info->hash == sess->info.hash && + spa_streq(info->origin, sess->info.origin)) + return sess; + } + return NULL; +} + +static struct session *session_new_announce(struct impl *impl, struct node *node, + struct pw_properties *props) +{ + struct session *sess = NULL; + struct sdp_info *sdp; + const char *str; + uint32_t port; + int res; + + if (impl->n_sessions >= MAX_SESSIONS) { + pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, MAX_SESSIONS); + errno = EMFILE; + return NULL; + } + + sess = calloc(1, sizeof(struct session)); + if (sess == NULL) + return NULL; + + sdp = &sess->info; + + sess->announce = true; + + sdp->hash = rand(); + sdp->ntp = (uint32_t) time(NULL) + 2208988800U; + sess->props = props; + + if ((str = pw_properties_get(props, "rtp.session")) != NULL) + sdp->session_name = strdup(str); + + if ((str = pw_properties_get(props, "rtp.destination.port")) == NULL) + goto error_free; + if (!spa_atou32(str, &port, 0)) + goto error_free; + sdp->dst_port = port; + + if ((str = pw_properties_get(props, "rtp.destination.ip")) == NULL) + goto error_free; + if ((res = parse_address(str, sdp->dst_port, &sdp->dst_addr, &sdp->dst_len)) < 0) { + pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); + goto error_free; + } + sdp->ttl = pw_properties_get_int32(props, "rtp.ttl", DEFAULT_TTL); + sdp->payload = pw_properties_get_int32(props, "rtp.payload", 127); + + if ((str = pw_properties_get(props, "rtp.media")) != NULL) + sdp->media_type = strdup(str); + if ((str = pw_properties_get(props, "rtp.mime")) != NULL) + sdp->mime_type = strdup(str); + if ((str = pw_properties_get(props, "rtp.rate")) != NULL) + sdp->rate = atoi(str); + if ((str = pw_properties_get(props, "rtp.channels")) != NULL) + sdp->channels = atoi(str); + if ((str = pw_properties_get(props, "rtp.ts-offset")) != NULL) + sdp->ts_offset = atoi(str); + if ((str = pw_properties_get(props, "rtp.ts-refclk")) != NULL) + sdp->ts_refclk = strdup(str); + if ((str = pw_properties_get(props, "rtp.channel-names")) != NULL) + snprintf(sdp->channelmap, sizeof(sdp->channelmap), "%s", str); + + pw_log_info("created new session for node:%u", node->id); + node->session = sess; + sess->node = node; + + sess->impl = impl; + spa_list_append(&impl->sessions, &sess->link); + impl->n_sessions++; + + return sess; + +error_free: + pw_log_warn("invalid session props"); + session_free(sess); + return NULL; +} + +static int session_load_source(struct session *session, struct pw_properties *props) +{ + struct impl *impl = session->impl; + struct pw_context *context = pw_impl_module_get_context(impl->module); + FILE *f; + char *args; + size_t size; + const char *str, *media; + + if ((f = open_memstream(&args, &size)) == NULL) { + pw_log_error("Can't open memstream: %m"); + return -errno; + } + + if ((str = pw_properties_get(props, "rtp.destination.ip")) != NULL) + pw_properties_set(props, "source.ip", str); + if ((str = pw_properties_get(props, "rtp.destination.port")) != NULL) + pw_properties_set(props, "source.port", str); + if ((str = pw_properties_get(props, "rtp.session")) != NULL) + pw_properties_set(props, "sess.name", str); + + if ((media = pw_properties_get(props, "rtp.media")) == NULL) + media = "audio"; + + if (spa_streq(media, "audio")) { + const char *mime; + const struct format_info *format_info; + + if ((mime = pw_properties_get(props, "rtp.mime")) == NULL) { + pw_log_error("missing rtp.mime property"); + return -EINVAL; + } + format_info = find_audio_format_info(mime); + if (format_info == NULL) { + pw_log_error("unknown rtp.mime type %s", mime); + return -EINVAL; + } + pw_properties_set(props, "rtp.media", format_info->media_type); + if (format_info->format_str != NULL) { + pw_properties_set(props, "audio.format", format_info->format_str); + if ((str = pw_properties_get(props, "rtp.rate")) != NULL) + pw_properties_set(props, "audio.rate", str); + if ((str = pw_properties_get(props, "rtp.channels")) != NULL) + pw_properties_set(props, "audio.channels", str); + } + } else { + pw_log_error("Unhandled media %s", media); + return -EINVAL; + } + if ((str = pw_properties_get(props, "rtp.ts-offset")) != NULL) + pw_properties_set(props, "sess.ts-offset", str); + + fprintf(f, "{"); + fprintf(f, " stream.props = {"); + pw_properties_serialize_dict(f, &props->dict, 0); + fprintf(f, " }"); + fprintf(f, "}"); + fclose(f); + + pw_log_info("loading new RTP source"); + session->module = pw_context_load_module(context, + "libpipewire-module-rtp-source", + args, NULL); + free(args); + + if (session->module == NULL) { + pw_log_error("Can't load module: %m"); + return -errno; + } + return 0; +} + +struct match_info { + struct impl *impl; + struct session *session; + struct node *node; + struct pw_properties *props; + bool matched; +}; + +static int rule_matched(void *data, const char *location, const char *action, + const char *str, size_t len) +{ + struct match_info *i = data; + int res = 0; + + i->matched = true; + if (i->session && spa_streq(action, "create-stream")) { + pw_properties_update_string(i->props, str, len); + + session_load_source(i->session, i->props); + } + else if (i->node && spa_streq(action, "announce-stream")) { + struct pw_properties *props; + + if ((props = pw_properties_new_dict(i->node->info->props)) == NULL) + return -errno; + + pw_properties_update_string(props, str, len); + + session_new_announce(i->impl, i->node, props); + } + return res; +} + +static struct session *session_new(struct impl *impl, const struct sdp_info *info) +{ + struct session *session; + struct pw_properties *props; + const char *str; + char dst_addr[64]; + + if (impl->n_sessions >= MAX_SESSIONS) { + pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, MAX_SESSIONS); + errno = EMFILE; + return NULL; + } + + session = calloc(1, sizeof(struct session)); + if (session == NULL) + return NULL; + + session->info = *info; + session->announce = false; + + props = pw_properties_new(NULL, NULL); + if (props == NULL) + goto error; + + session->impl = impl; + spa_list_append(&impl->sessions, &session->link); + impl->n_sessions++; + + pw_properties_set(props, "rtp.origin", info->origin); + if (info->session_name != NULL) { + pw_properties_set(props, "rtp.session", info->session_name); + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", + info->session_name); + pw_properties_setf(props, PW_KEY_NODE_NAME, "%s", + info->session_name); + } else { + pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Stream"); + } + + get_ip(&info->dst_addr, dst_addr, sizeof(dst_addr)); + pw_properties_setf(props, "rtp.destination.ip", "%s", dst_addr); + pw_properties_setf(props, "rtp.destination.port", "%u", info->dst_port); + pw_properties_setf(props, "rtp.payload", "%u", info->payload); + pw_properties_setf(props, "rtp.media", "%s", info->media_type); + pw_properties_setf(props, "rtp.mime", "%s", info->mime_type); + pw_properties_setf(props, "rtp.rate", "%u", info->rate); + pw_properties_setf(props, "rtp.channels", "%u", info->channels); + + pw_properties_setf(props, "rtp.ts-offset", "%u", info->ts_offset); + pw_properties_set(props, "rtp.ts-refclk", info->ts_refclk); + + if (info->channelmap[0]) + pw_properties_set(props, PW_KEY_NODE_CHANNELNAMES, info->channelmap); + + if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) { + struct match_info minfo = { + .impl = impl, + .session = session, + .props = props, + }; + pw_conf_match_rules(str, strlen(str), NAME, &props->dict, + rule_matched, &minfo); + } + session->props = props; + + return NULL; +error: + session_free(session); + return NULL; +} + +static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info) +{ + int res; + + c[strcspn(c, "/")] = 0; + if (spa_strstartswith(c, "c=IN IP4 ")) { + struct sockaddr_in *sa = (struct sockaddr_in*) &info->dst_addr; + + c += strlen("c=IN IP4 "); + if (inet_pton(AF_INET, c, &sa->sin_addr) <= 0) { + res = -errno; + pw_log_warn("inet_pton(%s) failed: %m", c); + goto error; + } + sa->sin_family = AF_INET; + info->dst_len = sizeof(struct sockaddr_in); + } + else if (spa_strstartswith(c, "c=IN IP6 ")) { + struct sockaddr_in6 *sa = (struct sockaddr_in6*) &info->dst_addr; + + c += strlen("c=IN IP6 "); + if (inet_pton(AF_INET6, c, &sa->sin6_addr) <= 0) { + res = -errno; + pw_log_warn("inet_pton(%s) failed: %m", c); + goto error; + } + + sa->sin6_family = AF_INET6; + info->dst_len = sizeof(struct sockaddr_in6); + } else + return -EINVAL; + + + res= 0; +error: + return res; +} + +static int parse_sdp_m(struct impl *impl, char *c, struct sdp_info *info) +{ + int port, payload; + char media_type[12]; + + if (!spa_strstartswith(c, "m=")) + return -EINVAL; + + c += strlen("m="); + if (sscanf(c, "%11s %i RTP/AVP %i", media_type, &port, &payload) != 3) + return -EINVAL; + + if (port <= 0 || port > 0xFFFF) + return -EINVAL; + + if (payload < 0 || payload > 127) + return -EINVAL; + + info->media_type = strdup(media_type); + info->dst_port = (uint16_t) port; + info->payload = (uint8_t) payload; + + return 0; +} + +/* some AES67 devices have channelmap encoded in i=* + * if `i` record is found, it matches the template + * and channel count matches, name the channels respectively + * `i=2 channels: 01, 08` is the format */ +static int parse_sdp_i(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!strstr(c, " channels: ")) { + return 0; + } + + c += strlen("i="); + c[strcspn(c, " ")] = '\0'; + + uint32_t channels; + if (sscanf(c, "%u", &channels) != 1 || channels <= 0 || channels > SPA_AUDIO_MAX_CHANNELS) + return 0; + + c += strcspn(c, "\0"); + c += strlen(" channels: "); + + strncpy(info->channelmap, c, sizeof(info->channelmap) - 1); + + return 0; +} + +static int parse_sdp_a_rtpmap(struct impl *impl, char *c, struct sdp_info *info) +{ + int payload, len, rate, channels; + + if (!spa_strstartswith(c, "a=rtpmap:")) + return 0; + + c += strlen("a=rtpmap:"); + + if (sscanf(c, "%i %n", &payload, &len) != 1) + return -EINVAL; + + if (payload < 0 || payload > 127) + return -EINVAL; + + if (payload != info->payload) + return 0; + + c += len; + c[strcspn(c, "/")] = 0; + info->mime_type = strdup(c); + c += strlen(c) + 1; + + if (sscanf(c, "%u/%u", &rate, &channels) == 2) { + info->channels = channels; + info->rate = rate; + } else if (sscanf(c, "%u", &rate) == 1) { + info->rate = rate; + info->channels = 1; + } else + return -EINVAL; + + pw_log_debug("rate: %d, ch: %d", info->rate, info->channels); + + return 0; +} + +static int parse_sdp_a_mediaclk(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!spa_strstartswith(c, "a=mediaclk:")) + return 0; + + c += strlen("a=mediaclk:"); + + if (spa_strstartswith(c, "direct=")) { + int offset; + c += strlen("direct="); + if (sscanf(c, "%i", &offset) != 1) + return -EINVAL; + info->ts_offset = offset; + } else if (spa_strstartswith(c, "sender")) { + info->ts_offset = 0; + } + return 0; +} + +static int parse_sdp_a_ts_refclk(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!spa_strstartswith(c, "a=ts-refclk:")) + return 0; + + c += strlen("a=ts-refclk:"); + info->ts_refclk = strdup(c); + return 0; +} + +static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info) +{ + char *s = sdp; + int count = 0, res = 0; + size_t l; + + while (*s) { + if ((l = strcspn(s, "\r\n")) < 2) + goto too_short; + + s[l] = 0; + pw_log_debug("%d: %s", count, s); + + if (count++ == 0 && strcmp(s, "v=0") != 0) + goto invalid_version; + + if (spa_strstartswith(s, "o=")) + info->origin = strdup(&s[2]); + else if (spa_strstartswith(s, "s=")) + info->session_name = strdup(&s[2]); + else if (spa_strstartswith(s, "c=")) + res = parse_sdp_c(impl, s, info); + else if (spa_strstartswith(s, "m=")) + res = parse_sdp_m(impl, s, info); + else if (spa_strstartswith(s, "a=rtpmap:")) + res = parse_sdp_a_rtpmap(impl, s, info); + else if (spa_strstartswith(s, "a=mediaclk:")) + res = parse_sdp_a_mediaclk(impl, s, info); + else if (spa_strstartswith(s, "a=ts-refclk:")) + res = parse_sdp_a_ts_refclk(impl, s, info); + else if (spa_strstartswith(s, "i=")) + res = parse_sdp_i(impl, s, info); + + if (res < 0) + goto error; + s += l + 1; + while (isspace(*s)) + s++; + } + if (((struct sockaddr*) &info->dst_addr)->sa_family == AF_INET) + ((struct sockaddr_in*) &info->dst_addr)->sin_port = htons(info->dst_port); + else + ((struct sockaddr_in6*) &info->dst_addr)->sin6_port = htons(info->dst_port); + + return 0; +too_short: + pw_log_warn("SDP: line starting with `%.6s...' too short", s); + return -EINVAL; +invalid_version: + pw_log_warn("SDP: invalid first version line `%*s'", (int)l, s); + return -EINVAL; +error: + pw_log_warn("SDP: error: %s", spa_strerror(res)); + return res; +} + +static int parse_sap(struct impl *impl, void *data, size_t len) +{ + struct sap_header *header; + char *mime, *sdp; + struct sdp_info info; + struct session *sess; + int res; + size_t offs; + bool bye; + + if (len < 8) + return -EINVAL; + + header = (struct sap_header*) data; + if (header->v != 1) + return -EINVAL; + + if (header->e) + return -ENOTSUP; + if (header->c) + return -ENOTSUP; + + offs = header->a ? 12 : 8; + offs += header->auth_len * 4; + if (len <= offs) + return -EINVAL; + + mime = SPA_PTROFF(data, offs, char); + if (spa_strstartswith(mime, "v=0")) { + sdp = mime; + mime = SAP_MIME_TYPE; + } else if (spa_streq(mime, SAP_MIME_TYPE)) + sdp = SPA_PTROFF(mime, strlen(mime)+1, char); + else + return -EINVAL; + + pw_log_debug("got SAP: %s %s", mime, sdp); + + spa_zero(info); + if ((res = parse_sdp(impl, sdp, &info)) < 0) + return res; + + bye = header->t; + + sess = session_find(impl, &info); + if (sess == NULL) { + if (!bye) + session_new(impl, &info); + } else { + if (bye) + session_free(sess); + else + session_touch(sess); + } + return res; +} + +static void +on_sap_io(void *data, int fd, uint32_t mask) +{ + struct impl *impl = data; + + if (mask & SPA_IO_IN) { + uint8_t buffer[2048]; + ssize_t len; + + if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) { + pw_log_warn("recv error: %m"); + return; + } + if ((size_t)len >= sizeof(buffer)) + return; + + buffer[len] = 0; + parse_sap(impl, buffer, len); } } @@ -416,7 +1098,8 @@ static int start_sap_announce(struct impl *impl) if ((fd = make_socket(&impl->src_addr, impl->src_len, &impl->sap_addr, impl->sap_len, - impl->mcast_loop, impl->ttl)) < 0) + impl->mcast_loop, impl->ttl, + impl->ifname)) < 0) return fd; impl->sap_fd = fd; @@ -434,93 +1117,26 @@ static int start_sap_announce(struct impl *impl) interval.tv_nsec = 0; pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); + pw_log_info("starting SAP listener"); + impl->sap_source = pw_loop_add_io(impl->loop, fd, + SPA_IO_IN, false, on_sap_io, impl); + if (impl->sap_source == NULL) { + res = -errno; + goto error; + } + return 0; error: close(fd); return res; } -static struct session *session_create(struct impl *impl, struct node *node) -{ - struct session *sess = NULL; - const char *str; - uint32_t port; - int res; - - sess = calloc(1, sizeof(struct session)); - if (sess == NULL) - return NULL; - - sess->impl = impl; - sess->node = node; - sess->msg_id_hash = rand(); - sess->ntp = (uint32_t) time(NULL) + 2208988800U; - - sess->props = pw_properties_new_dict(node->info->props); - if (sess->props == NULL) - goto error_free; - - if ((str = pw_properties_get(sess->props, "rtp.session")) != NULL) - sess->session_name = strdup(str); - - if ((str = pw_properties_get(sess->props, "rtp.destination.port")) == NULL) - goto error_free; - if (!spa_atou32(str, &port, 0)) - goto error_free; - sess->dst_port = port; - - if ((str = pw_properties_get(sess->props, "rtp.destination.ip")) == NULL) - goto error_free; - if ((res = parse_address(str, sess->dst_port, &sess->dst_addr, &sess->dst_len)) < 0) { - pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); - goto error_free; - } - sess->ttl = pw_properties_get_int32(sess->props, "rtp.ttl", DEFAULT_TTL); - sess->payload = pw_properties_get_int32(sess->props, "rtp.payload", 127); - - if ((str = pw_properties_get(sess->props, "rtp.media")) != NULL) - sess->media_type = strdup(str); - if ((str = pw_properties_get(sess->props, "rtp.mime")) != NULL) - sess->mime_type = strdup(str); - if ((str = pw_properties_get(sess->props, "rtp.rate")) != NULL) - sess->rate = atoi(str); - if ((str = pw_properties_get(sess->props, "rtp.channels")) != NULL) - sess->channels = atoi(str); - if ((str = pw_properties_get(sess->props, "rtp.ts-offset")) != NULL) - sess->ts_offset = atoi(str); - if ((str = pw_properties_get(sess->props, "rtp.ts-refclk")) != NULL) - sess->ts_refclk = strdup(str); - - spa_list_append(&impl->sessions, &sess->link); - return sess; - -error_free: - pw_log_warn("invalid session props"); - pw_properties_free(sess->props); - free(sess->session_name); - free(sess); - return NULL; -} - -static void session_free(struct session *sess) -{ - struct impl *impl = sess->impl; - - send_sap(impl, sess, 1); - - spa_list_remove(&sess->link); - - free(sess->session_name); - free(sess); -} - static void node_event_info(void *data, const struct pw_node_info *info) { struct node *n = data; + struct impl *impl = n->impl; const char *str; - pw_log_info("node %d added %p", n->id, n->session); - if (n->session != NULL || info == NULL) return; @@ -528,13 +1144,16 @@ static void node_event_info(void *data, const struct pw_node_info *info) if (n->info == NULL) return; - spa_debug_dict(0, info->props); + pw_log_debug("node %d changed", n->id); - if ((str = spa_dict_lookup(info->props, "sess.sap.announce")) == NULL || - !pw_properties_parse_bool(str)) - return; - - session_create(n->impl, n); + if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) { + struct match_info minfo = { + .impl = impl, + .node = n, + }; + pw_conf_match_rules(str, strlen(str), NAME, n->info->props, + rule_matched, &minfo); + } } static const struct pw_node_events node_events = { @@ -630,6 +1249,8 @@ static void impl_destroy(struct impl *impl) if (impl->timer) pw_loop_destroy_source(impl->loop, impl->timer); + if (impl->sap_source) + pw_loop_destroy_source(impl->loop, impl->sap_source); if (impl->sap_fd != -1) close(impl->sap_fd); @@ -711,10 +1332,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_log_error("invalid sap.ip %s: %s", str, spa_strerror(res)); goto out; } + impl->cleanup_interval = pw_properties_get_uint32(impl->props, + "sap.cleanup.sec", DEFAULT_CLEANUP_SEC); if ((str = pw_properties_get(props, "source.ip")) == NULL) str = DEFAULT_SOURCE_IP; - if ((res = parse_address(str, 0, &impl->src_addr, &impl->src_len)) < 0) { + if ((res = parse_address(str, port, &impl->src_addr, &impl->src_len)) < 0) { pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res)); goto out; } diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index 2715da9d0..bec993d1b 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -27,7 +27,6 @@ #include - /** \page page_module_rtp_sink PipeWire Module: RTP sink * * The `rtp-sink` module creates a PipeWire sink that sends audio @@ -74,10 +73,10 @@ * context.modules = [ * { name = libpipewire-module-rtp-sink * args = { + * #local.ifname = "eth0" * #source.ip = "0.0.0.0" * #destination.ip = "224.0.0.56" * #destination.port = 46000 - * #local.ifname = "eth0" * #net.mtu = 1280 * #net.ttl = 1 * #net.loop = false @@ -105,15 +104,9 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define SAP_INTERVAL_SEC 5 -#define SAP_MIME_TYPE "application/sdp" - #define BUFFER_SIZE (1u<<20) #define BUFFER_MASK (BUFFER_SIZE-1) -#define DEFAULT_SAP_IP "224.0.0.56" -#define DEFAULT_SAP_PORT 9875 - #define DEFAULT_SESS_MEDIA "audio" #define DEFAULT_FORMAT "S16BE" @@ -135,6 +128,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define USAGE "source.ip= " \ "destination.ip= " \ + "destination.port= " \ "local.ifname= " \ "net.mtu= " \ "net.ttl= " \ @@ -214,9 +208,9 @@ struct impl { int rtp_fd; unsigned sync:1; + unsigned apple_midi:1; }; - static void stream_destroy(void *d) { struct impl *impl = d; @@ -366,7 +360,7 @@ static void stream_audio_process(struct impl *impl) } } if (!impl->sync) { - pw_log_info("sync to timestamp %u", timestamp); + pw_log_info("sync to timestamp %u ts_offset:%u", timestamp, impl->ts_offset); impl->ring.readindex = impl->ring.writeindex = timestamp; memset(impl->buffer, 0, BUFFER_SIZE); impl->sync = true; @@ -487,7 +481,6 @@ static void flush_midi_packets(struct impl *impl, struct spa_pod_sequence *seque static void send_cmd(struct impl *impl) { -// struct rtp_header header; uint8_t buffer[16]; struct iovec iov[3]; struct msghdr msg; @@ -541,8 +534,8 @@ static void stream_midi_process(void *data) if (!impl->sync) { pw_log_info("sync to timestamp %u", timestamp); impl->sync = true; - - send_cmd(impl); + if (impl->apple_midi) + send_cmd(impl); } flush_midi_packets(impl, (struct spa_pod_sequence*)pod, timestamp); @@ -962,6 +955,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, PW_KEY_NODE_GROUP); copy_props(impl, props, PW_KEY_NODE_LATENCY); copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); copy_props(impl, props, PW_KEY_MEDIA_NAME); copy_props(impl, props, PW_KEY_MEDIA_CLASS); diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 2159164ed..b6254b1ce 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -19,13 +19,15 @@ #include #include #include +#include #include #include +#include +#include #include #include -#include #include #ifdef __FreeBSD__ @@ -35,63 +37,52 @@ /** \page page_module_rtp_source PipeWire Module: RTP source * * The `rtp-source` module creates a PipeWire source that receives audio - * RTP packets. + * and midi RTP packets. * * ## Module Options * * Options specific to the behavior of this module * - * - `sap.ip = `: IP address of the SAP messages, default "224.0.0.56" - * - `sap.port = `: port of the SAP messages, default 9875 * - `local.ifname = `: interface name to use + * - `node.always-process = `: true to receive even when not running * - `sess.latency.msec = `: target network latency in milliseconds, default 100 + * - `rtp.media = `: the media type audio|midi, default audio * - `stream.props = {}`: properties to be passed to the stream * * ## General options * * Options with well-known behavior: * - * - \ref PW_KEY_NODE_NAME - * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_REMOTE_NAME + * - \ref PW_KEY_AUDIO_FORMAT + * - \ref PW_KEY_AUDIO_RATE + * - \ref PW_KEY_AUDIO_CHANNELS + * - \ref SPA_KEY_AUDIO_POSITION * - \ref PW_KEY_MEDIA_NAME * - \ref PW_KEY_MEDIA_CLASS + * - \ref PW_KEY_NODE_NAME + * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_NODE_GROUP + * - \ref PW_KEY_NODE_LATENCY + * - \ref PW_KEY_NODE_VIRTUAL * * ## Example configuration *\code{.unparsed} * context.modules = [ * { name = libpipewire-module-rtp-source * args = { - * #sap.ip = 224.0.0.56 - * #sap.port = 9875 * #local.ifname = eth0 * sess.latency.msec = 100 - * #node.always-process = false # true to receive even when not running + * #node.always-process = false + * #rtp.media = "audio" + * #audio.format = "S16BE" + * #audio.rate = 48000 + * #audio.channels = 2 + * #audio.position = [ FL FR ] * stream.props = { * #media.class = "Audio/Source" - * #node.name = "rtp-source" + * node.name = "rtp-source" * } - * stream.rules = [ - * { matches = [ - * # any of the items in matches needs to match, if one does, - * # actions are emited. - * { # all keys must match the value. ~ in value starts regex. - * #rtp.origin = "wim 3883629975 0 IN IP4 0.0.0.0" - * #rtp.payload = "127" - * #rtp.fmt = "L16/48000/2" - * #rtp.session = "PipeWire RTP Stream on fedora" - * #rtp.ts-offset = 0 - * #rtp.ts-refclk = "private" - * } - * ] - * actions = { - * create-stream = { - * #sess.latency.msec = 100 - * #sess.ts-direct = false - * #target.object = "" - * } - * } - * } - * ] * } * } * ] @@ -105,27 +96,33 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define SAP_MIME_TYPE "application/sdp" - +#define DEFAULT_CLEANUP_SEC 60 #define ERROR_MSEC 2 -#define MAX_SESSIONS 16 -#define DEFAULT_CLEANUP_INTERVAL_SEC 90 -#define DEFAULT_SAP_IP "224.0.0.56" -#define DEFAULT_SAP_PORT 9875 #define DEFAULT_SESS_LATENCY 100 +#define DEFAULT_SOURCE_IP "224.0.0.56" + +#define DEFAULT_FORMAT "S16BE" +#define DEFAULT_RATE 48000 +#define DEFAULT_CHANNELS 2 +#define DEFAULT_POSITION "[ FL FR ]" + #define BUFFER_SIZE (1u<<22) #define BUFFER_MASK (BUFFER_SIZE-1) #define BUFFER_SIZE2 (BUFFER_SIZE>>1) #define BUFFER_MASK2 (BUFFER_SIZE2-1) -#define USAGE "sap.ip= " \ - "sap.port= " \ - "local.ifname= " \ +#define USAGE "local.ifname= " \ + "source.ip= " \ + "source.port= " \ "sess.latency.msec= " \ - "stream.props= { key=value ... } " \ - "stream.rules= " + "rtp.media= " \ + "audio.format= " \ + "audio.rate= " \ + "audio.channels= "\ + "audio.position= " \ + "stream.props= { key=value ... } " static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, @@ -146,90 +143,33 @@ struct impl { struct pw_core *core; struct spa_hook core_listener; struct spa_hook core_proxy_listener; - - struct spa_source *timer; - struct spa_source *sap_source; - - struct pw_properties *stream_props; - unsigned int do_disconnect:1; char *ifname; - char *sap_ip; bool always_process; - int sap_port; int sess_latency_msec; uint32_t cleanup_interval; - struct spa_list sessions; - uint32_t n_sessions; -}; + struct spa_source *timer; -struct format_info { - uint32_t media_subtype; - uint32_t format; - uint32_t size; - const char *mime; - const char *media_type; -}; - -static const struct format_info audio_format_info[] = { - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_U8, 1, "L8", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio" }, - { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "audio" }, -}; - -static const struct format_info *find_format_info(const char *mime) -{ - SPA_FOR_EACH_ELEMENT_VAR(audio_format_info, f) - if (spa_streq(f->mime, mime)) - return f; - return NULL; -} - -struct sdp_info { - uint16_t hash; - - char origin[128]; - char session[256]; - char channelmap[512]; - - struct sockaddr_storage sa; - socklen_t salen; - - uint16_t port; - uint8_t payload; - - const struct format_info *format_info; struct spa_audio_info info; - uint32_t rate; - uint32_t stride; - - uint32_t ts_offset; - char refclk[64]; -}; - -struct session { - struct impl *impl; - struct spa_list link; - - uint64_t timestamp; - - struct sdp_info info; - - struct spa_source *source; - + struct pw_properties *stream_props; struct pw_stream *stream; struct spa_hook stream_listener; + uint16_t src_port; + struct sockaddr_storage src_addr; + socklen_t src_len; + struct spa_source *source; + + uint32_t rate; + uint32_t stride; uint32_t expected_ssrc; uint16_t expected_seq; unsigned have_ssrc:1; unsigned have_seq:1; unsigned have_sync:1; + uint32_t ts_offset; struct spa_ringbuffer ring; uint8_t buffer[BUFFER_SIZE]; @@ -240,6 +180,7 @@ struct session { double corr; uint32_t target_buffer; float max_error; + unsigned first:1; unsigned receiving:1; unsigned direct_timestamp:1; @@ -248,47 +189,74 @@ struct session { float last_time; }; -static void session_touch(struct session *sess) +struct format_info { + uint32_t media_subtype; + uint32_t format; + uint32_t size; + const char *mime; + const char *media_type; +}; + +static uint32_t audio_get_stride(const struct spa_audio_info *info) { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - sess->timestamp = SPA_TIMESPEC_TO_NSEC(&ts); + uint32_t stride = 0; + + if (info->media_type != SPA_MEDIA_TYPE_audio || + info->media_subtype != SPA_MEDIA_SUBTYPE_raw) + return 0; + + switch (info->info.raw.format) { + case SPA_AUDIO_FORMAT_U8: + case SPA_AUDIO_FORMAT_ALAW: + case SPA_AUDIO_FORMAT_ULAW: + stride = 1; + break; + case SPA_AUDIO_FORMAT_S16_BE: + stride = 2; + break; + case SPA_AUDIO_FORMAT_S24_BE: + stride = 3; + break; + default: + break; + } + return stride * info->info.raw.channels; } -static void process_audio(struct session *sess) +static void process_audio(struct impl *impl) { struct pw_buffer *buf; struct spa_data *d; uint32_t wanted, timestamp, target_buffer, stride, maxsize; int32_t avail; - if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_debug("Out of stream buffers: %m"); return; } d = buf->buffer->datas; - stride = sess->info.stride; + stride = impl->stride; maxsize = d[0].maxsize / stride; wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize; - if (sess->position && sess->direct_timestamp) { + if (impl->position && impl->direct_timestamp) { /* in direct mode, read directly from the timestamp index, * because sender and receiver are in sync, this would keep * target_buffer of samples available. */ - spa_ringbuffer_read_update(&sess->ring, - sess->position->clock.position); + spa_ringbuffer_read_update(&impl->ring, + impl->position->clock.position); } - avail = spa_ringbuffer_get_read_index(&sess->ring, ×tamp); + avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); - target_buffer = sess->target_buffer; + target_buffer = impl->target_buffer; if (avail < (int32_t)wanted) { enum spa_log_level level; memset(d[0].data, 0, wanted * stride); - if (sess->have_sync) { - sess->have_sync = false; + if (impl->have_sync) { + impl->have_sync = false; level = SPA_LOG_LEVEL_WARN; } else { level = SPA_LOG_LEVEL_DEBUG; @@ -297,7 +265,7 @@ static void process_audio(struct session *sess) avail, target_buffer, wanted); } else { float error, corr; - if (sess->first) { + if (impl->first) { if ((uint32_t)avail > target_buffer) { uint32_t skip = avail - target_buffer; pw_log_debug("first: avail:%d skip:%u target:%u", @@ -305,74 +273,75 @@ static void process_audio(struct session *sess) timestamp += skip; avail = target_buffer; } - sess->first = false; + impl->first = false; } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) { pw_log_warn("overrun %u > %u", avail, target_buffer * 8); timestamp += avail - target_buffer; avail = target_buffer; } - if (!sess->direct_timestamp) { + if (!impl->direct_timestamp) { /* when not using direct timestamp and clocks are not * in sync, try to adjust our playback rate to keep the * requested target_buffer bytes in the ringbuffer */ error = (float)target_buffer - (float)avail; - error = SPA_CLAMP(error, -sess->max_error, sess->max_error); + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); - corr = spa_dll_update(&sess->dll, error); + corr = spa_dll_update(&impl->dll, error); pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, target_buffer, error, corr); - if (sess->rate_match) { - SPA_FLAG_SET(sess->rate_match->flags, + if (impl->rate_match) { + SPA_FLAG_SET(impl->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); - sess->rate_match->rate = 1.0f / corr; + impl->rate_match->rate = 1.0f / corr; } } - spa_ringbuffer_read_data(&sess->ring, - sess->buffer, + spa_ringbuffer_read_data(&impl->ring, + impl->buffer, BUFFER_SIZE, (timestamp * stride) & BUFFER_MASK, d[0].data, wanted * stride); timestamp += wanted; - spa_ringbuffer_read_update(&sess->ring, timestamp); + spa_ringbuffer_read_update(&impl->ring, timestamp); } d[0].chunk->size = wanted * stride; d[0].chunk->stride = stride; d[0].chunk->offset = 0; buf->size = wanted; - pw_stream_queue_buffer(sess->stream, buf); + pw_stream_queue_buffer(impl->stream, buf); } -static void receive_audio(struct session *sess, uint8_t *packet, +static void receive_audio(struct impl *impl, uint8_t *packet, uint32_t timestamp, uint32_t payload_offset, uint32_t len) { uint32_t plen = len - payload_offset; uint8_t *payload = &packet[payload_offset]; - uint32_t stride = sess->info.stride; + uint32_t stride = impl->stride; uint32_t samples = plen / stride; uint32_t write, expected_write; int32_t filled; - filled = spa_ringbuffer_get_write_index(&sess->ring, &expected_write); + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_write); /* we always write to timestamp + delay */ - write = timestamp + sess->target_buffer; + write = timestamp + impl->target_buffer; - if (!sess->have_sync) { - pw_log_info("sync to timestamp %u direct:%d", write, sess->direct_timestamp); + if (!impl->have_sync) { + pw_log_info("sync to timestamp %u direct:%d ts_offset:%u", + write, impl->direct_timestamp, impl->ts_offset); /* we read from timestamp, keeping target_buffer of data * in the ringbuffer. */ - sess->ring.readindex = timestamp; - sess->ring.writeindex = write; - filled = sess->target_buffer; + impl->ring.readindex = timestamp; + impl->ring.writeindex = write; + filled = impl->target_buffer; - spa_dll_init(&sess->dll); - spa_dll_set_bw(&sess->dll, SPA_DLL_BW_MIN, 128, sess->info.rate); - memset(sess->buffer, 0, BUFFER_SIZE); - sess->have_sync = true; + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); + memset(impl->buffer, 0, BUFFER_SIZE); + impl->have_sync = true; } else if (expected_write != write) { pw_log_debug("unexpected write (%u != %u)", write, expected_write); @@ -381,20 +350,20 @@ static void receive_audio(struct session *sess, uint8_t *packet, if (filled + samples > BUFFER_SIZE / stride) { pw_log_debug("capture overrun %u + %u > %u", filled, samples, BUFFER_SIZE / stride); - sess->have_sync = false; + impl->have_sync = false; } else { pw_log_debug("got samples:%u", samples); - spa_ringbuffer_write_data(&sess->ring, - sess->buffer, + spa_ringbuffer_write_data(&impl->ring, + impl->buffer, BUFFER_SIZE, (write * stride) & BUFFER_MASK, payload, (samples * stride)); write += samples; - spa_ringbuffer_write_update(&sess->ring, write); + spa_ringbuffer_write_update(&impl->ring, write); } } -static void process_midi(struct session *sess) +static void process_midi(struct impl *impl) { struct pw_buffer *buf; struct spa_data *d; @@ -405,7 +374,7 @@ static void process_midi(struct session *sess) struct spa_pod *pod; struct spa_pod_control *c; - if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_debug("Out of stream buffers: %m"); return; } @@ -415,9 +384,9 @@ static void process_midi(struct session *sess) /* we always use the graph position to select events, the receiver side is * responsible for smoothing out the RTP timestamps to graph time */ - duration = sess->position->clock.duration; - if (sess->position) - timestamp = sess->position->clock.position; + duration = impl->position->clock.duration; + if (impl->position) + timestamp = impl->position->clock.position; else timestamp = 0; @@ -426,11 +395,11 @@ static void process_midi(struct session *sess) spa_pod_builder_push_sequence(&b, &f[0], 0); while (true) { - int32_t avail = spa_ringbuffer_get_read_index(&sess->ring, &read); + int32_t avail = spa_ringbuffer_get_read_index(&impl->ring, &read); if (avail <= 0) break; - ptr = SPA_PTROFF(sess->buffer, read & BUFFER_MASK2, void); + ptr = SPA_PTROFF(impl->buffer, read & BUFFER_MASK2, void); if ((pod = spa_pod_from_data(ptr, avail, 0, avail)) == NULL) goto done; @@ -441,7 +410,7 @@ static void process_midi(struct session *sess) * received packet */ SPA_POD_SEQUENCE_FOREACH((struct spa_pod_sequence*)pod, c) { /* try to render with given delay */ - uint32_t target = c->offset + sess->target_buffer; + uint32_t target = c->offset + impl->target_buffer; if (timestamp != 0) { /* skip old packets */ if (target < timestamp) @@ -460,7 +429,7 @@ static void process_midi(struct session *sess) /* we completed a sequence (one RTP packet), advance ringbuffer * and go to the next packet */ read += SPA_PTRDIFF(c, ptr); - spa_ringbuffer_read_update(&sess->ring, read); + spa_ringbuffer_read_update(&impl->ring, read); } complete: spa_pod_builder_pop(&b, &f[0]); @@ -473,7 +442,7 @@ complete: d[0].chunk->stride = 1; d[0].chunk->offset = 0; done: - pw_stream_queue_buffer(sess->stream, buf); + pw_stream_queue_buffer(impl->stream, buf); } static int parse_varlen(uint8_t *p, uint32_t avail, uint32_t *result) @@ -514,17 +483,17 @@ static int get_midi_size(uint8_t *p, uint32_t avail) return size; } -static double get_time(struct session *sess) +static double get_time(struct impl *impl) { struct timespec ts; double t; clock_gettime(CLOCK_MONOTONIC, &ts); - t = sess->position->clock.position / (double) sess->position->clock.rate.denom; - t += (SPA_TIMESPEC_TO_NSEC(&ts) - sess->position->clock.nsec) / (double)SPA_NSEC_PER_SEC; + t = impl->position->clock.position / (double) impl->position->clock.rate.denom; + t += (SPA_TIMESPEC_TO_NSEC(&ts) - impl->position->clock.nsec) / (double)SPA_NSEC_PER_SEC; return t; } -static void receive_midi(struct session *sess, uint8_t *packet, +static void receive_midi(struct impl *impl, uint8_t *packet, uint32_t timestamp, uint32_t payload_offset, uint32_t plen) { uint32_t write; @@ -536,54 +505,54 @@ static void receive_midi(struct session *sess, uint8_t *packet, uint32_t offs = payload_offset, len, end; bool first = true; - if (sess->direct_timestamp) { + if (impl->direct_timestamp) { /* in direct timestamp we attach the RTP timestamp directly on the * midi events and render them in the corresponding cycle */ - if (!sess->have_sync) { + if (!impl->have_sync) { pw_log_info("sync to timestamp %u/ direct:%d", timestamp, - sess->direct_timestamp); - sess->have_sync = true; + impl->direct_timestamp); + impl->have_sync = true; } } else { /* in non-direct timestamp mode, we relate the graph clock against * the RTP timestamps */ - double ts = timestamp / (float) sess->info.rate; - double t = get_time(sess); + double ts = timestamp / (float) impl->rate; + double t = get_time(impl); double elapsed, estimated, diff; /* the elapsed time between RTP timestamps */ - elapsed = ts - sess->last_timestamp; + elapsed = ts - impl->last_timestamp; /* for that elapsed time, our clock should have advanced * by this amount since the last estimation */ - estimated = sess->last_time + elapsed * sess->corr; + estimated = impl->last_time + elapsed * impl->corr; /* calculate the diff between estimated and current clock time in * samples */ - diff = (estimated - t) * sess->info.rate; + diff = (estimated - t) * impl->rate; /* no sync or we drifted too far, resync */ - if (!sess->have_sync || fabs(diff) > sess->target_buffer) { - sess->corr = 1.0; - spa_dll_set_bw(&sess->dll, SPA_DLL_BW_MIN, 256, sess->info.rate); + if (!impl->have_sync || fabs(diff) > impl->target_buffer) { + impl->corr = 1.0; + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256, impl->rate); pw_log_info("sync to timestamp %u/%f direct:%d", timestamp, t, - sess->direct_timestamp); - sess->have_sync = true; - sess->ring.readindex = sess->ring.writeindex; + impl->direct_timestamp); + impl->have_sync = true; + impl->ring.readindex = impl->ring.writeindex; } else { /* update our new rate correction */ - sess->corr = spa_dll_update(&sess->dll, diff); + impl->corr = spa_dll_update(&impl->dll, diff); /* our current time is now the estimated time */ t = estimated; } - pw_log_debug("%f %f %f %f", t, estimated, diff, sess->corr); + pw_log_debug("%f %f %f %f", t, estimated, diff, impl->corr); - timestamp = t * sess->info.rate; + timestamp = t * impl->rate; - sess->last_timestamp = ts; - sess->last_time = t; + impl->last_timestamp = ts; + impl->last_time = t; } - filled = spa_ringbuffer_get_write_index(&sess->ring, &write); + filled = spa_ringbuffer_get_write_index(&impl->ring, &write); if (filled > (int32_t)BUFFER_SIZE2) return; @@ -597,7 +566,7 @@ static void receive_midi(struct session *sess, uint8_t *packet, if (end > plen) return; - ptr = SPA_PTROFF(sess->buffer, write & BUFFER_MASK2, void); + ptr = SPA_PTROFF(impl->buffer, write & BUFFER_MASK2, void); /* each packet is written as a sequence of events. The offset is * the RTP timestamp */ @@ -613,7 +582,7 @@ static void receive_midi(struct session *sess, uint8_t *packet, else offs += parse_varlen(&packet[offs], end - offs, &delta); - timestamp += delta * sess->corr; + timestamp += delta * impl->corr; spa_pod_builder_control(&b, timestamp, SPA_CONTROL_Midi); size = get_midi_size(&packet[offs], end - offs); @@ -632,38 +601,38 @@ static void receive_midi(struct session *sess, uint8_t *packet, spa_pod_builder_pop(&b, &f[0]); write += b.state.offset; - spa_ringbuffer_write_update(&sess->ring, write); + spa_ringbuffer_write_update(&impl->ring, write); } static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) { - struct session *sess = data; + struct impl *impl = data; switch (id) { case SPA_IO_RateMatch: - sess->rate_match = area; + impl->rate_match = area; break; case SPA_IO_Position: - sess->position = area; + impl->position = area; break; } } static void stream_destroy(void *d) { - struct session *sess = d; - spa_hook_remove(&sess->stream_listener); - sess->stream = NULL; + struct impl *impl = d; + spa_hook_remove(&impl->stream_listener); + impl->stream = NULL; } static void stream_process(void *data) { - struct session *sess = data; - switch (sess->info.info.media_type) { + struct impl *impl = data; + switch (impl->info.media_type) { case SPA_MEDIA_TYPE_audio: - process_audio(sess); + process_audio(impl); break; case SPA_MEDIA_TYPE_application: - process_midi(sess); + process_midi(impl); break; } } @@ -671,7 +640,7 @@ static void stream_process(void *data) static void on_rtp_io(void *data, int fd, uint32_t mask) { - struct session *sess = data; + struct impl *impl = data; struct rtp_header *hdr; ssize_t len, hlen; uint8_t buffer[2048]; @@ -694,29 +663,29 @@ on_rtp_io(void *data, int fd, uint32_t mask) if (hlen > len) goto invalid_len; - if (sess->have_ssrc && sess->expected_ssrc != hdr->ssrc) + if (impl->have_ssrc && impl->expected_ssrc != hdr->ssrc) goto unexpected_ssrc; - sess->expected_ssrc = hdr->ssrc; - sess->have_ssrc = true; + impl->expected_ssrc = hdr->ssrc; + impl->have_ssrc = true; seq = ntohs(hdr->sequence_number); - if (sess->have_seq && sess->expected_seq != seq) { - pw_log_info("unexpected seq (%d != %d)", seq, sess->expected_seq); - sess->have_sync = false; + if (impl->have_seq && impl->expected_seq != seq) { + pw_log_info("unexpected seq (%d != %d)", seq, impl->expected_seq); + impl->have_sync = false; } - sess->expected_seq = seq + 1; - sess->have_seq = true; + impl->expected_seq = seq + 1; + impl->have_seq = true; - timestamp = ntohl(hdr->timestamp) - sess->info.ts_offset; + timestamp = ntohl(hdr->timestamp) - impl->ts_offset; - switch (sess->info.info.media_type) { + switch (impl->info.media_type) { case SPA_MEDIA_TYPE_audio: - receive_audio(sess, buffer, timestamp, hlen, len); + receive_audio(impl, buffer, timestamp, hlen, len); break; case SPA_MEDIA_TYPE_application: - receive_midi(sess, buffer, timestamp, hlen, len); + receive_midi(impl, buffer, timestamp, hlen, len); } - sess->receiving = true; + impl->receiving = true; } return; @@ -728,16 +697,37 @@ short_packet: return; invalid_version: pw_log_warn("invalid RTP version"); + spa_debug_mem(0, buffer, len); return; invalid_len: pw_log_warn("invalid RTP length"); return; unexpected_ssrc: pw_log_warn("unexpected SSRC (expected %u != %u)", - sess->expected_ssrc, hdr->ssrc); + impl->expected_ssrc, hdr->ssrc); return; } +static int parse_address(const char *address, uint16_t port, + struct sockaddr_storage *addr, socklen_t *len) +{ + struct sockaddr_in *sa4 = (struct sockaddr_in*)addr; + struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)addr; + + if (inet_pton(AF_INET, address, &sa4->sin_addr) > 0) { + sa4->sin_family = AF_INET; + sa4->sin_port = htons(port); + *len = sizeof(*sa4); + } else if (inet_pton(AF_INET6, address, &sa6->sin6_addr) > 0) { + sa6->sin6_family = AF_INET6; + sa6->sin6_port = htons(port); + *len = sizeof(*sa6); + } else + return -EINVAL; + + return 0; +} + static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) { int af, fd, val, res; @@ -815,60 +805,29 @@ error: return res; } -static uint32_t msec_to_samples(struct sdp_info *info, uint32_t msec) +static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) { - return msec * info->rate / 1000; + return msec * impl->rate / 1000; } -static void session_free(struct session *sess) +static int stream_start(struct impl *impl) { - if (sess->impl) { - pw_log_info("free session %s %s", sess->info.origin, sess->info.session); - sess->impl->n_sessions--; - spa_list_remove(&sess->link); - } - if (sess->stream) - pw_stream_destroy(sess->stream); - if (sess->source) - pw_loop_destroy_source(sess->impl->data_loop, sess->source); - free(sess); -} - -struct session_info { - struct session *session; - struct pw_properties *props; - bool matched; -}; - -static int rule_matched(void *data, const char *location, const char *action, - const char *str, size_t len) -{ - struct session_info *i = data; - int res = 0; - - i->matched = true; - if (spa_streq(action, "create-stream")) { - pw_properties_update_string(i->props, str, len); - } - return res; -} - -static int session_start(struct impl *impl, struct session *session) { int fd; - if (session->source) - return 0; + + if (impl->source != NULL) + return 0; pw_log_info("starting RTP listener"); - if ((fd = make_socket((const struct sockaddr *)&session->info.sa, - session->info.salen, impl->ifname)) < 0) { + if ((fd = make_socket((const struct sockaddr *)&impl->src_addr, + impl->src_len, impl->ifname)) < 0) { pw_log_error("failed to create socket: %m"); return fd; } - session->source = pw_loop_add_io(impl->data_loop, fd, - SPA_IO_IN, true, on_rtp_io, session); - if (session->source == NULL) { + impl->source = pw_loop_add_io(impl->data_loop, fd, + SPA_IO_IN, true, on_rtp_io, impl); + if (impl->source == NULL) { pw_log_error("can't create io source: %m"); close(fd); return -errno; @@ -876,25 +835,21 @@ static int session_start(struct impl *impl, struct session *session) { return 0; } -static void session_stop(struct impl *impl, struct session *session) { - if (!session->source) +static void stream_stop(struct impl *impl) +{ + if (!impl->source) return; pw_log_info("stopping RTP listener"); - pw_loop_destroy_source( - session->impl->data_loop, - session->source - ); - - session->source = NULL; + pw_loop_destroy_source(impl->data_loop, impl->source); + impl->source = NULL; } static void on_stream_state_changed(void *d, enum pw_stream_state old, enum pw_stream_state state, const char *error) { - struct session *sess = d; - struct impl *impl = sess->impl; + struct impl *impl = d; switch (state) { case PW_STREAM_STATE_UNCONNECTED: @@ -905,12 +860,12 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, pw_log_error("stream error: %s", error); break; case PW_STREAM_STATE_STREAMING: - if ((errno = -session_start(impl, sess)) < 0) + if ((errno = -stream_start(impl)) < 0) pw_log_error("failed to start RTP stream: %m"); break; case PW_STREAM_STATE_PAUSED: if (!impl->always_process) - session_stop(impl, sess); + stream_stop(impl); break; default: break; @@ -925,28 +880,16 @@ static const struct pw_stream_events out_stream_events = { .process = stream_process }; -static int session_new(struct impl *impl, struct sdp_info *info) +static int setup_stream(struct impl *impl) { - struct session *session; const struct spa_pod *params[1]; struct spa_pod_builder b; uint32_t n_params; uint8_t buffer[1024]; struct pw_properties *props; - int res, sess_latency_msec; - const char *str; + int res; - if (impl->n_sessions >= MAX_SESSIONS) { - pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, MAX_SESSIONS); - return -EMFILE; - } - - session = calloc(1, sizeof(struct session)); - if (session == NULL) - return -errno; - - session->info = *info; - session->first = true; + impl->first = true; props = pw_properties_copy(impl->stream_props); if (props == NULL) { @@ -954,79 +897,29 @@ static int session_new(struct impl *impl, struct sdp_info *info) goto error; } - pw_properties_set(props, "rtp.origin", info->origin); - pw_properties_setf(props, "rtp.payload", "%u", info->payload); - pw_properties_setf(props, "rtp.fmt", "%s/%u/%u", info->format_info->mime, - info->rate, info->info.info.raw.channels); - if (info->session[0]) { - pw_properties_set(props, "rtp.session", info->session); - pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", - info->session); - pw_properties_setf(props, PW_KEY_NODE_NAME, "%s", - info->session); - } else { - pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Stream"); - } - pw_properties_setf(props, "rtp.ts-offset", "%u", info->ts_offset); - pw_properties_set(props, "rtp.ts-refclk", info->refclk); + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); + impl->corr = 1.0; - if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) { - struct session_info sinfo = { - .session = session, - .props = props, - }; - pw_conf_match_rules(str, strlen(str), NAME, &props->dict, - rule_matched, &sinfo); - - if (!sinfo.matched) { - res = 0; - pw_log_info("session '%s' was not matched", info->session); - goto error; - } - } - session->direct_timestamp = pw_properties_get_bool(props, "sess.ts-direct", false); - - pw_log_info("new session %s %s direct:%d", info->origin, info->session, - session->direct_timestamp); - - sess_latency_msec = pw_properties_get_uint32(props, - "sess.latency.msec", impl->sess_latency_msec); - - session->target_buffer = msec_to_samples(info, sess_latency_msec); - session->max_error = msec_to_samples(info, ERROR_MSEC); - - pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", info->rate); - pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d", - session->target_buffer / 2, info->rate); - - spa_dll_init(&session->dll); - spa_dll_set_bw(&session->dll, SPA_DLL_BW_MIN, 128, info->rate); - session->corr = 1.0; - - if (info->channelmap[0]) { - pw_properties_set(props, PW_KEY_NODE_CHANNELNAMES, info->channelmap); - pw_log_info("channelmap: %s", info->channelmap); - } - - session->stream = pw_stream_new(impl->core, + impl->stream = pw_stream_new(impl->core, "rtp-source playback", props); - if (session->stream == NULL) { + if (impl->stream == NULL) { res = -errno; pw_log_error("can't create stream: %m"); goto error; } - pw_stream_add_listener(session->stream, - &session->stream_listener, - &out_stream_events, session); + pw_stream_add_listener(impl->stream, + &impl->stream_listener, + &out_stream_events, impl); n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); - switch (info->info.media_type) { + switch (impl->info.media_type) { case SPA_MEDIA_TYPE_audio: params[n_params++] = spa_format_audio_build(&b, - SPA_PARAM_EnumFormat, &info->info); + SPA_PARAM_EnumFormat, &impl->info); break; case SPA_MEDIA_TYPE_application: params[n_params++] = spa_pod_builder_add_object(&b, @@ -1038,7 +931,7 @@ static int session_new(struct impl *impl, struct sdp_info *info) return -EINVAL; } - if ((res = pw_stream_connect(session->stream, + if ((res = pw_stream_connect(impl->stream, PW_DIRECTION_OUTPUT, PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | @@ -1050,408 +943,25 @@ static int session_new(struct impl *impl, struct sdp_info *info) } if (impl->always_process && - (res = session_start(impl, session)) < 0) + (res = stream_start(impl)) < 0) goto error; - session_touch(session); - - session->impl = impl; - spa_list_append(&impl->sessions, &session->link); - impl->n_sessions++; - return 0; error: - session_free(session); return res; } -static struct session *session_find(struct impl *impl, struct sdp_info *info) -{ - struct session *sess; - spa_list_for_each(sess, &impl->sessions, link) { - if (info->hash == sess->info.hash && - spa_streq(info->origin, sess->info.origin)) - return sess; - } - return NULL; -} - -static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info) -{ - int res; - - c[strcspn(c, "/")] = 0; - if (spa_strstartswith(c, "c=IN IP4 ")) { - struct sockaddr_in *sa = (struct sockaddr_in*) &info->sa; - - c += strlen("c=IN IP4 "); - if (inet_pton(AF_INET, c, &sa->sin_addr) <= 0) { - res = -errno; - pw_log_warn("inet_pton(%s) failed: %m", c); - goto error; - } - sa->sin_family = AF_INET; - info->salen = sizeof(struct sockaddr_in); - } - else if (spa_strstartswith(c, "c=IN IP6 ")) { - struct sockaddr_in6 *sa = (struct sockaddr_in6*) &info->sa; - - c += strlen("c=IN IP6 "); - if (inet_pton(AF_INET6, c, &sa->sin6_addr) <= 0) { - res = -errno; - pw_log_warn("inet_pton(%s) failed: %m", c); - goto error; - } - - sa->sin6_family = AF_INET6; - info->salen = sizeof(struct sockaddr_in6); - } else - return -EINVAL; - - - res= 0; -error: - return res; -} - -static int parse_sdp_m(struct impl *impl, char *c, struct sdp_info *info) -{ - int port, payload; - - if (!spa_strstartswith(c, "m=audio ")) - return -EINVAL; - - c += strlen("m=audio "); - if (sscanf(c, "%i RTP/AVP %i", &port, &payload) != 2) - return -EINVAL; - - if (port <= 0 || port > 0xFFFF) - return -EINVAL; - - if (payload < 0 || payload > 127) - return -EINVAL; - - info->port = (uint16_t) port; - info->payload = (uint8_t) payload; - - return 0; -} - -// some AES67 devices have channelmap encoded in i=* -// if `i` record is found, it matches the template -// and channel count matches, name the channels respectively -// `i=2 channels: 01, 08` is the format -static int parse_sdp_i(struct impl *impl, char *c, struct sdp_info *info) -{ - if (!strstr(c, " channels: ")) { - return 0; - } - - c += strlen("i="); - c[strcspn(c, " ")] = '\0'; - - uint32_t channels; - if (sscanf(c, "%u", &channels) != 1 || channels <= 0 || channels > SPA_AUDIO_MAX_CHANNELS) - return 0; - - c += strcspn(c, "\0"); - c += strlen(" channels: "); - - strncpy(info->channelmap, c, sizeof(info->channelmap) - 1); - - return 0; -} - -static int parse_sdp_a_rtpmap(struct impl *impl, char *c, struct sdp_info *info) -{ - int payload, len, rate, channels; - - if (!spa_strstartswith(c, "a=rtpmap:")) - return 0; - - c += strlen("a=rtpmap:"); - - if (sscanf(c, "%i %n", &payload, &len) != 1) - return -EINVAL; - - if (payload < 0 || payload > 127) - return -EINVAL; - - if (payload != info->payload) - return 0; - - c += len; - c[strcspn(c, "/")] = 0; - - info->format_info = find_format_info(c); - if (info->format_info == NULL) - return -EINVAL; - - info->stride = info->format_info->size; - - info->info.media_subtype = info->format_info->media_subtype; - - c += strlen(c) + 1; - - switch (info->info.media_subtype) { - case SPA_MEDIA_SUBTYPE_raw: - info->info.media_type = SPA_MEDIA_TYPE_audio; - info->info.info.raw.format = info->format_info->format; - if (sscanf(c, "%u/%u", &rate, &channels) == 2) { - info->info.info.raw.channels = channels; - } else if (sscanf(c, "%u", &rate) == 1) { - info->info.info.raw.channels = 1; - } else - return -EINVAL; - - info->info.info.raw.rate = rate; - - pw_log_debug("rate: %d, ch: %d", rate, channels); - - if (channels == 1) { - info->info.info.raw.position[0] = SPA_AUDIO_CHANNEL_MONO; - } else if (channels == 2) { - info->info.info.raw.position[0] = SPA_AUDIO_CHANNEL_FL; - info->info.info.raw.position[1] = SPA_AUDIO_CHANNEL_FR; - } - info->stride *= channels; - info->rate = rate; - break; - case SPA_MEDIA_SUBTYPE_control: - info->info.media_type = SPA_MEDIA_TYPE_application; - if (sscanf(c, "%u", &rate) != 1) - return -EINVAL; - info->rate = rate; - break; - } - return 0; -} - -static int parse_sdp_a_mediaclk(struct impl *impl, char *c, struct sdp_info *info) -{ - if (!spa_strstartswith(c, "a=mediaclk:")) - return 0; - - c += strlen("a=mediaclk:"); - - if (spa_strstartswith(c, "direct=")) { - int offset; - c += strlen("direct="); - if (sscanf(c, "%i", &offset) != 1) - return -EINVAL; - info->ts_offset = offset; - } else if (spa_strstartswith(c, "sender")) { - info->ts_offset = 0; - } - return 0; -} - -static int parse_sdp_a_ts_refclk(struct impl *impl, char *c, struct sdp_info *info) -{ - if (!spa_strstartswith(c, "a=ts-refclk:")) - return 0; - - c += strlen("a=ts-refclk:"); - snprintf(info->refclk, sizeof(info->refclk), "%s", c); - return 0; -} - -static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info) -{ - char *s = sdp; - int count = 0, res = 0; - size_t l; - - while (*s) { - if ((l = strcspn(s, "\r\n")) < 2) - goto too_short; - - s[l] = 0; - pw_log_debug("%d: %s", count, s); - - if (count++ == 0 && strcmp(s, "v=0") != 0) - goto invalid_version; - - if (spa_strstartswith(s, "o=")) - snprintf(info->origin, sizeof(info->origin), "%s", &s[2]); - else if (spa_strstartswith(s, "s=")) - snprintf(info->session, sizeof(info->session), "%s", &s[2]); - else if (spa_strstartswith(s, "c=")) - res = parse_sdp_c(impl, s, info); - else if (spa_strstartswith(s, "m=")) - res = parse_sdp_m(impl, s, info); - else if (spa_strstartswith(s, "a=rtpmap:")) - res = parse_sdp_a_rtpmap(impl, s, info); - else if (spa_strstartswith(s, "a=mediaclk:")) - res = parse_sdp_a_mediaclk(impl, s, info); - else if (spa_strstartswith(s, "a=ts-refclk:")) - res = parse_sdp_a_ts_refclk(impl, s, info); - else if (spa_strstartswith(s, "i=")) - res = parse_sdp_i(impl, s, info); - - if (res < 0) - goto error; - s += l + 1; - while (isspace(*s)) - s++; - } - if (((struct sockaddr*) &info->sa)->sa_family == AF_INET) - ((struct sockaddr_in*) &info->sa)->sin_port = htons(info->port); - else - ((struct sockaddr_in6*) &info->sa)->sin6_port = htons(info->port); - - return 0; -too_short: - pw_log_warn("SDP: line starting with `%.6s...' too short", s); - return -EINVAL; -invalid_version: - pw_log_warn("SDP: invalid first version line `%*s'", (int)l, s); - return -EINVAL; -error: - pw_log_warn("SDP: error: %s", spa_strerror(res)); - return res; -} - -static int parse_sap(struct impl *impl, void *data, size_t len) -{ - struct sap_header *header; - char *mime, *sdp; - struct sdp_info info; - struct session *sess; - int res; - size_t offs; - bool bye; - - if (len < 8) - return -EINVAL; - - header = (struct sap_header*) data; - if (header->v != 1) - return -EINVAL; - - if (header->e) - return -ENOTSUP; - if (header->c) - return -ENOTSUP; - - offs = header->a ? 12 : 8; - offs += header->auth_len * 4; - if (len <= offs) - return -EINVAL; - - mime = SPA_PTROFF(data, offs, char); - if (spa_strstartswith(mime, "v=0")) { - sdp = mime; - mime = SAP_MIME_TYPE; - } else if (spa_streq(mime, SAP_MIME_TYPE)) - sdp = SPA_PTROFF(mime, strlen(mime)+1, char); - else - return -EINVAL; - - pw_log_debug("got sap: %s %s", mime, sdp); - - spa_zero(info); - if ((res = parse_sdp(impl, sdp, &info)) < 0) - return res; - - bye = header->t; - - sess = session_find(impl, &info); - if (sess == NULL) { - if (!bye) - session_new(impl, &info); - } else { - if (bye) - session_free(sess); - else - session_touch(sess); - } - return res; -} - -static void -on_sap_io(void *data, int fd, uint32_t mask) -{ - struct impl *impl = data; - - if (mask & SPA_IO_IN) { - uint8_t buffer[2048]; - ssize_t len; - - if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) { - pw_log_warn("recv error: %m"); - return; - } - if ((size_t)len >= sizeof(buffer)) - return; - - buffer[len] = 0; - parse_sap(impl, buffer, len); - } -} - -static int start_sap_listener(struct impl *impl) -{ - struct sockaddr_in sa4; - struct sockaddr_in6 sa6; - struct sockaddr *sa; - socklen_t salen; - int fd, res; - - if (inet_pton(AF_INET, impl->sap_ip, &sa4.sin_addr) > 0) { - sa4.sin_family = AF_INET; - sa4.sin_port = htons(impl->sap_port); - sa = (struct sockaddr*) &sa4; - salen = sizeof(sa4); - } else if (inet_pton(AF_INET6, impl->sap_ip, &sa6.sin6_addr) > 0) { - sa6.sin6_family = AF_INET6; - sa6.sin6_port = htons(impl->sap_port); - sa = (struct sockaddr*) &sa6; - salen = sizeof(sa6); - } else - return -EINVAL; - - if ((fd = make_socket(sa, salen, impl->ifname)) < 0) - return fd; - - pw_log_info("starting SAP listener"); - impl->sap_source = pw_loop_add_io(impl->loop, fd, - SPA_IO_IN, true, on_sap_io, impl); - if (impl->sap_source == NULL) { - res = -errno; - goto error; - } - return 0; -error: - close(fd); - return res; - -} - static void on_timer_event(void *data, uint64_t expirations) { struct impl *impl = data; - struct timespec now; - struct session *sess, *tmp; - uint64_t timestamp, interval; - clock_gettime(CLOCK_MONOTONIC, &now); - timestamp = SPA_TIMESPEC_TO_NSEC(&now); - interval = impl->cleanup_interval * SPA_NSEC_PER_SEC; - - spa_list_for_each_safe(sess, tmp, &impl->sessions, link) { - if (sess->timestamp + interval < timestamp) { - pw_log_debug("More than %lu elapsed from last advertisement at %lu", - interval, sess->timestamp); - if (!sess->receiving) { - pw_log_info("SAP timeout, closing inactive RTP source"); - session_free(sess); - } else { - pw_log_info("SAP timeout, keeping active RTP source"); - } - } - sess->receiving = false; + if (!impl->receiving) { + pw_log_info("timeout, closing inactive RTP source"); + pw_impl_module_schedule_destroy(impl->module); + } else { + pw_log_debug("timeout, keeping active RTP source"); } + impl->receiving = false; } static void core_destroy(void *d) @@ -1468,15 +978,14 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { - struct session *sess; - spa_list_consume(sess, &impl->sessions, link) - session_free(sess); + if (impl->stream) + pw_stream_destroy(impl->stream); + if (impl->source) + pw_loop_destroy_source(impl->data_loop, impl->source); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->sap_source) - pw_loop_destroy_source(impl->loop, impl->sap_source); if (impl->timer) pw_loop_destroy_source(impl->loop, impl->timer); @@ -1484,7 +993,6 @@ static void impl_destroy(struct impl *impl) pw_properties_free(impl->props); free(impl->ifname); - free(impl->sap_ip); free(impl); } @@ -1516,13 +1024,82 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; +static inline uint32_t format_from_name(const char *name, size_t len) +{ + int i; + for (i = 0; spa_type_audio_format[i].name; i++) { + if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0) + return spa_type_audio_format[i].type; + } + return SPA_AUDIO_FORMAT_UNKNOWN; +} + +static uint32_t channel_from_name(const char *name) +{ + int i; + for (i = 0; spa_type_audio_channel[i].name; i++) { + if (spa_streq(name, spa_debug_type_short_name(spa_type_audio_channel[i].name))) + return spa_type_audio_channel[i].type; + } + return SPA_AUDIO_CHANNEL_UNKNOWN; +} + +static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len) +{ + struct spa_json it[2]; + char v[256]; + + spa_json_init(&it[0], val, len); + if (spa_json_enter_array(&it[0], &it[1]) <= 0) + spa_json_init(&it[1], val, len); + + info->channels = 0; + while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && + info->channels < SPA_AUDIO_MAX_CHANNELS) { + info->position[info->channels++] = channel_from_name(v); + } +} + +static void parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) +{ + const char *str; + + spa_zero(*info); + if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL) + str = DEFAULT_FORMAT; + info->format = format_from_name(str, strlen(str)); + + info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate); + if (info->rate == 0) + info->rate = DEFAULT_RATE; + + info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); + info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS); + if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) + parse_position(info, str, strlen(str)); + if (info->channels == 0) + parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); +} + +static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (pw_properties_get(impl->stream_props, key) == NULL) + pw_properties_set(impl->stream_props, key, str); + } +} + SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { struct pw_context *context = pw_impl_module_get_context(module); + uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); + uint32_t pid = getpid(); struct impl *impl; const char *str; struct timespec value, interval; + struct pw_properties *props, *stream_props; int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -1531,14 +1108,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (impl == NULL) return -errno; - spa_list_init(&impl->sessions); - if (args == NULL) args = ""; - impl->props = pw_properties_new_string(args); - impl->stream_props = pw_properties_new(NULL, NULL); - if (impl->props == NULL || impl->stream_props == NULL) { + props = impl->props = pw_properties_new_string(args); + stream_props = impl->stream_props = pw_properties_new(NULL, NULL); + if (props == NULL || stream_props == NULL) { res = -errno; pw_log_error( "can't create properties: %m"); goto out; @@ -1549,31 +1124,124 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->loop = pw_context_get_main_loop(context); impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(context)); - if (pw_properties_get(impl->stream_props, PW_KEY_NODE_VIRTUAL) == NULL) - pw_properties_set(impl->stream_props, PW_KEY_NODE_VIRTUAL, "true"); - if (pw_properties_get(impl->stream_props, PW_KEY_NODE_NETWORK) == NULL) - pw_properties_set(impl->stream_props, PW_KEY_NODE_NETWORK, "true"); + if (pw_properties_get(stream_props, PW_KEY_NODE_VIRTUAL) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_VIRTUAL, "true"); + if (pw_properties_get(stream_props, PW_KEY_NODE_NETWORK) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_NETWORK, "true"); - if ((str = pw_properties_get(impl->props, "stream.props")) != NULL) - pw_properties_update_string(impl->stream_props, str, strlen(str)); + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp-source-%u-%u", pid, id); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, + pw_properties_get(props, PW_KEY_NODE_NAME)); + if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Receiver Stream"); - str = pw_properties_get(impl->props, "local.ifname"); + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_AUDIO_FORMAT); + copy_props(impl, props, PW_KEY_AUDIO_RATE); + copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); + copy_props(impl, props, PW_KEY_MEDIA_NAME); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + if ((str = pw_properties_get(stream_props, "rtp.media")) != NULL) { + if (spa_streq(str, "audio")) { + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + } + else if (spa_streq(str, "midi")) { + impl->info.media_type = SPA_MEDIA_TYPE_application; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; + } + else { + pw_log_error("unsupported media type:%s", str); + res = -EINVAL; + goto out; + } + } + + switch (impl->info.media_type) { + case SPA_MEDIA_TYPE_audio: + parse_audio_info(stream_props, &impl->info.info.raw); + impl->stride = audio_get_stride(&impl->info); + if (impl->stride == 0) { + pw_log_error("unsupported audio format:%d channels:%d", + impl->info.info.raw.format, impl->info.info.raw.channels); + res = -EINVAL; + goto out; + } + impl->rate = impl->info.info.raw.rate; + break; + case SPA_MEDIA_TYPE_application: + pw_properties_set(stream_props, PW_KEY_FORMAT_DSP, "8 bit raw midi"); + impl->stride = 1; + impl->rate = 48000; + break; + default: + spa_assert_not_reached(); + break; + } + + str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; - impl->always_process = pw_properties_get_bool(impl->props, PW_KEY_NODE_ALWAYS_PROCESS, false); + impl->src_port = pw_properties_get_uint32(stream_props, "source.port", 0); + if (impl->src_port == 0) { + pw_log_error("invalid source.port"); + goto out; + } + if ((str = pw_properties_get(stream_props, "source.ip")) == NULL || + (res = parse_address(str, impl->src_port, &impl->src_addr, &impl->src_len)) < 0) { + pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res)); + goto out; + } - str = pw_properties_get(impl->props, "sap.ip"); - impl->sap_ip = strdup(str ? str : DEFAULT_SAP_IP); - impl->sap_port = pw_properties_get_uint32(impl->props, - "sap.port", DEFAULT_SAP_PORT); - impl->sess_latency_msec = pw_properties_get_uint32(impl->props, + impl->always_process = pw_properties_get_bool(stream_props, + PW_KEY_NODE_ALWAYS_PROCESS, true); + + impl->cleanup_interval = pw_properties_get_uint32(props, + "cleanup.sec", DEFAULT_CLEANUP_SEC); + + if ((str = pw_properties_get(props, "sess.name")) != NULL) { + pw_properties_set(stream_props, "rtp.session", str); + if (pw_properties_get(stream_props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(stream_props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", str); + if (pw_properties_get(stream_props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(stream_props, PW_KEY_NODE_NAME, "%s", str); + } else { + if (pw_properties_get(stream_props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_set(stream_props, PW_KEY_MEDIA_NAME, "RTP Stream"); + } + + impl->ts_offset = pw_properties_get_int64(stream_props, "sess.ts-offset", 0); + pw_properties_setf(stream_props, "rtp.ts-offset", "%u", impl->ts_offset); + + impl->direct_timestamp = pw_properties_get_bool(stream_props, + "sess.ts-direct", false); + + impl->sess_latency_msec = pw_properties_get_uint32(stream_props, "sess.latency.msec", DEFAULT_SESS_LATENCY); - impl->cleanup_interval = pw_properties_get_uint32(impl->props, - "sap.interval.sec", DEFAULT_CLEANUP_INTERVAL_SEC); + impl->target_buffer = msec_to_samples(impl, impl->sess_latency_msec); + impl->max_error = msec_to_samples(impl, ERROR_MSEC); + + pw_properties_setf(stream_props, PW_KEY_NODE_RATE, "1/%d", impl->rate); + pw_properties_setf(stream_props, PW_KEY_NODE_LATENCY, "%d/%d", + impl->target_buffer / 2, impl->rate); impl->core = pw_context_get_object(impl->module_context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { - str = pw_properties_get(impl->props, PW_KEY_REMOTE_NAME); + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); impl->core = pw_context_connect(impl->module_context, pw_properties_new( PW_KEY_REMOTE_NAME, str, @@ -1600,13 +1268,13 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_log_error("can't create timer source: %m"); goto out; } - value.tv_sec = 0; - value.tv_nsec = 1; + value.tv_sec = impl->cleanup_interval; + value.tv_nsec = 0; interval.tv_sec = impl->cleanup_interval; interval.tv_nsec = 0; pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); - if ((res = start_sap_listener(impl)) < 0) + if ((res = setup_stream(impl)) < 0) goto out; pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);