fix punching task leak (#298)
Some checks failed
EasyTier Core / pre_job (push) Has been cancelled
EasyTier GUI / pre_job (push) Has been cancelled
EasyTier Mobile / pre_job (push) Has been cancelled
EasyTier Test / pre_job (push) Has been cancelled
EasyTier Core / build (freebsd-13.2-x86_64, 13.2, ubuntu-latest, x86_64-unknown-freebsd) (push) Has been cancelled
EasyTier Core / build (linux-aarch64, ubuntu-latest, aarch64-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (linux-arm, ubuntu-latest, arm-unknown-linux-musleabi) (push) Has been cancelled
EasyTier Core / build (linux-armhf, ubuntu-latest, arm-unknown-linux-musleabihf) (push) Has been cancelled
EasyTier Core / build (linux-armv7, ubuntu-latest, armv7-unknown-linux-musleabi) (push) Has been cancelled
EasyTier Core / build (linux-armv7hf, ubuntu-latest, armv7-unknown-linux-musleabihf) (push) Has been cancelled
EasyTier Core / build (linux-mips, ubuntu-latest, mips-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (linux-mipsel, ubuntu-latest, mipsel-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (linux-x86_64, ubuntu-latest, x86_64-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (macos-aarch64, macos-latest, aarch64-apple-darwin) (push) Has been cancelled
EasyTier Core / build (macos-x86_64, macos-latest, x86_64-apple-darwin) (push) Has been cancelled
EasyTier Core / build (windows-x86_64, windows-latest, x86_64-pc-windows-msvc) (push) Has been cancelled
EasyTier Core / core-result (push) Has been cancelled
EasyTier GUI / build-gui (linux-aarch64, aarch64-unknown-linux-gnu, ubuntu-latest, aarch64-unknown-linux-musl) (push) Has been cancelled
EasyTier GUI / build-gui (linux-x86_64, x86_64-unknown-linux-gnu, ubuntu-latest, x86_64-unknown-linux-musl) (push) Has been cancelled
EasyTier GUI / build-gui (macos-aarch64, aarch64-apple-darwin, macos-latest, aarch64-apple-darwin) (push) Has been cancelled
EasyTier GUI / build-gui (macos-x86_64, x86_64-apple-darwin, macos-latest, x86_64-apple-darwin) (push) Has been cancelled
EasyTier GUI / build-gui (windows-x86_64, x86_64-pc-windows-msvc, windows-latest, x86_64-pc-windows-msvc) (push) Has been cancelled
EasyTier GUI / gui-result (push) Has been cancelled
EasyTier Mobile / build-mobile (android, ubuntu-latest, android) (push) Has been cancelled
EasyTier Mobile / mobile-result (push) Has been cancelled
EasyTier Test / test (push) Has been cancelled

the punching task creator doesn't check whether a task is already
running, and may create many punching tasks for the same peer node.

this patch also improves hole punching by checking for hole punch packets
even if the punch rpc has failed.
This commit is contained in:
Sijie.Sun 2024-08-31 14:37:34 +08:00 committed by GitHub
parent 2058dbc470
commit f07b3ee9c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 185 additions and 35 deletions

1
Cargo.lock generated
View File

@ -1623,6 +1623,7 @@ dependencies = [
"derivative",
"encoding",
"futures",
"futures-util",
"gethostname 0.5.0",
"globwalk",
"http 1.1.0",

View File

@ -136,7 +136,7 @@ impl NetworkConfig {
}
cfg.set_rpc_portal(
format!("127.0.0.1:{}", self.rpc_port)
format!("0.0.0.0:{}", self.rpc_port)
.parse()
.with_context(|| format!("failed to parse rpc portal port: {}", self.rpc_port))?,
);

View File

@ -203,6 +203,7 @@ zip = "0.6.6"
[dev-dependencies]
serial_test = "3.0.0"
rstest = "0.18.2"
futures-util = "0.3.30"
[target.'cfg(target_os = "linux")'.dev-dependencies]
defguard_wireguard_rs = "0.4.2"

View File

@ -14,6 +14,7 @@ pub mod global_ctx;
pub mod ifcfg;
pub mod netns;
pub mod network;
pub mod scoped_task;
pub mod stun;
pub mod stun_codec_ext;

View File

@ -0,0 +1,134 @@
//! This module provides `ScopedTask`, a wrapper around Tokio's `JoinHandle` that aborts the task when it is dropped.
//! `ScopedTask` can still be awaited to join the child-task, and abort-on-drop will still trigger while it is being awaited.
//!
//! For example, if task A spawned task B but is doing something else, and task B is waiting for task C to join,
//! aborting A will also abort both B and C.
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task::JoinHandle;
/// Wrapper around a Tokio [`JoinHandle`] that aborts the wrapped task when
/// the wrapper is dropped.
///
/// A `ScopedTask` is created from an existing `JoinHandle` via `From`/`Into`.
/// It can be awaited like the handle it wraps, and it dereferences to the
/// handle so that `&self` methods such as `abort` stay available.
///
/// Because dropping the value aborts the task, a `ScopedTask` that is created
/// and immediately discarded cancels its task right away; the `must_use`
/// attribute turns that mistake into a compiler warning.
#[must_use = "dropping a `ScopedTask` aborts the task immediately"]
#[derive(Debug)]
pub struct ScopedTask<T> {
    /// Handle of the wrapped task; `abort` is called on it in `Drop`.
    inner: JoinHandle<T>,
}
/// Aborts the wrapped task when the `ScopedTask` goes out of scope.
impl<T> Drop for ScopedTask<T> {
    fn drop(&mut self) {
        // Borrow the handle out of `self` and request cancellation of its task.
        let Self { inner } = self;
        inner.abort();
    }
}
/// Awaiting a `ScopedTask` joins the wrapped task, yielding exactly what the
/// underlying `JoinHandle` yields.
impl<T> Future for ScopedTask<T> {
    type Output = <JoinHandle<T> as Future>::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Unwrap the pin to reach the inner handle, then forward the poll to it.
        let this = self.get_mut();
        Pin::new(&mut this.inner).poll(cx)
    }
}
impl<T> From<JoinHandle<T>> for ScopedTask<T> {
fn from(inner: JoinHandle<T>) -> Self {
Self { inner }
}
}
/// Gives shared access to the wrapped `JoinHandle`, so its `&self` methods
/// (for example `abort`) can be called directly on a `ScopedTask`.
impl<T> Deref for ScopedTask<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &JoinHandle<T> {
        let Self { inner } = self;
        inner
    }
}
#[cfg(test)]
mod tests {
    use super::ScopedTask;
    use std::future::pending;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use tokio::task::yield_now;

    /// Guard that sets the shared flag when it is dropped. The tests move it
    /// into a spawned task to observe when that task's future is destroyed.
    struct Sentry(Arc<AtomicBool>);

    impl Drop for Sentry {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    /// Spawns a task that owns a `Sentry` and then waits forever.
    ///
    /// Returns the flag the sentry sets on drop together with the
    /// `ScopedTask` wrapping the spawned task. Must be called from within a
    /// Tokio runtime because it calls `tokio::spawn`.
    fn spawn_guarded_task() -> (Arc<AtomicBool>, ScopedTask<()>) {
        let dropped = Arc::new(AtomicBool::new(false));
        let sentry = Sentry(dropped.clone());
        let task = ScopedTask::from(tokio::spawn(async move {
            let _sentry = sentry;
            pending::<()>().await
        }));
        (dropped, task)
    }

    /// Dropping a `ScopedTask` that nobody is awaiting aborts its task.
    #[tokio::test]
    async fn drop_while_not_waiting_for_join() {
        let (dropped, task) = spawn_guarded_task();

        yield_now().await;
        assert!(!dropped.load(Ordering::SeqCst));

        drop(task);
        yield_now().await;
        assert!(dropped.load(Ordering::SeqCst));
    }

    /// Aborting a parent task that is awaiting a `ScopedTask` also aborts the
    /// child task wrapped by that `ScopedTask`.
    #[tokio::test]
    async fn drop_while_waiting_for_join() {
        let dropped = Arc::new(AtomicBool::new(false));
        let sentry = Sentry(dropped.clone());
        let handle = tokio::spawn(async move {
            ScopedTask::from(tokio::spawn(async move {
                let _sentry = sentry;
                pending::<()>().await
            }))
            .await
            .unwrap()
        });

        yield_now().await;
        assert!(!dropped.load(Ordering::SeqCst));

        handle.abort();
        yield_now().await;
        assert!(dropped.load(Ordering::SeqCst));
    }

    /// Awaiting a `ScopedTask` to completion yields the task's output.
    #[tokio::test]
    async fn no_drop_only_join() {
        let joined = ScopedTask::from(tokio::spawn(async {
            yield_now().await;
            5
        }))
        .await
        .unwrap();

        assert_eq!(joined, 5);
    }

    /// `abort` is reachable through `Deref` and cancels the task while the
    /// `ScopedTask` itself is still alive.
    #[tokio::test]
    async fn manually_abort_before_drop() {
        let (dropped, task) = spawn_guarded_task();

        yield_now().await;
        assert!(!dropped.load(Ordering::SeqCst));

        task.abort();
        yield_now().await;
        assert!(dropped.load(Ordering::SeqCst));
    }

    /// Joining a `ScopedTask` after a manual abort reports an error.
    #[tokio::test]
    async fn manually_abort_then_join() {
        let (dropped, task) = spawn_guarded_task();

        yield_now().await;
        assert!(!dropped.load(Ordering::SeqCst));

        task.abort();
        yield_now().await;
        assert!(task.await.is_err());
    }
}

View File

@ -22,7 +22,7 @@ use zerocopy::FromBytes;
use crate::{
common::{
constants, error::Error, global_ctx::ArcGlobalCtx, join_joinset_background, netns::NetNS,
stun::StunInfoCollectorTrait, PeerId,
scoped_task::ScopedTask, stun::StunInfoCollectorTrait, PeerId,
},
defer,
peers::peer_manager::PeerManager,
@ -417,12 +417,12 @@ impl UdpHolePunchService for UdpHolePunchRpcServer {
}
// send max k1 packets if we are predicting the dst port
let max_k1 = 180;
let max_k1 = 60;
// send max k2 packets if we are sending to random port
let max_k2 = rand::thread_rng().gen_range(600..800);
// this means the NAT is allocating port in a predictable way
if max_port.abs_diff(min_port) <= max_k1 && round <= 6 && punch_predictablely {
if max_port.abs_diff(min_port) <= 3 * max_k1 && round <= 6 && punch_predictablely {
let (min_port, max_port) = {
// round begin from 0. if round is even, we guess port in increasing order
let port_delta = (max_k1 as u32) / ip_count as u32;
@ -849,10 +849,10 @@ impl UdpHolePunchConnector {
return Err(anyhow::anyhow!("failed to get public ips"));
}
let mut last_port_idx = 0;
let mut last_port_idx = rand::thread_rng().gen_range(0..data.shuffled_port_vec.len());
for round in 0..30 {
let Some(next_last_port_idx) = data
for round in 0..5 {
let ret = data
.peer_mgr
.get_peer_rpc_mgr()
.do_client_rpc_scoped(
@ -879,11 +879,20 @@ impl UdpHolePunchConnector {
last_port_idx
},
)
.await?
else {
return Err(anyhow::anyhow!("failed to get remote mapped addr"));
.await;
let next_last_port_idx = match ret {
Ok(Some(idx)) => idx,
err => {
tracing::error!(?err, "failed to get remote mapped addr");
rand::thread_rng().gen_range(0..data.shuffled_port_vec.len())
}
};
// wait for some time to increase the chance of receiving hole punching packet
tokio::time::sleep(Duration::from_secs(2)).await;
// no matter what the result is, we should check if we received any hole punching packet
while let Some(socket) = udp_array.try_fetch_punched_socket(tid) {
if let Ok(tunnel) = Self::try_connect_with_socket(socket, remote_mapped_addr).await
{
@ -901,8 +910,8 @@ impl UdpHolePunchConnector {
data: Arc<UdpHolePunchConnectorData>,
peer_id: PeerId,
) -> Result<(), anyhow::Error> {
const MAX_BACKOFF_TIME: u64 = 600;
let mut backoff_time = vec![15, 15, 30, 30, 60, 120, 300, MAX_BACKOFF_TIME];
const MAX_BACKOFF_TIME: u64 = 300;
let mut backoff_time = vec![15, 15, 30, 30, 60, 120, 180, MAX_BACKOFF_TIME];
let my_nat_type = data.my_nat_type();
loop {
@ -942,7 +951,7 @@ impl UdpHolePunchConnector {
async fn main_loop(data: Arc<UdpHolePunchConnectorData>) {
type JoinTaskRet = Result<(), anyhow::Error>;
type JoinTask = tokio::task::JoinHandle<JoinTaskRet>;
type JoinTask = ScopedTask<JoinTaskRet>;
let punching_task = Arc::new(DashMap::<(PeerId, NatType), JoinTask>::new());
let mut last_my_nat_type = NatType::Unknown;
@ -978,23 +987,27 @@ impl UdpHolePunchConnector {
last_my_nat_type = my_nat_type;
if !peers_to_connect.is_empty() {
let my_nat_type = data.my_nat_type();
if my_nat_type == NatType::Symmetric || my_nat_type == NatType::SymUdpFirewall {
let mut udp_array = data.udp_array.lock().await;
if udp_array.is_none() {
*udp_array = Some(Arc::new(UdpSocketArray::new(
data.udp_array_size.load(Ordering::Relaxed),
data.global_ctx.net_ns.clone(),
)));
}
let udp_array = udp_array.as_ref().unwrap();
udp_array.start().await.unwrap();
}
for item in peers_to_connect {
if punching_task.contains_key(&item) {
continue;
}
let my_nat_type = data.my_nat_type();
if my_nat_type == NatType::Symmetric || my_nat_type == NatType::SymUdpFirewall {
let mut udp_array = data.udp_array.lock().await;
if udp_array.is_none() {
*udp_array = Some(Arc::new(UdpSocketArray::new(
data.udp_array_size.load(Ordering::Relaxed),
data.global_ctx.net_ns.clone(),
)));
}
let udp_array = udp_array.as_ref().unwrap();
udp_array.start().await.unwrap();
}
punching_task.insert(
item,
tokio::spawn(Self::peer_punching_task(data.clone(), item.0)),
tokio::spawn(Self::peer_punching_task(data.clone(), item.0)).into(),
);
}
} else if punching_task.is_empty() {
@ -1173,9 +1186,9 @@ pub mod tests {
let udp_self = Arc::new(UdpSocket::bind("0.0.0.0:40144").await.unwrap());
let udp_inc = Arc::new(UdpSocket::bind("0.0.0.0:40147").await.unwrap());
let udp_inc2 = Arc::new(UdpSocket::bind("0.0.0.0:40400").await.unwrap());
let udp_inc2 = Arc::new(UdpSocket::bind("0.0.0.0:40200").await.unwrap());
let udp_dec = Arc::new(UdpSocket::bind("0.0.0.0:40140").await.unwrap());
let udp_dec2 = Arc::new(UdpSocket::bind("0.0.0.0:40350").await.unwrap());
let udp_dec2 = Arc::new(UdpSocket::bind("0.0.0.0:40050").await.unwrap());
let udps = vec![udp_self, udp_inc, udp_inc2, udp_dec, udp_dec2];
let counter = Arc::new(AtomicU32::new(0));
@ -1186,7 +1199,7 @@ pub mod tests {
tokio::spawn(async move {
let mut buf = [0u8; 1024];
let (len, addr) = udp.recv_from(&mut buf).await.unwrap();
println!("{:?} {:?}", len, addr);
println!("{:?} {:?} {:?}", len, addr, udp.local_addr());
counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
}

View File

@ -340,7 +340,7 @@ impl Cli {
}
fn check_tcp_available(port: u16) -> Option<SocketAddr> {
let s = format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap();
let s = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();
TcpSocket::new_v4().unwrap().bind(s).map(|_| s).ok()
}
@ -353,9 +353,9 @@ impl Cli {
return s;
}
}
return "127.0.0.1:0".parse().unwrap();
return "0.0.0.0:0".parse().unwrap();
}
return format!("127.0.0.1:{}", port).parse().unwrap();
return format!("0.0.0.0:{}", port).parse().unwrap();
}
self.rpc_portal.parse().unwrap()

View File

@ -24,7 +24,7 @@ use crate::{
mpsc::{MpscTunnel, MpscTunnelSender},
packet_def::{PacketType, ZCPacket, ZCPacketType},
wireguard::{WgConfig, WgTunnelListener},
Tunnel, TunnelError, TunnelListener,
Tunnel, TunnelListener,
},
};

View File

@ -188,7 +188,7 @@ listeners = [
]
exit_nodes = []
peer = []
rpc_portal = "127.0.0.1:15888"
rpc_portal = "0.0.0.0:15888"
[network_identity]
network_name = "default"