chore: using xsync.MapOf replace sync.Map

This commit is contained in:
wwqgtxx 2023-09-02 16:54:35 +08:00
parent 73fa79bf3f
commit 1cad615b25
14 changed files with 110 additions and 86 deletions

View File

@ -10,7 +10,6 @@ import (
"net/netip"
"net/url"
"strconv"
"sync"
"time"
"github.com/Dreamacro/clash/common/atomic"
@ -19,6 +18,8 @@ import (
"github.com/Dreamacro/clash/component/dialer"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"
"github.com/puzpuzpuz/xsync/v2"
)
var UnifiedDelay = atomic.NewBool(false)
@ -37,7 +38,7 @@ type Proxy struct {
history *queue.Queue[C.DelayHistory]
alive *atomic.Bool
url string
extra sync.Map
extra *xsync.MapOf[string, *extraProxyState]
}
// Alive implements C.Proxy
@ -48,7 +49,7 @@ func (p *Proxy) Alive() bool {
// AliveForTestUrl implements C.Proxy
func (p *Proxy) AliveForTestUrl(url string) bool {
if state, ok := p.extra.Load(url); ok {
return state.(*extraProxyState).alive.Load()
return state.alive.Load()
}
return p.alive.Load()
@ -96,7 +97,7 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
var queueM []C.DelayHistory
if state, ok := p.extra.Load(url); ok {
queueM = state.(*extraProxyState).history.Copy()
queueM = state.history.Copy()
}
if queueM == nil {
@ -113,10 +114,10 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
extraHistory := map[string][]C.DelayHistory{}
p.extra.Range(func(k, v interface{}) bool {
p.extra.Range(func(k string, v *extraProxyState) bool {
testUrl := k.(string)
state := v.(*extraProxyState)
testUrl := k
state := v
histories := []C.DelayHistory{}
queueM := state.history.Copy()
@ -155,8 +156,8 @@ func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
history := p.history.Last()
if state, ok := p.extra.Load(url); ok {
alive = state.(*extraProxyState).alive.Load()
history = state.(*extraProxyState).history.Last()
alive = state.alive.Load()
history = state.history.Last()
}
if !alive {
@ -226,10 +227,10 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
p.extra.Store(url, state)
}
state.(*extraProxyState).alive.Store(alive)
state.(*extraProxyState).history.Put(record)
if state.(*extraProxyState).history.Len() > defaultHistoriesNum {
state.(*extraProxyState).history.Pop()
state.alive.Store(alive)
state.history.Put(record)
if state.history.Len() > defaultHistoriesNum {
state.history.Pop()
}
default:
log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t)
@ -311,7 +312,7 @@ func NewProxy(adapter C.ProxyAdapter) *Proxy {
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
url: "",
extra: sync.Map{}}
extra: xsync.NewMapOf[*extraProxyState]()}
}
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
@ -355,7 +356,7 @@ func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url strin
}
length := 0
p.extra.Range(func(_, _ interface{}) bool {
p.extra.Range(func(_ string, _ *extraProxyState) bool {
length++
return length < 2*C.DefaultMaxHealthCheckUrlNum
})

View File

@ -1,7 +1,7 @@
package auth
import (
"sync"
"github.com/puzpuzpuz/xsync/v2"
)
type Authenticator interface {
@ -15,7 +15,7 @@ type AuthUser struct {
}
type inMemoryAuthenticator struct {
storage *sync.Map
storage *xsync.MapOf[string, string]
usernames []string
}
@ -31,13 +31,13 @@ func NewAuthenticator(users []AuthUser) Authenticator {
return nil
}
au := &inMemoryAuthenticator{storage: &sync.Map{}}
au := &inMemoryAuthenticator{storage: xsync.NewMapOf[string]()}
for _, user := range users {
au.storage.Store(user.User, user.Pass)
}
usernames := make([]string, 0, len(users))
au.storage.Range(func(key, value any) bool {
usernames = append(usernames, key.(string))
au.storage.Range(func(key string, value string) bool {
usernames = append(usernames, key)
return true
})
au.usernames = usernames

View File

@ -5,23 +5,28 @@ import (
"sync"
C "github.com/Dreamacro/clash/constant"
"github.com/puzpuzpuz/xsync/v2"
)
type Table struct {
mapping sync.Map
mapping *xsync.MapOf[string, *Entry]
lockMap *xsync.MapOf[string, *sync.Cond]
}
type Entry struct {
PacketConn C.PacketConn
WriteBackProxy C.WriteBackProxy
LocalUDPConnMap sync.Map
LocalUDPConnMap *xsync.MapOf[string, *net.UDPConn]
LocalLockMap *xsync.MapOf[string, *sync.Cond]
}
func (t *Table) Set(key string, e C.PacketConn, w C.WriteBackProxy) {
t.mapping.Store(key, &Entry{
PacketConn: e,
WriteBackProxy: w,
LocalUDPConnMap: sync.Map{},
LocalUDPConnMap: xsync.NewMapOf[*net.UDPConn](),
LocalLockMap: xsync.NewMapOf[*sync.Cond](),
})
}
@ -34,15 +39,19 @@ func (t *Table) Get(key string) (C.PacketConn, C.WriteBackProxy) {
}
func (t *Table) GetOrCreateLock(key string) (*sync.Cond, bool) {
item, loaded := t.mapping.LoadOrStore(key, sync.NewCond(&sync.Mutex{}))
return item.(*sync.Cond), loaded
item, loaded := t.lockMap.LoadOrCompute(key, makeLock)
return item, loaded
}
func (t *Table) Delete(key string) {
t.mapping.Delete(key)
}
func (t *Table) GetLocalConn(lAddr, rAddr string) *net.UDPConn {
func (t *Table) DeleteLock(lockKey string) {
t.lockMap.Delete(lockKey)
}
func (t *Table) GetForLocalConn(lAddr, rAddr string) *net.UDPConn {
entry, exist := t.getEntry(lAddr)
if !exist {
return nil
@ -51,10 +60,10 @@ func (t *Table) GetLocalConn(lAddr, rAddr string) *net.UDPConn {
if !exist {
return nil
}
return item.(*net.UDPConn)
return item
}
func (t *Table) AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool {
func (t *Table) AddForLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool {
entry, exist := t.getEntry(lAddr)
if !exist {
return false
@ -63,7 +72,7 @@ func (t *Table) AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool {
return true
}
func (t *Table) RangeLocalConn(lAddr string, f func(key, value any) bool) {
func (t *Table) RangeForLocalConn(lAddr string, f func(key string, value *net.UDPConn) bool) {
entry, exist := t.getEntry(lAddr)
if !exist {
return
@ -76,11 +85,11 @@ func (t *Table) GetOrCreateLockForLocalConn(lAddr, key string) (*sync.Cond, bool
if !loaded {
return nil, false
}
item, loaded := entry.LocalUDPConnMap.LoadOrStore(key, sync.NewCond(&sync.Mutex{}))
return item.(*sync.Cond), loaded
item, loaded := entry.LocalLockMap.LoadOrCompute(key, makeLock)
return item, loaded
}
func (t *Table) DeleteLocalConnMap(lAddr, key string) {
func (t *Table) DeleteForLocalConn(lAddr, key string) {
entry, loaded := t.getEntry(lAddr)
if !loaded {
return
@ -88,17 +97,26 @@ func (t *Table) DeleteLocalConnMap(lAddr, key string) {
entry.LocalUDPConnMap.Delete(key)
}
func (t *Table) getEntry(key string) (*Entry, bool) {
item, ok := t.mapping.Load(key)
// This should not happen usually since this function called after PacketConn created
if !ok {
return nil, false
func (t *Table) DeleteLockForLocalConn(lAddr, key string) {
entry, loaded := t.getEntry(lAddr)
if !loaded {
return
}
entry, ok := item.(*Entry)
return entry, ok
entry.LocalLockMap.Delete(key)
}
func (t *Table) getEntry(key string) (*Entry, bool) {
return t.mapping.Load(key)
}
func makeLock() *sync.Cond {
return sync.NewCond(&sync.Mutex{})
}
// New return *Cache
func New() *Table {
return &Table{}
return &Table{
mapping: xsync.NewMapOf[*Entry](),
lockMap: xsync.NewMapOf[*sync.Cond](),
}
}

View File

@ -267,13 +267,17 @@ type NatTable interface {
Delete(key string)
GetLocalConn(lAddr, rAddr string) *net.UDPConn
DeleteLock(key string)
AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool
GetForLocalConn(lAddr, rAddr string) *net.UDPConn
RangeLocalConn(lAddr string, f func(key, value any) bool)
AddForLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool
GetOrCreateLockForLocalConn(lAddr, key string) (*sync.Cond, bool)
RangeForLocalConn(lAddr string, f func(key string, value *net.UDPConn) bool)
DeleteLocalConnMap(lAddr, key string)
GetOrCreateLockForLocalConn(lAddr string, key string) (*sync.Cond, bool)
DeleteForLocalConn(lAddr, key string)
DeleteLockForLocalConn(lAddr, key string)
}

1
go.mod
View File

@ -30,6 +30,7 @@ require (
github.com/mroth/weightedrand/v2 v2.1.0
github.com/openacid/low v0.1.21
github.com/oschwald/maxminddb-golang v1.12.0
github.com/puzpuzpuz/xsync/v2 v2.5.0
github.com/sagernet/netlink v0.0.0-20220905062125-8043b4a9aa97
github.com/sagernet/sing v0.2.10-0.20230807080248-4db0062caa0a
github.com/sagernet/sing-mux v0.1.3-0.20230811111955-dc1639b5204c

2
go.sum
View File

@ -134,6 +134,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/puzpuzpuz/xsync/v2 v2.5.0 h1:2k4qrO/orvmEXZ3hmtHqIy9XaQtPTwzMZk1+iErpE8c=
github.com/puzpuzpuz/xsync/v2 v2.5.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-20 v0.3.2 h1:rRgN3WfnKbyik4dBV8A6girlJVxGand/d+jVKbQq5GI=

View File

@ -55,16 +55,15 @@ func (c *packet) InAddr() net.Addr {
func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) {
remote := rAddr.String()
local := lAddr.String()
localConn := natTable.GetLocalConn(local, remote)
localConn := natTable.GetForLocalConn(local, remote)
// localConn not exist
if localConn == nil {
lockKey := remote + "-lock"
cond, loaded := natTable.GetOrCreateLockForLocalConn(local, lockKey)
cond, loaded := natTable.GetOrCreateLockForLocalConn(local, remote)
if loaded {
cond.L.Lock()
cond.Wait()
// we should get localConn here
localConn = natTable.GetLocalConn(local, remote)
localConn = natTable.GetForLocalConn(local, remote)
if localConn == nil {
return nil, fmt.Errorf("localConn is nil, nat entry not exist")
}
@ -74,7 +73,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT
return nil, fmt.Errorf("cond is nil, nat entry not exist")
}
defer func() {
natTable.DeleteLocalConnMap(local, lockKey)
natTable.DeleteLockForLocalConn(local, remote)
cond.Broadcast()
}()
conn, err := listenLocalConn(rAddr, lAddr, in, natTable)
@ -82,7 +81,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT
log.Errorln("listenLocalConn failed with error: %s, packet loss (rAddr[%T]=%s lAddr[%T]=%s)", err.Error(), rAddr, remote, lAddr, local)
return nil, err
}
natTable.AddLocalConn(local, remote, conn)
natTable.AddForLocalConn(local, remote, conn)
localConn = conn
}
}

View File

@ -22,6 +22,7 @@ import (
"github.com/Dreamacro/clash/transport/tuic/common"
"github.com/metacubex/quic-go"
"github.com/puzpuzpuz/xsync/v2"
"github.com/zhangyunhao116/fastrand"
)
@ -49,7 +50,7 @@ type clientImpl struct {
openStreams atomic.Int64
closed atomic.Bool
udpInputMap sync.Map
udpInputMap *xsync.MapOf[uint32, net.Conn]
// only ready for PoolClient
dialerRef C.Dialer
@ -263,11 +264,10 @@ func (t *clientImpl) forceClose(quicConn quic.Connection, err error) {
if quicConn != nil {
_ = quicConn.CloseWithError(ProtocolError, errStr)
}
udpInputMap := &t.udpInputMap
udpInputMap.Range(func(key, value any) bool {
if conn, ok := value.(net.Conn); ok {
_ = conn.Close()
}
udpInputMap := t.udpInputMap
udpInputMap.Range(func(key uint32, value net.Conn) bool {
conn := value
_ = conn.Close()
udpInputMap.Delete(key)
return true
})
@ -469,6 +469,7 @@ func NewClient(clientOption *ClientOption, udp bool, dialerRef C.Dialer) *Client
ClientOption: clientOption,
udp: udp,
dialerRef: dialerRef,
udpInputMap: xsync.NewIntegerMapOf[uint32, net.Conn](),
}
c := &Client{ci}
runtime.SetFinalizer(c, closeClient)

View File

@ -17,6 +17,7 @@ import (
"github.com/gofrs/uuid/v5"
"github.com/metacubex/quic-go"
"github.com/puzpuzpuz/xsync/v2"
)
type ServerOption struct {
@ -33,6 +34,7 @@ func NewServerHandler(option *ServerOption, quicConn quic.EarlyConnection, uuid
quicConn: quicConn,
uuid: uuid,
authCh: make(chan struct{}),
udpInputMap: xsync.NewIntegerMapOf[uint32, *atomic.Bool](),
}
}
@ -45,7 +47,7 @@ type serverHandler struct {
authOk atomic.Bool
authOnce sync.Once
udpInputMap sync.Map
udpInputMap *xsync.MapOf[uint32, *atomic.Bool]
}
func (s *serverHandler) AuthOk() bool {
@ -78,8 +80,7 @@ func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelay
assocId = packet.ASSOC_ID
v, _ := s.udpInputMap.LoadOrStore(assocId, &atomic.Bool{})
writeClosed := v.(*atomic.Bool)
writeClosed, _ := s.udpInputMap.LoadOrCompute(assocId, func() *atomic.Bool { return &atomic.Bool{} })
if writeClosed.Load() {
return nil
}
@ -173,8 +174,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) {
if err != nil {
return
}
if v, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
writeClosed := v.(*atomic.Bool)
if writeClosed, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
writeClosed.Store(true)
}
case HeartbeatType:

View File

@ -20,6 +20,7 @@ import (
"github.com/Dreamacro/clash/transport/tuic/common"
"github.com/metacubex/quic-go"
"github.com/puzpuzpuz/xsync/v2"
"github.com/zhangyunhao116/fastrand"
)
@ -46,7 +47,7 @@ type clientImpl struct {
openStreams atomic.Int64
closed atomic.Bool
udpInputMap sync.Map
udpInputMap xsync.MapOf[uint16, net.Conn]
// only ready for PoolClient
dialerRef C.Dialer
@ -270,10 +271,9 @@ func (t *clientImpl) forceClose(quicConn quic.Connection, err error) {
_ = quicConn.CloseWithError(ProtocolError, errStr)
}
udpInputMap := &t.udpInputMap
udpInputMap.Range(func(key, value any) bool {
if conn, ok := value.(net.Conn); ok {
_ = conn.Close()
}
udpInputMap.Range(func(key uint16, value net.Conn) bool {
conn := value
_ = conn.Close()
udpInputMap.Delete(key)
return true
})
@ -406,6 +406,7 @@ func NewClient(clientOption *ClientOption, udp bool, dialerRef C.Dialer) *Client
ClientOption: clientOption,
udp: udp,
dialerRef: dialerRef,
udpInputMap: *xsync.NewIntegerMapOf[uint16, net.Conn](),
}
c := &Client{ci}
runtime.SetFinalizer(c, closeClient)

View File

@ -16,6 +16,7 @@ import (
"github.com/gofrs/uuid/v5"
"github.com/metacubex/quic-go"
"github.com/puzpuzpuz/xsync/v2"
)
type ServerOption struct {
@ -32,6 +33,7 @@ func NewServerHandler(option *ServerOption, quicConn quic.EarlyConnection, uuid
quicConn: quicConn,
uuid: uuid,
authCh: make(chan struct{}),
udpInputMap: xsync.NewIntegerMapOf[uint16, *serverUDPInput](),
}
}
@ -45,7 +47,7 @@ type serverHandler struct {
authUUID atomic.TypedValue[string]
authOnce sync.Once
udpInputMap sync.Map
udpInputMap *xsync.MapOf[uint16, *serverUDPInput]
}
func (s *serverHandler) AuthOk() bool {
@ -94,8 +96,7 @@ func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelay
assocId = packet.ASSOC_ID
v, _ := s.udpInputMap.LoadOrStore(assocId, &serverUDPInput{})
input := v.(*serverUDPInput)
input, _ := s.udpInputMap.LoadOrCompute(assocId, func() *serverUDPInput { return &serverUDPInput{} })
if input.writeClosed.Load() {
return nil
}
@ -186,8 +187,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) {
if err != nil {
return
}
if v, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
input := v.(*serverUDPInput)
if input, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
input.writeClosed.Store(true)
}
}

View File

@ -73,12 +73,9 @@ func handleUDPToLocal(writeBack C.WriteBack, pc N.EnhancePacketConn, key string,
}
func closeAllLocalCoon(lAddr string) {
natTable.RangeLocalConn(lAddr, func(key, value any) bool {
conn, ok := value.(*net.UDPConn)
if !ok || conn == nil {
log.Debugln("Value %#v unknown value when closing TProxy local conn...", conn)
return true
}
natTable.RangeForLocalConn(lAddr, func(key string, value *net.UDPConn) bool {
conn := value
conn.Close()
log.Debugln("Closing TProxy local conn... lAddr=%s rAddr=%s", lAddr, key)
return true

View File

@ -2,11 +2,11 @@ package statistic
import (
"os"
"sync"
"time"
"github.com/Dreamacro/clash/common/atomic"
"github.com/puzpuzpuz/xsync/v2"
"github.com/shirou/gopsutil/v3/process"
)
@ -14,6 +14,7 @@ var DefaultManager *Manager
func init() {
DefaultManager = &Manager{
connections: xsync.NewMapOf[Tracker](),
uploadTemp: atomic.NewInt64(0),
downloadTemp: atomic.NewInt64(0),
uploadBlip: atomic.NewInt64(0),
@ -27,7 +28,7 @@ func init() {
}
type Manager struct {
connections sync.Map
connections *xsync.MapOf[string, Tracker]
uploadTemp *atomic.Int64
downloadTemp *atomic.Int64
uploadBlip *atomic.Int64
@ -48,14 +49,14 @@ func (m *Manager) Leave(c Tracker) {
func (m *Manager) Get(id string) (c Tracker) {
if value, ok := m.connections.Load(id); ok {
c = value.(Tracker)
c = value
}
return
}
func (m *Manager) Range(f func(c Tracker) bool) {
m.connections.Range(func(key, value any) bool {
return f(value.(Tracker))
m.connections.Range(func(key string, value Tracker) bool {
return f(value)
})
}

View File

@ -318,8 +318,7 @@ func handleUDPConn(packet C.PacketAdapter) {
return
}
lockKey := key + "-lock"
cond, loaded := natTable.GetOrCreateLock(lockKey)
cond, loaded := natTable.GetOrCreateLock(key)
go func() {
defer packet.Drop()
@ -333,7 +332,7 @@ func handleUDPConn(packet C.PacketAdapter) {
}
defer func() {
natTable.Delete(lockKey)
natTable.DeleteLock(key)
cond.Broadcast()
}()