mirror of
https://github.com/EasyTier/EasyTier.git
synced 2024-11-16 03:32:43 +08:00
fix connection loss when traffic is huge
Some checks are pending
EasyTier Core / pre_job (push) Waiting to run
EasyTier Core / build (freebsd-13.2-x86_64, 13.2, ubuntu-latest, x86_64-unknown-freebsd) (push) Blocked by required conditions
EasyTier Core / build (linux-aarch64, ubuntu-latest, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-arm, ubuntu-latest, arm-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armhf, ubuntu-latest, arm-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7, ubuntu-latest, armv7-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7hf, ubuntu-latest, armv7-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-mips, ubuntu-latest, mips-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-mipsel, ubuntu-latest, mipsel-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-x86_64, ubuntu-latest, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (macos-aarch64, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (macos-x86_64, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (windows-x86_64, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier Core / core-result (push) Blocked by required conditions
EasyTier GUI / pre_job (push) Waiting to run
EasyTier GUI / build-gui (linux-aarch64, aarch64-unknown-linux-gnu, ubuntu-latest, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (linux-x86_64, x86_64-unknown-linux-gnu, ubuntu-latest, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-aarch64, aarch64-apple-darwin, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-x86_64, x86_64-apple-darwin, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (windows-x86_64, x86_64-pc-windows-msvc, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier GUI / gui-result (push) Blocked by required conditions
EasyTier Mobile / pre_job (push) Waiting to run
EasyTier Mobile / build-mobile (android, ubuntu-latest, android) (push) Blocked by required conditions
EasyTier Mobile / mobile-result (push) Blocked by required conditions
EasyTier Test / pre_job (push) Waiting to run
EasyTier Test / test (push) Blocked by required conditions
Some checks are pending
EasyTier Core / pre_job (push) Waiting to run
EasyTier Core / build (freebsd-13.2-x86_64, 13.2, ubuntu-latest, x86_64-unknown-freebsd) (push) Blocked by required conditions
EasyTier Core / build (linux-aarch64, ubuntu-latest, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-arm, ubuntu-latest, arm-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armhf, ubuntu-latest, arm-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7, ubuntu-latest, armv7-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7hf, ubuntu-latest, armv7-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-mips, ubuntu-latest, mips-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-mipsel, ubuntu-latest, mipsel-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-x86_64, ubuntu-latest, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (macos-aarch64, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (macos-x86_64, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (windows-x86_64, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier Core / core-result (push) Blocked by required conditions
EasyTier GUI / pre_job (push) Waiting to run
EasyTier GUI / build-gui (linux-aarch64, aarch64-unknown-linux-gnu, ubuntu-latest, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (linux-x86_64, x86_64-unknown-linux-gnu, ubuntu-latest, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-aarch64, aarch64-apple-darwin, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-x86_64, x86_64-apple-darwin, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (windows-x86_64, x86_64-pc-windows-msvc, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier GUI / gui-result (push) Blocked by required conditions
EasyTier Mobile / pre_job (push) Waiting to run
EasyTier Mobile / build-mobile (android, ubuntu-latest, android) (push) Blocked by required conditions
EasyTier Mobile / mobile-result (push) Blocked by required conditions
EasyTier Test / pre_job (push) Waiting to run
EasyTier Test / test (push) Blocked by required conditions
This commit is contained in:
parent
7b4a01e7fb
commit
2496cf51c3
|
@ -26,5 +26,6 @@ pub const UDP_HOLE_PUNCH_CONNECTOR_SERVICE_ID: u32 = 2;
|
||||||
pub const EASYTIER_VERSION: &str = git_version::git_version!(
|
pub const EASYTIER_VERSION: &str = git_version::git_version!(
|
||||||
args = ["--abbrev=8", "--always", "--dirty=~"],
|
args = ["--abbrev=8", "--always", "--dirty=~"],
|
||||||
prefix = concat!(env!("CARGO_PKG_VERSION"), "-"),
|
prefix = concat!(env!("CARGO_PKG_VERSION"), "-"),
|
||||||
suffix = ""
|
suffix = "",
|
||||||
|
fallback = env!("CARGO_PKG_VERSION")
|
||||||
);
|
);
|
||||||
|
|
|
@ -12,12 +12,10 @@ use pnet::packet::{
|
||||||
udp::{self, MutableUdpPacket},
|
udp::{self, MutableUdpPacket},
|
||||||
Packet,
|
Packet,
|
||||||
};
|
};
|
||||||
|
use tachyonix::{channel, Receiver, Sender, TrySendError};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
sync::{
|
sync::Mutex,
|
||||||
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
|
||||||
Mutex,
|
|
||||||
},
|
|
||||||
task::{JoinHandle, JoinSet},
|
task::{JoinHandle, JoinSet},
|
||||||
time::timeout,
|
time::timeout,
|
||||||
};
|
};
|
||||||
|
@ -85,7 +83,7 @@ impl UdpNatEntry {
|
||||||
|
|
||||||
async fn compose_ipv4_packet(
|
async fn compose_ipv4_packet(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
packet_sender: &mut UnboundedSender<ZCPacket>,
|
packet_sender: &mut Sender<ZCPacket>,
|
||||||
buf: &mut [u8],
|
buf: &mut [u8],
|
||||||
src_v4: &SocketAddrV4,
|
src_v4: &SocketAddrV4,
|
||||||
payload_len: usize,
|
payload_len: usize,
|
||||||
|
@ -122,11 +120,13 @@ impl UdpNatEntry {
|
||||||
p.fill_peer_manager_hdr(self.my_peer_id, self.src_peer_id, PacketType::Data as u8);
|
p.fill_peer_manager_hdr(self.my_peer_id, self.src_peer_id, PacketType::Data as u8);
|
||||||
p.mut_peer_manager_header().unwrap().set_no_proxy(true);
|
p.mut_peer_manager_header().unwrap().set_no_proxy(true);
|
||||||
|
|
||||||
if let Err(e) = packet_sender.send(p) {
|
match packet_sender.try_send(p) {
|
||||||
|
Err(TrySendError::Closed(e)) => {
|
||||||
tracing::error!("send icmp packet to peer failed: {:?}, may exiting..", e);
|
tracing::error!("send icmp packet to peer failed: {:?}, may exiting..", e);
|
||||||
return Err(Error::AnyhowError(e.into()));
|
Err(Error::Unknown)
|
||||||
|
}
|
||||||
|
_ => Ok(()),
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ impl UdpNatEntry {
|
||||||
|
|
||||||
async fn forward_task(
|
async fn forward_task(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut packet_sender: UnboundedSender<ZCPacket>,
|
mut packet_sender: Sender<ZCPacket>,
|
||||||
virtual_ipv4: Ipv4Addr,
|
virtual_ipv4: Ipv4Addr,
|
||||||
) {
|
) {
|
||||||
let mut buf = [0u8; 65536];
|
let mut buf = [0u8; 65536];
|
||||||
|
@ -182,7 +182,7 @@ impl UdpNatEntry {
|
||||||
&mut buf,
|
&mut buf,
|
||||||
&src_v4,
|
&src_v4,
|
||||||
len,
|
len,
|
||||||
1200,
|
1256,
|
||||||
ip_id,
|
ip_id,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
@ -213,8 +213,8 @@ pub struct UdpProxy {
|
||||||
|
|
||||||
nat_table: Arc<DashMap<UdpNatKey, Arc<UdpNatEntry>>>,
|
nat_table: Arc<DashMap<UdpNatKey, Arc<UdpNatEntry>>>,
|
||||||
|
|
||||||
sender: UnboundedSender<ZCPacket>,
|
sender: Sender<ZCPacket>,
|
||||||
receiver: Mutex<Option<UnboundedReceiver<ZCPacket>>>,
|
receiver: Mutex<Option<Receiver<ZCPacket>>>,
|
||||||
|
|
||||||
tasks: Mutex<JoinSet<()>>,
|
tasks: Mutex<JoinSet<()>>,
|
||||||
|
|
||||||
|
@ -350,7 +350,7 @@ impl UdpProxy {
|
||||||
peer_manager: Arc<PeerManager>,
|
peer_manager: Arc<PeerManager>,
|
||||||
) -> Result<Arc<Self>, Error> {
|
) -> Result<Arc<Self>, Error> {
|
||||||
let cidr_set = CidrSet::new(global_ctx.clone());
|
let cidr_set = CidrSet::new(global_ctx.clone());
|
||||||
let (sender, receiver) = unbounded_channel();
|
let (sender, receiver) = channel(64);
|
||||||
let ret = Self {
|
let ret = Self {
|
||||||
global_ctx,
|
global_ctx,
|
||||||
peer_manager,
|
peer_manager,
|
||||||
|
@ -398,7 +398,7 @@ impl UdpProxy {
|
||||||
let mut receiver = self.receiver.lock().await.take().unwrap();
|
let mut receiver = self.receiver.lock().await.take().unwrap();
|
||||||
let peer_manager = self.peer_manager.clone();
|
let peer_manager = self.peer_manager.clone();
|
||||||
self.tasks.lock().await.spawn(async move {
|
self.tasks.lock().await.spawn(async move {
|
||||||
while let Some(msg) = receiver.recv().await {
|
while let Ok(msg) = receiver.recv().await {
|
||||||
let to_peer_id: PeerId = msg.peer_manager_header().unwrap().to_peer_id.get();
|
let to_peer_id: PeerId = msg.peer_manager_header().unwrap().to_peer_id.get();
|
||||||
tracing::trace!(?msg, ?to_peer_id, "udp nat packet response send");
|
tracing::trace!(?msg, ?to_peer_id, "udp nat packet response send");
|
||||||
let ret = peer_manager.send_msg(msg, to_peer_id).await;
|
let ret = peer_manager.send_msg(msg, to_peer_id).await;
|
||||||
|
|
|
@ -8,7 +8,7 @@ use std::{
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{SinkExt, StreamExt, TryFutureExt};
|
use futures::{StreamExt, TryFutureExt};
|
||||||
|
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
|
|
||||||
|
@ -18,7 +18,6 @@ use tokio::{
|
||||||
time::{timeout, Duration},
|
time::{timeout, Duration},
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio_util::sync::PollSender;
|
|
||||||
use tracing::Instrument;
|
use tracing::Instrument;
|
||||||
use zerocopy::AsBytes;
|
use zerocopy::AsBytes;
|
||||||
|
|
||||||
|
@ -246,7 +245,7 @@ impl PeerConn {
|
||||||
pub async fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) {
|
pub async fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) {
|
||||||
let mut stream = self.recv.lock().await.take().unwrap();
|
let mut stream = self.recv.lock().await.take().unwrap();
|
||||||
let sink = self.sink.clone();
|
let sink = self.sink.clone();
|
||||||
let mut sender = PollSender::new(packet_recv_chan.clone());
|
let sender = packet_recv_chan.clone();
|
||||||
let close_event_sender = self.close_event_sender.clone().unwrap();
|
let close_event_sender = self.close_event_sender.clone().unwrap();
|
||||||
let conn_id = self.conn_id;
|
let conn_id = self.conn_id;
|
||||||
let ctrl_sender = self.ctrl_resp_sender.clone();
|
let ctrl_sender = self.ctrl_resp_sender.clone();
|
||||||
|
@ -283,7 +282,9 @@ impl PeerConn {
|
||||||
tracing::error!(?e, "peer conn send ctrl resp error");
|
tracing::error!(?e, "peer conn send ctrl resp error");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if sender.send(zc_packet).await.is_err() {
|
if zc_packet.is_lossy() {
|
||||||
|
let _ = sender.try_send(zc_packet);
|
||||||
|
} else if sender.send(zc_packet).await.is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -500,6 +500,12 @@ impl ZCPacket {
|
||||||
pub fn mut_inner(&mut self) -> &mut BytesMut {
|
pub fn mut_inner(&mut self) -> &mut BytesMut {
|
||||||
&mut self.inner
|
&mut self.inner
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_lossy(&self) -> bool {
|
||||||
|
self.peer_manager_header()
|
||||||
|
.and_then(|hdr| Some(hdr.packet_type == PacketType::Data as u8))
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -26,7 +26,8 @@ use super::{
|
||||||
StreamItem, Tunnel, TunnelConnector, TunnelError, TunnelInfo, TunnelListener,
|
StreamItem, Tunnel, TunnelConnector, TunnelError, TunnelInfo, TunnelListener,
|
||||||
};
|
};
|
||||||
|
|
||||||
static RING_TUNNEL_CAP: usize = 128;
|
static RING_TUNNEL_CAP: usize = 64;
|
||||||
|
static RING_TUNNEL_RESERVERD_CAP: usize = 4;
|
||||||
|
|
||||||
type RingLock = parking_lot::Mutex<()>;
|
type RingLock = parking_lot::Mutex<()>;
|
||||||
|
|
||||||
|
@ -46,7 +47,7 @@ impl RingTunnel {
|
||||||
|
|
||||||
pub fn new(cap: usize) -> Self {
|
pub fn new(cap: usize) -> Self {
|
||||||
let id = Uuid::new_v4();
|
let id = Uuid::new_v4();
|
||||||
let ring_impl = AsyncHeapRb::new(cap);
|
let ring_impl = AsyncHeapRb::new(std::cmp::max(RING_TUNNEL_RESERVERD_CAP * 2, cap));
|
||||||
let (ring_prod_impl, ring_cons_impl) = ring_impl.split();
|
let (ring_prod_impl, ring_cons_impl) = ring_impl.split();
|
||||||
Self {
|
Self {
|
||||||
id: id.clone(),
|
id: id.clone(),
|
||||||
|
@ -121,6 +122,14 @@ impl RingSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn try_send(&mut self, item: RingItem) -> Result<(), RingItem> {
|
pub fn try_send(&mut self, item: RingItem) -> Result<(), RingItem> {
|
||||||
|
let base = self.ring_prod_impl.base();
|
||||||
|
if base.occupied_len() >= base.capacity().get() - RING_TUNNEL_RESERVERD_CAP {
|
||||||
|
return Err(item);
|
||||||
|
}
|
||||||
|
self.ring_prod_impl.try_push(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn force_send(&mut self, item: RingItem) -> Result<(), RingItem> {
|
||||||
self.ring_prod_impl.try_push(item)
|
self.ring_prod_impl.try_push(item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,14 +10,14 @@ use std::net::SocketAddr;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
|
sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
|
||||||
task::{JoinHandle, JoinSet},
|
task::JoinSet,
|
||||||
};
|
};
|
||||||
|
|
||||||
use tracing::{instrument, Instrument};
|
use tracing::{instrument, Instrument};
|
||||||
|
|
||||||
use super::TunnelInfo;
|
use super::TunnelInfo;
|
||||||
use crate::{
|
use crate::{
|
||||||
common::join_joinset_background,
|
common::{join_joinset_background, scoped_task::ScopedTask},
|
||||||
tunnel::{
|
tunnel::{
|
||||||
build_url_from_socket_addr,
|
build_url_from_socket_addr,
|
||||||
common::{reserve_buf, TunnelWrapper},
|
common::{reserve_buf, TunnelWrapper},
|
||||||
|
@ -190,7 +190,7 @@ struct UdpConnection {
|
||||||
dst_addr: SocketAddr,
|
dst_addr: SocketAddr,
|
||||||
|
|
||||||
ring_sender: RingSink,
|
ring_sender: RingSink,
|
||||||
forward_task: JoinHandle<()>,
|
forward_task: ScopedTask<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UdpConnection {
|
impl UdpConnection {
|
||||||
|
@ -209,7 +209,8 @@ impl UdpConnection {
|
||||||
if let Err(e) = close_event_sender.send((dst_addr, err)) {
|
if let Err(e) = close_event_sender.send((dst_addr, err)) {
|
||||||
tracing::error!(?e, "udp send close event error");
|
tracing::error!(?e, "udp send close event error");
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
|
.into();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
socket,
|
socket,
|
||||||
|
@ -232,20 +233,20 @@ impl UdpConnection {
|
||||||
return Err(TunnelError::ConnIdNotMatch(self.conn_id, conn_id));
|
return Err(TunnelError::ConnIdNotMatch(self.conn_id, conn_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if zc_packet.is_lossy() {
|
||||||
if let Err(e) = self.ring_sender.try_send(zc_packet) {
|
if let Err(e) = self.ring_sender.try_send(zc_packet) {
|
||||||
tracing::trace!(?e, "ring sender full, drop packet");
|
tracing::trace!(?e, "ring sender full, drop lossy packet");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if let Err(e) = self.ring_sender.force_send(zc_packet) {
|
||||||
|
tracing::trace!(?e, "ring sender full, drop non-lossy packet");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for UdpConnection {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.forward_task.abort();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct UdpTunnelListenerData {
|
struct UdpTunnelListenerData {
|
||||||
local_url: url::Url,
|
local_url: url::Url,
|
||||||
|
@ -555,8 +556,8 @@ impl UdpTunnelConnector {
|
||||||
dst_addr: SocketAddr,
|
dst_addr: SocketAddr,
|
||||||
conn_id: u32,
|
conn_id: u32,
|
||||||
) -> Result<Box<dyn super::Tunnel>, super::TunnelError> {
|
) -> Result<Box<dyn super::Tunnel>, super::TunnelError> {
|
||||||
let ring_for_send_udp = Arc::new(RingTunnel::new(128));
|
let ring_for_send_udp = Arc::new(RingTunnel::new(32));
|
||||||
let ring_for_recv_udp = Arc::new(RingTunnel::new(128));
|
let ring_for_recv_udp = Arc::new(RingTunnel::new(32));
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
?ring_for_send_udp,
|
?ring_for_send_udp,
|
||||||
?ring_for_recv_udp,
|
?ring_for_recv_udp,
|
||||||
|
|
Loading…
Reference in New Issue
Block a user