diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 377cc4109..5c3d1f05f 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -156,6 +157,9 @@ PW_LOG_TOPIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic +#define DEFAULT_IGMP_CHECK_INTERVAL_SEC 5 +#define DEFAULT_IGMP_DEADLINE_SEC 30 + #define DEFAULT_CLEANUP_SEC 60 #define DEFAULT_SOURCE_IP "224.0.0.56" @@ -180,6 +184,23 @@ static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; +struct igmp_recovery { + struct pw_timer timer; + int socket_fd; + struct sockaddr_storage mcast_addr; + socklen_t mcast_len; + uint32_t if_index; + bool is_ipv6; + /* This is the interval the recovery timer runs at. The timer + * checks at each interval if recovery is required. This value + * is defined by the igmp.check.interval.sec property. */ + uint32_t check_interval; + /* This is the deadline for packets to arrive. If the deadline + * is exceeded, an IGMP recovery is attempted. This value is + * defined by the igmp.deadline.sec property. */ + uint32_t deadline; +}; + struct impl { struct pw_impl_module *module; struct spa_hook module_listener; @@ -201,6 +222,15 @@ struct impl { bool always_process; uint32_t cleanup_interval; + /* IGMP recovery (triggers when no RTP packets are + * received after the recovery deadline is reached) */ + struct igmp_recovery igmp_recovery; + + /* Monotonic timestamp of the last time a packet was + * received. This is accessed with atomic accessors + * to avoid race conditions. */ + uint64_t last_packet_time; + struct pw_timer standby_timer; /* This timer is used when the first stream_start() call fails because * of an ENODEV error (see the stream_start() code for details) */ @@ -297,7 +327,138 @@ short_packet: return; } -static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) +static int rejoin_igmp_group(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + /* IMPORTANT: This must be run from within the data loop. */ + + int res; + struct impl *impl = user_data; + uint64_t current_time; + + /* Force IGMP membership refresh by leaving the group first, then rejoin */ + if (impl->igmp_recovery.is_ipv6) { + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr; + mr6.ipv6mr_interface = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, + &mr6, sizeof(mr6)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv6 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv6 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv6 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, + &mr6, sizeof(mr6)); + if (res < 0) { + pw_log_warn("failed to re-join IPv6 multicast group: %m"); + } else { + pw_log_info("re-joined IPv6 multicast group successfully"); + } + } else { + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr; + mr4.imr_ifindex = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv4 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv4 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv4 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (res < 0) { + pw_log_warn("failed to re-join IPv4 multicast group: %m"); + } else { + pw_log_info("re-joined IPv4 multicast group successfully"); + } + } + + current_time = rtp_stream_get_nsec(impl->stream); + SPA_ATOMIC_STORE(impl->last_packet_time, current_time); + + return res; +} + +static void on_igmp_recovery_timer_event(void *data) +{ + int res; + struct impl *impl = data; + char addr[128]; + uint64_t current_time, elapsed_seconds, last_packet_time; + + /* Only attempt recovery if we have a valid socket and multicast address */ + if (SPA_UNLIKELY(impl->igmp_recovery.socket_fd < 0)) { + pw_log_trace("no socket, skipping IGMP recovery"); + goto finish; + } + + /* This check if performed even if standby = false or + * receiving != STATE_RECEIVING , because the very reason + * for these states may be that the receiver socket was + * silently kicked out of the IGMP group (which causes data + * to no longer arrive, thus leading to these states). */ + + current_time = rtp_stream_get_nsec(impl->stream); + last_packet_time = SPA_ATOMIC_LOAD(impl->last_packet_time); + elapsed_seconds = (current_time - last_packet_time) / SPA_NSEC_PER_SEC; + + /* Only trigger recovery if enough time has elapsed since last packet */ + if (elapsed_seconds < impl->igmp_recovery.deadline) { + pw_log_trace("IGMP recovery check: %" PRIu64 " seconds elapsed, " + "need %" PRIu32 " seconds", elapsed_seconds, + impl->igmp_recovery.deadline); + goto finish; + } + + pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL); + pw_log_info("starting IGMP recovery for %s", addr); + + /* Run the actual recovery in the data loop, since recovery involves + * rejoining the socket to the IGMP group. By running this in the + * data loop, race conditions due to stray packets causing an on_rtp_io() + * invocation at the same time when the IGMP group rejoining takes place + * is avoided, since on_rtp_io() too runs in the data loop. + * This is a blocking call to make sure the rejoin attempt was fully + * done by the time this callback ends. (rejoin_igmp_group() does not + * do work that takes a long time to finish. )*/ + res = pw_loop_locked(impl->data_loop, rejoin_igmp_group, 1, NULL, 0, impl); + + if (SPA_LIKELY(res == 0)) { + pw_log_info("IGMP recovery for %s finished", addr); + } else { + pw_log_error("error while finishing IGMP recovery for %s: %s", + addr, spa_strerror(res)); + } + +finish: + pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + &impl->igmp_recovery.timer.timeout, + impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl); +} + +static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname, + struct igmp_recovery *igmp_recovery) { int af, fd, val, res; struct ifreq req; @@ -377,6 +538,16 @@ static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) goto error; } + /* Store multicast info for recovery */ + igmp_recovery->socket_fd = fd; + igmp_recovery->mcast_addr = ba; + igmp_recovery->mcast_len = salen; + igmp_recovery->if_index = req.ifr_ifindex; + igmp_recovery->is_ipv6 = (af == AF_INET6); + pw_log_debug("stored %s multicast info: socket_fd=%d, " + "if_index=%d", igmp_recovery->is_ipv6 ? + "IPv6" : "IPv4", fd, req.ifr_ifindex); + if (bind(fd, (struct sockaddr*)&ba, salen) < 0) { res = -errno; pw_log_error("bind() failed: %m"); @@ -425,7 +596,8 @@ static void stream_open_connection(void *data, int *result) pw_log_info("starting RTP listener"); if ((fd = make_socket((const struct sockaddr *)&impl->src_addr, - impl->src_len, impl->ifname)) < 0) { + impl->src_len, impl->ifname, + &(impl->igmp_recovery))) < 0) { /* If make_socket() tries to create a socket and join to a multicast * group while the network interfaces are not ready yet to do so * (usually because a network manager component is still setting up @@ -475,6 +647,13 @@ static void stream_open_connection(void *data, int *result) goto finish; } + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + NULL, impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl)) < 0) { + pw_log_error("can't add timer: %s", spa_strerror(res)); + goto finish; + } + finish: if (res != 0) { pw_log_error("failed to start RTP stream: %s", spa_strerror(res)); @@ -498,6 +677,7 @@ static void stream_close_connection(void *data, int *result) pw_log_info("stopping RTP listener"); pw_timer_queue_cancel(&impl->stream_start_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); pw_loop_destroy_source(impl->data_loop, impl->source); impl->source = NULL; @@ -636,6 +816,7 @@ static void impl_destroy(struct impl *impl) pw_timer_queue_cancel(&impl->standby_timer); pw_timer_queue_cancel(&impl->stream_start_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); if (impl->data_loop) pw_context_release_loop(impl->context, impl->data_loop); @@ -803,6 +984,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->cleanup_interval = pw_properties_get_uint32(stream_props, "cleanup.sec", DEFAULT_CLEANUP_SEC); + impl->igmp_recovery.check_interval = SPA_MAX(pw_properties_get_uint32(stream_props, + "igmp.check.interval.sec", + DEFAULT_IGMP_CHECK_INTERVAL_SEC), 1u); + pw_log_info("using IGMP check interval of %" PRIu32 " second(s)", + impl->igmp_recovery.check_interval); + + impl->igmp_recovery.deadline = SPA_MAX(pw_properties_get_uint32(stream_props, + "igmp.deadline.sec", DEFAULT_IGMP_DEADLINE_SEC), 5u); + pw_log_info("using IGMP deadline of %" PRIu32 " second(s)", + impl->igmp_recovery.deadline); + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { str = pw_properties_get(props, PW_KEY_REMOTE_NAME);