diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index 22fe89b68..27df24816 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -248,6 +248,16 @@ struct node { struct session *session; }; +struct igmp_recovery { + struct pw_timer timer; + int socket_fd; + struct sockaddr_storage mcast_addr; + socklen_t mcast_len; + uint32_t if_index; + bool is_ipv6; + uint32_t deadline; +}; + struct impl { struct pw_properties *props; @@ -265,7 +275,11 @@ struct impl { struct pw_registry *registry; struct spa_hook registry_listener; - struct pw_timer timer; + struct pw_timer sap_send_timer; + + /* This timer is used when the first start_sap() call fails because + * of an ENODEV error (see the start_sap() code for details) */ + struct pw_timer start_sap_retry_timer; char *ifname; uint32_t ttl; @@ -281,6 +295,10 @@ struct impl { struct spa_source *sap_source; uint32_t cleanup_interval; + /* IGMP recovery (triggers when no SAP packets are + * received after the recovery deadline is reached) */ + struct igmp_recovery igmp_recovery; + uint32_t max_sessions; uint32_t n_sessions; struct spa_list sessions; @@ -288,7 +306,7 @@ struct impl { char *extra_attrs_preamble; char *extra_attrs_end; - char *ptp_mgmt_socket; + char *ptp_mgmt_socket_path; int ptp_fd; uint32_t ptp_seq; uint8_t clock_id[8]; @@ -322,6 +340,7 @@ static const struct format_info *find_audio_format_info(const char *mime) return NULL; } +static int start_sap(struct impl *impl); static int send_sap(struct impl *impl, struct session *sess, bool bye); @@ -383,7 +402,7 @@ static bool is_multicast(struct sockaddr *sa, socklen_t salen) return false; } -static int make_unix_socket(const char *path) { +static int make_unix_ptp_mgmt_socket(const char *path) { struct sockaddr_un addr; spa_autoclose int fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0); @@ -419,7 +438,7 @@ static int make_send_socket( af = src->ss_family; if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 
0)) < 0) { - pw_log_error("socket failed: %m"); + pw_log_error("socket() failed: %m"); return -errno; } if (bind(fd, (struct sockaddr*)src, src_len) < 0) { @@ -451,6 +470,9 @@ static int make_send_socket( pw_log_warn("setsockopt(IPV6_MULTICAST_HOPS) failed: %m"); } } + + pw_log_info("sender socket up and running"); + return fd; error: close(fd); @@ -458,7 +480,7 @@ error: } static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, - char *ifname) + char *ifname, struct igmp_recovery *igmp_recovery) { int af, fd, val, res; struct ifreq req; @@ -468,13 +490,13 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, af = sa->ss_family; if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { - pw_log_error("socket failed: %m"); + pw_log_error("socket() failed: %m"); return -errno; } val = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) { res = -errno; - pw_log_error("setsockopt failed: %m"); + pw_log_error("setsockopt() failed: %m"); goto error; } spa_zero(req); @@ -528,6 +550,16 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, goto error; } + /* Store multicast info for recovery */ + igmp_recovery->socket_fd = fd; + igmp_recovery->mcast_addr = ba; + igmp_recovery->mcast_len = salen; + igmp_recovery->if_index = req.ifr_ifindex; + igmp_recovery->is_ipv6 = (af == AF_INET6); + pw_log_debug("stored %s multicast info: socket_fd=%d, " + "if_index=%d", igmp_recovery->is_ipv6 ? 
+ "IPv6" : "IPv4", fd, req.ifr_ifindex); + if (bind(fd, (struct sockaddr*)&ba, salen) < 0) { res = -errno; pw_log_error("bind() failed: %m"); @@ -540,6 +572,9 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen, goto error; } } + + pw_log_info("receiver socket up and running"); + return fd; error: close(fd); @@ -548,8 +583,13 @@ error: static bool update_ts_refclk(struct impl *impl) { - if (!impl->ptp_mgmt_socket || impl->ptp_fd < 0) + if (!impl->ptp_mgmt_socket_path) return false; + if (impl->ptp_fd < 0) { + impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path); + if (impl->ptp_fd < 0) + return false; + } // Read if something is left in the socket int avail; @@ -581,6 +621,12 @@ static bool update_ts_refclk(struct impl *impl) if (write(impl->ptp_fd, &req, sizeof(req)) == -1) { pw_log_warn("Failed to send PTP management request: %m"); + if (errno != ENOTCONN) + return false; + close(impl->ptp_fd); + impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path); + if (impl->ptp_fd > -1) + pw_log_info("Reopened PTP management socket"); return false; } @@ -922,7 +968,98 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye) return res; } -static void on_timer_event(void *data) +static void on_igmp_recovery_timer_event(void *data) +{ + struct impl *impl = data; + char addr[128]; + int res = 0; + + /* Only attempt recovery if we have a valid socket and multicast address */ + if (impl->igmp_recovery.socket_fd < 0) { + pw_log_debug("no socket, skipping IGMP recovery"); + goto finish; + } + + pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL); + pw_log_info("IGMP recovery triggered for %s", addr); + + /* Force IGMP membership refresh by leaving the group first, then rejoin */ + if (impl->igmp_recovery.is_ipv6) { + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr; + mr6.ipv6mr_interface 
= impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, + &mr6, sizeof(mr6)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv6 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv6 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv6 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, + &mr6, sizeof(mr6)); + if (res < 0) { + pw_log_warn("failed to re-join IPv6 multicast group: %m"); + } else { + pw_log_info("re-joined IPv6 multicast group successfully"); + } + } else { + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr; + mr4.imr_ifindex = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv4 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv4 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv4 multicast group: %m"); + } + } + + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (res < 0) { + pw_log_warn("failed to re-join IPv4 multicast group: %m"); + } else { + pw_log_info("re-joined IPv4 multicast group successfully"); + } + } + +finish: + /* If rejoining failed, try again in 1 second. This can happen + * for example when the network interface went down, and is not + * yet up and running again, and ENODEV is returned as a result. + * Otherwise, continue with the usual deadline. 
*/ + pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + &impl->igmp_recovery.timer.timeout, + ((res == 0) ? impl->igmp_recovery.deadline : 1) * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl); +} + +static void rearm_igmp_recovery_timer(struct impl *impl) +{ + pw_timer_queue_cancel(&impl->igmp_recovery.timer); + pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + NULL, impl->igmp_recovery.deadline * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl); +} + +static void on_sap_send_timer_event(void *data) { struct impl *impl = data; struct session *sess, *tmp; @@ -956,9 +1093,16 @@ static void on_timer_event(void *data) } } - pw_timer_queue_add(impl->timer_queue, &impl->timer, - &impl->timer.timeout, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, - on_timer_event, impl); + pw_timer_queue_add(impl->timer_queue, &impl->sap_send_timer, + &impl->sap_send_timer.timeout, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, + on_sap_send_timer_event, impl); +} + +static void on_start_sap_retry_timer_event(void *data) +{ + struct impl *impl = data; + pw_log_debug("trying again to start SAP send after previous attempt failed with ENODEV"); + start_sap(impl); } static struct session *session_find(struct impl *impl, const struct sdp_info *info) @@ -1646,23 +1790,70 @@ on_sap_io(void *data, int fd, uint32_t mask) buffer[len] = 0; if ((res = parse_sap(impl, buffer, len)) < 0) pw_log_warn("error parsing SAP: %s", spa_strerror(res)); + + rearm_igmp_recovery_timer(impl); } } static int start_sap(struct impl *impl) { - int fd = -1, res; + int fd = -1, res = 0; char addr[128] = "invalid"; - pw_log_info("starting SAP timer"); - if ((res = pw_timer_queue_add(impl->timer_queue, &impl->timer, + pw_log_info("starting SAP send timer"); + /* start_sap() might be called more than once. See the make_recv_socket() + * call below for why that can happen. In such a case, the timer was + * started already. The easiest way of handling it is to just cancel it. 
+ * Such cases are not expected to occur often, so canceling and then + * adding the timer again is acceptable. */ + pw_timer_queue_cancel(&impl->sap_send_timer); + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->sap_send_timer, NULL, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, - on_timer_event, impl)) < 0) { - pw_log_error("can't add timer: %s", spa_strerror(res)); + on_sap_send_timer_event, impl)) < 0) { + pw_log_error("can't add SAP send timer: %s", spa_strerror(res)); goto error; } - if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname)) < 0) - return fd; + if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname, + &(impl->igmp_recovery))) < 0) { + /* If make_recv_socket() tries to create a socket and join to a multicast + * group while the network interfaces are not ready yet to do so + * (usually because a network manager component is still setting up + * those network interfaces), ENODEV will be returned. This is essentially + * a race condition. There is no discernible way to be notified when the + * network interfaces are ready for that operation, so the next best + * approach is to essentially do a form of polling by retrying the + * start_sap() call after some time. The start_sap_retry_timer exists + * precisely for that purpose. This means that ENODEV is not treated as + * an error, but instead, it triggers the creation of that timer. */ + if (fd == -ENODEV) { + pw_log_warn("failed to create receiver socket because network device " + "is not ready and present yet; will try again"); + + pw_timer_queue_cancel(&impl->start_sap_retry_timer); + /* Use a 1-second retry interval. The network interfaces + * are likely to be up and running then. */ + pw_timer_queue_add(impl->timer_queue, &impl->start_sap_retry_timer, + NULL, 1 * SPA_NSEC_PER_SEC, + on_start_sap_retry_timer_event, impl); + + /* It is important to return 0 in this case. 
Otherwise, the nonzero return + * value will later be propagated through the core as an error. */ + res = 0; + goto finish; + } else { + pw_log_error("failed to create socket: %s", spa_strerror(-fd)); + /* If ENODEV was returned earlier, and the start_sap_retry_timer + * was consequently created, but then a non-ENODEV error occurred, + * the timer must be stopped and removed. */ + pw_timer_queue_cancel(&impl->start_sap_retry_timer); + res = fd; + goto error; + } + } + + /* Cleanup the timer in case ENODEV occurred earlier, and this time, + * the socket creation succeeded. */ + pw_timer_queue_cancel(&impl->start_sap_retry_timer); pw_net_get_ip(&impl->sap_addr, addr, sizeof(addr), NULL, NULL); pw_log_info("starting SAP listener on %s", addr); @@ -1673,11 +1864,15 @@ static int start_sap(struct impl *impl) goto error; } - return 0; + rearm_igmp_recovery_timer(impl); + +finish: + return res; + error: if (fd > 0) close(fd); - return res; + goto finish; } static void node_event_info(void *data, const struct pw_node_info *info) @@ -1807,7 +2002,9 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - pw_timer_queue_cancel(&impl->timer); + pw_timer_queue_cancel(&impl->sap_send_timer); + pw_timer_queue_cancel(&impl->start_sap_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); if (impl->sap_source) pw_loop_destroy_source(impl->loop, impl->sap_source); @@ -1821,7 +2018,7 @@ static void impl_destroy(struct impl *impl) free(impl->extra_attrs_preamble); free(impl->extra_attrs_end); - free(impl->ptp_mgmt_socket); + free(impl->ptp_mgmt_socket_path); free(impl->ifname); free(impl); } @@ -1874,6 +2071,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->ptp_fd = -1; spa_list_init(&impl->sessions); + impl->igmp_recovery.socket_fd = -1; + impl->igmp_recovery.if_index = -1; + if (args == NULL) args = ""; @@ -1893,11 +2093,11 @@ int pipewire__module_init(struct 
pw_impl_module *module, const char *args) impl->ifname = str ? strdup(str) : NULL; str = pw_properties_get(props, "ptp.management-socket"); - impl->ptp_mgmt_socket = str ? strdup(str) : NULL; + impl->ptp_mgmt_socket_path = str ? strdup(str) : NULL; // TODO: support UDP management access as well - if (impl->ptp_mgmt_socket) - impl->ptp_fd = make_unix_socket(impl->ptp_mgmt_socket); + if (impl->ptp_mgmt_socket_path) + impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path); if ((str = pw_properties_get(props, "sap.ip")) == NULL) str = DEFAULT_SAP_IP; @@ -1909,6 +2109,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->cleanup_interval = pw_properties_get_uint32(impl->props, "sap.cleanup.sec", DEFAULT_CLEANUP_SEC); + /* We will use half of the cleanup interval for IGMP deadline, minimum 1 second */ + impl->igmp_recovery.deadline = SPA_MAX(impl->cleanup_interval / 2, 1u); + pw_log_info("using IGMP deadline of %" PRIu32 " second(s)", + impl->igmp_recovery.deadline); + impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); impl->max_sessions = pw_properties_get_uint32(props, "sap.max-sessions", DEFAULT_MAX_SESSIONS); diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index 2b69a0be7..cd7ca7f4e 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -1043,8 +1043,11 @@ on_data_io(void *data, int fd, uint32_t mask) if (sess == NULL) goto unknown_ssrc; - if (sess->data_ready && sess->receiving) - rtp_stream_receive_packet(sess->recv, buffer, len); + if (sess->data_ready && sess->receiving) { + uint64_t current_time = rtp_stream_get_nsec(sess->recv); + rtp_stream_receive_packet(sess->recv, buffer, len, + current_time); + } } } return; diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 63f1f399d..5c3d1f05f 100644 --- 
a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -156,6 +157,9 @@ PW_LOG_TOPIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic +#define DEFAULT_IGMP_CHECK_INTERVAL_SEC 5 +#define DEFAULT_IGMP_DEADLINE_SEC 30 + #define DEFAULT_CLEANUP_SEC 60 #define DEFAULT_SOURCE_IP "224.0.0.56" @@ -180,6 +184,23 @@ static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; +struct igmp_recovery { + struct pw_timer timer; + int socket_fd; + struct sockaddr_storage mcast_addr; + socklen_t mcast_len; + uint32_t if_index; + bool is_ipv6; + /* This is the interval the recovery timer runs at. The timer + * checks at each interval if recovery is required. This value + * is defined by the igmp.check.interval.sec property. */ + uint32_t check_interval; + /* This is the deadline for packets to arrive. If the deadline + * is exceeded, an IGMP recovery is attempted. This value is + * defined by the igmp.deadline.sec property. */ + uint32_t deadline; +}; + struct impl { struct pw_impl_module *module; struct spa_hook module_listener; @@ -201,6 +222,15 @@ struct impl { bool always_process; uint32_t cleanup_interval; + /* IGMP recovery (triggers when no RTP packets are + * received after the recovery deadline is reached) */ + struct igmp_recovery igmp_recovery; + + /* Monotonic timestamp of the last time a packet was + * received. This is accessed with atomic accessors + * to avoid race conditions. 
*/ + uint64_t last_packet_time; + struct pw_timer standby_timer; /* This timer is used when the first stream_start() call fails because * of an ENODEV error (see the stream_start() code for details) */ @@ -227,13 +257,6 @@ struct impl { bool waiting; }; -static inline uint64_t get_time_ns(void) -{ - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return SPA_TIMESPEC_TO_NSEC(&ts); -} - static int do_start(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { @@ -261,6 +284,9 @@ on_rtp_io(void *data, int fd, uint32_t mask) struct impl *impl = data; ssize_t len; int suppressed; + uint64_t current_time; + + current_time = rtp_stream_get_nsec(impl->stream); if (mask & SPA_IO_IN) { if ((len = recv(fd, impl->buffer, impl->buffer_size, 0)) < 0) @@ -270,10 +296,17 @@ on_rtp_io(void *data, int fd, uint32_t mask) goto short_packet; if (SPA_LIKELY(impl->stream)) { - if (rtp_stream_receive_packet(impl->stream, impl->buffer, len) < 0) + if (rtp_stream_receive_packet(impl->stream, impl->buffer, len, + current_time) < 0) goto receive_error; } + /* Update last packet timestamp for IGMP recovery. + * The recovery timer will check this to see if recovery + * is necessary. Do this _before_ invoking do_start() + * in case the stream is waking up from standby. 
*/ + SPA_ATOMIC_STORE(impl->last_packet_time, current_time); + if (SPA_ATOMIC_LOAD(impl->state) != STATE_RECEIVING) { if (!SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_RECEIVING)) { if (SPA_ATOMIC_CAS(impl->state, STATE_IDLE, STATE_RECEIVING)) @@ -284,17 +317,148 @@ on_rtp_io(void *data, int fd, uint32_t mask) return; receive_error: - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0) pw_log_warn("(%d suppressed) recv() error: %m", suppressed); return; short_packet: - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0) pw_log_warn("(%d suppressed) short packet of len %zd received", suppressed, len); return; } -static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) +static int rejoin_igmp_group(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + /* IMPORTANT: This must be run from within the data loop. 
*/ + + int res; + struct impl *impl = user_data; + uint64_t current_time; + + /* Force IGMP membership refresh by leaving the group first, then rejoin */ + if (impl->igmp_recovery.is_ipv6) { + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr; + mr6.ipv6mr_interface = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, + &mr6, sizeof(mr6)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv6 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv6 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv6 multicast group: %m"); + } + } + + res = (setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, + &mr6, sizeof(mr6)) < 0) ? -errno : 0; + if (res < 0) { + pw_log_warn("failed to re-join IPv6 multicast group: %m"); + } else { + pw_log_info("re-joined IPv6 multicast group successfully"); + } + } else { + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr; + mr4.imr_ifindex = impl->igmp_recovery.if_index; + + /* Leave the group first */ + res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, + &mr4, sizeof(mr4)); + if (SPA_LIKELY(res == 0)) { + pw_log_info("left IPv4 multicast group"); + } else { + if (errno == EADDRNOTAVAIL) { + pw_log_info("attempted to leave IPv4 multicast group, but " + "membership was already silently dropped"); + } else { + pw_log_warn("failed to leave IPv4 multicast group: %m"); + } + } + + res = (setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mr4, sizeof(mr4)) < 0) ? -errno : 0; + if (res < 0) { + pw_log_warn("failed to re-join IPv4 multicast group: %m"); + } else { + pw_log_info("re-joined IPv4 multicast group successfully"); + } +
} + + current_time = rtp_stream_get_nsec(impl->stream); + SPA_ATOMIC_STORE(impl->last_packet_time, current_time); + + return res; +} + +static void on_igmp_recovery_timer_event(void *data) +{ + int res; + struct impl *impl = data; + char addr[128]; + uint64_t current_time, elapsed_seconds, last_packet_time; + + /* Only attempt recovery if we have a valid socket and multicast address */ + if (SPA_UNLIKELY(impl->igmp_recovery.socket_fd < 0)) { + pw_log_trace("no socket, skipping IGMP recovery"); + goto finish; + } + + /* This check is performed even if standby = false or + * receiving != STATE_RECEIVING, because the very reason + * for these states may be that the receiver socket was + * silently kicked out of the IGMP group (which causes data + * to no longer arrive, thus leading to these states). */ + + current_time = rtp_stream_get_nsec(impl->stream); + last_packet_time = SPA_ATOMIC_LOAD(impl->last_packet_time); + elapsed_seconds = (current_time - last_packet_time) / SPA_NSEC_PER_SEC; + + /* Only trigger recovery if enough time has elapsed since last packet */ + if (elapsed_seconds < impl->igmp_recovery.deadline) { + pw_log_trace("IGMP recovery check: %" PRIu64 " seconds elapsed, " + "need %" PRIu32 " seconds", elapsed_seconds, + impl->igmp_recovery.deadline); + goto finish; + } + + pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL); + pw_log_info("starting IGMP recovery for %s", addr); + + /* Run the actual recovery in the data loop, since recovery involves + * rejoining the socket to the IGMP group. By running this in the + * data loop, race conditions due to stray packets causing an on_rtp_io() + * invocation at the same time when the IGMP group rejoining takes place + * are avoided, since on_rtp_io() too runs in the data loop. + * This is a blocking call to make sure the rejoin attempt was fully + * done by the time this callback ends. (rejoin_igmp_group() does not + * do work that takes a long time to finish.
)*/ + res = pw_loop_locked(impl->data_loop, rejoin_igmp_group, 1, NULL, 0, impl); + + if (SPA_LIKELY(res == 0)) { + pw_log_info("IGMP recovery for %s finished", addr); + } else { + pw_log_error("error while finishing IGMP recovery for %s: %s", + addr, spa_strerror(res)); + } + +finish: + pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + &impl->igmp_recovery.timer.timeout, + impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl); +} + +static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname, + struct igmp_recovery *igmp_recovery) { int af, fd, val, res; struct ifreq req; @@ -374,6 +538,16 @@ static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) goto error; } + /* Store multicast info for recovery */ + igmp_recovery->socket_fd = fd; + igmp_recovery->mcast_addr = ba; + igmp_recovery->mcast_len = salen; + igmp_recovery->if_index = req.ifr_ifindex; + igmp_recovery->is_ipv6 = (af == AF_INET6); + pw_log_debug("stored %s multicast info: socket_fd=%d, " + "if_index=%d", igmp_recovery->is_ipv6 ? + "IPv6" : "IPv4", fd, req.ifr_ifindex); + if (bind(fd, (struct sockaddr*)&ba, salen) < 0) { res = -errno; pw_log_error("bind() failed: %m"); @@ -422,7 +596,8 @@ static void stream_open_connection(void *data, int *result) pw_log_info("starting RTP listener"); if ((fd = make_socket((const struct sockaddr *)&impl->src_addr, - impl->src_len, impl->ifname)) < 0) { + impl->src_len, impl->ifname, + &(impl->igmp_recovery))) < 0) { /* If make_socket() tries to create a socket and join to a multicast * group while the network interfaces are not ready yet to do so * (usually because a network manager component is still setting up @@ -433,7 +608,7 @@ static void stream_open_connection(void *data, int *result) * stream_start() call after some time. The stream_start_retry_timer exists * precisely for that purpose. 
This means that ENODEV is not treated as * an error, but instead, it triggers the creation of that timer. */ - if (errno == ENODEV) { + if (fd == -ENODEV) { pw_log_warn("failed to create socket because network device is not ready " "and present yet; will try again"); @@ -449,12 +624,12 @@ static void stream_open_connection(void *data, int *result) res = 0; goto finish; } else { - pw_log_error("failed to create socket: %m"); + pw_log_error("failed to create socket: %s", spa_strerror(fd)); /* If ENODEV was returned earlier, and the stream_start_retry_timer * was consequently created, but then a non-ENODEV error occurred, * the timer must be stopped and removed. */ pw_timer_queue_cancel(&impl->stream_start_retry_timer); - res = -errno; + res = fd; goto finish; } } @@ -472,6 +647,13 @@ static void stream_open_connection(void *data, int *result) goto finish; } + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer, + NULL, impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC, + on_igmp_recovery_timer_event, impl)) < 0) { + pw_log_error("can't add timer: %s", spa_strerror(res)); + goto finish; + } + finish: if (res != 0) { pw_log_error("failed to start RTP stream: %s", spa_strerror(res)); @@ -495,6 +677,7 @@ static void stream_close_connection(void *data, int *result) pw_log_info("stopping RTP listener"); pw_timer_queue_cancel(&impl->stream_start_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); pw_loop_destroy_source(impl->data_loop, impl->source); impl->source = NULL; @@ -633,6 +816,7 @@ static void impl_destroy(struct impl *impl) pw_timer_queue_cancel(&impl->standby_timer); pw_timer_queue_cancel(&impl->stream_start_retry_timer); + pw_timer_queue_cancel(&impl->igmp_recovery.timer); if (impl->data_loop) pw_context_release_loop(impl->context, impl->data_loop); @@ -797,9 +981,20 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) * till we make it (or get timed out) */ pw_properties_set(stream_props, 
"rtp.receiving", "true"); - impl->cleanup_interval = pw_properties_get_uint32(props, + impl->cleanup_interval = pw_properties_get_uint32(stream_props, "cleanup.sec", DEFAULT_CLEANUP_SEC); + impl->igmp_recovery.check_interval = SPA_MAX(pw_properties_get_uint32(stream_props, + "igmp.check.interval.sec", + DEFAULT_IGMP_CHECK_INTERVAL_SEC), 1u); + pw_log_info("using IGMP check interval of %" PRIu32 " second(s)", + impl->igmp_recovery.check_interval); + + impl->igmp_recovery.deadline = SPA_MAX(pw_properties_get_uint32(stream_props, + "igmp.deadline.sec", DEFAULT_IGMP_DEADLINE_SEC), 5u); + pw_log_info("using IGMP deadline of %" PRIu32 " second(s)", + impl->igmp_recovery.deadline); + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { str = pw_properties_get(props, PW_KEY_REMOTE_NAME); diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 1563e1917..0af38d649 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -233,7 +233,8 @@ static void rtp_audio_process_playback(void *data) pw_stream_queue_buffer(impl->stream, buf); } -static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen, plen; @@ -273,7 +274,7 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len) timestamp = ntohl(hdr->timestamp) - impl->ts_offset; impl->receiving = true; - impl->last_recv_timestamp = pw_stream_get_nsec(impl->stream); + impl->last_recv_timestamp = current_time; plen = len - hlen; samples = plen / stride; diff --git a/src/modules/module-rtp/midi.c b/src/modules/module-rtp/midi.c index 498c6e6a9..5fbdf3b63 100644 --- a/src/modules/module-rtp/midi.c +++ b/src/modules/module-rtp/midi.c @@ -318,7 +318,8 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti return 0; } -static 
int rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen; diff --git a/src/modules/module-rtp/opus.c b/src/modules/module-rtp/opus.c index 7eeda7f43..d13a4efaf 100644 --- a/src/modules/module-rtp/opus.c +++ b/src/modules/module-rtp/opus.c @@ -99,7 +99,8 @@ static void rtp_opus_process_playback(void *data) pw_stream_queue_buffer(impl->stream, buf); } -static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time) { struct rtp_header *hdr; ssize_t hlen, plen; diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 3834206ec..fa055ce0c 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -151,7 +151,8 @@ struct impl { * access below for the reason why. */ uint8_t timer_running; - int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len); + int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len, + uint64_t current_time); /* Used for resetting the ring buffer before the stream starts, to prevent * reading from uninitialized memory. This can otherwise happen in direct * timestamp mode when the read index is set to an uninitialized location. 
@@ -1036,10 +1037,17 @@ int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *di return pw_stream_update_properties(impl->stream, dict); } -int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, + uint64_t current_time) { struct impl *impl = (struct impl*)s; - return impl->receive_rtp(impl, buffer, len); + return impl->receive_rtp(impl, buffer, len, current_time); +} + +uint64_t rtp_stream_get_nsec(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + return pw_stream_get_nsec(impl->stream); } uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate) diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index ea358f350..095b8395c 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -62,7 +62,10 @@ void rtp_stream_destroy(struct rtp_stream *s); int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *dict); -int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len); +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len, + uint64_t current_time); + +uint64_t rtp_stream_get_nsec(struct rtp_stream *s); uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate);