From a48822c38a8eb9e54c6f9044fafa5ff3f4e003cd Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 21 Feb 2023 17:18:52 +0100 Subject: [PATCH] module-rtp: make source from module Add match rules to SAP to decide when to announce and create RTP streams. Use SAP to load an RTP source. Remove SAP code from RTP source. --- src/modules/module-rtp-sap.c | 941 ++++++++++++++++++++---- src/modules/module-rtp-sink.c | 20 +- src/modules/module-rtp-source.c | 1200 +++++++++++-------------------- 3 files changed, 1223 insertions(+), 938 deletions(-) diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index d505ba2ea..8ffd27307 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -17,38 +17,33 @@ #include #include -#include -#include #include -#include #include #include #include -#include - -/** \page page_module_rtp_sap PipeWire Module: Announce and receive RTP streams +/** \page page_module_rtp_sap PipeWire Module: Announce and create RTP streams * - * The `rtp-sap` module announces RTP stream with the sess.sap.announce property - * set to true. + * The `rtp-sap` module announces RTP streams that match the rules with the + * announce-stream action. * - * It will also create source rtp streams that are announced with SAP when they - * match the pattern. + * It will create source RTP streams that are announced with SAP when they + * match the rule with the create-stream action. * * ## Module Options * * Options specific to the behavior of this module * + * - `local.ifname = `: interface name to use * - `sap.ip = `: IP address of the SAP messages, default "224.0.0.56" * - `sap.port = `: port of the SAP messages, default 9875 + * - `sap.cleanup.sec = `: cleanup interval in seconds, default 90 seconds * - `source.ip =`: source IP address, default "0.0.0.0" - * - `destination.ip =`: destination IP address, default "224.0.0.56" - * - `destination.port =`: destination port, default random beteen 46000 and 47024 - * - `local.ifname = `: interface name to use * - `net.ttl = `: TTL to use, default 1 * - `net.loop = `: loopback multicast, default false + * - `stream.rules` = : match rules, use create-stream and announce-stream actions * * ## General options * @@ -61,16 +56,13 @@ * context.modules = [ * { name = libpipewire-module-rtp-sap * args = { + * #local.ifname = "eth0" * #sap.ip = "224.0.0.56" * #sap.port = 9875 + * #sap.cleanup.sec = 5 * #source.ip = "0.0.0.0" - * #local.ifname = "eth0" * #net.ttl = 1 * #net.loop = false - * stream.props = { - * #media.class = "Audio/Source" - * #node.name = "rtp-source" - * } * stream.rules = [ * { matches = [ * # any of the items in matches needs to match, if one does, @@ -82,6 +74,25 @@ * #rtp.session = "PipeWire RTP Stream on fedora" * #rtp.ts-offset = 0 * #rtp.ts-refclk = "private" + * sess.sap.announce = true + * } + * ] + * actions = { + * announce-stream = { + * #sess.latency.msec = 100 + * #sess.ts-direct = false + * #target.object = "" + * } + * } + * } + * { matches = [ + * { # all keys must match the value. ~ in value starts regex. + * #rtp.origin = "wim 3883629975 0 IN IP4 0.0.0.0" + * #rtp.payload = "127" + * #rtp.fmt = "L16/48000/2" + * #rtp.session = "PipeWire RTP Stream on fedora" + * #rtp.ts-offset = 0 + * #rtp.ts-refclk = "private" * } * ] * actions = { @@ -94,9 +105,8 @@ * } * ] * } - * } - *} - *] + * } + * ] *\endcode * * \since 0.3.67 @@ -107,58 +117,73 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic +#define MAX_SESSIONS 64 + +#define DEFAULT_CLEANUP_SEC 90 #define SAP_INTERVAL_SEC 5 #define SAP_MIME_TYPE "application/sdp" #define DEFAULT_SAP_IP "224.0.0.56" #define DEFAULT_SAP_PORT 9875 -#define DEFAULT_PORT 46000 #define DEFAULT_SOURCE_IP "0.0.0.0" -#define DEFAULT_DESTINATION_IP "224.0.0.56" #define DEFAULT_TTL 1 #define DEFAULT_LOOP false -#define USAGE "sap.ip= " \ +#define USAGE "local.ifname= " \ + "sap.ip= " \ "sap.port= " \ + "sap.cleanup.sec= " \ "source.ip= " \ - "destination.ip= " \ - "local.ifname= " \ "net.ttl= " \ - "net.loop= " + "net.loop= " \ + "stream.rules=, use announce-stream and create-stream actions " static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, - { PW_KEY_MODULE_DESCRIPTION, "RTP Sink" }, + { PW_KEY_MODULE_DESCRIPTION, "RTP SAP announce/listen" }, { PW_KEY_MODULE_USAGE, USAGE }, { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; -struct session { - struct spa_list link; - - struct impl *impl; - struct node *node; - - uint16_t msg_id_hash; +struct sdp_info { + uint16_t hash; uint32_t ntp; - uint32_t ts_offset; - char *ts_refclk; - + char *origin; + char *session_name; char *media_type; char *mime_type; - char *session_name; - int payload; - uint32_t rate; - uint32_t channels; - float ptime; + char channelmap[512]; uint16_t dst_port; struct sockaddr_storage dst_addr; socklen_t dst_len; uint16_t ttl; + uint16_t port; + uint8_t payload; + + uint32_t rate; + uint32_t channels; + + float ptime; + + uint32_t ts_offset; + char *ts_refclk; +}; + +struct session { + struct spa_list link; + + bool announce; + uint64_t timestamp; + + struct impl *impl; + struct node *node; + + struct sdp_info info; + unsigned has_sent_sap:1; struct pw_properties *props; @@ -190,14 +215,13 @@ struct impl { struct pw_core *core; struct spa_hook core_listener; struct spa_hook core_proxy_listener; + unsigned int do_disconnect:1; struct pw_registry *registry; struct spa_hook registry_listener; struct spa_source *timer; - unsigned int do_disconnect:1; - char *ifname; bool ttl; bool mcast_loop; @@ -208,12 +232,74 @@ struct impl { uint16_t sap_port; struct sockaddr_storage sap_addr; socklen_t sap_len; - int sap_fd; + struct spa_source *sap_source; + uint32_t cleanup_interval; + uint32_t n_sessions; struct spa_list sessions; }; +struct format_info { + uint32_t media_subtype; + uint32_t format; + uint32_t size; + const char *mime; + const char *media_type; + const char *format_str; +}; + +static const struct format_info audio_format_info[] = { + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_U8, 1, "L8", "audio", "U8" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio", "ALAW" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio", "ULAW" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio", "S16BE" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio", "S16LE" }, + { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "midi", NULL }, +}; + +static const struct format_info *find_audio_format_info(const char *mime) +{ + SPA_FOR_EACH_ELEMENT_VAR(audio_format_info, f) + if (spa_streq(f->mime, mime)) + return f; + return NULL; +} + +static void send_sap(struct impl *impl, struct session *sess, bool bye); + + +static void clear_sdp_info(struct sdp_info *info) +{ + free(info->origin); + free(info->session_name); + free(info->media_type); + free(info->mime_type); + free(info->ts_refclk); + spa_zero(*info); +} + +static void session_touch(struct session *sess) +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + sess->timestamp = SPA_TIMESPEC_TO_NSEC(&ts); +} + +static void session_free(struct session *sess) +{ + struct impl *impl = sess->impl; + + if (sess->impl) { + send_sap(impl, sess, 1); + spa_list_remove(&sess->link); + impl->n_sessions++; + } + pw_properties_free(sess->props); + clear_sdp_info(&sess->info); + free(sess); +} + static int parse_address(const char *address, uint16_t port, struct sockaddr_storage *addr, socklen_t *len) { @@ -249,15 +335,60 @@ static bool is_multicast(struct sockaddr *sa, socklen_t salen) static int make_socket(struct sockaddr_storage *src, socklen_t src_len, struct sockaddr_storage *dst, socklen_t dst_len, - bool loop, int ttl) + bool loop, int ttl, char *ifname) { int af, fd, val, res; + struct ifreq req; af = src->ss_family; if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { pw_log_error("socket failed: %m"); return -errno; } + val = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) { + res = -errno; + pw_log_error("setsockopt failed: %m"); + goto error; + } + spa_zero(req); + if (ifname) { + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", ifname); + res = ioctl(fd, SIOCGIFINDEX, &req); + if (res < 0) + pw_log_warn("SIOCGIFINDEX %s failed: %m", ifname); + } + res = 0; + if (is_multicast((struct sockaddr*)dst, dst_len)) { + val = loop; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)) < 0) + pw_log_warn("setsockopt(IP_MULTICAST_LOOP) failed: %m"); + + val = ttl; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &val, sizeof(val)) < 0) + pw_log_warn("setsockopt(IP_MULTICAST_TTL) failed: %m"); + + if (af == AF_INET) { + struct sockaddr_in *sa4 = (struct sockaddr_in*)dst; + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = sa4->sin_addr; + mr4.imr_ifindex = req.ifr_ifindex; + res = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4)); + } else if (af == AF_INET6) { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)dst; + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = sa6->sin6_addr; + mr6.ipv6mr_interface = req.ifr_ifindex; + res = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6)); + } + if (res < 0) { + res = -errno; + pw_log_error("join mcast failed: %m"); + goto error; + } + } if (bind(fd, (struct sockaddr*)src, src_len) < 0) { res = -errno; pw_log_error("bind() failed: %m"); @@ -268,15 +399,6 @@ static int make_socket(struct sockaddr_storage *src, socklen_t src_len, pw_log_error("connect() failed: %m"); goto error; } - if (is_multicast((struct sockaddr*)dst, dst_len)) { - val = loop; - if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)) < 0) - pw_log_warn("setsockopt(IP_MULTICAST_LOOP) failed: %m"); - - val = ttl; - if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &val, sizeof(val)) < 0) - pw_log_warn("setsockopt(IP_MULTICAST_TTL) failed: %m"); - } return fd; error: close(fd); @@ -304,14 +426,17 @@ static void send_sap(struct impl *impl, struct session *sess, bool bye) struct iovec iov[4]; struct msghdr msg; struct spa_strbuf buf; + struct sdp_info *sdp = &sess->info; + if (!sess->announce) + return; if (!sess->has_sent_sap && bye) return; spa_zero(header); header.v = 1; header.t = bye; - header.msg_id_hash = sess->msg_id_hash; + header.msg_id_hash = sdp->hash; iov[0].iov_base = &header; iov[0].iov_len = sizeof(header); @@ -330,14 +455,14 @@ static void send_sap(struct impl *impl, struct session *sess, bool bye) iov[2].iov_len = sizeof(SAP_MIME_TYPE); get_ip(&impl->src_addr, src_addr, sizeof(src_addr)); - get_ip(&sess->dst_addr, dst_addr, sizeof(dst_addr)); + get_ip(&sdp->dst_addr, dst_addr, sizeof(dst_addr)); if ((user_name = pw_get_user_name()) == NULL) user_name = "-"; spa_zero(dst_ttl); - if (is_multicast((struct sockaddr*)&sess->dst_addr, sess->dst_len)) - snprintf(dst_ttl, sizeof(dst_ttl), "/%d", sess->ttl); + if (is_multicast((struct sockaddr*)&sdp->dst_addr, sdp->dst_len)) + snprintf(dst_ttl, sizeof(dst_ttl), "/%d", sdp->ttl); spa_strbuf_init(&buf, buffer, sizeof(buffer)); spa_strbuf_append(&buf, @@ -349,40 +474,47 @@ static void send_sap(struct impl *impl, struct session *sess, bool bye) "a=recvonly\n" "a=tool:PipeWire %s\n" "a=type:broadcast\n", - user_name, sess->ntp, af, src_addr, - sess->session_name, + user_name, sdp->ntp, af, src_addr, + sdp->session_name, af, dst_addr, dst_ttl, - sess->ntp, + sdp->ntp, pw_get_library_version()); spa_strbuf_append(&buf, "m=%s %u RTP/AVP %i\n", - sess->media_type, - sess->dst_port, sess->payload); + sdp->media_type, + sdp->dst_port, sdp->payload); - if (sess->channels) { + if (sdp->channels) { spa_strbuf_append(&buf, "a=rtpmap:%i %s/%u/%u\n", - sess->payload, sess->mime_type, - sess->rate, sess->channels); + sdp->payload, sdp->mime_type, + sdp->rate, sdp->channels); + if (sdp->channelmap[0] != 0) { + spa_strbuf_append(&buf, + "i=%d channels: %s\n", sdp->channels, + sdp->channelmap); + } } else { spa_strbuf_append(&buf, "a=rtpmap:%i %s/%u\n", - sess->payload, sess->mime_type, sess->rate); + sdp->payload, sdp->mime_type, sdp->rate); } - if (sess->ptime != 0) + if (sdp->ptime != 0) spa_strbuf_append(&buf, - "a=ptime:%f\n", sess->ptime); + "a=ptime:%f\n", sdp->ptime); - if (sess->ts_refclk != NULL) { + if (sdp->ts_refclk != NULL) { spa_strbuf_append(&buf, "a=ts-refclk:%s\n" "a=mediaclk:direct=%u\n", - sess->ts_refclk, - sess->ts_offset); + sdp->ts_refclk, + sdp->ts_offset); } else { spa_strbuf_append(&buf, "a=mediaclk:sender\n"); } + pw_log_debug("sending SAP for %u", sess->node->id); + iov[3].iov_base = buffer; iov[3].iov_len = strlen(buffer); @@ -404,8 +536,558 @@ static void on_timer_event(void *data, uint64_t expirations) struct impl *impl = data; struct session *sess; - spa_list_for_each(sess, &impl->sessions, link) { + spa_list_for_each(sess, &impl->sessions, link) send_sap(impl, sess, 0); +} + +static struct session *session_find(struct impl *impl, const struct sdp_info *info) +{ + struct session *sess; + spa_list_for_each(sess, &impl->sessions, link) { + if (info->hash == sess->info.hash && + spa_streq(info->origin, sess->info.origin)) + return sess; + } + return NULL; +} + +static struct session *session_new_announce(struct impl *impl, struct node *node, + struct pw_properties *props) +{ + struct session *sess = NULL; + struct sdp_info *sdp; + const char *str; + uint32_t port; + int res; + + if (impl->n_sessions >= MAX_SESSIONS) { + pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, MAX_SESSIONS); + errno = EMFILE; + return NULL; + } + + sess = calloc(1, sizeof(struct session)); + if (sess == NULL) + return NULL; + + sdp = &sess->info; + + sess->announce = true; + + sdp->hash = rand(); + sdp->ntp = (uint32_t) time(NULL) + 2208988800U; + sess->props = props; + + if ((str = pw_properties_get(props, "rtp.session")) != NULL) + sdp->session_name = strdup(str); + + if ((str = pw_properties_get(props, "rtp.destination.port")) == NULL) + goto error_free; + if (!spa_atou32(str, &port, 0)) + goto error_free; + sdp->dst_port = port; + + if ((str = pw_properties_get(props, "rtp.destination.ip")) == NULL) + goto error_free; + if ((res = parse_address(str, sdp->dst_port, &sdp->dst_addr, &sdp->dst_len)) < 0) { + pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); + goto error_free; + } + sdp->ttl = pw_properties_get_int32(props, "rtp.ttl", DEFAULT_TTL); + sdp->payload = pw_properties_get_int32(props, "rtp.payload", 127); + + if ((str = pw_properties_get(props, "rtp.media")) != NULL) + sdp->media_type = strdup(str); + if ((str = pw_properties_get(props, "rtp.mime")) != NULL) + sdp->mime_type = strdup(str); + if ((str = pw_properties_get(props, "rtp.rate")) != NULL) + sdp->rate = atoi(str); + if ((str = pw_properties_get(props, "rtp.channels")) != NULL) + sdp->channels = atoi(str); + if ((str = pw_properties_get(props, "rtp.ts-offset")) != NULL) + sdp->ts_offset = atoi(str); + if ((str = pw_properties_get(props, "rtp.ts-refclk")) != NULL) + sdp->ts_refclk = strdup(str); + if ((str = pw_properties_get(props, "rtp.channel-names")) != NULL) + snprintf(sdp->channelmap, sizeof(sdp->channelmap), "%s", str); + + pw_log_info("created new session for node:%u", node->id); + node->session = sess; + sess->node = node; + + sess->impl = impl; + spa_list_append(&impl->sessions, &sess->link); + impl->n_sessions++; + + return sess; + +error_free: + pw_log_warn("invalid session props"); + session_free(sess); + return NULL; +} + +static int session_load_source(struct session *session, struct pw_properties *props) +{ + struct impl *impl = session->impl; + struct pw_context *context = pw_impl_module_get_context(impl->module); + FILE *f; + char *args; + size_t size; + const char *str, *media; + + if ((f = open_memstream(&args, &size)) == NULL) { + pw_log_error("Can't open memstream: %m"); + return -errno; + } + + if ((str = pw_properties_get(props, "rtp.destination.ip")) != NULL) + pw_properties_set(props, "source.ip", str); + if ((str = pw_properties_get(props, "rtp.destination.port")) != NULL) + pw_properties_set(props, "source.port", str); + if ((str = pw_properties_get(props, "rtp.session")) != NULL) + pw_properties_set(props, "sess.name", str); + + if ((media = pw_properties_get(props, "rtp.media")) == NULL) + media = "audio"; + + if (spa_streq(media, "audio")) { + const char *mime; + const struct format_info *format_info; + + if ((mime = pw_properties_get(props, "rtp.mime")) == NULL) { + pw_log_error("missing rtp.mime property"); + return -EINVAL; + } + format_info = find_audio_format_info(mime); + if (format_info == NULL) { + pw_log_error("unknown rtp.mime type %s", mime); + return -EINVAL; + } + pw_properties_set(props, "rtp.media", format_info->media_type); + if (format_info->format_str != NULL) { + pw_properties_set(props, "audio.format", format_info->format_str); + if ((str = pw_properties_get(props, "rtp.rate")) != NULL) + pw_properties_set(props, "audio.rate", str); + if ((str = pw_properties_get(props, "rtp.channels")) != NULL) + pw_properties_set(props, "audio.channels", str); + } + } else { + pw_log_error("Unhandled media %s", media); + return -EINVAL; + } + if ((str = pw_properties_get(props, "rtp.ts-offset")) != NULL) + pw_properties_set(props, "sess.ts-offset", str); + + fprintf(f, "{"); + fprintf(f, " stream.props = {"); + pw_properties_serialize_dict(f, &props->dict, 0); + fprintf(f, " }"); + fprintf(f, "}"); + fclose(f); + + pw_log_info("loading new RTP source"); + session->module = pw_context_load_module(context, + "libpipewire-module-rtp-source", + args, NULL); + free(args); + + if (session->module == NULL) { + pw_log_error("Can't load module: %m"); + return -errno; + } + return 0; +} + +struct match_info { + struct impl *impl; + struct session *session; + struct node *node; + struct pw_properties *props; + bool matched; +}; + +static int rule_matched(void *data, const char *location, const char *action, + const char *str, size_t len) +{ + struct match_info *i = data; + int res = 0; + + i->matched = true; + if (i->session && spa_streq(action, "create-stream")) { + pw_properties_update_string(i->props, str, len); + + session_load_source(i->session, i->props); + } + else if (i->node && spa_streq(action, "announce-stream")) { + struct pw_properties *props; + + if ((props = pw_properties_new_dict(i->node->info->props)) == NULL) + return -errno; + + pw_properties_update_string(props, str, len); + + session_new_announce(i->impl, i->node, props); + } + return res; +} + +static struct session *session_new(struct impl *impl, const struct sdp_info *info) +{ + struct session *session; + struct pw_properties *props; + const char *str; + char dst_addr[64]; + + if (impl->n_sessions >= MAX_SESSIONS) { + pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, MAX_SESSIONS); + errno = EMFILE; + return NULL; + } + + session = calloc(1, sizeof(struct session)); + if (session == NULL) + return NULL; + + session->info = *info; + session->announce = false; + + props = pw_properties_new(NULL, NULL); + if (props == NULL) + goto error; + + session->impl = impl; + spa_list_append(&impl->sessions, &session->link); + impl->n_sessions++; + + pw_properties_set(props, "rtp.origin", info->origin); + if (info->session_name != NULL) { + pw_properties_set(props, "rtp.session", info->session_name); + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", + info->session_name); + pw_properties_setf(props, PW_KEY_NODE_NAME, "%s", + info->session_name); + } else { + pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Stream"); + } + + get_ip(&info->dst_addr, dst_addr, sizeof(dst_addr)); + pw_properties_setf(props, "rtp.destination.ip", "%s", dst_addr); + pw_properties_setf(props, "rtp.destination.port", "%u", info->dst_port); + pw_properties_setf(props, "rtp.payload", "%u", info->payload); + pw_properties_setf(props, "rtp.media", "%s", info->media_type); + pw_properties_setf(props, "rtp.mime", "%s", info->mime_type); + pw_properties_setf(props, "rtp.rate", "%u", info->rate); + pw_properties_setf(props, "rtp.channels", "%u", info->channels); + + pw_properties_setf(props, "rtp.ts-offset", "%u", info->ts_offset); + pw_properties_set(props, "rtp.ts-refclk", info->ts_refclk); + + if (info->channelmap[0]) + pw_properties_set(props, PW_KEY_NODE_CHANNELNAMES, info->channelmap); + + if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) { + struct match_info minfo = { + .impl = impl, + .session = session, + .props = props, + }; + pw_conf_match_rules(str, strlen(str), NAME, &props->dict, + rule_matched, &minfo); + } + session->props = props; + + return NULL; +error: + session_free(session); + return NULL; +} + +static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info) +{ + int res; + + c[strcspn(c, "/")] = 0; + if (spa_strstartswith(c, "c=IN IP4 ")) { + struct sockaddr_in *sa = (struct sockaddr_in*) &info->dst_addr; + + c += strlen("c=IN IP4 "); + if (inet_pton(AF_INET, c, &sa->sin_addr) <= 0) { + res = -errno; + pw_log_warn("inet_pton(%s) failed: %m", c); + goto error; + } + sa->sin_family = AF_INET; + info->dst_len = sizeof(struct sockaddr_in); + } + else if (spa_strstartswith(c, "c=IN IP6 ")) { + struct sockaddr_in6 *sa = (struct sockaddr_in6*) &info->dst_addr; + + c += strlen("c=IN IP6 "); + if (inet_pton(AF_INET6, c, &sa->sin6_addr) <= 0) { + res = -errno; + pw_log_warn("inet_pton(%s) failed: %m", c); + goto error; + } + + sa->sin6_family = AF_INET6; + info->dst_len = sizeof(struct sockaddr_in6); + } else + return -EINVAL; + + + res= 0; +error: + return res; +} + +static int parse_sdp_m(struct impl *impl, char *c, struct sdp_info *info) +{ + int port, payload; + char media_type[12]; + + if (!spa_strstartswith(c, "m=")) + return -EINVAL; + + c += strlen("m="); + if (sscanf(c, "%11s %i RTP/AVP %i", media_type, &port, &payload) != 3) + return -EINVAL; + + if (port <= 0 || port > 0xFFFF) + return -EINVAL; + + if (payload < 0 || payload > 127) + return -EINVAL; + + info->media_type = strdup(media_type); + info->dst_port = (uint16_t) port; + info->payload = (uint8_t) payload; + + return 0; +} + +/* some AES67 devices have channelmap encoded in i=* + * if `i` record is found, it matches the template + * and channel count matches, name the channels respectively + * `i=2 channels: 01, 08` is the format */ +static int parse_sdp_i(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!strstr(c, " channels: ")) { + return 0; + } + + c += strlen("i="); + c[strcspn(c, " ")] = '\0'; + + uint32_t channels; + if (sscanf(c, "%u", &channels) != 1 || channels <= 0 || channels > SPA_AUDIO_MAX_CHANNELS) + return 0; + + c += strcspn(c, "\0"); + c += strlen(" channels: "); + + strncpy(info->channelmap, c, sizeof(info->channelmap) - 1); + + return 0; +} + +static int parse_sdp_a_rtpmap(struct impl *impl, char *c, struct sdp_info *info) +{ + int payload, len, rate, channels; + + if (!spa_strstartswith(c, "a=rtpmap:")) + return 0; + + c += strlen("a=rtpmap:"); + + if (sscanf(c, "%i %n", &payload, &len) != 1) + return -EINVAL; + + if (payload < 0 || payload > 127) + return -EINVAL; + + if (payload != info->payload) + return 0; + + c += len; + c[strcspn(c, "/")] = 0; + info->mime_type = strdup(c); + c += strlen(c) + 1; + + if (sscanf(c, "%u/%u", &rate, &channels) == 2) { + info->channels = channels; + info->rate = rate; + } else if (sscanf(c, "%u", &rate) == 1) { + info->rate = rate; + info->channels = 1; + } else + return -EINVAL; + + pw_log_debug("rate: %d, ch: %d", info->rate, info->channels); + + return 0; +} + +static int parse_sdp_a_mediaclk(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!spa_strstartswith(c, "a=mediaclk:")) + return 0; + + c += strlen("a=mediaclk:"); + + if (spa_strstartswith(c, "direct=")) { + int offset; + c += strlen("direct="); + if (sscanf(c, "%i", &offset) != 1) + return -EINVAL; + info->ts_offset = offset; + } else if (spa_strstartswith(c, "sender")) { + info->ts_offset = 0; + } + return 0; +} + +static int parse_sdp_a_ts_refclk(struct impl *impl, char *c, struct sdp_info *info) +{ + if (!spa_strstartswith(c, "a=ts-refclk:")) + return 0; + + c += strlen("a=ts-refclk:"); + info->ts_refclk = strdup(c); + return 0; +} + +static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info) +{ + char *s = sdp; + int count = 0, res = 0; + size_t l; + + while (*s) { + if ((l = strcspn(s, "\r\n")) < 2) + goto too_short; + + s[l] = 0; + pw_log_debug("%d: %s", count, s); + + if (count++ == 0 && strcmp(s, "v=0") != 0) + goto invalid_version; + + if (spa_strstartswith(s, "o=")) + info->origin = strdup(&s[2]); + else if (spa_strstartswith(s, "s=")) + info->session_name = strdup(&s[2]); + else if (spa_strstartswith(s, "c=")) + res = parse_sdp_c(impl, s, info); + else if (spa_strstartswith(s, "m=")) + res = parse_sdp_m(impl, s, info); + else if (spa_strstartswith(s, "a=rtpmap:")) + res = parse_sdp_a_rtpmap(impl, s, info); + else if (spa_strstartswith(s, "a=mediaclk:")) + res = parse_sdp_a_mediaclk(impl, s, info); + else if (spa_strstartswith(s, "a=ts-refclk:")) + res = parse_sdp_a_ts_refclk(impl, s, info); + else if (spa_strstartswith(s, "i=")) + res = parse_sdp_i(impl, s, info); + + if (res < 0) + goto error; + s += l + 1; + while (isspace(*s)) + s++; + } + if (((struct sockaddr*) &info->dst_addr)->sa_family == AF_INET) + ((struct sockaddr_in*) &info->dst_addr)->sin_port = htons(info->dst_port); + else + ((struct sockaddr_in6*) &info->dst_addr)->sin6_port = htons(info->dst_port); + + return 0; +too_short: + pw_log_warn("SDP: line starting with `%.6s...' too short", s); + return -EINVAL; +invalid_version: + pw_log_warn("SDP: invalid first version line `%*s'", (int)l, s); + return -EINVAL; +error: + pw_log_warn("SDP: error: %s", spa_strerror(res)); + return res; +} + +static int parse_sap(struct impl *impl, void *data, size_t len) +{ + struct sap_header *header; + char *mime, *sdp; + struct sdp_info info; + struct session *sess; + int res; + size_t offs; + bool bye; + + if (len < 8) + return -EINVAL; + + header = (struct sap_header*) data; + if (header->v != 1) + return -EINVAL; + + if (header->e) + return -ENOTSUP; + if (header->c) + return -ENOTSUP; + + offs = header->a ? 12 : 8; + offs += header->auth_len * 4; + if (len <= offs) + return -EINVAL; + + mime = SPA_PTROFF(data, offs, char); + if (spa_strstartswith(mime, "v=0")) { + sdp = mime; + mime = SAP_MIME_TYPE; + } else if (spa_streq(mime, SAP_MIME_TYPE)) + sdp = SPA_PTROFF(mime, strlen(mime)+1, char); + else + return -EINVAL; + + pw_log_debug("got SAP: %s %s", mime, sdp); + + spa_zero(info); + if ((res = parse_sdp(impl, sdp, &info)) < 0) + return res; + + bye = header->t; + + sess = session_find(impl, &info); + if (sess == NULL) { + if (!bye) + session_new(impl, &info); + } else { + if (bye) + session_free(sess); + else + session_touch(sess); + } + return res; +} + +static void +on_sap_io(void *data, int fd, uint32_t mask) +{ + struct impl *impl = data; + + if (mask & SPA_IO_IN) { + uint8_t buffer[2048]; + ssize_t len; + + if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) { + pw_log_warn("recv error: %m"); + return; + } + if ((size_t)len >= sizeof(buffer)) + return; + + buffer[len] = 0; + parse_sap(impl, buffer, len); } } @@ -416,7 +1098,8 @@ static int start_sap_announce(struct impl *impl) if ((fd = make_socket(&impl->src_addr, impl->src_len, &impl->sap_addr, impl->sap_len, - impl->mcast_loop, impl->ttl)) < 0) + impl->mcast_loop, impl->ttl, + impl->ifname)) < 0) return fd; impl->sap_fd = fd; @@ -434,93 +1117,26 @@ static int start_sap_announce(struct impl *impl) interval.tv_nsec = 0; pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); + pw_log_info("starting SAP listener"); + impl->sap_source = pw_loop_add_io(impl->loop, fd, + SPA_IO_IN, false, on_sap_io, impl); + if (impl->sap_source == NULL) { + res = -errno; + goto error; + } + return 0; error: close(fd); return res; } -static struct session *session_create(struct impl *impl, struct node *node) -{ - struct session *sess = NULL; - const char *str; - uint32_t port; - int res; - - sess = calloc(1, sizeof(struct session)); - if (sess == NULL) - return NULL; - - sess->impl = impl; - sess->node = node; - sess->msg_id_hash = rand(); - sess->ntp = (uint32_t) time(NULL) + 2208988800U; - - sess->props = pw_properties_new_dict(node->info->props); - if (sess->props == NULL) - goto error_free; - - if ((str = pw_properties_get(sess->props, "rtp.session")) != NULL) - sess->session_name = strdup(str); - - if ((str = pw_properties_get(sess->props, "rtp.destination.port")) == NULL) - goto error_free; - if (!spa_atou32(str, &port, 0)) - goto error_free; - sess->dst_port = port; - - if ((str = pw_properties_get(sess->props, "rtp.destination.ip")) == NULL) - goto error_free; - if ((res = parse_address(str, sess->dst_port, &sess->dst_addr, &sess->dst_len)) < 0) { - pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); - goto error_free; - } - sess->ttl = pw_properties_get_int32(sess->props, "rtp.ttl", DEFAULT_TTL); - sess->payload = pw_properties_get_int32(sess->props, "rtp.payload", 127); - - if ((str = pw_properties_get(sess->props, "rtp.media")) != NULL) - sess->media_type = strdup(str); - if ((str = pw_properties_get(sess->props, "rtp.mime")) != NULL) - sess->mime_type = strdup(str); - if ((str = pw_properties_get(sess->props, "rtp.rate")) != NULL) - sess->rate = atoi(str); - if ((str = pw_properties_get(sess->props, "rtp.channels")) != NULL) - sess->channels = atoi(str); - if ((str = pw_properties_get(sess->props, "rtp.ts-offset")) != NULL) - sess->ts_offset = atoi(str); - if ((str = pw_properties_get(sess->props, "rtp.ts-refclk")) != NULL) - sess->ts_refclk = strdup(str); - - spa_list_append(&impl->sessions, &sess->link); - return sess; - -error_free: - pw_log_warn("invalid session props"); - pw_properties_free(sess->props); - free(sess->session_name); - free(sess); - return NULL; -} - -static void session_free(struct session *sess) -{ - struct impl *impl = sess->impl; - - send_sap(impl, sess, 1); - - spa_list_remove(&sess->link); - - free(sess->session_name); - free(sess); -} - static void node_event_info(void *data, const struct pw_node_info *info) { struct node *n = data; + struct impl *impl = n->impl; const char *str; - pw_log_info("node %d added %p", n->id, n->session); - if (n->session != NULL || info == NULL) return; @@ -528,13 +1144,16 @@ static void node_event_info(void *data, const struct pw_node_info *info) if (n->info == NULL) return; - spa_debug_dict(0, info->props); + pw_log_debug("node %d changed", n->id); - if ((str = spa_dict_lookup(info->props, "sess.sap.announce")) == NULL || - !pw_properties_parse_bool(str)) - return; - - session_create(n->impl, n); + if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) { + struct match_info minfo = { + .impl = impl, + .node = n, + }; + pw_conf_match_rules(str, strlen(str), NAME, n->info->props, + rule_matched, &minfo); + } } static const struct pw_node_events node_events = { @@ -630,6 +1249,8 @@ static void impl_destroy(struct impl *impl) if (impl->timer) pw_loop_destroy_source(impl->loop, impl->timer); + if (impl->sap_source) + pw_loop_destroy_source(impl->loop, impl->sap_source); if (impl->sap_fd != -1) close(impl->sap_fd); @@ -711,10 +1332,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_log_error("invalid sap.ip %s: %s", str, spa_strerror(res)); goto out; } + impl->cleanup_interval = pw_properties_get_uint32(impl->props, + "sap.cleanup.sec", DEFAULT_CLEANUP_SEC); if ((str = pw_properties_get(props, "source.ip")) == NULL) str = DEFAULT_SOURCE_IP; - if ((res = parse_address(str, 0, &impl->src_addr, &impl->src_len)) < 0) { + if ((res = parse_address(str, port, &impl->src_addr, &impl->src_len)) < 0) { pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res)); goto out; } diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index 2715da9d0..bec993d1b 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -27,7 +27,6 @@ #include - /** \page page_module_rtp_sink PipeWire Module: RTP sink * * The `rtp-sink` module creates a PipeWire sink that sends audio @@ -74,10 +73,10 @@ * context.modules = [ * { name = libpipewire-module-rtp-sink * args = { + * #local.ifname = "eth0" * #source.ip = "0.0.0.0" * #destination.ip = "224.0.0.56" * #destination.port = 46000 - * #local.ifname = "eth0" * #net.mtu = 1280 * #net.ttl = 1 * #net.loop = false @@ -105,15 +104,9 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define SAP_INTERVAL_SEC 5 -#define SAP_MIME_TYPE "application/sdp" - #define BUFFER_SIZE (1u<<20) #define BUFFER_MASK (BUFFER_SIZE-1) -#define DEFAULT_SAP_IP "224.0.0.56" -#define DEFAULT_SAP_PORT 9875 - #define DEFAULT_SESS_MEDIA "audio" #define DEFAULT_FORMAT "S16BE" @@ -135,6 +128,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define USAGE "source.ip= " \ "destination.ip= " \ + "destination.port= " \ "local.ifname= " \ "net.mtu= " \ "net.ttl= " \ @@ -214,9 +208,9 @@ struct impl { int rtp_fd; unsigned sync:1; + unsigned apple_midi:1; }; - static void stream_destroy(void *d) { struct impl *impl = d; @@ -366,7 +360,7 @@ static void stream_audio_process(struct impl *impl) } } if (!impl->sync) { - pw_log_info("sync to timestamp %u", timestamp); + pw_log_info("sync to timestamp %u ts_offset:%u", timestamp, impl->ts_offset); impl->ring.readindex = impl->ring.writeindex = timestamp; memset(impl->buffer, 0, BUFFER_SIZE); impl->sync = true; @@ -487,7 +481,6 @@ static void flush_midi_packets(struct impl *impl, struct spa_pod_sequence *seque static void send_cmd(struct impl *impl) { -// struct rtp_header header; uint8_t buffer[16]; struct iovec iov[3]; struct msghdr msg; @@ -541,8 +534,8 @@ static void stream_midi_process(void *data) if (!impl->sync) { pw_log_info("sync to timestamp %u", timestamp); impl->sync = true; - - send_cmd(impl); + if (impl->apple_midi) + send_cmd(impl); } flush_midi_packets(impl, (struct spa_pod_sequence*)pod, timestamp); @@ -962,6 +955,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, PW_KEY_NODE_GROUP); copy_props(impl, props, PW_KEY_NODE_LATENCY); copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); copy_props(impl, props, PW_KEY_MEDIA_NAME); copy_props(impl, props, PW_KEY_MEDIA_CLASS); diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 2159164ed..b6254b1ce 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -19,13 +19,15 @@ #include #include #include +#include #include #include +#include +#include #include #include -#include #include #ifdef __FreeBSD__ @@ -35,63 +37,52 @@ /** \page page_module_rtp_source PipeWire Module: RTP source * * The `rtp-source` module creates a PipeWire source that receives audio - * RTP packets. + * and midi RTP packets. * * ## Module Options * * Options specific to the behavior of this module * - * - `sap.ip = `: IP address of the SAP messages, default "224.0.0.56" - * - `sap.port = `: port of the SAP messages, default 9875 * - `local.ifname = `: interface name to use + * - `node.always-process = `: true to receive even when not running * - `sess.latency.msec = `: target network latency in milliseconds, default 100 + * - `rtp.media = `: the media type audio|midi, default audio * - `stream.props = {}`: properties to be passed to the stream * * ## General options * * Options with well-known behavior: * - * - \ref PW_KEY_NODE_NAME - * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_REMOTE_NAME + * - \ref PW_KEY_AUDIO_FORMAT + * - \ref PW_KEY_AUDIO_RATE + * - \ref PW_KEY_AUDIO_CHANNELS + * - \ref SPA_KEY_AUDIO_POSITION * - \ref PW_KEY_MEDIA_NAME * - \ref PW_KEY_MEDIA_CLASS + * - \ref PW_KEY_NODE_NAME + * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_NODE_GROUP + * - \ref PW_KEY_NODE_LATENCY + * - \ref PW_KEY_NODE_VIRTUAL * * ## Example configuration *\code{.unparsed} * context.modules = [ * { name = libpipewire-module-rtp-source * args = { - * #sap.ip = 224.0.0.56 - * #sap.port = 9875 * #local.ifname = eth0 * sess.latency.msec = 100 - * #node.always-process = false # true to receive even when not running + * #node.always-process = false + * #rtp.media = "audio" + * #audio.format = "S16BE" + * #audio.rate = 48000 + * #audio.channels = 2 + * #audio.position = [ FL FR ] * stream.props = { * #media.class = "Audio/Source" - * #node.name = "rtp-source" + * node.name = "rtp-source" * } - * stream.rules = [ - * { matches = [ - * # any of the items in matches needs to match, if one does, - * # actions are emited. - * { # all keys must match the value. ~ in value starts regex. - * #rtp.origin = "wim 3883629975 0 IN IP4 0.0.0.0" - * #rtp.payload = "127" - * #rtp.fmt = "L16/48000/2" - * #rtp.session = "PipeWire RTP Stream on fedora" - * #rtp.ts-offset = 0 - * #rtp.ts-refclk = "private" - * } - * ] - * actions = { - * create-stream = { - * #sess.latency.msec = 100 - * #sess.ts-direct = false - * #target.object = "" - * } - * } - * } - * ] * } * } * ] @@ -105,27 +96,33 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define SAP_MIME_TYPE "application/sdp" - +#define DEFAULT_CLEANUP_SEC 60 #define ERROR_MSEC 2 -#define MAX_SESSIONS 16 -#define DEFAULT_CLEANUP_INTERVAL_SEC 90 -#define DEFAULT_SAP_IP "224.0.0.56" -#define DEFAULT_SAP_PORT 9875 #define DEFAULT_SESS_LATENCY 100 +#define DEFAULT_SOURCE_IP "224.0.0.56" + +#define DEFAULT_FORMAT "S16BE" +#define DEFAULT_RATE 48000 +#define DEFAULT_CHANNELS 2 +#define DEFAULT_POSITION "[ FL FR ]" + #define BUFFER_SIZE (1u<<22) #define BUFFER_MASK (BUFFER_SIZE-1) #define BUFFER_SIZE2 (BUFFER_SIZE>>1) #define BUFFER_MASK2 (BUFFER_SIZE2-1) -#define USAGE "sap.ip= " \ - "sap.port= " \ - "local.ifname= " \ +#define USAGE "local.ifname= " \ + "source.ip= " \ + "source.port= " \ "sess.latency.msec= " \ - "stream.props= { key=value ... } " \ - "stream.rules= " + "rtp.media= " \ + "audio.format= " \ + "audio.rate= " \ + "audio.channels= "\ + "audio.position= " \ + "stream.props= { key=value ... } " static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, @@ -146,90 +143,33 @@ struct impl { struct pw_core *core; struct spa_hook core_listener; struct spa_hook core_proxy_listener; - - struct spa_source *timer; - struct spa_source *sap_source; - - struct pw_properties *stream_props; - unsigned int do_disconnect:1; char *ifname; - char *sap_ip; bool always_process; - int sap_port; int sess_latency_msec; uint32_t cleanup_interval; - struct spa_list sessions; - uint32_t n_sessions; -}; + struct spa_source *timer; -struct format_info { - uint32_t media_subtype; - uint32_t format; - uint32_t size; - const char *mime; - const char *media_type; -}; - -static const struct format_info audio_format_info[] = { - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_U8, 1, "L8", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio" }, - { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "audio" }, -}; - -static const struct format_info *find_format_info(const char *mime) -{ - SPA_FOR_EACH_ELEMENT_VAR(audio_format_info, f) - if (spa_streq(f->mime, mime)) - return f; - return NULL; -} - -struct sdp_info { - uint16_t hash; - - char origin[128]; - char session[256]; - char channelmap[512]; - - struct sockaddr_storage sa; - socklen_t salen; - - uint16_t port; - uint8_t payload; - - const struct format_info *format_info; struct spa_audio_info info; - uint32_t rate; - uint32_t stride; - - uint32_t ts_offset; - char refclk[64]; -}; - -struct session { - struct impl *impl; - struct spa_list link; - - uint64_t timestamp; - - struct sdp_info info; - - struct spa_source *source; - + struct pw_properties *stream_props; struct pw_stream *stream; struct spa_hook stream_listener; + uint16_t src_port; + struct sockaddr_storage src_addr; + socklen_t src_len; + struct spa_source *source; + + uint32_t rate; + uint32_t stride; uint32_t expected_ssrc; uint16_t expected_seq; unsigned have_ssrc:1; unsigned have_seq:1; unsigned have_sync:1; + uint32_t ts_offset; struct spa_ringbuffer ring; uint8_t buffer[BUFFER_SIZE]; @@ -240,6 +180,7 @@ struct session { double corr; uint32_t target_buffer; float max_error; + unsigned first:1; unsigned receiving:1; unsigned direct_timestamp:1; @@ -248,47 +189,74 @@ struct session { float last_time; }; -static void session_touch(struct session *sess) +struct format_info { + uint32_t media_subtype; + uint32_t format; + uint32_t size; + const char *mime; + const char *media_type; +}; + +static uint32_t audio_get_stride(const struct spa_audio_info *info) { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - sess->timestamp = SPA_TIMESPEC_TO_NSEC(&ts); + uint32_t stride = 0; + + if (info->media_type != SPA_MEDIA_TYPE_audio || + info->media_subtype != SPA_MEDIA_SUBTYPE_raw) + return 0; + + switch (info->info.raw.format) { + case SPA_AUDIO_FORMAT_U8: + case SPA_AUDIO_FORMAT_ALAW: + case SPA_AUDIO_FORMAT_ULAW: + stride = 1; + break; + case SPA_AUDIO_FORMAT_S16_BE: + stride = 2; + break; + case SPA_AUDIO_FORMAT_S24_BE: + stride = 3; + break; + default: + break; + } + return stride * info->info.raw.channels; } -static void process_audio(struct session *sess) +static void process_audio(struct impl *impl) { struct pw_buffer *buf; struct spa_data *d; uint32_t wanted, timestamp, target_buffer, stride, maxsize; int32_t avail; - if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_debug("Out of stream buffers: %m"); return; } d = buf->buffer->datas; - stride = sess->info.stride; + stride = impl->stride; maxsize = d[0].maxsize / stride; wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize; - if (sess->position && sess->direct_timestamp) { + if (impl->position && impl->direct_timestamp) { /* in direct mode, read directly from the timestamp index, * because sender and receiver are in sync, this would keep * target_buffer of samples available. */ - spa_ringbuffer_read_update(&sess->ring, - sess->position->clock.position); + spa_ringbuffer_read_update(&impl->ring, + impl->position->clock.position); } - avail = spa_ringbuffer_get_read_index(&sess->ring, ×tamp); + avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); - target_buffer = sess->target_buffer; + target_buffer = impl->target_buffer; if (avail < (int32_t)wanted) { enum spa_log_level level; memset(d[0].data, 0, wanted * stride); - if (sess->have_sync) { - sess->have_sync = false; + if (impl->have_sync) { + impl->have_sync = false; level = SPA_LOG_LEVEL_WARN; } else { level = SPA_LOG_LEVEL_DEBUG; @@ -297,7 +265,7 @@ static void process_audio(struct session *sess) avail, target_buffer, wanted); } else { float error, corr; - if (sess->first) { + if (impl->first) { if ((uint32_t)avail > target_buffer) { uint32_t skip = avail - target_buffer; pw_log_debug("first: avail:%d skip:%u target:%u", @@ -305,74 +273,75 @@ static void process_audio(struct session *sess) timestamp += skip; avail = target_buffer; } - sess->first = false; + impl->first = false; } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) { pw_log_warn("overrun %u > %u", avail, target_buffer * 8); timestamp += avail - target_buffer; avail = target_buffer; } - if (!sess->direct_timestamp) { + if (!impl->direct_timestamp) { /* when not using direct timestamp and clocks are not * in sync, try to adjust our playback rate to keep the * requested target_buffer bytes in the ringbuffer */ error = (float)target_buffer - (float)avail; - error = SPA_CLAMP(error, -sess->max_error, sess->max_error); + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); - corr = spa_dll_update(&sess->dll, error); + corr = spa_dll_update(&impl->dll, error); pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, target_buffer, error, corr); - if (sess->rate_match) { - SPA_FLAG_SET(sess->rate_match->flags, + if (impl->rate_match) { + SPA_FLAG_SET(impl->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); - sess->rate_match->rate = 1.0f / corr; + impl->rate_match->rate = 1.0f / corr; } } - spa_ringbuffer_read_data(&sess->ring, - sess->buffer, + spa_ringbuffer_read_data(&impl->ring, + impl->buffer, BUFFER_SIZE, (timestamp * stride) & BUFFER_MASK, d[0].data, wanted * stride); timestamp += wanted; - spa_ringbuffer_read_update(&sess->ring, timestamp); + spa_ringbuffer_read_update(&impl->ring, timestamp); } d[0].chunk->size = wanted * stride; d[0].chunk->stride = stride; d[0].chunk->offset = 0; buf->size = wanted; - pw_stream_queue_buffer(sess->stream, buf); + pw_stream_queue_buffer(impl->stream, buf); } -static void receive_audio(struct session *sess, uint8_t *packet, +static void receive_audio(struct impl *impl, uint8_t *packet, uint32_t timestamp, uint32_t payload_offset, uint32_t len) { uint32_t plen = len - payload_offset; uint8_t *payload = &packet[payload_offset]; - uint32_t stride = sess->info.stride; + uint32_t stride = impl->stride; uint32_t samples = plen / stride; uint32_t write, expected_write; int32_t filled; - filled = spa_ringbuffer_get_write_index(&sess->ring, &expected_write); + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_write); /* we always write to timestamp + delay */ - write = timestamp + sess->target_buffer; + write = timestamp + impl->target_buffer; - if (!sess->have_sync) { - pw_log_info("sync to timestamp %u direct:%d", write, sess->direct_timestamp); + if (!impl->have_sync) { + pw_log_info("sync to timestamp %u direct:%d ts_offset:%u", + write, impl->direct_timestamp, impl->ts_offset); /* we read from timestamp, keeping target_buffer of data * in the ringbuffer. */ - sess->ring.readindex = timestamp; - sess->ring.writeindex = write; - filled = sess->target_buffer; + impl->ring.readindex = timestamp; + impl->ring.writeindex = write; + filled = impl->target_buffer; - spa_dll_init(&sess->dll); - spa_dll_set_bw(&sess->dll, SPA_DLL_BW_MIN, 128, sess->info.rate); - memset(sess->buffer, 0, BUFFER_SIZE); - sess->have_sync = true; + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); + memset(impl->buffer, 0, BUFFER_SIZE); + impl->have_sync = true; } else if (expected_write != write) { pw_log_debug("unexpected write (%u != %u)", write, expected_write); @@ -381,20 +350,20 @@ static void receive_audio(struct session *sess, uint8_t *packet, if (filled + samples > BUFFER_SIZE / stride) { pw_log_debug("capture overrun %u + %u > %u", filled, samples, BUFFER_SIZE / stride); - sess->have_sync = false; + impl->have_sync = false; } else { pw_log_debug("got samples:%u", samples); - spa_ringbuffer_write_data(&sess->ring, - sess->buffer, + spa_ringbuffer_write_data(&impl->ring, + impl->buffer, BUFFER_SIZE, (write * stride) & BUFFER_MASK, payload, (samples * stride)); write += samples; - spa_ringbuffer_write_update(&sess->ring, write); + spa_ringbuffer_write_update(&impl->ring, write); } } -static void process_midi(struct session *sess) +static void process_midi(struct impl *impl) { struct pw_buffer *buf; struct spa_data *d; @@ -405,7 +374,7 @@ static void process_midi(struct session *sess) struct spa_pod *pod; struct spa_pod_control *c; - if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_debug("Out of stream buffers: %m"); return; } @@ -415,9 +384,9 @@ static void process_midi(struct session *sess) /* we always use the graph position to select events, the receiver side is * responsible for smoothing out the RTP timestamps to graph time */ - duration = sess->position->clock.duration; - if (sess->position) - timestamp = sess->position->clock.position; + duration = impl->position->clock.duration; + if (impl->position) + timestamp = impl->position->clock.position; else timestamp = 0; @@ -426,11 +395,11 @@ static void process_midi(struct session *sess) spa_pod_builder_push_sequence(&b, &f[0], 0); while (true) { - int32_t avail = spa_ringbuffer_get_read_index(&sess->ring, &read); + int32_t avail = spa_ringbuffer_get_read_index(&impl->ring, &read); if (avail <= 0) break; - ptr = SPA_PTROFF(sess->buffer, read & BUFFER_MASK2, void); + ptr = SPA_PTROFF(impl->buffer, read & BUFFER_MASK2, void); if ((pod = spa_pod_from_data(ptr, avail, 0, avail)) == NULL) goto done; @@ -441,7 +410,7 @@ static void process_midi(struct session *sess) * received packet */ SPA_POD_SEQUENCE_FOREACH((struct spa_pod_sequence*)pod, c) { /* try to render with given delay */ - uint32_t target = c->offset + sess->target_buffer; + uint32_t target = c->offset + impl->target_buffer; if (timestamp != 0) { /* skip old packets */ if (target < timestamp) @@ -460,7 +429,7 @@ static void process_midi(struct session *sess) /* we completed a sequence (one RTP packet), advance ringbuffer * and go to the next packet */ read += SPA_PTRDIFF(c, ptr); - spa_ringbuffer_read_update(&sess->ring, read); + spa_ringbuffer_read_update(&impl->ring, read); } complete: spa_pod_builder_pop(&b, &f[0]); @@ -473,7 +442,7 @@ complete: d[0].chunk->stride = 1; d[0].chunk->offset = 0; done: - pw_stream_queue_buffer(sess->stream, buf); + pw_stream_queue_buffer(impl->stream, buf); } static int parse_varlen(uint8_t *p, uint32_t avail, uint32_t *result) @@ -514,17 +483,17 @@ static int get_midi_size(uint8_t *p, uint32_t avail) return size; } -static double get_time(struct session *sess) +static double get_time(struct impl *impl) { struct timespec ts; double t; clock_gettime(CLOCK_MONOTONIC, &ts); - t = sess->position->clock.position / (double) sess->position->clock.rate.denom; - t += (SPA_TIMESPEC_TO_NSEC(&ts) - sess->position->clock.nsec) / (double)SPA_NSEC_PER_SEC; + t = impl->position->clock.position / (double) impl->position->clock.rate.denom; + t += (SPA_TIMESPEC_TO_NSEC(&ts) - impl->position->clock.nsec) / (double)SPA_NSEC_PER_SEC; return t; } -static void receive_midi(struct session *sess, uint8_t *packet, +static void receive_midi(struct impl *impl, uint8_t *packet, uint32_t timestamp, uint32_t payload_offset, uint32_t plen) { uint32_t write; @@ -536,54 +505,54 @@ static void receive_midi(struct session *sess, uint8_t *packet, uint32_t offs = payload_offset, len, end; bool first = true; - if (sess->direct_timestamp) { + if (impl->direct_timestamp) { /* in direct timestamp we attach the RTP timestamp directly on the * midi events and render them in the corresponding cycle */ - if (!sess->have_sync) { + if (!impl->have_sync) { pw_log_info("sync to timestamp %u/ direct:%d", timestamp, - sess->direct_timestamp); - sess->have_sync = true; + impl->direct_timestamp); + impl->have_sync = true; } } else { /* in non-direct timestamp mode, we relate the graph clock against * the RTP timestamps */ - double ts = timestamp / (float) sess->info.rate; - double t = get_time(sess); + double ts = timestamp / (float) impl->rate; + double t = get_time(impl); double elapsed, estimated, diff; /* the elapsed time between RTP timestamps */ - elapsed = ts - sess->last_timestamp; + elapsed = ts - impl->last_timestamp; /* for that elapsed time, our clock should have advanced * by this amount since the last estimation */ - estimated = sess->last_time + elapsed * sess->corr; + estimated = impl->last_time + elapsed * impl->corr; /* calculate the diff between estimated and current clock time in * samples */ - diff = (estimated - t) * sess->info.rate; + diff = (estimated - t) * impl->rate; /* no sync or we drifted too far, resync */ - if (!sess->have_sync || fabs(diff) > sess->target_buffer) { - sess->corr = 1.0; - spa_dll_set_bw(&sess->dll, SPA_DLL_BW_MIN, 256, sess->info.rate); + if (!impl->have_sync || fabs(diff) > impl->target_buffer) { + impl->corr = 1.0; + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256, impl->rate); pw_log_info("sync to timestamp %u/%f direct:%d", timestamp, t, - sess->direct_timestamp); - sess->have_sync = true; - sess->ring.readindex = sess->ring.writeindex; + impl->direct_timestamp); + impl->have_sync = true; + impl->ring.readindex = impl->ring.writeindex; } else { /* update our new rate correction */ - sess->corr = spa_dll_update(&sess->dll, diff); + impl->corr = spa_dll_update(&impl->dll, diff); /* our current time is now the estimated time */ t = estimated; } - pw_log_debug("%f %f %f %f", t, estimated, diff, sess->corr); + pw_log_debug("%f %f %f %f", t, estimated, diff, impl->corr); - timestamp = t * sess->info.rate; + timestamp = t * impl->rate; - sess->last_timestamp = ts; - sess->last_time = t; + impl->last_timestamp = ts; + impl->last_time = t; } - filled = spa_ringbuffer_get_write_index(&sess->ring, &write); + filled = spa_ringbuffer_get_write_index(&impl->ring, &write); if (filled > (int32_t)BUFFER_SIZE2) return; @@ -597,7 +566,7 @@ static void receive_midi(struct session *sess, uint8_t *packet, if (end > plen) return; - ptr = SPA_PTROFF(sess->buffer, write & BUFFER_MASK2, void); + ptr = SPA_PTROFF(impl->buffer, write & BUFFER_MASK2, void); /* each packet is written as a sequence of events. The offset is * the RTP timestamp */ @@ -613,7 +582,7 @@ static void receive_midi(struct session *sess, uint8_t *packet, else offs += parse_varlen(&packet[offs], end - offs, &delta); - timestamp += delta * sess->corr; + timestamp += delta * impl->corr; spa_pod_builder_control(&b, timestamp, SPA_CONTROL_Midi); size = get_midi_size(&packet[offs], end - offs); @@ -632,38 +601,38 @@ static void receive_midi(struct session *sess, uint8_t *packet, spa_pod_builder_pop(&b, &f[0]); write += b.state.offset; - spa_ringbuffer_write_update(&sess->ring, write); + spa_ringbuffer_write_update(&impl->ring, write); } static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) { - struct session *sess = data; + struct impl *impl = data; switch (id) { case SPA_IO_RateMatch: - sess->rate_match = area; + impl->rate_match = area; break; case SPA_IO_Position: - sess->position = area; + impl->position = area; break; } } static void stream_destroy(void *d) { - struct session *sess = d; - spa_hook_remove(&sess->stream_listener); - sess->stream = NULL; + struct impl *impl = d; + spa_hook_remove(&impl->stream_listener); + impl->stream = NULL; } static void stream_process(void *data) { - struct session *sess = data; - switch (sess->info.info.media_type) { + struct impl *impl = data; + switch (impl->info.media_type) { case SPA_MEDIA_TYPE_audio: - process_audio(sess); + process_audio(impl); break; case SPA_MEDIA_TYPE_application: - process_midi(sess); + process_midi(impl); break; } } @@ -671,7 +640,7 @@ static void stream_process(void *data) static void on_rtp_io(void *data, int fd, uint32_t mask) { - struct session *sess = data; + struct impl *impl = data; struct rtp_header *hdr; ssize_t len, hlen; uint8_t buffer[2048]; @@ -694,29 +663,29 @@ on_rtp_io(void *data, int fd, uint32_t mask) if (hlen > len) goto invalid_len; - if (sess->have_ssrc && sess->expected_ssrc != hdr->ssrc) + if (impl->have_ssrc && impl->expected_ssrc != hdr->ssrc) goto unexpected_ssrc; - sess->expected_ssrc = hdr->ssrc; - sess->have_ssrc = true; + impl->expected_ssrc = hdr->ssrc; + impl->have_ssrc = true; seq = ntohs(hdr->sequence_number); - if (sess->have_seq && sess->expected_seq != seq) { - pw_log_info("unexpected seq (%d != %d)", seq, sess->expected_seq); - sess->have_sync = false; + if (impl->have_seq && impl->expected_seq != seq) { + pw_log_info("unexpected seq (%d != %d)", seq, impl->expected_seq); + impl->have_sync = false; } - sess->expected_seq = seq + 1; - sess->have_seq = true; + impl->expected_seq = seq + 1; + impl->have_seq = true; - timestamp = ntohl(hdr->timestamp) - sess->info.ts_offset; + timestamp = ntohl(hdr->timestamp) - impl->ts_offset; - switch (sess->info.info.media_type) { + switch (impl->info.media_type) { case SPA_MEDIA_TYPE_audio: - receive_audio(sess, buffer, timestamp, hlen, len); + receive_audio(impl, buffer, timestamp, hlen, len); break; case SPA_MEDIA_TYPE_application: - receive_midi(sess, buffer, timestamp, hlen, len); + receive_midi(impl, buffer, timestamp, hlen, len); } - sess->receiving = true; + impl->receiving = true; } return; @@ -728,16 +697,37 @@ short_packet: return; invalid_version: pw_log_warn("invalid RTP version"); + spa_debug_mem(0, buffer, len); return; invalid_len: pw_log_warn("invalid RTP length"); return; unexpected_ssrc: pw_log_warn("unexpected SSRC (expected %u != %u)", - sess->expected_ssrc, hdr->ssrc); + impl->expected_ssrc, hdr->ssrc); return; } +static int parse_address(const char *address, uint16_t port, + struct sockaddr_storage *addr, socklen_t *len) +{ + struct sockaddr_in *sa4 = (struct sockaddr_in*)addr; + struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)addr; + + if (inet_pton(AF_INET, address, &sa4->sin_addr) > 0) { + sa4->sin_family = AF_INET; + sa4->sin_port = htons(port); + *len = sizeof(*sa4); + } else if (inet_pton(AF_INET6, address, &sa6->sin6_addr) > 0) { + sa6->sin6_family = AF_INET6; + sa6->sin6_port = htons(port); + *len = sizeof(*sa6); + } else + return -EINVAL; + + return 0; +} + static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) { int af, fd, val, res; @@ -815,60 +805,29 @@ error: return res; } -static uint32_t msec_to_samples(struct sdp_info *info, uint32_t msec) +static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) { - return msec * info->rate / 1000; + return msec * impl->rate / 1000; } -static void session_free(struct session *sess) +static int stream_start(struct impl *impl) { - if (sess->impl) { - pw_log_info("free session %s %s", sess->info.origin, sess->info.session); - sess->impl->n_sessions--; - spa_list_remove(&sess->link); - } - if (sess->stream) - pw_stream_destroy(sess->stream); - if (sess->source) - pw_loop_destroy_source(sess->impl->data_loop, sess->source); - free(sess); -} - -struct session_info { - struct session *session; - struct pw_properties *props; - bool matched; -}; - -static int rule_matched(void *data, const char *location, const char *action, - const char *str, size_t len) -{ - struct session_info *i = data; - int res = 0; - - i->matched = true; - if (spa_streq(action, "create-stream")) { - pw_properties_update_string(i->props, str, len); - } - return res; -} - -static int session_start(struct impl *impl, struct session *session) { int fd; - if (session->source) - return 0; + + if (impl->source != NULL) + return 0; pw_log_info("starting RTP listener"); - if ((fd = make_socket((const struct sockaddr *)&session->info.sa, - session->info.salen, impl->ifname)) < 0) { + if ((fd = make_socket((const struct sockaddr *)&impl->src_addr, + impl->src_len, impl->ifname)) < 0) { pw_log_error("failed to create socket: %m"); return fd; } - session->source = pw_loop_add_io(impl->data_loop, fd, - SPA_IO_IN, true, on_rtp_io, session); - if (session->source == NULL) { + impl->source = pw_loop_add_io(impl->data_loop, fd, + SPA_IO_IN, true, on_rtp_io, impl); + if (impl->source == NULL) { pw_log_error("can't create io source: %m"); close(fd); return -errno; @@ -876,25 +835,21 @@ static int session_start(struct impl *impl, struct session *session) { return 0; } -static void session_stop(struct impl *impl, struct session *session) { - if (!session->source) +static void stream_stop(struct impl *impl) +{ + if (!impl->source) return; pw_log_info("stopping RTP listener"); - pw_loop_destroy_source( - session->impl->data_loop, - session->source - ); - - session->source = NULL; + pw_loop_destroy_source(impl->data_loop, impl->source); + impl->source = NULL; } static void on_stream_state_changed(void *d, enum pw_stream_state old, enum pw_stream_state state, const char *error) { - struct session *sess = d; - struct impl *impl = sess->impl; + struct impl *impl = d; switch (state) { case PW_STREAM_STATE_UNCONNECTED: @@ -905,12 +860,12 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, pw_log_error("stream error: %s", error); break; case PW_STREAM_STATE_STREAMING: - if ((errno = -session_start(impl, sess)) < 0) + if ((errno = -stream_start(impl)) < 0) pw_log_error("failed to start RTP stream: %m"); break; case PW_STREAM_STATE_PAUSED: if (!impl->always_process) - session_stop(impl, sess); + stream_stop(impl); break; default: break; @@ -925,28 +880,16 @@ static const struct pw_stream_events out_stream_events = { .process = stream_process }; -static int session_new(struct impl *impl, struct sdp_info *info) +static int setup_stream(struct impl *impl) { - struct session *session; const struct spa_pod *params[1]; struct spa_pod_builder b; uint32_t n_params; uint8_t buffer[1024]; struct pw_properties *props; - int res, sess_latency_msec; - const char *str; + int res; - if (impl->n_sessions >= MAX_SESSIONS) { - pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, MAX_SESSIONS); - return -EMFILE; - } - - session = calloc(1, sizeof(struct session)); - if (session == NULL) - return -errno; - - session->info = *info; - session->first = true; + impl->first = true; props = pw_properties_copy(impl->stream_props); if (props == NULL) { @@ -954,79 +897,29 @@ static int session_new(struct impl *impl, struct sdp_info *info) goto error; } - pw_properties_set(props, "rtp.origin", info->origin); - pw_properties_setf(props, "rtp.payload", "%u", info->payload); - pw_properties_setf(props, "rtp.fmt", "%s/%u/%u", info->format_info->mime, - info->rate, info->info.info.raw.channels); - if (info->session[0]) { - pw_properties_set(props, "rtp.session", info->session); - pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", - info->session); - pw_properties_setf(props, PW_KEY_NODE_NAME, "%s", - info->session); - } else { - pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Stream"); - } - pw_properties_setf(props, "rtp.ts-offset", "%u", info->ts_offset); - pw_properties_set(props, "rtp.ts-refclk", info->refclk); + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); + impl->corr = 1.0; - if ((str = pw_properties_get(impl->props, "stream.rules")) != NULL) { - struct session_info sinfo = { - .session = session, - .props = props, - }; - pw_conf_match_rules(str, strlen(str), NAME, &props->dict, - rule_matched, &sinfo); - - if (!sinfo.matched) { - res = 0; - pw_log_info("session '%s' was not matched", info->session); - goto error; - } - } - session->direct_timestamp = pw_properties_get_bool(props, "sess.ts-direct", false); - - pw_log_info("new session %s %s direct:%d", info->origin, info->session, - session->direct_timestamp); - - sess_latency_msec = pw_properties_get_uint32(props, - "sess.latency.msec", impl->sess_latency_msec); - - session->target_buffer = msec_to_samples(info, sess_latency_msec); - session->max_error = msec_to_samples(info, ERROR_MSEC); - - pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", info->rate); - pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d", - session->target_buffer / 2, info->rate); - - spa_dll_init(&session->dll); - spa_dll_set_bw(&session->dll, SPA_DLL_BW_MIN, 128, info->rate); - session->corr = 1.0; - - if (info->channelmap[0]) { - pw_properties_set(props, PW_KEY_NODE_CHANNELNAMES, info->channelmap); - pw_log_info("channelmap: %s", info->channelmap); - } - - session->stream = pw_stream_new(impl->core, + impl->stream = pw_stream_new(impl->core, "rtp-source playback", props); - if (session->stream == NULL) { + if (impl->stream == NULL) { res = -errno; pw_log_error("can't create stream: %m"); goto error; } - pw_stream_add_listener(session->stream, - &session->stream_listener, - &out_stream_events, session); + pw_stream_add_listener(impl->stream, + &impl->stream_listener, + &out_stream_events, impl); n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); - switch (info->info.media_type) { + switch (impl->info.media_type) { case SPA_MEDIA_TYPE_audio: params[n_params++] = spa_format_audio_build(&b, - SPA_PARAM_EnumFormat, &info->info); + SPA_PARAM_EnumFormat, &impl->info); break; case SPA_MEDIA_TYPE_application: params[n_params++] = spa_pod_builder_add_object(&b, @@ -1038,7 +931,7 @@ static int session_new(struct impl *impl, struct sdp_info *info) return -EINVAL; } - if ((res = pw_stream_connect(session->stream, + if ((res = pw_stream_connect(impl->stream, PW_DIRECTION_OUTPUT, PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | @@ -1050,408 +943,25 @@ static int session_new(struct impl *impl, struct sdp_info *info) } if (impl->always_process && - (res = session_start(impl, session)) < 0) + (res = stream_start(impl)) < 0) goto error; - session_touch(session); - - session->impl = impl; - spa_list_append(&impl->sessions, &session->link); - impl->n_sessions++; - return 0; error: - session_free(session); return res; } -static struct session *session_find(struct impl *impl, struct sdp_info *info) -{ - struct session *sess; - spa_list_for_each(sess, &impl->sessions, link) { - if (info->hash == sess->info.hash && - spa_streq(info->origin, sess->info.origin)) - return sess; - } - return NULL; -} - -static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info) -{ - int res; - - c[strcspn(c, "/")] = 0; - if (spa_strstartswith(c, "c=IN IP4 ")) { - struct sockaddr_in *sa = (struct sockaddr_in*) &info->sa; - - c += strlen("c=IN IP4 "); - if (inet_pton(AF_INET, c, &sa->sin_addr) <= 0) { - res = -errno; - pw_log_warn("inet_pton(%s) failed: %m", c); - goto error; - } - sa->sin_family = AF_INET; - info->salen = sizeof(struct sockaddr_in); - } - else if (spa_strstartswith(c, "c=IN IP6 ")) { - struct sockaddr_in6 *sa = (struct sockaddr_in6*) &info->sa; - - c += strlen("c=IN IP6 "); - if (inet_pton(AF_INET6, c, &sa->sin6_addr) <= 0) { - res = -errno; - pw_log_warn("inet_pton(%s) failed: %m", c); - goto error; - } - - sa->sin6_family = AF_INET6; - info->salen = sizeof(struct sockaddr_in6); - } else - return -EINVAL; - - - res= 0; -error: - return res; -} - -static int parse_sdp_m(struct impl *impl, char *c, struct sdp_info *info) -{ - int port, payload; - - if (!spa_strstartswith(c, "m=audio ")) - return -EINVAL; - - c += strlen("m=audio "); - if (sscanf(c, "%i RTP/AVP %i", &port, &payload) != 2) - return -EINVAL; - - if (port <= 0 || port > 0xFFFF) - return -EINVAL; - - if (payload < 0 || payload > 127) - return -EINVAL; - - info->port = (uint16_t) port; - info->payload = (uint8_t) payload; - - return 0; -} - -// some AES67 devices have channelmap encoded in i=* -// if `i` record is found, it matches the template -// and channel count matches, name the channels respectively -// `i=2 channels: 01, 08` is the format -static int parse_sdp_i(struct impl *impl, char *c, struct sdp_info *info) -{ - if (!strstr(c, " channels: ")) { - return 0; - } - - c += strlen("i="); - c[strcspn(c, " ")] = '\0'; - - uint32_t channels; - if (sscanf(c, "%u", &channels) != 1 || channels <= 0 || channels > SPA_AUDIO_MAX_CHANNELS) - return 0; - - c += strcspn(c, "\0"); - c += strlen(" channels: "); - - strncpy(info->channelmap, c, sizeof(info->channelmap) - 1); - - return 0; -} - -static int parse_sdp_a_rtpmap(struct impl *impl, char *c, struct sdp_info *info) -{ - int payload, len, rate, channels; - - if (!spa_strstartswith(c, "a=rtpmap:")) - return 0; - - c += strlen("a=rtpmap:"); - - if (sscanf(c, "%i %n", &payload, &len) != 1) - return -EINVAL; - - if (payload < 0 || payload > 127) - return -EINVAL; - - if (payload != info->payload) - return 0; - - c += len; - c[strcspn(c, "/")] = 0; - - info->format_info = find_format_info(c); - if (info->format_info == NULL) - return -EINVAL; - - info->stride = info->format_info->size; - - info->info.media_subtype = info->format_info->media_subtype; - - c += strlen(c) + 1; - - switch (info->info.media_subtype) { - case SPA_MEDIA_SUBTYPE_raw: - info->info.media_type = SPA_MEDIA_TYPE_audio; - info->info.info.raw.format = info->format_info->format; - if (sscanf(c, "%u/%u", &rate, &channels) == 2) { - info->info.info.raw.channels = channels; - } else if (sscanf(c, "%u", &rate) == 1) { - info->info.info.raw.channels = 1; - } else - return -EINVAL; - - info->info.info.raw.rate = rate; - - pw_log_debug("rate: %d, ch: %d", rate, channels); - - if (channels == 1) { - info->info.info.raw.position[0] = SPA_AUDIO_CHANNEL_MONO; - } else if (channels == 2) { - info->info.info.raw.position[0] = SPA_AUDIO_CHANNEL_FL; - info->info.info.raw.position[1] = SPA_AUDIO_CHANNEL_FR; - } - info->stride *= channels; - info->rate = rate; - break; - case SPA_MEDIA_SUBTYPE_control: - info->info.media_type = SPA_MEDIA_TYPE_application; - if (sscanf(c, "%u", &rate) != 1) - return -EINVAL; - info->rate = rate; - break; - } - return 0; -} - -static int parse_sdp_a_mediaclk(struct impl *impl, char *c, struct sdp_info *info) -{ - if (!spa_strstartswith(c, "a=mediaclk:")) - return 0; - - c += strlen("a=mediaclk:"); - - if (spa_strstartswith(c, "direct=")) { - int offset; - c += strlen("direct="); - if (sscanf(c, "%i", &offset) != 1) - return -EINVAL; - info->ts_offset = offset; - } else if (spa_strstartswith(c, "sender")) { - info->ts_offset = 0; - } - return 0; -} - -static int parse_sdp_a_ts_refclk(struct impl *impl, char *c, struct sdp_info *info) -{ - if (!spa_strstartswith(c, "a=ts-refclk:")) - return 0; - - c += strlen("a=ts-refclk:"); - snprintf(info->refclk, sizeof(info->refclk), "%s", c); - return 0; -} - -static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info) -{ - char *s = sdp; - int count = 0, res = 0; - size_t l; - - while (*s) { - if ((l = strcspn(s, "\r\n")) < 2) - goto too_short; - - s[l] = 0; - pw_log_debug("%d: %s", count, s); - - if (count++ == 0 && strcmp(s, "v=0") != 0) - goto invalid_version; - - if (spa_strstartswith(s, "o=")) - snprintf(info->origin, sizeof(info->origin), "%s", &s[2]); - else if (spa_strstartswith(s, "s=")) - snprintf(info->session, sizeof(info->session), "%s", &s[2]); - else if (spa_strstartswith(s, "c=")) - res = parse_sdp_c(impl, s, info); - else if (spa_strstartswith(s, "m=")) - res = parse_sdp_m(impl, s, info); - else if (spa_strstartswith(s, "a=rtpmap:")) - res = parse_sdp_a_rtpmap(impl, s, info); - else if (spa_strstartswith(s, "a=mediaclk:")) - res = parse_sdp_a_mediaclk(impl, s, info); - else if (spa_strstartswith(s, "a=ts-refclk:")) - res = parse_sdp_a_ts_refclk(impl, s, info); - else if (spa_strstartswith(s, "i=")) - res = parse_sdp_i(impl, s, info); - - if (res < 0) - goto error; - s += l + 1; - while (isspace(*s)) - s++; - } - if (((struct sockaddr*) &info->sa)->sa_family == AF_INET) - ((struct sockaddr_in*) &info->sa)->sin_port = htons(info->port); - else - ((struct sockaddr_in6*) &info->sa)->sin6_port = htons(info->port); - - return 0; -too_short: - pw_log_warn("SDP: line starting with `%.6s...' too short", s); - return -EINVAL; -invalid_version: - pw_log_warn("SDP: invalid first version line `%*s'", (int)l, s); - return -EINVAL; -error: - pw_log_warn("SDP: error: %s", spa_strerror(res)); - return res; -} - -static int parse_sap(struct impl *impl, void *data, size_t len) -{ - struct sap_header *header; - char *mime, *sdp; - struct sdp_info info; - struct session *sess; - int res; - size_t offs; - bool bye; - - if (len < 8) - return -EINVAL; - - header = (struct sap_header*) data; - if (header->v != 1) - return -EINVAL; - - if (header->e) - return -ENOTSUP; - if (header->c) - return -ENOTSUP; - - offs = header->a ? 12 : 8; - offs += header->auth_len * 4; - if (len <= offs) - return -EINVAL; - - mime = SPA_PTROFF(data, offs, char); - if (spa_strstartswith(mime, "v=0")) { - sdp = mime; - mime = SAP_MIME_TYPE; - } else if (spa_streq(mime, SAP_MIME_TYPE)) - sdp = SPA_PTROFF(mime, strlen(mime)+1, char); - else - return -EINVAL; - - pw_log_debug("got sap: %s %s", mime, sdp); - - spa_zero(info); - if ((res = parse_sdp(impl, sdp, &info)) < 0) - return res; - - bye = header->t; - - sess = session_find(impl, &info); - if (sess == NULL) { - if (!bye) - session_new(impl, &info); - } else { - if (bye) - session_free(sess); - else - session_touch(sess); - } - return res; -} - -static void -on_sap_io(void *data, int fd, uint32_t mask) -{ - struct impl *impl = data; - - if (mask & SPA_IO_IN) { - uint8_t buffer[2048]; - ssize_t len; - - if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) { - pw_log_warn("recv error: %m"); - return; - } - if ((size_t)len >= sizeof(buffer)) - return; - - buffer[len] = 0; - parse_sap(impl, buffer, len); - } -} - -static int start_sap_listener(struct impl *impl) -{ - struct sockaddr_in sa4; - struct sockaddr_in6 sa6; - struct sockaddr *sa; - socklen_t salen; - int fd, res; - - if (inet_pton(AF_INET, impl->sap_ip, &sa4.sin_addr) > 0) { - sa4.sin_family = AF_INET; - sa4.sin_port = htons(impl->sap_port); - sa = (struct sockaddr*) &sa4; - salen = sizeof(sa4); - } else if (inet_pton(AF_INET6, impl->sap_ip, &sa6.sin6_addr) > 0) { - sa6.sin6_family = AF_INET6; - sa6.sin6_port = htons(impl->sap_port); - sa = (struct sockaddr*) &sa6; - salen = sizeof(sa6); - } else - return -EINVAL; - - if ((fd = make_socket(sa, salen, impl->ifname)) < 0) - return fd; - - pw_log_info("starting SAP listener"); - impl->sap_source = pw_loop_add_io(impl->loop, fd, - SPA_IO_IN, true, on_sap_io, impl); - if (impl->sap_source == NULL) { - res = -errno; - goto error; - } - return 0; -error: - close(fd); - return res; - -} - static void on_timer_event(void *data, uint64_t expirations) { struct impl *impl = data; - struct timespec now; - struct session *sess, *tmp; - uint64_t timestamp, interval; - clock_gettime(CLOCK_MONOTONIC, &now); - timestamp = SPA_TIMESPEC_TO_NSEC(&now); - interval = impl->cleanup_interval * SPA_NSEC_PER_SEC; - - spa_list_for_each_safe(sess, tmp, &impl->sessions, link) { - if (sess->timestamp + interval < timestamp) { - pw_log_debug("More than %lu elapsed from last advertisement at %lu", - interval, sess->timestamp); - if (!sess->receiving) { - pw_log_info("SAP timeout, closing inactive RTP source"); - session_free(sess); - } else { - pw_log_info("SAP timeout, keeping active RTP source"); - } - } - sess->receiving = false; + if (!impl->receiving) { + pw_log_info("timeout, closing inactive RTP source"); + pw_impl_module_schedule_destroy(impl->module); + } else { + pw_log_debug("timeout, keeping active RTP source"); } + impl->receiving = false; } static void core_destroy(void *d) @@ -1468,15 +978,14 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { - struct session *sess; - spa_list_consume(sess, &impl->sessions, link) - session_free(sess); + if (impl->stream) + pw_stream_destroy(impl->stream); + if (impl->source) + pw_loop_destroy_source(impl->data_loop, impl->source); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->sap_source) - pw_loop_destroy_source(impl->loop, impl->sap_source); if (impl->timer) pw_loop_destroy_source(impl->loop, impl->timer); @@ -1484,7 +993,6 @@ static void impl_destroy(struct impl *impl) pw_properties_free(impl->props); free(impl->ifname); - free(impl->sap_ip); free(impl); } @@ -1516,13 +1024,82 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; +static inline uint32_t format_from_name(const char *name, size_t len) +{ + int i; + for (i = 0; spa_type_audio_format[i].name; i++) { + if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0) + return spa_type_audio_format[i].type; + } + return SPA_AUDIO_FORMAT_UNKNOWN; +} + +static uint32_t channel_from_name(const char *name) +{ + int i; + for (i = 0; spa_type_audio_channel[i].name; i++) { + if (spa_streq(name, spa_debug_type_short_name(spa_type_audio_channel[i].name))) + return spa_type_audio_channel[i].type; + } + return SPA_AUDIO_CHANNEL_UNKNOWN; +} + +static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len) +{ + struct spa_json it[2]; + char v[256]; + + spa_json_init(&it[0], val, len); + if (spa_json_enter_array(&it[0], &it[1]) <= 0) + spa_json_init(&it[1], val, len); + + info->channels = 0; + while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && + info->channels < SPA_AUDIO_MAX_CHANNELS) { + info->position[info->channels++] = channel_from_name(v); + } +} + +static void parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) +{ + const char *str; + + spa_zero(*info); + if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL) + str = DEFAULT_FORMAT; + info->format = format_from_name(str, strlen(str)); + + info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate); + if (info->rate == 0) + info->rate = DEFAULT_RATE; + + info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); + info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS); + if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) + parse_position(info, str, strlen(str)); + if (info->channels == 0) + parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); +} + +static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (pw_properties_get(impl->stream_props, key) == NULL) + pw_properties_set(impl->stream_props, key, str); + } +} + SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { struct pw_context *context = pw_impl_module_get_context(module); + uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); + uint32_t pid = getpid(); struct impl *impl; const char *str; struct timespec value, interval; + struct pw_properties *props, *stream_props; int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -1531,14 +1108,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (impl == NULL) return -errno; - spa_list_init(&impl->sessions); - if (args == NULL) args = ""; - impl->props = pw_properties_new_string(args); - impl->stream_props = pw_properties_new(NULL, NULL); - if (impl->props == NULL || impl->stream_props == NULL) { + props = impl->props = pw_properties_new_string(args); + stream_props = impl->stream_props = pw_properties_new(NULL, NULL); + if (props == NULL || stream_props == NULL) { res = -errno; pw_log_error( "can't create properties: %m"); goto out; @@ -1549,31 +1124,124 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->loop = pw_context_get_main_loop(context); impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(context)); - if (pw_properties_get(impl->stream_props, PW_KEY_NODE_VIRTUAL) == NULL) - pw_properties_set(impl->stream_props, PW_KEY_NODE_VIRTUAL, "true"); - if (pw_properties_get(impl->stream_props, PW_KEY_NODE_NETWORK) == NULL) - pw_properties_set(impl->stream_props, PW_KEY_NODE_NETWORK, "true"); + if (pw_properties_get(stream_props, PW_KEY_NODE_VIRTUAL) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_VIRTUAL, "true"); + if (pw_properties_get(stream_props, PW_KEY_NODE_NETWORK) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_NETWORK, "true"); - if ((str = pw_properties_get(impl->props, "stream.props")) != NULL) - pw_properties_update_string(impl->stream_props, str, strlen(str)); + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp-source-%u-%u", pid, id); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, + pw_properties_get(props, PW_KEY_NODE_NAME)); + if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Receiver Stream"); - str = pw_properties_get(impl->props, "local.ifname"); + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_AUDIO_FORMAT); + copy_props(impl, props, PW_KEY_AUDIO_RATE); + copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); + copy_props(impl, props, PW_KEY_MEDIA_NAME); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + if ((str = pw_properties_get(stream_props, "rtp.media")) != NULL) { + if (spa_streq(str, "audio")) { + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + } + else if (spa_streq(str, "midi")) { + impl->info.media_type = SPA_MEDIA_TYPE_application; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; + } + else { + pw_log_error("unsupported media type:%s", str); + res = -EINVAL; + goto out; + } + } + + switch (impl->info.media_type) { + case SPA_MEDIA_TYPE_audio: + parse_audio_info(stream_props, &impl->info.info.raw); + impl->stride = audio_get_stride(&impl->info); + if (impl->stride == 0) { + pw_log_error("unsupported audio format:%d channels:%d", + impl->info.info.raw.format, impl->info.info.raw.channels); + res = -EINVAL; + goto out; + } + impl->rate = impl->info.info.raw.rate; + break; + case SPA_MEDIA_TYPE_application: + pw_properties_set(stream_props, PW_KEY_FORMAT_DSP, "8 bit raw midi"); + impl->stride = 1; + impl->rate = 48000; + break; + default: + spa_assert_not_reached(); + break; + } + + str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; - impl->always_process = pw_properties_get_bool(impl->props, PW_KEY_NODE_ALWAYS_PROCESS, false); + impl->src_port = pw_properties_get_uint32(stream_props, "source.port", 0); + if (impl->src_port == 0) { + pw_log_error("invalid source.port"); + goto out; + } + if ((str = pw_properties_get(stream_props, "source.ip")) == NULL || + (res = parse_address(str, impl->src_port, &impl->src_addr, &impl->src_len)) < 0) { + pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res)); + goto out; + } - str = pw_properties_get(impl->props, "sap.ip"); - impl->sap_ip = strdup(str ? str : DEFAULT_SAP_IP); - impl->sap_port = pw_properties_get_uint32(impl->props, - "sap.port", DEFAULT_SAP_PORT); - impl->sess_latency_msec = pw_properties_get_uint32(impl->props, + impl->always_process = pw_properties_get_bool(stream_props, + PW_KEY_NODE_ALWAYS_PROCESS, true); + + impl->cleanup_interval = pw_properties_get_uint32(props, + "cleanup.sec", DEFAULT_CLEANUP_SEC); + + if ((str = pw_properties_get(props, "sess.name")) != NULL) { + pw_properties_set(stream_props, "rtp.session", str); + if (pw_properties_get(stream_props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(stream_props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", str); + if (pw_properties_get(stream_props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(stream_props, PW_KEY_NODE_NAME, "%s", str); + } else { + if (pw_properties_get(stream_props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_set(stream_props, PW_KEY_MEDIA_NAME, "RTP Stream"); + } + + impl->ts_offset = pw_properties_get_int64(stream_props, "sess.ts-offset", 0); + pw_properties_setf(stream_props, "rtp.ts-offset", "%u", impl->ts_offset); + + impl->direct_timestamp = pw_properties_get_bool(stream_props, + "sess.ts-direct", false); + + impl->sess_latency_msec = pw_properties_get_uint32(stream_props, "sess.latency.msec", DEFAULT_SESS_LATENCY); - impl->cleanup_interval = pw_properties_get_uint32(impl->props, - "sap.interval.sec", DEFAULT_CLEANUP_INTERVAL_SEC); + impl->target_buffer = msec_to_samples(impl, impl->sess_latency_msec); + impl->max_error = msec_to_samples(impl, ERROR_MSEC); + + pw_properties_setf(stream_props, PW_KEY_NODE_RATE, "1/%d", impl->rate); + pw_properties_setf(stream_props, PW_KEY_NODE_LATENCY, "%d/%d", + impl->target_buffer / 2, impl->rate); impl->core = pw_context_get_object(impl->module_context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { - str = pw_properties_get(impl->props, PW_KEY_REMOTE_NAME); + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); impl->core = pw_context_connect(impl->module_context, pw_properties_new( PW_KEY_REMOTE_NAME, str, @@ -1600,13 +1268,13 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_log_error("can't create timer source: %m"); goto out; } - value.tv_sec = 0; - value.tv_nsec = 1; + value.tv_sec = impl->cleanup_interval; + value.tv_nsec = 0; interval.tv_sec = impl->cleanup_interval; interval.tv_nsec = 0; pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); - if ((res = start_sap_listener(impl)) < 0) + if ((res = setup_stream(impl)) < 0) goto out; pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);