From d87a440c0459ba5f373738aed47bcaf2fae2cbe3 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sun, 13 Oct 2024 11:59:16 +0800 Subject: [PATCH] fix 202 bugs (#418) * fix peer rpc stop working because of mpsc tunnel close unexpectedly * fix gui: 1. allow set network prefix for virtual ipv4 2. fix android crash 3. fix subnet proxy cannot be set on android --- easytier-gui/src-tauri/src/lib.rs | 13 +++- easytier-gui/src/components/Config.vue | 24 +++++-- easytier-gui/src/components/Status.vue | 4 ++ easytier-gui/src/composables/mobile_vpn.ts | 2 +- easytier-gui/src/types/network.ts | 2 + easytier/src/peers/peer_conn.rs | 2 +- easytier/src/peers/peer_ospf_route.rs | 4 +- easytier/src/proto/rpc_impl/client.rs | 4 +- easytier/src/proto/rpc_impl/server.rs | 4 +- easytier/src/proto/tests.rs | 77 ++++++++++++++++++++++ easytier/src/tunnel/mpsc.rs | 76 +++++++++++++++------ easytier/src/tunnel/ring.rs | 2 +- easytier/src/vpn_portal/wireguard.rs | 2 +- 13 files changed, 181 insertions(+), 35 deletions(-) diff --git a/easytier-gui/src-tauri/src/lib.rs b/easytier-gui/src-tauri/src/lib.rs index 2433dbe..7ef603a 100644 --- a/easytier-gui/src-tauri/src/lib.rs +++ b/easytier-gui/src-tauri/src/lib.rs @@ -41,6 +41,7 @@ struct NetworkConfig { dhcp: bool, virtual_ipv4: String, + network_length: i32, hostname: Option, network_name: String, network_secret: String, @@ -83,9 +84,15 @@ impl NetworkConfig { if !self.dhcp { if self.virtual_ipv4.len() > 0 { - cfg.set_ipv4(Some(self.virtual_ipv4.parse().with_context(|| { - format!("failed to parse ipv4 address: {}", self.virtual_ipv4) - })?)) + let ip = format!("{}/{}", self.virtual_ipv4, self.network_length) + .parse() + .with_context(|| { + format!( + "failed to parse ipv4 inet address: {}, {}", + self.virtual_ipv4, self.network_length + ) + })?; + cfg.set_ipv4(Some(ip)); } } diff --git a/easytier-gui/src/components/Config.vue b/easytier-gui/src/components/Config.vue index 2eeb54f..9593f95 100644 --- a/easytier-gui/src/components/Config.vue +++ b/easytier-gui/src/components/Config.vue @@ -85,6 +85,20 @@ function searchPeerSuggestions(e: { query: string }) { peerSuggestions.value = searchUrlSuggestions(e) } +const inetSuggestions = ref(['']) + +function searchInetSuggestions(e: { query: string }) { + if (e.query.search('/') >= 0) { + inetSuggestions.value = [e.query] + } else { + const ret = [] + for (let i = 0; i < 32; i++) { + ret.push(`${e.query}/${i}`) + } + inetSuggestions.value = ret + } +} + const listenerSuggestions = ref(['']) function searchListenerSuggestiong(e: { query: string }) { @@ -153,8 +167,9 @@ onMounted(async () => { aria-describedby="virtual_ipv4-help" /> - /24 + / + @@ -221,9 +236,10 @@ onMounted(async () => {
-
diff --git a/easytier-gui/src/components/Status.vue b/easytier-gui/src/components/Status.vue index 27799e0..b25d41b 100644 --- a/easytier-gui/src/components/Status.vue +++ b/easytier-gui/src/components/Status.vue @@ -214,6 +214,8 @@ const myNodeInfoChips = computed(() => { PortRestricted = 5, Symmetric = 6, SymUdpFirewall = 7, + SymmetricEasyInc = 8, + SymmetricEasyDec = 9, }; const udpNatType: NatType = my_node_info.stun_info?.udp_nat_type if (udpNatType !== undefined) { @@ -226,6 +228,8 @@ const myNodeInfoChips = computed(() => { [NatType.PortRestricted]: 'Port Restricted', [NatType.Symmetric]: 'Symmetric', [NatType.SymUdpFirewall]: 'Symmetric UDP Firewall', + [NatType.SymmetricEasyInc]: 'Symmetric Easy Inc', + [NatType.SymmetricEasyDec]: 'Symmetric Easy Dec', } chips.push({ diff --git a/easytier-gui/src/composables/mobile_vpn.ts b/easytier-gui/src/composables/mobile_vpn.ts index 318fe4e..c504d78 100644 --- a/easytier-gui/src/composables/mobile_vpn.ts +++ b/easytier-gui/src/composables/mobile_vpn.ts @@ -48,7 +48,7 @@ async function doStartVpn(ipv4Addr: string, cidr: number, routes: string[]) { console.log('start vpn') const start_ret = await start_vpn({ - ipv4Addr: `${ipv4Addr}/${cidr}`, + ipv4Addr: `${ipv4Addr}`, routes, disallowedApplications: ['com.kkrainbow.easytier'], mtu: 1300, diff --git a/easytier-gui/src/types/network.ts b/easytier-gui/src/types/network.ts index 8422328..0c7f6e1 100644 --- a/easytier-gui/src/types/network.ts +++ b/easytier-gui/src/types/network.ts @@ -11,6 +11,7 @@ export interface NetworkConfig { dhcp: boolean virtual_ipv4: string + network_length: number, hostname?: string network_name: string network_secret: string @@ -42,6 +43,7 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig { dhcp: true, virtual_ipv4: '', + network_length: 24, network_name: 'easytier', network_secret: '', diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index b1c1962..d5ad8ba 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -93,7 +93,7 @@ impl PeerConn { let peer_conn_tunnel_filter = StatsRecorderTunnelFilter::new(); let throughput = peer_conn_tunnel_filter.filter_output(); let peer_conn_tunnel = TunnelWithFilter::new(tunnel, peer_conn_tunnel_filter); - let mut mpsc_tunnel = MpscTunnel::new(peer_conn_tunnel); + let mut mpsc_tunnel = MpscTunnel::new(peer_conn_tunnel, Some(Duration::from_secs(7))); let (recv, sink) = (mpsc_tunnel.get_stream(), mpsc_tunnel.get_sink()); diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index f9cb74c..e4f945b 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -1387,7 +1387,9 @@ impl PeerRouteServiceImpl { if resp.error.is_some() { let err = resp.error.unwrap(); if err == Error::DuplicatePeerId as i32 { - panic!("duplicate peer id"); + if !self.global_ctx.get_feature_flags().is_public_server { + panic!("duplicate peer id"); + } } else { tracing::error!(?ret, ?my_peer_id, ?dst_peer_id, "sync_route_info failed"); session diff --git a/easytier/src/proto/rpc_impl/client.rs b/easytier/src/proto/rpc_impl/client.rs index 5838339..e8e161c 100644 --- a/easytier/src/proto/rpc_impl/client.rs +++ b/easytier/src/proto/rpc_impl/client.rs @@ -61,8 +61,8 @@ impl Client { pub fn new() -> Self { let (ring_a, ring_b) = create_ring_tunnel_pair(); Self { - mpsc: Mutex::new(MpscTunnel::new(ring_a)), - transport: Mutex::new(MpscTunnel::new(ring_b)), + mpsc: Mutex::new(MpscTunnel::new(ring_a, None)), + transport: Mutex::new(MpscTunnel::new(ring_b, None)), inflight_requests: Arc::new(DashMap::new()), tasks: Arc::new(Mutex::new(JoinSet::new())), } diff --git a/easytier/src/proto/rpc_impl/server.rs b/easytier/src/proto/rpc_impl/server.rs index 372cd2a..bf14a7b 100644 --- a/easytier/src/proto/rpc_impl/server.rs +++ b/easytier/src/proto/rpc_impl/server.rs @@ -56,8 +56,8 @@ impl Server { Self { registry, - mpsc: Mutex::new(Some(MpscTunnel::new(ring_a))), - transport: Mutex::new(MpscTunnel::new(ring_b)), + mpsc: Mutex::new(Some(MpscTunnel::new(ring_a, None))), + transport: Mutex::new(MpscTunnel::new(ring_b, None)), tasks: Arc::new(Mutex::new(JoinSet::new())), packet_mergers: Arc::new(DashMap::new()), } diff --git a/easytier/src/proto/tests.rs b/easytier/src/proto/tests.rs index 7fb978b..4f62de6 100644 --- a/easytier/src/proto/tests.rs +++ b/easytier/src/proto/tests.rs @@ -175,6 +175,83 @@ async fn rpc_timeout_test() { assert_eq!(0, ctx.server.inflight_count()); } +#[tokio::test] +async fn rpc_tunnel_stuck_test() { + use crate::proto::rpc_types; + use crate::tunnel::ring::RING_TUNNEL_CAP; + + let rpc_server = Server::new(); + rpc_server.run(); + let server = GreetingServer::new(GreetingService { + delay_ms: 0, + prefix: "Hello".to_string(), + }); + rpc_server.registry().register(server, "test"); + + let client = Client::new(); + client.run(); + + let rpc_tasks = Arc::new(Mutex::new(JoinSet::new())); + let (mut rx, tx) = ( + rpc_server.get_transport_stream(), + client.get_transport_sink(), + ); + + rpc_tasks.lock().unwrap().spawn(async move { + while let Some(Ok(packet)) = rx.next().await { + if let Err(err) = tx.send(packet).await { + println!("{:?}", err); + break; + } + } + }); + + // mock server is stuck (no task to do forwards) + + let mut tasks = JoinSet::new(); + for _ in 0..RING_TUNNEL_CAP + 15 { + let out = + client.scoped_client::>(1, 1, "test".to_string()); + tasks.spawn(async move { + let mut ctrl = RpcController::default(); + ctrl.timeout_ms = 1000; + + let input = SayHelloRequest { + name: "world".to_string(), + }; + + out.say_hello(ctrl, input).await + }); + } + while let Some(ret) = tasks.join_next().await { + assert!(matches!(ret, Ok(Err(rpc_types::error::Error::Timeout(_))))); + } + + // start server consumer, new requests should be processed + let (mut rx, tx) = ( + client.get_transport_stream(), + rpc_server.get_transport_sink(), + ); + rpc_tasks.lock().unwrap().spawn(async move { + while let Some(Ok(packet)) = rx.next().await { + if let Err(err) = tx.send(packet).await { + println!("{:?}", err); + break; + } + } + }); + + let out = + client.scoped_client::>(1, 1, "test".to_string()); + let mut ctrl = RpcController::default(); + ctrl.timeout_ms = 1000; + let input = SayHelloRequest { + name: "fuck world".to_string(), + }; + let ret = out.say_hello(ctrl, input).await.unwrap(); + assert_eq!(ret.greeting, "Hello fuck world!"); +} + #[tokio::test] async fn standalone_rpc_test() { use crate::proto::rpc_impl::standalone::{StandAloneClient, StandAloneServer}; diff --git a/easytier/src/tunnel/mpsc.rs b/easytier/src/tunnel/mpsc.rs index 927688c..bc39a1b 100644 --- a/easytier/src/tunnel/mpsc.rs +++ b/easytier/src/tunnel/mpsc.rs @@ -41,13 +41,13 @@ pub struct MpscTunnel { } impl MpscTunnel { - pub fn new(tunnel: T) -> Self { + pub fn new(tunnel: T, send_timeout: Option) -> Self { let (tx, mut rx) = channel(32); let (stream, mut sink) = tunnel.split(); let task = tokio::spawn(async move { loop { - if let Err(e) = Self::forward_one_round(&mut rx, &mut sink).await { + if let Err(e) = Self::forward_one_round(&mut rx, &mut sink, send_timeout).await { tracing::error!(?e, "forward error"); break; } @@ -68,21 +68,44 @@ impl MpscTunnel { async fn forward_one_round( rx: &mut Receiver, sink: &mut Pin>, + send_timeout_ms: Option, ) -> Result<(), TunnelError> { let item = rx.recv().await.with_context(|| "recv error")?; + if let Some(timeout_ms) = send_timeout_ms { + Self::forward_one_round_with_timeout(rx, sink, item, timeout_ms).await + } else { + Self::forward_one_round_no_timeout(rx, sink, item).await + } + } - match timeout(Duration::from_secs(10), async move { - sink.feed(item).await?; - while let Ok(item) = rx.try_recv() { - match sink.feed(item).await { - Err(e) => { - tracing::error!(?e, "feed error"); - return Err(e); - } - Ok(_) => {} + async fn forward_one_round_no_timeout( + rx: &mut Receiver, + sink: &mut Pin>, + initial_item: ZCPacket, + ) -> Result<(), TunnelError> { + sink.feed(initial_item).await?; + + while let Ok(item) = rx.try_recv() { + match sink.feed(item).await { + Err(e) => { + tracing::error!(?e, "feed error"); + return Err(e); } + Ok(_) => {} } - sink.flush().await + } + + sink.flush().await + } + + async fn forward_one_round_with_timeout( + rx: &mut Receiver, + sink: &mut Pin>, + initial_item: ZCPacket, + timeout_ms: Duration, + ) -> Result<(), TunnelError> { + match timeout(timeout_ms, async move { + Self::forward_one_round_no_timeout(rx, sink, initial_item).await }) .await { @@ -112,17 +135,12 @@ impl MpscTunnel { } } -impl From for MpscTunnel { - fn from(tunnel: T) -> Self { - Self::new(tunnel) - } -} - #[cfg(test)] mod tests { use futures::StreamExt; use crate::tunnel::{ + ring::{create_ring_tunnel_pair, RING_TUNNEL_CAP}, tcp::{TcpTunnelConnector, TcpTunnelListener}, TunnelConnector, TunnelListener, }; @@ -162,7 +180,7 @@ mod tests { }); let tunnel = connector.connect().await.unwrap(); - let mpsc_tunnel = MpscTunnel::from(tunnel); + let mpsc_tunnel = MpscTunnel::new(tunnel, None); let sink1 = mpsc_tunnel.get_sink(); let t2 = tokio::spawn(async move { @@ -213,4 +231,24 @@ mod tests { let _ = tokio::join!(t1, t2, t3, t4); } + + #[tokio::test] + async fn mpsc_slow_receiver_with_send_timeout() { + let (a, _b) = create_ring_tunnel_pair(); + let mpsc_tunnel = MpscTunnel::new(a, Some(Duration::from_secs(1))); + let s = mpsc_tunnel.get_sink(); + for _ in 0..RING_TUNNEL_CAP { + s.send(ZCPacket::new_with_payload(&[0; 1024])) + .await + .unwrap(); + } + tokio::time::sleep(Duration::from_millis(1500)).await; + let e = s.send(ZCPacket::new_with_payload(&[0; 1024])).await; + assert!(e.is_ok()); + + tokio::time::sleep(Duration::from_millis(1500)).await; + + let e = s.send(ZCPacket::new_with_payload(&[0; 1024])).await; + assert!(e.is_err()); + } } diff --git a/easytier/src/tunnel/ring.rs b/easytier/src/tunnel/ring.rs index 7394932..60b5607 100644 --- a/easytier/src/tunnel/ring.rs +++ b/easytier/src/tunnel/ring.rs @@ -26,7 +26,7 @@ use super::{ StreamItem, Tunnel, TunnelConnector, TunnelError, TunnelInfo, TunnelListener, }; -static RING_TUNNEL_CAP: usize = 128; +pub static RING_TUNNEL_CAP: usize = 128; static RING_TUNNEL_RESERVERD_CAP: usize = 4; type RingLock = parking_lot::Mutex<()>; diff --git a/easytier/src/vpn_portal/wireguard.rs b/easytier/src/vpn_portal/wireguard.rs index d62ea4f..e695d8d 100644 --- a/easytier/src/vpn_portal/wireguard.rs +++ b/easytier/src/vpn_portal/wireguard.rs @@ -81,7 +81,7 @@ impl WireGuardImpl { wg_peer_ip_table: WgPeerIpTable, ) { let info = t.info().unwrap_or_default(); - let mut mpsc_tunnel = MpscTunnel::new(t); + let mut mpsc_tunnel = MpscTunnel::new(t, None); let mut stream = mpsc_tunnel.get_stream(); let mut ip_registered = false;