diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 3865622..caeb123 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -37,28 +37,28 @@ jobs: matrix: include: - TARGET: aarch64-unknown-linux-musl - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-aarch64 - TARGET: x86_64-unknown-linux-musl - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-x86_64 - TARGET: mips-unknown-linux-musl - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-mips - TARGET: mipsel-unknown-linux-musl - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-mipsel - TARGET: armv7-unknown-linux-musleabihf # raspberry pi 2-3-4, not tested - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-armv7hf - TARGET: armv7-unknown-linux-musleabi # raspberry pi 2-3-4, not tested - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-armv7 - TARGET: arm-unknown-linux-musleabihf # raspberry pi 0-1, not tested - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-armhf - TARGET: arm-unknown-linux-musleabi # raspberry pi 0-1, not tested - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: linux-arm - TARGET: x86_64-apple-darwin @@ -73,7 +73,7 @@ jobs: ARTIFACT_NAME: windows-x86_64 - TARGET: x86_64-unknown-freebsd - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: freebsd-13.2-x86_64 BSD_VERSION: 13.2 diff --git a/.github/workflows/gui.yml b/.github/workflows/gui.yml index c6fd682..cd51b9e 100644 --- a/.github/workflows/gui.yml +++ b/.github/workflows/gui.yml @@ -36,11 +36,11 @@ jobs: matrix: include: - TARGET: aarch64-unknown-linux-musl - OS: ubuntu-latest + OS: ubuntu-22.04 GUI_TARGET: aarch64-unknown-linux-gnu ARTIFACT_NAME: linux-aarch64 - TARGET: x86_64-unknown-linux-musl - OS: ubuntu-latest + OS: ubuntu-22.04 GUI_TARGET: x86_64-unknown-linux-gnu ARTIFACT_NAME: linux-x86_64 @@ -123,30 +123,30 @@ jobs: if: ${{ matrix.TARGET == 'aarch64-unknown-linux-musl' }} run: | # see https://tauri.app/v1/guides/building/linux/ - echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ noble main restricted" | sudo tee /etc/apt/sources.list - echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ noble-updates main restricted" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ noble universe" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ noble-updates universe" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ noble multiverse" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ noble-updates multiverse" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ noble-backports main restricted universe multiverse" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://security.ubuntu.com/ubuntu/ noble-security main restricted" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://security.ubuntu.com/ubuntu/ noble-security universe" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=amd64] http://security.ubuntu.com/ubuntu/ noble-security multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ jammy main restricted" | sudo tee /etc/apt/sources.list + echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ jammy-updates main restricted" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ jammy universe" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ jammy-updates universe" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ jammy multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ jammy-updates multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://archive.ubuntu.com/ubuntu/ jammy-backports main restricted universe multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://security.ubuntu.com/ubuntu/ jammy-security main restricted" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://security.ubuntu.com/ubuntu/ jammy-security universe" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=amd64] http://security.ubuntu.com/ubuntu/ jammy-security multiverse" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble main restricted" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble-updates main restricted" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble universe" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble-updates universe" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble multiverse" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble-updates multiverse" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble-backports main restricted universe multiverse" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble-security main restricted" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble-security universe" | sudo tee -a /etc/apt/sources.list - echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports noble-security multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy main restricted" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy-updates main restricted" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy universe" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy-updates universe" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy-updates multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy-backports main restricted universe multiverse" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy-security main restricted" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy-security universe" | sudo tee -a /etc/apt/sources.list + echo "deb [arch=armhf,arm64] http://ports.ubuntu.com/ubuntu-ports jammy-security multiverse" | sudo tee -a /etc/apt/sources.list sudo dpkg --add-architecture arm64 - sudo apt-get update && sudo apt-get upgrade -y + sudo apt-get update sudo apt install -f -o Dpkg::Options::="--force-overwrite" libwebkit2gtk-4.1-dev:arm64 libssl-dev:arm64 gcc-aarch64-linux-gnu echo "PKG_CONFIG_SYSROOT_DIR=/usr/aarch64-linux-gnu/" >> "$GITHUB_ENV" echo "PKG_CONFIG_PATH=/usr/lib/aarch64-linux-gnu/pkgconfig/" >> "$GITHUB_ENV" @@ -157,7 +157,7 @@ jobs: with: projectPath: ./easytier-gui # https://tauri.app/v1/guides/building/linux/#cross-compiling-tauri-applications-for-arm-based-devices - args: --verbose --target ${{ matrix.GUI_TARGET }} ${{ matrix.OS == 'ubuntu-latest' && contains(matrix.TARGET, 'aarch64') && '--bundles deb' || '' }} + args: --verbose --target ${{ matrix.GUI_TARGET }} ${{ matrix.OS == 'ubuntu-22.04' && contains(matrix.TARGET, 'aarch64') && '--bundles deb' || '' }} - name: Compress run: | diff --git a/.github/workflows/mobile.yml b/.github/workflows/mobile.yml index 72bbda7..bd578da 100644 --- a/.github/workflows/mobile.yml +++ b/.github/workflows/mobile.yml @@ -36,7 +36,7 @@ jobs: matrix: include: - TARGET: android - OS: ubuntu-latest + OS: ubuntu-22.04 ARTIFACT_NAME: android runs-on: ${{ matrix.OS }} env: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7fd602f..0e8e877 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -30,7 +30,7 @@ jobs: skip_after_successful_duplicate: 'true' paths: '["Cargo.toml", "Cargo.lock", "easytier/**", ".github/workflows/test.yml"]' test: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 needs: pre_job if: needs.pre_job.outputs.should_skip != 'true' steps: diff --git a/Cargo.lock b/Cargo.lock index 0986cad..9ac867e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1564,6 +1564,7 @@ dependencies = [ "dashmap", "defguard_wireguard_rs", "derivative", + "easytier-rpc-build 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "encoding", "futures", "futures-util", @@ -1592,7 +1593,6 @@ dependencies = [ "reqwest 0.11.27", "ring 0.17.8", "ringbuf", - "rpc_build", "rstest", "rust-i18n", "rustls", @@ -1656,6 +1656,31 @@ dependencies = [ "tokio", ] +[[package]] +name = "easytier-rpc-build" +version = "0.1.0" +dependencies = [ + "heck 0.5.0", + "prost-build", +] + +[[package]] +name = "easytier-rpc-build" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24829168c28f6a448f57d18116c255dcbd2b8c25e76dbc60f6cd16d68ad2cf07" +dependencies = [ + "heck 0.5.0", + "prost-build", +] + +[[package]] +name = "easytier-web" +version = "0.1.0" +dependencies = [ + "easytier", +] + [[package]] name = "either" version = "1.13.0" @@ -4904,14 +4929,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "rpc_build" -version = "0.1.0" -dependencies = [ - "heck 0.5.0", - "prost-build", -] - [[package]] name = "rstest" version = "0.18.2" diff --git a/Cargo.toml b/Cargo.toml index 23104ce..61fe0f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["easytier", "easytier-gui/src-tauri"] +members = ["easytier", "easytier-gui/src-tauri", "easytier-rpc-build", "easytier-web"] default-members = ["easytier"] [profile.dev] diff --git a/EasyTier.code-workspace b/EasyTier.code-workspace index 68d0252..ac3cdf3 100644 --- a/EasyTier.code-workspace +++ b/EasyTier.code-workspace @@ -14,6 +14,10 @@ { "name": "vpnservice", "path": "tauri-plugin-vpnservice" + }, + { + "name": "rpc-build", + "path": "easytier-rpc-build" } ], "settings": { diff --git a/easytier-gui/src-tauri/src/lib.rs b/easytier-gui/src-tauri/src/lib.rs index 64156d0..adda0c3 100644 --- a/easytier-gui/src-tauri/src/lib.rs +++ b/easytier-gui/src-tauri/src/lib.rs @@ -16,7 +16,6 @@ use easytier::{ use serde::{Deserialize, Serialize}; use tauri::Manager as _; -use tauri::RunEvent; pub const AUTOSTART_ARG: &str = "--autostart"; @@ -336,7 +335,7 @@ pub fn run() { .plugin(tauri_plugin_shell::init()) .plugin(tauri_plugin_vpnservice::init()); - let mut app = builder + let app = builder .setup(|app| { // for logging config let Ok(log_dir) = app.path().app_log_dir() else { @@ -402,10 +401,13 @@ pub fn run() { app.run(|_app, _event| {}); #[cfg(target_os = "macos")] - app.run(|app, event| match event { - RunEvent::Reopen { .. } => { - toggle_window_visibility(app); - } - _ => {} - }); + { + use tauri::RunEvent; + app.run(|app, event| match event { + RunEvent::Reopen { .. } => { + toggle_window_visibility(app); + } + _ => {} + }); + } } diff --git a/easytier-rpc-build/Cargo.toml b/easytier-rpc-build/Cargo.toml new file mode 100644 index 0000000..b70b6c0 --- /dev/null +++ b/easytier-rpc-build/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "easytier-rpc-build" +description = "Protobuf RPC Service Generator for EasyTier" +version = "0.1.0" +edition = "2021" +homepage = "https://github.com/EasyTier/EasyTier" +repository = "https://github.com/EasyTier/EasyTier" +authors = ["kkrainbow"] +keywords = ["vpn", "p2p", "network", "easytier"] +categories = ["network-programming", "command-line-utilities"] +rust-version = "1.77.0" +license-file = "LICENSE" +readme = "README.md" + +[dependencies] +heck = "0.5" +prost-build = "0.13" + +[features] +default = [] +internal-namespace = [] diff --git a/easytier-rpc-build/LICENSE b/easytier-rpc-build/LICENSE new file mode 120000 index 0000000..ea5b606 --- /dev/null +++ b/easytier-rpc-build/LICENSE @@ -0,0 +1 @@ +../LICENSE \ No newline at end of file diff --git a/easytier-rpc-build/README.md b/easytier-rpc-build/README.md new file mode 100644 index 0000000..66b1cae --- /dev/null +++ b/easytier-rpc-build/README.md @@ -0,0 +1,3 @@ +# Introduction + +This is a protobuf rpc service stub generator for [EasyTier](https://github.com/EasyTier/EasyTier) project. diff --git a/easytier/src/proto/rpc_build/src/lib.rs b/easytier-rpc-build/src/lib.rs similarity index 98% rename from easytier/src/proto/rpc_build/src/lib.rs rename to easytier-rpc-build/src/lib.rs index 1d1bf07..a61ff1b 100644 --- a/easytier/src/proto/rpc_build/src/lib.rs +++ b/easytier-rpc-build/src/lib.rs @@ -3,8 +3,12 @@ extern crate prost_build; use std::fmt; +#[cfg(feature = "internal-namespace")] const NAMESPACE: &str = "crate::proto::rpc_types"; +#[cfg(not(feature = "internal-namespace"))] +const NAMESPACE: &str = "easytier::proto::rpc_types"; + /// The service generator to be used with `prost-build` to generate RPC implementations for /// `prost-simple-rpc`. /// diff --git a/easytier/src/proto/rpc_build/Cargo.toml b/easytier-web/Cargo.toml similarity index 51% rename from easytier/src/proto/rpc_build/Cargo.toml rename to easytier-web/Cargo.toml index 6c3a7b4..57fd26b 100644 --- a/easytier/src/proto/rpc_build/Cargo.toml +++ b/easytier-web/Cargo.toml @@ -1,8 +1,7 @@ [package] -name = "rpc_build" +name = "easytier-web" version = "0.1.0" edition = "2021" [dependencies] -heck = "0.5" -prost-build = "0.13" +easytier = { path = "../easytier" } \ No newline at end of file diff --git a/easytier-web/src/main.rs b/easytier-web/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/easytier-web/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 20a7b7f..74e9538 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -197,7 +197,7 @@ tonic-build = "0.12" globwalk = "0.8.1" regex = "1" prost-build = "0.13.2" -rpc_build = { path = "src/proto/rpc_build" } +rpc_build = { package = "easytier-rpc-build", version = "0.1.0", features = ["internal-namespace"] } [target.'cfg(windows)'.build-dependencies] reqwest = { version = "0.11", features = ["blocking"] } diff --git a/easytier/build.rs b/easytier/build.rs index 0ca1840..53179e5 100644 --- a/easytier/build.rs +++ b/easytier/build.rs @@ -134,6 +134,7 @@ fn main() -> Result<(), Box> { } prost_build::Config::new() + .protoc_arg("--experimental_allow_proto3_optional") .type_attribute(".common", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".error", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".cli", "#[derive(serde::Serialize, serde::Deserialize)]") diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index e834da0..ce61f61 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -164,7 +164,7 @@ pub struct Flags { pub enable_ipv6: bool, #[derivative(Default(value = "1380"))] pub mtu: u16, - #[derivative(Default(value = "true"))] + #[derivative(Default(value = "false"))] pub latency_first: bool, #[derivative(Default(value = "false"))] pub enable_exit_node: bool, @@ -182,6 +182,8 @@ pub struct Flags { pub disable_udp_hole_punching: bool, #[derivative(Default(value = "\"udp://[::]:0\".to_string()"))] pub ipv6_listener: String, + #[derivative(Default(value = "false"))] + pub multi_thread: bool, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] @@ -529,7 +531,28 @@ impl ConfigLoader for TomlConfigLoader { } fn dump(&self) -> String { - toml::to_string_pretty(&*self.config.lock().unwrap()).unwrap() + let default_flags_json = serde_json::to_string(&Flags::default()).unwrap(); + let default_flags_hashmap = + serde_json::from_str::>(&default_flags_json) + .unwrap(); + + let cur_flags_json = serde_json::to_string(&self.get_flags()).unwrap(); + let cur_flags_hashmap = + serde_json::from_str::>(&cur_flags_json) + .unwrap(); + + let mut flag_map: serde_json::Map = Default::default(); + for (key, value) in default_flags_hashmap { + if let Some(v) = cur_flags_hashmap.get(&key) { + if *v != value { + flag_map.insert(key, v.clone()); + } + } + } + + let mut config = self.config.lock().unwrap().clone(); + config.flags = Some(flag_map); + toml::to_string_pretty(&config).unwrap() } fn get_routes(&self) -> Option> { diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 3d481b1..0ac732f 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -44,8 +44,8 @@ pub enum GlobalCtxEvent { DhcpIpv4Conflicted(Option), } -type EventBus = tokio::sync::broadcast::Sender; -type EventBusSubscriber = tokio::sync::broadcast::Receiver; +pub type EventBus = tokio::sync::broadcast::Sender; +pub type EventBusSubscriber = tokio::sync::broadcast::Receiver; pub struct GlobalCtx { pub inst_name: String, diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 9f033f4..2b86714 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -19,6 +19,7 @@ mod common; mod connector; mod gateway; mod instance; +mod launcher; mod peer_center; mod peers; mod proto; @@ -29,8 +30,9 @@ mod vpn_portal; use common::{ config::{ConsoleLoggerConfig, FileLoggerConfig, NetworkIdentity, PeerConfig, VpnPortalConfig}, constants::EASYTIER_VERSION, + global_ctx::EventBusSubscriber, + scoped_task::ScopedTask, }; -use instance::instance::Instance; use tokio::net::TcpSocket; use utils::setup_panic_handler; @@ -525,6 +527,7 @@ impl From for TomlConfigLoader { .with_context(|| format!("failed to parse ipv6 listener: {}", ipv6_listener)) .unwrap(); } + f.multi_thread = cli.multi_thread; cfg.set_flags(f); cfg.set_exit_nodes(cli.exit_nodes.clone()); @@ -549,13 +552,7 @@ fn peer_conn_info_to_string(p: crate::proto::cli::PeerConnInfo) -> String { } #[tracing::instrument] -pub async fn async_main(cli: Cli) { - let cfg: TomlConfigLoader = cli.into(); - - init_logger(&cfg, false).unwrap(); - let mut inst = Instance::new(cfg.clone()); - - let mut events = inst.get_global_ctx().subscribe(); +pub fn handle_event(mut events: EventBusSubscriber) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { while let Ok(e) = events.recv().await { match e { @@ -658,39 +655,28 @@ pub async fn async_main(cli: Cli) { } } } - }); - - println!("Starting easytier with config:"); - println!("############### TOML ###############\n"); - println!("{}", cfg.dump()); - println!("-----------------------------------"); - - inst.run().await.unwrap(); - - inst.wait().await; + }) } -fn main() { +#[tokio::main] +async fn main() { setup_panic_handler(); let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US")); rust_i18n::set_locale(&locale); let cli = Cli::parse(); - tracing::info!(cli = ?cli, "cli args parsed"); + let cfg = TomlConfigLoader::from(cli); + init_logger(&cfg, false).unwrap(); - if cli.multi_thread { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - .unwrap() - .block_on(async move { async_main(cli).await }) - } else { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async move { async_main(cli).await }) + println!("Starting easytier with config:"); + println!("############### TOML ###############\n"); + println!("{}", cfg.dump()); + println!("-----------------------------------"); + + let mut l = launcher::NetworkInstance::new(cfg).set_fetch_node_info(false); + let _t = ScopedTask::from(handle_event(l.start().unwrap())); + if let Some(e) = l.wait().await { + panic!("launcher error: {:?}", e); } } diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 341548e..4eccd08 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -7,7 +7,7 @@ use crate::{ common::{ config::{ConfigLoader, TomlConfigLoader}, constants::EASYTIER_VERSION, - global_ctx::GlobalCtxEvent, + global_ctx::{EventBusSubscriber, GlobalCtxEvent}, stun::StunInfoCollectorTrait, }, instance::instance::Instance, @@ -21,7 +21,7 @@ use crate::{ }; use chrono::{DateTime, Local}; use serde::{Deserialize, Serialize}; -use tokio::task::JoinSet; +use tokio::{sync::broadcast, task::JoinSet}; #[derive(Default, Clone, Debug, Serialize, Deserialize)] pub struct MyNodeInfo { @@ -34,14 +34,31 @@ pub struct MyNodeInfo { pub vpn_portal_cfg: Option, } -#[derive(Default, Clone)] struct EasyTierData { - events: Arc, GlobalCtxEvent)>>>, - node_info: Arc>, - routes: Arc>>, - peers: Arc>>, + events: RwLock, GlobalCtxEvent)>>, + node_info: RwLock, + routes: RwLock>, + peers: RwLock>, tun_fd: Arc>>, - tun_dev_name: Arc>, + tun_dev_name: RwLock, + event_subscriber: RwLock>, + instance_stop_notifier: Arc, +} + +impl Default for EasyTierData { + fn default() -> Self { + let (tx, _) = broadcast::channel(100); + Self { + event_subscriber: RwLock::new(tx), + events: RwLock::new(VecDeque::new()), + node_info: RwLock::new(MyNodeInfo::default()), + routes: RwLock::new(Vec::new()), + peers: RwLock::new(Vec::new()), + tun_fd: Arc::new(RwLock::new(None)), + tun_dev_name: RwLock::new(String::new()), + instance_stop_notifier: Arc::new(tokio::sync::Notify::new()), + } + } } pub struct EasyTierLauncher { @@ -49,27 +66,30 @@ pub struct EasyTierLauncher { stop_flag: Arc, thread_handle: Option>, running_cfg: String, + fetch_node_info: bool, error_msg: Arc>>, - data: EasyTierData, + data: Arc, } impl EasyTierLauncher { - pub fn new() -> Self { + pub fn new(fetch_node_info: bool) -> Self { let instance_alive = Arc::new(AtomicBool::new(false)); Self { instance_alive, thread_handle: None, error_msg: Arc::new(RwLock::new(None)), running_cfg: String::new(), + fetch_node_info, stop_flag: Arc::new(AtomicBool::new(false)), - data: EasyTierData::default(), + data: Arc::new(EasyTierData::default()), } } - async fn handle_easytier_event(event: GlobalCtxEvent, data: EasyTierData) { + async fn handle_easytier_event(event: GlobalCtxEvent, data: &EasyTierData) { let mut events = data.events.write().unwrap(); + let _ = data.event_subscriber.read().unwrap().send(event.clone()); events.push_back((chrono::Local::now(), event)); if events.len() > 100 { events.pop_front(); @@ -113,7 +133,8 @@ impl EasyTierLauncher { async fn easytier_routine( cfg: TomlConfigLoader, stop_signal: Arc, - data: EasyTierData, + data: Arc, + fetch_node_info: bool, ) -> Result<(), anyhow::Error> { let mut instance = Instance::new(cfg); let peer_mgr = instance.get_peer_manager(); @@ -126,50 +147,53 @@ impl EasyTierLauncher { tasks.spawn(async move { let mut receiver = global_ctx.subscribe(); while let Ok(event) = receiver.recv().await { - Self::handle_easytier_event(event, data_c.clone()).await; + Self::handle_easytier_event(event, &data_c).await; } }); // update my node info - let data_c = data.clone(); - let global_ctx_c = instance.get_global_ctx(); - let peer_mgr_c = peer_mgr.clone(); - let vpn_portal = instance.get_vpn_portal_inst(); - tasks.spawn(async move { - loop { - // Update TUN Device Name - *data_c.tun_dev_name.write().unwrap() = global_ctx_c.get_flags().dev_name.clone(); + if fetch_node_info { + let data_c = data.clone(); + let global_ctx_c = instance.get_global_ctx(); + let peer_mgr_c = peer_mgr.clone(); + let vpn_portal = instance.get_vpn_portal_inst(); + tasks.spawn(async move { + loop { + // Update TUN Device Name + *data_c.tun_dev_name.write().unwrap() = + global_ctx_c.get_flags().dev_name.clone(); - let node_info = MyNodeInfo { - virtual_ipv4: global_ctx_c - .get_ipv4() - .map(|x| x.to_string()) - .unwrap_or_default(), - hostname: global_ctx_c.get_hostname(), - version: EASYTIER_VERSION.to_string(), - ips: global_ctx_c.get_ip_collector().collect_ip_addrs().await, - stun_info: global_ctx_c.get_stun_info_collector().get_stun_info(), - listeners: global_ctx_c - .get_running_listeners() - .iter() - .map(|x| x.to_string()) - .collect(), - vpn_portal_cfg: Some( - vpn_portal - .lock() - .await - .dump_client_config(peer_mgr_c.clone()) - .await, - ), - }; - *data_c.node_info.write().unwrap() = node_info.clone(); - *data_c.routes.write().unwrap() = peer_mgr_c.list_routes().await; - *data_c.peers.write().unwrap() = PeerManagerRpcService::new(peer_mgr_c.clone()) - .list_peers() - .await; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }); + let node_info = MyNodeInfo { + virtual_ipv4: global_ctx_c + .get_ipv4() + .map(|x| x.to_string()) + .unwrap_or_default(), + hostname: global_ctx_c.get_hostname(), + version: EASYTIER_VERSION.to_string(), + ips: global_ctx_c.get_ip_collector().collect_ip_addrs().await, + stun_info: global_ctx_c.get_stun_info_collector().get_stun_info(), + listeners: global_ctx_c + .get_running_listeners() + .iter() + .map(|x| x.to_string()) + .collect(), + vpn_portal_cfg: Some( + vpn_portal + .lock() + .await + .dump_client_config(peer_mgr_c.clone()) + .await, + ), + }; + *data_c.node_info.write().unwrap() = node_info.clone(); + *data_c.routes.write().unwrap() = peer_mgr_c.list_routes().await; + *data_c.peers.write().unwrap() = PeerManagerRpcService::new(peer_mgr_c.clone()) + .list_peers() + .await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }); + } #[cfg(target_os = "android")] Self::run_routine_for_android(&instance, &data, &mut tasks).await; @@ -188,13 +212,15 @@ impl EasyTierLauncher { F: FnOnce() -> Result + Send + Sync, { let error_msg = self.error_msg.clone(); - let cfg = cfg_generator(); - if let Err(e) = cfg { - error_msg.write().unwrap().replace(e.to_string()); - return; - } + let cfg = match cfg_generator() { + Err(e) => { + error_msg.write().unwrap().replace(e.to_string()); + return; + } + Ok(cfg) => cfg, + }; - self.running_cfg = cfg.as_ref().unwrap().dump(); + self.running_cfg = cfg.dump(); let stop_flag = self.stop_flag.clone(); @@ -202,12 +228,21 @@ impl EasyTierLauncher { instance_alive.store(true, std::sync::atomic::Ordering::Relaxed); let data = self.data.clone(); + let fetch_node_info = self.fetch_node_info; self.thread_handle = Some(std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); + let rt = if cfg.get_flags().multi_thread { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + } else { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + } + .unwrap(); + let stop_notifier = Arc::new(tokio::sync::Notify::new()); let stop_notifier_clone = stop_notifier.clone(); @@ -218,15 +253,18 @@ impl EasyTierLauncher { stop_notifier_clone.notify_one(); }); + let notifier = data.instance_stop_notifier.clone(); let ret = rt.block_on(Self::easytier_routine( - cfg.unwrap(), + cfg, stop_notifier.clone(), data, + fetch_node_info, )); if let Err(e) = ret { error_msg.write().unwrap().replace(e.to_string()); } instance_alive.store(false, std::sync::atomic::Ordering::Relaxed); + notifier.notify_one(); })); } @@ -289,6 +327,8 @@ pub struct NetworkInstanceRunningInfo { pub struct NetworkInstance { config: TomlConfigLoader, launcher: Option, + + fetch_node_info: bool, } impl NetworkInstance { @@ -296,9 +336,15 @@ impl NetworkInstance { Self { config, launcher: None, + fetch_node_info: true, } } + pub fn set_fetch_node_info(mut self, fetch_node_info: bool) -> Self { + self.fetch_node_info = fetch_node_info; + self + } + pub fn is_easytier_running(&self) -> bool { self.launcher.is_some() && self.launcher.as_ref().unwrap().running() } @@ -333,15 +379,37 @@ impl NetworkInstance { } } - pub fn start(&mut self) -> Result<(), anyhow::Error> { + pub fn start(&mut self) -> Result { if self.is_easytier_running() { - return Ok(()); + return Ok(self.subscribe_event().unwrap()); } - let mut launcher = EasyTierLauncher::new(); - launcher.start(|| Ok(self.config.clone())); - + let launcher = EasyTierLauncher::new(self.fetch_node_info); self.launcher = Some(launcher); - Ok(()) + let ev = self.subscribe_event().unwrap(); + + self.launcher + .as_mut() + .unwrap() + .start(|| Ok(self.config.clone())); + + Ok(ev) + } + + fn subscribe_event(&self) -> Option> { + if let Some(launcher) = self.launcher.as_ref() { + Some(launcher.data.event_subscriber.read().unwrap().subscribe()) + } else { + None + } + } + + pub async fn wait(&self) -> Option { + if let Some(launcher) = self.launcher.as_ref() { + launcher.data.instance_stop_notifier.notified().await; + launcher.error_msg.read().unwrap().clone() + } else { + None + } } } diff --git a/easytier/src/lib.rs b/easytier/src/lib.rs index 4501ff1..e1e0dac 100644 --- a/easytier/src/lib.rs +++ b/easytier/src/lib.rs @@ -6,11 +6,11 @@ mod gateway; mod instance; mod peer_center; mod peers; -mod proto; mod vpn_portal; pub mod common; pub mod launcher; +pub mod proto; pub mod tunnel; pub mod utils; diff --git a/easytier/src/peers/peer_rpc.rs b/easytier/src/peers/peer_rpc.rs index 4f2fafe..739bdd8 100644 --- a/easytier/src/peers/peer_rpc.rs +++ b/easytier/src/peers/peer_rpc.rs @@ -1,12 +1,12 @@ use std::sync::{Arc, Mutex}; -use futures::StreamExt; +use futures::{SinkExt as _, StreamExt}; use tokio::task::JoinSet; use crate::{ common::{error::Error, PeerId}, - proto::rpc_impl, - tunnel::packet_def::{PacketType, ZCPacket}, + proto::rpc_impl::{self, bidirect::BidirectRpcManager}, + tunnel::packet_def::ZCPacket, }; const RPC_PACKET_CONTENT_MTU: usize = 1300; @@ -25,9 +25,7 @@ pub trait PeerRpcManagerTransport: Send + Sync + 'static { // handle rpc request from one peer pub struct PeerRpcManager { tspt: Arc>, - rpc_client: rpc_impl::client::Client, - rpc_server: rpc_impl::server::Server, - + bidirect_rpc: BidirectRpcManager, tasks: Arc>>, } @@ -43,78 +41,41 @@ impl PeerRpcManager { pub fn new(tspt: impl PeerRpcManagerTransport) -> Self { Self { tspt: Arc::new(Box::new(tspt)), - rpc_client: rpc_impl::client::Client::new(), - rpc_server: rpc_impl::server::Server::new(), + bidirect_rpc: BidirectRpcManager::new(), tasks: Arc::new(Mutex::new(JoinSet::new())), } } pub fn run(&self) { - self.rpc_client.run(); - self.rpc_server.run(); - - let (server_tx, mut server_rx) = ( - self.rpc_server.get_transport_sink(), - self.rpc_server.get_transport_stream(), - ); - let (client_tx, mut client_rx) = ( - self.rpc_client.get_transport_sink(), - self.rpc_client.get_transport_stream(), - ); - + let ret = self.bidirect_rpc.run_and_create_tunnel(); + let (mut rx, mut tx) = ret.split(); let tspt = self.tspt.clone(); - self.tasks.lock().unwrap().spawn(async move { - loop { - let packet = tokio::select! { - Some(Ok(packet)) = server_rx.next() => { - tracing::trace!(?packet, "recv rpc packet from server"); - packet - } - Some(Ok(packet)) = client_rx.next() => { - tracing::trace!(?packet, "recv rpc packet from client"); - packet - } - else => { - tracing::warn!("rpc transport read aborted, exiting"); - break; - } - }; - + while let Some(Ok(packet)) = rx.next().await { let dst_peer_id = packet.peer_manager_header().unwrap().to_peer_id.into(); if let Err(e) = tspt.send(packet, dst_peer_id).await { - tracing::error!(error = ?e, dst_peer_id = ?dst_peer_id, "send to peer failed"); + tracing::error!("send to rpc tspt error: {:?}", e); } } }); let tspt = self.tspt.clone(); self.tasks.lock().unwrap().spawn(async move { - loop { - let Ok(o) = tspt.recv().await else { - tracing::warn!("peer rpc transport read aborted, exiting"); - break; - }; - - if o.peer_manager_header().unwrap().packet_type == PacketType::RpcReq as u8 { - server_tx.send(o).await.unwrap(); - continue; - } else if o.peer_manager_header().unwrap().packet_type == PacketType::RpcResp as u8 - { - client_tx.send(o).await.unwrap(); - continue; + while let Ok(packet) = tspt.recv().await { + if let Err(e) = tx.send(packet).await { + tracing::error!("send to rpc tspt error: {:?}", e); } } }); } pub fn rpc_client(&self) -> &rpc_impl::client::Client { - &self.rpc_client + self.bidirect_rpc.rpc_client() } pub fn rpc_server(&self) -> &rpc_impl::server::Server { - &self.rpc_server + self.bidirect_rpc.rpc_server() } pub fn my_peer_id(&self) -> PeerId { diff --git a/easytier/src/proto/rpc_impl/bidirect.rs b/easytier/src/proto/rpc_impl/bidirect.rs new file mode 100644 index 0000000..28d94b1 --- /dev/null +++ b/easytier/src/proto/rpc_impl/bidirect.rs @@ -0,0 +1,164 @@ +use std::sync::{Arc, Mutex}; + +use futures::{SinkExt as _, StreamExt}; +use tokio::{task::JoinSet, time::timeout}; + +use crate::{ + proto::rpc_types::error::Error, + tunnel::{packet_def::PacketType, ring::create_ring_tunnel_pair, Tunnel}, +}; + +use super::{client::Client, server::Server}; + +pub struct BidirectRpcManager { + rpc_client: Client, + rpc_server: Server, + + rx_timeout: Option, + error: Arc>>, + tunnel: Mutex>>, + + tasks: Mutex>>, +} + +impl BidirectRpcManager { + pub fn new() -> Self { + Self { + rpc_client: Client::new(), + rpc_server: Server::new(), + + rx_timeout: None, + error: Arc::new(Mutex::new(None)), + tunnel: Mutex::new(None), + + tasks: Mutex::new(None), + } + } + + pub fn set_rx_timeout(mut self, timeout: Option) -> Self { + self.rx_timeout = timeout; + self + } + + pub fn run_and_create_tunnel(&self) -> Box { + let (ret, inner) = create_ring_tunnel_pair(); + self.run_with_tunnel(inner); + ret + } + + pub fn run_with_tunnel(&self, inner: Box) { + let mut tasks = JoinSet::new(); + self.rpc_client.run(); + self.rpc_server.run(); + + let (server_tx, mut server_rx) = ( + self.rpc_server.get_transport_sink(), + self.rpc_server.get_transport_stream(), + ); + let (client_tx, mut client_rx) = ( + self.rpc_client.get_transport_sink(), + self.rpc_client.get_transport_stream(), + ); + + let (mut inner_rx, mut inner_tx) = inner.split(); + self.tunnel.lock().unwrap().replace(inner); + + let e_clone = self.error.clone(); + tasks.spawn(async move { + loop { + let packet = tokio::select! { + Some(Ok(packet)) = server_rx.next() => { + tracing::trace!(?packet, "recv rpc packet from server"); + packet + } + Some(Ok(packet)) = client_rx.next() => { + tracing::trace!(?packet, "recv rpc packet from client"); + packet + } + else => { + tracing::warn!("rpc transport read aborted, exiting"); + break; + } + }; + + if let Err(e) = inner_tx.send(packet).await { + tracing::error!(error = ?e, "send to peer failed"); + e_clone.lock().unwrap().replace(Error::from(e)); + } + } + }); + + let recv_timeout = self.rx_timeout; + let e_clone = self.error.clone(); + tasks.spawn(async move { + loop { + let ret = if let Some(recv_timeout) = recv_timeout { + match timeout(recv_timeout, inner_rx.next()).await { + Ok(ret) => ret, + Err(e) => { + e_clone.lock().unwrap().replace(e.into()); + break; + } + } + } else { + inner_rx.next().await + }; + + let o = match ret { + Some(Ok(o)) => o, + Some(Err(e)) => { + tracing::error!(error = ?e, "recv from peer failed"); + e_clone.lock().unwrap().replace(Error::from(e)); + break; + } + None => { + tracing::warn!("peer rpc transport read aborted, exiting"); + e_clone.lock().unwrap().replace(Error::Shutdown); + break; + } + }; + + if o.peer_manager_header().unwrap().packet_type == PacketType::RpcReq as u8 { + server_tx.send(o).await.unwrap(); + continue; + } else if o.peer_manager_header().unwrap().packet_type == PacketType::RpcResp as u8 + { + client_tx.send(o).await.unwrap(); + continue; + } + } + }); + + self.tasks.lock().unwrap().replace(tasks); + } + + pub fn rpc_client(&self) -> &Client { + &self.rpc_client + } + + pub fn rpc_server(&self) -> &Server { + &self.rpc_server + } + + pub async fn stop(&self) { + let Some(mut tasks) = self.tasks.lock().unwrap().take() else { + return; + }; + tasks.abort_all(); + while let Some(_) = tasks.join_next().await {} + } + + pub fn take_error(&self) -> Option { + self.error.lock().unwrap().take() + } + + pub async fn wait(&self) { + let Some(mut tasks) = self.tasks.lock().unwrap().take() else { + return; + }; + while let Some(_) = tasks.join_next().await { + // when any task is done, abort all tasks + tasks.abort_all(); + } + } +} diff --git a/easytier/src/proto/rpc_impl/mod.rs b/easytier/src/proto/rpc_impl/mod.rs index 5cdccd5..8d7da71 100644 --- a/easytier/src/proto/rpc_impl/mod.rs +++ b/easytier/src/proto/rpc_impl/mod.rs @@ -2,6 +2,7 @@ use crate::tunnel::{mpsc::MpscTunnel, Tunnel}; pub type RpcController = super::rpc_types::controller::BaseController; +pub mod bidirect; pub mod client; pub mod packet; pub mod server; diff --git a/easytier/src/proto/rpc_impl/service_registry.rs b/easytier/src/proto/rpc_impl/service_registry.rs index 4d5e200..1ca440d 100644 --- a/easytier/src/proto/rpc_impl/service_registry.rs +++ b/easytier/src/proto/rpc_impl/service_registry.rs @@ -59,6 +59,14 @@ impl ServiceRegistry { } } + pub fn replace_registry(&self, registry: &ServiceRegistry) { + self.table.clear(); + for item in registry.table.iter() { + let (k, v) = item.pair(); + self.table.insert(k.clone(), v.clone()); + } + } + pub fn register>(&self, h: H, domain_name: &str) { let desc = h.service_descriptor(); let key = ServiceKey { diff --git a/easytier/src/proto/rpc_impl/standalone.rs b/easytier/src/proto/rpc_impl/standalone.rs index 76f8829..e191a3c 100644 --- a/easytier/src/proto/rpc_impl/standalone.rs +++ b/easytier/src/proto/rpc_impl/standalone.rs @@ -4,66 +4,18 @@ use std::{ }; use anyhow::Context as _; -use futures::{SinkExt as _, StreamExt}; use tokio::task::JoinSet; use crate::{ common::join_joinset_background, - proto::rpc_types::{__rt::RpcClientFactory, error::Error}, + proto::{ + rpc_impl::bidirect::BidirectRpcManager, + rpc_types::{__rt::RpcClientFactory, error::Error}, + }, tunnel::{Tunnel, TunnelConnector, TunnelListener}, }; -use super::{client::Client, server::Server, service_registry::ServiceRegistry}; - -struct StandAloneServerOneTunnel { - tunnel: Box, - rpc_server: Server, -} - -impl StandAloneServerOneTunnel { - pub fn new(tunnel: Box, registry: Arc) -> Self { - let rpc_server = Server::new_with_registry(registry); - StandAloneServerOneTunnel { tunnel, rpc_server } - } - - pub async fn run(self) { - use tokio_stream::StreamExt as _; - - let (tunnel_rx, tunnel_tx) = self.tunnel.split(); - let (rpc_rx, rpc_tx) = ( - self.rpc_server.get_transport_stream(), - self.rpc_server.get_transport_sink(), - ); - - let mut tasks = JoinSet::new(); - - tasks.spawn(async move { - let ret = tunnel_rx.timeout(Duration::from_secs(60)); - tokio::pin!(ret); - while let Ok(Some(Ok(p))) = ret.try_next().await { - if let Err(e) = rpc_tx.send(p).await { - tracing::error!("tunnel_rx send to rpc_tx error: {:?}", e); - break; - } - } - tracing::info!("forward tunnel_rx to rpc_tx done"); - }); - - tasks.spawn(async move { - let ret = rpc_rx.forward(tunnel_tx).await; - tracing::info!("rpc_rx forward tunnel_tx done: {:?}", ret); - }); - - self.rpc_server.run(); - - while let Some(ret) = tasks.join_next().await { - self.rpc_server.close(); - tracing::info!("task done: {:?}", ret); - } - - tracing::info!("all tasks done"); - } -} +use super::service_registry::ServiceRegistry; pub struct StandAloneServer { registry: Arc, @@ -102,11 +54,15 @@ impl StandAloneServer { self.tasks.lock().unwrap().spawn(async move { while let Ok(tunnel) = listener.accept().await { - let server = StandAloneServerOneTunnel::new(tunnel, registry.clone()); + let registry = registry.clone(); let inflight_server = inflight_server.clone(); inflight_server.fetch_add(1, std::sync::atomic::Ordering::Relaxed); tasks.lock().unwrap().spawn(async move { - server.run().await; + let server = + BidirectRpcManager::new().set_rx_timeout(Some(Duration::from_secs(60))); + server.rpc_server().registry().replace_registry(®istry); + server.run_with_tunnel(tunnel); + server.wait().await; inflight_server.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); }); } @@ -122,86 +78,9 @@ impl StandAloneServer { } } -struct StandAloneClientOneTunnel { - rpc_client: Client, - tasks: Arc>>, - error: Arc>>, -} - -impl StandAloneClientOneTunnel { - pub fn new(tunnel: Box) -> Self { - let rpc_client = Client::new(); - let (mut rpc_rx, rpc_tx) = ( - rpc_client.get_transport_stream(), - rpc_client.get_transport_sink(), - ); - let tasks = Arc::new(Mutex::new(JoinSet::new())); - - let (mut tunnel_rx, mut tunnel_tx) = tunnel.split(); - - let error_store = Arc::new(Mutex::new(None)); - - let error = error_store.clone(); - tasks.lock().unwrap().spawn(async move { - while let Some(p) = rpc_rx.next().await { - match p { - Ok(p) => { - if let Err(e) = tunnel_tx - .send(p) - .await - .with_context(|| "failed to send packet") - { - *error.lock().unwrap() = Some(e.into()); - } - } - Err(e) => { - *error.lock().unwrap() = Some(anyhow::Error::from(e).into()); - } - } - } - - *error.lock().unwrap() = Some(anyhow::anyhow!("rpc_rx next exit").into()); - }); - - let error = error_store.clone(); - tasks.lock().unwrap().spawn(async move { - while let Some(p) = tunnel_rx.next().await { - match p { - Ok(p) => { - if let Err(e) = rpc_tx - .send(p) - .await - .with_context(|| "failed to send packet") - { - *error.lock().unwrap() = Some(e.into()); - } - } - Err(e) => { - *error.lock().unwrap() = Some(anyhow::Error::from(e).into()); - } - } - } - - *error.lock().unwrap() = Some(anyhow::anyhow!("tunnel_rx next exit").into()); - }); - - rpc_client.run(); - - StandAloneClientOneTunnel { - rpc_client, - tasks, - error: error_store, - } - } - - pub fn take_error(&self) -> Option { - self.error.lock().unwrap().take() - } -} - pub struct StandAloneClient { connector: C, - client: Option, + client: Option, } impl StandAloneClient { @@ -230,7 +109,9 @@ impl StandAloneClient { if c.is_none() || error.is_some() { tracing::info!("reconnect due to error: {:?}", error); let tunnel = self.connect().await?; - c = Some(StandAloneClientOneTunnel::new(tunnel)); + let mgr = BidirectRpcManager::new().set_rx_timeout(Some(Duration::from_secs(60))); + mgr.run_with_tunnel(tunnel); + c = Some(mgr); } self.client = c; @@ -239,7 +120,7 @@ impl StandAloneClient { .client .as_ref() .unwrap() - .rpc_client + .rpc_client() .scoped_client::(1, 1, domain_name)) } } diff --git a/easytier/src/proto/rpc_types/error.rs b/easytier/src/proto/rpc_types/error.rs index 1def040..ba86d75 100644 --- a/easytier/src/proto/rpc_types/error.rs +++ b/easytier/src/proto/rpc_types/error.rs @@ -29,6 +29,9 @@ pub enum Error { #[error("Tunnel error: {0}")] TunnelError(#[from] crate::tunnel::TunnelError), + + #[error("Shutdown")] + Shutdown, } pub type Result = result::Result; diff --git a/easytier/src/proto/tests.rs b/easytier/src/proto/tests.rs index 4f62de6..546f1c4 100644 --- a/easytier/src/proto/tests.rs +++ b/easytier/src/proto/tests.rs @@ -300,3 +300,75 @@ async fn standalone_rpc_test() { tokio::time::sleep(std::time::Duration::from_secs(1)).await; assert_eq!(0, server.inflight_server()); } + +#[tokio::test] +async fn test_bidirect_rpc_manager() { + use crate::common::scoped_task::ScopedTask; + use crate::proto::rpc_impl::bidirect::BidirectRpcManager; + use crate::tunnel::tcp::{TcpTunnelConnector, TcpTunnelListener}; + use crate::tunnel::{TunnelConnector, TunnelListener}; + + let c = BidirectRpcManager::new(); + let s = BidirectRpcManager::new(); + + let service = GreetingServer::new(GreetingService { + delay_ms: 0, + prefix: "Hello Client".to_string(), + }); + c.rpc_server().registry().register(service, "test"); + + let service = GreetingServer::new(GreetingService { + delay_ms: 0, + prefix: "Hello Server".to_string(), + }); + s.rpc_server().registry().register(service, "test"); + + let mut tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:55443".parse().unwrap()); + let s_task: ScopedTask<()> = tokio::spawn(async move { + tcp_listener.listen().await.unwrap(); + let tunnel = tcp_listener.accept().await.unwrap(); + s.run_with_tunnel(tunnel); + + let s_c = s + .rpc_client() + .scoped_client::>(1, 1, "test".to_string()); + let ret = s_c + .say_hello( + RpcController::default(), + SayHelloRequest { + name: "world".to_string(), + }, + ) + .await + .unwrap(); + assert_eq!(ret.greeting, "Hello Client world!"); + println!("server done, {:?}", ret); + + s.wait().await; + }) + .into(); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + let mut tcp_connector = TcpTunnelConnector::new("tcp://0.0.0.0:55443".parse().unwrap()); + let c_tunnel = tcp_connector.connect().await.unwrap(); + c.run_with_tunnel(c_tunnel); + + let c_c = c + .rpc_client() + .scoped_client::>(1, 1, "test".to_string()); + let ret = c_c + .say_hello( + RpcController::default(), + SayHelloRequest { + name: "world".to_string(), + }, + ) + .await + .unwrap(); + assert_eq!(ret.greeting, "Hello Server world!"); + println!("client done, {:?}", ret); + + drop(c); + s_task.await.unwrap(); +} diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index c5f1866..e2450fe 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -789,7 +789,7 @@ pub async fn manual_reconnector(#[values(true, false)] is_foreign: bool) { .await .unwrap(); - assert_eq!(1, conns.len()); + assert!(conns.len() >= 1); wait_for_condition( || async { ping_test("net_b", "10.144.145.2", None).await },