From ff5ee8a05e857386b33124d5d366efa9fcc21e1e Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Fri, 27 Sep 2024 21:42:11 +0800 Subject: [PATCH] support forward foreign network packet between peers --- easytier/src/common/config.rs | 2 +- easytier/src/common/constants.rs | 2 + easytier/src/peers/foreign_network_manager.rs | 437 +++++++++++++++--- easytier/src/peers/peer_manager.rs | 112 ++++- easytier/src/peers/peer_map.rs | 29 +- easytier/src/peers/peer_ospf_route.rs | 191 ++++++-- easytier/src/peers/peer_rpc.rs | 21 +- easytier/src/peers/route_trait.rs | 9 +- easytier/src/proto/cli.proto | 5 +- easytier/src/proto/peer_rpc.proto | 1 + easytier/src/tunnel/packet_def.rs | 95 ++++ 11 files changed, 776 insertions(+), 128 deletions(-) diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 40266fb..78b3416 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -72,7 +72,7 @@ pub trait ConfigLoader: Send + Sync { pub type NetworkSecretDigest = [u8; 32]; -#[derive(Debug, Clone, Deserialize, Serialize, Default)] +#[derive(Debug, Clone, Deserialize, Serialize, Default, Eq, Hash)] pub struct NetworkIdentity { pub network_name: String, pub network_secret: Option, diff --git a/easytier/src/common/constants.rs b/easytier/src/common/constants.rs index 84b4173..3b86dba 100644 --- a/easytier/src/common/constants.rs +++ b/easytier/src/common/constants.rs @@ -21,6 +21,8 @@ macro_rules! set_global_var { define_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, u64, 1000); +define_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, u64, 10); + pub const UDP_HOLE_PUNCH_CONNECTOR_SERVICE_ID: u32 = 2; pub const EASYTIER_VERSION: &str = git_version::git_version!( diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 67167ad..1ff813d 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -42,10 +42,16 @@ use super::{ peer_ospf_route::PeerRoute, peer_rpc::{PeerRpcManager, PeerRpcManagerTransport}, peer_rpc_service::DirectConnectorManagerRpcServer, - route_trait::{ArcRoute, NextHopPolicy}, + route_trait::NextHopPolicy, PacketRecvChan, PacketRecvChanReceiver, }; +#[async_trait::async_trait] +#[auto_impl::auto_impl(&, Box, Arc)] +pub trait GlobalForeignNetworkAccessor: Send + Sync + 'static { + async fn list_global_foreign_peer(&self, network_identity: &NetworkIdentity) -> Vec; +} + struct ForeignNetworkEntry { my_peer_id: PeerId, @@ -53,10 +59,9 @@ struct ForeignNetworkEntry { network: NetworkIdentity, peer_map: Arc, relay_data: bool, + pm_packet_sender: Mutex>, - route: ArcRoute, - - peer_rpc: Weak, + peer_rpc: Arc, rpc_sender: UnboundedSender, packet_recv: Mutex>, @@ -70,10 +75,11 @@ impl ForeignNetworkEntry { global_ctx: ArcGlobalCtx, my_peer_id: PeerId, relay_data: bool, + pm_packet_sender: PacketRecvChan, ) -> Self { let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone()); - let (packet_sender, packet_recv) = mpsc::channel(1000); + let (packet_sender, packet_recv) = mpsc::channel(64); let peer_map = Arc::new(PeerMap::new( packet_sender, @@ -83,7 +89,6 @@ impl ForeignNetworkEntry { let (peer_rpc, rpc_transport_sender) = Self::build_rpc_tspt(my_peer_id, peer_map.clone()); - let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc.clone()); peer_rpc.rpc_server().registry().register( DirectConnectorRpcServer::new(DirectConnectorManagerRpcServer::new( foreign_global_ctx.clone(), @@ -98,9 +103,9 @@ impl ForeignNetworkEntry { network, peer_map, relay_data, - route: Arc::new(Box::new(route)), + pm_packet_sender: Mutex::new(Some(pm_packet_sender)), - peer_rpc: Arc::downgrade(&peer_rpc), + peer_rpc, rpc_sender: rpc_transport_sender, packet_recv: Mutex::new(Some(packet_recv)), @@ -160,9 +165,8 @@ impl ForeignNetworkEntry { .upgrade() .ok_or(anyhow::anyhow!("peer map is gone"))?; - peer_map - .send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop) - .await + // send to ourselves so we can handle it in forward logic. + peer_map.send_msg_directly(msg, self.my_peer_id).await } async fn recv(&self) -> Result { @@ -186,10 +190,16 @@ impl ForeignNetworkEntry { (peer_rpc, rpc_transport_sender) } - async fn prepare_route(&self, my_peer_id: PeerId) { + async fn prepare_route( + &self, + my_peer_id: PeerId, + accessor: Box, + ) { struct Interface { my_peer_id: PeerId, peer_map: Weak, + network_identity: NetworkIdentity, + accessor: Box, } #[async_trait::async_trait] @@ -199,7 +209,17 @@ impl ForeignNetworkEntry { return vec![]; }; - peer_map.list_peers_with_conn().await + let mut global = self + .accessor + .list_global_foreign_peer(&self.network_identity) + .await; + let local = peer_map.list_peers_with_conn().await; + tracing::debug!(?global, ?local, ?self.my_peer_id, "list peers in foreign network manager"); + global.extend(local.iter().cloned()); + global + .into_iter() + .filter(|x| *x != self.my_peer_id) + .collect() } fn my_peer_id(&self) -> PeerId { @@ -207,15 +227,18 @@ impl ForeignNetworkEntry { } } - self.route + let route = PeerRoute::new(my_peer_id, self.global_ctx.clone(), self.peer_rpc.clone()); + route .open(Box::new(Interface { my_peer_id, + network_identity: self.network.clone(), peer_map: Arc::downgrade(&self.peer_map), + accessor, })) .await .unwrap(); - self.peer_map.add_route(self.route.clone()).await; + self.peer_map.add_route(Arc::new(Box::new(route))).await; } async fn start_packet_recv(&self) { @@ -224,10 +247,12 @@ impl ForeignNetworkEntry { let rpc_sender = self.rpc_sender.clone(); let peer_map = self.peer_map.clone(); let relay_data = self.relay_data; + let pm_sender = self.pm_packet_sender.lock().await.take().unwrap(); + let network_name = self.network.network_name.clone(); self.tasks.lock().await.spawn(async move { - while let Some(packet_bytes) = recv.recv().await { - let Some(hdr) = packet_bytes.peer_manager_header() else { + while let Some(zc_packet) = recv.recv().await { + let Some(hdr) = zc_packet.peer_manager_header() else { tracing::warn!("invalid packet, skip"); continue; }; @@ -238,7 +263,7 @@ impl ForeignNetworkEntry { || hdr.packet_type == PacketType::RpcReq as u8 || hdr.packet_type == PacketType::RpcResp as u8 { - rpc_sender.send(packet_bytes).unwrap(); + rpc_sender.send(zc_packet).unwrap(); continue; } tracing::trace!(?hdr, "ignore packet in foreign network"); @@ -247,32 +272,55 @@ impl ForeignNetworkEntry { continue; } - let ret = peer_map - .send_msg(packet_bytes, to_peer_id, NextHopPolicy::LeastHop) + let gateway_peer_id = peer_map + .get_gateway_peer_id(to_peer_id, NextHopPolicy::LeastHop) .await; - if ret.is_err() { - tracing::error!("forward packet to peer failed: {:?}", ret.err()); + + if gateway_peer_id.is_some() && peer_map.has_peer(gateway_peer_id.unwrap()) { + if let Err(e) = peer_map + .send_msg_directly(zc_packet, gateway_peer_id.unwrap()) + .await + { + tracing::error!( + ?e, + "send packet to foreign peer inside peer map failed" + ); + } + } else { + let mut foreign_packet = ZCPacket::new_for_foreign_network( + &network_name, + to_peer_id, + &zc_packet, + ); + foreign_packet.fill_peer_manager_hdr( + my_node_id, + gateway_peer_id.unwrap_or(to_peer_id), + PacketType::ForeignNetworkPacket as u8, + ); + if let Err(e) = pm_sender.send(foreign_packet).await { + tracing::error!("send packet to peer with pm failed: {:?}", e); + } } } } }); } - async fn prepare(&self, my_peer_id: PeerId) { - self.prepare_route(my_peer_id).await; + async fn prepare(&self, my_peer_id: PeerId, accessor: Box) { + self.prepare_route(my_peer_id, accessor).await; self.start_packet_recv().await; - self.peer_rpc.upgrade().unwrap().run(); + self.peer_rpc.run(); } } impl Drop for ForeignNetworkEntry { fn drop(&mut self) { - if let Some(peer_rpc) = self.peer_rpc.upgrade() { - peer_rpc - .rpc_server() - .registry() - .unregister_by_domain(&self.network.network_name); - } + self.peer_rpc + .rpc_server() + .registry() + .unregister_by_domain(&self.network.network_name); + + tracing::debug!(self.my_peer_id, ?self.network, "drop foreign network entry"); } } @@ -280,6 +328,7 @@ struct ForeignNetworkManagerData { network_peer_maps: DashMap>, peer_network_map: DashMap, network_peer_last_update: DashMap, + accessor: Arc>, lock: std::sync::Mutex<()>, } @@ -319,6 +368,50 @@ impl ForeignNetworkManagerData { self.network_peer_maps.remove(network_name); self.network_peer_last_update.remove(network_name); } + + async fn get_or_insert_entry( + &self, + network_identity: &NetworkIdentity, + my_peer_id: PeerId, + dst_peer_id: PeerId, + relay_data: bool, + global_ctx: &ArcGlobalCtx, + pm_packet_sender: &PacketRecvChan, + ) -> (Arc, bool) { + let mut new_added = false; + + let l = self.lock.lock().unwrap(); + let entry = self + .network_peer_maps + .entry(network_identity.network_name.clone()) + .or_insert_with(|| { + new_added = true; + Arc::new(ForeignNetworkEntry::new( + network_identity.clone(), + global_ctx.clone(), + my_peer_id, + relay_data, + pm_packet_sender.clone(), + )) + }) + .clone(); + + self.peer_network_map + .insert(dst_peer_id, network_identity.network_name.clone()); + + self.network_peer_last_update + .insert(network_identity.network_name.clone(), SystemTime::now()); + + drop(l); + + if new_added { + entry + .prepare(my_peer_id, Box::new(self.accessor.clone())) + .await; + } + + (entry, new_added) + } } pub const FOREIGN_NETWORK_SERVICE_ID: u32 = 1; @@ -338,11 +431,13 @@ impl ForeignNetworkManager { my_peer_id: PeerId, global_ctx: ArcGlobalCtx, packet_sender_to_mgr: PacketRecvChan, + accessor: Box, ) -> Self { let data = Arc::new(ForeignNetworkManagerData { network_peer_maps: DashMap::new(), peer_network_map: DashMap::new(), network_peer_last_update: DashMap::new(), + accessor: Arc::new(accessor), lock: std::sync::Mutex::new(()), }); @@ -381,44 +476,23 @@ impl ForeignNetworkManager { return ret; } - let mut new_added = false; - - let entry = { - let _l = self.data.lock.lock().unwrap(); - let entry = self - .data - .network_peer_maps - .entry(peer_conn.get_network_identity().network_name.clone()) - .or_insert_with(|| { - new_added = true; - Arc::new(ForeignNetworkEntry::new( - peer_conn.get_network_identity(), - self.global_ctx.clone(), - self.my_peer_id, - !ret.is_err(), - )) - }) - .clone(); - - self.data.peer_network_map.insert( + let (entry, new_added) = self + .data + .get_or_insert_entry( + &peer_conn.get_network_identity(), + self.my_peer_id, peer_conn.get_peer_id(), - peer_conn.get_network_identity().network_name.clone(), - ); - - self.data.network_peer_last_update.insert( - peer_conn.get_network_identity().network_name.clone(), - SystemTime::now(), - ); - - entry - }; - - if new_added { - entry.prepare(self.my_peer_id).await; - self.start_event_handler(&entry).await; - } + !ret.is_err(), + &self.global_ctx, + &self.packet_sender_to_mgr, + ) + .await; if entry.network != peer_conn.get_network_identity() { + if new_added { + self.data + .remove_network(&entry.network.network_name.clone()); + } return Err(anyhow::anyhow!( "network secret not match. exp: {:?} real: {:?}", entry.network, @@ -427,6 +501,10 @@ impl ForeignNetworkManager { .into()); } + if new_added { + self.start_event_handler(&entry).await; + } + Ok(entry.peer_map.add_new_peer_conn(peer_conn).await) } @@ -480,7 +558,14 @@ impl ForeignNetworkManager { continue; }; - let mut entry = ForeignNetworkEntryPb::default(); + let mut entry = ForeignNetworkEntryPb { + network_secret_digest: item + .network + .network_secret_digest + .unwrap_or_default() + .to_vec(), + ..Default::default() + }; for peer in item.peer_map.list_peers().await { let mut peer_info = PeerInfo::default(); peer_info.peer_id = peer; @@ -499,6 +584,22 @@ impl ForeignNetworkManager { .get(network_name) .map(|v| v.clone()) } + + pub async fn send_msg_to_peer( + &self, + network_name: &str, + dst_peer_id: PeerId, + msg: ZCPacket, + ) -> Result<(), Error> { + if let Some(entry) = self.data.get_network_entry(network_name) { + entry + .peer_map + .send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop) + .await + } else { + Err(Error::RouteError(Some("network not found".to_string()))) + } + } } impl Drop for ForeignNetworkManager { @@ -522,18 +623,22 @@ mod tests { tests::{connect_peer_manager, wait_route_appear}, }, proto::common::NatType, + set_global_var, tunnel::common::tests::wait_for_condition, }; use super::*; - async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc { + async fn create_mock_peer_manager_for_foreign_network_ext( + network: &str, + secret: &str, + ) -> Arc { let (s, _r) = tokio::sync::mpsc::channel(1000); let peer_mgr = Arc::new(PeerManager::new( RouteAlgoType::Ospf, get_mock_global_ctx_with_network(Some(NetworkIdentity::new( network.to_string(), - network.to_string(), + secret.to_string(), ))), s, )); @@ -542,6 +647,10 @@ mod tests { peer_mgr } + async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc { + create_mock_peer_manager_for_foreign_network_ext(network, network).await + } + #[tokio::test] async fn foreign_network_basic() { let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; @@ -780,4 +889,194 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_foreign_network_manager_cluster() { + set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1); + + let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pm_center3 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await; + connect_peer_manager(pm_center2.clone(), pm_center3.clone()).await; + + tracing::debug!( + "pm_center: {:?}, pm_center2: {:?}", + pm_center1.my_peer_id(), + pm_center2.my_peer_id() + ); + + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await; + connect_peer_manager(pmb_net1.clone(), pm_center2.clone()).await; + + tracing::debug!( + "pma_net1: {:?}, pmb_net1: {:?}", + pma_net1.my_peer_id(), + pmb_net1.my_peer_id() + ); + + wait_route_appear(pma_net1.clone(), pmb_net1.clone()) + .await + .unwrap(); + + assert_eq!(3, pma_net1.list_routes().await.len(),); + + let pmc_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + connect_peer_manager(pmc_net1.clone(), pm_center3.clone()).await; + wait_route_appear(pma_net1.clone(), pmc_net1.clone()) + .await + .unwrap(); + assert_eq!(5, pma_net1.list_routes().await.len(),); + + println!( + "pm_center1: {:?}, pm_center2: {:?}, pm_center3: {:?}", + pm_center1.my_peer_id(), + pm_center2.my_peer_id(), + pm_center3.my_peer_id() + ); + println!( + "pma_net1: {:?}, pmb_net1: {:?}, pmc_net1: {:?}", + pma_net1.my_peer_id(), + pmb_net1.my_peer_id(), + pmc_net1.my_peer_id() + ); + + println!("drop pmc_net1, id: {:?}", pmc_net1.my_peer_id()); + + // foreign network node disconnect + drop(pmc_net1); + wait_for_condition( + || async { pma_net1.list_routes().await.len() == 3 }, + Duration::from_secs(15), + ) + .await; + + println!("drop pm_center1, id: {:?}", pm_center1.my_peer_id()); + drop(pm_center1); + wait_for_condition( + || async { pma_net1.list_routes().await.len() == 0 }, + Duration::from_secs(5), + ) + .await; + wait_for_condition( + || async { + let n = pmb_net1 + .get_route() + .get_next_hop(pma_net1.my_peer_id()) + .await; + n.is_none() + }, + Duration::from_secs(5), + ) + .await; + wait_for_condition( + || async { + // only remain pmb center + pmb_net1.list_routes().await.len() == 1 + }, + Duration::from_secs(15), + ) + .await; + } + + #[tokio::test] + async fn test_foreign_network_manager_cluster_multi_net() { + set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1); + + let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pm_center3 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await; + connect_peer_manager(pm_center2.clone(), pm_center3.clone()).await; + + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await; + connect_peer_manager(pmb_net1.clone(), pm_center2.clone()).await; + + let pma_net2 = create_mock_peer_manager_for_foreign_network("net2").await; + let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await; + connect_peer_manager(pma_net2.clone(), pm_center2.clone()).await; + connect_peer_manager(pmb_net2.clone(), pm_center3.clone()).await; + + let pma_net3 = create_mock_peer_manager_for_foreign_network("net3").await; + let pmb_net3 = create_mock_peer_manager_for_foreign_network("net3").await; + connect_peer_manager(pma_net3.clone(), pm_center1.clone()).await; + connect_peer_manager(pmb_net3.clone(), pm_center3.clone()).await; + + let pma_net4 = create_mock_peer_manager_for_foreign_network("net4").await; + let pmb_net4 = create_mock_peer_manager_for_foreign_network("net4").await; + let pmc_net4 = create_mock_peer_manager_for_foreign_network("net4").await; + connect_peer_manager(pma_net4.clone(), pm_center1.clone()).await; + connect_peer_manager(pmb_net4.clone(), pm_center2.clone()).await; + connect_peer_manager(pmc_net4.clone(), pm_center3.clone()).await; + + tokio::time::sleep(Duration::from_secs(5)).await; + + wait_route_appear(pma_net1.clone(), pmb_net1.clone()) + .await + .unwrap(); + wait_route_appear(pma_net2.clone(), pmb_net2.clone()) + .await + .unwrap(); + wait_route_appear(pma_net3.clone(), pmb_net3.clone()) + .await + .unwrap(); + wait_route_appear(pma_net4.clone(), pmb_net4.clone()) + .await + .unwrap(); + wait_route_appear(pma_net4.clone(), pmc_net4.clone()) + .await + .unwrap(); + wait_route_appear(pmb_net4.clone(), pmc_net4.clone()) + .await + .unwrap(); + + assert_eq!(3, pma_net1.list_routes().await.len()); + assert_eq!(3, pmb_net1.list_routes().await.len()); + + assert_eq!(3, pma_net2.list_routes().await.len()); + assert_eq!(3, pmb_net2.list_routes().await.len()); + + assert_eq!(3, pma_net3.list_routes().await.len()); + assert_eq!(3, pmb_net3.list_routes().await.len()); + + assert_eq!(5, pma_net4.list_routes().await.len()); + assert_eq!(5, pmb_net4.list_routes().await.len()); + assert_eq!(5, pmc_net4.list_routes().await.len()); + + drop(pm_center3); + tokio::time::sleep(Duration::from_secs(5)).await; + assert_eq!(1, pma_net2.list_routes().await.len()); + assert_eq!(1, pma_net3.list_routes().await.len()); + assert_eq!(3, pma_net4.list_routes().await.len()); + } + + #[tokio::test] + async fn test_foreign_network_manager_cluster_secret_mismatch() { + set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1); + + let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pm_center3 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await; + connect_peer_manager(pm_center2.clone(), pm_center3.clone()).await; + + let pma_net4 = create_mock_peer_manager_for_foreign_network_ext("net4", "1").await; + let pmb_net4 = create_mock_peer_manager_for_foreign_network_ext("net4", "2").await; + let pmc_net4 = create_mock_peer_manager_for_foreign_network_ext("net4", "3").await; + connect_peer_manager(pma_net4.clone(), pm_center1.clone()).await; + connect_peer_manager(pmb_net4.clone(), pm_center2.clone()).await; + connect_peer_manager(pmc_net4.clone(), pm_center3.clone()).await; + + tokio::time::sleep(Duration::from_secs(5)).await; + assert_eq!(1, pma_net4.list_routes().await.len()); + assert_eq!(1, pmb_net4.list_routes().await.len()); + assert_eq!(1, pmc_net4.list_routes().await.len()); + } } diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 4a788aa..6a4534d 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -9,7 +9,6 @@ use anyhow::Context; use async_trait::async_trait; use dashmap::DashMap; -use futures::StreamExt; use tokio::{ sync::{ @@ -18,12 +17,14 @@ use tokio::{ }, task::JoinSet, }; -use tokio_stream::wrappers::ReceiverStream; use crate::{ common::{ - constants::EASYTIER_VERSION, error::Error, global_ctx::ArcGlobalCtx, - stun::StunInfoCollectorTrait, PeerId, + constants::EASYTIER_VERSION, + error::Error, + global_ctx::{ArcGlobalCtx, NetworkIdentity}, + stun::StunInfoCollectorTrait, + PeerId, }, peers::{ peer_conn::PeerConn, @@ -48,7 +49,7 @@ use crate::{ use super::{ encrypt::{Encryptor, NullCipher}, foreign_network_client::ForeignNetworkClient, - foreign_network_manager::ForeignNetworkManager, + foreign_network_manager::{ForeignNetworkManager, GlobalForeignNetworkAccessor}, peer_conn::PeerConnId, peer_map::PeerMap, peer_ospf_route::PeerRoute, @@ -236,6 +237,7 @@ impl PeerManager { my_peer_id, global_ctx.clone(), packet_send.clone(), + Self::build_foreign_network_manager_accessor(&peers), )); let foreign_network_client = Arc::new(ForeignNetworkClient::new( global_ctx.clone(), @@ -274,6 +276,34 @@ impl PeerManager { } } + fn build_foreign_network_manager_accessor( + peer_map: &Arc, + ) -> Box { + struct T { + peer_map: Weak, + } + + #[async_trait::async_trait] + impl GlobalForeignNetworkAccessor for T { + async fn list_global_foreign_peer( + &self, + network_identity: &NetworkIdentity, + ) -> Vec { + let Some(peer_map) = self.peer_map.upgrade() else { + return vec![]; + }; + + peer_map + .list_peers_own_foreign_network(network_identity) + .await + } + } + + Box::new(T { + peer_map: Arc::downgrade(peer_map), + }) + } + async fn add_new_peer_conn(&self, peer_conn: PeerConn) -> Result<(), Error> { if self.global_ctx.get_network_identity() != peer_conn.get_network_identity() { return Err(Error::SecretKeyError( @@ -329,20 +359,85 @@ impl PeerManager { Ok(()) } + async fn try_handle_foreign_network_packet( + packet: ZCPacket, + my_peer_id: PeerId, + peer_map: &PeerMap, + foreign_network_mgr: &ForeignNetworkManager, + ) -> Result<(), ZCPacket> { + let pm_header = packet.peer_manager_header().unwrap(); + if pm_header.packet_type != PacketType::ForeignNetworkPacket as u8 { + return Err(packet); + } + + let from_peer_id = pm_header.from_peer_id.get(); + let to_peer_id = pm_header.to_peer_id.get(); + + let foreign_hdr = packet.foreign_network_hdr().unwrap(); + let foreign_network_name = foreign_hdr.get_network_name(packet.payload()); + let foreign_peer_id = foreign_hdr.get_dst_peer_id(); + + if to_peer_id == my_peer_id { + // packet sent from other peer to me, extract the inner packet and forward it + if let Err(e) = foreign_network_mgr + .send_msg_to_peer( + &foreign_network_name, + foreign_peer_id, + packet.foreign_network_packet(), + ) + .await + { + tracing::debug!( + ?e, + ?foreign_network_name, + ?foreign_peer_id, + "foreign network mgr send_msg_to_peer failed" + ); + } + Ok(()) + } else if from_peer_id == my_peer_id { + // packet is generated from foreign network mgr and should be forward to other peer + if let Err(e) = peer_map + .send_msg(packet, to_peer_id, NextHopPolicy::LeastHop) + .await + { + tracing::debug!( + ?e, + ?to_peer_id, + "send_msg_directly failed when forward local generated foreign network packet" + ); + } + + Ok(()) + } else { + // target is not me, forward it + Err(packet) + } + } + async fn start_peer_recv(&self) { - let mut recv = ReceiverStream::new(self.packet_recv.lock().await.take().unwrap()); + let mut recv = self.packet_recv.lock().await.take().unwrap(); let my_peer_id = self.my_peer_id; let peers = self.peers.clone(); let pipe_line = self.peer_packet_process_pipeline.clone(); let foreign_client = self.foreign_network_client.clone(); + let foreign_mgr = self.foreign_network_manager.clone(); let encryptor = self.encryptor.clone(); self.tasks.lock().await.spawn(async move { tracing::trace!("start_peer_recv"); - while let Some(mut ret) = recv.next().await { + while let Some(ret) = recv.recv().await { + let Err(mut ret) = + Self::try_handle_foreign_network_packet(ret, my_peer_id, &peers, &foreign_mgr) + .await + else { + continue; + }; + let Some(hdr) = ret.mut_peer_manager_header() else { tracing::warn!(?ret, "invalid packet, skip"); continue; }; + tracing::trace!(?hdr, "peer recv a packet..."); let from_peer_id = hdr.from_peer_id.get(); let to_peer_id = hdr.to_peer_id.get(); @@ -511,13 +606,14 @@ impl PeerManager { .unwrap_or(SystemTime::now()); ret.insert( ForeignNetworkRouteInfoKey { - network_name: network_name.clone(), peer_id: self.my_peer_id, + network_name: network_name.clone(), }, ForeignNetworkRouteInfoEntry { foreign_peer_ids: info.peers.iter().map(|x| x.peer_id).collect(), last_update: Some(last_update.into()), version: 0, + network_secret_digest: info.network_secret_digest.clone(), }, ); } diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index cf811a4..c104931 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -7,12 +7,11 @@ use tokio::sync::RwLock; use crate::{ common::{ error::Error, - global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, + global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, PeerId, }, proto::cli::PeerConnInfo, - tunnel::packet_def::ZCPacket, - tunnel::TunnelError, + tunnel::{packet_def::ZCPacket, TunnelError}, }; use super::{ @@ -121,6 +120,20 @@ impl PeerMap { None } + pub async fn list_peers_own_foreign_network( + &self, + network_identity: &NetworkIdentity, + ) -> Vec { + let mut ret = Vec::new(); + for route in self.routes.read().await.iter() { + let peers = route + .list_peers_own_foreign_network(&network_identity) + .await; + ret.extend(peers); + } + ret + } + pub async fn send_msg( &self, msg: ZCPacket, @@ -238,3 +251,13 @@ impl PeerMap { route_map } } + +impl Drop for PeerMap { + fn drop(&mut self) { + tracing::debug!( + self.my_peer_id, + network = ?self.global_ctx.get_network_identity(), + "PeerMap is dropped" + ); + } +} diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index ecf2f06..d65bdf9 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -25,7 +25,8 @@ use tokio::{ use crate::{ common::{ - constants::EASYTIER_VERSION, global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait, PeerId, + config::NetworkIdentity, constants::EASYTIER_VERSION, global_ctx::ArcGlobalCtx, + stun::StunInfoCollectorTrait, PeerId, }, peers::route_trait::{Route, RouteInterfaceBox}, proto::{ @@ -41,6 +42,7 @@ use crate::{ controller::{BaseController, Controller}, }, }, + use_global_var, }; use super::{ @@ -716,12 +718,14 @@ type SessionId = u64; type AtomicSessionId = atomic_shim::AtomicU64; struct SessionTask { + my_peer_id: PeerId, task: Arc>>>, } impl SessionTask { - fn new() -> Self { + fn new(my_peer_id: PeerId) -> Self { SessionTask { + my_peer_id, task: Arc::new(std::sync::Mutex::new(None)), } } @@ -746,6 +750,7 @@ impl Drop for SessionTask { if let Some(task) = self.task.lock().unwrap().take() { task.abort(); } + tracing::debug!(my_peer_id = self.my_peer_id, "drop SessionTask"); } } @@ -760,6 +765,7 @@ impl Debug for SessionTask { // if we need to sync route info with one peer, we create a SyncRouteSession with that peer. #[derive(Debug)] struct SyncRouteSession { + my_peer_id: PeerId, dst_peer_id: PeerId, dst_saved_peer_info_versions: DashMap, dst_saved_conn_bitmap_version: DashMap, @@ -781,8 +787,9 @@ struct SyncRouteSession { } impl SyncRouteSession { - fn new(dst_peer_id: PeerId) -> Self { + fn new(my_peer_id: PeerId, dst_peer_id: PeerId) -> Self { SyncRouteSession { + my_peer_id, dst_peer_id, dst_saved_peer_info_versions: DashMap::new(), dst_saved_conn_bitmap_version: DashMap::new(), @@ -799,7 +806,7 @@ impl SyncRouteSession { rpc_tx_count: AtomicU32::new(0), rpc_rx_count: AtomicU32::new(0), - task: SessionTask::new(), + task: SessionTask::new(my_peer_id), } } @@ -870,17 +877,24 @@ impl SyncRouteSession { } } +impl Drop for SyncRouteSession { + fn drop(&mut self) { + tracing::debug!(?self, "drop SyncRouteSession"); + } +} + struct PeerRouteServiceImpl { my_peer_id: PeerId, global_ctx: ArcGlobalCtx, sessions: DashMap>, - interface: Arc>>, + interface: Mutex>, - cost_calculator: Arc>>, + cost_calculator: std::sync::Mutex>, route_table: RouteTable, route_table_with_cost: RouteTable, - synced_route_info: Arc, + foreign_network_owner_map: DashMap>, + synced_route_info: SyncedRouteInfo, cached_local_conn_map: std::sync::Mutex, last_update_my_foreign_network: AtomicCell>, @@ -890,10 +904,12 @@ impl Debug for PeerRouteServiceImpl { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PeerRouteServiceImpl") .field("my_peer_id", &self.my_peer_id) + .field("network", &self.global_ctx.get_network_identity()) .field("sessions", &self.sessions) .field("route_table", &self.route_table) .field("route_table_with_cost", &self.route_table_with_cost) .field("synced_route_info", &self.synced_route_info) + .field("foreign_network_owner_map", &self.foreign_network_owner_map) .field( "cached_local_conn_map", &self.cached_local_conn_map.lock().unwrap(), @@ -909,20 +925,19 @@ impl PeerRouteServiceImpl { global_ctx, sessions: DashMap::new(), - interface: Arc::new(Mutex::new(None)), + interface: Mutex::new(None), - cost_calculator: Arc::new(std::sync::Mutex::new(Some(Box::new( - DefaultRouteCostCalculator, - )))), + cost_calculator: std::sync::Mutex::new(Some(Box::new(DefaultRouteCostCalculator))), route_table: RouteTable::new(), route_table_with_cost: RouteTable::new(), + foreign_network_owner_map: DashMap::new(), - synced_route_info: Arc::new(SyncedRouteInfo { + synced_route_info: SyncedRouteInfo { peer_infos: DashMap::new(), conn_map: DashMap::new(), foreign_network: DashMap::new(), - }), + }, cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()), last_update_my_foreign_network: AtomicCell::new(None), @@ -932,7 +947,7 @@ impl PeerRouteServiceImpl { fn get_or_create_session(&self, dst_peer_id: PeerId) -> Arc { self.sessions .entry(dst_peer_id) - .or_insert_with(|| Arc::new(SyncRouteSession::new(dst_peer_id))) + .or_insert_with(|| Arc::new(SyncRouteSession::new(self.my_peer_id, dst_peer_id))) .value() .clone() } @@ -987,7 +1002,10 @@ impl PeerRouteServiceImpl { async fn update_my_foreign_network(&self) -> bool { let last_time = self.last_update_my_foreign_network.load(); - if last_time.is_some() && last_time.unwrap().elapsed().as_secs() < 10 { + if last_time.is_some() + && last_time.unwrap().elapsed().as_secs() + < use_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC) + { return false; } @@ -1007,6 +1025,8 @@ impl PeerRouteServiceImpl { .synced_route_info .update_my_foreign_network(self.my_peer_id, foreign_networks); + // do not need update owner map because we always filter out my peer id. + updated } @@ -1030,6 +1050,35 @@ impl PeerRouteServiceImpl { calc_locked.as_mut().unwrap().end_update(); } + fn update_foreign_network_owner_map(&self) { + self.foreign_network_owner_map.clear(); + for item in self.synced_route_info.foreign_network.iter() { + let key = item.key(); + let entry = item.value(); + if key.peer_id == self.my_peer_id + || !self.route_table.peer_reachable(key.peer_id) + || entry.foreign_peer_ids.is_empty() + { + continue; + } + let network_identity = NetworkIdentity { + network_name: key.network_name.clone(), + network_secret: None, + network_secret_digest: Some( + entry + .network_secret_digest + .clone() + .try_into() + .unwrap_or_default(), + ), + }; + self.foreign_network_owner_map + .entry(network_identity) + .or_insert_with(|| Vec::new()) + .push(key.peer_id); + } + } + fn cost_calculator_need_update(&self) -> bool { self.cost_calculator .lock() @@ -1156,10 +1205,13 @@ impl PeerRouteServiceImpl { } async fn update_my_infos(&self) -> bool { - let mut ret = self.update_my_peer_info(); - ret |= self.update_my_conn_info().await; - ret |= self.update_my_foreign_network().await; - ret + let my_peer_info_updated = self.update_my_peer_info(); + let my_conn_info_updated = self.update_my_conn_info().await; + let my_foreign_network_updated = self.update_my_foreign_network().await; + if my_conn_info_updated || my_peer_info_updated { + self.update_foreign_network_owner_map(); + } + my_peer_info_updated || my_conn_info_updated || my_foreign_network_updated } fn build_sync_request( @@ -1317,6 +1369,12 @@ impl PeerRouteServiceImpl { } } +impl Drop for PeerRouteServiceImpl { + fn drop(&mut self) { + tracing::debug!(?self, "drop PeerRouteServiceImpl"); + } +} + #[derive(Clone)] struct RouteSessionManager { service_impl: Weak, @@ -1387,29 +1445,46 @@ impl RouteSessionManager { mut sync_now: tokio::sync::broadcast::Receiver<()>, ) { loop { - let Some(service_impl) = service_impl.upgrade() else { - return; - }; + let mut first_time = true; - let Some(peer_rpc) = peer_rpc.upgrade() else { - return; - }; + loop { + let Some(service_impl) = service_impl.clone().upgrade() else { + return; + }; + + let Some(peer_rpc) = peer_rpc.clone().upgrade() else { + return; + }; + + if first_time { + first_time = false; + service_impl.update_my_infos().await; + } + + if service_impl + .sync_route_with_peer(dst_peer_id, peer_rpc.clone()) + .await + { + break; + } + + drop(service_impl); + drop(peer_rpc); - while !service_impl - .sync_route_with_peer(dst_peer_id, peer_rpc.clone()) - .await - { tokio::time::sleep(Duration::from_millis(50)).await; - service_impl.update_my_infos().await; } - sync_now.resubscribe(); - drop(service_impl); - drop(peer_rpc); + sync_now = sync_now.resubscribe(); select! { _ = tokio::time::sleep(Duration::from_secs(1)) => {} - _ = sync_now.recv() => {} + ret = sync_now.recv() => match ret { + Err(e) => { + tracing::debug!(?e, "session_task sync_now recv failed, ospf route may exit"); + break; + }, + _ => {} + } } } } @@ -1423,7 +1498,7 @@ impl RouteSessionManager { Ok(()) } - fn start_session_task(&self, session: &Arc) { + fn start_session_task(&self, session: &SyncRouteSession) { if !session.task.is_running() { session.task.set_task(tokio::spawn(Self::session_task( self.peer_rpc.clone(), @@ -1597,6 +1672,8 @@ impl RouteSessionManager { session.update_dst_session_id(from_session_id); + let mut need_update_route_table = false; + if let Some(peer_infos) = &peer_infos { service_impl.synced_route_info.update_peer_infos( my_peer_id, @@ -1604,11 +1681,17 @@ impl RouteSessionManager { peer_infos, )?; session.update_dst_saved_peer_info_version(peer_infos); + need_update_route_table = true; } if let Some(conn_bitmap) = &conn_bitmap { service_impl.synced_route_info.update_conn_map(&conn_bitmap); session.update_dst_saved_conn_bitmap_version(conn_bitmap); + need_update_route_table = true; + } + + if need_update_route_table { + service_impl.update_route_table_and_cached_local_conn_bitmap(); } if let Some(foreign_network) = &foreign_network { @@ -1618,7 +1701,9 @@ impl RouteSessionManager { session.update_dst_saved_foreign_network_version(foreign_network); } - service_impl.update_route_table_and_cached_local_conn_bitmap(); + if need_update_route_table || foreign_network.is_some() { + service_impl.update_foreign_network_owner_map(); + } tracing::info!( "handling sync_route_info rpc: from_peer_id: {:?}, is_initiator: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}, new_route_table: {:?}", @@ -1643,7 +1728,7 @@ impl RouteSessionManager { pub struct PeerRoute { my_peer_id: PeerId, global_ctx: ArcGlobalCtx, - peer_rpc: Arc, + peer_rpc: Weak, service_impl: Arc, session_mgr: RouteSessionManager, @@ -1673,7 +1758,7 @@ impl PeerRoute { Arc::new(PeerRoute { my_peer_id, global_ctx: global_ctx.clone(), - peer_rpc, + peer_rpc: Arc::downgrade(&peer_rpc), service_impl, session_mgr, @@ -1725,7 +1810,11 @@ impl PeerRoute { } async fn start(&self) { - self.peer_rpc.rpc_server().registry().register( + let Some(peer_rpc) = self.peer_rpc.upgrade() else { + return; + }; + + peer_rpc.rpc_server().registry().register( OspfRouteRpcServer::new(self.session_mgr.clone()), &self.global_ctx.get_network_name(), ); @@ -1755,7 +1844,18 @@ impl PeerRoute { impl Drop for PeerRoute { fn drop(&mut self) { - self.peer_rpc.rpc_server().registry().unregister( + tracing::debug!( + self.my_peer_id, + network = ?self.global_ctx.get_network_identity(), + service = ?self.service_impl, + "PeerRoute drop" + ); + + let Some(peer_rpc) = self.peer_rpc.upgrade() else { + return; + }; + + peer_rpc.rpc_server().registry().unregister( OspfRouteRpcServer::new(self.session_mgr.clone()), &self.global_ctx.get_network_name(), ); @@ -1851,6 +1951,17 @@ impl Route for PeerRoute { } foreign_networks } + + async fn list_peers_own_foreign_network( + &self, + network_identity: &NetworkIdentity, + ) -> Vec { + self.service_impl + .foreign_network_owner_map + .get(network_identity) + .map(|x| x.clone()) + .unwrap_or_default() + } } impl PeerPacketFilter for Arc {} diff --git a/easytier/src/peers/peer_rpc.rs b/easytier/src/peers/peer_rpc.rs index 9acd921..c34d490 100644 --- a/easytier/src/peers/peer_rpc.rs +++ b/easytier/src/peers/peer_rpc.rs @@ -1,6 +1,7 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use futures::StreamExt; +use tokio::task::JoinSet; use crate::{ common::{error::Error, PeerId}, @@ -26,6 +27,8 @@ pub struct PeerRpcManager { tspt: Arc>, rpc_client: rpc_impl::client::Client, rpc_server: rpc_impl::server::Server, + + tasks: Arc>>, } impl std::fmt::Debug for PeerRpcManager { @@ -42,6 +45,8 @@ impl PeerRpcManager { tspt: Arc::new(Box::new(tspt)), rpc_client: rpc_impl::client::Client::new(), rpc_server: rpc_impl::server::Server::new(), + + tasks: Arc::new(Mutex::new(JoinSet::new())), } } @@ -60,7 +65,7 @@ impl PeerRpcManager { let tspt = self.tspt.clone(); - tokio::spawn(async move { + self.tasks.lock().unwrap().spawn(async move { loop { let packet = tokio::select! { Some(Ok(packet)) = server_rx.next() => { @@ -85,7 +90,7 @@ impl PeerRpcManager { }); let tspt = self.tspt.clone(); - tokio::spawn(async move { + self.tasks.lock().unwrap().spawn(async move { loop { let Ok(o) = tspt.recv().await else { tracing::warn!("peer rpc transport read aborted, exiting"); @@ -117,6 +122,12 @@ impl PeerRpcManager { } } +impl Drop for PeerRpcManager { + fn drop(&mut self) { + tracing::debug!("PeerRpcManager drop, my_peer_id: {:?}", self.my_peer_id()); + } +} + #[cfg(test)] pub mod tests { use std::{pin::Pin, sync::Arc}; @@ -135,8 +146,8 @@ pub mod tests { tests::{GreetingClientFactory, GreetingServer, GreetingService, SayHelloRequest}, }, tunnel::{ - packet_def::ZCPacket, ring::create_ring_tunnel_pair, Tunnel, - ZCPacketSink, ZCPacketStream, + packet_def::ZCPacket, ring::create_ring_tunnel_pair, Tunnel, ZCPacketSink, + ZCPacketStream, }, }; diff --git a/easytier/src/peers/route_trait.rs b/easytier/src/peers/route_trait.rs index 37c63ac..9cf6c24 100644 --- a/easytier/src/peers/route_trait.rs +++ b/easytier/src/peers/route_trait.rs @@ -3,7 +3,7 @@ use std::{net::Ipv4Addr, sync::Arc}; use dashmap::DashMap; use crate::{ - common::PeerId, + common::{global_ctx::NetworkIdentity, PeerId}, proto::peer_rpc::{ ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos, }, @@ -81,6 +81,13 @@ pub trait Route { None } + async fn list_peers_own_foreign_network( + &self, + _network_identity: &NetworkIdentity, + ) -> Vec { + vec![] + } + async fn list_foreign_network_info(&self) -> RouteForeignNetworkInfos { Default::default() } diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index 4457cc0..1b2dd1f 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -83,7 +83,10 @@ message DumpRouteResponse { string result = 1; } message ListForeignNetworkRequest {} -message ForeignNetworkEntryPb { repeated PeerInfo peers = 1; } +message ForeignNetworkEntryPb { + repeated PeerInfo peers = 1; + bytes network_secret_digest = 2; +} message ListForeignNetworkResponse { // foreign network in local diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 4bb5101..87213c9 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -42,6 +42,7 @@ message ForeignNetworkRouteInfoEntry { repeated uint32 foreign_peer_ids = 1; google.protobuf.Timestamp last_update = 2; uint32 version = 3; + bytes network_secret_digest = 4; } message RouteForeignNetworkInfos { diff --git a/easytier/src/tunnel/packet_def.rs b/easytier/src/tunnel/packet_def.rs index 6ad9295..e9c4f00 100644 --- a/easytier/src/tunnel/packet_def.rs +++ b/easytier/src/tunnel/packet_def.rs @@ -56,6 +56,7 @@ pub enum PacketType { Route = 7, RpcReq = 8, RpcResp = 9, + ForeignNetworkPacket = 10, } bitflags::bitflags! { @@ -151,6 +152,46 @@ impl PeerManagerHeader { } } +#[repr(C, packed)] +#[derive(AsBytes, FromBytes, FromZeroes, Clone, Debug, Default)] +pub struct ForeignNetworkPacketHeader { + pub header_len: U16, + pub dst_peer_id: U32, + pub network_name_offset: U16, + pub network_name_len: U16, + /* variable length network_name string */ +} + +impl ForeignNetworkPacketHeader { + pub fn new(dst_peer_id: u32, network_name: &str) -> Self { + let network_name_offset = std::mem::size_of::() as u16; + let network_name_len = network_name.len() as u16; + let header_len = network_name_offset + network_name_len; + Self { + header_len: U16::new(header_len), + dst_peer_id: U32::new(dst_peer_id), + network_name_offset: U16::new(network_name_offset), + network_name_len: U16::new(network_name_len), + } + } + + pub fn get_network_name(&self, zc_packet_payload: &[u8]) -> String { + let offset = self.network_name_offset.get() as usize; + let len = self.network_name_len.get() as usize; + std::str::from_utf8(&zc_packet_payload[offset..offset + len]) + .unwrap() + .to_string() + } + + pub fn get_dst_peer_id(&self) -> u32 { + self.dst_peer_id.get() + } + + pub fn get_header_len(&self) -> usize { + self.header_len.get() as usize + } +} + // reserve the space for aes tag and nonce #[repr(C, packed)] #[derive(AsBytes, FromBytes, FromZeroes, Clone, Debug, Default)] @@ -320,6 +361,40 @@ impl ZCPacket { ret } + pub fn new_for_foreign_network( + network_name: &String, + dst_peer_id: u32, + foreign_zc_packet: &ZCPacket, + ) -> Self { + let foreign_network_hdr = ForeignNetworkPacketHeader::new(dst_peer_id, &network_name); + let total_payload_len = + foreign_network_hdr.get_header_len() + foreign_zc_packet.tunnel_payload().len(); + + let mut ret = Self::new_nic_packet(); + let payload_off = ret.packet_type.get_packet_offsets().payload_offset; + ret.inner.reserve(payload_off + total_payload_len); + unsafe { ret.inner.set_len(payload_off + total_payload_len) }; + + let fixed_hdr_len = std::mem::size_of::(); + ret.mut_payload()[..fixed_hdr_len].copy_from_slice(foreign_network_hdr.as_bytes()); + + let name_offset = foreign_network_hdr.network_name_offset.get() as usize; + let name_len = foreign_network_hdr.network_name_len.get() as usize; + ret.mut_payload()[name_offset..name_offset + name_len] + .copy_from_slice(network_name.as_bytes()); + + ret.mut_payload()[foreign_network_hdr.get_header_len()..] + .copy_from_slice(foreign_zc_packet.tunnel_payload()); + + let hdr = ret.mut_peer_manager_header().unwrap(); + hdr.from_peer_id = 0.into(); + hdr.to_peer_id = 0.into(); + hdr.packet_type = PacketType::ForeignNetworkPacket as u8; + hdr.len.set(total_payload_len as u32); + + ret + } + pub fn packet_type(&self) -> ZCPacketType { self.packet_type } @@ -506,6 +581,26 @@ impl ZCPacket { .and_then(|hdr| Some(hdr.packet_type == PacketType::Data as u8)) .unwrap_or(false) } + + pub fn foreign_network_hdr(&self) -> Option<&ForeignNetworkPacketHeader> { + if self.peer_manager_header().unwrap().packet_type == PacketType::ForeignNetworkPacket as u8 + { + ForeignNetworkPacketHeader::ref_from_prefix(self.payload()) + } else { + None + } + } + + pub fn foreign_network_packet(mut self) -> Self { + let hdr = self.foreign_network_hdr().unwrap(); + let foreign_hdr_len = hdr.get_header_len(); + + Self::new_from_buf( + self.inner + .split_off(foreign_hdr_len + self.payload_offset()), + ZCPacketType::DummyTunnel, + ) + } } #[cfg(test)]