add client gui for easytier (#50)

This commit is contained in:
Sijie.Sun 2024-04-06 22:44:30 +08:00 committed by GitHub
parent 4eb7efe5fc
commit 727ef37ae4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1763 additions and 177 deletions

View File

@ -104,6 +104,7 @@ jobs:
TAG=$GITHUB_SHA
fi
mv ./target/$TARGET/release/easytier-core"$SUFFIX" ./artifacts/objects/
mv ./target/$TARGET/release/easytier-gui"$SUFFIX" ./artifacts/objects/
mv ./target/$TARGET/release/easytier-cli"$SUFFIX" ./artifacts/objects/
tar -cvf ./artifacts/$NAME-$TARGET-$TAG.tar -C ./artifacts/objects/ .
rm -rf ./artifacts/objects/
@ -124,7 +125,6 @@ jobs:
remote-path: /easytier-releases/${{ github.sha }}/
no-delete-remote-files: true
retry: 5
increment: true
test:
runs-on: ubuntu-latest
steps:

View File

@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["easytier"]
members = ["easytier", "easytier-gui"]
[profile.dev]
panic = "unwind"

48
easytier-gui/Cargo.toml Normal file
View File

@ -0,0 +1,48 @@
[package]
name = "easytier-gui"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/KKRainbow/EasyTier"
repository = "https://github.com/KKRainbow/EasyTier"
version = "0.1.0"
edition = "2021"
authors = ["kkrainbow"]
keywords = ["vpn", "p2p", "network", "easytier"]
categories = ["network-programming"]
rust-version = "1.75"
license-file = "LICENSE"
readme = "README.md"
[dependencies]
easytier = { path = "../easytier" }
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
chrono = "0.4.37"
once_cell = "1.18.0"
dashmap = "5.5.3"
egui = { version = "0.27.2" }
egui-modal = "0.3.6"
humansize = "2.1.3"
eframe = { version = "0.27.2", features = [
"default",
"serde",
"persistence",
"wgpu"
] }
wgpu = { version = "0.19.3", features = [ "webgpu", "webgl"] }
# For image support:
egui_extras = { version = "0.27.2", features = ["default", "image"] }
env_logger = { version = "0.10", default-features = false, features = [
"auto-color",
"humantime",
] }
egui_tiles = "0.8.0"
derivative = "2.2.0"
serde = { version = "1.0", features = ["derive"] }
elevated-command = "1.1.2"

1
easytier-gui/LICENSE Symbolic link
View File

@ -0,0 +1 @@
../LICENSE

0
easytier-gui/README.md Normal file
View File

Binary file not shown.

View File

@ -0,0 +1,217 @@
use std::{
collections::VecDeque,
sync::{atomic::AtomicBool, Arc, RwLock},
};
use chrono::{DateTime, Local};
use easytier::{
common::{
config::{ConfigLoader, TomlConfigLoader},
global_ctx::GlobalCtxEvent,
stun::StunInfoCollectorTrait,
},
instance::instance::Instance,
peers::rpc_service::PeerManagerRpcService,
rpc::{
cli::{PeerInfo, Route, StunInfo},
peer::GetIpListResponse,
},
};
#[derive(Default, Clone)]
pub struct MyNodeInfo {
pub virtual_ipv4: String,
pub ips: GetIpListResponse,
pub stun_info: StunInfo,
pub listeners: Vec<String>,
pub vpn_portal_cfg: Option<String>,
}
#[derive(Default, Clone)]
struct EasyTierData {
events: Arc<RwLock<VecDeque<(DateTime<Local>, GlobalCtxEvent)>>>,
node_info: Arc<RwLock<MyNodeInfo>>,
routes: Arc<RwLock<Vec<Route>>>,
peers: Arc<RwLock<Vec<PeerInfo>>>,
}
pub struct EasyTierLauncher {
instance_alive: Arc<AtomicBool>,
stop_flag: Arc<AtomicBool>,
thread_handle: Option<std::thread::JoinHandle<()>>,
running_cfg: String,
error_msg: Arc<RwLock<Option<String>>>,
data: EasyTierData,
}
impl EasyTierLauncher {
pub fn new() -> Self {
let instance_alive = Arc::new(AtomicBool::new(false));
Self {
instance_alive,
thread_handle: None,
error_msg: Arc::new(RwLock::new(None)),
running_cfg: String::new(),
stop_flag: Arc::new(AtomicBool::new(false)),
data: EasyTierData::default(),
}
}
async fn handle_easytier_event(event: GlobalCtxEvent, data: EasyTierData) {
let mut events = data.events.write().unwrap();
events.push_back((chrono::Local::now(), event));
if events.len() > 100 {
events.pop_front();
}
}
async fn easytier_routine(
cfg: TomlConfigLoader,
stop_signal: Arc<tokio::sync::Notify>,
data: EasyTierData,
) -> Result<(), anyhow::Error> {
let mut instance = Instance::new(cfg);
let peer_mgr = instance.get_peer_manager();
// Subscribe to global context events
let global_ctx = instance.get_global_ctx();
let data_c = data.clone();
tokio::spawn(async move {
let mut receiver = global_ctx.subscribe();
while let Ok(event) = receiver.recv().await {
Self::handle_easytier_event(event, data_c.clone()).await;
}
});
// update my node info
let data_c = data.clone();
let global_ctx_c = instance.get_global_ctx();
let peer_mgr_c = peer_mgr.clone();
let vpn_portal = instance.get_vpn_portal_inst();
tokio::spawn(async move {
loop {
let node_info = MyNodeInfo {
virtual_ipv4: global_ctx_c
.get_ipv4()
.map(|x| x.to_string())
.unwrap_or_default(),
ips: global_ctx_c.get_ip_collector().collect_ip_addrs().await,
stun_info: global_ctx_c.get_stun_info_collector().get_stun_info(),
listeners: global_ctx_c
.get_running_listeners()
.iter()
.map(|x| x.to_string())
.collect(),
vpn_portal_cfg: Some(
vpn_portal
.lock()
.await
.dump_client_config(peer_mgr_c.clone())
.await,
),
};
*data_c.node_info.write().unwrap() = node_info.clone();
*data_c.routes.write().unwrap() = peer_mgr_c.list_routes().await;
*data_c.peers.write().unwrap() = PeerManagerRpcService::new(peer_mgr_c.clone())
.list_peers()
.await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
instance.run().await?;
stop_signal.notified().await;
Ok(())
}
pub fn start<F>(&mut self, cfg_generator: F)
where
F: FnOnce() -> Result<TomlConfigLoader, anyhow::Error> + Send + Sync,
{
let error_msg = self.error_msg.clone();
let cfg = cfg_generator();
if let Err(e) = cfg {
error_msg.write().unwrap().replace(e.to_string());
return;
}
self.running_cfg = cfg.as_ref().unwrap().dump();
let stop_flag = self.stop_flag.clone();
let instance_alive = self.instance_alive.clone();
instance_alive.store(true, std::sync::atomic::Ordering::Relaxed);
let data = self.data.clone();
self.thread_handle = Some(std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let stop_notifier = Arc::new(tokio::sync::Notify::new());
let stop_notifier_clone = stop_notifier.clone();
rt.spawn(async move {
while !stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
stop_notifier_clone.notify_one();
});
let ret = rt.block_on(Self::easytier_routine(
cfg.unwrap(),
stop_notifier.clone(),
data,
));
if let Err(e) = ret {
error_msg.write().unwrap().replace(e.to_string());
}
instance_alive.store(false, std::sync::atomic::Ordering::Relaxed);
}));
}
pub fn error_msg(&self) -> Option<String> {
self.error_msg.read().unwrap().clone()
}
pub fn running(&self) -> bool {
self.instance_alive
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn get_events(&self) -> Vec<(DateTime<Local>, GlobalCtxEvent)> {
let events = self.data.events.read().unwrap();
events.iter().cloned().collect()
}
pub fn get_node_info(&self) -> MyNodeInfo {
self.data.node_info.read().unwrap().clone()
}
pub fn get_routes(&self) -> Vec<Route> {
self.data.routes.read().unwrap().clone()
}
pub fn get_peers(&self) -> Vec<PeerInfo> {
self.data.peers.read().unwrap().clone()
}
pub fn running_cfg(&self) -> String {
self.running_cfg.clone()
}
}
impl Drop for EasyTierLauncher {
fn drop(&mut self) {
self.stop_flag
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Some(handle) = self.thread_handle.take() {
if let Err(e) = handle.join() {
println!("Error when joining thread: {:?}", e);
}
}
}
}

1095
easytier-gui/src/main.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,57 @@
#[derive(Default)]
pub struct TextListOption {
pub hint: String,
}
pub fn text_list_ui(
ui: &mut egui::Ui,
id: &str,
texts: &mut Vec<String>,
option: Option<TextListOption>,
) {
let option = option.unwrap_or_default();
// convert text vec to (index, text) vec
let mut add_new_item = false;
let mut remove_idxs = vec![];
egui::Grid::new(id).max_col_width(200.0).show(ui, |ui| {
for i in 0..texts.len() {
egui::TextEdit::singleline(&mut texts[i])
.hint_text(&option.hint)
.show(ui);
ui.horizontal(|ui| {
if ui.button("").clicked() {
remove_idxs.push(i);
}
if i == texts.len() - 1 {
if ui.button("").clicked() {
add_new_item = true;
}
}
});
ui.end_row();
}
if texts.len() == 0 {
if ui.button("").clicked() {
add_new_item = true;
}
ui.end_row();
}
});
let new_texts = texts
.iter()
.enumerate()
.filter(|(i, _)| !remove_idxs.contains(i))
.map(|(_, t)| t.clone())
.collect::<Vec<String>>();
*texts = new_texts;
if add_new_item && texts.last().map(|t| !t.is_empty()).unwrap_or(true) {
texts.push("".to_string());
}
}

View File

@ -0,0 +1,107 @@
//! Source code example of how to create your own widget.
//! This is meant to be read as a tutorial, hence the plethora of comments.
/// iOS-style toggle switch:
///
/// ``` text
/// _____________
/// / /.....\
/// | |.......|
/// \_______\_____/
/// ```
///
/// ## Example:
/// ``` ignore
/// toggle_ui(ui, &mut my_bool);
/// ```
pub fn toggle_ui(ui: &mut egui::Ui, on: &mut bool) -> egui::Response {
// Widget code can be broken up in four steps:
// 1. Decide a size for the widget
// 2. Allocate space for it
// 3. Handle interactions with the widget (if any)
// 4. Paint the widget
// 1. Deciding widget size:
// You can query the `ui` how much space is available,
// but in this example we have a fixed size widget based on the height of a standard button:
let desired_size = ui.spacing().interact_size.y * egui::vec2(2.0, 1.0);
// 2. Allocating space:
// This is where we get a region of the screen assigned.
// We also tell the Ui to sense clicks in the allocated region.
let (rect, mut response) = ui.allocate_exact_size(desired_size, egui::Sense::click());
// 3. Interact: Time to check for clicks!
if response.clicked() {
*on = !*on;
response.mark_changed(); // report back that the value changed
}
// Attach some meta-data to the response which can be used by screen readers:
response.widget_info(|| egui::WidgetInfo::selected(egui::WidgetType::Checkbox, *on, ""));
// 4. Paint!
// Make sure we need to paint:
if ui.is_rect_visible(rect) {
// Let's ask for a simple animation from egui.
// egui keeps track of changes in the boolean associated with the id and
// returns an animated value in the 0-1 range for how much "on" we are.
let how_on = ui.ctx().animate_bool(response.id, *on);
// We will follow the current style by asking
// "how should something that is being interacted with be painted?".
// This will, for instance, give us different colors when the widget is hovered or clicked.
let visuals = ui.style().interact_selectable(&response, *on);
// All coordinates are in absolute screen coordinates so we use `rect` to place the elements.
let rect = rect.expand(visuals.expansion);
let radius = 0.5 * rect.height();
ui.painter()
.rect(rect, radius, visuals.bg_fill, visuals.bg_stroke);
// Paint the circle, animating it from left to right with `how_on`:
let circle_x = egui::lerp((rect.left() + radius)..=(rect.right() - radius), how_on);
let center = egui::pos2(circle_x, rect.center().y);
ui.painter()
.circle(center, 0.75 * radius, visuals.bg_fill, visuals.fg_stroke);
}
// All done! Return the interaction response so the user can check what happened
// (hovered, clicked, ...) and maybe show a tooltip:
response
}
/// Here is the same code again, but a bit more compact:
#[allow(dead_code)]
fn toggle_ui_compact(ui: &mut egui::Ui, on: &mut bool) -> egui::Response {
let desired_size = ui.spacing().interact_size.y * egui::vec2(2.0, 1.0);
let (rect, mut response) = ui.allocate_exact_size(desired_size, egui::Sense::click());
if response.clicked() {
*on = !*on;
response.mark_changed();
}
response.widget_info(|| egui::WidgetInfo::selected(egui::WidgetType::Checkbox, *on, ""));
if ui.is_rect_visible(rect) {
let how_on = ui.ctx().animate_bool(response.id, *on);
let visuals = ui.style().interact_selectable(&response, *on);
let rect = rect.expand(visuals.expansion);
let radius = 0.5 * rect.height();
ui.painter()
.rect(rect, radius, visuals.bg_fill, visuals.bg_stroke);
let circle_x = egui::lerp((rect.left() + radius)..=(rect.right() - radius), how_on);
let center = egui::pos2(circle_x, rect.center().y);
ui.painter()
.circle(center, 0.75 * radius, visuals.bg_fill, visuals.fg_stroke);
}
response
}
// A wrapper that allows the more idiomatic usage pattern: `ui.add(toggle(&mut my_bool))`
/// iOS-style toggle switch.
///
/// ## Example:
/// ``` ignore
/// ui.add(toggle(&mut my_bool));
/// ```
pub fn toggle(on: &mut bool) -> impl egui::Widget + '_ {
move |ui: &mut egui::Ui| toggle_ui(ui, on)
}

View File

@ -23,6 +23,11 @@ name = "easytier-cli"
path = "src/easytier-cli.rs"
test = false
[lib]
name = "easytier"
path = "src/lib.rs"
test = false
[dependencies]
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = [

1
easytier/LICENSE Symbolic link
View File

@ -0,0 +1 @@
../LICENSE

View File

@ -4,11 +4,13 @@ use std::{net::SocketAddr, vec};
use clap::{command, Args, Parser, Subcommand};
use rpc::vpn_portal_rpc_client::VpnPortalRpcClient;
use utils::{list_peer_route_pair, PeerRoutePair};
mod arch;
mod common;
mod rpc;
mod tunnels;
mod utils;
use crate::{
common::stun::{StunInfoCollector, UdpNatTypeDetector},
@ -17,6 +19,7 @@ use crate::{
peer_center_rpc_client::PeerCenterRpcClient, peer_manage_rpc_client::PeerManageRpcClient,
*,
},
utils::{cost_to_str, float_to_str},
};
use humansize::format_size;
use tabled::settings::Style;
@ -94,107 +97,6 @@ enum Error {
TonicRpcError(#[from] tonic::Status),
}
#[derive(Debug)]
struct PeerRoutePair {
route: Route,
peer: Option<PeerInfo>,
}
impl PeerRoutePair {
fn get_latency_ms(&self) -> Option<f64> {
let mut ret = u64::MAX;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(stats) = &conn.stats else {
continue;
};
ret = ret.min(stats.latency_us);
}
if ret == u64::MAX {
None
} else {
Some(f64::from(ret as u32) / 1000.0)
}
}
fn get_rx_bytes(&self) -> Option<u64> {
let mut ret = 0;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(stats) = &conn.stats else {
continue;
};
ret += stats.rx_bytes;
}
if ret == 0 {
None
} else {
Some(ret)
}
}
fn get_tx_bytes(&self) -> Option<u64> {
let mut ret = 0;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(stats) = &conn.stats else {
continue;
};
ret += stats.tx_bytes;
}
if ret == 0 {
None
} else {
Some(ret)
}
}
fn get_loss_rate(&self) -> Option<f64> {
let mut ret = 0.0;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
ret += conn.loss_rate;
}
if ret == 0.0 {
None
} else {
Some(ret as f64)
}
}
fn get_conn_protos(&self) -> Option<Vec<String>> {
let mut ret = vec![];
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(tunnel_info) = &conn.tunnel else {
continue;
};
// insert if not exists
if !ret.contains(&tunnel_info.tunnel_type) {
ret.push(tunnel_info.tunnel_type.clone());
}
}
if ret.is_empty() {
None
} else {
Some(ret)
}
}
fn get_udp_nat_type(self: &Self) -> String {
let mut ret = NatType::Unknown;
if let Some(r) = &self.route.stun_info {
ret = NatType::try_from(r.udp_nat_type).unwrap();
}
format!("{:?}", ret)
}
}
struct CommandHandler {
addr: String,
}
@ -239,19 +141,9 @@ impl CommandHandler {
}
async fn list_peer_route_pair(&self) -> Result<Vec<PeerRoutePair>, Error> {
let mut peers = self.list_peers().await?.peer_infos;
let mut routes = self.list_routes().await?.routes;
let mut pairs: Vec<PeerRoutePair> = vec![];
for route in routes.iter_mut() {
let peer = peers.iter_mut().find(|peer| peer.peer_id == route.peer_id);
pairs.push(PeerRoutePair {
route: route.clone(),
peer: peer.cloned(),
});
}
Ok(pairs)
let peers = self.list_peers().await?.peer_infos;
let routes = self.list_routes().await?.routes;
Ok(list_peer_route_pair(peers, routes))
}
#[allow(dead_code)]
@ -279,18 +171,6 @@ impl CommandHandler {
id: String,
}
fn cost_to_str(cost: i32) -> String {
if cost == 1 {
"p2p".to_string()
} else {
format!("relay({})", cost)
}
}
fn float_to_str(f: f64, precision: usize) -> String {
format!("{:.1$}", f, precision)
}
impl From<PeerRoutePair> for PeerTableItem {
fn from(p: PeerRoutePair) -> Self {
PeerTableItem {

View File

@ -35,6 +35,35 @@ use tokio_stream::wrappers::ReceiverStream;
use super::listeners::ListenerManager;
use super::virtual_nic;
#[derive(Clone)]
struct IpProxy {
tcp_proxy: Arc<TcpProxy>,
icmp_proxy: Arc<IcmpProxy>,
udp_proxy: Arc<UdpProxy>,
}
impl IpProxy {
fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<PeerManager>) -> Result<Self, Error> {
let tcp_proxy = TcpProxy::new(global_ctx.clone(), peer_manager.clone());
let icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone())
.with_context(|| "create icmp proxy failed")?;
let udp_proxy = UdpProxy::new(global_ctx.clone(), peer_manager.clone())
.with_context(|| "create udp proxy failed")?;
Ok(IpProxy {
tcp_proxy,
icmp_proxy,
udp_proxy,
})
}
async fn start(&self) -> Result<(), Error> {
self.tcp_proxy.start().await?;
self.icmp_proxy.start().await?;
self.udp_proxy.start().await?;
Ok(())
}
}
pub struct Instance {
inst_name: String,
@ -51,9 +80,7 @@ pub struct Instance {
direct_conn_manager: Arc<DirectConnectorManager>,
udp_hole_puncher: Arc<Mutex<UdpHolePunchConnector>>,
tcp_proxy: Arc<TcpProxy>,
icmp_proxy: Arc<IcmpProxy>,
udp_proxy: Arc<UdpProxy>,
ip_proxy: Option<IpProxy>,
peer_center: Arc<PeerCenterInstance>,
@ -97,14 +124,6 @@ impl Instance {
let udp_hole_puncher = UdpHolePunchConnector::new(global_ctx.clone(), peer_manager.clone());
let arc_tcp_proxy = TcpProxy::new(global_ctx.clone(), peer_manager.clone());
let arc_icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone())
.with_context(|| "create icmp proxy failed")
.unwrap();
let arc_udp_proxy = UdpProxy::new(global_ctx.clone(), peer_manager.clone())
.with_context(|| "create udp proxy failed")
.unwrap();
let peer_center = Arc::new(PeerCenterInstance::new(peer_manager.clone()));
let vpn_portal_inst = vpn_portal::wireguard::WireGuard::default();
@ -123,9 +142,7 @@ impl Instance {
direct_conn_manager: Arc::new(direct_conn_manager),
udp_hole_puncher: Arc::new(Mutex::new(udp_hole_puncher)),
tcp_proxy: arc_tcp_proxy,
icmp_proxy: arc_icmp_proxy,
udp_proxy: arc_udp_proxy,
ip_proxy: None,
peer_center,
@ -269,9 +286,12 @@ impl Instance {
self.run_rpc_server().unwrap();
self.tcp_proxy.start().await.unwrap();
self.icmp_proxy.start().await.unwrap();
self.udp_proxy.start().await.unwrap();
self.ip_proxy = Some(IpProxy::new(
self.get_global_ctx(),
self.get_peer_manager(),
)?);
self.ip_proxy.as_ref().unwrap().start().await?;
self.run_proxy_cidrs_route_updater();
self.udp_hole_puncher.lock().await.run().await?;
@ -478,4 +498,8 @@ impl Instance {
pub fn get_global_ctx(&self) -> ArcGlobalCtx {
self.global_ctx.clone()
}
pub fn get_vpn_portal_inst(&self) -> Arc<Mutex<Box<dyn VpnPortal>>> {
self.vpn_portal.clone()
}
}

13
easytier/src/lib.rs Normal file
View File

@ -0,0 +1,13 @@
#![allow(dead_code)]
pub mod arch;
pub mod common;
pub mod connector;
pub mod gateway;
pub mod instance;
pub mod peer_center;
pub mod peers;
pub mod rpc;
pub mod tunnels;
pub mod utils;
pub mod vpn_portal;

View File

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize, Default)]
pub struct GetIpListResponse {
pub public_ipv4: String,
pub interface_ipv4s: Vec<String>,

View File

@ -6,7 +6,7 @@ use std::{
};
use async_stream::stream;
use futures::{Future, FutureExt, Sink, SinkExt, Stream, StreamExt};
use futures::{stream::FuturesUnordered, Future, FutureExt, Sink, SinkExt, Stream, StreamExt};
use network_interface::NetworkInterfaceConfig;
use tokio::{sync::Mutex, time::error::Elapsed};
@ -319,6 +319,29 @@ pub(crate) fn setup_sokcet2_ext(
Ok(())
}
pub(crate) async fn wait_for_connect_futures<Fut, Ret, E>(
mut futures: FuturesUnordered<Fut>,
) -> Result<Ret, super::TunnelError>
where
Fut: Future<Output = Result<Ret, E>> + Send + Sync,
E: std::error::Error + Into<super::TunnelError> + Send + Sync + 'static,
{
// return last error
let mut last_err = None;
while let Some(ret) = futures.next().await {
if let Err(e) = ret {
last_err = Some(e.into());
} else {
return ret.map_err(|e| e.into());
}
}
Err(last_err.unwrap_or(super::TunnelError::CommonError(
"no connect futures".to_string(),
)))
}
pub(crate) fn setup_sokcet2(
socket2_socket: &socket2::Socket,
bind_addr: &SocketAddr,

View File

@ -22,7 +22,7 @@ pub enum TunnelError {
CommonError(String),
#[error("io error")]
IOError(#[from] std::io::Error),
#[error("wait resp error")]
#[error("wait resp error {0}")]
WaitRespError(String),
#[error("Connect Error: {0}")]
ConnectError(String),

View File

@ -1,14 +1,16 @@
use std::net::SocketAddr;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::stream::FuturesUnordered;
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::tunnels::common::setup_sokcet2;
use super::{
check_scheme_and_get_socket_addr, common::FramedTunnel, Tunnel, TunnelInfo, TunnelListener,
check_scheme_and_get_socket_addr,
common::{wait_for_connect_futures, FramedTunnel},
Tunnel, TunnelInfo, TunnelListener,
};
#[derive(Debug)]
@ -115,7 +117,7 @@ impl TcpTunnelConnector {
}
async fn connect_with_custom_bind(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
let mut futures = FuturesUnordered::new();
let futures = FuturesUnordered::new();
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&self.addr, "tcp")?;
for bind_addr in self.bind_addrs.iter() {
@ -132,12 +134,7 @@ impl TcpTunnelConnector {
futures.push(socket.connect(dst_addr.clone()));
}
let Some(ret) = futures.next().await else {
return Err(super::TunnelError::CommonError(
"join connect futures failed".to_owned(),
));
};
let ret = wait_for_connect_futures(futures).await;
return get_tunnel_with_tcp_stream(ret?, self.addr.clone().into());
}
}
@ -162,7 +159,7 @@ impl super::TunnelConnector for TcpTunnelConnector {
#[cfg(test)]
mod tests {
use futures::SinkExt;
use futures::{SinkExt, StreamExt};
use crate::tunnels::{
common::tests::{_tunnel_bench, _tunnel_pingpong},

View File

@ -23,7 +23,10 @@ use crate::{
use super::{
codec::BytesCodec,
common::{setup_sokcet2, setup_sokcet2_ext, FramedTunnel, TunnelWithCustomInfo},
common::{
setup_sokcet2, setup_sokcet2_ext, wait_for_connect_futures, FramedTunnel,
TunnelWithCustomInfo,
},
ring_tunnel::create_ring_tunnel_pair,
DatagramSink, DatagramStream, Tunnel, TunnelListener, TunnelUrl,
};
@ -555,7 +558,7 @@ impl UdpTunnelConnector {
}
async fn connect_with_custom_bind(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
let mut futures = FuturesUnordered::new();
let futures = FuturesUnordered::new();
for bind_addr in self.bind_addrs.iter() {
let socket2_socket = socket2::Socket::new(
@ -567,14 +570,7 @@ impl UdpTunnelConnector {
let socket = UdpSocket::from_std(socket2_socket.into())?;
futures.push(self.try_connect_with_socket(socket));
}
let Some(ret) = futures.next().await else {
return Err(super::TunnelError::CommonError(
"join connect futures failed".to_owned(),
));
};
return ret;
wait_for_connect_futures(futures).await
}
}

View File

@ -27,7 +27,7 @@ use crate::{
use super::{
check_scheme_and_get_socket_addr,
common::{setup_sokcet2, setup_sokcet2_ext},
common::{setup_sokcet2, setup_sokcet2_ext, wait_for_connect_futures},
ring_tunnel::create_ring_tunnel_pair,
DatagramSink, DatagramStream, Tunnel, TunnelError, TunnelListener, TunnelUrl,
};
@ -689,7 +689,7 @@ impl super::TunnelConnector for WgTunnelConnector {
} else {
self.bind_addrs.clone()
};
let mut futures = FuturesUnordered::new();
let futures = FuturesUnordered::new();
for bind_addr in bind_addrs.into_iter() {
let socket2_socket = socket2::Socket::new(
@ -707,13 +707,7 @@ impl super::TunnelConnector for WgTunnelConnector {
));
}
let Some(ret) = futures.next().await else {
return Err(super::TunnelError::CommonError(
"join connect futures failed".to_owned(),
));
};
return ret;
wait_for_connect_futures(futures).await
}
fn remote_url(&self) -> url::Url {

128
easytier/src/utils.rs Normal file
View File

@ -0,0 +1,128 @@
use crate::rpc::cli::{NatType, PeerInfo, Route};
#[derive(Debug)]
pub struct PeerRoutePair {
pub route: Route,
pub peer: Option<PeerInfo>,
}
impl PeerRoutePair {
pub fn get_latency_ms(&self) -> Option<f64> {
let mut ret = u64::MAX;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(stats) = &conn.stats else {
continue;
};
ret = ret.min(stats.latency_us);
}
if ret == u64::MAX {
None
} else {
Some(f64::from(ret as u32) / 1000.0)
}
}
pub fn get_rx_bytes(&self) -> Option<u64> {
let mut ret = 0;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(stats) = &conn.stats else {
continue;
};
ret += stats.rx_bytes;
}
if ret == 0 {
None
} else {
Some(ret)
}
}
pub fn get_tx_bytes(&self) -> Option<u64> {
let mut ret = 0;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(stats) = &conn.stats else {
continue;
};
ret += stats.tx_bytes;
}
if ret == 0 {
None
} else {
Some(ret)
}
}
pub fn get_loss_rate(&self) -> Option<f64> {
let mut ret = 0.0;
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
ret += conn.loss_rate;
}
if ret == 0.0 {
None
} else {
Some(ret as f64)
}
}
pub fn get_conn_protos(&self) -> Option<Vec<String>> {
let mut ret = vec![];
let p = self.peer.as_ref()?;
for conn in p.conns.iter() {
let Some(tunnel_info) = &conn.tunnel else {
continue;
};
// insert if not exists
if !ret.contains(&tunnel_info.tunnel_type) {
ret.push(tunnel_info.tunnel_type.clone());
}
}
if ret.is_empty() {
None
} else {
Some(ret)
}
}
pub fn get_udp_nat_type(self: &Self) -> String {
let mut ret = NatType::Unknown;
if let Some(r) = &self.route.stun_info {
ret = NatType::try_from(r.udp_nat_type).unwrap();
}
format!("{:?}", ret)
}
}
pub fn list_peer_route_pair(peers: Vec<PeerInfo>, routes: Vec<Route>) -> Vec<PeerRoutePair> {
let mut pairs: Vec<PeerRoutePair> = vec![];
for route in routes.iter() {
let peer = peers.iter().find(|peer| peer.peer_id == route.peer_id);
pairs.push(PeerRoutePair {
route: route.clone(),
peer: peer.cloned(),
});
}
pairs
}
pub fn cost_to_str(cost: i32) -> String {
if cost == 1 {
"p2p".to_string()
} else {
format!("relay({})", cost)
}
}
pub fn float_to_str(f: f64, precision: usize) -> String {
format!("{:.1$}", f, precision)
}