From 637a8b6ed5eb1e9b63215f84bf8ff0668a57e823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Thu, 9 Jun 2022 21:16:42 +0800 Subject: [PATCH] refactor: udp --- adapter/inbound/packet.go | 6 +-- adapter/inbound/util.go | 18 +++++++ adapter/outbound/base.go | 21 +++++++- adapter/outbound/shadowsocks.go | 6 --- common/pool/alloc.go | 8 ++- common/pool/pool.go | 4 +- config/config.go | 2 +- constant/adapters.go | 8 +-- constant/metadata.go | 14 +++++ go.mod | 2 +- go.sum | 4 +- listener/socks/udp.go | 23 ++++---- listener/socks/utils.go | 21 +++++--- listener/tproxy/packet.go | 22 +++++--- listener/tproxy/udp.go | 18 +++---- listener/tun/ipstack/gvisor/handler.go | 21 ++++---- listener/tun/ipstack/gvisor/udp.go | 16 +++--- listener/tun/ipstack/system/stack.go | 26 +++++---- listener/tun/ipstack/system/udp.go | 18 ++++--- test/go.mod | 2 +- test/go.sum | 4 +- tunnel/connection.go | 73 +++++++++++++++----------- tunnel/statistic/tracker.go | 21 ++++++++ 23 files changed, 231 insertions(+), 127 deletions(-) diff --git a/adapter/inbound/packet.go b/adapter/inbound/packet.go index 80b136cd..280102ea 100644 --- a/adapter/inbound/packet.go +++ b/adapter/inbound/packet.go @@ -2,7 +2,7 @@ package inbound import ( C "github.com/Dreamacro/clash/constant" - "github.com/Dreamacro/clash/transport/socks5" + M "github.com/sagernet/sing/common/metadata" ) // PacketAdapter is a UDP Packet adapter for socks/redir/tun @@ -17,8 +17,8 @@ func (s *PacketAdapter) Metadata() *C.Metadata { } // NewPacket is PacketAdapter generator -func NewPacket(target socks5.Addr, packet C.UDPPacket, source C.Type) *PacketAdapter { - metadata := parseSocksAddr(target) +func NewPacket(target M.Socksaddr, packet C.UDPPacket, source C.Type) *PacketAdapter { + metadata := socksAddrToMetadata(target) metadata.NetWork = C.UDP metadata.Type = source if ip, port, err := parseAddr(packet.LocalAddr().String()); err == nil { diff --git a/adapter/inbound/util.go b/adapter/inbound/util.go index 5dd4148d..e7dc1cf8 100644 --- a/adapter/inbound/util.go +++ b/adapter/inbound/util.go @@ -10,8 +10,26 @@ import ( "github.com/Dreamacro/clash/common/nnip" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/transport/socks5" + M "github.com/sagernet/sing/common/metadata" ) +func socksAddrToMetadata(addr M.Socksaddr) *C.Metadata { + metadata := &C.Metadata{} + switch addr.Family() { + case M.AddressFamilyIPv4: + metadata.AddrType = C.AtypIPv4 + metadata.DstIP = addr.Addr + case M.AddressFamilyIPv6: + metadata.AddrType = C.AtypIPv6 + metadata.DstIP = addr.Addr + case M.AddressFamilyFqdn: + metadata.AddrType = C.AtypDomainName + metadata.Host = addr.Fqdn + } + metadata.DstPort = strconv.Itoa(int(addr.Port)) + return metadata +} + func parseSocksAddr(target socks5.Addr) *C.Metadata { metadata := &C.Metadata{ AddrType: int(target[0]), diff --git a/adapter/outbound/base.go b/adapter/outbound/base.go index 6d7f6a8d..8fc67710 100644 --- a/adapter/outbound/base.go +++ b/adapter/outbound/base.go @@ -10,6 +10,10 @@ import ( "github.com/Dreamacro/clash/component/dialer" C "github.com/Dreamacro/clash/constant" "github.com/gofrs/uuid" + "github.com/sagernet/sing/common/buf" + "github.com/sagernet/sing/common/bufio" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" ) type Base struct { @@ -157,6 +161,7 @@ func NewConn(c net.Conn, a C.ProxyAdapter) C.Conn { type packetConn struct { net.PacketConn + nc N.PacketConn chain C.Chain actualRemoteDestination string } @@ -175,8 +180,22 @@ func (c *packetConn) AppendToChains(a C.ProxyAdapter) { c.chain = append(c.chain, a.Name()) } +func (c *packetConn) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) { + return c.nc.ReadPacket(buffer) +} + +func (c *packetConn) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error { + return c.nc.WritePacket(buffer, addr) +} + func newPacketConn(pc net.PacketConn, a C.ProxyAdapter) C.PacketConn { - return &packetConn{pc, []string{a.Name()}, parseRemoteDestination(a.Addr())} + var nc N.PacketConn + if n, isNc := pc.(N.PacketConn); isNc { + nc = n + } else { + nc = &bufio.PacketConnWrapper{PacketConn: pc} + } + return &packetConn{pc, nc, []string{a.Name()}, parseRemoteDestination(a.Addr())} } func parseRemoteDestination(addr string) string { diff --git a/adapter/outbound/shadowsocks.go b/adapter/outbound/shadowsocks.go index 5f53edae..1099a179 100644 --- a/adapter/outbound/shadowsocks.go +++ b/adapter/outbound/shadowsocks.go @@ -7,7 +7,6 @@ import ( "net" "strconv" - "github.com/Dreamacro/clash/common/pool" "github.com/Dreamacro/clash/common/structure" "github.com/Dreamacro/clash/component/dialer" C "github.com/Dreamacro/clash/constant" @@ -16,15 +15,10 @@ import ( v2rayObfs "github.com/Dreamacro/clash/transport/v2ray-plugin" "github.com/sagernet/sing-shadowsocks" "github.com/sagernet/sing-shadowsocks/shadowimpl" - "github.com/sagernet/sing/common/buf" "github.com/sagernet/sing/common/bufio" M "github.com/sagernet/sing/common/metadata" ) -func init() { - buf.DefaultAllocator = pool.DefaultAllocator -} - type ShadowSocks struct { *Base method shadowsocks.Method diff --git a/common/pool/alloc.go b/common/pool/alloc.go index 96fd059e..7b4a1d9e 100644 --- a/common/pool/alloc.go +++ b/common/pool/alloc.go @@ -6,9 +6,15 @@ import ( "errors" "math/bits" "sync" + + "github.com/sagernet/sing/common/buf" ) -var DefaultAllocator = NewAllocator() +var defaultAllocator = NewAllocator() + +func init() { + buf.DefaultAllocator = defaultAllocator +} // Allocator for incoming frames, optimized to prevent overwriting after zeroing type Allocator struct { diff --git a/common/pool/pool.go b/common/pool/pool.go index a2beb082..bee4887f 100644 --- a/common/pool/pool.go +++ b/common/pool/pool.go @@ -13,9 +13,9 @@ const ( ) func Get(size int) []byte { - return DefaultAllocator.Get(size) + return defaultAllocator.Get(size) } func Put(buf []byte) error { - return DefaultAllocator.Put(buf) + return defaultAllocator.Put(buf) } diff --git a/config/config.go b/config/config.go index c1999a08..b7620f9c 100644 --- a/config/config.go +++ b/config/config.go @@ -26,8 +26,8 @@ import ( "github.com/Dreamacro/clash/component/trie" C "github.com/Dreamacro/clash/constant" providerTypes "github.com/Dreamacro/clash/constant/provider" - "github.com/Dreamacro/clash/constant/sniffer" snifferTypes "github.com/Dreamacro/clash/constant/sniffer" + "github.com/Dreamacro/clash/constant/sniffer" "github.com/Dreamacro/clash/dns" "github.com/Dreamacro/clash/listener/tun/ipstack/commons" "github.com/Dreamacro/clash/log" diff --git a/constant/adapters.go b/constant/adapters.go index f1603fd1..9fa34b48 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -7,6 +7,8 @@ import ( "time" "github.com/Dreamacro/clash/component/dialer" + "github.com/sagernet/sing/common/buf" + N "github.com/sagernet/sing/common/network" ) // Adapter Type @@ -75,6 +77,7 @@ type Conn interface { type PacketConn interface { net.PacketConn Connection + N.PacketConn // Deprecate WriteWithMetadata because of remote resolve DNS cause TURN failed // WriteWithMetadata(p []byte, metadata *Metadata) (n int, err error) } @@ -184,7 +187,7 @@ func (at AdapterType) String() string { // UDPPacket contains the data of UDP packet, and offers control/info of UDP packet's source type UDPPacket interface { // Data get the payload of UDP Packet - Data() []byte + Data() *buf.Buffer // WriteBack writes the payload with source IP/Port equals addr // - variable source IP/Port is important to STUN @@ -192,8 +195,7 @@ type UDPPacket interface { // this is important when using Fake-IP. WriteBack(b []byte, addr net.Addr) (n int, err error) - // Drop call after packet is used, could recycle buffer in this function. - Drop() + N.PacketWriter // LocalAddr returns the source IP/Port of packet LocalAddr() net.Addr diff --git a/constant/metadata.go b/constant/metadata.go index b58847ff..e4fb33f2 100644 --- a/constant/metadata.go +++ b/constant/metadata.go @@ -6,6 +6,8 @@ import ( "net" "net/netip" "strconv" + + M "github.com/sagernet/sing/common/metadata" ) // Socks addr type @@ -170,6 +172,18 @@ func (m *Metadata) UDPAddr() *net.UDPAddr { } } +func (m *Metadata) Socksaddr() M.Socksaddr { + port, _ := strconv.ParseUint(m.DstPort, 10, 16) + if m.Host != "" { + return M.Socksaddr{ + Fqdn: m.Host, + Port: uint16(port), + } + } else { + return M.SocksaddrFromAddrPort(m.DstIP, uint16(port)) + } +} + func (m *Metadata) String() string { if m.Host != "" { return m.Host diff --git a/go.mod b/go.mod index 0ab813a5..8014e64d 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/lucas-clemente/quic-go v0.27.2 github.com/miekg/dns v1.1.49 github.com/oschwald/geoip2-golang v1.7.0 - github.com/sagernet/sing v0.0.0-20220609091055-86d0144940e7 + github.com/sagernet/sing v0.0.0-20220609123159-a93588755159 github.com/sagernet/sing-shadowsocks v0.0.0-20220609092835-699292971c13 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.2 diff --git a/go.sum b/go.sum index 27332828..726d8b52 100644 --- a/go.sum +++ b/go.sum @@ -305,8 +305,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= -github.com/sagernet/sing v0.0.0-20220609091055-86d0144940e7 h1:Q+uNKLNSKqpx+p96qcBTVFh8RUKiQFr4IrNVi5Q5yl0= -github.com/sagernet/sing v0.0.0-20220609091055-86d0144940e7/go.mod h1:w2HnJzXKHpD6F5Z/9XlSD4qbcpHY2RSZuQnFzqgELMg= +github.com/sagernet/sing v0.0.0-20220609123159-a93588755159 h1:G3fww5jjADkHWi6yDOEzkZbQ6lnrytv0mKesBtEslxo= +github.com/sagernet/sing v0.0.0-20220609123159-a93588755159/go.mod h1:w2HnJzXKHpD6F5Z/9XlSD4qbcpHY2RSZuQnFzqgELMg= github.com/sagernet/sing-shadowsocks v0.0.0-20220609092835-699292971c13 h1:bQN0hjTHdB7SyaD9yjEYAl+bDl/kXW9zC0xNa+LMTrA= github.com/sagernet/sing-shadowsocks v0.0.0-20220609092835-699292971c13/go.mod h1:Fp/9+odJhtgDmiHbZClMLnxaVvmDRJxwA7u/+uXWDiQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= diff --git a/listener/socks/udp.go b/listener/socks/udp.go index 8bc439fb..74331bcf 100644 --- a/listener/socks/udp.go +++ b/listener/socks/udp.go @@ -4,11 +4,11 @@ import ( "net" "github.com/Dreamacro/clash/adapter/inbound" - "github.com/Dreamacro/clash/common/pool" "github.com/Dreamacro/clash/common/sockopt" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/log" - "github.com/Dreamacro/clash/transport/socks5" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" ) type UDPListener struct { @@ -49,34 +49,35 @@ func NewUDP(addr string, in chan<- *inbound.PacketAdapter) (*UDPListener, error) } go func() { for { - buf := pool.Get(pool.UDPBufferSize) - n, remoteAddr, err := l.ReadFrom(buf) + buffer := buf.NewPacket() + n, remoteAddr, err := l.ReadFrom(buffer.FreeBytes()) if err != nil { - pool.Put(buf) + buffer.Release() if sl.closed { break } continue } - handleSocksUDP(l, in, buf[:n], remoteAddr) + buffer.Extend(n) + handleSocksUDP(l, in, buffer, remoteAddr) } }() return sl, nil } -func handleSocksUDP(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buf []byte, addr net.Addr) { - target, payload, err := socks5.DecodeUDPPacket(buf) +func handleSocksUDP(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buffer *buf.Buffer, addr net.Addr) { + buffer.Advance(3) + target, err := M.SocksaddrSerializer.ReadAddrPort(buffer) if err != nil { // Unresolved UDP packet, return buffer to the pool - pool.Put(buf) + buffer.Release() return } packet := &packet{ pc: pc, rAddr: addr, - payload: payload, - bufRef: buf, + payload: buffer, } select { case in <- inbound.NewPacket(target, packet, C.SOCKS5): diff --git a/listener/socks/utils.go b/listener/socks/utils.go index 28dfef72..478591ba 100644 --- a/listener/socks/utils.go +++ b/listener/socks/utils.go @@ -3,18 +3,19 @@ package socks import ( "net" - "github.com/Dreamacro/clash/common/pool" "github.com/Dreamacro/clash/transport/socks5" + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" ) type packet struct { pc net.PacketConn rAddr net.Addr - payload []byte - bufRef []byte + payload *buf.Buffer } -func (c *packet) Data() []byte { +func (c *packet) Data() *buf.Buffer { return c.payload } @@ -27,11 +28,15 @@ func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) { return c.pc.WriteTo(packet, c.rAddr) } +func (c *packet) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error { + defer buffer.Release() + header := buf.With(buffer.ExtendHeader(3 + M.SocksaddrSerializer.AddrPortLen(destination))) + common.Must(header.WriteZeroN(3)) + common.Must(M.SocksaddrSerializer.WriteAddrPort(header, destination)) + return common.Error(c.pc.WriteTo(buffer.Bytes(), c.rAddr)) +} + // LocalAddr returns the source IP/Port of UDP Packet func (c *packet) LocalAddr() net.Addr { return c.rAddr } - -func (c *packet) Drop() { - pool.Put(c.bufRef) -} diff --git a/listener/tproxy/packet.go b/listener/tproxy/packet.go index 8aa3e9bf..961d9330 100644 --- a/listener/tproxy/packet.go +++ b/listener/tproxy/packet.go @@ -3,15 +3,16 @@ package tproxy import ( "net" - "github.com/Dreamacro/clash/common/pool" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" ) type packet struct { lAddr *net.UDPAddr - buf []byte + buf *buf.Buffer } -func (c *packet) Data() []byte { +func (c *packet) Data() *buf.Buffer { return c.buf } @@ -27,11 +28,18 @@ func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) { return } +func (c *packet) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error { + defer buffer.Release() + tc, err := dialUDP("udp", addr.UDPAddr(), c.lAddr) + defer tc.Close() + if err != nil { + return err + } + _, err = tc.Write(buffer.Bytes()) + return nil +} + // LocalAddr returns the source IP/Port of UDP Packet func (c *packet) LocalAddr() net.Addr { return c.lAddr } - -func (c *packet) Drop() { - pool.Put(c.buf) -} diff --git a/listener/tproxy/udp.go b/listener/tproxy/udp.go index c7e6d99e..a3725672 100644 --- a/listener/tproxy/udp.go +++ b/listener/tproxy/udp.go @@ -4,9 +4,9 @@ import ( "net" "github.com/Dreamacro/clash/adapter/inbound" - "github.com/Dreamacro/clash/common/pool" C "github.com/Dreamacro/clash/constant" - "github.com/Dreamacro/clash/transport/socks5" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" ) type UDPListener struct { @@ -57,10 +57,10 @@ func NewUDP(addr string, in chan<- *inbound.PacketAdapter) (*UDPListener, error) go func() { oob := make([]byte, 1024) for { - buf := pool.Get(pool.UDPBufferSize) - n, oobn, _, lAddr, err := c.ReadMsgUDP(buf, oob) + buffer := buf.NewPacket() + n, oobn, _, lAddr, err := c.ReadMsgUDP(buffer.FreeBytes(), oob) if err != nil { - pool.Put(buf) + buffer.Release() if rl.closed { break } @@ -71,21 +71,21 @@ func NewUDP(addr string, in chan<- *inbound.PacketAdapter) (*UDPListener, error) if err != nil { continue } - handlePacketConn(l, in, buf[:n], lAddr, rAddr) + buffer.Extend(n) + handlePacketConn(l, in, buffer, lAddr, rAddr) } }() return rl, nil } -func handlePacketConn(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buf []byte, lAddr *net.UDPAddr, rAddr *net.UDPAddr) { - target := socks5.ParseAddrToSocksAddr(rAddr) +func handlePacketConn(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buf *buf.Buffer, lAddr *net.UDPAddr, rAddr *net.UDPAddr) { pkt := &packet{ lAddr: lAddr, buf: buf, } select { - case in <- inbound.NewPacket(target, pkt, C.TPROXY): + case in <- inbound.NewPacket(M.SocksaddrFromNet(rAddr), pkt, C.TPROXY): default: } } diff --git a/listener/tun/ipstack/gvisor/handler.go b/listener/tun/ipstack/gvisor/handler.go index ed119fc3..81578137 100644 --- a/listener/tun/ipstack/gvisor/handler.go +++ b/listener/tun/ipstack/gvisor/handler.go @@ -15,6 +15,8 @@ import ( D "github.com/Dreamacro/clash/listener/tun/ipstack/commons" "github.com/Dreamacro/clash/listener/tun/ipstack/gvisor/adapter" "github.com/Dreamacro/clash/transport/socks5" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" ) var _ adapter.Handler = (*gvHandler)(nil) @@ -96,27 +98,24 @@ func (gh *gvHandler) HandleUDP(tunConn adapter.UDPConn) { return } - target := socks5.ParseAddrToSocksAddr(rAddr) + target := M.SocksaddrFromNet(rAddr) go func() { for { - buf := pool.Get(pool.UDPBufferSize) + buffer := buf.NewPacket() - n, addr, err := tunConn.ReadFrom(buf) + n, addr, err := tunConn.ReadFrom(buffer.FreeBytes()) if err != nil { - _ = pool.Put(buf) + buffer.Release() break } - - payload := buf[:n] + buffer.Truncate(n) if D.ShouldHijackDns(gh.dnsHijack, rAddrPort) { go func() { - defer func() { - _ = pool.Put(buf) - }() + defer buffer.Release() - msg, err1 := D.RelayDnsPacket(payload) + msg, err1 := D.RelayDnsPacket(buffer.Bytes()) if err1 != nil { return } @@ -130,7 +129,7 @@ func (gh *gvHandler) HandleUDP(tunConn adapter.UDPConn) { gvPacket := &packet{ pc: tunConn, rAddr: addr, - payload: payload, + payload: buffer, } select { diff --git a/listener/tun/ipstack/gvisor/udp.go b/listener/tun/ipstack/gvisor/udp.go index 80098887..8142c45f 100644 --- a/listener/tun/ipstack/gvisor/udp.go +++ b/listener/tun/ipstack/gvisor/udp.go @@ -5,10 +5,12 @@ package gvisor import ( "net" - "github.com/Dreamacro/clash/common/pool" "github.com/Dreamacro/clash/listener/tun/ipstack/gvisor/adapter" "github.com/Dreamacro/clash/listener/tun/ipstack/gvisor/option" "github.com/Dreamacro/clash/log" + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" "gvisor.dev/gvisor/pkg/tcpip/stack" "gvisor.dev/gvisor/pkg/tcpip/transport/udp" @@ -51,10 +53,10 @@ func (c *udpConn) ID() *stack.TransportEndpointID { type packet struct { pc adapter.UDPConn rAddr net.Addr - payload []byte + payload *buf.Buffer } -func (c *packet) Data() []byte { +func (c *packet) Data() *buf.Buffer { return c.payload } @@ -63,11 +65,11 @@ func (c *packet) WriteBack(b []byte, _ net.Addr) (n int, err error) { return c.pc.WriteTo(b, c.rAddr) } +func (c *packet) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error { + return common.Error(c.pc.WriteTo(buffer.Bytes(), c.rAddr)) +} + // LocalAddr returns the source IP/Port of UDP Packet func (c *packet) LocalAddr() net.Addr { return c.rAddr } - -func (c *packet) Drop() { - _ = pool.Put(c.payload) -} diff --git a/listener/tun/ipstack/system/stack.go b/listener/tun/ipstack/system/stack.go index b4668293..d71d3488 100644 --- a/listener/tun/ipstack/system/stack.go +++ b/listener/tun/ipstack/system/stack.go @@ -21,7 +21,8 @@ import ( "github.com/Dreamacro/clash/listener/tun/ipstack/system/mars" "github.com/Dreamacro/clash/listener/tun/ipstack/system/mars/nat" "github.com/Dreamacro/clash/log" - "github.com/Dreamacro/clash/transport/socks5" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" ) type sysStack struct { @@ -153,37 +154,37 @@ func New(device device.Device, dnsHijack []netip.AddrPort, tunAddress netip.Pref }(stack.UDP()) for !ipStack.closed { - buf := pool.Get(pool.UDPBufferSize) + buffer := buf.NewPacket() - n, lRAddr, rRAddr, err := stack.UDP().ReadFrom(buf) + n, lRAddr, rRAddr, err := stack.UDP().ReadFrom(buffer.FreeBytes()) if err != nil { - _ = pool.Put(buf) + buffer.Release() break } + buffer.Truncate(n) - raw := buf[:n] lAddr := lRAddr.(*net.UDPAddr) rAddr := rRAddr.(*net.UDPAddr) rAddrPort := netip.AddrPortFrom(nnip.IpToAddr(rAddr.IP), uint16(rAddr.Port)) if rAddrPort.Addr().IsLoopback() || rAddrPort.Addr() == gateway { - _ = pool.Put(buf) + buffer.Release() continue } if D.ShouldHijackDns(dnsAddr, rAddrPort) { go func() { - msg, err := D.RelayDnsPacket(raw) + msg, err := D.RelayDnsPacket(buffer.Bytes()) if err != nil { - _ = pool.Put(buf) + buffer.Release() return } _, _ = stack.UDP().WriteTo(msg, rAddr, lAddr) - _ = pool.Put(buf) + buffer.Release() }() continue @@ -191,17 +192,14 @@ func New(device device.Device, dnsHijack []netip.AddrPort, tunAddress netip.Pref pkt := &packet{ local: lAddr, - data: raw, + data: buffer, writeBack: func(b []byte, addr net.Addr) (int, error) { return stack.UDP().WriteTo(b, rAddr, lAddr) }, - drop: func() { - _ = pool.Put(buf) - }, } select { - case udpIn <- inbound.NewPacket(socks5.ParseAddrToSocksAddr(rAddr), pkt, C.TUN): + case udpIn <- inbound.NewPacket(M.SocksaddrFromNet(rAddr), pkt, C.TUN): default: } } diff --git a/listener/tun/ipstack/system/udp.go b/listener/tun/ipstack/system/udp.go index cb2761e8..e101dc0e 100644 --- a/listener/tun/ipstack/system/udp.go +++ b/listener/tun/ipstack/system/udp.go @@ -1,15 +1,20 @@ package system -import "net" +import ( + "net" + + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" +) type packet struct { local *net.UDPAddr - data []byte + data *buf.Buffer writeBack func(b []byte, addr net.Addr) (int, error) - drop func() } -func (pkt *packet) Data() []byte { +func (pkt *packet) Data() *buf.Buffer { return pkt.data } @@ -17,8 +22,9 @@ func (pkt *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) { return pkt.writeBack(b, addr) } -func (pkt *packet) Drop() { - pkt.drop() +func (pkt *packet) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error { + defer buffer.Release() + return common.Error(pkt.writeBack(buffer.Bytes(), addr.UDPAddr())) } func (pkt *packet) LocalAddr() net.Addr { diff --git a/test/go.mod b/test/go.mod index ac4e1756..bbcadc1c 100644 --- a/test/go.mod +++ b/test/go.mod @@ -61,7 +61,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/sagernet/sing v0.0.0-20220609091055-86d0144940e7 // indirect + github.com/sagernet/sing v0.0.0-20220609123159-a93588755159 // indirect github.com/sagernet/sing-shadowsocks v0.0.0-20220609092835-699292971c13 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/tobyxdd/hysteria v1.0.4 // indirect diff --git a/test/go.sum b/test/go.sum index 5ea06f65..ff75686c 100644 --- a/test/go.sum +++ b/test/go.sum @@ -323,8 +323,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= -github.com/sagernet/sing v0.0.0-20220609091055-86d0144940e7 h1:Q+uNKLNSKqpx+p96qcBTVFh8RUKiQFr4IrNVi5Q5yl0= -github.com/sagernet/sing v0.0.0-20220609091055-86d0144940e7/go.mod h1:w2HnJzXKHpD6F5Z/9XlSD4qbcpHY2RSZuQnFzqgELMg= +github.com/sagernet/sing v0.0.0-20220609123159-a93588755159 h1:G3fww5jjADkHWi6yDOEzkZbQ6lnrytv0mKesBtEslxo= +github.com/sagernet/sing v0.0.0-20220609123159-a93588755159/go.mod h1:w2HnJzXKHpD6F5Z/9XlSD4qbcpHY2RSZuQnFzqgELMg= github.com/sagernet/sing-shadowsocks v0.0.0-20220609092835-699292971c13 h1:bQN0hjTHdB7SyaD9yjEYAl+bDl/kXW9zC0xNa+LMTrA= github.com/sagernet/sing-shadowsocks v0.0.0-20220609092835-699292971c13/go.mod h1:Fp/9+odJhtgDmiHbZClMLnxaVvmDRJxwA7u/+uXWDiQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= diff --git a/tunnel/connection.go b/tunnel/connection.go index 0384e805..fd0d9725 100644 --- a/tunnel/connection.go +++ b/tunnel/connection.go @@ -1,19 +1,20 @@ package tunnel import ( - "errors" "net" "time" - N "github.com/Dreamacro/clash/common/net" - "github.com/Dreamacro/clash/common/pool" + cN "github.com/Dreamacro/clash/common/net" "github.com/Dreamacro/clash/component/resolver" C "github.com/Dreamacro/clash/constant" + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" ) func handleUDPToRemote(packet C.UDPPacket, pc C.PacketConn, metadata *C.Metadata) error { - defer packet.Drop() - + defer packet.Data().Release() // local resolve UDP dns if !metadata.Resolved() { ip, err := resolver.ResolveIP(metadata.Host) @@ -23,12 +24,7 @@ func handleUDPToRemote(packet C.UDPPacket, pc C.PacketConn, metadata *C.Metadata metadata.DstIP = ip } - addr := metadata.UDPAddr() - if addr == nil { - return errors.New("udp addr invalid") - } - - if _, err := pc.WriteTo(packet.Data(), addr); err != nil { + if err := pc.WritePacket(packet.Data(), metadata.Socksaddr()); err != nil { return err } // reset timeout @@ -37,30 +33,45 @@ func handleUDPToRemote(packet C.UDPPacket, pc C.PacketConn, metadata *C.Metadata return nil } -func handleUDPToLocal(packet C.UDPPacket, pc net.PacketConn, key string, fAddr net.Addr) { - buf := pool.Get(pool.UDPBufferSize) - defer pool.Put(buf) +func handleUDPToLocal(packet C.UDPPacket, pc C.PacketConn, key string, fAddr net.Addr) { defer natTable.Delete(key) defer pc.Close() - - for { - pc.SetReadDeadline(time.Now().Add(udpTimeout)) - n, from, err := pc.ReadFrom(buf) - if err != nil { - return - } - - if fAddr != nil { - from = fAddr - } - - _, err = packet.WriteBack(buf[:n], from) - if err != nil { - return - } + var fSocksaddr M.Socksaddr + if fAddr != nil { + fSocksaddr = M.SocksaddrFromNet(fAddr) } + _, _ = copyPacketTimeout(packet, pc, udpTimeout, fSocksaddr, fAddr != nil) } func handleSocket(ctx C.ConnContext, outbound net.Conn) { - N.Relay(ctx.Conn(), outbound) + cN.Relay(ctx.Conn(), outbound) +} + +func copyPacketTimeout(dst N.PacketWriter, src N.TimeoutPacketReader, timeout time.Duration, fAddr M.Socksaddr, fOverride bool) (n int64, err error) { + _buffer := buf.StackNewPacket() + defer common.KeepAlive(_buffer) + buffer := common.Dup(_buffer) + buffer.IncRef() + defer buffer.DecRef() + var destination M.Socksaddr + for { + buffer.Reset() + err = src.SetReadDeadline(time.Now().Add(timeout)) + if err != nil { + return + } + destination, err = src.ReadPacket(buffer) + if err != nil { + return + } + if fOverride { + destination = fAddr + } + dataLen := buffer.Len() + err = dst.WritePacket(buffer, destination) + if err != nil { + return + } + n += int64(dataLen) + } } diff --git a/tunnel/statistic/tracker.go b/tunnel/statistic/tracker.go index 2a7ab028..6b6d7a82 100644 --- a/tunnel/statistic/tracker.go +++ b/tunnel/statistic/tracker.go @@ -6,6 +6,8 @@ import ( C "github.com/Dreamacro/clash/constant" "github.com/gofrs/uuid" + "github.com/sagernet/sing/common/buf" + M "github.com/sagernet/sing/common/metadata" "go.uber.org/atomic" ) @@ -115,6 +117,25 @@ func (ut *udpTracker) WriteTo(b []byte, addr net.Addr) (int, error) { return n, err } +func (ut *udpTracker) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) { + addr, err = ut.PacketConn.ReadPacket(buffer) + download := int64(buffer.Len()) + ut.manager.PushDownloaded(download) + ut.DownloadTotal.Add(download) + return +} + +func (ut *udpTracker) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error { + dataLen := buffer.Len() + err := ut.PacketConn.WritePacket(buffer, addr) + if err == nil { + upload := int64(dataLen) + ut.manager.PushUploaded(upload) + ut.UploadTotal.Add(upload) + } + return err +} + func (ut *udpTracker) Close() error { ut.manager.Leave(ut) return ut.PacketConn.Close()