improve hole punch (#403)
Some checks are pending
EasyTier Core / pre_job (push) Waiting to run
EasyTier Core / build (freebsd-13.2-x86_64, 13.2, ubuntu-latest, x86_64-unknown-freebsd) (push) Blocked by required conditions
EasyTier Core / build (linux-aarch64, ubuntu-latest, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-arm, ubuntu-latest, arm-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armhf, ubuntu-latest, arm-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7, ubuntu-latest, armv7-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7hf, ubuntu-latest, armv7-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-mips, ubuntu-latest, mips-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-mipsel, ubuntu-latest, mipsel-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-x86_64, ubuntu-latest, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (macos-aarch64, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (macos-x86_64, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (windows-x86_64, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier Core / core-result (push) Blocked by required conditions
EasyTier GUI / pre_job (push) Waiting to run
EasyTier GUI / build-gui (linux-aarch64, aarch64-unknown-linux-gnu, ubuntu-latest, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (linux-x86_64, x86_64-unknown-linux-gnu, ubuntu-latest, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-aarch64, aarch64-apple-darwin, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-x86_64, x86_64-apple-darwin, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (windows-x86_64, x86_64-pc-windows-msvc, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier GUI / gui-result (push) Blocked by required conditions
EasyTier Mobile / pre_job (push) Waiting to run
EasyTier Mobile / build-mobile (android, ubuntu-latest, android) (push) Blocked by required conditions
EasyTier Mobile / mobile-result (push) Blocked by required conditions
EasyTier Test / pre_job (push) Waiting to run
EasyTier Test / test (push) Blocked by required conditions

* fix duplicated peer id (again)

* improve udp hole punch

1. always try cone punch for any nat type, tolerate fault stun type.
2. serializing all sym punch request, including server side.
This commit is contained in:
Sijie.Sun 2024-10-10 00:07:42 +08:00 committed by GitHub
parent d9453589ac
commit 2c017e0fc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 143 additions and 67 deletions

View File

@ -185,7 +185,7 @@ impl PunchBothEasySymHoleClient {
my_nat_info: UdpNatType, my_nat_info: UdpNatType,
peer_nat_info: UdpNatType, peer_nat_info: UdpNatType,
is_busy: &mut bool, is_busy: &mut bool,
) -> Result<Box<dyn Tunnel>, anyhow::Error> { ) -> Result<Option<Box<dyn Tunnel>>, anyhow::Error> {
*is_busy = false; *is_busy = false;
let udp_array = UdpSocketArray::new( let udp_array = UdpSocketArray::new(
@ -301,7 +301,7 @@ impl PunchBothEasySymHoleClient {
.await .await
{ {
Ok(tunnel) => { Ok(tunnel) => {
return Ok(tunnel); return Ok(Some(tunnel));
} }
Err(e) => { Err(e) => {
tracing::error!(?e, "failed to connect with socket"); tracing::error!(?e, "failed to connect with socket");
@ -312,7 +312,7 @@ impl PunchBothEasySymHoleClient {
udp_array.add_new_socket(socket.socket).await?; udp_array.add_new_socket(socket.socket).await?;
} }
anyhow::bail!("failed to punch hole for both easy sym"); Ok(None)
} }
} }
@ -325,6 +325,7 @@ pub mod tests {
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use crate::connector::udp_hole_punch::RUN_TESTING;
use crate::{ use crate::{
connector::udp_hole_punch::{ connector::udp_hole_punch::{
tests::create_mock_peer_manager_with_mock_stun, UdpHolePunchConnector, tests::create_mock_peer_manager_with_mock_stun, UdpHolePunchConnector,
@ -338,6 +339,8 @@ pub mod tests {
#[tokio::test] #[tokio::test]
#[serial_test::serial(hole_punch)] #[serial_test::serial(hole_punch)]
async fn hole_punching_easy_sym(#[values("true", "false")] is_inc: bool) { async fn hole_punching_easy_sym(#[values("true", "false")] is_inc: bool) {
RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager_with_mock_stun(if is_inc { let p_a = create_mock_peer_manager_with_mock_stun(if is_inc {
NatType::SymmetricEasyInc NatType::SymmetricEasyInc
} else { } else {

View File

@ -94,7 +94,7 @@ impl PunchConeHoleClient {
pub(crate) async fn do_hole_punching( pub(crate) async fn do_hole_punching(
&self, &self,
dst_peer_id: PeerId, dst_peer_id: PeerId,
) -> Result<Box<dyn Tunnel>, anyhow::Error> { ) -> Result<Option<Box<dyn Tunnel>>, anyhow::Error> {
tracing::info!(?dst_peer_id, "start hole punching"); tracing::info!(?dst_peer_id, "start hole punching");
let tid = rand::random(); let tid = rand::random();
@ -212,7 +212,7 @@ impl PunchConeHoleClient {
{ {
Ok(tunnel) => { Ok(tunnel) => {
tracing::info!(?tunnel, "hole punched"); tracing::info!(?tunnel, "hole punched");
return Ok(tunnel); return Ok(Some(tunnel));
} }
Err(e) => { Err(e) => {
tracing::error!(?e, "failed to connect with socket"); tracing::error!(?e, "failed to connect with socket");
@ -221,7 +221,7 @@ impl PunchConeHoleClient {
} }
} }
return Err(anyhow::anyhow!("punch task finished but no hole punched")); return Ok(None);
} }
} }

View File

@ -1,4 +1,4 @@
use std::sync::Arc; use std::sync::{atomic::AtomicBool, Arc};
use anyhow::{Context, Error}; use anyhow::{Context, Error};
use both_easy_sym::{PunchBothEasySymHoleClient, PunchBothEasySymHoleServer}; use both_easy_sym::{PunchBothEasySymHoleClient, PunchBothEasySymHoleServer};
@ -27,6 +27,7 @@ use crate::{
}, },
rpc_types::{self, controller::BaseController}, rpc_types::{self, controller::BaseController},
}, },
tunnel::Tunnel,
}; };
pub(crate) mod both_easy_sym; pub(crate) mod both_easy_sym;
@ -36,6 +37,7 @@ pub(crate) mod sym_to_cone;
// sym punch should be serialized // sym punch should be serialized
static SYM_PUNCH_LOCK: Lazy<DashMap<PeerId, Arc<Mutex<()>>>> = Lazy::new(|| DashMap::new()); static SYM_PUNCH_LOCK: Lazy<DashMap<PeerId, Arc<Mutex<()>>>> = Lazy::new(|| DashMap::new());
static RUN_TESTING: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(false));
fn get_sym_punch_lock(peer_id: PeerId) -> Arc<Mutex<()>> { fn get_sym_punch_lock(peer_id: PeerId) -> Arc<Mutex<()>> {
SYM_PUNCH_LOCK SYM_PUNCH_LOCK
@ -103,6 +105,9 @@ impl UdpHolePunchRpc for UdpHolePunchServer {
_ctrl: Self::Controller, _ctrl: Self::Controller,
input: SendPunchPacketHardSymRequest, input: SendPunchPacketHardSymRequest,
) -> rpc_types::error::Result<SendPunchPacketHardSymResponse> { ) -> rpc_types::error::Result<SendPunchPacketHardSymResponse> {
let _locked = get_sym_punch_lock(self.common.get_peer_mgr().my_peer_id())
.try_lock_owned()
.with_context(|| "sym punch lock is busy")?;
self.sym_to_cone_server self.sym_to_cone_server
.send_punch_packet_hard_sym(input) .send_punch_packet_hard_sym(input)
.await .await
@ -113,6 +118,9 @@ impl UdpHolePunchRpc for UdpHolePunchServer {
_ctrl: Self::Controller, _ctrl: Self::Controller,
input: SendPunchPacketEasySymRequest, input: SendPunchPacketEasySymRequest,
) -> rpc_types::error::Result<Void> { ) -> rpc_types::error::Result<Void> {
let _locked = get_sym_punch_lock(self.common.get_peer_mgr().my_peer_id())
.try_lock_owned()
.with_context(|| "sym punch lock is busy")?;
self.sym_to_cone_server self.sym_to_cone_server
.send_punch_packet_easy_sym(input) .send_punch_packet_easy_sym(input)
.await .await
@ -134,6 +142,7 @@ impl UdpHolePunchRpc for UdpHolePunchServer {
} }
} }
#[derive(Debug)]
struct BackOff { struct BackOff {
backoffs_ms: Vec<u64>, backoffs_ms: Vec<u64>,
current_idx: usize, current_idx: usize,
@ -186,6 +195,53 @@ impl UdpHoePunchConnectorData {
}) })
} }
#[tracing::instrument(skip(self))]
async fn handle_punch_result(
self: &Self,
ret: Result<Option<Box<dyn Tunnel>>, Error>,
backoff: Option<&mut BackOff>,
round: Option<&mut u32>,
) -> bool {
let op = |rollback: bool| {
if rollback {
if let Some(backoff) = backoff {
backoff.rollback();
}
if let Some(round) = round {
*round = round.saturating_sub(1);
}
} else {
if let Some(round) = round {
*round += 1;
}
}
};
match ret {
Ok(Some(tunnel)) => {
tracing::info!(?tunnel, "hole punching get tunnel success");
if let Err(e) = self.peer_mgr.add_client_tunnel(tunnel).await {
tracing::warn!(?e, "add client tunnel failed");
op(true);
false
} else {
true
}
}
Ok(None) => {
tracing::info!("hole punching failed, no punch tunnel");
op(false);
false
}
Err(e) => {
tracing::info!(?e, "hole punching failed");
op(true);
false
}
}
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
async fn cone_to_cone(self: Arc<Self>, task_info: PunchTaskInfo) -> Result<(), Error> { async fn cone_to_cone(self: Arc<Self>, task_info: PunchTaskInfo) -> Result<(), Error> {
let mut backoff = BackOff::new(vec![0, 1000, 2000, 4000, 4000, 8000, 8000, 16000]); let mut backoff = BackOff::new(vec![0, 1000, 2000, 4000, 4000, 8000, 8000, 16000]);
@ -197,20 +253,15 @@ impl UdpHoePunchConnectorData {
.cone_client .cone_client
.do_hole_punching(task_info.dst_peer_id) .do_hole_punching(task_info.dst_peer_id)
.await; .await;
if let Err(e) = ret {
tracing::info!(?e, "cone_to_cone hole punching failed");
continue;
}
if let Err(e) = self.peer_mgr.add_client_tunnel(ret.unwrap()).await { if self
tracing::warn!(?e, "cone_to_cone add client tunnel failed"); .handle_punch_result(ret, Some(&mut backoff), None)
continue; .await
{
break;
} }
break;
} }
tracing::info!("cone_to_cone hole punching success");
Ok(()) Ok(())
} }
@ -223,6 +274,17 @@ impl UdpHoePunchConnectorData {
loop { loop {
backoff.sleep_for_next_backoff().await; backoff.sleep_for_next_backoff().await;
// always try cone first
if !RUN_TESTING.load(std::sync::atomic::Ordering::Relaxed) {
let ret = self
.cone_client
.do_hole_punching(task_info.dst_peer_id)
.await;
if self.handle_punch_result(ret, None, None).await {
break;
}
}
let ret = { let ret = {
let _lock = get_sym_punch_lock(self.peer_mgr.my_peer_id()) let _lock = get_sym_punch_lock(self.peer_mgr.my_peer_id())
.lock_owned() .lock_owned()
@ -237,19 +299,12 @@ impl UdpHoePunchConnectorData {
.await .await
}; };
round += 1; if self
.handle_punch_result(ret, Some(&mut backoff), Some(&mut round))
if let Err(e) = ret { .await
tracing::info!(?e, "sym_to_cone hole punching failed"); {
continue; break;
} }
if let Err(e) = self.peer_mgr.add_client_tunnel(ret.unwrap()).await {
tracing::warn!(?e, "sym_to_cone add client tunnel failed");
continue;
}
break;
} }
Ok(()) Ok(())
@ -262,6 +317,17 @@ impl UdpHoePunchConnectorData {
loop { loop {
backoff.sleep_for_next_backoff().await; backoff.sleep_for_next_backoff().await;
// always try cone first
if !RUN_TESTING.load(std::sync::atomic::Ordering::Relaxed) {
let ret = self
.cone_client
.do_hole_punching(task_info.dst_peer_id)
.await;
if self.handle_punch_result(ret, None, None).await {
break;
}
}
let mut is_busy = false; let mut is_busy = false;
let ret = { let ret = {
@ -280,19 +346,12 @@ impl UdpHoePunchConnectorData {
if is_busy { if is_busy {
backoff.rollback(); backoff.rollback();
} else if self
.handle_punch_result(ret, Some(&mut backoff), None)
.await
{
break;
} }
if let Err(e) = ret {
tracing::info!(?e, "both_easy_sym hole punching failed");
continue;
}
if let Err(e) = self.peer_mgr.add_client_tunnel(ret.unwrap()).await {
tracing::warn!(?e, "both_easy_sym add client tunnel failed");
continue;
}
break;
} }
Ok(()) Ok(())

View File

@ -258,7 +258,7 @@ impl PunchSymToConeHoleClient {
round: u32, round: u32,
last_port_idx: &mut usize, last_port_idx: &mut usize,
my_nat_info: UdpNatType, my_nat_info: UdpNatType,
) -> Result<Box<dyn Tunnel>, anyhow::Error> { ) -> Result<Option<Box<dyn Tunnel>>, anyhow::Error> {
let udp_array = self.prepare_udp_array().await?; let udp_array = self.prepare_udp_array().await?;
let global_ctx = self.peer_mgr.get_global_ctx(); let global_ctx = self.peer_mgr.get_global_ctx();
@ -291,7 +291,7 @@ impl PunchSymToConeHoleClient {
) )
.await .await
{ {
return Ok(tunnel); return Ok(Some(tunnel));
} }
} }
@ -411,14 +411,7 @@ impl PunchSymToConeHoleClient {
*last_port_idx = rand::random(); *last_port_idx = rand::random();
} }
if let Some(tunnel) = ret_tunnel { Ok(ret_tunnel)
Ok(tunnel)
} else {
anyhow::bail!(
"failed to hole punch, punch task result: {:?}",
punch_task_result
)
}
} }
} }
@ -433,7 +426,7 @@ pub mod tests {
use crate::{ use crate::{
connector::udp_hole_punch::{ connector::udp_hole_punch::{
tests::create_mock_peer_manager_with_mock_stun, UdpHolePunchConnector, tests::create_mock_peer_manager_with_mock_stun, UdpHolePunchConnector, RUN_TESTING,
}, },
peers::tests::{connect_peer_manager, wait_route_appear, wait_route_appear_with_cost}, peers::tests::{connect_peer_manager, wait_route_appear, wait_route_appear_with_cost},
proto::common::NatType, proto::common::NatType,
@ -443,6 +436,8 @@ pub mod tests {
#[tokio::test] #[tokio::test]
#[serial_test::serial(hole_punch)] #[serial_test::serial(hole_punch)]
async fn hole_punching_symmetric_only_random() { async fn hole_punching_symmetric_only_random() {
RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager_with_mock_stun(NatType::Symmetric).await; let p_a = create_mock_peer_manager_with_mock_stun(NatType::Symmetric).await;
let p_b = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await; let p_b = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await;
let p_c = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await; let p_c = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await;
@ -518,6 +513,8 @@ pub mod tests {
#[tokio::test] #[tokio::test]
#[serial_test::serial(hole_punch)] #[serial_test::serial(hole_punch)]
async fn hole_punching_symmetric_only_predict(#[values("true", "false")] is_inc: bool) { async fn hole_punching_symmetric_only_predict(#[values("true", "false")] is_inc: bool) {
RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager_with_mock_stun(if is_inc { let p_a = create_mock_peer_manager_with_mock_stun(if is_inc {
NatType::SymmetricEasyInc NatType::SymmetricEasyInc
} else { } else {

View File

@ -305,26 +305,32 @@ impl SyncedRouteInfo {
my_peer_id: PeerId, my_peer_id: PeerId,
my_peer_route_id: u64, my_peer_route_id: u64,
dst_peer_id: PeerId, dst_peer_id: PeerId,
route_infos: &Vec<RoutePeerInfo>, dst_peer_route_id: Option<u64>,
info: &RoutePeerInfo,
) -> Result<(), Error> { ) -> Result<(), Error> {
// 1. check if we are duplicated. // 1. check if we are duplicated.
for info in route_infos.iter() { if info.peer_id == my_peer_id {
if info.peer_id == my_peer_id { if info.peer_route_id != my_peer_route_id
if info.version > self.get_peer_info_version_with_default(info.peer_id) { && info.version > self.get_peer_info_version_with_default(info.peer_id)
// if dst peer send to us with higher version info of my peer, our peer id is duplicated {
// TODO: handle this better. restart peer manager? // if dst peer send to us with higher version info of my peer, our peer id is duplicated
panic!("my peer id is duplicated"); // TODO: handle this better. restart peer manager?
// return Err(Error::DuplicatePeerId); panic!("my peer id is duplicated");
} // return Err(Error::DuplicatePeerId);
} }
} else if info.peer_id == dst_peer_id {
let Some(dst_peer_route_id) = dst_peer_route_id else {
return Ok(());
};
if info.peer_id == dst_peer_id && info.peer_route_id != my_peer_route_id { if dst_peer_route_id != info.peer_route_id
if info.version < self.get_peer_info_version_with_default(info.peer_id) { && info.version < self.get_peer_info_version_with_default(info.peer_id)
// if dst peer send to us with lower version info of dst peer, dst peer id is duplicated {
return Err(Error::DuplicatePeerId); // if dst peer send to us with lower version info of dst peer, dst peer id is duplicated
} return Err(Error::DuplicatePeerId);
} }
} }
Ok(()) Ok(())
} }
@ -335,8 +341,19 @@ impl SyncedRouteInfo {
dst_peer_id: PeerId, dst_peer_id: PeerId,
peer_infos: &Vec<RoutePeerInfo>, peer_infos: &Vec<RoutePeerInfo>,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.check_duplicate_peer_id(my_peer_id, my_peer_route_id, dst_peer_id, peer_infos)?;
for mut route_info in peer_infos.iter().map(Clone::clone) { for mut route_info in peer_infos.iter().map(Clone::clone) {
self.check_duplicate_peer_id(
my_peer_id,
my_peer_route_id,
dst_peer_id,
if route_info.peer_id == dst_peer_id {
self.peer_infos.get(&dst_peer_id).map(|x| x.peer_route_id)
} else {
None
},
&route_info,
)?;
// time between peers may not be synchronized, so update last_update to local now. // time between peers may not be synchronized, so update last_update to local now.
// note only last_update with larger version will be updated to local saved peer info. // note only last_update with larger version will be updated to local saved peer info.
route_info.last_update = Some(SystemTime::now().into()); route_info.last_update = Some(SystemTime::now().into());