From c1e737bbe45bab94c69dbd4fe206c2acd40e7bc0 Mon Sep 17 00:00:00 2001 From: Rui Matos Date: Tue, 26 Aug 2025 10:40:13 +0200 Subject: [PATCH 1/8] module-rtp: Attempt to reconnect the ptp management socket This should gracefully recover the cases where the other end of the socket isn't ready yet when we start or terminates and gets restarted. --- src/modules/module-rtp-sap.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index 22fe89b68..6ab0c8477 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -548,8 +548,13 @@ error: static bool update_ts_refclk(struct impl *impl) { - if (!impl->ptp_mgmt_socket || impl->ptp_fd < 0) + if (!impl->ptp_mgmt_socket) return false; + if (impl->ptp_fd < 0) { + impl->ptp_fd = make_unix_socket(impl->ptp_mgmt_socket); + if (impl->ptp_fd < 0) + return false; + } // Read if something is left in the socket int avail; @@ -581,6 +586,12 @@ static bool update_ts_refclk(struct impl *impl) if (write(impl->ptp_fd, &req, sizeof(req)) == -1) { pw_log_warn("Failed to send PTP management request: %m"); + if (errno != ENOTCONN) + return false; + close(impl->ptp_fd); + impl->ptp_fd = make_unix_socket(impl->ptp_mgmt_socket); + if (impl->ptp_fd > -1) + pw_log_info("Reopened PTP management socket"); return false; } From b57bd00be01a66dfb08673491c4412370549a169 Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Thu, 23 Oct 2025 16:06:56 +0200 Subject: [PATCH 2/8] module-rtp-sap: Improve names for clearer code --- src/modules/module-rtp-sap.c | 50 ++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index 6ab0c8477..1818df13e 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -265,7 +265,7 @@ struct impl { struct pw_registry *registry; struct spa_hook registry_listener; - struct pw_timer timer; + 
struct pw_timer sap_send_timer; char *ifname; uint32_t ttl; @@ -288,7 +288,7 @@ struct impl { char *extra_attrs_preamble; char *extra_attrs_end; - char *ptp_mgmt_socket; + char *ptp_mgmt_socket_path; int ptp_fd; uint32_t ptp_seq; uint8_t clock_id[8]; @@ -383,7 +383,7 @@ static bool is_multicast(struct sockaddr *sa, socklen_t salen) return false; } -static int make_unix_socket(const char *path) { +static int make_unix_ptp_mgmt_socket(const char *path) { struct sockaddr_un addr; spa_autoclose int fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0); @@ -419,7 +419,7 @@ static int make_send_socket( af = src->ss_family; if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { - pw_log_error("socket failed: %m"); + pw_log_error("socket() failed: %m"); return -errno; } if (bind(fd, (struct sockaddr*)src, src_len) < 0) { @@ -451,6 +451,9 @@ static int make_send_socket( pw_log_warn("setsockopt(IPV6_MULTICAST_HOPS) failed: %m"); } } + + pw_log_info("sender socket up and running"); + return fd; error: close(fd); @@ -468,13 +471,13 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, af = sa->ss_family; if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { - pw_log_error("socket failed: %m"); + pw_log_error("socket() failed: %m"); return -errno; } val = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) { res = -errno; - pw_log_error("setsockopt failed: %m"); + pw_log_error("setsockopt() failed: %m"); goto error; } spa_zero(req); @@ -540,6 +543,9 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, goto error; } } + + pw_log_info("receiver socket up and running"); + return fd; error: close(fd); @@ -548,10 +554,10 @@ error: static bool update_ts_refclk(struct impl *impl) { - if (!impl->ptp_mgmt_socket) + if (!impl->ptp_mgmt_socket_path) return false; if (impl->ptp_fd < 0) { - impl->ptp_fd = make_unix_socket(impl->ptp_mgmt_socket); + impl->ptp_fd = 
make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path); if (impl->ptp_fd < 0) return false; } @@ -589,7 +595,7 @@ static bool update_ts_refclk(struct impl *impl) if (errno != ENOTCONN) return false; close(impl->ptp_fd); - impl->ptp_fd = make_unix_socket(impl->ptp_mgmt_socket); + impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path); if (impl->ptp_fd > -1) pw_log_info("Reopened PTP management socket"); return false; @@ -933,7 +939,7 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye) return res; } -static void on_timer_event(void *data) +static void on_sap_send_timer_event(void *data) { struct impl *impl = data; struct session *sess, *tmp; @@ -967,9 +973,9 @@ static void on_timer_event(void *data) } } - pw_timer_queue_add(impl->timer_queue, &impl->timer, - &impl->timer.timeout, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, - on_timer_event, impl); + pw_timer_queue_add(impl->timer_queue, &impl->sap_send_timer, + &impl->sap_send_timer.timeout, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, + on_sap_send_timer_event, impl); } static struct session *session_find(struct impl *impl, const struct sdp_info *info) @@ -1665,11 +1671,11 @@ static int start_sap(struct impl *impl) int fd = -1, res; char addr[128] = "invalid"; - pw_log_info("starting SAP timer"); - if ((res = pw_timer_queue_add(impl->timer_queue, &impl->timer, + pw_log_info("starting SAP send timer"); + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->sap_send_timer, NULL, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, - on_timer_event, impl)) < 0) { - pw_log_error("can't add timer: %s", spa_strerror(res)); + on_sap_send_timer_event, impl)) < 0) { + pw_log_error("can't add SAP send timer: %s", spa_strerror(res)); goto error; } if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname)) < 0) @@ -1818,7 +1824,7 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - pw_timer_queue_cancel(&impl->timer); + 
pw_timer_queue_cancel(&impl->sap_send_timer); if (impl->sap_source) pw_loop_destroy_source(impl->loop, impl->sap_source); @@ -1832,7 +1838,7 @@ static void impl_destroy(struct impl *impl) free(impl->extra_attrs_preamble); free(impl->extra_attrs_end); - free(impl->ptp_mgmt_socket); + free(impl->ptp_mgmt_socket_path); free(impl->ifname); free(impl); } @@ -1904,11 +1910,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->ifname = str ? strdup(str) : NULL; str = pw_properties_get(props, "ptp.management-socket"); - impl->ptp_mgmt_socket = str ? strdup(str) : NULL; + impl->ptp_mgmt_socket_path = str ? strdup(str) : NULL; // TODO: support UDP management access as well - if (impl->ptp_mgmt_socket) - impl->ptp_fd = make_unix_socket(impl->ptp_mgmt_socket); + if (impl->ptp_mgmt_socket_path) + impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path); if ((str = pw_properties_get(props, "sap.ip")) == NULL) str = DEFAULT_SAP_IP; From 80e7302a05cc7d4da3c9a431d64c5eba538483ae Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Thu, 23 Oct 2025 16:39:51 +0200 Subject: [PATCH 3/8] module-rtp-sap: Add retry code for when start_sap() fails due to ENODEV --- src/modules/module-rtp-sap.c | 69 +++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index 1818df13e..b9404b667 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -267,6 +267,10 @@ struct impl { struct pw_timer sap_send_timer; + /* This timer is used when the first start_sap() call fails because + * of an ENODEV error (see the start_sap() code for details) */ + struct pw_timer start_sap_retry_timer; + char *ifname; uint32_t ttl; bool mcast_loop; @@ -322,6 +326,7 @@ static const struct format_info *find_audio_format_info(const char *mime) return NULL; } +static int start_sap(struct impl *impl); static int send_sap(struct impl *impl, struct session 
*sess, bool bye); @@ -978,6 +983,13 @@ static void on_sap_send_timer_event(void *data) on_sap_send_timer_event, impl); } +static void on_start_sap_retry_timer_event(void *data) +{ + struct impl *impl = data; + pw_log_debug("trying again to start SAP send after previous attempt failed with ENODEV"); + start_sap(impl); +} + static struct session *session_find(struct impl *impl, const struct sdp_info *info) { struct session *sess; @@ -1668,18 +1680,62 @@ on_sap_io(void *data, int fd, uint32_t mask) static int start_sap(struct impl *impl) { - int fd = -1, res; + int fd = -1, res = 0; char addr[128] = "invalid"; pw_log_info("starting SAP send timer"); + /* start_sap() might be called more than once. See the make_recv_socket() + * call below for why that can happen. In such a case, the timer was + * started already. The easiest way of handling it is to just cancel it. + * Such cases are not expected to occur often, so canceling and then + * adding the timer again is acceptable. */ + pw_timer_queue_cancel(&impl->sap_send_timer); if ((res = pw_timer_queue_add(impl->timer_queue, &impl->sap_send_timer, NULL, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, on_sap_send_timer_event, impl)) < 0) { pw_log_error("can't add SAP send timer: %s", spa_strerror(res)); goto error; } - if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname)) < 0) - return fd; + if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname)) < 0) { + /* If make_recv_socket() tries to create a socket and join to a multicast + * group while the network interfaces are not ready yet to do so + * (usually because a network manager component is still setting up + * those network interfaces), ENODEV will be returned. This is essentially + * a race condition. There is no discernible way to be notified when the + * network interfaces are ready for that operation, so the next best + * approach is to essentially do a form of polling by retrying the + * start_sap() call after some time. 
The start_sap_retry_timer exists + * precisely for that purpose. This means that ENODEV is not treated as + * an error, but instead, it triggers the creation of that timer. */ + if (fd == -ENODEV) { + pw_log_warn("failed to create receiver socket because network device " + "is not ready and present yet; will try again"); + + pw_timer_queue_cancel(&impl->start_sap_retry_timer); + /* Use a 1-second retry interval. The network interfaces + * are likely to be up and running then. */ + pw_timer_queue_add(impl->timer_queue, &impl->start_sap_retry_timer, + NULL, 1 * SPA_NSEC_PER_SEC, + on_start_sap_retry_timer_event, impl); + + /* It is important to return 0 in this case. Otherwise, the nonzero return + * value will later be propagated through the core as an error. */ + res = 0; + goto finish; + } else { + pw_log_error("failed to create socket: %s", spa_strerror(-fd)); + /* If ENODEV was returned earlier, and the start_sap_retry_timer + * was consequently created, but then a non-ENODEV error occurred, + * the timer must be stopped and removed. */ + pw_timer_queue_cancel(&impl->start_sap_retry_timer); + res = fd; + goto error; + } + } + + /* Cleanup the timer in case ENODEV occurred earlier, and this time, + * the socket creation succeeded. 
*/ + pw_timer_queue_cancel(&impl->start_sap_retry_timer); pw_net_get_ip(&impl->sap_addr, addr, sizeof(addr), NULL, NULL); pw_log_info("starting SAP listener on %s", addr); @@ -1690,11 +1746,13 @@ static int start_sap(struct impl *impl) goto error; } - return 0; +finish: + return res; + error: if (fd > 0) close(fd); - return res; + goto finish; } static void node_event_info(void *data, const struct pw_node_info *info) @@ -1825,6 +1883,7 @@ static void impl_destroy(struct impl *impl) pw_core_disconnect(impl->core); pw_timer_queue_cancel(&impl->sap_send_timer); + pw_timer_queue_cancel(&impl->start_sap_retry_timer); if (impl->sap_source) pw_loop_destroy_source(impl->loop, impl->sap_source); From f1ffd5e5e83c91cf3176c9940bef6642b4245407 Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Sun, 19 Oct 2025 16:19:16 +0200 Subject: [PATCH 4/8] module-rtp-source: Read cleanup.sec property from stream properties This allows for setting the cleanup.sec value in the create-stream block in the module-rtp-sap configuration. 
--- src/modules/module-rtp-source.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 63f1f399d..26da7c45e 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -797,7 +797,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) * till we make it (or get timed out) */ pw_properties_set(stream_props, "rtp.receiving", "true"); - impl->cleanup_interval = pw_properties_get_uint32(props, + impl->cleanup_interval = pw_properties_get_uint32(stream_props, "cleanup.sec", DEFAULT_CLEANUP_SEC); impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); From 5d21e12658f0a67a87f3d9037264cf80919f89f2 Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Sun, 19 Oct 2025 19:31:44 +0200 Subject: [PATCH 5/8] module-rtp-source: Use make_socket() error value instead of errno make_socket() already returns the negative errno. --- src/modules/module-rtp-source.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 26da7c45e..3954d3137 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -433,7 +433,7 @@ static void stream_open_connection(void *data, int *result) * stream_start() call after some time. The stream_start_retry_timer exists * precisely for that purpose. This means that ENODEV is not treated as * an error, but instead, it triggers the creation of that timer. 
- if (errno == ENODEV) { + if (fd == -ENODEV) { pw_log_warn("failed to create socket because network device is not ready " "and present yet; will try again"); @@ -449,12 +449,12 @@ static void stream_open_connection(void *data, int *result) res = 0; goto finish; } else { - pw_log_error("failed to create socket: %m"); + pw_log_error("failed to create socket: %s", spa_strerror(fd)); /* If ENODEV was returned earlier, and the stream_start_retry_timer * was consequently created, but then a non-ENODEV error occurred, * the timer must be stopped and removed. */ pw_timer_queue_cancel(&impl->stream_start_retry_timer); - res = -errno; + res = fd; goto finish; } } From 3e0f4daf600de2a04b69c0ba5ca0c26e9e5a04f6 Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Thu, 23 Oct 2025 17:53:54 +0200 Subject: [PATCH 6/8] module-rtp-sap: implement IGMP recovery for multicast subscription loss Add an IGMP recovery mechanism that monitors SAP packet reception and triggers a multicast group refresh if no packets have been received by the time a deadline is reached. The deadline is set to half of the cleanup interval, with a minimum of 1 second. When the deadline is reached, the mechanism performs IGMP leave/rejoin operations to refresh multicast group membership. This ensures SAP announcements continue to be received when network conditions cause IGMP membership to expire or become stale due to router timeouts or network issues. 
--- src/modules/module-rtp-sap.c | 133 ++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 2 deletions(-) diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index b9404b667..27df24816 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -248,6 +248,16 @@ struct node { struct session *session; }; +struct igmp_recovery { + struct pw_timer timer; + int socket_fd; + struct sockaddr_storage mcast_addr; + socklen_t mcast_len; + uint32_t if_index; + bool is_ipv6; + uint32_t deadline; +}; + struct impl { struct pw_properties *props; @@ -285,6 +295,10 @@ struct impl { struct spa_source *sap_source; uint32_t cleanup_interval; + /* IGMP recovery (triggers when no SAP packets are + * received after the recovery deadline is reached) */ + struct igmp_recovery igmp_recovery; + uint32_t max_sessions; uint32_t n_sessions; struct spa_list sessions; @@ -466,7 +480,7 @@ error: } static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, - char *ifname) + char *ifname, struct igmp_recovery *igmp_recovery) { int af, fd, val, res; struct ifreq req; @@ -536,6 +550,16 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, goto error; } + /* Store multicast info for recovery */ + igmp_recovery->socket_fd = fd; + igmp_recovery->mcast_addr = ba; + igmp_recovery->mcast_len = salen; + igmp_recovery->if_index = req.ifr_ifindex; + igmp_recovery->is_ipv6 = (af == AF_INET6); + pw_log_debug("stored %s multicast info: socket_fd=%d, " + "if_index=%d", igmp_recovery->is_ipv6 ? 
+ "IPv6" : "IPv4", fd, req.ifr_ifindex); + if (bind(fd, (struct sockaddr*)&ba, salen) < 0) { res = -errno; pw_log_error("bind() failed: %m"); @@ -944,6 +968,97 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye) return res; } +static void on_igmp_recovery_timer_event(void *data) +{ + struct impl *impl = data; + char addr[128]; + int res = 0; + + /* Only attempt recovery if we have a valid socket and multicast address */ + if (impl->igmp_recovery.socket_fd < 0) { + pw_log_debug("no socket, skipping IGMP recovery"); + goto finish; + } + + pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL); + pw_log_info("IGMP recovery triggered for %s", addr); + + /* Force IGMP membership refresh by leaving the group first, then rejoin */ + if (impl->igmp_recovery.is_ipv6) { + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr; + mr6.ipv6mr_interface = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, + &mr6, sizeof(mr6)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv6 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv6 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv6 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, + &mr6, sizeof(mr6)); + if (res < 0) { + pw_log_warn("failed to re-join IPv6 multicast group: %m"); + } else { + pw_log_info("re-joined IPv6 multicast group successfully"); + } + } else { + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr; + mr4.imr_ifindex = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = 
setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv4 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv4 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv4 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (res < 0) { + pw_log_warn("failed to re-join IPv4 multicast group: %m"); + } else { + pw_log_info("re-joined IPv4 multicast group successfully"); + } + } + +finish: + /* If rejoining failed, try again in 1 second. This can happen + * for example when the network interface went down, and is not + * yet up and running again, and ENODEV is returned as a result. + * Otherwise, continue with the usual deadline. */ + pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + &impl->igmp_recovery.timer.timeout, + ((res == 0) ? 
impl->igmp_recovery.deadline : 1) * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl); +} + +static void rearm_igmp_recovery_timer(struct impl *impl) +{ + pw_timer_queue_cancel(&impl->igmp_recovery.timer); + pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + NULL, impl->igmp_recovery.deadline * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl); +} + static void on_sap_send_timer_event(void *data) { struct impl *impl = data; @@ -1675,6 +1790,8 @@ on_sap_io(void *data, int fd, uint32_t mask) buffer[len] = 0; if ((res = parse_sap(impl, buffer, len)) < 0) pw_log_warn("error parsing SAP: %s", spa_strerror(res)); + + rearm_igmp_recovery_timer(impl); } } @@ -1696,7 +1813,8 @@ static int start_sap(struct impl *impl) pw_log_error("can't add SAP send timer: %s", spa_strerror(res)); goto error; } - if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname)) < 0) { + if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname, + &(impl->igmp_recovery))) < 0) { /* If make_recv_socket() tries to create a socket and join to a multicast * group while the network interfaces are not ready yet to do so * (usually because a network manager component is still setting up @@ -1746,6 +1864,8 @@ static int start_sap(struct impl *impl) goto error; } + rearm_igmp_recovery_timer(impl); + finish: return res; @@ -1884,6 +2004,7 @@ static void impl_destroy(struct impl *impl) pw_timer_queue_cancel(&impl->sap_send_timer); pw_timer_queue_cancel(&impl->start_sap_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); if (impl->sap_source) pw_loop_destroy_source(impl->loop, impl->sap_source); @@ -1950,6 +2071,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->ptp_fd = -1; spa_list_init(&impl->sessions); + impl->igmp_recovery.socket_fd = -1; + impl->igmp_recovery.if_index = -1; + if (args == NULL) args = ""; @@ -1985,6 +2109,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char 
*args) impl->cleanup_interval = pw_properties_get_uint32(impl->props, "sap.cleanup.sec", DEFAULT_CLEANUP_SEC); + /* We will use half of the cleanup interval for IGMP deadline, minimum 1 second */ + impl->igmp_recovery.deadline = SPA_MAX(impl->cleanup_interval / 2, 1u); + pw_log_info("using IGMP deadline of %" PRIu32 " second(s)", + impl->igmp_recovery.deadline); + impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); impl->max_sessions = pw_properties_get_uint32(props, "sap.max-sessions", DEFAULT_MAX_SESSIONS); From 955c9ae837dfdeceb4dd3b2261a32622e12807b4 Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Thu, 23 Oct 2025 20:08:22 +0200 Subject: [PATCH 7/8] module-rtp: Get the current stream time in a reusable manner That way, redundant pw_stream_get_nsec() and clock_gettime() calls can be avoided. --- src/modules/module-rtp-session.c | 7 +++++-- src/modules/module-rtp-source.c | 23 +++++++++++++---------- src/modules/module-rtp/audio.c | 5 +++-- src/modules/module-rtp/midi.c | 3 ++- src/modules/module-rtp/opus.c | 3 ++- src/modules/module-rtp/stream.c | 14 +++++++++++--- src/modules/module-rtp/stream.h | 5 ++++- 7 files changed, 40 insertions(+), 20 deletions(-) diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index 2b69a0be7..cd7ca7f4e 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -1043,8 +1043,11 @@ on_data_io(void *data, int fd, uint32_t mask) if (sess == NULL) goto unknown_ssrc; - if (sess->data_ready && sess->receiving) - rtp_stream_receive_packet(sess->recv, buffer, len); + if (sess->data_ready && sess->receiving) { + uint64_t current_time = rtp_stream_get_nsec(sess->recv); + rtp_stream_receive_packet(sess->recv, buffer, len, + current_time); + } } } return; diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 3954d3137..377cc4109 100644 --- 
a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -227,13 +227,6 @@ struct impl { bool waiting; }; -static inline uint64_t get_time_ns(void) -{ - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return SPA_TIMESPEC_TO_NSEC(&ts); -} - static int do_start(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { @@ -261,6 +254,9 @@ on_rtp_io(void *data, int fd, uint32_t mask) struct impl *impl = data; ssize_t len; int suppressed; + uint64_t current_time; + + current_time = rtp_stream_get_nsec(impl->stream); if (mask & SPA_IO_IN) { if ((len = recv(fd, impl->buffer, impl->buffer_size, 0)) < 0) @@ -270,10 +266,17 @@ on_rtp_io(void *data, int fd, uint32_t mask) goto short_packet; if (SPA_LIKELY(impl->stream)) { - if (rtp_stream_receive_packet(impl->stream, impl->buffer, len) < 0) + if (rtp_stream_receive_packet(impl->stream, impl->buffer, len, + current_time) < 0) goto receive_error; } + /* Update last packet timestamp for IGMP recovery. + * The recovery timer will check this to see if recovery + * is necessary. Do this _before_ invoking do_start() + * in case the stream is waking up from standby. 
*/ + SPA_ATOMIC_STORE(impl->last_packet_time, current_time); + if (SPA_ATOMIC_LOAD(impl->state) != STATE_RECEIVING) { if (!SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_RECEIVING)) { if (SPA_ATOMIC_CAS(impl->state, STATE_IDLE, STATE_RECEIVING)) @@ -284,11 +287,11 @@ on_rtp_io(void *data, int fd, uint32_t mask) return; receive_error: - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0) pw_log_warn("(%d suppressed) recv() error: %m", suppressed); return; short_packet: - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0) pw_log_warn("(%d suppressed) short packet of len %zd received", suppressed, len); return; diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 1563e1917..0af38d649 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -233,7 +233,8 @@ static void rtp_audio_process_playback(void *data) pw_stream_queue_buffer(impl->stream, buf); } -static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen, plen; @@ -273,7 +274,7 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len) timestamp = ntohl(hdr->timestamp) - impl->ts_offset; impl->receiving = true; - impl->last_recv_timestamp = pw_stream_get_nsec(impl->stream); + impl->last_recv_timestamp = current_time; plen = len - hlen; samples = plen / stride; diff --git a/src/modules/module-rtp/midi.c b/src/modules/module-rtp/midi.c index 498c6e6a9..5fbdf3b63 100644 --- a/src/modules/module-rtp/midi.c +++ b/src/modules/module-rtp/midi.c @@ -318,7 +318,8 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti return 0; } -static int 
rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen; diff --git a/src/modules/module-rtp/opus.c b/src/modules/module-rtp/opus.c index 7eeda7f43..d13a4efaf 100644 --- a/src/modules/module-rtp/opus.c +++ b/src/modules/module-rtp/opus.c @@ -99,7 +99,8 @@ static void rtp_opus_process_playback(void *data) pw_stream_queue_buffer(impl->stream, buf); } -static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen, plen; diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 3834206ec..fa055ce0c 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -151,7 +151,8 @@ struct impl { * access below for the reason why. */ uint8_t timer_running; - int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len); + int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time); /* Used for resetting the ring buffer before the stream starts, to prevent * reading from uninitialized memory. This can otherwise happen in direct * timestamp mode when the read index is set to an uninitialized location. 
@@ -1036,10 +1037,17 @@ int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *di return pw_stream_update_properties(impl->stream, dict); } -int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, + uint64_t current_time) { struct impl *impl = (struct impl*)s; - return impl->receive_rtp(impl, buffer, len); + return impl->receive_rtp(impl, buffer, len, current_time); +} + +uint64_t rtp_stream_get_nsec(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + return pw_stream_get_nsec(impl->stream); } uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate) diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index ea358f350..095b8395c 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -62,7 +62,10 @@ void rtp_stream_destroy(struct rtp_stream *s); int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *dict); -int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len); +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, + uint64_t current_time); + +uint64_t rtp_stream_get_nsec(struct rtp_stream *s); uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate); From 1096d634682bb2c9988c394bfc3e9d8cb5a13160 Mon Sep 17 00:00:00 2001 From: Carlos Rafael Giani Date: Thu, 23 Oct 2025 20:16:14 +0200 Subject: [PATCH 8/8] module-rtp-source: implement IGMP recovery for multicast subscription loss Add an IGMP recovery mechanism that monitors RTP packet reception and triggers a multicast group refresh when no packets have been received by the time a deadline is reached. The deadline is configurable via a new stream property "igmp.deadline.sec" (in seconds), with the default value being 30 seconds (and a minimum of 5 seconds). A timer checks regularly whether the deadline was reached. 
That timer's interval is set by the igmp.check.interval.sec property (in seconds), with the default value being 5 seconds (and a minimum of 1 second). When the deadline is reached, the mechanism performs IGMP leave/rejoin operations to refresh multicast group membership. This ensures RTP data continues to be received when network conditions cause IGMP membership to expire or become stale due to router timeouts or network issues. --- src/modules/module-rtp-source.c | 196 +++++++++++++++++++++++++++++++- 1 file changed, 194 insertions(+), 2 deletions(-) diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 377cc4109..5c3d1f05f 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -156,6 +157,9 @@ PW_LOG_TOPIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic +#define DEFAULT_IGMP_CHECK_INTERVAL_SEC 5 +#define DEFAULT_IGMP_DEADLINE_SEC 30 + #define DEFAULT_CLEANUP_SEC 60 #define DEFAULT_SOURCE_IP "224.0.0.56" @@ -180,6 +184,23 @@ static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; +struct igmp_recovery { + struct pw_timer timer; + int socket_fd; + struct sockaddr_storage mcast_addr; + socklen_t mcast_len; + uint32_t if_index; + bool is_ipv6; + /* This is the interval the recovery timer runs at. The timer + * checks at each interval if recovery is required. This value + * is defined by the igmp.check.interval.sec property. */ + uint32_t check_interval; + /* This is the deadline for packets to arrive. If the deadline + * is exceeded, an IGMP recovery is attempted. This value is + * defined by the igmp.deadline.sec property. 
*/ + uint32_t deadline; +}; + struct impl { struct pw_impl_module *module; struct spa_hook module_listener; @@ -201,6 +222,15 @@ struct impl { bool always_process; uint32_t cleanup_interval; + /* IGMP recovery (triggers when no RTP packets are + * received after the recovery deadline is reached) */ + struct igmp_recovery igmp_recovery; + + /* Monotonic timestamp of the last time a packet was + * received. This is accessed with atomic accessors + * to avoid race conditions. */ + uint64_t last_packet_time; + struct pw_timer standby_timer; /* This timer is used when the first stream_start() call fails because * of an ENODEV error (see the stream_start() code for details) */ @@ -297,7 +327,138 @@ short_packet: return; } -static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) +static int rejoin_igmp_group(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + /* IMPORTANT: This must be run from within the data loop. */ + + int res; + struct impl *impl = user_data; + uint64_t current_time; + + /* Force IGMP membership refresh by leaving the group first, then rejoin */ + if (impl->igmp_recovery.is_ipv6) { + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr; + mr6.ipv6mr_interface = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, + &mr6, sizeof(mr6)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv6 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv6 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv6 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, + &mr6, sizeof(mr6)); + if (res < 0) { + pw_log_warn("failed to re-join IPv6 
multicast group: %m"); + } else { + pw_log_info("re-joined IPv6 multicast group successfully"); + } + } else { + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr; + mr4.imr_ifindex = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv4 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv4 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv4 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (res < 0) { + pw_log_warn("failed to re-join IPv4 multicast group: %m"); + } else { + pw_log_info("re-joined IPv4 multicast group successfully"); + } + } + + current_time = rtp_stream_get_nsec(impl->stream); + SPA_ATOMIC_STORE(impl->last_packet_time, current_time); + + return res; +} + +static void on_igmp_recovery_timer_event(void *data) +{ + int res; + struct impl *impl = data; + char addr[128]; + uint64_t current_time, elapsed_seconds, last_packet_time; + + /* Only attempt recovery if we have a valid socket and multicast address */ + if (SPA_UNLIKELY(impl->igmp_recovery.socket_fd < 0)) { + pw_log_trace("no socket, skipping IGMP recovery"); + goto finish; + } + + /* This check is performed even if standby = false or + * receiving != STATE_RECEIVING, because the very reason + * for these states may be that the receiver socket was + * silently kicked out of the IGMP group (which causes data + * to no longer arrive, thus leading to these states). 
*/ + + current_time = rtp_stream_get_nsec(impl->stream); + last_packet_time = SPA_ATOMIC_LOAD(impl->last_packet_time); + elapsed_seconds = (current_time - last_packet_time) / SPA_NSEC_PER_SEC; + + /* Only trigger recovery if enough time has elapsed since last packet */ + if (elapsed_seconds < impl->igmp_recovery.deadline) { + pw_log_trace("IGMP recovery check: %" PRIu64 " seconds elapsed, " + "need %" PRIu32 " seconds", elapsed_seconds, + impl->igmp_recovery.deadline); + goto finish; + } + + pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL); + pw_log_info("starting IGMP recovery for %s", addr); + + /* Run the actual recovery in the data loop, since recovery involves + * rejoining the socket to the IGMP group. By running this in the + * data loop, race conditions due to stray packets causing an on_rtp_io() + * invocation at the same time when the IGMP group rejoining takes place + * are avoided, since on_rtp_io() too runs in the data loop. + * This is a blocking call to make sure the rejoin attempt was fully + * done by the time this callback ends. (rejoin_igmp_group() does not + * do work that takes a long time to finish. 
)*/ + res = pw_loop_locked(impl->data_loop, rejoin_igmp_group, 1, NULL, 0, impl); + + if (SPA_LIKELY(res == 0)) { + pw_log_info("IGMP recovery for %s finished", addr); + } else { + pw_log_error("error while finishing IGMP recovery for %s: %s", + addr, spa_strerror(res)); + } + +finish: + pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + &impl->igmp_recovery.timer.timeout, + impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl); +} + +static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname, + struct igmp_recovery *igmp_recovery) { int af, fd, val, res; struct ifreq req; @@ -377,6 +538,16 @@ static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) goto error; } + /* Store multicast info for recovery */ + igmp_recovery->socket_fd = fd; + igmp_recovery->mcast_addr = ba; + igmp_recovery->mcast_len = salen; + igmp_recovery->if_index = req.ifr_ifindex; + igmp_recovery->is_ipv6 = (af == AF_INET6); + pw_log_debug("stored %s multicast info: socket_fd=%d, " + "if_index=%d", igmp_recovery->is_ipv6 ? 
+ "IPv6" : "IPv4", fd, req.ifr_ifindex); + if (bind(fd, (struct sockaddr*)&ba, salen) < 0) { res = -errno; pw_log_error("bind() failed: %m"); @@ -425,7 +596,8 @@ static void stream_open_connection(void *data, int *result) pw_log_info("starting RTP listener"); if ((fd = make_socket((const struct sockaddr *)&impl->src_addr, - impl->src_len, impl->ifname)) < 0) { + impl->src_len, impl->ifname, + &(impl->igmp_recovery))) < 0) { /* If make_socket() tries to create a socket and join to a multicast * group while the network interfaces are not ready yet to do so * (usually because a network manager component is still setting up @@ -475,6 +647,13 @@ static void stream_open_connection(void *data, int *result) goto finish; } + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + NULL, impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl)) < 0) { + pw_log_error("can't add timer: %s", spa_strerror(res)); + goto finish; + } + finish: if (res != 0) { pw_log_error("failed to start RTP stream: %s", spa_strerror(res)); @@ -498,6 +677,7 @@ static void stream_close_connection(void *data, int *result) pw_log_info("stopping RTP listener"); pw_timer_queue_cancel(&impl->stream_start_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); pw_loop_destroy_source(impl->data_loop, impl->source); impl->source = NULL; @@ -636,6 +816,7 @@ static void impl_destroy(struct impl *impl) pw_timer_queue_cancel(&impl->standby_timer); pw_timer_queue_cancel(&impl->stream_start_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); if (impl->data_loop) pw_context_release_loop(impl->context, impl->data_loop); @@ -803,6 +984,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->cleanup_interval = pw_properties_get_uint32(stream_props, "cleanup.sec", DEFAULT_CLEANUP_SEC); + impl->igmp_recovery.check_interval = SPA_MAX(pw_properties_get_uint32(stream_props, + "igmp.check.interval.sec", + 
DEFAULT_IGMP_CHECK_INTERVAL_SEC), 1u); + pw_log_info("using IGMP check interval of %" PRIu32 " second(s)", + impl->igmp_recovery.check_interval); + + impl->igmp_recovery.deadline = SPA_MAX(pw_properties_get_uint32(stream_props, + "igmp.deadline.sec", DEFAULT_IGMP_DEADLINE_SEC), 5u); + pw_log_info("using IGMP deadline of %" PRIu32 " second(s)", + impl->igmp_recovery.deadline); + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { str = pw_properties_get(props, PW_KEY_REMOTE_NAME);