From a78b7597416f1a12c27eb94d9f1f4f6b92425e59 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sat, 26 Oct 2024 00:04:22 +0800 Subject: [PATCH] feat/web (Patchset 2) (#444) This patch implement a restful server without any auth. usage: ```bash # run easytier-web, which acts as an gateway and registry for all easytier-core $> easytier-web # run easytier-core and connect to easytier-web with a token $> easytier-core --config-server udp://127.0.0.1:22020/fdsafdsa # use restful api to list session $> curl -H "Content-Type: application/json" -X GET 127.0.0.1:11211/api/v1/sessions [{"token":"fdsafdsa","client_url":"udp://127.0.0.1:48915","machine_id":"de3f5b8f-0f2f-d9d0-fb30-a2ac8951d92f"}]% # use restful api to run a network instance $> curl -H "Content-Type: application/json" -X POST 127.0.0.1:11211/api/v1/network/de3f5b8f-0f2f-d9d0-fb30-a2ac8951d92f -d '{"config": "listeners = [\"udp://0.0.0.0:12344\"]"}' # use restful api to get network instance info $> curl -H "Content-Type: application/json" -X GET 127.0.0.1:11211/api/v1/network/de3f5b8f-0f2f-d9d0-fb30-a2ac8951d92f/65437e50-b286-4098-a624-74429f2cb839 ``` --- .github/workflows/core.yml | 5 +- Cargo.lock | 128 ++++++++- Cargo.toml | 2 +- easytier-gui/src/types/network.ts | 2 +- easytier-web/Cargo.toml | 23 +- easytier-web/src/client_manager/mod.rs | 134 ++++++++++ easytier-web/src/client_manager/session.rs | 144 ++++++++++ easytier-web/src/client_manager/storage.rs | 72 +++++ easytier-web/src/main.rs | 23 +- easytier-web/src/restful/mod.rs | 246 ++++++++++++++++++ easytier/Cargo.toml | 4 + easytier/build.rs | 2 + easytier/src/common/config.rs | 8 +- easytier/src/common/mod.rs | 30 +++ .../connector/udp_hole_punch/sym_to_cone.rs | 2 +- easytier/src/easytier-cli.rs | 109 ++++---- easytier/src/easytier-core.rs | 50 +++- easytier/src/launcher.rs | 55 ++-- easytier/src/lib.rs | 1 + easytier/src/peers/foreign_network_manager.rs | 9 +- easytier/src/proto/cli.proto | 5 + easytier/src/proto/cli.rs | 112 ++++++++ easytier/src/proto/error.proto | 2 +- easytier/src/proto/error.rs | 34 +-- easytier/src/proto/mod.rs | 1 + easytier/src/proto/rpc_impl/bidirect.rs | 19 +- easytier/src/proto/tests.rs | 6 + easytier/src/proto/web.proto | 100 +++++++ easytier/src/proto/web.rs | 1 + easytier/src/utils.rs | 128 +-------- easytier/src/web_client/controller.rs | 171 ++++++++++++ easytier/src/web_client/mod.rs | 48 ++++ easytier/src/web_client/session.rs | 126 +++++++++ 33 files changed, 1539 insertions(+), 263 deletions(-) create mode 100644 easytier-web/src/client_manager/mod.rs create mode 100644 easytier-web/src/client_manager/session.rs create mode 100644 easytier-web/src/client_manager/storage.rs create mode 100644 easytier-web/src/restful/mod.rs create mode 100644 easytier/src/proto/web.proto create mode 100644 easytier/src/proto/web.rs create mode 100644 easytier/src/web_client/controller.rs create mode 100644 easytier/src/web_client/mod.rs create mode 100644 easytier/src/web_client/session.rs diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index caeb123..846cc1d 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -111,7 +111,7 @@ jobs: run: | bash ./.github/workflows/install_rust.sh if [[ $OS =~ ^ubuntu.*$ && $TARGET =~ ^mips.*$ ]]; then - cargo +nightly build -r --verbose --target $TARGET -Z build-std=std,panic_abort --no-default-features --features mips + cargo +nightly build -r --verbose --target $TARGET -Z build-std=std,panic_abort --no-default-features --features mips --package=easytier else cargo build --release --verbose --target $TARGET fi @@ -182,6 +182,9 @@ jobs: mv ./target/$TARGET/release/easytier-core"$SUFFIX" ./artifacts/objects/ mv ./target/$TARGET/release/easytier-cli"$SUFFIX" ./artifacts/objects/ + if [[ ! $TARGET =~ ^mips.*$ ]]; then + mv ./target/$TARGET/release/easytier-web"$SUFFIX" ./artifacts/objects/ + fi mv ./artifacts/objects/* ./artifacts/ rm -rf ./artifacts/objects/ diff --git a/Cargo.lock b/Cargo.lock index 9ac867e..9cfb919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,6 +428,73 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core", + "axum-macros", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "itoa 1.0.11", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tower 0.5.1", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -1312,9 +1379,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.0.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1574,6 +1641,7 @@ dependencies = [ "http 1.1.0", "humansize", "indexmap 1.9.3", + "machine-uid", "mimalloc-rust", "network-interface", "nix 0.27.1", @@ -1678,7 +1746,18 @@ dependencies = [ name = "easytier-web" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", + "axum", + "clap", + "dashmap", "easytier", + "serde", + "thiserror", + "tokio", + "tracing", + "url", + "uuid", ] [[package]] @@ -2733,6 +2812,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa 1.0.11", "pin-project-lite", "smallvec", @@ -2768,7 +2848,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -3258,6 +3338,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" +[[package]] +name = "machine-uid" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4506fa0abb0a2ea93f5862f55973da0a662d2ad0e98f337a1c5aac657f0892" +dependencies = [ + "libc", + "winreg 0.52.0", +] + [[package]] name = "malloc_buf" version = "0.0.6" @@ -3302,6 +3392,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md5" version = "0.7.0" @@ -5326,6 +5422,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa 1.0.11", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.19" @@ -6583,6 +6689,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/Cargo.toml b/Cargo.toml index 61fe0f0..33d740e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" members = ["easytier", "easytier-gui/src-tauri", "easytier-rpc-build", "easytier-web"] -default-members = ["easytier"] +default-members = ["easytier", "easytier-web"] [profile.dev] panic = "unwind" diff --git a/easytier-gui/src/types/network.ts b/easytier-gui/src/types/network.ts index 0c7f6e1..91fe982 100644 --- a/easytier-gui/src/types/network.ts +++ b/easytier-gui/src/types/network.ts @@ -11,7 +11,7 @@ export interface NetworkConfig { dhcp: boolean virtual_ipv4: string - network_length: number, + network_length: number hostname?: string network_name: string network_secret: string diff --git a/easytier-web/Cargo.toml b/easytier-web/Cargo.toml index 57fd26b..c876d9d 100644 --- a/easytier-web/Cargo.toml +++ b/easytier-web/Cargo.toml @@ -4,4 +4,25 @@ version = "0.1.0" edition = "2021" [dependencies] -easytier = { path = "../easytier" } \ No newline at end of file +easytier = { path = "../easytier" } +tracing = { version = "0.1", features = ["log"] } +anyhow = { version = "1.0" } +thiserror = "1.0" +tokio = { version = "1", features = ["full"] } +dashmap = "6.1" +url = "2.2" +async-trait = "0.1" +axum = { version = "0.7", features = ["macros"] } +clap = { version = "4.4.8", features = [ + "string", + "unicode", + "derive", + "wrap_help", +] } +serde = { version = "1.0", features = ["derive"] } +uuid = { version = "1.5.0", features = [ + "v4", + "fast-rng", + "macro-diagnostics", + "serde", +] } diff --git a/easytier-web/src/client_manager/mod.rs b/easytier-web/src/client_manager/mod.rs new file mode 100644 index 0000000..7d4a81f --- /dev/null +++ b/easytier-web/src/client_manager/mod.rs @@ -0,0 +1,134 @@ +pub mod session; +pub mod storage; + +use std::sync::Arc; + +use dashmap::DashMap; +use easytier::{common::scoped_task::ScopedTask, tunnel::TunnelListener}; +use session::Session; +use storage::{Storage, StorageToken}; + +#[derive(Debug)] +pub struct ClientManager { + accept_task: Option>, + clear_task: Option>, + + client_sessions: Arc>>, + storage: Storage, +} + +impl ClientManager { + pub fn new() -> Self { + ClientManager { + accept_task: None, + clear_task: None, + + client_sessions: Arc::new(DashMap::new()), + storage: Storage::new(), + } + } + + pub async fn serve( + &mut self, + mut listener: L, + ) -> Result<(), anyhow::Error> { + listener.listen().await?; + + let sessions = self.client_sessions.clone(); + let storage = self.storage.weak_ref(); + let task = tokio::spawn(async move { + while let Ok(tunnel) = listener.accept().await { + let info = tunnel.info().unwrap(); + let client_url: url::Url = info.remote_addr.unwrap().into(); + println!("New session from {:?}", tunnel.info()); + let session = Session::new(tunnel, storage.clone(), client_url.clone()); + sessions.insert(client_url, Arc::new(session)); + } + }); + + self.accept_task = Some(ScopedTask::from(task)); + + let sessions = self.client_sessions.clone(); + let task = tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(15)).await; + sessions.retain(|_, session| session.is_running()); + } + }); + self.clear_task = Some(ScopedTask::from(task)); + + Ok(()) + } + + pub fn is_running(&self) -> bool { + self.accept_task.is_some() && self.clear_task.is_some() + } + + pub async fn list_sessions(&self) -> Vec { + let sessions = self + .client_sessions + .iter() + .map(|item| item.value().clone()) + .collect::>(); + + let mut ret: Vec = vec![]; + for s in sessions { + if let Some(t) = s.get_token().await { + ret.push(t); + } + } + + ret + } + + pub fn get_session_by_machine_id(&self, machine_id: &uuid::Uuid) -> Option> { + let c_url = self.storage.get_client_url_by_machine_id(machine_id)?; + self.client_sessions + .get(&c_url) + .map(|item| item.value().clone()) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use easytier::{ + tunnel::{ + common::tests::wait_for_condition, + udp::{UdpTunnelConnector, UdpTunnelListener}, + }, + web_client::WebClient, + }; + + use crate::client_manager::ClientManager; + + #[tokio::test] + async fn test_client() { + let listener = UdpTunnelListener::new("udp://0.0.0.0:54333".parse().unwrap()); + let mut mgr = ClientManager::new(); + mgr.serve(Box::new(listener)).await.unwrap(); + + let connector = UdpTunnelConnector::new("udp://127.0.0.1:54333".parse().unwrap()); + let _c = WebClient::new(connector, "test"); + + wait_for_condition( + || async { mgr.client_sessions.len() == 1 }, + Duration::from_secs(6), + ) + .await; + + let mut a = mgr + .client_sessions + .iter() + .next() + .unwrap() + .data() + .read() + .await + .heartbeat_waiter(); + let req = a.recv().await.unwrap(); + println!("{:?}", req); + println!("{:?}", mgr); + } +} diff --git a/easytier-web/src/client_manager/session.rs b/easytier-web/src/client_manager/session.rs new file mode 100644 index 0000000..506974d --- /dev/null +++ b/easytier-web/src/client_manager/session.rs @@ -0,0 +1,144 @@ +use std::{fmt::Debug, sync::Arc}; + +use easytier::{ + proto::{ + rpc_impl::bidirect::BidirectRpcManager, + rpc_types::{self, controller::BaseController}, + web::{ + HeartbeatRequest, HeartbeatResponse, WebClientService, WebClientServiceClientFactory, + WebServerService, WebServerServiceServer, + }, + }, + tunnel::Tunnel, +}; +use tokio::sync::{broadcast, RwLock}; + +use super::storage::{Storage, StorageToken, WeakRefStorage}; + +#[derive(Debug)] +pub struct SessionData { + storage: WeakRefStorage, + client_url: url::Url, + + storage_token: Option, + notifier: broadcast::Sender, + req: Option, +} + +impl SessionData { + fn new(storage: WeakRefStorage, client_url: url::Url) -> Self { + let (tx, _rx1) = broadcast::channel(2); + + SessionData { + storage, + client_url, + storage_token: None, + notifier: tx, + req: None, + } + } + + pub fn req(&self) -> Option { + self.req.clone() + } + + pub fn heartbeat_waiter(&self) -> broadcast::Receiver { + self.notifier.subscribe() + } +} + +impl Drop for SessionData { + fn drop(&mut self) { + if let Ok(storage) = Storage::try_from(self.storage.clone()) { + if let Some(token) = self.storage_token.as_ref() { + storage.remove_client(token); + } + } + } +} + +pub type SharedSessionData = Arc>; + +#[derive(Clone)] +struct SessionRpcService { + data: SharedSessionData, +} + +#[async_trait::async_trait] +impl WebServerService for SessionRpcService { + type Controller = BaseController; + + async fn heartbeat( + &self, + _: BaseController, + req: HeartbeatRequest, + ) -> rpc_types::error::Result { + let mut data = self.data.write().await; + if data.req.replace(req.clone()).is_none() { + assert!(data.storage_token.is_none()); + data.storage_token = Some(StorageToken { + token: req.user_token.clone().into(), + client_url: data.client_url.clone(), + machine_id: req + .machine_id + .clone() + .map(Into::into) + .unwrap_or(uuid::Uuid::new_v4()), + }); + if let Ok(storage) = Storage::try_from(data.storage.clone()) { + storage.add_client(data.storage_token.as_ref().unwrap().clone()); + } + } + let _ = data.notifier.send(req); + Ok(HeartbeatResponse {}) + } +} + +pub struct Session { + rpc_mgr: BidirectRpcManager, + + data: SharedSessionData, +} + +impl Debug for Session { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Session").field("data", &self.data).finish() + } +} + +impl Session { + pub fn new(tunnel: Box, storage: WeakRefStorage, client_url: url::Url) -> Self { + let rpc_mgr = + BidirectRpcManager::new().set_rx_timeout(Some(std::time::Duration::from_secs(30))); + rpc_mgr.run_with_tunnel(tunnel); + + let data = Arc::new(RwLock::new(SessionData::new(storage, client_url))); + + rpc_mgr.rpc_server().registry().register( + WebServerServiceServer::new(SessionRpcService { data: data.clone() }), + "", + ); + + Session { rpc_mgr, data } + } + + pub fn is_running(&self) -> bool { + self.rpc_mgr.is_running() + } + + pub fn data(&self) -> SharedSessionData { + self.data.clone() + } + + pub fn scoped_rpc_client( + &self, + ) -> Box + Send> { + self.rpc_mgr + .rpc_client() + .scoped_client::>(1, 1, "".to_string()) + } + + pub async fn get_token(&self) -> Option { + self.data.read().await.storage_token.clone() + } +} diff --git a/easytier-web/src/client_manager/storage.rs b/easytier-web/src/client_manager/storage.rs new file mode 100644 index 0000000..79dac9a --- /dev/null +++ b/easytier-web/src/client_manager/storage.rs @@ -0,0 +1,72 @@ +use std::sync::{Arc, Weak}; + +use dashmap::{DashMap, DashSet}; + +// use this to maintain Storage +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct StorageToken { + pub token: String, + pub client_url: url::Url, + pub machine_id: uuid::Uuid, +} + +#[derive(Debug)] +pub struct StorageInner { + // some map for indexing + pub token_clients_map: DashMap>, + pub machine_client_url_map: DashMap, +} + +#[derive(Debug, Clone)] +pub struct Storage(Arc); +pub type WeakRefStorage = Weak; + +impl TryFrom for Storage { + type Error = (); + + fn try_from(weak: Weak) -> Result { + weak.upgrade().map(|inner| Storage(inner)).ok_or(()) + } +} + +impl Storage { + pub fn new() -> Self { + Storage(Arc::new(StorageInner { + token_clients_map: DashMap::new(), + machine_client_url_map: DashMap::new(), + })) + } + + pub fn add_client(&self, stoken: StorageToken) { + let inner = self + .0 + .token_clients_map + .entry(stoken.token) + .or_insert_with(DashSet::new); + inner.insert(stoken.client_url.clone()); + + self.0 + .machine_client_url_map + .insert(stoken.machine_id, stoken.client_url.clone()); + } + + pub fn remove_client(&self, stoken: &StorageToken) { + self.0.token_clients_map.remove_if(&stoken.token, |_, set| { + set.remove(&stoken.client_url); + set.is_empty() + }); + + self.0.machine_client_url_map.remove(&stoken.machine_id); + } + + pub fn weak_ref(&self) -> WeakRefStorage { + Arc::downgrade(&self.0) + } + + pub fn get_client_url_by_machine_id(&self, machine_id: &uuid::Uuid) -> Option { + self.0 + .machine_client_url_map + .get(&machine_id) + .map(|url| url.clone()) + } +} diff --git a/easytier-web/src/main.rs b/easytier-web/src/main.rs index e7a11a9..2df5702 100644 --- a/easytier-web/src/main.rs +++ b/easytier-web/src/main.rs @@ -1,3 +1,22 @@ -fn main() { - println!("Hello, world!"); +#![allow(dead_code)] + +use std::sync::Arc; + +use easytier::tunnel::udp::UdpTunnelListener; + +mod client_manager; +mod restful; + +#[tokio::main] +async fn main() { + let listener = UdpTunnelListener::new("udp://0.0.0.0:22020".parse().unwrap()); + let mut mgr = client_manager::ClientManager::new(); + mgr.serve(listener).await.unwrap(); + let mgr = Arc::new(mgr); + + let mut restful_server = + restful::RestfulServer::new("0.0.0.0:11211".parse().unwrap(), mgr.clone()); + restful_server.start().await.unwrap(); + + tokio::signal::ctrl_c().await.unwrap(); } diff --git a/easytier-web/src/restful/mod.rs b/easytier-web/src/restful/mod.rs new file mode 100644 index 0000000..fe5c06e --- /dev/null +++ b/easytier-web/src/restful/mod.rs @@ -0,0 +1,246 @@ +use std::vec; +use std::{net::SocketAddr, sync::Arc}; + +use axum::extract::{Path, Query}; +use axum::http::StatusCode; +use axum::routing::post; +use axum::{extract::State, routing::get, Json, Router}; +use easytier::proto::{self, rpc_types, web::*}; +use easytier::{common::scoped_task::ScopedTask, proto::rpc_types::controller::BaseController}; +use tokio::net::TcpListener; + +use crate::client_manager::session::Session; +use crate::client_manager::storage::StorageToken; +use crate::client_manager::ClientManager; + +pub struct RestfulServer { + bind_addr: SocketAddr, + client_mgr: Arc, + + serve_task: Option>, +} + +type AppStateInner = Arc; +type AppState = State; + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ListSessionJsonResp(Vec); + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ValidateConfigJsonReq { + config: String, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct RunNetworkJsonReq { + config: String, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ColletNetworkInfoJsonReq { + inst_ids: Option>, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct RemoveNetworkJsonReq { + inst_ids: Vec, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ListNetworkInstanceIdsJsonResp(Vec); + +type Error = proto::error::Error; +type ErrorKind = proto::error::error::ErrorKind; +type RpcError = rpc_types::error::Error; +type HttpHandleError = (StatusCode, Json); + +fn convert_rpc_error(e: RpcError) -> (StatusCode, Json) { + let status_code = match &e { + RpcError::ExecutionError(_) => StatusCode::BAD_REQUEST, + RpcError::Timeout(_) => StatusCode::GATEWAY_TIMEOUT, + _ => StatusCode::BAD_GATEWAY, + }; + let error = Error::from(&e); + (status_code, Json(error)) +} + +impl RestfulServer { + pub fn new(bind_addr: SocketAddr, client_mgr: Arc) -> Self { + assert!(client_mgr.is_running()); + RestfulServer { + bind_addr, + client_mgr, + serve_task: None, + } + } + + async fn get_session_by_machine_id( + client_mgr: &ClientManager, + machine_id: &uuid::Uuid, + ) -> Result, HttpHandleError> { + let Some(result) = client_mgr.get_session_by_machine_id(machine_id) else { + return Err(( + StatusCode::NOT_FOUND, + Error { + error_kind: Some(ErrorKind::OtherError(proto::error::OtherError { + error_message: "No such session".to_string(), + })), + } + .into(), + )); + }; + + Ok(result) + } + + async fn handle_list_all_sessions( + State(client_mgr): AppState, + ) -> Result, HttpHandleError> { + let ret = client_mgr.list_sessions().await; + Ok(ListSessionJsonResp(ret).into()) + } + + async fn handle_validate_config( + State(client_mgr): AppState, + Path(machine_id): Path, + Json(payload): Json, + ) -> Result<(), HttpHandleError> { + let config = payload.config; + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + c.validate_config(BaseController::default(), ValidateConfigRequest { config }) + .await + .map_err(convert_rpc_error)?; + Ok(()) + } + + async fn handle_run_network_instance( + State(client_mgr): AppState, + Path(machine_id): Path, + Json(payload): Json, + ) -> Result<(), HttpHandleError> { + let config = payload.config; + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + c.run_network_instance( + BaseController::default(), + RunNetworkInstanceRequest { config }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(()) + } + + async fn handle_collect_one_network_info( + State(client_mgr): AppState, + Path((machine_id, inst_id)): Path<(uuid::Uuid, uuid::Uuid)>, + ) -> Result, HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + let ret = c + .collect_network_info( + BaseController::default(), + CollectNetworkInfoRequest { + inst_ids: vec![inst_id.into()], + }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(ret.into()) + } + + async fn handle_collect_network_info( + State(client_mgr): AppState, + Path(machine_id): Path, + Query(payload): Query, + ) -> Result, HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + let ret = c + .collect_network_info( + BaseController::default(), + CollectNetworkInfoRequest { + inst_ids: payload + .inst_ids + .unwrap_or_default() + .into_iter() + .map(Into::into) + .collect(), + }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(ret.into()) + } + + async fn handle_list_network_instance_ids( + State(client_mgr): AppState, + Path(machine_id): Path, + ) -> Result, HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + let ret = c + .list_network_instance(BaseController::default(), ListNetworkInstanceRequest {}) + .await + .map_err(convert_rpc_error)?; + Ok( + ListNetworkInstanceIdsJsonResp(ret.inst_ids.into_iter().map(Into::into).collect()) + .into(), + ) + } + + async fn handle_remove_network_instance( + State(client_mgr): AppState, + Path((machine_id, inst_id)): Path<(uuid::Uuid, uuid::Uuid)>, + ) -> Result<(), HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + c.delete_network_instance( + BaseController::default(), + DeleteNetworkInstanceRequest { + inst_ids: vec![inst_id.into()], + }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(()) + } + + pub async fn start(&mut self) -> Result<(), anyhow::Error> { + let listener = TcpListener::bind(self.bind_addr).await.unwrap(); + + let app = Router::new() + .route("/api/v1/sessions", get(Self::handle_list_all_sessions)) + .route( + "/api/v1/network/:machine-id/validate-config", + post(Self::handle_validate_config), + ) + .route( + "/api/v1/network/:machine-id", + post(Self::handle_run_network_instance).get(Self::handle_list_network_instance_ids), + ) + .route( + "/api/v1/network/:machine-id/info", + get(Self::handle_collect_network_info), + ) + .route( + "/api/v1/network/:machine-id/:inst-id", + get(Self::handle_collect_one_network_info) + .delete(Self::handle_remove_network_instance), + ) + .with_state(self.client_mgr.clone()); + + let task = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + self.serve_task = Some(task.into()); + + Ok(()) + } +} diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 516a898..9f3bec5 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -17,6 +17,7 @@ readme = "README.md" [[bin]] name = "easytier-core" path = "src/easytier-core.rs" +test = false [[bin]] name = "easytier-cli" @@ -180,6 +181,9 @@ sys-locale = "0.3" ringbuf = "0.4.5" async-ringbuf = "0.3.1" +[target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies] +machine-uid = "0.5.3" + [target.'cfg(windows)'.dependencies] windows-sys = { version = "0.52", features = [ "Win32_Networking_WinSock", diff --git a/easytier/build.rs b/easytier/build.rs index 499346d..44430fa 100644 --- a/easytier/build.rs +++ b/easytier/build.rs @@ -127,6 +127,7 @@ fn main() -> Result<(), Box> { "src/proto/error.proto", "src/proto/tests.proto", "src/proto/cli.proto", + "src/proto/web.proto", ]; for proto_file in &proto_files { @@ -138,6 +139,7 @@ fn main() -> Result<(), Box> { .type_attribute(".common", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".error", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".cli", "#[derive(serde::Serialize, serde::Deserialize)]") + .type_attribute(".web", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute( "peer_rpc.GetIpListResponse", "#[derive(serde::Serialize, serde::Deserialize)]", diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index ce61f61..6cda91a 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -231,12 +231,8 @@ impl Default for TomlConfigLoader { impl TomlConfigLoader { pub fn new_from_str(config_str: &str) -> Result { - let mut config = toml::de::from_str::(config_str).with_context(|| { - format!( - "failed to parse config file: {}\n{}", - config_str, config_str - ) - })?; + let mut config = toml::de::from_str::(config_str) + .with_context(|| format!("failed to parse config file: {}", config_str))?; config.flags_struct = Some(Self::gen_flags(config.flags.clone().unwrap_or_default())); diff --git a/easytier/src/common/mod.rs b/easytier/src/common/mod.rs index 8f3bd45..ecd68ec 100644 --- a/easytier/src/common/mod.rs +++ b/easytier/src/common/mod.rs @@ -80,6 +80,36 @@ pub fn join_joinset_background( ); } +pub fn get_machine_id() -> uuid::Uuid { + // TODO: load from local file + + #[cfg(any( + target_os = "linux", + target_os = "macos", + target_os = "windows", + target_os = "freebsd" + ))] + let gen_mid = machine_uid::get() + .map(|x| { + let mut b = [0u8; 16]; + crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b); + uuid::Uuid::from_bytes(b) + }) + .unwrap_or(uuid::Uuid::new_v4()); + + #[cfg(not(any( + target_os = "linux", + target_os = "macos", + target_os = "windows", + target_os = "freebsd" + )))] + let gen_mid = uuid::Uuid::new_v4(); + + // TODO: save to local file + + gen_mid +} + #[cfg(test)] mod tests { use super::*; diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index e935181..3a563b1 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -318,7 +318,7 @@ impl PunchSymToConeHoleClient { let punch_random = self.punch_randomly.load(Ordering::Relaxed); let punch_predicable = self.punch_predicablely.load(Ordering::Relaxed); let scoped_punch_task: ScopedTask> = tokio::spawn(async move { - if punch_predicable { + if punch_predicable && base_port_for_easy_sym.is_some() { if let Some(inc) = my_nat_info.get_inc_of_easy_sym() { let req = SendPunchPacketEasySymRequest { listener_mapped_addr: remote_mapped_addr.clone().into(), diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 7cfce53..a2d7879 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -7,14 +7,18 @@ use tabled::settings::Style; use tokio::time::timeout; use easytier::{ - common::{constants::EASYTIER_VERSION, stun::StunInfoCollector, stun::StunInfoCollectorTrait}, + common::{ + constants::EASYTIER_VERSION, + stun::{StunInfoCollector, StunInfoCollectorTrait}, + }, proto::{ cli::{ - ConnectorManageRpc, ConnectorManageRpcClientFactory, DumpRouteRequest, - GetVpnPortalInfoRequest, ListConnectorRequest, ListForeignNetworkRequest, - ListGlobalForeignNetworkRequest, ListPeerRequest, ListPeerResponse, ListRouteRequest, - ListRouteResponse, NodeInfo, PeerManageRpc, PeerManageRpcClientFactory, - ShowNodeInfoRequest, VpnPortalRpc, VpnPortalRpcClientFactory, + list_peer_route_pair, ConnectorManageRpc, ConnectorManageRpcClientFactory, + DumpRouteRequest, GetVpnPortalInfoRequest, ListConnectorRequest, + ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListPeerRequest, + ListPeerResponse, ListRouteRequest, ListRouteResponse, NodeInfo, PeerManageRpc, + PeerManageRpcClientFactory, ShowNodeInfoRequest, VpnPortalRpc, + VpnPortalRpcClientFactory, }, common::NatType, peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory}, @@ -22,7 +26,7 @@ use easytier::{ rpc_types::controller::BaseController, }, tunnel::tcp::TcpTunnelConnector, - utils::{cost_to_str, float_to_str, list_peer_route_pair, PeerRoutePair}, + utils::{cost_to_str, float_to_str, PeerRoutePair}, }; #[derive(Parser, Debug)] @@ -222,25 +226,26 @@ impl CommandHandler { impl From for PeerTableItem { fn from(p: PeerRoutePair) -> Self { + let route = p.route.clone().unwrap_or_default(); PeerTableItem { - ipv4: p - .route - .ipv4_addr - .map(|ip| ip.to_string()) - .unwrap_or_default(), - hostname: p.route.hostname.clone(), - cost: cost_to_str(p.route.cost), + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + cost: cost_to_str(route.cost), lat_ms: float_to_str(p.get_latency_ms().unwrap_or(0.0), 3), loss_rate: float_to_str(p.get_loss_rate().unwrap_or(0.0), 3), rx_bytes: format_size(p.get_rx_bytes().unwrap_or(0), humansize::DECIMAL), tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL), - tunnel_proto: p.get_conn_protos().unwrap_or_default().join(",").to_string(), + tunnel_proto: p + .get_conn_protos() + .unwrap_or_default() + .join(",") + .to_string(), nat_type: p.get_udp_nat_type(), - id: p.route.peer_id.to_string(), - version: if p.route.version.is_empty() { + id: route.peer_id.to_string(), + version: if route.version.is_empty() { "unknown".to_string() } else { - p.route.version.to_string() + route.version.to_string() }, } } @@ -287,10 +292,7 @@ impl CommandHandler { items.push(p.into()); } - println!( - "{}", - tabled::Table::new(items).with(Style::modern()) - ); + println!("{}", tabled::Table::new(items).with(Style::modern())); Ok(()) } @@ -404,62 +406,59 @@ impl CommandHandler { }); let peer_routes = self.list_peer_route_pair().await?; for p in peer_routes.iter() { - let Some(next_hop_pair) = peer_routes - .iter() - .find(|pair| pair.route.peer_id == p.route.next_hop_peer_id) - else { + let Some(next_hop_pair) = peer_routes.iter().find(|pair| { + pair.route.clone().unwrap_or_default().peer_id + == p.route.clone().unwrap_or_default().next_hop_peer_id + }) else { continue; }; - if p.route.cost == 1 { + let route = p.route.clone().unwrap_or_default(); + if route.cost == 1 { items.push(RouteTableItem { - ipv4: p - .route - .ipv4_addr - .map(|ip| ip.to_string()) - .unwrap_or_default(), - hostname: p.route.hostname.clone(), - proxy_cidrs: p.route.proxy_cidrs.clone().join(",").to_string(), + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), next_hop_ipv4: "DIRECT".to_string(), next_hop_hostname: "".to_string(), next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), - cost: p.route.cost, - version: if p.route.version.is_empty() { + cost: route.cost, + version: if route.version.is_empty() { "unknown".to_string() } else { - p.route.version.to_string() + route.version.to_string() }, }); } else { items.push(RouteTableItem { - ipv4: p - .route - .ipv4_addr - .map(|ip| ip.to_string()) - .unwrap_or_default(), - hostname: p.route.hostname.clone(), - proxy_cidrs: p.route.proxy_cidrs.clone().join(",").to_string(), + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), next_hop_ipv4: next_hop_pair .route + .clone() + .unwrap_or_default() .ipv4_addr .map(|ip| ip.to_string()) .unwrap_or_default(), - next_hop_hostname: next_hop_pair.route.hostname.clone(), + next_hop_hostname: next_hop_pair + .route + .clone() + .unwrap_or_default() + .hostname + .clone(), next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), - cost: p.route.cost, - version: if p.route.version.is_empty() { + cost: route.cost, + version: if route.version.is_empty() { "unknown".to_string() } else { - p.route.version.to_string() + route.version.to_string() }, }); } } - println!( - "{}", - tabled::Table::new(items).with(Style::modern()) - ); + println!("{}", tabled::Table::new(items).with(Style::modern())); Ok(()) } @@ -576,11 +575,7 @@ async fn main() -> Result<(), Error> { }); } - println!( - "{}", - tabled::Table::new(table_rows) - .with(Style::modern()) - ); + println!("{}", tabled::Table::new(table_rows).with(Style::modern())); } SubCommand::VpnPortal => { let vpn_portal_client = handler.get_vpn_portal_client().await?; diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 7b7103e..4c11ede 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + #[macro_use] extern crate rust_i18n; @@ -21,7 +23,9 @@ use easytier::{ scoped_task::ScopedTask, }, launcher, proto, + tunnel::udp::UdpTunnelConnector, utils::{init_logger, setup_panic_handler}, + web_client, }; #[cfg(feature = "mimalloc")] @@ -34,6 +38,13 @@ static GLOBAL_MIMALLOC: GlobalMiMalloc = GlobalMiMalloc; #[derive(Parser, Debug)] #[command(name = "easytier-core", author, version = EASYTIER_VERSION , about, long_about = None)] struct Cli { + #[arg( + short = 'w', + long, + help = t!("core_clap.config_server").to_string() + )] + config_server: Option, + #[arg( short, long, @@ -640,12 +651,47 @@ pub fn handle_event(mut events: EventBusSubscriber) -> tokio::task::JoinHandle<( #[tokio::main] async fn main() { - setup_panic_handler(); - let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US")); rust_i18n::set_locale(&locale); let cli = Cli::parse(); + + setup_panic_handler(); + + if cli.config_server.is_some() { + let config_server_url_s = cli.config_server.clone().unwrap(); + let config_server_url = match url::Url::parse(&config_server_url_s) { + Ok(u) => u, + Err(_) => format!( + "udp://config-server.easytier.top:22020/{}", + config_server_url_s + ) + .parse() + .unwrap(), + }; + + let mut c_url = config_server_url.clone(); + c_url.set_path(""); + let token = config_server_url + .path_segments() + .and_then(|mut x| x.next()) + .map(|x| x.to_string()) + .unwrap_or_default(); + + println!( + "Entering config client mode...\n server: {}\n token: {}", + c_url, token, + ); + + if token.is_empty() { + panic!("empty token"); + } + + let _wc = web_client::WebClient::new(UdpTunnelConnector::new(c_url), token.to_string()); + tokio::signal::ctrl_c().await.unwrap(); + return; + } + let cfg = TomlConfigLoader::from(cli); init_logger(&cfg, false).unwrap(); diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 4eccd08..0e56c09 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -12,27 +12,12 @@ use crate::{ }, instance::instance::Instance, peers::rpc_service::PeerManagerRpcService, - proto::{ - cli::{PeerInfo, Route}, - common::StunInfo, - peer_rpc::GetIpListResponse, - }, - utils::{list_peer_route_pair, PeerRoutePair}, + proto::cli::{list_peer_route_pair, PeerInfo, Route}, }; use chrono::{DateTime, Local}; -use serde::{Deserialize, Serialize}; use tokio::{sync::broadcast, task::JoinSet}; -#[derive(Default, Clone, Debug, Serialize, Deserialize)] -pub struct MyNodeInfo { - pub virtual_ipv4: String, - pub hostname: String, - pub version: String, - pub ips: GetIpListResponse, - pub stun_info: StunInfo, - pub listeners: Vec, - pub vpn_portal_cfg: Option, -} +pub type MyNodeInfo = crate::proto::web::MyNodeInfo; struct EasyTierData { events: RwLock, GlobalCtxEvent)>>, @@ -164,18 +149,15 @@ impl EasyTierLauncher { global_ctx_c.get_flags().dev_name.clone(); let node_info = MyNodeInfo { - virtual_ipv4: global_ctx_c - .get_ipv4() - .map(|x| x.to_string()) - .unwrap_or_default(), + virtual_ipv4: global_ctx_c.get_ipv4().map(|x| x.address().into()), hostname: global_ctx_c.get_hostname(), version: EASYTIER_VERSION.to_string(), - ips: global_ctx_c.get_ip_collector().collect_ip_addrs().await, - stun_info: global_ctx_c.get_stun_info_collector().get_stun_info(), + ips: Some(global_ctx_c.get_ip_collector().collect_ip_addrs().await), + stun_info: Some(global_ctx_c.get_stun_info_collector().get_stun_info()), listeners: global_ctx_c .get_running_listeners() - .iter() - .map(|x| x.to_string()) + .into_iter() + .map(Into::into) .collect(), vpn_portal_cfg: Some( vpn_portal @@ -311,18 +293,7 @@ impl Drop for EasyTierLauncher { } } -#[derive(Deserialize, Serialize, Debug)] -pub struct NetworkInstanceRunningInfo { - pub dev_name: String, - pub my_node_info: MyNodeInfo, - pub events: Vec<(DateTime, GlobalCtxEvent)>, - pub node_info: MyNodeInfo, - pub routes: Vec, - pub peers: Vec, - pub peer_route_pairs: Vec, - pub running: bool, - pub error_msg: Option, -} +pub type NetworkInstanceRunningInfo = crate::proto::web::NetworkInstanceRunningInfo; pub struct NetworkInstance { config: TomlConfigLoader, @@ -362,9 +333,13 @@ impl NetworkInstance { Some(NetworkInstanceRunningInfo { dev_name: launcher.get_dev_name(), - my_node_info: launcher.get_node_info(), - events: launcher.get_events(), - node_info: launcher.get_node_info(), + my_node_info: Some(launcher.get_node_info()), + events: launcher + .get_events() + .iter() + .map(|(t, e)| (t.to_string(), format!("{:?}", e))) + .collect(), + node_info: Some(launcher.get_node_info()), routes, peers, peer_route_pairs, diff --git a/easytier/src/lib.rs b/easytier/src/lib.rs index f719d88..367e8e7 100644 --- a/easytier/src/lib.rs +++ b/easytier/src/lib.rs @@ -13,6 +13,7 @@ pub mod peers; pub mod proto; pub mod tunnel; pub mod utils; +pub mod web_client; #[cfg(test)] mod tests; diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 1ff813d..08944cd 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -353,12 +353,13 @@ impl ForeignNetworkManagerData { } async fn clear_no_conn_peer(&self, network_name: &String) { - let peer_map = self + let Some(peer_map) = self .network_peer_maps .get(network_name) - .unwrap() - .peer_map - .clone(); + .and_then(|v| Some(v.peer_map.clone())) + else { + return; + }; peer_map.clean_peer_without_conn().await; } diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index 74321ca..4221635 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -56,6 +56,11 @@ message Route { common.PeerFeatureFlag feature_flag = 10; } +message PeerRoutePair { + Route route = 1; + PeerInfo peer = 2; +} + message NodeInfo { uint32 peer_id = 1; string ipv4_addr = 2; diff --git a/easytier/src/proto/cli.rs b/easytier/src/proto/cli.rs index 31bbf4d..236df89 100644 --- a/easytier/src/proto/cli.rs +++ b/easytier/src/proto/cli.rs @@ -1 +1,113 @@ include!(concat!(env!("OUT_DIR"), "/cli.rs")); + +impl PeerRoutePair { + pub fn get_latency_ms(&self) -> Option { + let mut ret = u64::MAX; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(stats) = &conn.stats else { + continue; + }; + ret = ret.min(stats.latency_us); + } + + if ret == u64::MAX { + None + } else { + Some(f64::from(ret as u32) / 1000.0) + } + } + + pub fn get_rx_bytes(&self) -> Option { + let mut ret = 0; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(stats) = &conn.stats else { + continue; + }; + ret += stats.rx_bytes; + } + + if ret == 0 { + None + } else { + Some(ret) + } + } + + pub fn get_tx_bytes(&self) -> Option { + let mut ret = 0; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(stats) = &conn.stats else { + continue; + }; + ret += stats.tx_bytes; + } + + if ret == 0 { + None + } else { + Some(ret) + } + } + + pub fn get_loss_rate(&self) -> Option { + let mut ret = 0.0; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + ret += conn.loss_rate; + } + + if ret == 0.0 { + None + } else { + Some(ret as f64) + } + } + + pub fn get_conn_protos(&self) -> Option> { + let mut ret = vec![]; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(tunnel_info) = &conn.tunnel else { + continue; + }; + // insert if not exists + if !ret.contains(&tunnel_info.tunnel_type) { + ret.push(tunnel_info.tunnel_type.clone()); + } + } + + if ret.is_empty() { + None + } else { + Some(ret) + } + } + + pub fn get_udp_nat_type(self: &Self) -> String { + use crate::proto::common::NatType; + let mut ret = NatType::Unknown; + if let Some(r) = &self.route.clone().unwrap_or_default().stun_info { + ret = NatType::try_from(r.udp_nat_type).unwrap(); + } + format!("{:?}", ret) + } +} + +pub fn list_peer_route_pair(peers: Vec, routes: Vec) -> Vec { + let mut pairs: Vec = vec![]; + + for route in routes.iter() { + let peer = peers.iter().find(|peer| peer.peer_id == route.peer_id); + let pair = PeerRoutePair { + route: Some(route.clone()), + peer: peer.cloned(), + }; + + pairs.push(pair); + } + + pairs +} diff --git a/easytier/src/proto/error.proto b/easytier/src/proto/error.proto index 80d12f4..5b5f537 100644 --- a/easytier/src/proto/error.proto +++ b/easytier/src/proto/error.proto @@ -21,7 +21,7 @@ message MalformatRpcPacket { string error_message = 1; } message Timeout { string error_message = 1; } message Error { - oneof error { + oneof error_kind { OtherError other_error = 1; InvalidMethodIndex invalid_method_index = 2; InvalidService invalid_service = 3; diff --git a/easytier/src/proto/error.rs b/easytier/src/proto/error.rs index c3ab2bd..b2cb2d3 100644 --- a/easytier/src/proto/error.rs +++ b/easytier/src/proto/error.rs @@ -6,44 +6,44 @@ include!(concat!(env!("OUT_DIR"), "/error.rs")); impl From<&rpc_types::error::Error> for Error { fn from(e: &rpc_types::error::Error) -> Self { - use super::error::error::Error as ProtoError; + use super::error::error::ErrorKind as ProtoError; match e { rpc_types::error::Error::ExecutionError(e) => Self { - error: Some(ProtoError::ExecuteError(ExecuteError { - error_message: e.to_string(), + error_kind: Some(ProtoError::ExecuteError(ExecuteError { + error_message: format!("{:?}", e), })), }, rpc_types::error::Error::DecodeError(_) => Self { - error: Some(ProtoError::ProstDecodeError(ProstDecodeError {})), + error_kind: Some(ProtoError::ProstDecodeError(ProstDecodeError {})), }, rpc_types::error::Error::EncodeError(_) => Self { - error: Some(ProtoError::ProstEncodeError(ProstEncodeError {})), + error_kind: Some(ProtoError::ProstEncodeError(ProstEncodeError {})), }, rpc_types::error::Error::InvalidMethodIndex(m, s) => Self { - error: Some(ProtoError::InvalidMethodIndex(InvalidMethodIndex { + error_kind: Some(ProtoError::InvalidMethodIndex(InvalidMethodIndex { method_index: *m as u32, - service_name: s.to_string(), + service_name: format!("{:?}", s), })), }, rpc_types::error::Error::InvalidServiceKey(s, _) => Self { - error: Some(ProtoError::InvalidService(InvalidService { - service_name: s.to_string(), + error_kind: Some(ProtoError::InvalidService(InvalidService { + service_name: format!("{:?}", s), })), }, rpc_types::error::Error::MalformatRpcPacket(e) => Self { - error: Some(ProtoError::MalformatRpcPacket(MalformatRpcPacket { - error_message: e.to_string(), + error_kind: Some(ProtoError::MalformatRpcPacket(MalformatRpcPacket { + error_message: format!("{:?}", e), })), }, rpc_types::error::Error::Timeout(e) => Self { - error: Some(ProtoError::Timeout(Timeout { - error_message: e.to_string(), + error_kind: Some(ProtoError::Timeout(Timeout { + error_message: format!("{:?}", e), })), }, #[allow(unreachable_patterns)] e => Self { - error: Some(ProtoError::OtherError(OtherError { - error_message: e.to_string(), + error_kind: Some(ProtoError::OtherError(OtherError { + error_message: format!("{:?}", e), })), }, } @@ -52,8 +52,8 @@ impl From<&rpc_types::error::Error> for Error { impl From<&Error> for rpc_types::error::Error { fn from(e: &Error) -> Self { - use super::error::error::Error as ProtoError; - match &e.error { + use super::error::error::ErrorKind as ProtoError; + match &e.error_kind { Some(ProtoError::ExecuteError(e)) => { Self::ExecutionError(anyhow::anyhow!(e.error_message.clone())) } diff --git a/easytier/src/proto/mod.rs b/easytier/src/proto/mod.rs index 4610ba6..0db7766 100644 --- a/easytier/src/proto/mod.rs +++ b/easytier/src/proto/mod.rs @@ -5,6 +5,7 @@ pub mod cli; pub mod common; pub mod error; pub mod peer_rpc; +pub mod web; #[cfg(test)] pub mod tests; diff --git a/easytier/src/proto/rpc_impl/bidirect.rs b/easytier/src/proto/rpc_impl/bidirect.rs index 28d94b1..a673949 100644 --- a/easytier/src/proto/rpc_impl/bidirect.rs +++ b/easytier/src/proto/rpc_impl/bidirect.rs @@ -1,9 +1,10 @@ -use std::sync::{Arc, Mutex}; +use std::sync::{atomic::AtomicBool, Arc, Mutex}; use futures::{SinkExt as _, StreamExt}; use tokio::{task::JoinSet, time::timeout}; use crate::{ + defer, proto::rpc_types::error::Error, tunnel::{packet_def::PacketType, ring::create_ring_tunnel_pair, Tunnel}, }; @@ -17,6 +18,7 @@ pub struct BidirectRpcManager { rx_timeout: Option, error: Arc>>, tunnel: Mutex>>, + running: Arc, tasks: Mutex>>, } @@ -30,6 +32,7 @@ impl BidirectRpcManager { rx_timeout: None, error: Arc::new(Mutex::new(None)), tunnel: Mutex::new(None), + running: Arc::new(AtomicBool::new(false)), tasks: Mutex::new(None), } @@ -50,6 +53,8 @@ impl BidirectRpcManager { let mut tasks = JoinSet::new(); self.rpc_client.run(); self.rpc_server.run(); + self.running + .store(true, std::sync::atomic::Ordering::Relaxed); let (server_tx, mut server_rx) = ( self.rpc_server.get_transport_sink(), @@ -64,7 +69,11 @@ impl BidirectRpcManager { self.tunnel.lock().unwrap().replace(inner); let e_clone = self.error.clone(); + let r_clone = self.running.clone(); tasks.spawn(async move { + defer! { + r_clone.store(false, std::sync::atomic::Ordering::Relaxed); + } loop { let packet = tokio::select! { Some(Ok(packet)) = server_rx.next() => { @@ -90,7 +99,11 @@ impl BidirectRpcManager { let recv_timeout = self.rx_timeout; let e_clone = self.error.clone(); + let r_clone = self.running.clone(); tasks.spawn(async move { + defer! { + r_clone.store(false, std::sync::atomic::Ordering::Relaxed); + } loop { let ret = if let Some(recv_timeout) = recv_timeout { match timeout(recv_timeout, inner_rx.next()).await { @@ -161,4 +174,8 @@ impl BidirectRpcManager { tasks.abort_all(); } } + + pub fn is_running(&self) -> bool { + self.running.load(std::sync::atomic::Ordering::Relaxed) + } } diff --git a/easytier/src/proto/tests.rs b/easytier/src/proto/tests.rs index 546f1c4..93c872f 100644 --- a/easytier/src/proto/tests.rs +++ b/easytier/src/proto/tests.rs @@ -307,6 +307,7 @@ async fn test_bidirect_rpc_manager() { use crate::proto::rpc_impl::bidirect::BidirectRpcManager; use crate::tunnel::tcp::{TcpTunnelConnector, TcpTunnelListener}; use crate::tunnel::{TunnelConnector, TunnelListener}; + use tokio::sync::Notify; let c = BidirectRpcManager::new(); let s = BidirectRpcManager::new(); @@ -323,6 +324,8 @@ async fn test_bidirect_rpc_manager() { }); s.rpc_server().registry().register(service, "test"); + let server_test_done = Arc::new(Notify::new()); + let server_test_done_clone = server_test_done.clone(); let mut tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:55443".parse().unwrap()); let s_task: ScopedTask<()> = tokio::spawn(async move { tcp_listener.listen().await.unwrap(); @@ -344,6 +347,8 @@ async fn test_bidirect_rpc_manager() { assert_eq!(ret.greeting, "Hello Client world!"); println!("server done, {:?}", ret); + server_test_done_clone.notify_one(); + s.wait().await; }) .into(); @@ -369,6 +374,7 @@ async fn test_bidirect_rpc_manager() { assert_eq!(ret.greeting, "Hello Server world!"); println!("client done, {:?}", ret); + server_test_done.notified().await; drop(c); s_task.await.unwrap(); } diff --git a/easytier/src/proto/web.proto b/easytier/src/proto/web.proto new file mode 100644 index 0000000..9976c3f --- /dev/null +++ b/easytier/src/proto/web.proto @@ -0,0 +1,100 @@ +syntax = "proto3"; + +import "common.proto"; +import "peer_rpc.proto"; +import "cli.proto"; + +package web; + +message MyNodeInfo { + common.Ipv4Addr virtual_ipv4 = 1; + string hostname = 2; + string version = 3; + peer_rpc.GetIpListResponse ips = 4; + common.StunInfo stun_info = 5; + repeated common.Url listeners = 6; + optional string vpn_portal_cfg = 7; +} + +message NetworkInstanceRunningInfo { + string dev_name = 1; + MyNodeInfo my_node_info = 2; + map events = 3; + MyNodeInfo node_info = 4; + repeated cli.Route routes = 5; + repeated cli.PeerInfo peers = 6; + repeated cli.PeerRoutePair peer_route_pairs = 7; + bool running = 8; + optional string error_msg = 9; +} + +message NetworkInstanceRunningInfoMap { + map map = 1; +} + +message HeartbeatRequest { + common.UUID machine_id = 1; + common.UUID inst_id = 2; + string user_token = 3; +} + +message HeartbeatResponse { +} + +service WebServerService { + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {} +} + +message ValidateConfigRequest { + string config = 1; +} + +message ValidateConfigResponse { +} + +message RunNetworkInstanceRequest { + string config = 1; +} + +message RunNetworkInstanceResponse { +} + +message RetainNetworkInstanceRequest { + repeated common.UUID inst_ids = 1; +} + +message RetainNetworkInstanceResponse { + repeated common.UUID remain_inst_ids = 1; +} + +message CollectNetworkInfoRequest { + repeated common.UUID inst_ids = 1; +} + +message CollectNetworkInfoResponse { + NetworkInstanceRunningInfoMap info = 1; +} + +message ListNetworkInstanceRequest { +} + +message ListNetworkInstanceResponse { + repeated common.UUID inst_ids = 1; +} + +message DeleteNetworkInstanceRequest { + repeated common.UUID inst_ids = 1; +} + +message DeleteNetworkInstanceResponse { + repeated common.UUID remain_inst_ids = 1; +} + +service WebClientService { + rpc ValidateConfig(ValidateConfigRequest) returns (ValidateConfigResponse) {} + rpc RunNetworkInstance(RunNetworkInstanceRequest) returns (RunNetworkInstanceResponse) {} + rpc RetainNetworkInstance(RetainNetworkInstanceRequest) returns (RetainNetworkInstanceResponse) {} + rpc CollectNetworkInfo(CollectNetworkInfoRequest) returns (CollectNetworkInfoResponse) {} + rpc ListNetworkInstance(ListNetworkInstanceRequest) returns (ListNetworkInstanceResponse) {} + rpc DeleteNetworkInstance(DeleteNetworkInstanceRequest) returns (DeleteNetworkInstanceResponse) {} +} diff --git a/easytier/src/proto/web.rs b/easytier/src/proto/web.rs new file mode 100644 index 0000000..a3254ec --- /dev/null +++ b/easytier/src/proto/web.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/web.rs")); diff --git a/easytier/src/utils.rs b/easytier/src/utils.rs index 31e3a56..36583c5 100644 --- a/easytier/src/utils.rs +++ b/easytier/src/utils.rs @@ -1,132 +1,10 @@ use anyhow::Context; -use serde::{Deserialize, Serialize}; use tracing::level_filters::LevelFilter; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; -use crate::{ - common::{config::ConfigLoader, get_logger_timer_rfc3339}, - proto::{ - cli::{PeerInfo, Route}, - common::NatType, - }, -}; +use crate::common::{config::ConfigLoader, get_logger_timer_rfc3339}; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PeerRoutePair { - pub route: Route, - pub peer: Option, -} - -impl PeerRoutePair { - pub fn get_latency_ms(&self) -> Option { - let mut ret = u64::MAX; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(stats) = &conn.stats else { - continue; - }; - ret = ret.min(stats.latency_us); - } - - if ret == u64::MAX { - None - } else { - Some(f64::from(ret as u32) / 1000.0) - } - } - - pub fn get_rx_bytes(&self) -> Option { - let mut ret = 0; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(stats) = &conn.stats else { - continue; - }; - ret += stats.rx_bytes; - } - - if ret == 0 { - None - } else { - Some(ret) - } - } - - pub fn get_tx_bytes(&self) -> Option { - let mut ret = 0; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(stats) = &conn.stats else { - continue; - }; - ret += stats.tx_bytes; - } - - if ret == 0 { - None - } else { - Some(ret) - } - } - - pub fn get_loss_rate(&self) -> Option { - let mut ret = 0.0; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - ret += conn.loss_rate; - } - - if ret == 0.0 { - None - } else { - Some(ret as f64) - } - } - - pub fn get_conn_protos(&self) -> Option> { - let mut ret = vec![]; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(tunnel_info) = &conn.tunnel else { - continue; - }; - // insert if not exists - if !ret.contains(&tunnel_info.tunnel_type) { - ret.push(tunnel_info.tunnel_type.clone()); - } - } - - if ret.is_empty() { - None - } else { - Some(ret) - } - } - - pub fn get_udp_nat_type(self: &Self) -> String { - let mut ret = NatType::Unknown; - if let Some(r) = &self.route.stun_info { - ret = NatType::try_from(r.udp_nat_type).unwrap(); - } - format!("{:?}", ret) - } -} - -pub fn list_peer_route_pair(peers: Vec, routes: Vec) -> Vec { - let mut pairs: Vec = vec![]; - - for route in routes.iter() { - let peer = peers.iter().find(|peer| peer.peer_id == route.peer_id); - let pair = PeerRoutePair { - route: route.clone(), - peer: peer.cloned(), - }; - - pairs.push(pair); - } - - pairs -} +pub type PeerRoutePair = crate::proto::cli::PeerRoutePair; pub fn cost_to_str(cost: i32) -> String { if cost == 1 { @@ -250,7 +128,7 @@ pub fn setup_panic_handler() { use std::io::Write; std::panic::set_hook(Box::new(|info| { let backtrace = backtrace::Backtrace::force_capture(); - println!("panic occurred: {:?}", info); + println!("panic occurred: {:?}, backtrace: {:#?}", info, backtrace); let _ = std::fs::File::create("easytier-panic.log") .and_then(|mut f| f.write_all(format!("{:?}\n{:#?}", info, backtrace).as_bytes())); std::process::exit(1); diff --git a/easytier/src/web_client/controller.rs b/easytier/src/web_client/controller.rs new file mode 100644 index 0000000..fa1c0d1 --- /dev/null +++ b/easytier/src/web_client/controller.rs @@ -0,0 +1,171 @@ +use std::collections::BTreeMap; + +use dashmap::DashMap; + +use crate::{ + common::config::{ConfigLoader, TomlConfigLoader}, + launcher::NetworkInstance, + proto::{ + rpc_types::{self, controller::BaseController}, + web::{ + CollectNetworkInfoRequest, CollectNetworkInfoResponse, DeleteNetworkInstanceRequest, + DeleteNetworkInstanceResponse, ListNetworkInstanceRequest, ListNetworkInstanceResponse, + NetworkInstanceRunningInfoMap, RetainNetworkInstanceRequest, + RetainNetworkInstanceResponse, RunNetworkInstanceRequest, RunNetworkInstanceResponse, + ValidateConfigRequest, ValidateConfigResponse, WebClientService, + }, + }, +}; + +pub struct Controller { + token: String, + instance_map: DashMap, +} + +impl Controller { + pub fn new(token: String) -> Self { + Controller { + token, + instance_map: DashMap::new(), + } + } + + pub fn run_network_instance(&self, cfg: TomlConfigLoader) -> Result<(), anyhow::Error> { + let instance_id = cfg.get_id(); + if self.instance_map.contains_key(&instance_id) { + anyhow::bail!("instance {} already exists", instance_id); + } + + let mut instance = NetworkInstance::new(cfg); + instance.start()?; + + println!("instance {} started", instance_id); + self.instance_map.insert(instance_id, instance); + Ok(()) + } + + pub fn retain_network_instance( + &self, + instance_ids: Vec, + ) -> Result { + self.instance_map.retain(|k, _| instance_ids.contains(k)); + let remain = self + .instance_map + .iter() + .map(|item| item.key().clone().into()) + .collect::>(); + println!("instance {:?} retained", remain); + Ok(RetainNetworkInstanceResponse { + remain_inst_ids: remain, + }) + } + + pub fn collect_network_infos(&self) -> Result { + let mut map = BTreeMap::new(); + for instance in self.instance_map.iter() { + if let Some(info) = instance.get_running_info() { + map.insert(instance.key().to_string(), info); + } + } + Ok(NetworkInstanceRunningInfoMap { map }) + } + + pub fn list_network_instance_ids(&self) -> Vec { + self.instance_map + .iter() + .map(|item| item.key().clone()) + .collect() + } + + pub fn token(&self) -> String { + self.token.clone() + } +} + +#[async_trait::async_trait] +impl WebClientService for Controller { + type Controller = BaseController; + + async fn validate_config( + &self, + _: BaseController, + req: ValidateConfigRequest, + ) -> Result { + let _ = TomlConfigLoader::new_from_str(&req.config)?; + Ok(ValidateConfigResponse {}) + } + + async fn run_network_instance( + &self, + _: BaseController, + req: RunNetworkInstanceRequest, + ) -> Result { + let cfg = TomlConfigLoader::new_from_str(&req.config)?; + self.run_network_instance(cfg)?; + Ok(RunNetworkInstanceResponse {}) + } + + async fn retain_network_instance( + &self, + _: BaseController, + req: RetainNetworkInstanceRequest, + ) -> Result { + Ok(self.retain_network_instance(req.inst_ids.into_iter().map(Into::into).collect())?) + } + + async fn collect_network_info( + &self, + _: BaseController, + req: CollectNetworkInfoRequest, + ) -> Result { + let mut ret = self.collect_network_infos()?; + let include_inst_ids = req + .inst_ids + .iter() + .cloned() + .map(|id| id.to_string()) + .collect::>(); + if !include_inst_ids.is_empty() { + let mut to_remove = Vec::new(); + for (k, _) in ret.map.iter() { + if !include_inst_ids.contains(&k) { + to_remove.push(k.clone()); + } + } + + for k in to_remove { + ret.map.remove(&k); + } + } + Ok(CollectNetworkInfoResponse { info: Some(ret) }) + } + + // rpc ListNetworkInstance(ListNetworkInstanceRequest) returns (ListNetworkInstanceResponse) {} + async fn list_network_instance( + &self, + _: BaseController, + _: ListNetworkInstanceRequest, + ) -> Result { + Ok(ListNetworkInstanceResponse { + inst_ids: self + .list_network_instance_ids() + .into_iter() + .map(Into::into) + .collect(), + }) + } + + // rpc DeleteNetworkInstance(DeleteNetworkInstanceRequest) returns (DeleteNetworkInstanceResponse) {} + async fn delete_network_instance( + &self, + _: BaseController, + req: DeleteNetworkInstanceRequest, + ) -> Result { + let mut inst_ids = self.list_network_instance_ids(); + inst_ids.retain(|id| !req.inst_ids.contains(&(id.clone().into()))); + self.retain_network_instance(inst_ids.clone())?; + Ok(DeleteNetworkInstanceResponse { + remain_inst_ids: inst_ids.into_iter().map(Into::into).collect(), + }) + } +} diff --git a/easytier/src/web_client/mod.rs b/easytier/src/web_client/mod.rs new file mode 100644 index 0000000..524afc1 --- /dev/null +++ b/easytier/src/web_client/mod.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; + +use crate::{common::scoped_task::ScopedTask, tunnel::TunnelConnector}; + +pub mod controller; +pub mod session; + +pub struct WebClient { + controller: Arc, + tasks: ScopedTask<()>, +} + +impl WebClient { + pub fn new(connector: T, token: S) -> Self { + let controller = Arc::new(controller::Controller::new(token.to_string())); + + let controller_clone = controller.clone(); + let tasks = ScopedTask::from(tokio::spawn(async move { + Self::routine(controller_clone, Box::new(connector)).await; + })); + + WebClient { controller, tasks } + } + + async fn routine( + controller: Arc, + mut connector: Box, + ) { + loop { + let conn = match connector.connect().await { + Ok(conn) => conn, + Err(e) => { + println!( + "Failed to connect to the server ({}), retrying in 5 seconds...", + e + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; + + println!("Successfully connected to {:?}", conn.info()); + + let mut session = session::Session::new(conn, controller.clone()); + session.wait().await; + } + } +} diff --git a/easytier/src/web_client/session.rs b/easytier/src/web_client/session.rs new file mode 100644 index 0000000..5706f18 --- /dev/null +++ b/easytier/src/web_client/session.rs @@ -0,0 +1,126 @@ +use std::sync::Arc; + +use tokio::{ + sync::{broadcast, Mutex}, + task::JoinSet, + time::interval, +}; + +use crate::{ + common::get_machine_id, + proto::{ + rpc_impl::bidirect::BidirectRpcManager, + rpc_types::controller::BaseController, + web::{ + HeartbeatRequest, HeartbeatResponse, WebClientServiceServer, + WebServerServiceClientFactory, + }, + }, + tunnel::Tunnel, +}; + +use super::controller::Controller; + +#[derive(Debug, Clone)] +struct HeartbeatCtx { + notifier: Arc>, + resp: Arc>>, +} + +pub struct Session { + rpc_mgr: BidirectRpcManager, + controller: Arc, + + heartbeat_ctx: HeartbeatCtx, + + tasks: Mutex>, +} + +impl Session { + pub fn new(tunnel: Box, controller: Arc) -> Self { + let rpc_mgr = BidirectRpcManager::new(); + rpc_mgr.run_with_tunnel(tunnel); + + rpc_mgr + .rpc_server() + .registry() + .register(WebClientServiceServer::new(controller.clone()), ""); + + let mut tasks: JoinSet<()> = JoinSet::new(); + let heartbeat_ctx = Self::heartbeat_routine(&rpc_mgr, controller.token(), &mut tasks); + + Session { + rpc_mgr, + controller, + heartbeat_ctx, + tasks: Mutex::new(tasks), + } + } + + fn heartbeat_routine( + rpc_mgr: &BidirectRpcManager, + token: String, + tasks: &mut JoinSet<()>, + ) -> HeartbeatCtx { + let (tx, _rx1) = broadcast::channel(2); + + let ctx = HeartbeatCtx { + notifier: Arc::new(tx), + resp: Arc::new(Mutex::new(None)), + }; + + let mid = get_machine_id(); + let inst_id = uuid::Uuid::new_v4(); + let token = token; + + let ctx_clone = ctx.clone(); + let mut tick = interval(std::time::Duration::from_secs(1)); + let client = rpc_mgr + .rpc_client() + .scoped_client::>(1, 1, "".to_string()); + tasks.spawn(async move { + let req = HeartbeatRequest { + machine_id: Some(mid.into()), + inst_id: Some(inst_id.into()), + user_token: token.to_string(), + }; + loop { + tick.tick().await; + match client + .heartbeat(BaseController::default(), req.clone()) + .await + { + Err(e) => { + tracing::error!("heartbeat failed: {:?}", e); + break; + } + Ok(resp) => { + tracing::debug!("heartbeat response: {:?}", resp); + let _ = ctx_clone.notifier.send(resp.clone()); + ctx_clone.resp.lock().await.replace(resp); + } + } + } + }); + + ctx + } + + async fn wait_routines(&self) { + self.tasks.lock().await.join_next().await; + // if any task failed, we should abort all tasks + self.tasks.lock().await.abort_all(); + } + + pub async fn wait(&mut self) { + tokio::select! { + _ = self.rpc_mgr.wait() => {} + _ = self.wait_routines() => {} + } + } + + pub async fn wait_next_heartbeat(&self) -> Option { + let mut rx = self.heartbeat_ctx.notifier.subscribe(); + rx.recv().await.ok() + } +}