module-rtp-source: implement IGMP recovery for multicast subscription loss

Add IGMP recovery mechanism that monitors RTP packet reception and
triggers multicast group refresh when no packets are received if
a deadline is reached. The deadline is configurable via a new stream
property "igmp.deadline.sec" (in seconds), with the default value
being 30 seconds (and a minimum of 5 seconds).

A timer checks regularly if the deadline was reached. That timer's
interval is set by the igmp.check.interval.sec property (in seconds),
with the default value being 5 seconds (and a minimum of 1 second).

When the deadline is reached, the mechanism performs IGMP leave/rejoin
operations to refresh multicast group membership. This ensures RTP
data continues to be received when network conditions cause IGMP
membership to expire or become stale due to router timeouts or
network issues.
This commit is contained in:
Carlos Rafael Giani 2025-10-23 20:16:14 +02:00
parent 955c9ae837
commit 1096d63468

View file

@ -15,6 +15,7 @@
#include <net/if.h>
#include <ctype.h>
#include <spa/utils/atomic.h>
#include <spa/utils/hook.h>
#include <spa/utils/result.h>
#include <spa/utils/ringbuffer.h>
@ -156,6 +157,9 @@
PW_LOG_TOPIC(mod_topic, "mod." NAME);
#define PW_LOG_TOPIC_DEFAULT mod_topic
#define DEFAULT_IGMP_CHECK_INTERVAL_SEC 5
#define DEFAULT_IGMP_DEADLINE_SEC 30
#define DEFAULT_CLEANUP_SEC 60
#define DEFAULT_SOURCE_IP "224.0.0.56"
@ -180,6 +184,23 @@ static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
struct igmp_recovery {
struct pw_timer timer;
int socket_fd;
struct sockaddr_storage mcast_addr;
socklen_t mcast_len;
uint32_t if_index;
bool is_ipv6;
/* This is the interval the recovery timer runs at. The timer
* checks at each interval if recovery is required. This value
* is defined by the igmp.check.interval.sec property. */
uint32_t check_interval;
/* This is the deadline for packets to arrive. If the deadline
* is exceeded, an IGMP recovery is attempted. This value is
* defined by the igmp.deadline.sec property. */
uint32_t deadline;
};
struct impl {
struct pw_impl_module *module;
struct spa_hook module_listener;
@ -201,6 +222,15 @@ struct impl {
bool always_process;
uint32_t cleanup_interval;
/* IGMP recovery (triggers when no RTP packets are
* received after the recovery deadline is reached) */
struct igmp_recovery igmp_recovery;
/* Monotonic timestamp of the last time a packet was
* received. This is accessed with atomic accessors
* to avoid race conditions. */
uint64_t last_packet_time;
struct pw_timer standby_timer;
/* This timer is used when the first stream_start() call fails because
* of an ENODEV error (see the stream_start() code for details) */
@ -297,7 +327,138 @@ short_packet:
return;
}
static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname)
static int rejoin_igmp_group(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
/* IMPORTANT: This must be run from within the data loop. */
int res;
struct impl *impl = user_data;
uint64_t current_time;
/* Force IGMP membership refresh by leaving the group first, then rejoin */
if (impl->igmp_recovery.is_ipv6) {
struct ipv6_mreq mr6;
memset(&mr6, 0, sizeof(mr6));
mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr;
mr6.ipv6mr_interface = impl->igmp_recovery.if_index;
/* Leave the group first */
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
&mr6, sizeof(mr6));
if (SPA_LIKELY(res == 0)) {
pw_log_info("left IPv6 multicast group");
} else {
if (errno == EADDRNOTAVAIL) {
pw_log_info("attempted to leave IPv6 multicast group, but "
"membership was already silently dropped");
} else {
pw_log_warn("failed to leave IPv6 multicast group: %m");
}
}
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP,
&mr6, sizeof(mr6));
if (res < 0) {
pw_log_warn("failed to re-join IPv6 multicast group: %m");
} else {
pw_log_info("re-joined IPv6 multicast group successfully");
}
} else {
struct ip_mreqn mr4;
memset(&mr4, 0, sizeof(mr4));
mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr;
mr4.imr_ifindex = impl->igmp_recovery.if_index;
/* Leave the group first */
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP,
&mr4, sizeof(mr4));
if (SPA_LIKELY(res == 0)) {
pw_log_info("left IPv4 multicast group");
} else {
if (errno == EADDRNOTAVAIL) {
pw_log_info("attempted to leave IPv4 multicast group, but "
"membership was already silently dropped");
} else {
pw_log_warn("failed to leave IPv4 multicast group: %m");
}
}
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
&mr4, sizeof(mr4));
if (res < 0) {
pw_log_warn("failed to re-join IPv4 multicast group: %m");
} else {
pw_log_info("re-joined IPv4 multicast group successfully");
}
}
current_time = rtp_stream_get_nsec(impl->stream);
SPA_ATOMIC_STORE(impl->last_packet_time, current_time);
return res;
}
static void on_igmp_recovery_timer_event(void *data)
{
int res;
struct impl *impl = data;
char addr[128];
uint64_t current_time, elapsed_seconds, last_packet_time;
/* Only attempt recovery if we have a valid socket and multicast address */
if (SPA_UNLIKELY(impl->igmp_recovery.socket_fd < 0)) {
pw_log_trace("no socket, skipping IGMP recovery");
goto finish;
}
/* This check if performed even if standby = false or
* receiving != STATE_RECEIVING , because the very reason
* for these states may be that the receiver socket was
* silently kicked out of the IGMP group (which causes data
* to no longer arrive, thus leading to these states). */
current_time = rtp_stream_get_nsec(impl->stream);
last_packet_time = SPA_ATOMIC_LOAD(impl->last_packet_time);
elapsed_seconds = (current_time - last_packet_time) / SPA_NSEC_PER_SEC;
/* Only trigger recovery if enough time has elapsed since last packet */
if (elapsed_seconds < impl->igmp_recovery.deadline) {
pw_log_trace("IGMP recovery check: %" PRIu64 " seconds elapsed, "
"need %" PRIu32 " seconds", elapsed_seconds,
impl->igmp_recovery.deadline);
goto finish;
}
pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL);
pw_log_info("starting IGMP recovery for %s", addr);
/* Run the actual recovery in the data loop, since recovery involves
* rejoining the socket to the IGMP group. By running this in the
* data loop, race conditions due to stray packets causing an on_rtp_io()
* invocation at the same time when the IGMP group rejoining takes place
* is avoided, since on_rtp_io() too runs in the data loop.
* This is a blocking call to make sure the rejoin attempt was fully
* done by the time this callback ends. (rejoin_igmp_group() does not
* do work that takes a long time to finish. )*/
res = pw_loop_locked(impl->data_loop, rejoin_igmp_group, 1, NULL, 0, impl);
if (SPA_LIKELY(res == 0)) {
pw_log_info("IGMP recovery for %s finished", addr);
} else {
pw_log_error("error while finishing IGMP recovery for %s: %s",
addr, spa_strerror(res));
}
finish:
pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer,
&impl->igmp_recovery.timer.timeout,
impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC,
on_igmp_recovery_timer_event, impl);
}
static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname,
struct igmp_recovery *igmp_recovery)
{
int af, fd, val, res;
struct ifreq req;
@ -377,6 +538,16 @@ static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname)
goto error;
}
/* Store multicast info for recovery */
igmp_recovery->socket_fd = fd;
igmp_recovery->mcast_addr = ba;
igmp_recovery->mcast_len = salen;
igmp_recovery->if_index = req.ifr_ifindex;
igmp_recovery->is_ipv6 = (af == AF_INET6);
pw_log_debug("stored %s multicast info: socket_fd=%d, "
"if_index=%d", igmp_recovery->is_ipv6 ?
"IPv6" : "IPv4", fd, req.ifr_ifindex);
if (bind(fd, (struct sockaddr*)&ba, salen) < 0) {
res = -errno;
pw_log_error("bind() failed: %m");
@ -425,7 +596,8 @@ static void stream_open_connection(void *data, int *result)
pw_log_info("starting RTP listener");
if ((fd = make_socket((const struct sockaddr *)&impl->src_addr,
impl->src_len, impl->ifname)) < 0) {
impl->src_len, impl->ifname,
&(impl->igmp_recovery))) < 0) {
/* If make_socket() tries to create a socket and join to a multicast
* group while the network interfaces are not ready yet to do so
* (usually because a network manager component is still setting up
@ -475,6 +647,13 @@ static void stream_open_connection(void *data, int *result)
goto finish;
}
if ((res = pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer,
NULL, impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC,
on_igmp_recovery_timer_event, impl)) < 0) {
pw_log_error("can't add timer: %s", spa_strerror(res));
goto finish;
}
finish:
if (res != 0) {
pw_log_error("failed to start RTP stream: %s", spa_strerror(res));
@ -498,6 +677,7 @@ static void stream_close_connection(void *data, int *result)
pw_log_info("stopping RTP listener");
pw_timer_queue_cancel(&impl->stream_start_retry_timer);
pw_timer_queue_cancel(&impl->igmp_recovery.timer);
pw_loop_destroy_source(impl->data_loop, impl->source);
impl->source = NULL;
@ -636,6 +816,7 @@ static void impl_destroy(struct impl *impl)
pw_timer_queue_cancel(&impl->standby_timer);
pw_timer_queue_cancel(&impl->stream_start_retry_timer);
pw_timer_queue_cancel(&impl->igmp_recovery.timer);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
@ -803,6 +984,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->cleanup_interval = pw_properties_get_uint32(stream_props,
"cleanup.sec", DEFAULT_CLEANUP_SEC);
impl->igmp_recovery.check_interval = SPA_MAX(pw_properties_get_uint32(stream_props,
"igmp.check.interval.sec",
DEFAULT_IGMP_CHECK_INTERVAL_SEC), 1u);
pw_log_info("using IGMP check interval of %" PRIu32 " second(s)",
impl->igmp_recovery.check_interval);
impl->igmp_recovery.deadline = SPA_MAX(pw_properties_get_uint32(stream_props,
"igmp.deadline.sec", DEFAULT_IGMP_DEADLINE_SEC), 5u);
pw_log_info("using IGMP deadline of %" PRIu32 " second(s)",
impl->igmp_recovery.deadline);
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {
str = pw_properties_get(props, PW_KEY_REMOTE_NAME);