From c295c5e412b46bb0d735390e76ab84e560c9e245 Mon Sep 17 00:00:00 2001 From: Dreamacro <305009791@qq.com> Date: Fri, 15 Feb 2019 14:25:20 +0800 Subject: [PATCH] Feature: add load-balance group --- adapters/outbound/loadbalance.go | 94 +++++++++++++++++++ common/murmur3/murmur.go | 50 +++++++++++ common/murmur3/murmur32.go | 149 +++++++++++++++++++++++++++++++ config/config.go | 21 ++++- constant/adapters.go | 3 + go.mod | 2 +- 6 files changed, 314 insertions(+), 5 deletions(-) create mode 100644 adapters/outbound/loadbalance.go create mode 100644 common/murmur3/murmur.go create mode 100644 common/murmur3/murmur32.go diff --git a/adapters/outbound/loadbalance.go b/adapters/outbound/loadbalance.go new file mode 100644 index 00000000..3edeae5d --- /dev/null +++ b/adapters/outbound/loadbalance.go @@ -0,0 +1,94 @@ +package adapters + +import ( + "encoding/json" + "errors" + "net" + + "github.com/Dreamacro/clash/common/murmur3" + C "github.com/Dreamacro/clash/constant" + + "golang.org/x/net/publicsuffix" +) + +type LoadBalance struct { + *Base + proxies []C.Proxy + maxRetry int +} + +func getKey(metadata *C.Metadata) string { + if metadata.Host != "" { + // ip host + if ip := net.ParseIP(metadata.Host); ip != nil { + return metadata.Host + } + + if etld, err := publicsuffix.EffectiveTLDPlusOne(metadata.Host); err == nil { + return etld + } + } + + if metadata.IP == nil { + return "" + } + + return metadata.IP.String() +} + +func jumpHash(key uint64, buckets int32) int32 { + var b, j int64 + + for j < int64(buckets) { + b = j + key = key*2862933555777941757 + 1 + j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1))) + } + + return int32(b) +} + +func (lb *LoadBalance) Generator(metadata *C.Metadata) (net.Conn, error) { + key := uint64(murmur3.Sum32([]byte(getKey(metadata)))) + buckets := int32(len(lb.proxies)) + for i := 0; i < lb.maxRetry; i++ { + idx := jumpHash(key, buckets) + if proxy, err := lb.proxies[idx].Generator(metadata); err == nil { + return proxy, nil + } + key++ + } + + return lb.proxies[0].Generator(metadata) +} + +func (lb *LoadBalance) MarshalJSON() ([]byte, error) { + var all []string + for _, proxy := range lb.proxies { + all = append(all, proxy.Name()) + } + return json.Marshal(map[string]interface{}{ + "type": lb.Type().String(), + "all": all, + }) +} + +type LoadBalanceOption struct { + Name string `proxy:"name"` + Proxies []string `proxy:"proxies"` +} + +func NewLoadBalance(name string, proxies []C.Proxy) (*LoadBalance, error) { + if len(proxies) == 0 { + return nil, errors.New("Provide at least one proxy") + } + + return &LoadBalance{ + Base: &Base{ + name: name, + tp: C.LoadBalance, + }, + proxies: proxies, + maxRetry: 3, + }, nil +} diff --git a/common/murmur3/murmur.go b/common/murmur3/murmur.go new file mode 100644 index 00000000..f4470290 --- /dev/null +++ b/common/murmur3/murmur.go @@ -0,0 +1,50 @@ +package murmur3 + +type bmixer interface { + bmix(p []byte) (tail []byte) + Size() (n int) + reset() +} + +type digest struct { + clen int // Digested input cumulative length. + tail []byte // 0 to Size()-1 bytes view of `buf'. + buf [16]byte // Expected (but not required) to be Size() large. + seed uint32 // Seed for initializing the hash. + bmixer +} + +func (d *digest) BlockSize() int { return 1 } + +func (d *digest) Write(p []byte) (n int, err error) { + n = len(p) + d.clen += n + + if len(d.tail) > 0 { + // Stick back pending bytes. + nfree := d.Size() - len(d.tail) // nfree ∈ [1, d.Size()-1]. + if nfree < len(p) { + // One full block can be formed. + block := append(d.tail, p[:nfree]...) + p = p[nfree:] + _ = d.bmix(block) // No tail. + } else { + // Tail's buf is large enough to prevent reallocs. + p = append(d.tail, p...) + } + } + + d.tail = d.bmix(p) + + // Keep own copy of the 0 to Size()-1 pending bytes. + nn := copy(d.buf[:], d.tail) + d.tail = d.buf[:nn] + + return n, nil +} + +func (d *digest) Reset() { + d.clen = 0 + d.tail = nil + d.bmixer.reset() +} diff --git a/common/murmur3/murmur32.go b/common/murmur3/murmur32.go new file mode 100644 index 00000000..a4e4801e --- /dev/null +++ b/common/murmur3/murmur32.go @@ -0,0 +1,149 @@ +package murmur3 + +// https://github.com/spaolacci/murmur3/blob/master/murmur32.go + +import ( + "hash" + "unsafe" +) + +// Make sure interfaces are correctly implemented. +var ( + _ hash.Hash32 = new(digest32) + _ bmixer = new(digest32) +) + +const ( + c1_32 uint32 = 0xcc9e2d51 + c2_32 uint32 = 0x1b873593 +) + +// digest32 represents a partial evaluation of a 32 bites hash. +type digest32 struct { + digest + h1 uint32 // Unfinalized running hash. +} + +// New32 returns new 32-bit hasher +func New32() hash.Hash32 { return New32WithSeed(0) } + +// New32WithSeed returns new 32-bit hasher set with explicit seed value +func New32WithSeed(seed uint32) hash.Hash32 { + d := new(digest32) + d.seed = seed + d.bmixer = d + d.Reset() + return d +} + +func (d *digest32) Size() int { return 4 } + +func (d *digest32) reset() { d.h1 = d.seed } + +func (d *digest32) Sum(b []byte) []byte { + h := d.Sum32() + return append(b, byte(h>>24), byte(h>>16), byte(h>>8), byte(h)) +} + +// Digest as many blocks as possible. +func (d *digest32) bmix(p []byte) (tail []byte) { + h1 := d.h1 + + nblocks := len(p) / 4 + for i := 0; i < nblocks; i++ { + k1 := *(*uint32)(unsafe.Pointer(&p[i*4])) + + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + + h1 ^= k1 + h1 = (h1 << 13) | (h1 >> 19) // rotl32(h1, 13) + h1 = h1*4 + h1 + 0xe6546b64 + } + d.h1 = h1 + return p[nblocks*d.Size():] +} + +func (d *digest32) Sum32() (h1 uint32) { + + h1 = d.h1 + + var k1 uint32 + switch len(d.tail) & 3 { + case 3: + k1 ^= uint32(d.tail[2]) << 16 + fallthrough + case 2: + k1 ^= uint32(d.tail[1]) << 8 + fallthrough + case 1: + k1 ^= uint32(d.tail[0]) + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + h1 ^= k1 + } + + h1 ^= uint32(d.clen) + + h1 ^= h1 >> 16 + h1 *= 0x85ebca6b + h1 ^= h1 >> 13 + h1 *= 0xc2b2ae35 + h1 ^= h1 >> 16 + + return h1 +} + +func Sum32(data []byte) uint32 { return Sum32WithSeed(data, 0) } + +func Sum32WithSeed(data []byte, seed uint32) uint32 { + h1 := seed + + nblocks := len(data) / 4 + var p uintptr + if len(data) > 0 { + p = uintptr(unsafe.Pointer(&data[0])) + } + p1 := p + uintptr(4*nblocks) + for ; p < p1; p += 4 { + k1 := *(*uint32)(unsafe.Pointer(p)) + + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + + h1 ^= k1 + h1 = (h1 << 13) | (h1 >> 19) // rotl32(h1, 13) + h1 = h1*4 + h1 + 0xe6546b64 + } + + tail := data[nblocks*4:] + + var k1 uint32 + switch len(tail) & 3 { + case 3: + k1 ^= uint32(tail[2]) << 16 + fallthrough + case 2: + k1 ^= uint32(tail[1]) << 8 + fallthrough + case 1: + k1 ^= uint32(tail[0]) + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + h1 ^= k1 + } + + h1 ^= uint32(len(data)) + + h1 ^= h1 >> 16 + h1 *= 0x85ebca6b + h1 ^= h1 >> 13 + h1 *= 0xc2b2ae35 + h1 ^= h1 >> 16 + + return h1 +} diff --git a/config/config.go b/config/config.go index 8d9baa4b..7f725406 100644 --- a/config/config.go +++ b/config/config.go @@ -195,7 +195,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { } var proxy C.Proxy - var err error + err := fmt.Errorf("can't parse") switch proxyType { case "ss": ssOption := &adapters.ShadowSocksOption{} @@ -251,8 +251,9 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { return nil, fmt.Errorf("ProxyGroup %s: the duplicate name", groupName) } var group C.Proxy - var ps []C.Proxy - var err error + ps := []C.Proxy{} + + err := fmt.Errorf("can't parse") switch groupType { case "url-test": urlTestOption := &adapters.URLTestOption{} @@ -290,6 +291,18 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { return nil, fmt.Errorf("ProxyGroup %s: %s", groupName, err.Error()) } group, err = adapters.NewFallback(*fallbackOption, ps) + case "load-balance": + loadBalanceOption := &adapters.LoadBalanceOption{} + err = decoder.Decode(mapping, loadBalanceOption) + if err != nil { + break + } + + ps, err = getProxies(proxies, loadBalanceOption.Proxies) + if err != nil { + return nil, fmt.Errorf("ProxyGroup %s: %s", groupName, err.Error()) + } + group, err = adapters.NewLoadBalance(loadBalanceOption.Name, ps) } if err != nil { return nil, fmt.Errorf("Proxy %s: %s", groupName, err.Error()) @@ -297,7 +310,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { proxies[groupName] = group } - var ps []C.Proxy + ps := []C.Proxy{} for _, v := range proxies { ps = append(ps, v) } diff --git a/constant/adapters.go b/constant/adapters.go index 293b50ed..082743cf 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -15,6 +15,7 @@ const ( Http URLTest Vmess + LoadBalance ) type ServerAdapter interface { @@ -52,6 +53,8 @@ func (at AdapterType) String() string { return "URLTest" case Vmess: return "Vmess" + case LoadBalance: + return "LoadBalance" default: return "Unknow" } diff --git a/go.mod b/go.mod index cb4146de..31c879ec 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/oschwald/maxminddb-golang v1.3.0 // indirect github.com/sirupsen/logrus v1.3.0 golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 - golang.org/x/net v0.0.0-20181108082009-03003ca0c849 // indirect + golang.org/x/net v0.0.0-20181108082009-03003ca0c849 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f // indirect gopkg.in/eapache/channels.v1 v1.1.0 gopkg.in/yaml.v2 v2.2.2