allow foreign network forward nic data

This commit is contained in:
sijie.sun 2024-03-23 22:20:19 +08:00 committed by Sijie.Sun
parent 269146c9f8
commit 6c2a240966
8 changed files with 188 additions and 59 deletions

View File

@ -16,8 +16,8 @@ pub enum Error {
TunnelError(#[from] tunnels::TunnelError),
#[error("Peer has no conn, PeerId: {0}")]
PeerNoConnectionError(PeerId),
#[error("RouteError: {0}")]
RouteError(String),
#[error("RouteError: {0:?}")]
RouteError(Option<String>),
#[error("Not found")]
NotFound,
#[error("Invalid Url: {0}")]

View File

@ -150,9 +150,18 @@ impl ForeignNetworkClient {
pub async fn send_msg(&self, msg: Bytes, peer_id: PeerId) -> Result<(), Error> {
if let Some(next_hop) = self.get_next_hop(peer_id) {
return self.peer_map.send_msg_directly(msg, next_hop).await;
let ret = self.peer_map.send_msg_directly(msg, next_hop).await;
if ret.is_err() {
tracing::error!(
?ret,
?peer_id,
?next_hop,
"foreign network client send msg failed"
);
}
Err(Error::RouteError("no next hop".to_string()))
return ret;
}
Err(Error::RouteError(Some("no next hop".to_string())))
}
pub fn list_foreign_peers(&self) -> Vec<PeerId> {

View File

@ -57,12 +57,12 @@ impl ForeignNetworkManagerData {
let network_name = self
.peer_network_map
.get(&dst_peer_id)
.ok_or_else(|| Error::RouteError("network not found".to_string()))?
.ok_or_else(|| Error::RouteError(Some("network not found".to_string())))?
.clone();
let entry = self
.network_peer_maps
.get(&network_name)
.ok_or_else(|| Error::RouteError("no peer in network".to_string()))?
.ok_or_else(|| Error::RouteError(Some("no peer in network".to_string())))?
.clone();
entry.peer_map.send_msg(msg, dst_peer_id).await
}
@ -287,6 +287,28 @@ impl ForeignNetworkManager {
self.start_packet_recv().await;
self.register_peer_rpc_service().await;
}
pub async fn list_foreign_networks(&self) -> DashMap<String, Vec<PeerId>> {
let ret = DashMap::new();
for item in self.data.network_peer_maps.iter() {
let network_name = item.key().clone();
ret.insert(network_name, vec![]);
}
for mut n in ret.iter_mut() {
let network_name = n.key().clone();
let Some(item) = self
.data
.network_peer_maps
.get(&network_name)
.map(|v| v.clone())
else {
continue;
};
n.value_mut().extend(item.peer_map.list_peers().await);
}
ret
}
}
#[cfg(test)]

View File

@ -56,7 +56,7 @@ macro_rules! wait_response {
match &resp_payload {
$pattern => $out_var = $value,
_ => {
log::error!(
tracing::error!(
"unexpected packet: {:?}, pattern: {:?}",
rsp_bytes,
stringify!($pattern)
@ -67,6 +67,7 @@ macro_rules! wait_response {
};
}
#[derive(Debug)]
pub struct PeerInfo {
magic: u32,
pub my_peer_id: PeerId,
@ -348,13 +349,15 @@ impl PeerConn {
self.conn_id
}
#[tracing::instrument]
pub async fn do_handshake_as_server(&mut self) -> Result<(), TunnelError> {
let mut stream = self.tunnel.pin_stream();
let mut sink = self.tunnel.pin_sink();
tracing::info!("waiting for handshake request from client");
wait_response!(stream, hs_req, CtrlPacketPayload::HandShake(x) => x);
self.info = Some(PeerInfo::from(hs_req));
log::info!("handshake request: {:?}", hs_req);
tracing::info!("handshake request: {:?}", hs_req);
let hs_req = self
.global_ctx
@ -365,6 +368,7 @@ impl PeerConn {
Ok(())
}
#[tracing::instrument]
pub async fn do_handshake_as_client(&mut self) -> Result<(), TunnelError> {
let mut stream = self.tunnel.pin_stream();
let mut sink = self.tunnel.pin_sink();
@ -375,9 +379,10 @@ impl PeerConn {
.run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network));
sink.send(hs_req.into()).await?;
tracing::info!("waiting for handshake request from server");
wait_response!(stream, hs_rsp, CtrlPacketPayload::HandShake(x) => x);
self.info = Some(PeerInfo::from(hs_rsp));
log::info!("handshake response: {:?}", hs_rsp);
tracing::info!("handshake response: {:?}", hs_rsp);
Ok(())
}
@ -535,6 +540,16 @@ impl Drop for PeerConn {
}
}
impl Debug for PeerConn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PeerConn")
.field("conn_id", &self.conn_id)
.field("my_peer_id", &self.my_peer_id)
.field("info", &self.info)
.finish()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@ -5,7 +5,7 @@ use std::{
};
use async_trait::async_trait;
use futures::{StreamExt, TryFutureExt};
use futures::StreamExt;
use tokio::{
sync::{
@ -67,11 +67,18 @@ impl PeerRpcManagerTransport for RpcTransport {
.ok_or(Error::Unknown)?;
let peers = self.peers.upgrade().ok_or(Error::Unknown)?;
if foreign_peers.has_next_hop(dst_peer_id) {
let ret = peers.send_msg(msg.clone(), dst_peer_id).await;
if matches!(ret, Err(Error::RouteError(..))) && foreign_peers.has_next_hop(dst_peer_id) {
tracing::info!(
?dst_peer_id,
?self.my_peer_id,
"failed to send msg to peer, try foreign network",
);
return foreign_peers.send_msg(msg, dst_peer_id).await;
}
peers.send_msg(msg, dst_peer_id).map_err(|e| e.into()).await
ret
}
async fn recv(&self) -> Result<Bytes, Error> {
@ -484,13 +491,18 @@ impl PeerManager {
let mut errs: Vec<Error> = vec![];
for peer_id in dst_peers.iter() {
let send_ret = self
.peers
.send_msg(
packet::Packet::new_data_packet(self.my_peer_id, peer_id.clone(), &msg).into(),
*peer_id,
)
.await;
let msg: Bytes =
packet::Packet::new_data_packet(self.my_peer_id, peer_id.clone(), &msg).into();
let send_ret = self.peers.send_msg(msg.clone(), *peer_id).await;
if matches!(send_ret, Err(Error::RouteError(..)))
&& self.foreign_network_client.has_next_hop(*peer_id)
{
let foreign_send_ret = self.foreign_network_client.send_msg(msg, *peer_id).await;
if foreign_send_ret.is_ok() {
continue;
}
}
if let Err(send_ret) = send_ret {
errs.push(send_ret);

View File

@ -87,7 +87,7 @@ impl PeerMap {
}
None => {
log::error!("no peer for dst_peer_id: {}", dst_peer_id);
return Ok(());
return Err(Error::RouteError(None));
}
}
@ -119,13 +119,13 @@ impl PeerMap {
}
let Some(gateway_peer_id) = gateway_peer_id else {
log::error!(
tracing::trace!(
"no gateway for dst_peer_id: {}, peers: {:?}, my_peer_id: {}",
dst_peer_id,
self.peer_map.iter().map(|v| *v.key()).collect::<Vec<_>>(),
self.my_peer_id
);
return Ok(());
return Err(Error::RouteError(None));
};
self.send_msg_directly(msg.clone(), gateway_peer_id).await?;

View File

@ -1,4 +1,7 @@
use std::sync::{atomic::AtomicU32, Arc};
use std::{
sync::{atomic::AtomicU32, Arc},
time::Duration,
};
use tokio::{net::UdpSocket, task::JoinSet};
@ -6,10 +9,11 @@ use super::*;
use crate::{
common::{
config::{ConfigLoader, TomlConfigLoader},
config::{ConfigLoader, NetworkIdentity, TomlConfigLoader},
netns::{NetNS, ROOT_NETNS_NAME},
},
instance::instance::Instance,
peers::tests::wait_for_condition,
tunnels::{
common::tests::_tunnel_pingpong_netns,
ring_tunnel::RingTunnelConnector,
@ -307,3 +311,65 @@ pub async fn udp_broadcast_test() {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
assert_eq!(counter.load(std::sync::atomic::Ordering::Relaxed), 2);
}
#[tokio::test]
#[serial_test::serial]
pub async fn foreign_network_forward_nic_data() {
prepare_linux_namespaces();
let center_node_config = get_inst_config("inst1", Some("net_a"), "10.144.144.1");
center_node_config.set_network_identity(NetworkIdentity {
network_name: "center".to_string(),
network_secret: "".to_string(),
});
let mut center_inst = Instance::new(center_node_config);
let mut inst1 = Instance::new(get_inst_config("inst1", Some("net_b"), "10.144.145.1"));
let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_c"), "10.144.145.2"));
center_inst.run().await.unwrap();
inst1.run().await.unwrap();
inst2.run().await.unwrap();
assert_ne!(inst1.id(), center_inst.id());
assert_ne!(inst2.id(), center_inst.id());
inst1
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center_inst.id()).parse().unwrap(),
));
inst2
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center_inst.id()).parse().unwrap(),
));
wait_for_condition(
|| async {
inst1.get_peer_manager().list_routes().await.len() == 1
&& inst2.get_peer_manager().list_routes().await.len() == 1
},
Duration::from_secs(5),
)
.await;
let _g = NetNS::new(Some(ROOT_NETNS_NAME.to_owned())).guard();
let code = tokio::process::Command::new("ip")
.args(&[
"netns",
"exec",
"net_b",
"ping",
"-c",
"1",
"-W",
"1",
"10.144.145.2",
])
.status()
.await
.unwrap();
assert_eq!(code.code().unwrap(), 0);
}

View File

@ -10,7 +10,10 @@ use crossbeam_queue::ArrayQueue;
use async_trait::async_trait;
use futures::Sink;
use once_cell::sync::Lazy;
use tokio::sync::{Mutex, Notify};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
Mutex, Notify,
};
use futures::FutureExt;
use tokio_util::bytes::BytesMut;
@ -197,7 +200,6 @@ impl RingTunnel {
struct Connection {
client: RingTunnel,
server: RingTunnel,
connect_notify: Arc<Notify>,
}
impl Tunnel for RingTunnel {
@ -219,18 +221,23 @@ impl Tunnel for RingTunnel {
}
}
static CONNECTION_MAP: Lazy<Arc<Mutex<HashMap<uuid::Uuid, Arc<Connection>>>>> =
static CONNECTION_MAP: Lazy<Arc<Mutex<HashMap<uuid::Uuid, UnboundedSender<Arc<Connection>>>>>> =
Lazy::new(|| Arc::new(Mutex::new(HashMap::new())));
#[derive(Debug)]
pub struct RingTunnelListener {
listerner_addr: url::Url,
conn_sender: UnboundedSender<Arc<Connection>>,
conn_receiver: UnboundedReceiver<Arc<Connection>>,
}
impl RingTunnelListener {
pub fn new(key: url::Url) -> Self {
let (conn_sender, conn_receiver) = tokio::sync::mpsc::unbounded_channel();
RingTunnelListener {
listerner_addr: key,
conn_sender,
conn_receiver,
}
}
}
@ -279,17 +286,6 @@ impl Tunnel for ConnectionForClient {
}
impl RingTunnelListener {
async fn add_connection(listener_addr: uuid::Uuid) {
CONNECTION_MAP.lock().await.insert(
listener_addr.clone(),
Arc::new(Connection {
client: RingTunnel::new(RING_TUNNEL_CAP),
server: RingTunnel::new_with_id(listener_addr.clone(), RING_TUNNEL_CAP),
connect_notify: Arc::new(Notify::new()),
}),
);
}
fn get_addr(&self) -> Result<uuid::Uuid, TunnelError> {
check_scheme_and_get_socket_addr::<Uuid>(&self.listerner_addr, "ring")
}
@ -299,23 +295,29 @@ impl RingTunnelListener {
impl TunnelListener for RingTunnelListener {
async fn listen(&mut self) -> Result<(), TunnelError> {
log::info!("listen new conn of key: {}", self.listerner_addr);
Self::add_connection(self.get_addr()?).await;
CONNECTION_MAP
.lock()
.await
.insert(self.get_addr()?, self.conn_sender.clone());
Ok(())
}
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
log::info!("waiting accept new conn of key: {}", self.listerner_addr);
let val = CONNECTION_MAP
.lock()
.await
.get(&self.get_addr()?)
.unwrap()
.clone();
val.connect_notify.notified().await;
CONNECTION_MAP.lock().await.remove(&self.get_addr()?);
Self::add_connection(self.get_addr()?).await;
let my_addr = self.get_addr()?;
if let Some(conn) = self.conn_receiver.recv().await {
if conn.server.id == my_addr {
log::info!("accept new conn of key: {}", self.listerner_addr);
Ok(Box::new(ConnectionForServer { conn: val }))
return Ok(Box::new(ConnectionForServer { conn }));
} else {
tracing::error!(?conn.server.id, ?my_addr, "got new conn with wrong id");
return Err(TunnelError::CommonError(
"accept got wrong ring server id".to_owned(),
));
}
}
return Err(TunnelError::CommonError("conn receiver stopped".to_owned()));
}
fn local_url(&self) -> url::Url {
@ -336,18 +338,22 @@ impl RingTunnelConnector {
#[async_trait]
impl TunnelConnector for RingTunnelConnector {
async fn connect(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
let val = CONNECTION_MAP
let remote_addr = check_scheme_and_get_socket_addr::<Uuid>(&self.remote_addr, "ring")?;
let entry = CONNECTION_MAP
.lock()
.await
.get(&check_scheme_and_get_socket_addr::<Uuid>(
&self.remote_addr,
"ring",
)?)
.get(&remote_addr)
.unwrap()
.clone();
val.connect_notify.notify_one();
log::info!("connecting");
Ok(Box::new(ConnectionForClient { conn: val }))
let conn = Arc::new(Connection {
client: RingTunnel::new(RING_TUNNEL_CAP),
server: RingTunnel::new_with_id(remote_addr.clone(), RING_TUNNEL_CAP),
});
entry
.send(conn.clone())
.map_err(|_| TunnelError::CommonError("send conn to listner failed".to_owned()))?;
Ok(Box::new(ConnectionForClient { conn }))
}
fn remote_url(&self) -> url::Url {
@ -359,11 +365,10 @@ pub fn create_ring_tunnel_pair() -> (Box<dyn Tunnel>, Box<dyn Tunnel>) {
let conn = Arc::new(Connection {
client: RingTunnel::new(RING_TUNNEL_CAP),
server: RingTunnel::new(RING_TUNNEL_CAP),
connect_notify: Arc::new(Notify::new()),
});
(
Box::new(ConnectionForServer { conn: conn.clone() }),
Box::new(ConnectionForClient { conn: conn }),
Box::new(ConnectionForClient { conn }),
)
}