diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 9183f6e..7472e04 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -700,11 +700,21 @@ impl PeerManager { let policy = Self::get_next_hop_policy(msg.peer_manager_header().unwrap().is_latency_first()); - if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy).await { - peers.send_msg_directly(msg, gateway).await - } else if foreign_network_client.has_next_hop(dst_peer_id) { - foreign_network_client.send_msg(msg, dst_peer_id).await + if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await { + if peers.has_peer(gateway) { + peers.send_msg_directly(msg, gateway).await + } else if foreign_network_client.has_next_hop(gateway) { + foreign_network_client.send_msg(msg, gateway).await + } else { + tracing::warn!( + ?gateway, + ?dst_peer_id, + "cannot send msg to peer through gateway" + ); + Err(Error::RouteError(None)) + } } else { + tracing::debug!(?dst_peer_id, "no gateway for peer"); Err(Error::RouteError(None)) } } @@ -767,10 +777,8 @@ impl PeerManager { .unwrap() .set_latency_first(is_latency_first) .set_exit_node(is_exit_node); - let next_hop_policy = Self::get_next_hop_policy(is_latency_first); let mut errs: Vec = vec![]; - let mut msg = Some(msg); let total_dst_peers = dst_peers.len(); for i in 0..total_dst_peers { @@ -786,28 +794,11 @@ impl PeerManager { .to_peer_id .set(*peer_id); - if let Some(gateway) = self - .peers - .get_gateway_peer_id(*peer_id, next_hop_policy.clone()) - .await + if let Err(e) = + Self::send_msg_internal(&self.peers, &self.foreign_network_client, msg, *peer_id) + .await { - if self.peers.has_peer(gateway) { - if let Err(e) = self.peers.send_msg_directly(msg, gateway).await { - errs.push(e); - } - } else if self.foreign_network_client.has_next_hop(gateway) { - if let Err(e) = self.foreign_network_client.send_msg(msg, gateway).await { - errs.push(e); - } - } else { - tracing::warn!( - ?gateway, - ?peer_id, - "cannot send msg to peer through gateway" - ); - } - } else { - tracing::debug!(?peer_id, "no gateway for peer"); + errs.push(e); } } diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 4be6972..c5f1866 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -1,3 +1,4 @@ +use core::panic; use std::{ sync::{atomic::AtomicU32, Arc}, time::Duration, @@ -61,12 +62,13 @@ pub fn get_inst_config(inst_name: &str, ns: Option<&str>, ipv4: &str) -> TomlCon } pub async fn init_three_node(proto: &str) -> Vec { - init_three_node_ex(proto, |cfg| cfg).await + init_three_node_ex(proto, |cfg| cfg, false).await } pub async fn init_three_node_ex TomlConfigLoader>( proto: &str, cfg_cb: F, + use_public_server: bool, ) -> Vec { prepare_linux_namespaces(); @@ -91,26 +93,26 @@ pub async fn init_three_node_ex TomlConfigLoader>( inst3.run().await.unwrap(); if proto == "tcp" { - inst2 + inst1 .get_conn_manager() .add_connector(TcpTunnelConnector::new( - "tcp://10.1.1.1:11010".parse().unwrap(), + "tcp://10.1.1.2:11010".parse().unwrap(), )); } else if proto == "udp" { - inst2 + inst1 .get_conn_manager() .add_connector(UdpTunnelConnector::new( - "udp://10.1.1.1:11010".parse().unwrap(), + "udp://10.1.1.2:11010".parse().unwrap(), )); } else if proto == "wg" { #[cfg(feature = "wireguard")] - inst2 + inst1 .get_conn_manager() .add_connector(WgTunnelConnector::new( - "wg://10.1.1.1:11011".parse().unwrap(), + "wg://10.1.1.2:11011".parse().unwrap(), WgConfig::new_from_network_identity( - &inst1.get_global_ctx().get_network_identity().network_name, - &inst1 + &inst2.get_global_ctx().get_network_identity().network_name, + &inst2 .get_global_ctx() .get_network_identity() .network_secret @@ -119,36 +121,53 @@ pub async fn init_three_node_ex TomlConfigLoader>( )); } else if proto == "ws" { #[cfg(feature = "websocket")] - inst2 + inst1 .get_conn_manager() .add_connector(crate::tunnel::websocket::WSTunnelConnector::new( - "ws://10.1.1.1:11011".parse().unwrap(), + "ws://10.1.1.2:11011".parse().unwrap(), )); } else if proto == "wss" { #[cfg(feature = "websocket")] - inst2 + inst1 .get_conn_manager() .add_connector(crate::tunnel::websocket::WSTunnelConnector::new( - "wss://10.1.1.1:11012".parse().unwrap(), + "wss://10.1.1.2:11012".parse().unwrap(), )); } - inst2 + inst3 .get_conn_manager() .add_connector(RingTunnelConnector::new( - format!("ring://{}", inst3.id()).parse().unwrap(), + format!("ring://{}", inst2.id()).parse().unwrap(), )); // wait inst2 have two route. wait_for_condition( - || async { inst2.get_peer_manager().list_routes().await.len() == 2 }, - Duration::from_secs(5000), + || async { + if !use_public_server { + inst2.get_peer_manager().list_routes().await.len() == 2 + } else { + inst2 + .get_peer_manager() + .get_foreign_network_manager() + .list_foreign_networks() + .await + .foreign_networks + .len() + == 1 + } + }, + Duration::from_secs(5), ) .await; wait_for_condition( - || async { inst1.get_peer_manager().list_routes().await.len() == 2 }, - Duration::from_secs(5000), + || async { + let routes = inst1.get_peer_manager().list_routes().await; + println!("routes: {:?}", routes); + routes.len() == 2 + }, + Duration::from_secs(5), ) .await; @@ -340,17 +359,30 @@ async fn subnet_proxy_test_icmp() { #[serial_test::serial] pub async fn subnet_proxy_three_node_test( #[values("tcp", "udp", "wg")] proto: &str, - #[values(true)] no_tun: bool, + #[values(true, false)] no_tun: bool, + #[values(true, false)] relay_by_public_server: bool, ) { - let insts = init_three_node_ex(proto, |cfg| { - if cfg.get_inst_name() == "inst3" { - let mut flags = cfg.get_flags(); - flags.no_tun = no_tun; - cfg.set_flags(flags); - cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap()); - } - cfg - }) + let insts = init_three_node_ex( + proto, + |cfg| { + if cfg.get_inst_name() == "inst3" { + let mut flags = cfg.get_flags(); + flags.no_tun = no_tun; + cfg.set_flags(flags); + cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap()); + } + + if cfg.get_inst_name() == "inst2" && relay_by_public_server { + cfg.set_network_identity(NetworkIdentity::new( + "public".to_string(), + "public".to_string(), + )); + } + + cfg + }, + relay_by_public_server, + ) .await; assert_eq!(insts[2].get_global_ctx().get_proxy_cidrs().len(), 1);