From 2496cf51c3356212eec12da56d6dde28583f7786 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Thu, 26 Sep 2024 21:59:17 +0800 Subject: [PATCH] fix connection loss when traffic is huge --- easytier/src/common/constants.rs | 3 ++- easytier/src/gateway/udp_proxy.rs | 30 +++++++++++++++--------------- easytier/src/peers/peer_conn.rs | 9 +++++---- easytier/src/tunnel/packet_def.rs | 6 ++++++ easytier/src/tunnel/ring.rs | 13 +++++++++++-- easytier/src/tunnel/udp.rs | 29 +++++++++++++++-------------- 6 files changed, 54 insertions(+), 36 deletions(-) diff --git a/easytier/src/common/constants.rs b/easytier/src/common/constants.rs index eff5e0e..84b4173 100644 --- a/easytier/src/common/constants.rs +++ b/easytier/src/common/constants.rs @@ -26,5 +26,6 @@ pub const UDP_HOLE_PUNCH_CONNECTOR_SERVICE_ID: u32 = 2; pub const EASYTIER_VERSION: &str = git_version::git_version!( args = ["--abbrev=8", "--always", "--dirty=~"], prefix = concat!(env!("CARGO_PKG_VERSION"), "-"), - suffix = "" + suffix = "", + fallback = env!("CARGO_PKG_VERSION") ); diff --git a/easytier/src/gateway/udp_proxy.rs b/easytier/src/gateway/udp_proxy.rs index 4355f37..8307dcf 100644 --- a/easytier/src/gateway/udp_proxy.rs +++ b/easytier/src/gateway/udp_proxy.rs @@ -12,12 +12,10 @@ use pnet::packet::{ udp::{self, MutableUdpPacket}, Packet, }; +use tachyonix::{channel, Receiver, Sender, TrySendError}; use tokio::{ net::UdpSocket, - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Mutex, - }, + sync::Mutex, task::{JoinHandle, JoinSet}, time::timeout, }; @@ -85,7 +83,7 @@ impl UdpNatEntry { async fn compose_ipv4_packet( self: &Arc, - packet_sender: &mut UnboundedSender, + packet_sender: &mut Sender, buf: &mut [u8], src_v4: &SocketAddrV4, payload_len: usize, @@ -122,11 +120,13 @@ impl UdpNatEntry { p.fill_peer_manager_hdr(self.my_peer_id, self.src_peer_id, PacketType::Data as u8); p.mut_peer_manager_header().unwrap().set_no_proxy(true); - if let Err(e) = packet_sender.send(p) { - tracing::error!("send icmp packet to peer failed: {:?}, may exiting..", e); - return Err(Error::AnyhowError(e.into())); + match packet_sender.try_send(p) { + Err(TrySendError::Closed(e)) => { + tracing::error!("send icmp packet to peer failed: {:?}, may exiting..", e); + Err(Error::Unknown) + } + _ => Ok(()), } - Ok(()) }, )?; @@ -135,7 +135,7 @@ impl UdpNatEntry { async fn forward_task( self: Arc, - mut packet_sender: UnboundedSender, + mut packet_sender: Sender, virtual_ipv4: Ipv4Addr, ) { let mut buf = [0u8; 65536]; @@ -182,7 +182,7 @@ impl UdpNatEntry { &mut buf, &src_v4, len, - 1200, + 1256, ip_id, ) .await @@ -213,8 +213,8 @@ pub struct UdpProxy { nat_table: Arc>>, - sender: UnboundedSender, - receiver: Mutex>>, + sender: Sender, + receiver: Mutex>>, tasks: Mutex>, @@ -350,7 +350,7 @@ impl UdpProxy { peer_manager: Arc, ) -> Result, Error> { let cidr_set = CidrSet::new(global_ctx.clone()); - let (sender, receiver) = unbounded_channel(); + let (sender, receiver) = channel(64); let ret = Self { global_ctx, peer_manager, @@ -398,7 +398,7 @@ impl UdpProxy { let mut receiver = self.receiver.lock().await.take().unwrap(); let peer_manager = self.peer_manager.clone(); self.tasks.lock().await.spawn(async move { - while let Some(msg) = receiver.recv().await { + while let Ok(msg) = receiver.recv().await { let to_peer_id: PeerId = msg.peer_manager_header().unwrap().to_peer_id.get(); tracing::trace!(?msg, ?to_peer_id, "udp nat packet response send"); let ret = peer_manager.send_msg(msg, to_peer_id).await; diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 0fa73cf..2cb8284 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -8,7 +8,7 @@ use std::{ }, }; -use futures::{SinkExt, StreamExt, TryFutureExt}; +use futures::{StreamExt, TryFutureExt}; use prost::Message; @@ -18,7 +18,6 @@ use tokio::{ time::{timeout, Duration}, }; -use tokio_util::sync::PollSender; use tracing::Instrument; use zerocopy::AsBytes; @@ -246,7 +245,7 @@ impl PeerConn { pub async fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) { let mut stream = self.recv.lock().await.take().unwrap(); let sink = self.sink.clone(); - let mut sender = PollSender::new(packet_recv_chan.clone()); + let sender = packet_recv_chan.clone(); let close_event_sender = self.close_event_sender.clone().unwrap(); let conn_id = self.conn_id; let ctrl_sender = self.ctrl_resp_sender.clone(); @@ -283,7 +282,9 @@ impl PeerConn { tracing::error!(?e, "peer conn send ctrl resp error"); } } else { - if sender.send(zc_packet).await.is_err() { + if zc_packet.is_lossy() { + let _ = sender.try_send(zc_packet); + } else if sender.send(zc_packet).await.is_err() { break; } } diff --git a/easytier/src/tunnel/packet_def.rs b/easytier/src/tunnel/packet_def.rs index 1886d8e..6ad9295 100644 --- a/easytier/src/tunnel/packet_def.rs +++ b/easytier/src/tunnel/packet_def.rs @@ -500,6 +500,12 @@ impl ZCPacket { pub fn mut_inner(&mut self) -> &mut BytesMut { &mut self.inner } + + pub fn is_lossy(&self) -> bool { + self.peer_manager_header() + .and_then(|hdr| Some(hdr.packet_type == PacketType::Data as u8)) + .unwrap_or(false) + } } #[cfg(test)] diff --git a/easytier/src/tunnel/ring.rs b/easytier/src/tunnel/ring.rs index 58a221d..c382c7d 100644 --- a/easytier/src/tunnel/ring.rs +++ b/easytier/src/tunnel/ring.rs @@ -26,7 +26,8 @@ use super::{ StreamItem, Tunnel, TunnelConnector, TunnelError, TunnelInfo, TunnelListener, }; -static RING_TUNNEL_CAP: usize = 128; +static RING_TUNNEL_CAP: usize = 64; +static RING_TUNNEL_RESERVERD_CAP: usize = 4; type RingLock = parking_lot::Mutex<()>; @@ -46,7 +47,7 @@ impl RingTunnel { pub fn new(cap: usize) -> Self { let id = Uuid::new_v4(); - let ring_impl = AsyncHeapRb::new(cap); + let ring_impl = AsyncHeapRb::new(std::cmp::max(RING_TUNNEL_RESERVERD_CAP * 2, cap)); let (ring_prod_impl, ring_cons_impl) = ring_impl.split(); Self { id: id.clone(), @@ -121,6 +122,14 @@ impl RingSink { } pub fn try_send(&mut self, item: RingItem) -> Result<(), RingItem> { + let base = self.ring_prod_impl.base(); + if base.occupied_len() >= base.capacity().get() - RING_TUNNEL_RESERVERD_CAP { + return Err(item); + } + self.ring_prod_impl.try_push(item) + } + + pub fn force_send(&mut self, item: RingItem) -> Result<(), RingItem> { self.ring_prod_impl.try_push(item) } } diff --git a/easytier/src/tunnel/udp.rs b/easytier/src/tunnel/udp.rs index 61fcd80..bde119e 100644 --- a/easytier/src/tunnel/udp.rs +++ b/easytier/src/tunnel/udp.rs @@ -10,14 +10,14 @@ use std::net::SocketAddr; use tokio::{ net::UdpSocket, sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, - task::{JoinHandle, JoinSet}, + task::JoinSet, }; use tracing::{instrument, Instrument}; use super::TunnelInfo; use crate::{ - common::join_joinset_background, + common::{join_joinset_background, scoped_task::ScopedTask}, tunnel::{ build_url_from_socket_addr, common::{reserve_buf, TunnelWrapper}, @@ -190,7 +190,7 @@ struct UdpConnection { dst_addr: SocketAddr, ring_sender: RingSink, - forward_task: JoinHandle<()>, + forward_task: ScopedTask<()>, } impl UdpConnection { @@ -209,7 +209,8 @@ impl UdpConnection { if let Err(e) = close_event_sender.send((dst_addr, err)) { tracing::error!(?e, "udp send close event error"); } - }); + }) + .into(); Self { socket, @@ -232,20 +233,20 @@ impl UdpConnection { return Err(TunnelError::ConnIdNotMatch(self.conn_id, conn_id)); } - if let Err(e) = self.ring_sender.try_send(zc_packet) { - tracing::trace!(?e, "ring sender full, drop packet"); + if zc_packet.is_lossy() { + if let Err(e) = self.ring_sender.try_send(zc_packet) { + tracing::trace!(?e, "ring sender full, drop lossy packet"); + } + } else { + if let Err(e) = self.ring_sender.force_send(zc_packet) { + tracing::trace!(?e, "ring sender full, drop non-lossy packet"); + } } Ok(()) } } -impl Drop for UdpConnection { - fn drop(&mut self) { - self.forward_task.abort(); - } -} - #[derive(Clone)] struct UdpTunnelListenerData { local_url: url::Url, @@ -555,8 +556,8 @@ impl UdpTunnelConnector { dst_addr: SocketAddr, conn_id: u32, ) -> Result, super::TunnelError> { - let ring_for_send_udp = Arc::new(RingTunnel::new(128)); - let ring_for_recv_udp = Arc::new(RingTunnel::new(128)); + let ring_for_send_udp = Arc::new(RingTunnel::new(32)); + let ring_for_recv_udp = Arc::new(RingTunnel::new(32)); tracing::debug!( ?ring_for_send_udp, ?ring_for_recv_udp,