diff --git a/easytier/src/connector/udp_hole_punch/both_easy_sym.rs b/easytier/src/connector/udp_hole_punch/both_easy_sym.rs index 4e66c17..fc84406 100644 --- a/easytier/src/connector/udp_hole_punch/both_easy_sym.rs +++ b/easytier/src/connector/udp_hole_punch/both_easy_sym.rs @@ -185,7 +185,7 @@ impl PunchBothEasySymHoleClient { my_nat_info: UdpNatType, peer_nat_info: UdpNatType, is_busy: &mut bool, - ) -> Result, anyhow::Error> { + ) -> Result>, anyhow::Error> { *is_busy = false; let udp_array = UdpSocketArray::new( @@ -301,7 +301,7 @@ impl PunchBothEasySymHoleClient { .await { Ok(tunnel) => { - return Ok(tunnel); + return Ok(Some(tunnel)); } Err(e) => { tracing::error!(?e, "failed to connect with socket"); @@ -312,7 +312,7 @@ impl PunchBothEasySymHoleClient { udp_array.add_new_socket(socket.socket).await?; } - anyhow::bail!("failed to punch hole for both easy sym"); + Ok(None) } } @@ -325,6 +325,7 @@ pub mod tests { use tokio::net::UdpSocket; + use crate::connector::udp_hole_punch::RUN_TESTING; use crate::{ connector::udp_hole_punch::{ tests::create_mock_peer_manager_with_mock_stun, UdpHolePunchConnector, @@ -338,6 +339,8 @@ pub mod tests { #[tokio::test] #[serial_test::serial(hole_punch)] async fn hole_punching_easy_sym(#[values("true", "false")] is_inc: bool) { + RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed); + let p_a = create_mock_peer_manager_with_mock_stun(if is_inc { NatType::SymmetricEasyInc } else { diff --git a/easytier/src/connector/udp_hole_punch/cone.rs b/easytier/src/connector/udp_hole_punch/cone.rs index 8822e57..7e11e80 100644 --- a/easytier/src/connector/udp_hole_punch/cone.rs +++ b/easytier/src/connector/udp_hole_punch/cone.rs @@ -94,7 +94,7 @@ impl PunchConeHoleClient { pub(crate) async fn do_hole_punching( &self, dst_peer_id: PeerId, - ) -> Result, anyhow::Error> { + ) -> Result>, anyhow::Error> { tracing::info!(?dst_peer_id, "start hole punching"); let tid = rand::random(); @@ -212,7 +212,7 @@ impl PunchConeHoleClient { { Ok(tunnel) => { tracing::info!(?tunnel, "hole punched"); - return Ok(tunnel); + return Ok(Some(tunnel)); } Err(e) => { tracing::error!(?e, "failed to connect with socket"); @@ -221,7 +221,7 @@ impl PunchConeHoleClient { } } - return Err(anyhow::anyhow!("punch task finished but no hole punched")); + return Ok(None); } } diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index 6307058..aabb0f6 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{atomic::AtomicBool, Arc}; use anyhow::{Context, Error}; use both_easy_sym::{PunchBothEasySymHoleClient, PunchBothEasySymHoleServer}; @@ -27,6 +27,7 @@ use crate::{ }, rpc_types::{self, controller::BaseController}, }, + tunnel::Tunnel, }; pub(crate) mod both_easy_sym; @@ -36,6 +37,7 @@ pub(crate) mod sym_to_cone; // sym punch should be serialized static SYM_PUNCH_LOCK: Lazy>>> = Lazy::new(|| DashMap::new()); +static RUN_TESTING: Lazy = Lazy::new(|| AtomicBool::new(false)); fn get_sym_punch_lock(peer_id: PeerId) -> Arc> { SYM_PUNCH_LOCK @@ -103,6 +105,9 @@ impl UdpHolePunchRpc for UdpHolePunchServer { _ctrl: Self::Controller, input: SendPunchPacketHardSymRequest, ) -> rpc_types::error::Result { + let _locked = get_sym_punch_lock(self.common.get_peer_mgr().my_peer_id()) + .try_lock_owned() + .with_context(|| "sym punch lock is busy")?; self.sym_to_cone_server .send_punch_packet_hard_sym(input) .await @@ -113,6 +118,9 @@ impl UdpHolePunchRpc for UdpHolePunchServer { _ctrl: Self::Controller, input: SendPunchPacketEasySymRequest, ) -> rpc_types::error::Result { + let _locked = get_sym_punch_lock(self.common.get_peer_mgr().my_peer_id()) + .try_lock_owned() + .with_context(|| "sym punch lock is busy")?; self.sym_to_cone_server .send_punch_packet_easy_sym(input) .await @@ -134,6 +142,7 @@ impl UdpHolePunchRpc for UdpHolePunchServer { } } +#[derive(Debug)] struct BackOff { backoffs_ms: Vec, current_idx: usize, @@ -186,6 +195,53 @@ impl UdpHoePunchConnectorData { }) } + #[tracing::instrument(skip(self))] + async fn handle_punch_result( + self: &Self, + ret: Result>, Error>, + backoff: Option<&mut BackOff>, + round: Option<&mut u32>, + ) -> bool { + let op = |rollback: bool| { + if rollback { + if let Some(backoff) = backoff { + backoff.rollback(); + } + if let Some(round) = round { + *round = round.saturating_sub(1); + } + } else { + if let Some(round) = round { + *round += 1; + } + } + }; + + match ret { + Ok(Some(tunnel)) => { + tracing::info!(?tunnel, "hole punching get tunnel success"); + + if let Err(e) = self.peer_mgr.add_client_tunnel(tunnel).await { + tracing::warn!(?e, "add client tunnel failed"); + op(true); + false + } else { + true + } + } + Ok(None) => { + tracing::info!("hole punching failed, no punch tunnel"); + op(false); + false + } + Err(e) => { + tracing::info!(?e, "hole punching failed"); + op(true); + false + } + } + } + #[tracing::instrument(skip(self))] async fn cone_to_cone(self: Arc, task_info: PunchTaskInfo) -> Result<(), Error> { let mut backoff = BackOff::new(vec![0, 1000, 2000, 4000, 4000, 8000, 8000, 16000]); @@ -197,20 +253,15 @@ impl UdpHoePunchConnectorData { .cone_client .do_hole_punching(task_info.dst_peer_id) .await; - if let Err(e) = ret { - tracing::info!(?e, "cone_to_cone hole punching failed"); - continue; - } - if let Err(e) = self.peer_mgr.add_client_tunnel(ret.unwrap()).await { - tracing::warn!(?e, "cone_to_cone add client tunnel failed"); - continue; + if self + .handle_punch_result(ret, Some(&mut backoff), None) + .await + { + break; } - - break; } - tracing::info!("cone_to_cone hole punching success"); Ok(()) } @@ -223,6 +274,17 @@ impl UdpHoePunchConnectorData { loop { backoff.sleep_for_next_backoff().await; + // always try cone first + if !RUN_TESTING.load(std::sync::atomic::Ordering::Relaxed) { + let ret = self + .cone_client + .do_hole_punching(task_info.dst_peer_id) + .await; + if self.handle_punch_result(ret, None, None).await { + break; + } + } + let ret = { let _lock = get_sym_punch_lock(self.peer_mgr.my_peer_id()) .lock_owned() @@ -237,19 +299,12 @@ impl UdpHoePunchConnectorData { .await }; - round += 1; - - if let Err(e) = ret { - tracing::info!(?e, "sym_to_cone hole punching failed"); - continue; + if self + .handle_punch_result(ret, Some(&mut backoff), Some(&mut round)) + .await + { + break; } - - if let Err(e) = self.peer_mgr.add_client_tunnel(ret.unwrap()).await { - tracing::warn!(?e, "sym_to_cone add client tunnel failed"); - continue; - } - - break; } Ok(()) @@ -262,6 +317,17 @@ impl UdpHoePunchConnectorData { loop { backoff.sleep_for_next_backoff().await; + // always try cone first + if !RUN_TESTING.load(std::sync::atomic::Ordering::Relaxed) { + let ret = self + .cone_client + .do_hole_punching(task_info.dst_peer_id) + .await; + if self.handle_punch_result(ret, None, None).await { + break; + } + } + let mut is_busy = false; let ret = { @@ -280,19 +346,12 @@ impl UdpHoePunchConnectorData { if is_busy { backoff.rollback(); + } else if self + .handle_punch_result(ret, Some(&mut backoff), None) + .await + { + break; } - - if let Err(e) = ret { - tracing::info!(?e, "both_easy_sym hole punching failed"); - continue; - } - - if let Err(e) = self.peer_mgr.add_client_tunnel(ret.unwrap()).await { - tracing::warn!(?e, "both_easy_sym add client tunnel failed"); - continue; - } - - break; } Ok(()) diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index 5d2afb3..e935181 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -258,7 +258,7 @@ impl PunchSymToConeHoleClient { round: u32, last_port_idx: &mut usize, my_nat_info: UdpNatType, - ) -> Result, anyhow::Error> { + ) -> Result>, anyhow::Error> { let udp_array = self.prepare_udp_array().await?; let global_ctx = self.peer_mgr.get_global_ctx(); @@ -291,7 +291,7 @@ impl PunchSymToConeHoleClient { ) .await { - return Ok(tunnel); + return Ok(Some(tunnel)); } } @@ -411,14 +411,7 @@ impl PunchSymToConeHoleClient { *last_port_idx = rand::random(); } - if let Some(tunnel) = ret_tunnel { - Ok(tunnel) - } else { - anyhow::bail!( - "failed to hole punch, punch task result: {:?}", - punch_task_result - ) - } + Ok(ret_tunnel) } } @@ -433,7 +426,7 @@ pub mod tests { use crate::{ connector::udp_hole_punch::{ - tests::create_mock_peer_manager_with_mock_stun, UdpHolePunchConnector, + tests::create_mock_peer_manager_with_mock_stun, UdpHolePunchConnector, RUN_TESTING, }, peers::tests::{connect_peer_manager, wait_route_appear, wait_route_appear_with_cost}, proto::common::NatType, @@ -443,6 +436,8 @@ pub mod tests { #[tokio::test] #[serial_test::serial(hole_punch)] async fn hole_punching_symmetric_only_random() { + RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed); + let p_a = create_mock_peer_manager_with_mock_stun(NatType::Symmetric).await; let p_b = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await; let p_c = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await; @@ -518,6 +513,8 @@ pub mod tests { #[tokio::test] #[serial_test::serial(hole_punch)] async fn hole_punching_symmetric_only_predict(#[values("true", "false")] is_inc: bool) { + RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed); + let p_a = create_mock_peer_manager_with_mock_stun(if is_inc { NatType::SymmetricEasyInc } else { diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index d699d83..39d60ba 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -305,26 +305,32 @@ impl SyncedRouteInfo { my_peer_id: PeerId, my_peer_route_id: u64, dst_peer_id: PeerId, - route_infos: &Vec, + dst_peer_route_id: Option, + info: &RoutePeerInfo, ) -> Result<(), Error> { // 1. check if we are duplicated. - for info in route_infos.iter() { - if info.peer_id == my_peer_id { - if info.version > self.get_peer_info_version_with_default(info.peer_id) { - // if dst peer send to us with higher version info of my peer, our peer id is duplicated - // TODO: handle this better. restart peer manager? - panic!("my peer id is duplicated"); - // return Err(Error::DuplicatePeerId); - } + if info.peer_id == my_peer_id { + if info.peer_route_id != my_peer_route_id + && info.version > self.get_peer_info_version_with_default(info.peer_id) + { + // if dst peer send to us with higher version info of my peer, our peer id is duplicated + // TODO: handle this better. restart peer manager? + panic!("my peer id is duplicated"); + // return Err(Error::DuplicatePeerId); } + } else if info.peer_id == dst_peer_id { + let Some(dst_peer_route_id) = dst_peer_route_id else { + return Ok(()); + }; - if info.peer_id == dst_peer_id && info.peer_route_id != my_peer_route_id { - if info.version < self.get_peer_info_version_with_default(info.peer_id) { - // if dst peer send to us with lower version info of dst peer, dst peer id is duplicated - return Err(Error::DuplicatePeerId); - } + if dst_peer_route_id != info.peer_route_id + && info.version < self.get_peer_info_version_with_default(info.peer_id) + { + // if dst peer send to us with lower version info of dst peer, dst peer id is duplicated + return Err(Error::DuplicatePeerId); } } + Ok(()) } @@ -335,8 +341,19 @@ impl SyncedRouteInfo { dst_peer_id: PeerId, peer_infos: &Vec, ) -> Result<(), Error> { - self.check_duplicate_peer_id(my_peer_id, my_peer_route_id, dst_peer_id, peer_infos)?; for mut route_info in peer_infos.iter().map(Clone::clone) { + self.check_duplicate_peer_id( + my_peer_id, + my_peer_route_id, + dst_peer_id, + if route_info.peer_id == dst_peer_id { + self.peer_infos.get(&dst_peer_id).map(|x| x.peer_route_id) + } else { + None + }, + &route_info, + )?; + // time between peers may not be synchronized, so update last_update to local now. // note only last_update with larger version will be updated to local saved peer info. route_info.last_update = Some(SystemTime::now().into());