From 2cfc5a6ef6dd2f316c2211888521cca3f975bc82 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Sat, 23 Mar 2024 00:56:27 +0800 Subject: [PATCH] better user interface --- easytier-cli/src/main.rs | 36 +- easytier-core/Cargo.toml | 14 +- easytier-core/src/common/config.rs | 381 +++++++++++++++++++++ easytier-core/src/common/config_fs.rs | 161 --------- easytier-core/src/common/global_ctx.rs | 137 +++----- easytier-core/src/common/mod.rs | 2 +- easytier-core/src/connector/direct.rs | 6 +- easytier-core/src/connector/manual.rs | 15 +- easytier-core/src/instance/instance.rs | 133 +++---- easytier-core/src/instance/listeners.rs | 77 +++-- easytier-core/src/instance/virtual_nic.rs | 26 +- easytier-core/src/main.rs | 308 ++++++++++++++--- easytier-core/src/peers/peer.rs | 10 +- easytier-core/src/peers/peer_conn.rs | 25 +- easytier-core/src/peers/peer_ospf_route.rs | 1 + easytier-core/src/tests/three_node.rs | 43 +-- easytier-core/src/tunnels/common.rs | 4 +- easytier-core/src/tunnels/tcp_tunnel.rs | 4 +- 18 files changed, 872 insertions(+), 511 deletions(-) create mode 100644 easytier-core/src/common/config.rs delete mode 100644 easytier-core/src/common/config_fs.rs diff --git a/easytier-cli/src/main.rs b/easytier-cli/src/main.rs index 875ab31..4e563ba 100644 --- a/easytier-cli/src/main.rs +++ b/easytier-cli/src/main.rs @@ -1,4 +1,4 @@ -use std::vec; +use std::{net::SocketAddr, vec}; use clap::{command, Args, Parser, Subcommand}; use easytier_core::{ @@ -11,15 +11,13 @@ use easytier_core::{ }; use humansize::format_size; use tabled::settings::Style; -use tracing::level_filters::LevelFilter; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Cli { /// the instance name - #[arg(short = 'n', long, default_value = "default")] - instance_name: String, + #[arg(short = 'p', long, default_value = "127.0.0.1:15888")] + rpc_portal: SocketAddr, #[command(subcommand)] sub_command: SubCommand, @@ -358,38 +356,12 @@ impl CommandHandler { } } -fn init_logger() { - // logger to rolling file - let file_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env() - .unwrap(); - let file_appender = tracing_appender::rolling::Builder::new() - .rotation(tracing_appender::rolling::Rotation::DAILY) - .max_log_files(1) - .filename_prefix("cli.log") - .build("/tmp") - .expect("failed to initialize rolling file appender"); - let mut file_layer = tracing_subscriber::fmt::layer(); - file_layer.set_ansi(false); - let file_layer = file_layer - .with_writer(file_appender) - .with_timer(easytier_core::common::get_logger_timer_rfc3339()) - .with_filter(file_filter); - - tracing_subscriber::Registry::default() - .with(file_layer) - .init(); -} - #[tokio::main] #[tracing::instrument] async fn main() -> Result<(), Error> { - init_logger(); - let cli = Cli::parse(); let handler = CommandHandler { - addr: "http://127.0.0.1:15888".to_string(), + addr: format!("http://{}:{}", cli.rpc_portal.ip(), cli.rpc_portal.port()), }; match cli.sub_command { diff --git a/easytier-core/Cargo.toml b/easytier-core/Cargo.toml index 8be35a0..123e29d 100644 --- a/easytier-core/Cargo.toml +++ b/easytier-core/Cargo.toml @@ -24,6 +24,8 @@ thiserror = "1.0" auto_impl = "1.1.0" crossbeam = "0.8.4" time = "0.3" +toml = "0.8.12" +chrono = "0.4.35" gethostname = "0.4.3" @@ -56,8 +58,14 @@ crossbeam-queue = "0.3" once_cell = "1.18.0" # for packet -rkyv = { "version" = "0.7.42", features = ["validation", "archive_le", "strict", "copy_unsafe", "arbitrary_enum_discriminant"] } -postcard = {"version"= "*", features = ["alloc"]} +rkyv = { "version" = "0.7.42", features = [ + "validation", + "archive_le", + "strict", + "copy_unsafe", + "arbitrary_enum_discriminant", +] } +postcard = { "version" = "*", features = ["alloc"] } # for rpc tonic = "0.10" @@ -65,7 +73,7 @@ prost = "0.12" anyhow = "1.0" tarpc = { version = "0.32", features = ["tokio1", "serde1"] } -url = "2.5.0" +url = { version = "2.5", features = ["serde"] } # for tun packet byteorder = "1.5.0" diff --git a/easytier-core/src/common/config.rs b/easytier-core/src/common/config.rs new file mode 100644 index 0000000..9dd87ea --- /dev/null +++ b/easytier-core/src/common/config.rs @@ -0,0 +1,381 @@ +use std::{ + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +use anyhow::Context; +use serde::{Deserialize, Serialize}; + +#[auto_impl::auto_impl(Box, &)] +pub trait ConfigLoader: Send + Sync { + fn get_id(&self) -> uuid::Uuid; + + fn get_inst_name(&self) -> String; + fn set_inst_name(&self, name: String); + + fn get_netns(&self) -> Option; + fn set_netns(&self, ns: Option); + + fn get_ipv4(&self) -> Option; + fn set_ipv4(&self, addr: std::net::Ipv4Addr); + + fn add_proxy_cidr(&self, cidr: cidr::IpCidr); + fn remove_proxy_cidr(&self, cidr: cidr::IpCidr); + fn get_proxy_cidrs(&self) -> Vec; + + fn get_network_identity(&self) -> NetworkIdentity; + fn set_network_identity(&self, identity: NetworkIdentity); + + fn get_listener_uris(&self) -> Vec; + + fn get_file_logger_config(&self) -> FileLoggerConfig; + fn set_file_logger_config(&self, config: FileLoggerConfig); + fn get_console_logger_config(&self) -> ConsoleLoggerConfig; + fn set_console_logger_config(&self, config: ConsoleLoggerConfig); + + fn get_peers(&self) -> Vec; + fn set_peers(&self, peers: Vec); + + fn get_listeners(&self) -> Vec; + fn set_listeners(&self, listeners: Vec); + + fn get_rpc_portal(&self) -> Option; + fn set_rpc_portal(&self, addr: SocketAddr); + + fn dump(&self) -> String; +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct NetworkIdentity { + pub network_name: String, + pub network_secret: String, +} + +impl NetworkIdentity { + pub fn new(network_name: String, network_secret: String) -> Self { + NetworkIdentity { + network_name, + network_secret, + } + } + + pub fn default() -> Self { + Self::new("default".to_string(), "".to_string()) + } +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct PeerConfig { + pub uri: url::Url, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct NetworkConfig { + pub cidr: String, + pub allow: Option>, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)] +pub struct FileLoggerConfig { + pub level: Option, + pub file: Option, + pub dir: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)] +pub struct ConsoleLoggerConfig { + pub level: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +struct Config { + netns: Option, + instance_name: Option, + instance_id: Option, + ipv4: Option, + network_identity: Option, + listeners: Option>, + + peer: Option>, + proxy_network: Option>, + + file_logger: Option, + console_logger: Option, + + rpc_portal: Option, +} + +#[derive(Debug, Clone)] +pub struct TomlConfigLoader { + config: Arc>, +} + +impl Default for TomlConfigLoader { + fn default() -> Self { + TomlConfigLoader::new_from_str("").unwrap() + } +} + +impl TomlConfigLoader { + pub fn new_from_str(config_str: &str) -> Result { + let config = toml::de::from_str::(config_str).with_context(|| { + format!( + "failed to parse config file: {}\n{}", + config_str, config_str + ) + })?; + Ok(TomlConfigLoader { + config: Arc::new(Mutex::new(config)), + }) + } + + pub fn new(config_path: &str) -> Result { + let config_str = std::fs::read_to_string(config_path) + .with_context(|| format!("failed to read config file: {}", config_path))?; + Self::new_from_str(&config_str) + } +} + +impl ConfigLoader for TomlConfigLoader { + fn get_inst_name(&self) -> String { + self.config + .lock() + .unwrap() + .instance_name + .clone() + .unwrap_or("default".to_string()) + } + + fn set_inst_name(&self, name: String) { + self.config.lock().unwrap().instance_name = Some(name); + } + + fn get_netns(&self) -> Option { + self.config.lock().unwrap().netns.clone() + } + + fn set_netns(&self, ns: Option) { + self.config.lock().unwrap().netns = ns; + } + + fn get_ipv4(&self) -> Option { + let locked_config = self.config.lock().unwrap(); + locked_config + .ipv4 + .as_ref() + .map(|s| s.parse().ok()) + .flatten() + } + + fn set_ipv4(&self, addr: std::net::Ipv4Addr) { + self.config.lock().unwrap().ipv4 = Some(addr.to_string()); + } + + fn add_proxy_cidr(&self, cidr: cidr::IpCidr) { + let mut locked_config = self.config.lock().unwrap(); + if locked_config.proxy_network.is_none() { + locked_config.proxy_network = Some(vec![]); + } + let cidr_str = cidr.to_string(); + // insert if no duplicate + if !locked_config + .proxy_network + .as_ref() + .unwrap() + .iter() + .any(|c| c.cidr == cidr_str) + { + locked_config + .proxy_network + .as_mut() + .unwrap() + .push(NetworkConfig { + cidr: cidr_str, + allow: None, + }); + } + } + + fn remove_proxy_cidr(&self, cidr: cidr::IpCidr) { + let mut locked_config = self.config.lock().unwrap(); + if let Some(proxy_cidrs) = &mut locked_config.proxy_network { + let cidr_str = cidr.to_string(); + proxy_cidrs.retain(|c| c.cidr != cidr_str); + } + } + + fn get_proxy_cidrs(&self) -> Vec { + self.config + .lock() + .unwrap() + .proxy_network + .as_ref() + .map(|v| { + v.iter() + .map(|c| c.cidr.parse().unwrap()) + .collect::>() + }) + .unwrap_or_default() + } + + fn get_id(&self) -> uuid::Uuid { + let mut locked_config = self.config.lock().unwrap(); + if locked_config.instance_id.is_none() { + let id = uuid::Uuid::new_v4(); + locked_config.instance_id = Some(id.to_string()); + id + } else { + uuid::Uuid::parse_str(locked_config.instance_id.as_ref().unwrap()) + .with_context(|| { + format!( + "failed to parse instance id as uuid: {}, you can use this id: {}", + locked_config.instance_id.as_ref().unwrap(), + uuid::Uuid::new_v4() + ) + }) + .unwrap() + } + } + + fn get_network_identity(&self) -> NetworkIdentity { + self.config + .lock() + .unwrap() + .network_identity + .clone() + .unwrap_or_else(NetworkIdentity::default) + } + + fn set_network_identity(&self, identity: NetworkIdentity) { + self.config.lock().unwrap().network_identity = Some(identity); + } + + fn get_listener_uris(&self) -> Vec { + self.config + .lock() + .unwrap() + .listeners + .clone() + .unwrap_or_default() + } + + fn get_file_logger_config(&self) -> FileLoggerConfig { + self.config + .lock() + .unwrap() + .file_logger + .clone() + .unwrap_or_default() + } + + fn set_file_logger_config(&self, config: FileLoggerConfig) { + self.config.lock().unwrap().file_logger = Some(config); + } + + fn get_console_logger_config(&self) -> ConsoleLoggerConfig { + self.config + .lock() + .unwrap() + .console_logger + .clone() + .unwrap_or_default() + } + + fn set_console_logger_config(&self, config: ConsoleLoggerConfig) { + self.config.lock().unwrap().console_logger = Some(config); + } + + fn get_peers(&self) -> Vec { + self.config.lock().unwrap().peer.clone().unwrap_or_default() + } + + fn set_peers(&self, peers: Vec) { + self.config.lock().unwrap().peer = Some(peers); + } + + fn get_listeners(&self) -> Vec { + self.config + .lock() + .unwrap() + .listeners + .clone() + .unwrap_or_default() + } + + fn set_listeners(&self, listeners: Vec) { + self.config.lock().unwrap().listeners = Some(listeners); + } + + fn get_rpc_portal(&self) -> Option { + self.config.lock().unwrap().rpc_portal + } + + fn set_rpc_portal(&self, addr: SocketAddr) { + self.config.lock().unwrap().rpc_portal = Some(addr); + } + + fn dump(&self) -> String { + toml::to_string_pretty(&*self.config.lock().unwrap()).unwrap() + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + + #[tokio::test] + async fn full_example_test() { + let config_str = r#" +instance_name = "default" +instance_id = "87ede5a2-9c3d-492d-9bbe-989b9d07e742" +ipv4 = "10.144.144.10" +listeners = [ "tcp://0.0.0.0:11010", "udp://0.0.0.0:11010" ] + +[network_identity] +network_name = "default" +network_secret = "" + +[[peer]] +uri = "tcp://public.kkrainbow.top:11010" + +[[peer]] +uri = "udp://192.168.94.33:11010" + +[[proxy_network]] +cidr = "10.147.223.0/24" +allow = ["tcp", "udp", "icmp"] + +[[proxy_network]] +cidr = "10.1.1.0/24" +allow = ["tcp", "icmp"] + +[file_logger] +level = "info" +file = "easytier" +dir = "/tmp/easytier" + +[console_logger] +level = "warn" +"#; + let ret = TomlConfigLoader::new_from_str(config_str); + if let Err(e) = &ret { + println!("{}", e); + } else { + println!("{:?}", ret.as_ref().unwrap()); + } + assert!(ret.is_ok()); + + let ret = ret.unwrap(); + assert_eq!("10.144.144.10", ret.get_ipv4().unwrap().to_string()); + + assert_eq!( + vec!["tcp://0.0.0.0:11010", "udp://0.0.0.0:11010"], + ret.get_listener_uris() + .iter() + .map(|u| u.to_string()) + .collect::>() + ); + + println!("{}", ret.dump()); + } +} diff --git a/easytier-core/src/common/config_fs.rs b/easytier-core/src/common/config_fs.rs deleted file mode 100644 index 7022143..0000000 --- a/easytier-core/src/common/config_fs.rs +++ /dev/null @@ -1,161 +0,0 @@ -// use filesystem as a config store - -use std::{ - ffi::OsStr, - io::Write, - path::{Path, PathBuf}, -}; - -static DEFAULT_BASE_DIR: &str = "/var/lib/easytier"; -static DIR_ROOT_CONFIG_FILE_NAME: &str = "__root__"; - -pub struct ConfigFs { - _db_name: String, - db_path: PathBuf, -} - -impl ConfigFs { - pub fn new(db_name: &str) -> Self { - Self::new_with_dir(db_name, DEFAULT_BASE_DIR) - } - - pub fn new_with_dir(db_name: &str, dir: &str) -> Self { - let p = Path::new(OsStr::new(dir)).join(OsStr::new(db_name)); - std::fs::create_dir_all(&p).unwrap(); - ConfigFs { - _db_name: db_name.to_string(), - db_path: p, - } - } - - pub fn get(&self, key: &str) -> Result { - let path = self.db_path.join(OsStr::new(key)); - // if path is dir, read the DIR_ROOT_CONFIG_FILE_NAME in it - if path.is_dir() { - let path = path.join(OsStr::new(DIR_ROOT_CONFIG_FILE_NAME)); - std::fs::read_to_string(path) - } else if path.is_file() { - return std::fs::read_to_string(path); - } else { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "key not found", - )); - } - } - - pub fn list_keys(&self, key: &str) -> Result, std::io::Error> { - let path = self.db_path.join(OsStr::new(key)); - let mut keys = Vec::new(); - for entry in std::fs::read_dir(path)? { - let entry = entry?; - let path = entry.path(); - let key = path.file_name().unwrap().to_str().unwrap().to_string(); - if key != DIR_ROOT_CONFIG_FILE_NAME { - keys.push(key); - } - } - Ok(keys) - } - - #[allow(dead_code)] - pub fn remove(&self, key: &str) -> Result<(), std::io::Error> { - let path = self.db_path.join(OsStr::new(key)); - // if path is dir, remove the DIR_ROOT_CONFIG_FILE_NAME in it - if path.is_dir() { - std::fs::remove_dir_all(path) - } else if path.is_file() { - return std::fs::remove_file(path); - } else { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "key not found", - )); - } - } - - pub fn add_dir(&self, key: &str) -> Result { - let path = self.db_path.join(OsStr::new(key)); - // if path is dir, write the DIR_ROOT_CONFIG_FILE_NAME in it - if path.is_file() { - Err(std::io::Error::new( - std::io::ErrorKind::AlreadyExists, - "key already exists", - )) - } else { - std::fs::create_dir_all(&path)?; - return std::fs::File::create(path.join(OsStr::new(DIR_ROOT_CONFIG_FILE_NAME))); - } - } - - pub fn add_file(&self, key: &str) -> Result { - let path = self.db_path.join(OsStr::new(key)); - let base_dir = path.parent().unwrap(); - if !path.is_file() { - std::fs::create_dir_all(base_dir)?; - } - std::fs::File::create(path) - } - - pub fn get_or_add( - &self, - key: &str, - val_fn: F, - add_dir: bool, - ) -> Result - where - F: FnOnce() -> String, - { - let get_ret = self.get(key); - match get_ret { - Ok(v) => Ok(v), - Err(e) => { - if e.kind() == std::io::ErrorKind::NotFound { - let val = val_fn(); - if add_dir { - let mut f = self.add_dir(key)?; - f.write_all(val.as_bytes())?; - } else { - let mut f = self.add_file(key)?; - f.write_all(val.as_bytes())?; - } - Ok(val) - } else { - Err(e) - } - } - } - } - - #[allow(dead_code)] - pub fn get_or_add_dir(&self, key: &str, val_fn: F) -> Result - where - F: FnOnce() -> String, - { - self.get_or_add(key, val_fn, true) - } - - pub fn get_or_add_file(&self, key: &str, val_fn: F) -> Result - where - F: FnOnce() -> String, - { - self.get_or_add(key, val_fn, false) - } - - pub fn get_or_default(&self, key: &str, default: F) -> Result - where - F: FnOnce() -> String, - { - let get_ret = self.get(key); - match get_ret { - Ok(v) => Ok(v), - Err(e) => { - if e.kind() == std::io::ErrorKind::NotFound { - Ok(default()) - } else { - Err(e) - } - } - } - } -} diff --git a/easytier-core/src/common/global_ctx.rs b/easytier-core/src/common/global_ctx.rs index 3b4557f..d2ee625 100644 --- a/easytier-core/src/common/global_ctx.rs +++ b/easytier-core/src/common/global_ctx.rs @@ -1,51 +1,42 @@ -use std::{io::Write, sync::Arc}; +use std::sync::Arc; use crate::rpc::PeerConnInfo; use crossbeam::atomic::AtomicCell; -use serde::{Deserialize, Serialize}; use super::{ - config_fs::ConfigFs, + config::ConfigLoader, netns::NetNS, network::IPCollector, stun::{StunInfoCollector, StunInfoCollectorTrait}, PeerId, }; +pub type NetworkIdentity = crate::common::config::NetworkIdentity; + #[derive(Debug, Clone, PartialEq)] pub enum GlobalCtxEvent { + TunDeviceReady(String), + PeerAdded(PeerId), PeerRemoved(PeerId), PeerConnAdded(PeerConnInfo), PeerConnRemoved(PeerConnInfo), + + ListenerAdded(url::Url), + ConnectionAccepted(String, String), // (local url, remote url) + ConnectionError(String, String, String), // (local url, remote url, error message) + + Connecting(url::Url), + ConnectError(String, String), // (dst, error message) } type EventBus = tokio::sync::broadcast::Sender; type EventBusSubscriber = tokio::sync::broadcast::Receiver; -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] -pub struct NetworkIdentity { - pub network_name: String, - pub network_secret: String, -} - -impl NetworkIdentity { - pub fn new(network_name: String, network_secret: String) -> Self { - NetworkIdentity { - network_name, - network_secret, - } - } - - pub fn default() -> Self { - Self::new("default".to_string(), "".to_string()) - } -} - pub struct GlobalCtx { pub inst_name: String, pub id: uuid::Uuid, - pub config_fs: ConfigFs, + pub config: Box, pub net_ns: NetNS, pub network: NetworkIdentity, @@ -76,24 +67,17 @@ impl std::fmt::Debug for GlobalCtx { pub type ArcGlobalCtx = std::sync::Arc; impl GlobalCtx { - pub fn new( - inst_name: &str, - config_fs: ConfigFs, - net_ns: NetNS, - network: Option, - ) -> Self { - let id = config_fs - .get_or_add_file("inst_id", || uuid::Uuid::new_v4().to_string()) - .unwrap(); - let id = uuid::Uuid::parse_str(&id).unwrap(); - let network = network.unwrap_or(NetworkIdentity::default()); + pub fn new(config_fs: impl ConfigLoader + 'static + Send + Sync) -> Self { + let id = config_fs.get_id(); + let network = config_fs.get_network_identity(); + let net_ns = NetNS::new(config_fs.get_netns()); let (event_bus, _) = tokio::sync::broadcast::channel(100); GlobalCtx { - inst_name: inst_name.to_string(), + inst_name: config_fs.get_inst_name(), id, - config_fs, + config: Box::new(config_fs), net_ns: net_ns.clone(), network, @@ -125,42 +109,24 @@ impl GlobalCtx { if let Some(ret) = self.cached_ipv4.load() { return Some(ret); } - - let Ok(addr) = self.config_fs.get("ipv4") else { - return None; - }; - - let Ok(addr) = addr.parse() else { - tracing::error!("invalid ipv4 addr: {}", addr); - return None; - }; - - self.cached_ipv4.store(Some(addr)); - return Some(addr); + let addr = self.config.get_ipv4(); + self.cached_ipv4.store(addr.clone()); + return addr; } pub fn set_ipv4(&mut self, addr: std::net::Ipv4Addr) { - self.config_fs - .add_file("ipv4") - .unwrap() - .write_all(addr.to_string().as_bytes()) - .unwrap(); - + self.config.set_ipv4(addr); self.cached_ipv4.store(None); } pub fn add_proxy_cidr(&self, cidr: cidr::IpCidr) -> Result<(), std::io::Error> { - let escaped_cidr = cidr.to_string().replace("/", "_"); - self.config_fs - .add_file(&format!("proxy_cidrs/{}", escaped_cidr))?; + self.config.add_proxy_cidr(cidr); self.cached_proxy_cidrs.store(None); Ok(()) } pub fn remove_proxy_cidr(&self, cidr: cidr::IpCidr) -> Result<(), std::io::Error> { - let escaped_cidr = cidr.to_string().replace("/", "_"); - self.config_fs - .remove(&format!("proxy_cidrs/{}", escaped_cidr))?; + self.config.remove_proxy_cidr(cidr); self.cached_proxy_cidrs.store(None); Ok(()) } @@ -171,24 +137,19 @@ impl GlobalCtx { return proxy_cidrs; } - let Ok(keys) = self.config_fs.list_keys("proxy_cidrs") else { - return vec![]; - }; - - let mut ret = Vec::new(); - for key in keys.iter() { - let key = key.replace("_", "/"); - let Ok(cidr) = key.parse() else { - tracing::error!("invalid proxy cidr: {}", key); - continue; - }; - ret.push(cidr); - } - + let ret = self.config.get_proxy_cidrs(); self.cached_proxy_cidrs.store(Some(ret.clone())); ret } + pub fn get_id(&self) -> uuid::Uuid { + self.config.get_id() + } + + pub fn get_network_identity(&self) -> NetworkIdentity { + self.config.get_network_identity() + } + pub fn get_ip_collector(&self) -> Arc { self.ip_collector.clone() } @@ -219,27 +180,18 @@ impl GlobalCtx { std::ptr::write(ptr, collector); } } - - pub fn get_id(&self) -> uuid::Uuid { - self.id - } - - pub fn get_network_identity(&self) -> NetworkIdentity { - self.network.clone() - } } #[cfg(test)] pub mod tests { - use crate::common::new_peer_id; + use crate::common::{config::TomlConfigLoader, new_peer_id}; use super::*; #[tokio::test] async fn test_global_ctx() { - let config_fs = ConfigFs::new("/tmp/easytier"); - let net_ns = NetNS::new(None); - let global_ctx = GlobalCtx::new("test", config_fs, net_ns, None); + let config = TomlConfigLoader::default(); + let global_ctx = GlobalCtx::new(config); let mut subscriber = global_ctx.subscribe(); let peer_id = new_peer_id(); @@ -269,15 +221,10 @@ pub mod tests { pub fn get_mock_global_ctx_with_network( network_identy: Option, ) -> ArcGlobalCtx { - let node_id = uuid::Uuid::new_v4(); - let config_fs = ConfigFs::new_with_dir(node_id.to_string().as_str(), "/tmp/easytier"); - let net_ns = NetNS::new(None); - std::sync::Arc::new(GlobalCtx::new( - format!("test_{}", node_id).as_str(), - config_fs, - net_ns, - network_identy, - )) + let config_fs = TomlConfigLoader::default(); + config_fs.set_inst_name(format!("test_{}", config_fs.get_id())); + config_fs.set_network_identity(network_identy.unwrap_or(NetworkIdentity::default())); + std::sync::Arc::new(GlobalCtx::new(config_fs)) } pub fn get_mock_global_ctx() -> ArcGlobalCtx { diff --git a/easytier-core/src/common/mod.rs b/easytier-core/src/common/mod.rs index ca02025..b6b2559 100644 --- a/easytier-core/src/common/mod.rs +++ b/easytier-core/src/common/mod.rs @@ -1,4 +1,4 @@ -pub mod config_fs; +pub mod config; pub mod constants; pub mod error; pub mod global_ctx; diff --git a/easytier-core/src/connector/direct.rs b/easytier-core/src/connector/direct.rs index af90acb..b598a82 100644 --- a/easytier-core/src/connector/direct.rs +++ b/easytier-core/src/connector/direct.rs @@ -296,11 +296,7 @@ mod tests { dm_a.run_as_client(); dm_c.run_as_server(); - let mut lis_c = ListenerManager::new( - p_c.my_node_id(), - p_c.get_global_ctx().net_ns.clone(), - p_c.clone(), - ); + let mut lis_c = ListenerManager::new(p_c.get_global_ctx(), p_c.clone()); lis_c .add_listener(TcpTunnelListener::new( diff --git a/easytier-core/src/connector/manual.rs b/easytier-core/src/connector/manual.rs index 7107863..49538e0 100644 --- a/easytier-core/src/connector/manual.rs +++ b/easytier-core/src/connector/manual.rs @@ -214,8 +214,7 @@ impl ManualConnectorManager { log::warn!("peer conn removed: {:?}", conn_info); } - GlobalCtxEvent::PeerAdded(..) => {} - GlobalCtxEvent::PeerRemoved(..) => {} + _ => {} } } @@ -271,6 +270,11 @@ impl ManualConnectorManager { let conn = locked.as_mut().unwrap(); // TODO: should support set v6 here, use url in connector array set_bind_addr_for_peer_connector(conn, true, &ip_collector).await; + + data_clone + .global_ctx + .issue_event(GlobalCtxEvent::Connecting(conn.remote_url().clone())); + let _g = net_ns.guard(); log::info!("reconnect try connect... conn: {:?}", conn); let tunnel = conn.connect().await?; @@ -293,6 +297,13 @@ impl ManualConnectorManager { let ret = timeout(std::time::Duration::from_secs(1), reconn_task).await; log::info!("reconnect: {} done, ret: {:?}", dead_url, ret); + if ret.is_err() || ret.as_ref().unwrap().is_err() { + data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( + dead_url.clone(), + format!("{:?}", ret), + )); + } + let conn = connector.lock().await.take().unwrap(); data.reconnecting.remove(&dead_url).unwrap(); data.connectors.insert(dead_url.clone(), conn); diff --git a/easytier-core/src/instance/instance.rs b/easytier-core/src/instance/instance.rs index 866f033..d212ebf 100644 --- a/easytier-core/src/instance/instance.rs +++ b/easytier-core/src/instance/instance.rs @@ -1,7 +1,8 @@ use std::borrow::BorrowMut; -use std::io::Write; +use std::net::Ipv4Addr; use std::sync::Arc; +use anyhow::Context; use futures::StreamExt; use pnet::packet::ethernet::EthernetPacket; use pnet::packet::ipv4::Ipv4Packet; @@ -10,10 +11,9 @@ use tokio::{sync::Mutex, task::JoinSet}; use tokio_util::bytes::{Bytes, BytesMut}; use tonic::transport::Server; -use crate::common::config_fs::ConfigFs; +use crate::common::config::ConfigLoader; use crate::common::error::Error; -use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx}; -use crate::common::netns::NetNS; +use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent}; use crate::common::PeerId; use crate::connector::direct::DirectConnectorManager; use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager}; @@ -32,43 +32,6 @@ use tokio_stream::wrappers::ReceiverStream; use super::listeners::ListenerManager; use super::virtual_nic; -pub struct InstanceConfigWriter { - config: ConfigFs, -} - -impl InstanceConfigWriter { - pub fn new(inst_name: &str) -> Self { - InstanceConfigWriter { - config: ConfigFs::new(inst_name), - } - } - - pub fn set_ns(self, net_ns: Option) -> Self { - let net_ns_in_conf = if let Some(net_ns) = net_ns { - net_ns - } else { - "".to_string() - }; - - self.config - .add_file("net_ns") - .unwrap() - .write_all(net_ns_in_conf.as_bytes()) - .unwrap(); - - self - } - - pub fn set_addr(self, addr: String) -> Self { - self.config - .add_file("ipv4") - .unwrap() - .write_all(addr.as_bytes()) - .unwrap(); - self - } -} - pub struct Instance { inst_name: String, @@ -95,30 +58,16 @@ pub struct Instance { } impl Instance { - pub fn new(inst_name: &str) -> Self { - let config = ConfigFs::new(inst_name); - let net_ns_in_conf = config.get_or_default("net_ns", || "".to_string()).unwrap(); - let net_ns = NetNS::new(if net_ns_in_conf.is_empty() { - None - } else { - Some(net_ns_in_conf.clone()) - }); - - let addr = config - .get_or_default("ipv4", || "10.144.144.10".to_string()) - .unwrap(); + pub fn new(config: impl ConfigLoader + Send + Sync + 'static) -> Self { + let global_ctx = Arc::new(GlobalCtx::new(config)); log::info!( - "[INIT] instance creating. inst_name: {}, addr: {}, netns: {}", - inst_name, - addr, - net_ns_in_conf + "[INIT] instance creating. config: {}", + global_ctx.config.dump() ); let (peer_packet_sender, peer_packet_receiver) = tokio::sync::mpsc::channel(100); - let global_ctx = Arc::new(GlobalCtx::new(inst_name, config, net_ns.clone(), None)); - let id = global_ctx.get_id(); let peer_manager = Arc::new(PeerManager::new( @@ -128,8 +77,7 @@ impl Instance { )); let listener_manager = Arc::new(Mutex::new(ListenerManager::new( - peer_manager.my_node_id(), - net_ns.clone(), + global_ctx.clone(), peer_manager.clone(), ))); @@ -145,13 +93,17 @@ impl Instance { let udp_hole_puncher = UdpHolePunchConnector::new(global_ctx.clone(), peer_manager.clone()); let arc_tcp_proxy = TcpProxy::new(global_ctx.clone(), peer_manager.clone()); - let arc_icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone()).unwrap(); - let arc_udp_proxy = UdpProxy::new(global_ctx.clone(), peer_manager.clone()).unwrap(); + let arc_icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone()) + .with_context(|| "create icmp proxy failed") + .unwrap(); + let arc_udp_proxy = UdpProxy::new(global_ctx.clone(), peer_manager.clone()) + .with_context(|| "create udp proxy failed") + .unwrap(); let peer_center = Arc::new(PeerCenterInstance::new(peer_manager.clone())); Instance { - inst_name: inst_name.to_string(), + inst_name: global_ctx.inst_name.clone(), id, virtual_nic: None, @@ -251,22 +203,22 @@ impl Instance { }); } - pub async fn run(&mut self) -> Result<(), Error> { - let ipv4_addr = self.global_ctx.get_ipv4().unwrap(); + async fn add_initial_peers(&mut self) -> Result<(), Error> { + for peer in self.global_ctx.config.get_peers().iter() { + self.get_conn_manager() + .add_connector_by_url(peer.uri.as_str()) + .await?; + } + Ok(()) + } - let mut nic = virtual_nic::VirtualNic::new(self.get_global_ctx()) + async fn prepare_tun_device(&mut self) -> Result<(), Error> { + let nic = virtual_nic::VirtualNic::new(self.get_global_ctx()) .create_dev() - .await? - .link_up() - .await? - .remove_ip(None) - .await? - .add_ip(ipv4_addr, 24) .await?; - if cfg!(target_os = "macos") { - nic = nic.add_route(ipv4_addr, 24).await?; - } + self.global_ctx + .issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string())); self.virtual_nic = Some(Arc::new(nic)); @@ -277,6 +229,26 @@ impl Instance { self.peer_packet_receiver.take(), ); + Ok(()) + } + + async fn assign_ipv4_to_tun_device(&mut self, ipv4_addr: Ipv4Addr) -> Result<(), Error> { + let nic = self.virtual_nic.as_ref().unwrap().clone(); + nic.link_up().await?; + nic.remove_ip(None).await?; + nic.add_ip(ipv4_addr, 24).await?; + if cfg!(target_os = "macos") { + nic.add_route(ipv4_addr, 24).await?; + } + Ok(()) + } + + pub async fn run(&mut self) -> Result<(), Error> { + self.prepare_tun_device().await?; + if let Some(ipv4_addr) = self.global_ctx.get_ipv4() { + self.assign_ipv4_to_tun_device(ipv4_addr).await?; + } + self.listener_manager .lock() .await @@ -296,6 +268,8 @@ impl Instance { self.peer_center.init().await; + self.add_initial_peers().await?; + Ok(()) } @@ -331,7 +305,10 @@ impl Instance { } fn run_rpc_server(&mut self) -> Result<(), Box> { - let addr = "0.0.0.0:15888".parse()?; + let Some(addr) = self.global_ctx.config.get_rpc_portal() else { + tracing::info!("rpc server not enabled, because rpc_portal is not set."); + return Ok(()); + }; let peer_mgr = self.peer_manager.clone(); let conn_manager = self.conn_manager.clone(); let net_ns = self.global_ctx.net_ns.clone(); @@ -339,7 +316,6 @@ impl Instance { self.tasks.spawn(async move { let _g = net_ns.guard(); - log::info!("[INIT RPC] start rpc server. addr: {}", addr); Server::builder() .add_service( crate::rpc::peer_manage_rpc_server::PeerManageRpcServer::new( @@ -358,6 +334,7 @@ impl Instance { ) .serve(addr) .await + .with_context(|| format!("rpc server failed. addr: {}", addr)) .unwrap(); }); Ok(()) diff --git a/easytier-core/src/instance/listeners.rs b/easytier-core/src/instance/listeners.rs index 47309c3..d938a5a 100644 --- a/easytier-core/src/instance/listeners.rs +++ b/easytier-core/src/instance/listeners.rs @@ -1,10 +1,15 @@ use std::{fmt::Debug, sync::Arc}; +use anyhow::Context; use async_trait::async_trait; use tokio::{sync::Mutex, task::JoinSet}; use crate::{ - common::{error::Error, netns::NetNS}, + common::{ + error::Error, + global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, + netns::NetNS, + }, peers::peer_manager::PeerManager, tunnels::{ ring_tunnel::RingTunnelListener, tcp_tunnel::TcpTunnelListener, @@ -26,7 +31,7 @@ impl TunnelHandlerForListener for PeerManager { } pub struct ListenerManager { - my_node_id: uuid::Uuid, + global_ctx: ArcGlobalCtx, net_ns: NetNS, listeners: Vec>>, peer_manager: Arc, @@ -35,10 +40,10 @@ pub struct ListenerManager { } impl ListenerManager { - pub fn new(my_node_id: uuid::Uuid, net_ns: NetNS, peer_manager: Arc) -> Self { + pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc) -> Self { Self { - my_node_id, - net_ns, + global_ctx: global_ctx.clone(), + net_ns: global_ctx.net_ns.clone(), listeners: Vec::new(), peer_manager, tasks: JoinSet::new(), @@ -46,18 +51,27 @@ impl ListenerManage } pub async fn prepare_listeners(&mut self) -> Result<(), Error> { - self.add_listener(UdpTunnelListener::new( - "udp://0.0.0.0:11010".parse().unwrap(), - )) - .await?; - self.add_listener(TcpTunnelListener::new( - "tcp://0.0.0.0:11010".parse().unwrap(), - )) - .await?; self.add_listener(RingTunnelListener::new( - format!("ring://{}", self.my_node_id).parse().unwrap(), + format!("ring://{}", self.global_ctx.get_id()) + .parse() + .unwrap(), )) .await?; + + for l in self.global_ctx.config.get_listener_uris().iter() { + match l.scheme() { + "tcp" => { + self.add_listener(TcpTunnelListener::new(l.clone())).await?; + } + "udp" => { + self.add_listener(UdpTunnelListener::new(l.clone())).await?; + } + _ => { + log::warn!("unsupported listener uri: {}", l); + } + } + } + Ok(()) } @@ -71,12 +85,27 @@ impl ListenerManage } #[tracing::instrument] - async fn run_listener(listener: Arc>, peer_manager: Arc) { + async fn run_listener( + listener: Arc>, + peer_manager: Arc, + global_ctx: ArcGlobalCtx, + ) { let mut l = listener.lock().await; + global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url())); while let Ok(ret) = l.accept().await { + let tunnel_info = ret.info().unwrap(); + global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted( + tunnel_info.local_addr.clone(), + tunnel_info.remote_addr.clone(), + )); tracing::info!(ret = ?ret, "conn accepted"); let server_ret = peer_manager.handle_tunnel(ret).await; if let Err(e) = &server_ret { + global_ctx.issue_event(GlobalCtxEvent::ConnectionError( + tunnel_info.local_addr, + tunnel_info.remote_addr, + e.to_string(), + )); tracing::error!(error = ?e, "handle conn error"); } } @@ -85,11 +114,18 @@ impl ListenerManage pub async fn run(&mut self) -> Result<(), Error> { for listener in &self.listeners { let _guard = self.net_ns.guard(); + let addr = listener.lock().await.local_url(); log::warn!("run listener: {:?}", listener); - listener.lock().await.listen().await?; + listener + .lock() + .await + .listen() + .await + .with_context(|| format!("failed to add listener {}", addr))?; self.tasks.spawn(Self::run_listener( listener.clone(), self.peer_manager.clone(), + self.global_ctx.clone(), )); } @@ -102,7 +138,10 @@ mod tests { use futures::{SinkExt, StreamExt}; use tokio::time::timeout; - use crate::tunnels::{ring_tunnel::RingTunnelConnector, TunnelConnector}; + use crate::{ + common::global_ctx::tests::get_mock_global_ctx, + tunnels::{ring_tunnel::RingTunnelConnector, TunnelConnector}, + }; use super::*; @@ -120,10 +159,8 @@ mod tests { #[tokio::test] async fn handle_error_in_accept() { - let net_ns = NetNS::new(None); let handler = Arc::new(MockListenerHandler {}); - let mut listener_mgr = - ListenerManager::new(uuid::Uuid::new_v4(), net_ns.clone(), handler.clone()); + let mut listener_mgr = ListenerManager::new(get_mock_global_ctx(), handler.clone()); let ring_id = format!("ring://{}", uuid::Uuid::new_v4()); diff --git a/easytier-core/src/instance/virtual_nic.rs b/easytier-core/src/instance/virtual_nic.rs index 3f92874..fcc414c 100644 --- a/easytier-core/src/instance/virtual_nic.rs +++ b/easytier-core/src/instance/virtual_nic.rs @@ -119,32 +119,32 @@ impl VirtualNic { self.ifname.as_ref().unwrap().as_str() } - pub async fn link_up(self) -> Result { + pub async fn link_up(&self) -> Result<()> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg.set_link_status(self.ifname(), true).await?; - Ok(self) + Ok(()) } - pub async fn add_route(self, address: Ipv4Addr, cidr: u8) -> Result { + pub async fn add_route(&self, address: Ipv4Addr, cidr: u8) -> Result<()> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg .add_ipv4_route(self.ifname(), address, cidr) .await?; - Ok(self) + Ok(()) } - pub async fn remove_ip(self, ip: Option) -> Result { + pub async fn remove_ip(&self, ip: Option) -> Result<()> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg.remove_ip(self.ifname(), ip).await?; - Ok(self) + Ok(()) } - pub async fn add_ip(self, ip: Ipv4Addr, cidr: i32) -> Result { + pub async fn add_ip(&self, ip: Ipv4Addr, cidr: i32) -> Result<()> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg .add_ipv4_ip(self.ifname(), ip, cidr as u8) .await?; - Ok(self) + Ok(()) } pub fn pin_recv_stream(&self) -> Pin> { @@ -170,12 +170,10 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - dev.link_up() - .await? - .remove_ip(None) - .await? - .add_ip("10.144.111.1".parse().unwrap(), 24) - .await + dev.link_up().await?; + dev.remove_ip(None).await?; + dev.add_ip("10.144.111.1".parse().unwrap(), 24).await?; + Ok(dev) } #[tokio::test] diff --git a/easytier-core/src/main.rs b/easytier-core/src/main.rs index 0457e23..215b501 100644 --- a/easytier-core/src/main.rs +++ b/easytier-core/src/main.rs @@ -3,6 +3,9 @@ #[cfg(test)] mod tests; +use std::net::SocketAddr; + +use anyhow::Context; use clap::Parser; mod arch; @@ -15,51 +18,210 @@ mod peers; mod rpc; mod tunnels; -use common::get_logger_timer_rfc3339; -use instance::instance::{Instance, InstanceConfigWriter}; +use common::{ + config::{ConsoleLoggerConfig, FileLoggerConfig, PeerConfig}, + get_logger_timer_rfc3339, +}; +use instance::instance::Instance; use tracing::level_filters::LevelFilter; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; +use crate::common::{ + config::{ConfigLoader, TomlConfigLoader}, + global_ctx::GlobalCtxEvent, +}; + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Cli { /// the instance name - #[arg(short = 'n', long, default_value = "default")] + #[arg( + short = 'm', + long, + default_value = "default", + help = "instance name to identify this vpn node in same machine" + )] instance_name: String, - /// specify the network namespace, default is the root namespace + #[arg( + short = 'd', + long, + help = "instance uuid to identify this vpn node in whole vpn network example: 123e4567-e89b-12d3-a456-426614174000" + )] + instance_id: Option, + + #[arg(short, long, help = "ipv4 address of this vpn node")] + ipv4: Option, + + #[arg(short, long, help = "peers to connect initially")] + peers: Vec, + + #[arg(short, long, help = "use a public shared node to discover peers")] + external_node: Option, + + #[arg( + short = 'n', + long, + help = "export local networks to other peers in the vpn" + )] + proxy_networks: Vec, + + #[arg( + short, + long, + default_value = "127.0.0.1:15888", + help = "rpc portal address to listen for management" + )] + rpc_portal: SocketAddr, + + #[arg(short, long, help = "listeners to accept connections, pass '' to avoid listening.", + default_values_t = ["tcp://0.0.0.0:11010".to_string(), + "udp://0.0.0.0:11010".to_string()])] + listeners: Vec, + + /// specify the linux network namespace, default is the root namespace #[arg(long)] net_ns: Option, - #[arg(short, long)] - ipv4: Option, + #[arg(long, help = "console log level", + value_parser = clap::builder::PossibleValuesParser::new(["trace", "debug", "info", "warn", "error", "off"]))] + console_log_level: Option, - #[arg(short, long)] - peers: Vec, + #[arg(long, help = "file log level", + value_parser = clap::builder::PossibleValuesParser::new(["trace", "debug", "info", "warn", "error", "off"]))] + file_log_level: Option, + #[arg(long, help = "directory to store log files")] + file_log_dir: Option, } -fn init_logger(dir: Option<&str>, file: Option<&str>) { +impl From for TomlConfigLoader { + fn from(cli: Cli) -> Self { + let cfg = TomlConfigLoader::default(); + cfg.set_inst_name(cli.instance_name.clone()); + + cfg.set_netns(cli.net_ns.clone()); + if let Some(ipv4) = &cli.ipv4 { + cfg.set_ipv4( + ipv4.parse() + .with_context(|| format!("failed to parse ipv4 address: {}", ipv4)) + .unwrap(), + ) + } + + cfg.set_peers( + cli.peers + .iter() + .map(|s| PeerConfig { + uri: s + .parse() + .with_context(|| format!("failed to parse peer uri: {}", s)) + .unwrap(), + }) + .collect(), + ); + + cfg.set_listeners( + cli.listeners + .iter() + .filter_map(|s| { + if s.is_empty() { + return None; + } + + Some( + s.parse() + .with_context(|| format!("failed to parse listener uri: {}", s)) + .unwrap(), + ) + }) + .collect(), + ); + + for n in cli.proxy_networks.iter() { + cfg.add_proxy_cidr( + n.parse() + .with_context(|| format!("failed to parse proxy network: {}", n)) + .unwrap(), + ); + } + + cfg.set_rpc_portal(cli.rpc_portal); + + if cli.external_node.is_some() { + let mut old_peers = cfg.get_peers(); + old_peers.push(PeerConfig { + uri: cli + .external_node + .clone() + .unwrap() + .parse() + .with_context(|| { + format!( + "failed to parse external node uri: {}", + cli.external_node.unwrap() + ) + }) + .unwrap(), + }); + cfg.set_peers(old_peers); + } + + if cli.console_log_level.is_some() { + cfg.set_console_logger_config(ConsoleLoggerConfig { + level: cli.console_log_level.clone(), + }); + } + + if cli.file_log_dir.is_some() || cli.file_log_level.is_some() { + cfg.set_file_logger_config(FileLoggerConfig { + level: cli.file_log_level.clone(), + dir: cli.file_log_dir.clone(), + file: Some(format!("easytier-{}", cli.instance_name)), + }); + } + + cfg + } +} + +fn init_logger(config: impl ConfigLoader) { + let file_config = config.get_file_logger_config(); + let file_level = file_config + .level + .map(|s| s.parse().unwrap()) + .unwrap_or(LevelFilter::OFF); + // logger to rolling file - let file_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env() - .unwrap(); - let file_appender = tracing_appender::rolling::Builder::new() - .rotation(tracing_appender::rolling::Rotation::DAILY) - .max_log_files(5) - .filename_prefix(file.unwrap_or("easytier")) - .build(dir.unwrap_or("/tmp/easytier")) - .expect("failed to initialize rolling file appender"); - let mut file_layer = tracing_subscriber::fmt::layer(); - file_layer.set_ansi(false); - let file_layer = file_layer - .with_writer(file_appender) - .with_timer(get_logger_timer_rfc3339()) - .with_filter(file_filter); + let mut file_layer = None; + if file_level != LevelFilter::OFF { + let mut l = tracing_subscriber::fmt::layer(); + l.set_ansi(false); + let file_filter = EnvFilter::builder() + .with_default_directive(file_level.into()) + .from_env() + .unwrap(); + let file_appender = tracing_appender::rolling::Builder::new() + .rotation(tracing_appender::rolling::Rotation::DAILY) + .max_log_files(5) + .filename_prefix(file_config.file.unwrap_or("easytier".to_string())) + .build(file_config.dir.unwrap_or("./".to_string())) + .expect("failed to initialize rolling file appender"); + file_layer = Some( + l.with_writer(file_appender) + .with_timer(get_logger_timer_rfc3339()) + .with_filter(file_filter), + ); + } // logger to console + let console_config = config.get_console_logger_config(); + let console_level = console_config + .level + .map(|s| s.parse().unwrap()) + .unwrap_or(LevelFilter::OFF); + let console_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::WARN.into()) + .with_default_directive(console_level.into()) .from_env() .unwrap(); let console_layer = tracing_subscriber::fmt::layer() @@ -74,36 +236,100 @@ fn init_logger(dir: Option<&str>, file: Option<&str>) { .init(); } +fn print_event(msg: String) { + println!( + "{}: {}", + chrono::Local::now().format("%Y-%m-%d %H:%M:%S"), + msg + ); +} + +fn peer_conn_info_to_string(p: crate::rpc::PeerConnInfo) -> String { + format!( + "my_peer_id: {}, dst_peer_id: {}, tunnel_info: {:?}", + p.my_peer_id, p.peer_id, p.tunnel + ) +} + #[tokio::main(flavor = "current_thread")] #[tracing::instrument] pub async fn main() { - init_logger(Some("/var/log/easytier"), Some("core.log")); - let cli = Cli::parse(); tracing::info!(cli = ?cli, "cli args parsed"); - let cfg = InstanceConfigWriter::new(cli.instance_name.as_str()).set_ns(cli.net_ns.clone()); - if let Some(ipv4) = &cli.ipv4 { - cfg.set_addr(ipv4.clone()); - } + let cfg: TomlConfigLoader = cli.into(); - let mut inst = Instance::new(cli.instance_name.as_str()); + init_logger(&cfg); + let mut inst = Instance::new(cfg.clone()); let mut events = inst.get_global_ctx().subscribe(); tokio::spawn(async move { while let Ok(e) = events.recv().await { - log::warn!("event: {:?}", e); + match e { + GlobalCtxEvent::PeerAdded(p) => { + print_event(format!("new peer added. peer_id: {}", p)); + } + + GlobalCtxEvent::PeerRemoved(p) => { + print_event(format!("peer removed. peer_id: {}", p)); + } + + GlobalCtxEvent::PeerConnAdded(p) => { + print_event(format!( + "new peer connection added. conn_info: {}", + peer_conn_info_to_string(p) + )); + } + + GlobalCtxEvent::PeerConnRemoved(p) => { + print_event(format!( + "peer connection removed. conn_info: {}", + peer_conn_info_to_string(p) + )); + } + + GlobalCtxEvent::ListenerAdded(p) => { + if p.scheme() == "ring" { + continue; + } + print_event(format!("new listener added. listener: {}", p)); + } + + GlobalCtxEvent::ConnectionAccepted(local, remote) => { + print_event(format!( + "new connection accepted. local: {}, remote: {}", + local, remote + )); + } + + GlobalCtxEvent::ConnectionError(local, remote, err) => { + print_event(format!( + "connection error. local: {}, remote: {}, err: {}", + local, remote, err + )); + } + + GlobalCtxEvent::TunDeviceReady(dev) => { + print_event(format!("tun device ready. dev: {}", dev)); + } + + GlobalCtxEvent::Connecting(dst) => { + print_event(format!("connecting to peer. dst: {}", dst)); + } + + GlobalCtxEvent::ConnectError(dst, err) => { + print_event(format!("connect to peer error. dst: {}, err: {}", dst, err)); + } + } } }); - inst.run().await.unwrap(); + println!("Starting easytier with config:"); + println!("############### TOML ##############\n"); + println!("{}", cfg.dump()); + println!("-----------------------------------"); - for peer in cli.peers { - inst.get_conn_manager() - .add_connector_by_url(peer.as_str()) - .await - .unwrap(); - } + inst.run().await.unwrap(); inst.wait().await; } diff --git a/easytier-core/src/peers/peer.rs b/easytier-core/src/peers/peer.rs index 8bf0557..fea4a39 100644 --- a/easytier-core/src/peers/peer.rs +++ b/easytier-core/src/peers/peer.rs @@ -152,12 +152,11 @@ impl Drop for Peer { #[cfg(test)] mod tests { - use std::sync::Arc; use tokio::{sync::mpsc, time::timeout}; use crate::{ - common::{config_fs::ConfigFs, global_ctx::GlobalCtx, netns::NetNS, new_peer_id}, + common::{global_ctx::tests::get_mock_global_ctx, new_peer_id}, peers::peer_conn::PeerConn, tunnels::ring_tunnel::create_ring_tunnel_pair, }; @@ -168,12 +167,7 @@ mod tests { async fn close_peer() { let (local_packet_send, _local_packet_recv) = mpsc::channel(10); let (remote_packet_send, _remote_packet_recv) = mpsc::channel(10); - let global_ctx = Arc::new(GlobalCtx::new( - "test", - ConfigFs::new("/tmp/easytier-test"), - NetNS::new(None), - None, - )); + let global_ctx = get_mock_global_ctx(); let local_peer = Peer::new(new_peer_id(), local_packet_send, global_ctx.clone()); let remote_peer = Peer::new(new_peer_id(), remote_packet_send, global_ctx.clone()); diff --git a/easytier-core/src/peers/peer_conn.rs b/easytier-core/src/peers/peer_conn.rs index 416cff1..78bcd24 100644 --- a/easytier-core/src/peers/peer_conn.rs +++ b/easytier-core/src/peers/peer_conn.rs @@ -540,10 +540,7 @@ mod tests { use std::sync::Arc; use super::*; - use crate::common::config_fs::ConfigFs; use crate::common::global_ctx::tests::get_mock_global_ctx; - use crate::common::global_ctx::GlobalCtx; - use crate::common::netns::NetNS; use crate::common::new_peer_id; use crate::tunnels::tunnel_filter::tests::DropSendTunnelFilter; use crate::tunnels::tunnel_filter::{PacketRecorderTunnelFilter, TunnelWithFilter}; @@ -562,27 +559,9 @@ mod tests { let c_peer_id = new_peer_id(); let s_peer_id = new_peer_id(); - let mut c_peer = PeerConn::new( - c_peer_id, - Arc::new(GlobalCtx::new( - "c", - ConfigFs::new_with_dir("c", "/tmp"), - NetNS::new(None), - None, - )), - Box::new(c), - ); + let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c)); - let mut s_peer = PeerConn::new( - s_peer_id, - Arc::new(GlobalCtx::new( - "c", - ConfigFs::new_with_dir("c", "/tmp"), - NetNS::new(None), - None, - )), - Box::new(s), - ); + let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s)); let (c_ret, s_ret) = tokio::join!( c_peer.do_handshake_as_client(), diff --git a/easytier-core/src/peers/peer_ospf_route.rs b/easytier-core/src/peers/peer_ospf_route.rs index 97a8b58..7c245db 100644 --- a/easytier-core/src/peers/peer_ospf_route.rs +++ b/easytier-core/src/peers/peer_ospf_route.rs @@ -1004,6 +1004,7 @@ impl RouteSessionManager { tracing::info!(?service_impl.my_peer_id, ?peers, ?session_peers, ?initiator_candidates, "maintain_sessions begin"); if initiator_candidates.is_empty() { + next_sleep_ms = 1000; continue; } diff --git a/easytier-core/src/tests/three_node.rs b/easytier-core/src/tests/three_node.rs index d5fca58..bcfa36b 100644 --- a/easytier-core/src/tests/three_node.rs +++ b/easytier-core/src/tests/three_node.rs @@ -5,8 +5,11 @@ use tokio::{net::UdpSocket, task::JoinSet}; use super::*; use crate::{ - common::netns::{NetNS, ROOT_NETNS_NAME}, - instance::instance::{Instance, InstanceConfigWriter}, + common::{ + config::{ConfigLoader, TomlConfigLoader}, + netns::{NetNS, ROOT_NETNS_NAME}, + }, + instance::instance::Instance, tunnels::{ common::tests::_tunnel_pingpong_netns, ring_tunnel::RingTunnelConnector, @@ -35,29 +38,25 @@ pub fn prepare_linux_namespaces() { add_ns_to_bridge("br_b", "net_d"); } -pub async fn prepare_inst_configs() { - InstanceConfigWriter::new("inst1") - .set_ns(Some("net_a".into())) - .set_addr("10.144.144.1".to_owned()); - - InstanceConfigWriter::new("inst2") - .set_ns(Some("net_b".into())) - .set_addr("10.144.144.2".to_owned()); - - InstanceConfigWriter::new("inst3") - .set_ns(Some("net_c".into())) - .set_addr("10.144.144.3".to_owned()); +pub fn get_inst_config(inst_name: &str, ns: Option<&str>, ipv4: &str) -> TomlConfigLoader { + let config = TomlConfigLoader::default(); + config.set_inst_name(inst_name.to_owned()); + config.set_netns(ns.map(|s| s.to_owned())); + config.set_ipv4(ipv4.parse().unwrap()); + config.set_listeners(vec![ + "tcp://0.0.0.0:11010".parse().unwrap(), + "udp://0.0.0.0:11010".parse().unwrap(), + ]); + config } pub async fn init_three_node(proto: &str) -> Vec { log::set_max_level(log::LevelFilter::Info); prepare_linux_namespaces(); - prepare_inst_configs().await; - - let mut inst1 = Instance::new("inst1"); - let mut inst2 = Instance::new("inst2"); - let mut inst3 = Instance::new("inst3"); + let mut inst1 = Instance::new(get_inst_config("inst1", Some("net_a"), "10.144.144.1")); + let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_b"), "10.144.144.2")); + let mut inst3 = Instance::new(get_inst_config("inst3", Some("net_c"), "10.144.144.3")); inst1.run().await.unwrap(); inst2.run().await.unwrap(); @@ -205,11 +204,7 @@ pub async fn icmp_proxy_three_node_test() { #[tokio::test] #[serial_test::serial] pub async fn proxy_three_node_disconnect_test() { - InstanceConfigWriter::new("inst4") - .set_ns(Some("net_d".into())) - .set_addr("10.144.144.4".to_owned()); - - let mut inst4 = Instance::new("inst4"); + let mut inst4 = Instance::new(get_inst_config("inst4", Some("net_d"), "10.144.144.4")); inst4 .get_conn_manager() .add_connector(TcpTunnelConnector::new( diff --git a/easytier-core/src/tunnels/common.rs b/easytier-core/src/tunnels/common.rs index 682da1f..ab2b931 100644 --- a/easytier-core/src/tunnels/common.rs +++ b/easytier-core/src/tunnels/common.rs @@ -289,8 +289,8 @@ pub(crate) fn setup_sokcet2( socket2_socket.set_reuse_address(true)?; socket2_socket.bind(&socket2::SockAddr::from(*bind_addr))?; - #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] - socket2_socket.set_reuse_port(true)?; + // #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] + // socket2_socket.set_reuse_port(true)?; if bind_addr.ip().is_unspecified() { return Ok(()); diff --git a/easytier-core/src/tunnels/tcp_tunnel.rs b/easytier-core/src/tunnels/tcp_tunnel.rs index 19aa703..a6931a3 100644 --- a/easytier-core/src/tunnels/tcp_tunnel.rs +++ b/easytier-core/src/tunnels/tcp_tunnel.rs @@ -38,8 +38,8 @@ impl TunnelListener for TcpTunnelListener { }; socket.set_reuseaddr(true)?; - #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] - socket.set_reuseport(true)?; + // #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] + // socket.set_reuseport(true)?; socket.bind(addr)?; self.listener = Some(socket.listen(1024)?);