From 54c6418f9760fdfee02814e5268217f8324792d8 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sun, 25 Aug 2024 11:12:01 +0800 Subject: [PATCH] only add necessary conn to alive urls (#277) too many alive conns may cause high cpu usage and lagged broadcast recv. --- easytier-gui/src-tauri/src/lib.rs | 23 ++++++------ easytier/proto/cli.proto | 2 ++ easytier/src/connector/manual.rs | 50 ++++++++++++++++---------- easytier/src/easytier-cli.rs | 2 +- easytier/src/peers/peer_conn.rs | 9 ++++- easytier/src/tests/three_node.rs | 60 +++++++++++++++++++++++++++++++ 6 files changed, 115 insertions(+), 31 deletions(-) diff --git a/easytier-gui/src-tauri/src/lib.rs b/easytier-gui/src-tauri/src/lib.rs index cbbcaac..5c057dd 100644 --- a/easytier-gui/src-tauri/src/lib.rs +++ b/easytier-gui/src-tauri/src/lib.rs @@ -300,23 +300,24 @@ pub fn run() { #[cfg(not(any(target_os = "android", target_os = "ios")))] { - builder = builder - .plugin(tauri_plugin_single_instance::init(|app, args, cwd| { - app.webview_windows() - .values() - .next() - .expect("Sorry, no window found") - .set_focus() - .expect("Can't Bring Window to Focus"); - })); + builder = builder.plugin(tauri_plugin_single_instance::init(|app, _args, _cwd| { + app.webview_windows() + .values() + .next() + .expect("Sorry, no window found") + .set_focus() + .expect("Can't Bring Window to Focus"); + })); } - builder + builder = builder .plugin(tauri_plugin_os::init()) .plugin(tauri_plugin_clipboard_manager::init()) .plugin(tauri_plugin_process::init()) .plugin(tauri_plugin_shell::init()) - .plugin(tauri_plugin_vpnservice::init()) + .plugin(tauri_plugin_vpnservice::init()); + + builder .setup(|app| { // for logging config let Ok(log_dir) = app.path().app_log_dir() else { diff --git a/easytier/proto/cli.proto b/easytier/proto/cli.proto index c98fe14..f422381 100644 --- a/easytier/proto/cli.proto +++ b/easytier/proto/cli.proto @@ -30,6 +30,8 @@ message PeerConnInfo { TunnelInfo tunnel = 5; PeerConnStats stats = 6; float loss_rate = 7; + bool is_client = 8; + string network_name = 9; } message PeerInfo { diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 944e9fa..3d00e3f 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -46,7 +46,7 @@ struct ConnectorManagerData { connectors: ConnectorMap, reconnecting: DashSet, peer_manager: Arc, - alive_conn_urls: Arc>>, + alive_conn_urls: Arc>, // user removed connector urls removed_conn_urls: Arc>, net_ns: NetNS, @@ -71,7 +71,7 @@ impl ManualConnectorManager { connectors, reconnecting: DashSet::new(), peer_manager, - alive_conn_urls: Arc::new(Mutex::new(BTreeSet::new())), + alive_conn_urls: Arc::new(DashSet::new()), removed_conn_urls: Arc::new(DashSet::new()), net_ns: global_ctx.net_ns.clone(), global_ctx, @@ -80,7 +80,11 @@ impl ManualConnectorManager { }; ret.tasks - .spawn(Self::conn_mgr_routine(ret.data.clone(), event_subscriber)); + .spawn(Self::conn_mgr_reconn_routine(ret.data.clone())); + ret.tasks.spawn(Self::conn_mgr_handle_event_routine( + ret.data.clone(), + event_subscriber, + )); ret } @@ -159,10 +163,17 @@ impl ManualConnectorManager { ret } - async fn conn_mgr_routine( + async fn conn_mgr_handle_event_routine( data: Arc, mut event_recv: Receiver, ) { + loop { + let event = event_recv.recv().await.expect("event_recv got error"); + Self::handle_event(&event, &data).await; + } + } + + async fn conn_mgr_reconn_routine(data: Arc) { tracing::warn!("conn_mgr_routine started"); let mut reconn_interval = tokio::time::interval(std::time::Duration::from_millis( use_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS), @@ -171,15 +182,6 @@ impl ManualConnectorManager { loop { tokio::select! { - event = event_recv.recv() => { - if let Ok(event) = event { - Self::handle_event(&event, data.clone()).await; - } else { - tracing::warn!(?event, "event_recv got error"); - panic!("event_recv got error, err: {:?}", event); - } - } - _ = reconn_interval.tick() => { let dead_urls = Self::collect_dead_conns(data.clone()).await; if dead_urls.is_empty() { @@ -210,17 +212,24 @@ impl ManualConnectorManager { } } - async fn handle_event(event: &GlobalCtxEvent, data: Arc) { + async fn handle_event(event: &GlobalCtxEvent, data: &ConnectorManagerData) { + let need_add_alive = |conn_info: &easytier_rpc::PeerConnInfo| conn_info.is_client; match event { GlobalCtxEvent::PeerConnAdded(conn_info) => { + if !need_add_alive(conn_info) { + return; + } let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone(); - data.alive_conn_urls.lock().await.insert(addr); + data.alive_conn_urls.insert(addr); tracing::warn!("peer conn added: {:?}", conn_info); } GlobalCtxEvent::PeerConnRemoved(conn_info) => { + if !need_add_alive(conn_info) { + return; + } let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone(); - data.alive_conn_urls.lock().await.remove(&addr); + data.alive_conn_urls.remove(&addr); tracing::warn!("peer conn removed: {:?}", conn_info); } @@ -252,13 +261,18 @@ impl ManualConnectorManager { async fn collect_dead_conns(data: Arc) -> BTreeSet { Self::handle_remove_connector(data.clone()); - let curr_alive = data.alive_conn_urls.lock().await.clone(); let all_urls: BTreeSet = data .connectors .iter() .map(|x| x.key().clone().into()) .collect(); - &all_urls - &curr_alive + let mut ret = BTreeSet::new(); + for url in all_urls.iter() { + if !data.alive_conn_urls.contains(url) { + ret.insert(url.clone()); + } + } + ret } async fn conn_reconnect_with_ip_version( diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index a83c0f0..2a731ea 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -24,7 +24,7 @@ use crate::{ utils::{cost_to_str, float_to_str}, }; use humansize::format_size; -use tabled::{col, row, settings::Style}; +use tabled::settings::Style; #[derive(Parser, Debug)] #[command(name = "easytier-cli", author, version, about, long_about = None)] diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index b57876f..a842cea 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -61,6 +61,7 @@ pub struct PeerConn { tasks: JoinSet>, info: Option, + is_client: Option, close_event_sender: Option>, @@ -107,6 +108,7 @@ impl PeerConn { tasks: JoinSet::new(), info: None, + is_client: None, close_event_sender: None, ctrl_resp_sender: ctrl_sender, @@ -215,6 +217,7 @@ impl PeerConn { let rsp = self.wait_handshake_loop().await?; tracing::info!("handshake request: {:?}", rsp); self.info = Some(rsp); + self.is_client = Some(false); self.send_handshake().await?; Ok(()) } @@ -226,6 +229,7 @@ impl PeerConn { let rsp = self.wait_handshake_loop().await?; tracing::info!("handshake response: {:?}", rsp); self.info = Some(rsp); + self.is_client = Some(true); Ok(()) } @@ -359,14 +363,17 @@ impl PeerConn { } pub fn get_conn_info(&self) -> PeerConnInfo { + let info = self.info.as_ref().unwrap(); PeerConnInfo { conn_id: self.conn_id.to_string(), my_peer_id: self.my_peer_id, peer_id: self.get_peer_id(), - features: self.info.as_ref().unwrap().features.clone(), + features: info.features.clone(), tunnel: self.tunnel_info.clone(), stats: Some(self.get_stats()), loss_rate: (f64::from(self.loss_rate_stats.load(Ordering::Relaxed)) / 100.0) as f32, + is_client: self.is_client.unwrap_or_default(), + network_name: info.network_name.clone(), } } } diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 2d470e6..0ae7f97 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -690,3 +690,63 @@ pub async fn socks5_vpn_portal(#[values("10.144.144.1", "10.144.144.3")] dst_add tokio::join!(task).0.unwrap(); } + +#[rstest::rstest] +#[tokio::test] +#[serial_test::serial] +pub async fn manual_reconnector(#[values(true, false)] is_foreign: bool) { + prepare_linux_namespaces(); + + let center_node_config = get_inst_config("inst1", Some("net_a"), "10.144.144.1"); + if is_foreign { + center_node_config + .set_network_identity(NetworkIdentity::new("center".to_string(), "".to_string())); + } + let mut center_inst = Instance::new(center_node_config); + + let mut inst1 = Instance::new(get_inst_config("inst1", Some("net_b"), "10.144.145.1")); + let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_c"), "10.144.145.2")); + + center_inst.run().await.unwrap(); + inst1.run().await.unwrap(); + inst2.run().await.unwrap(); + + assert_ne!(inst1.id(), center_inst.id()); + assert_ne!(inst2.id(), center_inst.id()); + + inst1 + .get_conn_manager() + .add_connector(RingTunnelConnector::new( + format!("ring://{}", center_inst.id()).parse().unwrap(), + )); + + inst2 + .get_conn_manager() + .add_connector(RingTunnelConnector::new( + format!("ring://{}", center_inst.id()).parse().unwrap(), + )); + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + let peer_map = if !is_foreign { + inst1.get_peer_manager().get_peer_map() + } else { + inst1 + .get_peer_manager() + .get_foreign_network_client() + .get_peer_map() + }; + + let conns = peer_map + .list_peer_conns(center_inst.peer_id()) + .await + .unwrap(); + + assert_eq!(1, conns.len()); + + wait_for_condition( + || async { ping_test("net_b", "10.144.145.2", None).await }, + Duration::from_secs(5), + ) + .await; +}