From aca9a0e35bfc7f74c8e4953ce4ed1aabe374a340 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Sun, 22 Sep 2024 10:10:32 +0800 Subject: [PATCH] use ospf route to propogate foreign network info --- easytier/build.rs | 1 + easytier/src/peers/foreign_network_manager.rs | 352 ++++++++++-------- easytier/src/peers/peer_manager.rs | 38 +- easytier/src/peers/peer_ospf_route.rs | 195 +++++++++- easytier/src/peers/route_trait.rs | 13 +- easytier/src/proto/peer_rpc.proto | 20 + easytier/src/tests/three_node.rs | 5 +- 7 files changed, 449 insertions(+), 175 deletions(-) diff --git a/easytier/build.rs b/easytier/build.rs index 704f72b..3f0118f 100644 --- a/easytier/build.rs +++ b/easytier/build.rs @@ -151,6 +151,7 @@ fn main() -> Result<(), Box> { ) .type_attribute("peer_rpc.DirectConnectedPeerInfo", "#[derive(Hash)]") .type_attribute("peer_rpc.PeerInfoForGlobalMap", "#[derive(Hash)]") + .type_attribute("peer_rpc.ForeignNetworkRouteInfoKey", "#[derive(Hash, Eq)]") .type_attribute("common.RpcDescriptor", "#[derive(Hash, Eq)]") .service_generator(Box::new(rpc_build::ServiceGenerator::new())) .btree_map(&["."]) diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 7d48f09..3a114b8 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -5,7 +5,10 @@ only forward packets of peers that directly connected to this node. in future, with the help wo peer center we can forward packets of peers that connected to any node in the local network. */ -use std::sync::{Arc, Weak}; +use std::{ + sync::{Arc, Weak}, + time::SystemTime, +}; use dashmap::DashMap; use tokio::{ @@ -44,33 +47,33 @@ use super::{ }; struct ForeignNetworkEntry { + my_peer_id: PeerId, + global_ctx: ArcGlobalCtx, network: NetworkIdentity, peer_map: Arc, relay_data: bool, + route: ArcRoute, + peer_rpc: Weak, + rpc_sender: UnboundedSender, + + packet_recv: Mutex>, + + tasks: Mutex>, } impl ForeignNetworkEntry { fn new( network: NetworkIdentity, - packet_sender: PacketRecvChan, global_ctx: ArcGlobalCtx, my_peer_id: PeerId, relay_data: bool, - peer_rpc: Arc, ) -> Self { - let config = TomlConfigLoader::default(); - config.set_network_identity(network.clone()); - config.set_hostname(Some(format!("PublicServer_{}", global_ctx.get_hostname()))); - let foreign_global_ctx = Arc::new(GlobalCtx::new(config)); - foreign_global_ctx.replace_stun_info_collector(Box::new(MockStunInfoCollector { - udp_nat_type: NatType::Unknown, - })); - let mut feature_flag = global_ctx.get_feature_flags(); - feature_flag.is_public_server = true; - global_ctx.set_feature_flags(feature_flag); + let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone()); + + let (packet_sender, packet_recv) = mpsc::channel(1000); let peer_map = Arc::new(PeerMap::new( packet_sender, @@ -78,11 +81,9 @@ impl ForeignNetworkEntry { my_peer_id, )); - let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc.clone()); + let (peer_rpc, rpc_transport_sender) = Self::build_rpc_tspt(my_peer_id, peer_map.clone()); - for u in global_ctx.get_running_listeners().into_iter() { - foreign_global_ctx.add_running_listener(u); - } + let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc.clone()); peer_rpc.rpc_server().registry().register( DirectConnectorRpcServer::new(DirectConnectorManagerRpcServer::new( foreign_global_ctx.clone(), @@ -91,16 +92,101 @@ impl ForeignNetworkEntry { ); Self { + my_peer_id, + global_ctx: foreign_global_ctx, network, peer_map, relay_data, route: Arc::new(Box::new(route)), + peer_rpc: Arc::downgrade(&peer_rpc), + rpc_sender: rpc_transport_sender, + + packet_recv: Mutex::new(Some(packet_recv)), + + tasks: Mutex::new(JoinSet::new()), } } - async fn prepare(&self, my_peer_id: PeerId) { + fn build_foreign_global_ctx( + network: &NetworkIdentity, + global_ctx: ArcGlobalCtx, + ) -> ArcGlobalCtx { + let config = TomlConfigLoader::default(); + config.set_network_identity(network.clone()); + config.set_hostname(Some(format!("PublicServer_{}", global_ctx.get_hostname()))); + + let foreign_global_ctx = Arc::new(GlobalCtx::new(config)); + foreign_global_ctx.replace_stun_info_collector(Box::new(MockStunInfoCollector { + udp_nat_type: NatType::Unknown, + })); + + let mut feature_flag = global_ctx.get_feature_flags(); + feature_flag.is_public_server = true; + foreign_global_ctx.set_feature_flags(feature_flag); + + for u in global_ctx.get_running_listeners().into_iter() { + foreign_global_ctx.add_running_listener(u); + } + + foreign_global_ctx + } + + fn build_rpc_tspt( + my_peer_id: PeerId, + peer_map: Arc, + ) -> (Arc, UnboundedSender) { + struct RpcTransport { + my_peer_id: PeerId, + peer_map: Weak, + + packet_recv: Mutex>, + } + + #[async_trait::async_trait] + impl PeerRpcManagerTransport for RpcTransport { + fn my_peer_id(&self) -> PeerId { + self.my_peer_id + } + + async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { + tracing::debug!( + "foreign network manager send rpc to peer: {:?}", + dst_peer_id + ); + let peer_map = self + .peer_map + .upgrade() + .ok_or(anyhow::anyhow!("peer map is gone"))?; + + peer_map + .send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop) + .await + } + + async fn recv(&self) -> Result { + if let Some(o) = self.packet_recv.lock().await.recv().await { + tracing::info!("recv rpc packet in foreign network manager rpc transport"); + Ok(o) + } else { + Err(Error::Unknown) + } + } + } + + let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); + let tspt = RpcTransport { + my_peer_id, + peer_map: Arc::downgrade(&peer_map), + packet_recv: Mutex::new(peer_rpc_tspt_recv), + }; + + let peer_rpc = Arc::new(PeerRpcManager::new(tspt)); + (peer_rpc, rpc_transport_sender) + } + + async fn prepare_route(&self, my_peer_id: PeerId) { struct Interface { my_peer_id: PeerId, peer_map: Weak, @@ -131,6 +217,52 @@ impl ForeignNetworkEntry { self.peer_map.add_route(self.route.clone()).await; } + + async fn start_packet_recv(&self) { + let mut recv = self.packet_recv.lock().await.take().unwrap(); + let my_node_id = self.my_peer_id; + let rpc_sender = self.rpc_sender.clone(); + let peer_map = self.peer_map.clone(); + let relay_data = self.relay_data; + + self.tasks.lock().await.spawn(async move { + while let Some(packet_bytes) = recv.recv().await { + let Some(hdr) = packet_bytes.peer_manager_header() else { + tracing::warn!("invalid packet, skip"); + continue; + }; + tracing::info!(?hdr, "recv packet in foreign network manager"); + let to_peer_id = hdr.to_peer_id.get(); + if to_peer_id == my_node_id { + if hdr.packet_type == PacketType::TaRpc as u8 + || hdr.packet_type == PacketType::RpcReq as u8 + || hdr.packet_type == PacketType::RpcResp as u8 + { + rpc_sender.send(packet_bytes).unwrap(); + continue; + } + tracing::trace!(?hdr, "ignore packet in foreign network"); + } else { + if !relay_data && hdr.packet_type == PacketType::Data as u8 { + continue; + } + + let ret = peer_map + .send_msg(packet_bytes, to_peer_id, NextHopPolicy::LeastHop) + .await; + if ret.is_err() { + tracing::error!("forward packet to peer failed: {:?}", ret.err()); + } + } + } + }); + } + + async fn prepare(&self, my_peer_id: PeerId) { + self.prepare_route(my_peer_id).await; + self.start_packet_recv().await; + self.peer_rpc.upgrade().unwrap().run(); + } } impl Drop for ForeignNetworkEntry { @@ -147,27 +279,11 @@ impl Drop for ForeignNetworkEntry { struct ForeignNetworkManagerData { network_peer_maps: DashMap>, peer_network_map: DashMap, + network_peer_last_update: DashMap, lock: std::sync::Mutex<()>, } impl ForeignNetworkManagerData { - async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { - let network_name = self - .peer_network_map - .get(&dst_peer_id) - .ok_or_else(|| Error::RouteError(Some("network not found".to_string())))? - .clone(); - let entry = self - .network_peer_maps - .get(&network_name) - .ok_or_else(|| Error::RouteError(Some("no peer in network".to_string())))? - .clone(); - entry - .peer_map - .send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop) - .await - } - fn get_peer_network(&self, peer_id: PeerId) -> Option { self.peer_network_map.get(&peer_id).map(|v| v.clone()) } @@ -179,8 +295,12 @@ impl ForeignNetworkManagerData { fn remove_peer(&self, peer_id: PeerId, network_name: &String) { let _l = self.lock.lock().unwrap(); self.peer_network_map.remove(&peer_id); - self.network_peer_maps - .remove_if(network_name, |_, v| v.peer_map.is_empty()); + if let Some(_) = self + .network_peer_maps + .remove_if(network_name, |_, v| v.peer_map.is_empty()) + { + self.network_peer_last_update.remove(network_name); + } } async fn clear_no_conn_peer(&self, network_name: &String) { @@ -197,37 +317,7 @@ impl ForeignNetworkManagerData { let _l = self.lock.lock().unwrap(); self.peer_network_map.retain(|_, v| v != network_name); self.network_peer_maps.remove(network_name); - } -} - -struct RpcTransport { - my_peer_id: PeerId, - data: Arc, - - packet_recv: Mutex>, -} - -#[async_trait::async_trait] -impl PeerRpcManagerTransport for RpcTransport { - fn my_peer_id(&self) -> PeerId { - self.my_peer_id - } - - async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { - tracing::debug!( - "foreign network manager send rpc to peer: {:?}", - dst_peer_id - ); - self.data.send_msg(msg, dst_peer_id).await - } - - async fn recv(&self) -> Result { - if let Some(o) = self.packet_recv.lock().await.recv().await { - tracing::info!("recv rpc packet in foreign network manager rpc transport"); - Ok(o) - } else { - Err(Error::Unknown) - } + self.network_peer_last_update.remove(network_name); } } @@ -238,12 +328,7 @@ pub struct ForeignNetworkManager { global_ctx: ArcGlobalCtx, packet_sender_to_mgr: PacketRecvChan, - packet_sender: PacketRecvChan, - packet_recv: Mutex>, - data: Arc, - rpc_mgr: Arc, - rpc_transport_sender: UnboundedSender, tasks: Mutex>, } @@ -254,34 +339,19 @@ impl ForeignNetworkManager { global_ctx: ArcGlobalCtx, packet_sender_to_mgr: PacketRecvChan, ) -> Self { - // recv packet from all foreign networks - let (packet_sender, packet_recv) = mpsc::channel(1000); - let data = Arc::new(ForeignNetworkManagerData { network_peer_maps: DashMap::new(), peer_network_map: DashMap::new(), + network_peer_last_update: DashMap::new(), lock: std::sync::Mutex::new(()), }); - // handle rpc from foreign networks - let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); - let rpc_mgr = Arc::new(PeerRpcManager::new(RpcTransport { - my_peer_id, - data: data.clone(), - packet_recv: Mutex::new(peer_rpc_tspt_recv), - })); - Self { my_peer_id, global_ctx, packet_sender_to_mgr, - packet_sender, - packet_recv: Mutex::new(Some(packet_recv)), - data, - rpc_mgr, - rpc_transport_sender, tasks: Mutex::new(JoinSet::new()), } @@ -323,11 +393,9 @@ impl ForeignNetworkManager { new_added = true; Arc::new(ForeignNetworkEntry::new( peer_conn.get_network_identity(), - self.packet_sender.clone(), self.global_ctx.clone(), self.my_peer_id, !ret.is_err(), - self.rpc_mgr.clone(), )) }) .clone(); @@ -337,6 +405,11 @@ impl ForeignNetworkManager { peer_conn.get_network_identity().network_name.clone(), ); + self.data.network_peer_last_update.insert( + peer_conn.get_network_identity().network_name.clone(), + SystemTime::now(), + ); + entry }; @@ -363,83 +436,29 @@ impl ForeignNetworkManager { let mut s = entry.global_ctx.subscribe(); self.tasks.lock().await.spawn(async move { while let Ok(e) = s.recv().await { - if let GlobalCtxEvent::PeerRemoved(peer_id) = &e { - tracing::info!(?e, "remove peer from foreign network manager"); - data.remove_peer(*peer_id, &network_name); - } else if let GlobalCtxEvent::PeerConnRemoved(..) = &e { - tracing::info!(?e, "clear no conn peer from foreign network manager"); - data.clear_no_conn_peer(&network_name).await; + match &e { + GlobalCtxEvent::PeerRemoved(peer_id) => { + tracing::info!(?e, "remove peer from foreign network manager"); + data.remove_peer(*peer_id, &network_name); + data.network_peer_last_update + .insert(network_name.clone(), SystemTime::now()); + } + GlobalCtxEvent::PeerConnRemoved(..) => { + tracing::info!(?e, "clear no conn peer from foreign network manager"); + data.clear_no_conn_peer(&network_name).await; + } + GlobalCtxEvent::PeerAdded(_) => { + tracing::info!(?e, "add peer to foreign network manager"); + data.network_peer_last_update + .insert(network_name.clone(), SystemTime::now()); + } + _ => continue, } } // if lagged or recv done just remove the network tracing::error!("global event handler at foreign network manager exit"); data.remove_network(&network_name); }); - - self.tasks.lock().await.spawn(async move {}); - } - - async fn start_packet_recv(&self) { - let mut recv = self.packet_recv.lock().await.take().unwrap(); - let sender_to_mgr = self.packet_sender_to_mgr.clone(); - let my_node_id = self.my_peer_id; - let rpc_sender = self.rpc_transport_sender.clone(); - let data = self.data.clone(); - - self.tasks.lock().await.spawn(async move { - while let Some(packet_bytes) = recv.recv().await { - let Some(hdr) = packet_bytes.peer_manager_header() else { - tracing::warn!("invalid packet, skip"); - continue; - }; - tracing::info!(?hdr, "recv packet in foreign network manager"); - let from_peer_id = hdr.from_peer_id.get(); - let to_peer_id = hdr.to_peer_id.get(); - if to_peer_id == my_node_id { - if hdr.packet_type == PacketType::TaRpc as u8 - || hdr.packet_type == PacketType::RpcReq as u8 - || hdr.packet_type == PacketType::RpcResp as u8 - { - rpc_sender.send(packet_bytes).unwrap(); - continue; - } - if let Err(e) = sender_to_mgr.send(packet_bytes).await { - tracing::error!("send packet to mgr failed: {:?}", e); - } - } else { - let Some(from_network) = data.get_peer_network(from_peer_id) else { - continue; - }; - let Some(to_network) = data.get_peer_network(to_peer_id) else { - continue; - }; - if from_network != to_network { - continue; - } - - if let Some(entry) = data.get_network_entry(&from_network) { - if !entry.relay_data && hdr.packet_type == PacketType::Data as u8 { - continue; - } - - let ret = entry - .peer_map - .send_msg(packet_bytes, to_peer_id, NextHopPolicy::LeastHop) - .await; - if ret.is_err() { - tracing::error!("forward packet to peer failed: {:?}", ret.err()); - } - } else { - tracing::error!("foreign network not found: {}", from_network); - } - } - } - }); - } - - pub async fn run(&self) { - self.start_packet_recv().await; - self.rpc_mgr.run(); } pub async fn list_foreign_networks(&self) -> ListForeignNetworkResponse { @@ -473,6 +492,13 @@ impl ForeignNetworkManager { } ret } + + pub fn get_foreign_network_last_update(&self, network_name: &str) -> Option { + self.data + .network_peer_last_update + .get(network_name) + .map(|v| v.clone()) + } } impl Drop for ForeignNetworkManager { @@ -496,7 +522,7 @@ mod tests { tests::{connect_peer_manager, wait_route_appear}, }, proto::common::NatType, - tunnel::common::tests::wait_for_condition, + tunnel::common::tests::{enable_log, wait_for_condition}, }; use super::*; diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 9bc7579..a9ef4e7 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -2,11 +2,13 @@ use std::{ fmt::Debug, net::Ipv4Addr, sync::{Arc, Weak}, + time::{Instant, SystemTime}, }; use anyhow::Context; use async_trait::async_trait; +use dashmap::DashMap; use futures::StreamExt; use tokio::{ @@ -26,10 +28,13 @@ use crate::{ peers::{ peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, - route_trait::{NextHopPolicy, RouteInterface}, + route_trait::{ForeignNetworkRouteInfoMap, NextHopPolicy, RouteInterface}, PeerPacketFilter, }, - proto::cli, + proto::{ + cli, + peer_rpc::{ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey}, + }, tunnel::{ self, packet_def::{PacketType, ZCPacket}, @@ -463,6 +468,7 @@ impl PeerManager { my_peer_id: PeerId, peers: Weak, foreign_network_client: Weak, + foreign_network_manager: Weak, } #[async_trait] @@ -484,6 +490,32 @@ impl PeerManager { fn my_peer_id(&self) -> PeerId { self.my_peer_id } + + async fn list_foreign_networks(&self) -> ForeignNetworkRouteInfoMap { + let ret = DashMap::new(); + let Some(foreign_mgr) = self.foreign_network_manager.upgrade() else { + return ret; + }; + + let networks = foreign_mgr.list_foreign_networks().await; + for (network_name, info) in networks.foreign_networks.iter() { + let last_update = foreign_mgr + .get_foreign_network_last_update(network_name) + .unwrap_or(SystemTime::now()); + ret.insert( + ForeignNetworkRouteInfoKey { + network_name: network_name.clone(), + peer_id: self.my_peer_id, + }, + ForeignNetworkRouteInfoEntry { + foreign_peer_ids: info.peers.iter().map(|x| x.peer_id).collect(), + last_update: Some(last_update.into()), + version: 0, + }, + ); + } + ret + } } let my_peer_id = self.my_peer_id; @@ -492,6 +524,7 @@ impl PeerManager { my_peer_id, peers: Arc::downgrade(&self.peers), foreign_network_client: Arc::downgrade(&self.foreign_network_client), + foreign_network_manager: Arc::downgrade(&self.foreign_network_manager), })) .await .unwrap(); @@ -672,7 +705,6 @@ impl PeerManager { .await .replace(Arc::downgrade(&self.foreign_network_client)); - self.foreign_network_manager.run().await; self.foreign_network_client.run().await; } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 2cab1ad..4d0abff 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -9,6 +9,7 @@ use std::{ time::{Duration, SystemTime}, }; +use crossbeam::atomic::AtomicCell; use dashmap::DashMap; use petgraph::{ algo::{all_simple_paths, astar, dijkstra}, @@ -30,9 +31,10 @@ use crate::{ proto::{ common::{NatType, StunInfo}, peer_rpc::{ + route_foreign_network_infos, ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerIdVersion, - RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError, SyncRouteInfoRequest, - SyncRouteInfoResponse, + RouteForeignNetworkInfos, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError, + SyncRouteInfoRequest, SyncRouteInfoResponse, }, rpc_types::{ self, @@ -44,7 +46,7 @@ use crate::{ use super::{ peer_rpc::PeerRpcManager, route_trait::{ - DefaultRouteCostCalculator, NextHopPolicy, RouteCostCalculator, + DefaultRouteCostCalculator, ForeignNetworkRouteInfoMap, NextHopPolicy, RouteCostCalculator, RouteCostCalculatorInterface, }, PeerPacketFilter, @@ -89,6 +91,16 @@ impl From for AtomicVersion { } } +fn is_foreign_network_info_newer( + next: &ForeignNetworkRouteInfoEntry, + prev: &ForeignNetworkRouteInfoEntry, +) -> Option { + Some( + SystemTime::try_from(next.last_update?).ok()? + > SystemTime::try_from(prev.last_update?).ok()?, + ) +} + impl RoutePeerInfo { pub fn new() -> Self { Self { @@ -243,6 +255,7 @@ type Error = SyncRouteInfoError; struct SyncedRouteInfo { peer_infos: DashMap, conn_map: DashMap, AtomicVersion)>, + foreign_network: DashMap, } impl SyncedRouteInfo { @@ -256,6 +269,7 @@ impl SyncedRouteInfo { tracing::warn!(?peer_id, "remove_peer from synced_route_info"); self.peer_infos.remove(&peer_id); self.conn_map.remove(&peer_id); + self.foreign_network.retain(|k, _| k.peer_id != peer_id); } fn fill_empty_peer_info(&self, peer_ids: &BTreeSet) { @@ -353,6 +367,28 @@ impl SyncedRouteInfo { } } + fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) { + for item in foreign_network.infos.iter().map(Clone::clone) { + let Some(key) = item.key else { + continue; + }; + let Some(mut entry) = item.value else { + continue; + }; + + entry.last_update = Some(SystemTime::now().into()); + + self.foreign_network + .entry(key.clone()) + .and_modify(|old_entry| { + if entry.version > old_entry.version { + *old_entry = entry.clone(); + } + }) + .or_insert_with(|| entry.clone()); + } + } + fn update_my_peer_info(&self, my_peer_id: PeerId, global_ctx: &ArcGlobalCtx) -> bool { let mut old = self .peer_infos @@ -383,6 +419,51 @@ impl SyncedRouteInfo { } } + fn update_my_foreign_network( + &self, + my_peer_id: PeerId, + foreign_networks: ForeignNetworkRouteInfoMap, + ) -> bool { + let mut updated = false; + for mut item in self + .foreign_network + .iter_mut() + .filter(|x| x.key().peer_id == my_peer_id) + { + let (key, entry) = item.pair_mut(); + if let Some(mut new_entry) = foreign_networks.get_mut(key) { + if let Some(is_newer) = is_foreign_network_info_newer(&new_entry, entry) { + if is_newer { + new_entry.version = entry.version + 1; + *entry = new_entry.clone(); + updated = true; + } + } + drop(new_entry); + foreign_networks.remove(key).unwrap(); + } else if !item.foreign_peer_ids.is_empty() { + item.foreign_peer_ids.clear(); + item.last_update = Some(SystemTime::now().into()); + item.version += 1; + updated = true; + } + } + + for item in foreign_networks.iter() { + self.foreign_network + .entry(item.key().clone()) + .and_modify(|v| panic!("key should not exist, {:?}", v)) + .or_insert_with(|| { + let mut v = item.value().clone(); + v.version = 1; + v + }); + updated = true; + } + + updated + } + fn is_peer_bidirectly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool { self.conn_map .get(&src_peer_id) @@ -670,6 +751,7 @@ struct SyncRouteSession { dst_peer_id: PeerId, dst_saved_peer_info_versions: DashMap, dst_saved_conn_bitmap_version: DashMap, + dst_saved_foreign_network_versions: DashMap, my_session_id: AtomicSessionId, dst_session_id: AtomicSessionId, @@ -692,6 +774,7 @@ impl SyncRouteSession { dst_peer_id, dst_saved_peer_info_versions: DashMap::new(), dst_saved_conn_bitmap_version: DashMap::new(), + dst_saved_foreign_network_versions: DashMap::new(), my_session_id: AtomicSessionId::new(rand::random()), dst_session_id: AtomicSessionId::new(0), @@ -737,6 +820,15 @@ impl SyncRouteSession { } } + fn update_dst_saved_foreign_network_version(&self, foreign_network: &RouteForeignNetworkInfos) { + for item in foreign_network.infos.iter() { + self.dst_saved_foreign_network_versions + .entry(item.key.clone().unwrap()) + .or_insert_with(|| AtomicVersion::new()) + .set_if_larger(item.value.as_ref().unwrap().version); + } + } + fn update_initiator_flag(&self, is_initiator: bool) { self.we_are_initiator.store(is_initiator, Ordering::Relaxed); self.need_sync_initiator_info.store(true, Ordering::Relaxed); @@ -778,6 +870,8 @@ struct PeerRouteServiceImpl { route_table_with_cost: RouteTable, synced_route_info: Arc, cached_local_conn_map: std::sync::Mutex, + + last_update_my_foreign_network: AtomicCell>, } impl Debug for PeerRouteServiceImpl { @@ -815,8 +909,11 @@ impl PeerRouteServiceImpl { synced_route_info: Arc::new(SyncedRouteInfo { peer_infos: DashMap::new(), conn_map: DashMap::new(), + foreign_network: DashMap::new(), }), cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()), + + last_update_my_foreign_network: AtomicCell::new(None), } } @@ -876,6 +973,31 @@ impl PeerRouteServiceImpl { updated } + async fn update_my_foreign_network(&self) -> bool { + let last_time = self.last_update_my_foreign_network.load(); + if last_time.is_some() && last_time.unwrap().elapsed().as_secs() < 10 { + return false; + } + + self.last_update_my_foreign_network + .store(Some(std::time::Instant::now())); + + let foreign_networks = self + .interface + .lock() + .await + .as_ref() + .unwrap() + .list_foreign_networks() + .await; + + let updated = self + .synced_route_info + .update_my_foreign_network(self.my_peer_id, foreign_networks); + + updated + } + fn update_route_table(&self) { let mut calc_locked = self.cost_calculator.lock().unwrap(); @@ -991,20 +1113,56 @@ impl PeerRouteServiceImpl { Some(self.cached_local_conn_map.lock().unwrap().clone()) } + fn build_foreign_network_info( + &self, + session: &SyncRouteSession, + ) -> Option { + let mut foreign_networks = RouteForeignNetworkInfos::default(); + for item in self.synced_route_info.foreign_network.iter() { + if session + .dst_saved_foreign_network_versions + .get(&item.key()) + .map(|x| x.get() >= item.value().version) + .unwrap_or(false) + { + continue; + } + + foreign_networks + .infos + .push(route_foreign_network_infos::Info { + key: Some(item.key().clone()), + value: Some(item.value().clone()), + }); + } + + if foreign_networks.infos.is_empty() { + None + } else { + Some(foreign_networks) + } + } + async fn update_my_infos(&self) -> bool { let mut ret = self.update_my_peer_info(); ret |= self.update_my_conn_info().await; + ret |= self.update_my_foreign_network().await; ret } fn build_sync_request( &self, session: &SyncRouteSession, - ) -> (Option>, Option) { + ) -> ( + Option>, + Option, + Option, + ) { let route_infos = self.build_route_info(&session); let conn_bitmap = self.build_conn_bitmap(&session); + let foreign_network = self.build_foreign_network_info(&session); - (route_infos, conn_bitmap) + (route_infos, conn_bitmap, foreign_network) } fn clear_expired_peer(&self) { @@ -1022,6 +1180,28 @@ impl PeerRouteServiceImpl { for p in to_remove.iter() { self.synced_route_info.remove_peer(*p); } + + // clear expired foreign network info + let mut to_remove = Vec::new(); + for item in self.synced_route_info.foreign_network.iter() { + let Some(since_last_update) = item + .value() + .last_update + .and_then(|x| SystemTime::try_from(x).ok()) + .and_then(|x| now.duration_since(x).ok()) + else { + to_remove.push(item.key().clone()); + continue; + }; + + if since_last_update > REMOVE_DEAD_PEER_INFO_AFTER { + to_remove.push(item.key().clone()); + } + } + + for p in to_remove.iter() { + self.synced_route_info.foreign_network.remove(p); + } } async fn sync_route_with_peer( @@ -1036,8 +1216,8 @@ impl PeerRouteServiceImpl { let my_peer_id = self.my_peer_id; - let (peer_infos, conn_bitmap) = self.build_sync_request(&session); - tracing::info!("building sync_route request. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}", + let (peer_infos, conn_bitmap, foreign_network) = self.build_sync_request(&session); + tracing::info!(?foreign_network, "building sync_route request. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}", my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session); if peer_infos.is_none() @@ -1070,6 +1250,7 @@ impl PeerRouteServiceImpl { is_initiator: session.we_are_initiator.load(Ordering::Relaxed), peer_infos: peer_infos.clone().map(|x| RoutePeerInfos { items: x }), conn_bitmap: conn_bitmap.clone().map(Into::into), + foreign_network_infos: foreign_network, }, ) .await; diff --git a/easytier/src/peers/route_trait.rs b/easytier/src/peers/route_trait.rs index 68fd3cd..ed0ab5c 100644 --- a/easytier/src/peers/route_trait.rs +++ b/easytier/src/peers/route_trait.rs @@ -1,6 +1,11 @@ use std::{net::Ipv4Addr, sync::Arc}; -use crate::common::PeerId; +use dashmap::DashMap; + +use crate::{ + common::PeerId, + proto::peer_rpc::{ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey}, +}; #[derive(Clone, Debug)] pub enum NextHopPolicy { @@ -14,10 +19,16 @@ impl Default for NextHopPolicy { } } +pub type ForeignNetworkRouteInfoMap = + DashMap; + #[async_trait::async_trait] pub trait RouteInterface { async fn list_peers(&self) -> Vec; fn my_peer_id(&self) -> PeerId; + async fn list_foreign_networks(&self) -> ForeignNetworkRouteInfoMap { + DashMap::new() + } } pub type RouteInterfaceBox = Box; diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 8ba7d0c..4bb5101 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -33,12 +33,32 @@ message RouteConnBitmap { message RoutePeerInfos { repeated RoutePeerInfo items = 1; } +message ForeignNetworkRouteInfoKey { + uint32 peer_id = 1; + string network_name = 2; +} + +message ForeignNetworkRouteInfoEntry { + repeated uint32 foreign_peer_ids = 1; + google.protobuf.Timestamp last_update = 2; + uint32 version = 3; +} + +message RouteForeignNetworkInfos { + message Info { + ForeignNetworkRouteInfoKey key = 1; + ForeignNetworkRouteInfoEntry value = 2; + } + repeated Info infos = 1; +} + message SyncRouteInfoRequest { uint32 my_peer_id = 1; uint64 my_session_id = 2; bool is_initiator = 3; RoutePeerInfos peer_infos = 4; RouteConnBitmap conn_bitmap = 5; + RouteForeignNetworkInfos foreign_network_infos = 6; } enum SyncRouteInfoError { diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 1cf16e7..ce2718e 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -716,7 +716,10 @@ pub async fn manual_reconnector(#[values(true, false)] is_foreign: bool) { } let mut center_inst = Instance::new(center_node_config); - let mut inst1 = Instance::new(get_inst_config("inst1", Some("net_b"), "10.144.145.1")); + let inst1_config = get_inst_config("inst1", Some("net_b"), "10.144.145.1"); + inst1_config.set_listeners(vec![]); + let mut inst1 = Instance::new(inst1_config); + let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_c"), "10.144.145.2")); center_inst.run().await.unwrap();