mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
Merge branch 'rtp-module-fixes' into 'master'
module-rtp: Changes for better robustness, including SAP receive start retries, PTP management socket reconnects, and IGMP recovery logic See merge request pipewire/pipewire!2580
This commit is contained in:
commit
57811bf80c
8 changed files with 469 additions and 52 deletions
|
|
@ -248,6 +248,16 @@ struct node {
|
|||
struct session *session;
|
||||
};
|
||||
|
||||
struct igmp_recovery {
|
||||
struct pw_timer timer;
|
||||
int socket_fd;
|
||||
struct sockaddr_storage mcast_addr;
|
||||
socklen_t mcast_len;
|
||||
uint32_t if_index;
|
||||
bool is_ipv6;
|
||||
uint32_t deadline;
|
||||
};
|
||||
|
||||
struct impl {
|
||||
struct pw_properties *props;
|
||||
|
||||
|
|
@ -265,7 +275,11 @@ struct impl {
|
|||
struct pw_registry *registry;
|
||||
struct spa_hook registry_listener;
|
||||
|
||||
struct pw_timer timer;
|
||||
struct pw_timer sap_send_timer;
|
||||
|
||||
/* This timer is used when the first start_sap() call fails because
|
||||
* of an ENODEV error (see the start_sap() code for details) */
|
||||
struct pw_timer start_sap_retry_timer;
|
||||
|
||||
char *ifname;
|
||||
uint32_t ttl;
|
||||
|
|
@ -281,6 +295,10 @@ struct impl {
|
|||
struct spa_source *sap_source;
|
||||
uint32_t cleanup_interval;
|
||||
|
||||
/* IGMP recovery (triggers when no SAP packets are
|
||||
* received after the recovery deadline is reached) */
|
||||
struct igmp_recovery igmp_recovery;
|
||||
|
||||
uint32_t max_sessions;
|
||||
uint32_t n_sessions;
|
||||
struct spa_list sessions;
|
||||
|
|
@ -288,7 +306,7 @@ struct impl {
|
|||
char *extra_attrs_preamble;
|
||||
char *extra_attrs_end;
|
||||
|
||||
char *ptp_mgmt_socket;
|
||||
char *ptp_mgmt_socket_path;
|
||||
int ptp_fd;
|
||||
uint32_t ptp_seq;
|
||||
uint8_t clock_id[8];
|
||||
|
|
@ -322,6 +340,7 @@ static const struct format_info *find_audio_format_info(const char *mime)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int start_sap(struct impl *impl);
|
||||
static int send_sap(struct impl *impl, struct session *sess, bool bye);
|
||||
|
||||
|
||||
|
|
@ -383,7 +402,7 @@ static bool is_multicast(struct sockaddr *sa, socklen_t salen)
|
|||
return false;
|
||||
}
|
||||
|
||||
static int make_unix_socket(const char *path) {
|
||||
static int make_unix_ptp_mgmt_socket(const char *path) {
|
||||
struct sockaddr_un addr;
|
||||
|
||||
spa_autoclose int fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0);
|
||||
|
|
@ -419,7 +438,7 @@ static int make_send_socket(
|
|||
|
||||
af = src->ss_family;
|
||||
if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) {
|
||||
pw_log_error("socket failed: %m");
|
||||
pw_log_error("socket() failed: %m");
|
||||
return -errno;
|
||||
}
|
||||
if (bind(fd, (struct sockaddr*)src, src_len) < 0) {
|
||||
|
|
@ -451,6 +470,9 @@ static int make_send_socket(
|
|||
pw_log_warn("setsockopt(IPV6_MULTICAST_HOPS) failed: %m");
|
||||
}
|
||||
}
|
||||
|
||||
pw_log_info("sender socket up and running");
|
||||
|
||||
return fd;
|
||||
error:
|
||||
close(fd);
|
||||
|
|
@ -458,7 +480,7 @@ error:
|
|||
}
|
||||
|
||||
static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen,
|
||||
char *ifname)
|
||||
char *ifname, struct igmp_recovery *igmp_recovery)
|
||||
{
|
||||
int af, fd, val, res;
|
||||
struct ifreq req;
|
||||
|
|
@ -468,13 +490,13 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen,
|
|||
|
||||
af = sa->ss_family;
|
||||
if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) {
|
||||
pw_log_error("socket failed: %m");
|
||||
pw_log_error("socket() failed: %m");
|
||||
return -errno;
|
||||
}
|
||||
val = 1;
|
||||
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
|
||||
res = -errno;
|
||||
pw_log_error("setsockopt failed: %m");
|
||||
pw_log_error("setsockopt() failed: %m");
|
||||
goto error;
|
||||
}
|
||||
spa_zero(req);
|
||||
|
|
@ -528,6 +550,16 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen,
|
|||
goto error;
|
||||
}
|
||||
|
||||
/* Store multicast info for recovery */
|
||||
igmp_recovery->socket_fd = fd;
|
||||
igmp_recovery->mcast_addr = ba;
|
||||
igmp_recovery->mcast_len = salen;
|
||||
igmp_recovery->if_index = req.ifr_ifindex;
|
||||
igmp_recovery->is_ipv6 = (af == AF_INET6);
|
||||
pw_log_debug("stored %s multicast info: socket_fd=%d, "
|
||||
"if_index=%d", igmp_recovery->is_ipv6 ?
|
||||
"IPv6" : "IPv4", fd, req.ifr_ifindex);
|
||||
|
||||
if (bind(fd, (struct sockaddr*)&ba, salen) < 0) {
|
||||
res = -errno;
|
||||
pw_log_error("bind() failed: %m");
|
||||
|
|
@ -540,6 +572,9 @@ static int make_recv_socket(struct sockaddr_storage *sa, socklen_t salen,
|
|||
goto error;
|
||||
}
|
||||
}
|
||||
|
||||
pw_log_info("receiver socket up and running");
|
||||
|
||||
return fd;
|
||||
error:
|
||||
close(fd);
|
||||
|
|
@ -548,8 +583,13 @@ error:
|
|||
|
||||
static bool update_ts_refclk(struct impl *impl)
|
||||
{
|
||||
if (!impl->ptp_mgmt_socket || impl->ptp_fd < 0)
|
||||
if (!impl->ptp_mgmt_socket_path)
|
||||
return false;
|
||||
if (impl->ptp_fd < 0) {
|
||||
impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path);
|
||||
if (impl->ptp_fd < 0)
|
||||
return false;
|
||||
}
|
||||
|
||||
// Read if something is left in the socket
|
||||
int avail;
|
||||
|
|
@ -581,6 +621,12 @@ static bool update_ts_refclk(struct impl *impl)
|
|||
|
||||
if (write(impl->ptp_fd, &req, sizeof(req)) == -1) {
|
||||
pw_log_warn("Failed to send PTP management request: %m");
|
||||
if (errno != ENOTCONN)
|
||||
return false;
|
||||
close(impl->ptp_fd);
|
||||
impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path);
|
||||
if (impl->ptp_fd > -1)
|
||||
pw_log_info("Reopened PTP management socket");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -922,7 +968,98 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye)
|
|||
return res;
|
||||
}
|
||||
|
||||
static void on_timer_event(void *data)
|
||||
static void on_igmp_recovery_timer_event(void *data)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
char addr[128];
|
||||
int res = 0;
|
||||
|
||||
/* Only attempt recovery if we have a valid socket and multicast address */
|
||||
if (impl->igmp_recovery.socket_fd < 0) {
|
||||
pw_log_debug("no socket, skipping IGMP recovery");
|
||||
goto finish;
|
||||
}
|
||||
|
||||
pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL);
|
||||
pw_log_info("IGMP recovery triggered for %s", addr);
|
||||
|
||||
/* Force IGMP membership refresh by leaving the group first, then rejoin */
|
||||
if (impl->igmp_recovery.is_ipv6) {
|
||||
struct ipv6_mreq mr6;
|
||||
memset(&mr6, 0, sizeof(mr6));
|
||||
mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr;
|
||||
mr6.ipv6mr_interface = impl->igmp_recovery.if_index;
|
||||
|
||||
/* Leave the group first */
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
|
||||
&mr6, sizeof(mr6));
|
||||
if (SPA_LIKELY(res == 0)) {
|
||||
pw_log_info("left IPv6 multicast group");
|
||||
} else {
|
||||
if (errno == EADDRNOTAVAIL) {
|
||||
pw_log_info("attempted to leave IPv6 multicast group, but "
|
||||
"membership was already silently dropped");
|
||||
} else {
|
||||
pw_log_warn("failed to leave IPv6 multicast group: %m");
|
||||
}
|
||||
}
|
||||
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP,
|
||||
&mr6, sizeof(mr6));
|
||||
if (res < 0) {
|
||||
pw_log_warn("failed to re-join IPv6 multicast group: %m");
|
||||
} else {
|
||||
pw_log_info("re-joined IPv6 multicast group successfully");
|
||||
}
|
||||
} else {
|
||||
struct ip_mreqn mr4;
|
||||
memset(&mr4, 0, sizeof(mr4));
|
||||
mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr;
|
||||
mr4.imr_ifindex = impl->igmp_recovery.if_index;
|
||||
|
||||
/* Leave the group first */
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP,
|
||||
&mr4, sizeof(mr4));
|
||||
if (SPA_LIKELY(res == 0)) {
|
||||
pw_log_info("left IPv4 multicast group");
|
||||
} else {
|
||||
if (errno == EADDRNOTAVAIL) {
|
||||
pw_log_info("attempted to leave IPv4 multicast group, but "
|
||||
"membership was already silently dropped");
|
||||
} else {
|
||||
pw_log_warn("failed to leave IPv4 multicast group: %m");
|
||||
}
|
||||
}
|
||||
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
|
||||
&mr4, sizeof(mr4));
|
||||
if (res < 0) {
|
||||
pw_log_warn("failed to re-join IPv4 multicast group: %m");
|
||||
} else {
|
||||
pw_log_info("re-joined IPv4 multicast group successfully");
|
||||
}
|
||||
}
|
||||
|
||||
finish:
|
||||
/* If rejoining failed, try again in 1 second. This can happen
|
||||
* for example when the network interface went down, and is not
|
||||
* yet up and running again, and ENODEV is returned as a result.
|
||||
* Otherwise, continue with the usual deadline. */
|
||||
pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer,
|
||||
&impl->igmp_recovery.timer.timeout,
|
||||
((res == 0) ? impl->igmp_recovery.deadline : 1) * SPA_NSEC_PER_SEC,
|
||||
on_igmp_recovery_timer_event, impl);
|
||||
}
|
||||
|
||||
static void rearm_igmp_recovery_timer(struct impl *impl)
|
||||
{
|
||||
pw_timer_queue_cancel(&impl->igmp_recovery.timer);
|
||||
pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer,
|
||||
NULL, impl->igmp_recovery.deadline * SPA_NSEC_PER_SEC,
|
||||
on_igmp_recovery_timer_event, impl);
|
||||
}
|
||||
|
||||
static void on_sap_send_timer_event(void *data)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
struct session *sess, *tmp;
|
||||
|
|
@ -956,9 +1093,16 @@ static void on_timer_event(void *data)
|
|||
|
||||
}
|
||||
}
|
||||
pw_timer_queue_add(impl->timer_queue, &impl->timer,
|
||||
&impl->timer.timeout, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC,
|
||||
on_timer_event, impl);
|
||||
pw_timer_queue_add(impl->timer_queue, &impl->sap_send_timer,
|
||||
&impl->sap_send_timer.timeout, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC,
|
||||
on_sap_send_timer_event, impl);
|
||||
}
|
||||
|
||||
static void on_start_sap_retry_timer_event(void *data)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
pw_log_debug("trying again to start SAP send after previous attempt failed with ENODEV");
|
||||
start_sap(impl);
|
||||
}
|
||||
|
||||
static struct session *session_find(struct impl *impl, const struct sdp_info *info)
|
||||
|
|
@ -1646,23 +1790,70 @@ on_sap_io(void *data, int fd, uint32_t mask)
|
|||
buffer[len] = 0;
|
||||
if ((res = parse_sap(impl, buffer, len)) < 0)
|
||||
pw_log_warn("error parsing SAP: %s", spa_strerror(res));
|
||||
|
||||
rearm_igmp_recovery_timer(impl);
|
||||
}
|
||||
}
|
||||
|
||||
static int start_sap(struct impl *impl)
|
||||
{
|
||||
int fd = -1, res;
|
||||
int fd = -1, res = 0;
|
||||
char addr[128] = "invalid";
|
||||
|
||||
pw_log_info("starting SAP timer");
|
||||
if ((res = pw_timer_queue_add(impl->timer_queue, &impl->timer,
|
||||
pw_log_info("starting SAP send timer");
|
||||
/* start_sap() might be called more than once. See the make_recv_socket()
|
||||
* call below for why that can happen. In such a case, the timer was
|
||||
* started already. The easiest way of handling it is to just cancel it.
|
||||
* Such cases are not expected to occur often, so canceling and then
|
||||
* adding the timer again is acceptable. */
|
||||
pw_timer_queue_cancel(&impl->sap_send_timer);
|
||||
if ((res = pw_timer_queue_add(impl->timer_queue, &impl->sap_send_timer,
|
||||
NULL, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC,
|
||||
on_timer_event, impl)) < 0) {
|
||||
pw_log_error("can't add timer: %s", spa_strerror(res));
|
||||
on_sap_send_timer_event, impl)) < 0) {
|
||||
pw_log_error("can't add SAP send timer: %s", spa_strerror(res));
|
||||
goto error;
|
||||
}
|
||||
if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname)) < 0)
|
||||
return fd;
|
||||
if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname,
|
||||
&(impl->igmp_recovery))) < 0) {
|
||||
/* If make_recv_socket() tries to create a socket and join to a multicast
|
||||
* group while the network interfaces are not ready yet to do so
|
||||
* (usually because a network manager component is still setting up
|
||||
* those network interfaces), ENODEV will be returned. This is essentially
|
||||
* a race condition. There is no discernible way to be notified when the
|
||||
* network interfaces are ready for that operation, so the next best
|
||||
* approach is to essentially do a form of polling by retrying the
|
||||
* start_sap() call after some time. The start_sap_retry_timer exists
|
||||
* precisely for that purpose. This means that ENODEV is not treated as
|
||||
* an error, but instead, it triggers the creation of that timer. */
|
||||
if (fd == -ENODEV) {
|
||||
pw_log_warn("failed to create receiver socket because network device "
|
||||
"is not ready and present yet; will try again");
|
||||
|
||||
pw_timer_queue_cancel(&impl->start_sap_retry_timer);
|
||||
/* Use a 1-second retry interval. The network interfaces
|
||||
* are likely to be up and running then. */
|
||||
pw_timer_queue_add(impl->timer_queue, &impl->start_sap_retry_timer,
|
||||
NULL, 1 * SPA_NSEC_PER_SEC,
|
||||
on_start_sap_retry_timer_event, impl);
|
||||
|
||||
/* It is important to return 0 in this case. Otherwise, the nonzero return
|
||||
* value will later be propagated through the core as an error. */
|
||||
res = 0;
|
||||
goto finish;
|
||||
} else {
|
||||
pw_log_error("failed to create socket: %s", spa_strerror(-fd));
|
||||
/* If ENODEV was returned earlier, and the start_sap_retry_timer
|
||||
* was consequently created, but then a non-ENODEV error occurred,
|
||||
* the timer must be stopped and removed. */
|
||||
pw_timer_queue_cancel(&impl->start_sap_retry_timer);
|
||||
res = fd;
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
|
||||
/* Cleanup the timer in case ENODEV occurred earlier, and this time,
|
||||
* the socket creation succeeded. */
|
||||
pw_timer_queue_cancel(&impl->start_sap_retry_timer);
|
||||
|
||||
pw_net_get_ip(&impl->sap_addr, addr, sizeof(addr), NULL, NULL);
|
||||
pw_log_info("starting SAP listener on %s", addr);
|
||||
|
|
@ -1673,11 +1864,15 @@ static int start_sap(struct impl *impl)
|
|||
goto error;
|
||||
}
|
||||
|
||||
return 0;
|
||||
rearm_igmp_recovery_timer(impl);
|
||||
|
||||
finish:
|
||||
return res;
|
||||
|
||||
error:
|
||||
if (fd > 0)
|
||||
close(fd);
|
||||
return res;
|
||||
goto finish;
|
||||
}
|
||||
|
||||
static void node_event_info(void *data, const struct pw_node_info *info)
|
||||
|
|
@ -1807,7 +2002,9 @@ static void impl_destroy(struct impl *impl)
|
|||
if (impl->core && impl->do_disconnect)
|
||||
pw_core_disconnect(impl->core);
|
||||
|
||||
pw_timer_queue_cancel(&impl->timer);
|
||||
pw_timer_queue_cancel(&impl->sap_send_timer);
|
||||
pw_timer_queue_cancel(&impl->start_sap_retry_timer);
|
||||
pw_timer_queue_cancel(&impl->igmp_recovery.timer);
|
||||
if (impl->sap_source)
|
||||
pw_loop_destroy_source(impl->loop, impl->sap_source);
|
||||
|
||||
|
|
@ -1821,7 +2018,7 @@ static void impl_destroy(struct impl *impl)
|
|||
free(impl->extra_attrs_preamble);
|
||||
free(impl->extra_attrs_end);
|
||||
|
||||
free(impl->ptp_mgmt_socket);
|
||||
free(impl->ptp_mgmt_socket_path);
|
||||
free(impl->ifname);
|
||||
free(impl);
|
||||
}
|
||||
|
|
@ -1874,6 +2071,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
|
|||
impl->ptp_fd = -1;
|
||||
spa_list_init(&impl->sessions);
|
||||
|
||||
impl->igmp_recovery.socket_fd = -1;
|
||||
impl->igmp_recovery.if_index = -1;
|
||||
|
||||
if (args == NULL)
|
||||
args = "";
|
||||
|
||||
|
|
@ -1893,11 +2093,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
|
|||
impl->ifname = str ? strdup(str) : NULL;
|
||||
|
||||
str = pw_properties_get(props, "ptp.management-socket");
|
||||
impl->ptp_mgmt_socket = str ? strdup(str) : NULL;
|
||||
impl->ptp_mgmt_socket_path = str ? strdup(str) : NULL;
|
||||
|
||||
// TODO: support UDP management access as well
|
||||
if (impl->ptp_mgmt_socket)
|
||||
impl->ptp_fd = make_unix_socket(impl->ptp_mgmt_socket);
|
||||
if (impl->ptp_mgmt_socket_path)
|
||||
impl->ptp_fd = make_unix_ptp_mgmt_socket(impl->ptp_mgmt_socket_path);
|
||||
|
||||
if ((str = pw_properties_get(props, "sap.ip")) == NULL)
|
||||
str = DEFAULT_SAP_IP;
|
||||
|
|
@ -1909,6 +2109,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
|
|||
impl->cleanup_interval = pw_properties_get_uint32(impl->props,
|
||||
"sap.cleanup.sec", DEFAULT_CLEANUP_SEC);
|
||||
|
||||
/* We will use half of the cleanup interval for IGMP deadline, minimum 1 second */
|
||||
impl->igmp_recovery.deadline = SPA_MAX(impl->cleanup_interval / 2, 1u);
|
||||
pw_log_info("using IGMP deadline of %" PRIu32 " second(s)",
|
||||
impl->igmp_recovery.deadline);
|
||||
|
||||
impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL);
|
||||
impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP);
|
||||
impl->max_sessions = pw_properties_get_uint32(props, "sap.max-sessions", DEFAULT_MAX_SESSIONS);
|
||||
|
|
|
|||
|
|
@ -1043,8 +1043,11 @@ on_data_io(void *data, int fd, uint32_t mask)
|
|||
if (sess == NULL)
|
||||
goto unknown_ssrc;
|
||||
|
||||
if (sess->data_ready && sess->receiving)
|
||||
rtp_stream_receive_packet(sess->recv, buffer, len);
|
||||
if (sess->data_ready && sess->receiving) {
|
||||
uint64_t current_time = rtp_stream_get_nsec(sess->recv);
|
||||
rtp_stream_receive_packet(sess->recv, buffer, len,
|
||||
current_time);
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
#include <net/if.h>
|
||||
#include <ctype.h>
|
||||
|
||||
#include <spa/utils/atomic.h>
|
||||
#include <spa/utils/hook.h>
|
||||
#include <spa/utils/result.h>
|
||||
#include <spa/utils/ringbuffer.h>
|
||||
|
|
@ -156,6 +157,9 @@
|
|||
PW_LOG_TOPIC(mod_topic, "mod." NAME);
|
||||
#define PW_LOG_TOPIC_DEFAULT mod_topic
|
||||
|
||||
#define DEFAULT_IGMP_CHECK_INTERVAL_SEC 5
|
||||
#define DEFAULT_IGMP_DEADLINE_SEC 30
|
||||
|
||||
#define DEFAULT_CLEANUP_SEC 60
|
||||
#define DEFAULT_SOURCE_IP "224.0.0.56"
|
||||
|
||||
|
|
@ -180,6 +184,23 @@ static const struct spa_dict_item module_info[] = {
|
|||
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
|
||||
};
|
||||
|
||||
struct igmp_recovery {
|
||||
struct pw_timer timer;
|
||||
int socket_fd;
|
||||
struct sockaddr_storage mcast_addr;
|
||||
socklen_t mcast_len;
|
||||
uint32_t if_index;
|
||||
bool is_ipv6;
|
||||
/* This is the interval the recovery timer runs at. The timer
|
||||
* checks at each interval if recovery is required. This value
|
||||
* is defined by the igmp.check.interval.sec property. */
|
||||
uint32_t check_interval;
|
||||
/* This is the deadline for packets to arrive. If the deadline
|
||||
* is exceeded, an IGMP recovery is attempted. This value is
|
||||
* defined by the igmp.deadline.sec property. */
|
||||
uint32_t deadline;
|
||||
};
|
||||
|
||||
struct impl {
|
||||
struct pw_impl_module *module;
|
||||
struct spa_hook module_listener;
|
||||
|
|
@ -201,6 +222,15 @@ struct impl {
|
|||
bool always_process;
|
||||
uint32_t cleanup_interval;
|
||||
|
||||
/* IGMP recovery (triggers when no RTP packets are
|
||||
* received after the recovery deadline is reached) */
|
||||
struct igmp_recovery igmp_recovery;
|
||||
|
||||
/* Monotonic timestamp of the last time a packet was
|
||||
* received. This is accessed with atomic accessors
|
||||
* to avoid race conditions. */
|
||||
uint64_t last_packet_time;
|
||||
|
||||
struct pw_timer standby_timer;
|
||||
/* This timer is used when the first stream_start() call fails because
|
||||
* of an ENODEV error (see the stream_start() code for details) */
|
||||
|
|
@ -227,13 +257,6 @@ struct impl {
|
|||
bool waiting;
|
||||
};
|
||||
|
||||
static inline uint64_t get_time_ns(void)
|
||||
{
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
return SPA_TIMESPEC_TO_NSEC(&ts);
|
||||
}
|
||||
|
||||
static int do_start(struct spa_loop *loop, bool async, uint32_t seq, const void *data,
|
||||
size_t size, void *user_data)
|
||||
{
|
||||
|
|
@ -261,6 +284,9 @@ on_rtp_io(void *data, int fd, uint32_t mask)
|
|||
struct impl *impl = data;
|
||||
ssize_t len;
|
||||
int suppressed;
|
||||
uint64_t current_time;
|
||||
|
||||
current_time = rtp_stream_get_nsec(impl->stream);
|
||||
|
||||
if (mask & SPA_IO_IN) {
|
||||
if ((len = recv(fd, impl->buffer, impl->buffer_size, 0)) < 0)
|
||||
|
|
@ -270,10 +296,17 @@ on_rtp_io(void *data, int fd, uint32_t mask)
|
|||
goto short_packet;
|
||||
|
||||
if (SPA_LIKELY(impl->stream)) {
|
||||
if (rtp_stream_receive_packet(impl->stream, impl->buffer, len) < 0)
|
||||
if (rtp_stream_receive_packet(impl->stream, impl->buffer, len,
|
||||
current_time) < 0)
|
||||
goto receive_error;
|
||||
}
|
||||
|
||||
/* Update last packet timestamp for IGMP recovery.
|
||||
* The recovery timer will check this to see if recovery
|
||||
* is necessary. Do this _before_ invoking do_start()
|
||||
* in case the stream is waking up from standby. */
|
||||
SPA_ATOMIC_STORE(impl->last_packet_time, current_time);
|
||||
|
||||
if (SPA_ATOMIC_LOAD(impl->state) != STATE_RECEIVING) {
|
||||
if (!SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_RECEIVING)) {
|
||||
if (SPA_ATOMIC_CAS(impl->state, STATE_IDLE, STATE_RECEIVING))
|
||||
|
|
@ -284,17 +317,148 @@ on_rtp_io(void *data, int fd, uint32_t mask)
|
|||
return;
|
||||
|
||||
receive_error:
|
||||
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0)
|
||||
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0)
|
||||
pw_log_warn("(%d suppressed) recv() error: %m", suppressed);
|
||||
return;
|
||||
short_packet:
|
||||
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0)
|
||||
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, current_time)) >= 0)
|
||||
pw_log_warn("(%d suppressed) short packet of len %zd received",
|
||||
suppressed, len);
|
||||
return;
|
||||
}
|
||||
|
||||
static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname)
|
||||
static int rejoin_igmp_group(struct spa_loop *loop, bool async, uint32_t seq,
|
||||
const void *data, size_t size, void *user_data)
|
||||
{
|
||||
/* IMPORTANT: This must be run from within the data loop. */
|
||||
|
||||
int res;
|
||||
struct impl *impl = user_data;
|
||||
uint64_t current_time;
|
||||
|
||||
/* Force IGMP membership refresh by leaving the group first, then rejoin */
|
||||
if (impl->igmp_recovery.is_ipv6) {
|
||||
struct ipv6_mreq mr6;
|
||||
memset(&mr6, 0, sizeof(mr6));
|
||||
mr6.ipv6mr_multiaddr = ((struct sockaddr_in6*)&impl->igmp_recovery.mcast_addr)->sin6_addr;
|
||||
mr6.ipv6mr_interface = impl->igmp_recovery.if_index;
|
||||
|
||||
/* Leave the group first */
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
|
||||
&mr6, sizeof(mr6));
|
||||
if (SPA_LIKELY(res == 0)) {
|
||||
pw_log_info("left IPv6 multicast group");
|
||||
} else {
|
||||
if (errno == EADDRNOTAVAIL) {
|
||||
pw_log_info("attempted to leave IPv6 multicast group, but "
|
||||
"membership was already silently dropped");
|
||||
} else {
|
||||
pw_log_warn("failed to leave IPv6 multicast group: %m");
|
||||
}
|
||||
}
|
||||
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IPV6, IPV6_JOIN_GROUP,
|
||||
&mr6, sizeof(mr6));
|
||||
if (res < 0) {
|
||||
pw_log_warn("failed to re-join IPv6 multicast group: %m");
|
||||
} else {
|
||||
pw_log_info("re-joined IPv6 multicast group successfully");
|
||||
}
|
||||
} else {
|
||||
struct ip_mreqn mr4;
|
||||
memset(&mr4, 0, sizeof(mr4));
|
||||
mr4.imr_multiaddr = ((struct sockaddr_in*)&impl->igmp_recovery.mcast_addr)->sin_addr;
|
||||
mr4.imr_ifindex = impl->igmp_recovery.if_index;
|
||||
|
||||
/* Leave the group first */
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_DROP_MEMBERSHIP,
|
||||
&mr4, sizeof(mr4));
|
||||
if (SPA_LIKELY(res == 0)) {
|
||||
pw_log_info("left IPv4 multicast group");
|
||||
} else {
|
||||
if (errno == EADDRNOTAVAIL) {
|
||||
pw_log_info("attempted to leave IPv4 multicast group, but "
|
||||
"membership was already silently dropped");
|
||||
} else {
|
||||
pw_log_warn("failed to leave IPv4 multicast group: %m");
|
||||
}
|
||||
}
|
||||
|
||||
res = setsockopt(impl->igmp_recovery.socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
|
||||
&mr4, sizeof(mr4));
|
||||
if (res < 0) {
|
||||
pw_log_warn("failed to re-join IPv4 multicast group: %m");
|
||||
} else {
|
||||
pw_log_info("re-joined IPv4 multicast group successfully");
|
||||
}
|
||||
}
|
||||
|
||||
current_time = rtp_stream_get_nsec(impl->stream);
|
||||
SPA_ATOMIC_STORE(impl->last_packet_time, current_time);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static void on_igmp_recovery_timer_event(void *data)
|
||||
{
|
||||
int res;
|
||||
struct impl *impl = data;
|
||||
char addr[128];
|
||||
uint64_t current_time, elapsed_seconds, last_packet_time;
|
||||
|
||||
/* Only attempt recovery if we have a valid socket and multicast address */
|
||||
if (SPA_UNLIKELY(impl->igmp_recovery.socket_fd < 0)) {
|
||||
pw_log_trace("no socket, skipping IGMP recovery");
|
||||
goto finish;
|
||||
}
|
||||
|
||||
/* This check if performed even if standby = false or
|
||||
* receiving != STATE_RECEIVING , because the very reason
|
||||
* for these states may be that the receiver socket was
|
||||
* silently kicked out of the IGMP group (which causes data
|
||||
* to no longer arrive, thus leading to these states). */
|
||||
|
||||
current_time = rtp_stream_get_nsec(impl->stream);
|
||||
last_packet_time = SPA_ATOMIC_LOAD(impl->last_packet_time);
|
||||
elapsed_seconds = (current_time - last_packet_time) / SPA_NSEC_PER_SEC;
|
||||
|
||||
/* Only trigger recovery if enough time has elapsed since last packet */
|
||||
if (elapsed_seconds < impl->igmp_recovery.deadline) {
|
||||
pw_log_trace("IGMP recovery check: %" PRIu64 " seconds elapsed, "
|
||||
"need %" PRIu32 " seconds", elapsed_seconds,
|
||||
impl->igmp_recovery.deadline);
|
||||
goto finish;
|
||||
}
|
||||
|
||||
pw_net_get_ip(&impl->igmp_recovery.mcast_addr, addr, sizeof(addr), NULL, NULL);
|
||||
pw_log_info("starting IGMP recovery for %s", addr);
|
||||
|
||||
/* Run the actual recovery in the data loop, since recovery involves
|
||||
* rejoining the socket to the IGMP group. By running this in the
|
||||
* data loop, race conditions due to stray packets causing an on_rtp_io()
|
||||
* invocation at the same time when the IGMP group rejoining takes place
|
||||
* is avoided, since on_rtp_io() too runs in the data loop.
|
||||
* This is a blocking call to make sure the rejoin attempt was fully
|
||||
* done by the time this callback ends. (rejoin_igmp_group() does not
|
||||
* do work that takes a long time to finish. )*/
|
||||
res = pw_loop_locked(impl->data_loop, rejoin_igmp_group, 1, NULL, 0, impl);
|
||||
|
||||
if (SPA_LIKELY(res == 0)) {
|
||||
pw_log_info("IGMP recovery for %s finished", addr);
|
||||
} else {
|
||||
pw_log_error("error while finishing IGMP recovery for %s: %s",
|
||||
addr, spa_strerror(res));
|
||||
}
|
||||
|
||||
finish:
|
||||
pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer,
|
||||
&impl->igmp_recovery.timer.timeout,
|
||||
impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC,
|
||||
on_igmp_recovery_timer_event, impl);
|
||||
}
|
||||
|
||||
static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname,
|
||||
struct igmp_recovery *igmp_recovery)
|
||||
{
|
||||
int af, fd, val, res;
|
||||
struct ifreq req;
|
||||
|
|
@ -374,6 +538,16 @@ static int make_socket(const struct sockaddr* sa, socklen_t salen, char *ifname)
|
|||
goto error;
|
||||
}
|
||||
|
||||
/* Store multicast info for recovery */
|
||||
igmp_recovery->socket_fd = fd;
|
||||
igmp_recovery->mcast_addr = ba;
|
||||
igmp_recovery->mcast_len = salen;
|
||||
igmp_recovery->if_index = req.ifr_ifindex;
|
||||
igmp_recovery->is_ipv6 = (af == AF_INET6);
|
||||
pw_log_debug("stored %s multicast info: socket_fd=%d, "
|
||||
"if_index=%d", igmp_recovery->is_ipv6 ?
|
||||
"IPv6" : "IPv4", fd, req.ifr_ifindex);
|
||||
|
||||
if (bind(fd, (struct sockaddr*)&ba, salen) < 0) {
|
||||
res = -errno;
|
||||
pw_log_error("bind() failed: %m");
|
||||
|
|
@ -422,7 +596,8 @@ static void stream_open_connection(void *data, int *result)
|
|||
pw_log_info("starting RTP listener");
|
||||
|
||||
if ((fd = make_socket((const struct sockaddr *)&impl->src_addr,
|
||||
impl->src_len, impl->ifname)) < 0) {
|
||||
impl->src_len, impl->ifname,
|
||||
&(impl->igmp_recovery))) < 0) {
|
||||
/* If make_socket() tries to create a socket and join to a multicast
|
||||
* group while the network interfaces are not ready yet to do so
|
||||
* (usually because a network manager component is still setting up
|
||||
|
|
@ -433,7 +608,7 @@ static void stream_open_connection(void *data, int *result)
|
|||
* stream_start() call after some time. The stream_start_retry_timer exists
|
||||
* precisely for that purpose. This means that ENODEV is not treated as
|
||||
* an error, but instead, it triggers the creation of that timer. */
|
||||
if (errno == ENODEV) {
|
||||
if (fd == -ENODEV) {
|
||||
pw_log_warn("failed to create socket because network device is not ready "
|
||||
"and present yet; will try again");
|
||||
|
||||
|
|
@ -449,12 +624,12 @@ static void stream_open_connection(void *data, int *result)
|
|||
res = 0;
|
||||
goto finish;
|
||||
} else {
|
||||
pw_log_error("failed to create socket: %m");
|
||||
pw_log_error("failed to create socket: %s", spa_strerror(fd));
|
||||
/* If ENODEV was returned earlier, and the stream_start_retry_timer
|
||||
* was consequently created, but then a non-ENODEV error occurred,
|
||||
* the timer must be stopped and removed. */
|
||||
pw_timer_queue_cancel(&impl->stream_start_retry_timer);
|
||||
res = -errno;
|
||||
res = fd;
|
||||
goto finish;
|
||||
}
|
||||
}
|
||||
|
|
@ -472,6 +647,13 @@ static void stream_open_connection(void *data, int *result)
|
|||
goto finish;
|
||||
}
|
||||
|
||||
if ((res = pw_timer_queue_add(impl->timer_queue, &impl->igmp_recovery.timer,
|
||||
NULL, impl->igmp_recovery.check_interval * SPA_NSEC_PER_SEC,
|
||||
on_igmp_recovery_timer_event, impl)) < 0) {
|
||||
pw_log_error("can't add timer: %s", spa_strerror(res));
|
||||
goto finish;
|
||||
}
|
||||
|
||||
finish:
|
||||
if (res != 0) {
|
||||
pw_log_error("failed to start RTP stream: %s", spa_strerror(res));
|
||||
|
|
@ -495,6 +677,7 @@ static void stream_close_connection(void *data, int *result)
|
|||
pw_log_info("stopping RTP listener");
|
||||
|
||||
pw_timer_queue_cancel(&impl->stream_start_retry_timer);
|
||||
pw_timer_queue_cancel(&impl->igmp_recovery.timer);
|
||||
|
||||
pw_loop_destroy_source(impl->data_loop, impl->source);
|
||||
impl->source = NULL;
|
||||
|
|
@ -633,6 +816,7 @@ static void impl_destroy(struct impl *impl)
|
|||
|
||||
pw_timer_queue_cancel(&impl->standby_timer);
|
||||
pw_timer_queue_cancel(&impl->stream_start_retry_timer);
|
||||
pw_timer_queue_cancel(&impl->igmp_recovery.timer);
|
||||
|
||||
if (impl->data_loop)
|
||||
pw_context_release_loop(impl->context, impl->data_loop);
|
||||
|
|
@ -797,9 +981,20 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
|
|||
* till we make it (or get timed out) */
|
||||
pw_properties_set(stream_props, "rtp.receiving", "true");
|
||||
|
||||
impl->cleanup_interval = pw_properties_get_uint32(props,
|
||||
impl->cleanup_interval = pw_properties_get_uint32(stream_props,
|
||||
"cleanup.sec", DEFAULT_CLEANUP_SEC);
|
||||
|
||||
impl->igmp_recovery.check_interval = SPA_MAX(pw_properties_get_uint32(stream_props,
|
||||
"igmp.check.interval.sec",
|
||||
DEFAULT_IGMP_CHECK_INTERVAL_SEC), 1u);
|
||||
pw_log_info("using IGMP check interval of %" PRIu32 " second(s)",
|
||||
impl->igmp_recovery.check_interval);
|
||||
|
||||
impl->igmp_recovery.deadline = SPA_MAX(pw_properties_get_uint32(stream_props,
|
||||
"igmp.deadline.sec", DEFAULT_IGMP_DEADLINE_SEC), 5u);
|
||||
pw_log_info("using IGMP deadline of %" PRIu32 " second(s)",
|
||||
impl->igmp_recovery.deadline);
|
||||
|
||||
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
|
||||
if (impl->core == NULL) {
|
||||
str = pw_properties_get(props, PW_KEY_REMOTE_NAME);
|
||||
|
|
|
|||
|
|
@ -233,7 +233,8 @@ static void rtp_audio_process_playback(void *data)
|
|||
pw_stream_queue_buffer(impl->stream, buf);
|
||||
}
|
||||
|
||||
static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
|
||||
static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
|
||||
uint64_t current_time)
|
||||
{
|
||||
struct rtp_header *hdr;
|
||||
ssize_t hlen, plen;
|
||||
|
|
@ -273,7 +274,7 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
|
|||
timestamp = ntohl(hdr->timestamp) - impl->ts_offset;
|
||||
|
||||
impl->receiving = true;
|
||||
impl->last_recv_timestamp = pw_stream_get_nsec(impl->stream);
|
||||
impl->last_recv_timestamp = current_time;
|
||||
|
||||
plen = len - hlen;
|
||||
samples = plen / stride;
|
||||
|
|
|
|||
|
|
@ -318,7 +318,8 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
|
||||
static int rtp_midi_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
|
||||
uint64_t current_time)
|
||||
{
|
||||
struct rtp_header *hdr;
|
||||
ssize_t hlen;
|
||||
|
|
|
|||
|
|
@ -99,7 +99,8 @@ static void rtp_opus_process_playback(void *data)
|
|||
pw_stream_queue_buffer(impl->stream, buf);
|
||||
}
|
||||
|
||||
static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len)
|
||||
static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len,
|
||||
uint64_t current_time)
|
||||
{
|
||||
struct rtp_header *hdr;
|
||||
ssize_t hlen, plen;
|
||||
|
|
|
|||
|
|
@ -151,7 +151,8 @@ struct impl {
|
|||
* access below for the reason why. */
|
||||
uint8_t timer_running;
|
||||
|
||||
int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len);
|
||||
int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len,
|
||||
uint64_t current_time);
|
||||
/* Used for resetting the ring buffer before the stream starts, to prevent
|
||||
* reading from uninitialized memory. This can otherwise happen in direct
|
||||
* timestamp mode when the read index is set to an uninitialized location.
|
||||
|
|
@ -1036,10 +1037,17 @@ int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *di
|
|||
return pw_stream_update_properties(impl->stream, dict);
|
||||
}
|
||||
|
||||
int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len)
|
||||
int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len,
|
||||
uint64_t current_time)
|
||||
{
|
||||
struct impl *impl = (struct impl*)s;
|
||||
return impl->receive_rtp(impl, buffer, len);
|
||||
return impl->receive_rtp(impl, buffer, len, current_time);
|
||||
}
|
||||
|
||||
uint64_t rtp_stream_get_nsec(struct rtp_stream *s)
|
||||
{
|
||||
struct impl *impl = (struct impl*)s;
|
||||
return pw_stream_get_nsec(impl->stream);
|
||||
}
|
||||
|
||||
uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate)
|
||||
|
|
|
|||
|
|
@ -62,7 +62,10 @@ void rtp_stream_destroy(struct rtp_stream *s);
|
|||
|
||||
int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *dict);
|
||||
|
||||
int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len);
|
||||
int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len,
|
||||
uint64_t current_time);
|
||||
|
||||
uint64_t rtp_stream_get_nsec(struct rtp_stream *s);
|
||||
|
||||
uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue