From b1d75812c545e21691e17c78ee8de7ae8ae79a18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Tue, 11 Jun 2024 21:16:33 +0800 Subject: [PATCH] platform: Prepare connections list --- box.go | 1 + constant/proxy.go | 8 + experimental/clashapi/connections.go | 5 +- experimental/clashapi/server.go | 48 +--- .../clashapi/trafficontrol/manager.go | 91 ++++-- .../clashapi/trafficontrol/tracker.go | 249 +++++++++------- experimental/libbox/command.go | 2 + experimental/libbox/command_client.go | 8 + .../libbox/command_close_connection.go | 53 ++++ experimental/libbox/command_connections.go | 269 ++++++++++++++++++ experimental/libbox/command_group.go | 60 ++-- experimental/libbox/command_server.go | 6 + experimental/libbox/command_status.go | 2 +- experimental/libbox/iterator.go | 4 + experimental/libbox/service.go | 54 ++-- experimental/libbox/setup.go | 6 + inbound/builder.go | 34 +-- log/format.go | 6 +- 18 files changed, 652 insertions(+), 254 deletions(-) create mode 100644 experimental/libbox/command_close_connection.go create mode 100644 experimental/libbox/command_connections.go diff --git a/box.go b/box.go index 70235fd3..3c514cfe 100644 --- a/box.go +++ b/box.go @@ -111,6 +111,7 @@ func New(options Options) (*Box, error) { ctx, router, logFactory.NewLogger(F.ToString("inbound/", inboundOptions.Type, "[", tag, "]")), + tag, inboundOptions, options.PlatformInterface, ) diff --git a/constant/proxy.go b/constant/proxy.go index 1e9baee2..3197de60 100644 --- a/constant/proxy.go +++ b/constant/proxy.go @@ -32,6 +32,12 @@ const ( func ProxyDisplayName(proxyType string) string { switch proxyType { + case TypeTun: + return "TUN" + case TypeRedirect: + return "Redirect" + case TypeTProxy: + return "TProxy" case TypeDirect: return "Direct" case TypeBlock: @@ -42,6 +48,8 @@ func ProxyDisplayName(proxyType string) string { return "SOCKS" case TypeHTTP: return "HTTP" + case TypeMixed: + return "Mixed" case TypeShadowsocks: return "Shadowsocks" case TypeVMess: diff --git a/experimental/clashapi/connections.go b/experimental/clashapi/connections.go index c9471207..999d5898 100644 --- a/experimental/clashapi/connections.go +++ b/experimental/clashapi/connections.go @@ -14,6 +14,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/render" + "github.com/gofrs/uuid/v5" ) func connectionRouter(router adapter.Router, trafficManager *trafficontrol.Manager) http.Handler { @@ -76,10 +77,10 @@ func getConnections(trafficManager *trafficontrol.Manager) func(w http.ResponseW func closeConnection(trafficManager *trafficontrol.Manager) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - id := chi.URLParam(r, "id") + id := uuid.FromStringOrNil(chi.URLParam(r, "id")) snapshot := trafficManager.Snapshot() for _, c := range snapshot.Connections { - if id == c.ID() { + if id == c.Metadata().ID { c.Close() break } diff --git a/experimental/clashapi/server.go b/experimental/clashapi/server.go index 375045ac..5ace3351 100644 --- a/experimental/clashapi/server.go +++ b/experimental/clashapi/server.go @@ -19,7 +19,6 @@ import ( "github.com/sagernet/sing-box/option" "github.com/sagernet/sing/common" E "github.com/sagernet/sing/common/exceptions" - F "github.com/sagernet/sing/common/format" "github.com/sagernet/sing/common/json" N "github.com/sagernet/sing/common/network" "github.com/sagernet/sing/service" @@ -218,58 +217,15 @@ func (s *Server) TrafficManager() *trafficontrol.Manager { } func (s *Server) RoutedConnection(ctx context.Context, conn net.Conn, metadata adapter.InboundContext, matchedRule adapter.Rule) (net.Conn, adapter.Tracker) { - tracker := trafficontrol.NewTCPTracker(conn, s.trafficManager, castMetadata(metadata), s.router, matchedRule) + tracker := trafficontrol.NewTCPTracker(conn, s.trafficManager, metadata, s.router, matchedRule) return tracker, tracker } func (s *Server) RoutedPacketConnection(ctx context.Context, conn N.PacketConn, metadata adapter.InboundContext, matchedRule adapter.Rule) (N.PacketConn, adapter.Tracker) { - tracker := trafficontrol.NewUDPTracker(conn, s.trafficManager, castMetadata(metadata), s.router, matchedRule) + tracker := trafficontrol.NewUDPTracker(conn, s.trafficManager, metadata, s.router, matchedRule) return tracker, tracker } -func castMetadata(metadata adapter.InboundContext) trafficontrol.Metadata { - var inbound string - if metadata.Inbound != "" { - inbound = metadata.InboundType + "/" + metadata.Inbound - } else { - inbound = metadata.InboundType - } - var domain string - if metadata.Domain != "" { - domain = metadata.Domain - } else { - domain = metadata.Destination.Fqdn - } - var processPath string - if metadata.ProcessInfo != nil { - if metadata.ProcessInfo.ProcessPath != "" { - processPath = metadata.ProcessInfo.ProcessPath - } else if metadata.ProcessInfo.PackageName != "" { - processPath = metadata.ProcessInfo.PackageName - } - if processPath == "" { - if metadata.ProcessInfo.UserId != -1 { - processPath = F.ToString(metadata.ProcessInfo.UserId) - } - } else if metadata.ProcessInfo.User != "" { - processPath = F.ToString(processPath, " (", metadata.ProcessInfo.User, ")") - } else if metadata.ProcessInfo.UserId != -1 { - processPath = F.ToString(processPath, " (", metadata.ProcessInfo.UserId, ")") - } - } - return trafficontrol.Metadata{ - NetWork: metadata.Network, - Type: inbound, - SrcIP: metadata.Source.Addr, - DstIP: metadata.Destination.Addr, - SrcPort: F.ToString(metadata.Source.Port), - DstPort: F.ToString(metadata.Destination.Port), - Host: domain, - DNSMode: "normal", - ProcessPath: processPath, - } -} - func authentication(serverSecret string) func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler { fn := func(w http.ResponseWriter, r *http.Request) { diff --git a/experimental/clashapi/trafficontrol/manager.go b/experimental/clashapi/trafficontrol/manager.go index eac7aee4..9b22f1e3 100644 --- a/experimental/clashapi/trafficontrol/manager.go +++ b/experimental/clashapi/trafficontrol/manager.go @@ -2,10 +2,17 @@ package trafficontrol import ( "runtime" + "sync" "time" + C "github.com/sagernet/sing-box/constant" "github.com/sagernet/sing-box/experimental/clashapi/compatible" + "github.com/sagernet/sing/common" "github.com/sagernet/sing/common/atomic" + "github.com/sagernet/sing/common/json" + "github.com/sagernet/sing/common/x/list" + + "github.com/gofrs/uuid/v5" ) type Manager struct { @@ -16,9 +23,11 @@ type Manager struct { uploadTotal atomic.Int64 downloadTotal atomic.Int64 - connections compatible.Map[string, tracker] - ticker *time.Ticker - done chan struct{} + connections compatible.Map[uuid.UUID, Tracker] + closedConnectionsAccess sync.Mutex + closedConnections list.List[TrackerMetadata] + ticker *time.Ticker + done chan struct{} // process *process.Process memory uint64 } @@ -33,12 +42,22 @@ func NewManager() *Manager { return manager } -func (m *Manager) Join(c tracker) { - m.connections.Store(c.ID(), c) +func (m *Manager) Join(c Tracker) { + m.connections.Store(c.Metadata().ID, c) } -func (m *Manager) Leave(c tracker) { - m.connections.Delete(c.ID()) +func (m *Manager) Leave(c Tracker) { + metadata := c.Metadata() + _, loaded := m.connections.LoadAndDelete(metadata.ID) + if loaded { + metadata.ClosedAt = time.Now() + m.closedConnectionsAccess.Lock() + defer m.closedConnectionsAccess.Unlock() + if m.closedConnections.Len() >= 1000 { + m.closedConnections.PopFront() + } + m.closedConnections.PushBack(metadata) + } } func (m *Manager) PushUploaded(size int64) { @@ -59,14 +78,39 @@ func (m *Manager) Total() (up int64, down int64) { return m.uploadTotal.Load(), m.downloadTotal.Load() } -func (m *Manager) Connections() int { +func (m *Manager) ConnectionsLen() int { return m.connections.Len() } +func (m *Manager) Connections() []TrackerMetadata { + var connections []TrackerMetadata + m.connections.Range(func(_ uuid.UUID, value Tracker) bool { + connections = append(connections, value.Metadata()) + return true + }) + return connections +} + +func (m *Manager) ClosedConnections() []TrackerMetadata { + m.closedConnectionsAccess.Lock() + defer m.closedConnectionsAccess.Unlock() + return m.closedConnections.Array() +} + +func (m *Manager) Connection(id uuid.UUID) Tracker { + connection, loaded := m.connections.Load(id) + if !loaded { + return nil + } + return connection +} + func (m *Manager) Snapshot() *Snapshot { - var connections []tracker - m.connections.Range(func(_ string, value tracker) bool { - connections = append(connections, value) + var connections []Tracker + m.connections.Range(func(_ uuid.UUID, value Tracker) bool { + if value.Metadata().OutboundType != C.TypeDNS { + connections = append(connections, value) + } return true }) @@ -75,10 +119,10 @@ func (m *Manager) Snapshot() *Snapshot { m.memory = memStats.StackInuse + memStats.HeapInuse + memStats.HeapIdle - memStats.HeapReleased return &Snapshot{ - UploadTotal: m.uploadTotal.Load(), - DownloadTotal: m.downloadTotal.Load(), - Connections: connections, - Memory: m.memory, + Upload: m.uploadTotal.Load(), + Download: m.downloadTotal.Load(), + Connections: connections, + Memory: m.memory, } } @@ -114,8 +158,17 @@ func (m *Manager) Close() error { } type Snapshot struct { - DownloadTotal int64 `json:"downloadTotal"` - UploadTotal int64 `json:"uploadTotal"` - Connections []tracker `json:"connections"` - Memory uint64 `json:"memory"` + Download int64 + Upload int64 + Connections []Tracker + Memory uint64 +} + +func (s *Snapshot) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{ + "downloadTotal": s.Download, + "uploadTotal": s.Upload, + "connections": common.Map(s.Connections, func(t Tracker) TrackerMetadata { return t.Metadata() }), + "memory": s.Memory, + }) } diff --git a/experimental/clashapi/trafficontrol/tracker.go b/experimental/clashapi/trafficontrol/tracker.go index 4e635d12..73c28e69 100644 --- a/experimental/clashapi/trafficontrol/tracker.go +++ b/experimental/clashapi/trafficontrol/tracker.go @@ -2,97 +2,135 @@ package trafficontrol import ( "net" - "net/netip" "time" "github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing/common" "github.com/sagernet/sing/common/atomic" "github.com/sagernet/sing/common/bufio" + F "github.com/sagernet/sing/common/format" "github.com/sagernet/sing/common/json" N "github.com/sagernet/sing/common/network" "github.com/gofrs/uuid/v5" ) -type Metadata struct { - NetWork string `json:"network"` - Type string `json:"type"` - SrcIP netip.Addr `json:"sourceIP"` - DstIP netip.Addr `json:"destinationIP"` - SrcPort string `json:"sourcePort"` - DstPort string `json:"destinationPort"` - Host string `json:"host"` - DNSMode string `json:"dnsMode"` - ProcessPath string `json:"processPath"` +type TrackerMetadata struct { + ID uuid.UUID + Metadata adapter.InboundContext + CreatedAt time.Time + ClosedAt time.Time + Upload *atomic.Int64 + Download *atomic.Int64 + Chain []string + Rule adapter.Rule + Outbound string + OutboundType string } -type tracker interface { - ID() string - Close() error - Leave() -} - -type trackerInfo struct { - UUID uuid.UUID `json:"id"` - Metadata Metadata `json:"metadata"` - UploadTotal *atomic.Int64 `json:"upload"` - DownloadTotal *atomic.Int64 `json:"download"` - Start time.Time `json:"start"` - Chain []string `json:"chains"` - Rule string `json:"rule"` - RulePayload string `json:"rulePayload"` -} - -func (t trackerInfo) MarshalJSON() ([]byte, error) { +func (t TrackerMetadata) MarshalJSON() ([]byte, error) { + var inbound string + if t.Metadata.Inbound != "" { + inbound = t.Metadata.InboundType + "/" + t.Metadata.Inbound + } else { + inbound = t.Metadata.InboundType + } + var domain string + if t.Metadata.Domain != "" { + domain = t.Metadata.Domain + } else { + domain = t.Metadata.Destination.Fqdn + } + var processPath string + if t.Metadata.ProcessInfo != nil { + if t.Metadata.ProcessInfo.ProcessPath != "" { + processPath = t.Metadata.ProcessInfo.ProcessPath + } else if t.Metadata.ProcessInfo.PackageName != "" { + processPath = t.Metadata.ProcessInfo.PackageName + } + if processPath == "" { + if t.Metadata.ProcessInfo.UserId != -1 { + processPath = F.ToString(t.Metadata.ProcessInfo.UserId) + } + } else if t.Metadata.ProcessInfo.User != "" { + processPath = F.ToString(processPath, " (", t.Metadata.ProcessInfo.User, ")") + } else if t.Metadata.ProcessInfo.UserId != -1 { + processPath = F.ToString(processPath, " (", t.Metadata.ProcessInfo.UserId, ")") + } + } + var rule string + if t.Rule != nil { + rule = F.ToString(t.Rule, " => ", t.Rule.Outbound()) + } else { + rule = "final" + } return json.Marshal(map[string]any{ - "id": t.UUID.String(), - "metadata": t.Metadata, - "upload": t.UploadTotal.Load(), - "download": t.DownloadTotal.Load(), - "start": t.Start, + "id": t.ID, + "metadata": map[string]any{ + "network": t.Metadata.Network, + "type": inbound, + "sourceIP": t.Metadata.Source.Addr, + "destinationIP": t.Metadata.Destination.Addr, + "sourcePort": F.ToString(t.Metadata.Source.Port), + "destinationPort": F.ToString(t.Metadata.Destination.Port), + "host": domain, + "dnsMode": "normal", + "processPath": processPath, + }, + "upload": t.Upload.Load(), + "download": t.Download.Load(), + "start": t.CreatedAt, "chains": t.Chain, - "rule": t.Rule, - "rulePayload": t.RulePayload, + "rule": rule, + "rulePayload": "", }) } -type tcpTracker struct { - N.ExtendedConn `json:"-"` - *trackerInfo - manager *Manager +type Tracker interface { + adapter.Tracker + Metadata() TrackerMetadata + Close() error } -func (tt *tcpTracker) ID() string { - return tt.UUID.String() +type TCPConn struct { + N.ExtendedConn + metadata TrackerMetadata + manager *Manager } -func (tt *tcpTracker) Close() error { +func (tt *TCPConn) Metadata() TrackerMetadata { + return tt.metadata +} + +func (tt *TCPConn) Close() error { tt.manager.Leave(tt) return tt.ExtendedConn.Close() } -func (tt *tcpTracker) Leave() { +func (tt *TCPConn) Leave() { tt.manager.Leave(tt) } -func (tt *tcpTracker) Upstream() any { +func (tt *TCPConn) Upstream() any { return tt.ExtendedConn } -func (tt *tcpTracker) ReaderReplaceable() bool { +func (tt *TCPConn) ReaderReplaceable() bool { return true } -func (tt *tcpTracker) WriterReplaceable() bool { +func (tt *TCPConn) WriterReplaceable() bool { return true } -func NewTCPTracker(conn net.Conn, manager *Manager, metadata Metadata, router adapter.Router, rule adapter.Rule) *tcpTracker { - uuid, _ := uuid.NewV4() - - var chain []string - var next string +func NewTCPTracker(conn net.Conn, manager *Manager, metadata adapter.InboundContext, router adapter.Router, rule adapter.Rule) *TCPConn { + id, _ := uuid.NewV4() + var ( + chain []string + next string + outbound string + outboundType string + ) if rule == nil { if defaultOutbound, err := router.DefaultOutbound(N.NetworkTCP); err == nil { next = defaultOutbound.Tag() @@ -106,17 +144,17 @@ func NewTCPTracker(conn net.Conn, manager *Manager, metadata Metadata, router ad if !loaded { break } + outbound = detour.Tag() + outboundType = detour.Type() group, isGroup := detour.(adapter.OutboundGroup) if !isGroup { break } next = group.Now() } - upload := new(atomic.Int64) download := new(atomic.Int64) - - t := &tcpTracker{ + tracker := &TCPConn{ ExtendedConn: bufio.NewCounterConn(conn, []N.CountFunc{func(n int64) { upload.Add(n) manager.PushUploaded(n) @@ -124,64 +162,62 @@ func NewTCPTracker(conn net.Conn, manager *Manager, metadata Metadata, router ad download.Add(n) manager.PushDownloaded(n) }}), - manager: manager, - trackerInfo: &trackerInfo{ - UUID: uuid, - Start: time.Now(), - Metadata: metadata, - Chain: common.Reverse(chain), - Rule: "", - UploadTotal: upload, - DownloadTotal: download, + metadata: TrackerMetadata{ + ID: id, + Metadata: metadata, + CreatedAt: time.Now(), + Upload: upload, + Download: download, + Chain: common.Reverse(chain), + Rule: rule, + Outbound: outbound, + OutboundType: outboundType, }, + manager: manager, } - - if rule != nil { - t.trackerInfo.Rule = rule.String() + " => " + rule.Outbound() - } else { - t.trackerInfo.Rule = "final" - } - - manager.Join(t) - return t + manager.Join(tracker) + return tracker } -type udpTracker struct { +type UDPConn struct { N.PacketConn `json:"-"` - *trackerInfo - manager *Manager + metadata TrackerMetadata + manager *Manager } -func (ut *udpTracker) ID() string { - return ut.UUID.String() +func (ut *UDPConn) Metadata() TrackerMetadata { + return ut.metadata } -func (ut *udpTracker) Close() error { +func (ut *UDPConn) Close() error { ut.manager.Leave(ut) return ut.PacketConn.Close() } -func (ut *udpTracker) Leave() { +func (ut *UDPConn) Leave() { ut.manager.Leave(ut) } -func (ut *udpTracker) Upstream() any { +func (ut *UDPConn) Upstream() any { return ut.PacketConn } -func (ut *udpTracker) ReaderReplaceable() bool { +func (ut *UDPConn) ReaderReplaceable() bool { return true } -func (ut *udpTracker) WriterReplaceable() bool { +func (ut *UDPConn) WriterReplaceable() bool { return true } -func NewUDPTracker(conn N.PacketConn, manager *Manager, metadata Metadata, router adapter.Router, rule adapter.Rule) *udpTracker { - uuid, _ := uuid.NewV4() - - var chain []string - var next string +func NewUDPTracker(conn N.PacketConn, manager *Manager, metadata adapter.InboundContext, router adapter.Router, rule adapter.Rule) *UDPConn { + id, _ := uuid.NewV4() + var ( + chain []string + next string + outbound string + outboundType string + ) if rule == nil { if defaultOutbound, err := router.DefaultOutbound(N.NetworkUDP); err == nil { next = defaultOutbound.Tag() @@ -195,17 +231,17 @@ func NewUDPTracker(conn N.PacketConn, manager *Manager, metadata Metadata, route if !loaded { break } + outbound = detour.Tag() + outboundType = detour.Type() group, isGroup := detour.(adapter.OutboundGroup) if !isGroup { break } next = group.Now() } - upload := new(atomic.Int64) download := new(atomic.Int64) - - ut := &udpTracker{ + trackerConn := &UDPConn{ PacketConn: bufio.NewCounterPacketConn(conn, []N.CountFunc{func(n int64) { upload.Add(n) manager.PushUploaded(n) @@ -213,24 +249,19 @@ func NewUDPTracker(conn N.PacketConn, manager *Manager, metadata Metadata, route download.Add(n) manager.PushDownloaded(n) }}), - manager: manager, - trackerInfo: &trackerInfo{ - UUID: uuid, - Start: time.Now(), - Metadata: metadata, - Chain: common.Reverse(chain), - Rule: "", - UploadTotal: upload, - DownloadTotal: download, + metadata: TrackerMetadata{ + ID: id, + Metadata: metadata, + CreatedAt: time.Now(), + Upload: upload, + Download: download, + Chain: common.Reverse(chain), + Rule: rule, + Outbound: outbound, + OutboundType: outboundType, }, + manager: manager, } - - if rule != nil { - ut.trackerInfo.Rule = rule.String() + " => " + rule.Outbound() - } else { - ut.trackerInfo.Rule = "final" - } - - manager.Join(ut) - return ut + manager.Join(trackerConn) + return trackerConn } diff --git a/experimental/libbox/command.go b/experimental/libbox/command.go index 7915419d..f9aca13f 100644 --- a/experimental/libbox/command.go +++ b/experimental/libbox/command.go @@ -14,4 +14,6 @@ const ( CommandSetClashMode CommandGetSystemProxyStatus CommandSetSystemProxyEnabled + CommandConnections + CommandCloseConnection ) diff --git a/experimental/libbox/command_client.go b/experimental/libbox/command_client.go index f3c9ad2a..199dce0d 100644 --- a/experimental/libbox/command_client.go +++ b/experimental/libbox/command_client.go @@ -31,6 +31,7 @@ type CommandClientHandler interface { WriteGroups(message OutboundGroupIterator) InitializeClashMode(modeList StringIterator, currentMode string) UpdateClashMode(newMode string) + WriteConnections(message *Connections) } func NewStandaloneCommandClient() *CommandClient { @@ -116,6 +117,13 @@ func (c *CommandClient) Connect() error { return nil } go c.handleModeConn(conn) + case CommandConnections: + err = binary.Write(conn, binary.BigEndian, c.options.StatusInterval) + if err != nil { + return E.Cause(err, "write interval") + } + c.handler.Connected() + go c.handleConnectionsConn(conn) } return nil } diff --git a/experimental/libbox/command_close_connection.go b/experimental/libbox/command_close_connection.go new file mode 100644 index 00000000..62f5dc84 --- /dev/null +++ b/experimental/libbox/command_close_connection.go @@ -0,0 +1,53 @@ +package libbox + +import ( + "bufio" + "net" + + "github.com/sagernet/sing-box/experimental/clashapi" + "github.com/sagernet/sing/common/binary" + E "github.com/sagernet/sing/common/exceptions" + + "github.com/gofrs/uuid/v5" +) + +func (c *CommandClient) CloseConnection(connId string) error { + conn, err := c.directConnect() + if err != nil { + return err + } + defer conn.Close() + writer := bufio.NewWriter(conn) + err = binary.WriteData(writer, binary.BigEndian, connId) + if err != nil { + return err + } + err = writer.Flush() + if err != nil { + return err + } + return readError(conn) +} + +func (s *CommandServer) handleCloseConnection(conn net.Conn) error { + reader := bufio.NewReader(conn) + var connId string + err := binary.ReadData(reader, binary.BigEndian, &connId) + if err != nil { + return E.Cause(err, "read connection id") + } + service := s.service + if service == nil { + return writeError(conn, E.New("service not ready")) + } + clashServer := service.instance.Router().ClashServer() + if clashServer == nil { + return writeError(conn, E.New("Clash API disabled")) + } + targetConn := clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(connId)) + if targetConn == nil { + return writeError(conn, E.New("connection already closed")) + } + targetConn.Close() + return writeError(conn, nil) +} diff --git a/experimental/libbox/command_connections.go b/experimental/libbox/command_connections.go new file mode 100644 index 00000000..9aaa995a --- /dev/null +++ b/experimental/libbox/command_connections.go @@ -0,0 +1,269 @@ +package libbox + +import ( + "bufio" + "net" + "slices" + "strings" + "time" + + "github.com/sagernet/sing-box/experimental/clashapi" + "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol" + "github.com/sagernet/sing/common/binary" + E "github.com/sagernet/sing/common/exceptions" + M "github.com/sagernet/sing/common/metadata" + + "github.com/gofrs/uuid/v5" +) + +func (c *CommandClient) handleConnectionsConn(conn net.Conn) { + defer conn.Close() + reader := bufio.NewReader(conn) + var connections Connections + for { + rawConnections = nil + err := binary.ReadData(reader, binary.BigEndian, &connections.connections) + if err != nil { + c.handler.Disconnected(err.Error()) + return + } + c.handler.WriteConnections(&connections) + } +} + +func (s *CommandServer) handleConnectionsConn(conn net.Conn) error { + var interval int64 + err := binary.Read(conn, binary.BigEndian, &interval) + if err != nil { + return E.Cause(err, "read interval") + } + ticker := time.NewTicker(time.Duration(interval)) + defer ticker.Stop() + ctx := connKeepAlive(conn) + var trafficManager *trafficontrol.Manager + for { + service := s.service + if service != nil { + clashServer := service.instance.Router().ClashServer() + if clashServer == nil { + return E.New("Clash API disabled") + } + trafficManager = clashServer.(*clashapi.Server).TrafficManager() + break + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } + var ( + connections = make(map[uuid.UUID]*Connection) + outConnections []Connection + ) + writer := bufio.NewWriter(conn) + for { + outConnections = outConnections[:0] + for _, connection := range trafficManager.Connections() { + outConnections = append(outConnections, newConnection(connections, connection, false)) + } + for _, connection := range trafficManager.ClosedConnections() { + outConnections = append(outConnections, newConnection(connections, connection, true)) + } + err = binary.WriteData(writer, binary.BigEndian, outConnections) + if err != nil { + return err + } + err = writer.Flush() + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +const ( + ConnectionStateAll = iota + ConnectionStateActive + ConnectionStateClosed +) + +type Connections struct { + connections []Connection + filteredConnections []Connection + outConnections *[]Connection +} + +func (c *Connections) FilterState(state int32) { + c.filteredConnections = c.filteredConnections[:0] + switch state { + case ConnectionStateAll: + c.filteredConnections = append(c.filteredConnections, c.connections...) + case ConnectionStateActive: + for _, connection := range c.connections { + if connection.ClosedAt == 0 { + c.filteredConnections = append(c.filteredConnections, connection) + } + } + case ConnectionStateClosed: + for _, connection := range c.connections { + if connection.ClosedAt != 0 { + c.filteredConnections = append(c.filteredConnections, connection) + } + } + } +} + +func (c *Connections) SortByDate() { + slices.SortStableFunc(c.filteredConnections, func(x, y Connection) int { + if x.CreatedAt < y.CreatedAt { + return 1 + } else if x.CreatedAt > y.CreatedAt { + return -1 + } else { + return strings.Compare(y.ID, x.ID) + } + }) +} + +func (c *Connections) SortByTraffic() { + slices.SortStableFunc(c.filteredConnections, func(x, y Connection) int { + xTraffic := x.Uplink + x.Downlink + yTraffic := y.Uplink + y.Downlink + if xTraffic < yTraffic { + return 1 + } else if xTraffic > yTraffic { + return -1 + } else { + return strings.Compare(y.ID, x.ID) + } + }) +} + +func (c *Connections) SortByTrafficTotal() { + slices.SortStableFunc(c.filteredConnections, func(x, y Connection) int { + xTraffic := x.UplinkTotal + x.DownlinkTotal + yTraffic := y.UplinkTotal + y.DownlinkTotal + if xTraffic < yTraffic { + return 1 + } else if xTraffic > yTraffic { + return -1 + } else { + return strings.Compare(y.ID, x.ID) + } + }) +} + +func (c *Connections) Iterator() ConnectionIterator { + return newPtrIterator(c.filteredConnections) +} + +type Connection struct { + ID string + Inbound string + InboundType string + IPVersion int32 + Network string + Source string + Destination string + Domain string + Protocol string + User string + FromOutbound string + CreatedAt int64 + ClosedAt int64 + Uplink int64 + Downlink int64 + UplinkTotal int64 + DownlinkTotal int64 + Rule string + Outbound string + OutboundType string + ChainList []string +} + +func (c *Connection) Chain() StringIterator { + return newIterator(c.ChainList) +} + +func (c *Connection) DisplayDestination() string { + destination := M.ParseSocksaddr(c.Destination) + if destination.IsIP() && c.Domain != "" { + destination = M.Socksaddr{ + Fqdn: c.Domain, + Port: destination.Port, + } + return destination.String() + } + return c.Destination +} + +type ConnectionIterator interface { + Next() *Connection + HasNext() bool +} + +func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) Connection { + if oldConnection, loaded := connections[metadata.ID]; loaded { + if isClosed { + if oldConnection.ClosedAt == 0 { + oldConnection.Uplink = 0 + oldConnection.Downlink = 0 + oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli() + } + return *oldConnection + } + lastUplink := oldConnection.UplinkTotal + lastDownlink := oldConnection.DownlinkTotal + uplinkTotal := metadata.Upload.Load() + downlinkTotal := metadata.Download.Load() + oldConnection.Uplink = uplinkTotal - lastUplink + oldConnection.Downlink = downlinkTotal - lastDownlink + oldConnection.UplinkTotal = uplinkTotal + oldConnection.DownlinkTotal = downlinkTotal + return *oldConnection + } + var rule string + if metadata.Rule != nil { + rule = metadata.Rule.String() + } + uplinkTotal := metadata.Upload.Load() + downlinkTotal := metadata.Download.Load() + uplink := uplinkTotal + downlink := downlinkTotal + var closedAt int64 + if !metadata.ClosedAt.IsZero() { + closedAt = metadata.ClosedAt.UnixMilli() + uplink = 0 + downlink = 0 + } + connection := Connection{ + ID: metadata.ID.String(), + Inbound: metadata.Metadata.Inbound, + InboundType: metadata.Metadata.InboundType, + IPVersion: int32(metadata.Metadata.IPVersion), + Network: metadata.Metadata.Network, + Source: metadata.Metadata.Source.String(), + Destination: metadata.Metadata.Destination.String(), + Domain: metadata.Metadata.Domain, + Protocol: metadata.Metadata.Protocol, + User: metadata.Metadata.User, + FromOutbound: metadata.Metadata.Outbound, + CreatedAt: metadata.CreatedAt.UnixMilli(), + ClosedAt: closedAt, + Uplink: uplink, + Downlink: downlink, + UplinkTotal: uplinkTotal, + DownlinkTotal: downlinkTotal, + Rule: rule, + Outbound: metadata.Outbound, + OutboundType: metadata.OutboundType, + ChainList: metadata.Chain, + } + connections[metadata.ID] = &connection + return connection +} diff --git a/experimental/libbox/command_group.go b/experimental/libbox/command_group.go index 21fd39d2..a5572ea1 100644 --- a/experimental/libbox/command_group.go +++ b/experimental/libbox/command_group.go @@ -14,36 +14,6 @@ import ( "github.com/sagernet/sing/service" ) -type OutboundGroup struct { - Tag string - Type string - Selectable bool - Selected string - IsExpand bool - items []*OutboundGroupItem -} - -func (g *OutboundGroup) GetItems() OutboundGroupItemIterator { - return newIterator(g.items) -} - -type OutboundGroupIterator interface { - Next() *OutboundGroup - HasNext() bool -} - -type OutboundGroupItem struct { - Tag string - Type string - URLTestTime int64 - URLTestDelay int32 -} - -type OutboundGroupItemIterator interface { - Next() *OutboundGroupItem - HasNext() bool -} - func (c *CommandClient) handleGroupConn(conn net.Conn) { defer conn.Close() @@ -92,6 +62,36 @@ func (s *CommandServer) handleGroupConn(conn net.Conn) error { } } +type OutboundGroup struct { + Tag string + Type string + Selectable bool + Selected string + IsExpand bool + items []*OutboundGroupItem +} + +func (g *OutboundGroup) GetItems() OutboundGroupItemIterator { + return newIterator(g.items) +} + +type OutboundGroupIterator interface { + Next() *OutboundGroup + HasNext() bool +} + +type OutboundGroupItem struct { + Tag string + Type string + URLTestTime int64 + URLTestDelay int32 +} + +type OutboundGroupItemIterator interface { + Next() *OutboundGroupItem + HasNext() bool +} + func readGroups(reader io.Reader) (OutboundGroupIterator, error) { var groupLength uint16 err := binary.Read(reader, binary.BigEndian, &groupLength) diff --git a/experimental/libbox/command_server.go b/experimental/libbox/command_server.go index da931ef5..8918756d 100644 --- a/experimental/libbox/command_server.go +++ b/experimental/libbox/command_server.go @@ -33,6 +33,8 @@ type CommandServer struct { urlTestUpdate chan struct{} modeUpdate chan struct{} logReset chan struct{} + + closedConnections []Connection } type CommandServerHandler interface { @@ -176,6 +178,10 @@ func (s *CommandServer) handleConnection(conn net.Conn) error { return s.handleGetSystemProxyStatus(conn) case CommandSetSystemProxyEnabled: return s.handleSetSystemProxyEnabled(conn) + case CommandConnections: + return s.handleConnectionsConn(conn) + case CommandCloseConnection: + return s.handleCloseConnection(conn) default: return E.New("unknown command: ", command) } diff --git a/experimental/libbox/command_status.go b/experimental/libbox/command_status.go index 7f1eca8c..4ab09d4b 100644 --- a/experimental/libbox/command_status.go +++ b/experimental/libbox/command_status.go @@ -36,7 +36,7 @@ func (s *CommandServer) readStatus() StatusMessage { trafficManager := clashServer.(*clashapi.Server).TrafficManager() message.Uplink, message.Downlink = trafficManager.Now() message.UplinkTotal, message.DownlinkTotal = trafficManager.Total() - message.ConnectionsIn = int32(trafficManager.Connections()) + message.ConnectionsIn = int32(trafficManager.ConnectionsLen()) } } diff --git a/experimental/libbox/iterator.go b/experimental/libbox/iterator.go index db64a259..530a7e43 100644 --- a/experimental/libbox/iterator.go +++ b/experimental/libbox/iterator.go @@ -17,6 +17,10 @@ func newIterator[T any](values []T) *iterator[T] { return &iterator[T]{values} } +func newPtrIterator[T any](values []T) *iterator[*T] { + return &iterator[*T]{common.Map(values, func(value T) *T { return &value })} +} + func (i *iterator[T]) Next() T { if len(i.values) == 0 { return common.DefaultValue[T]() diff --git a/experimental/libbox/service.go b/experimental/libbox/service.go index 0a54d7ab..c6509010 100644 --- a/experimental/libbox/service.go +++ b/experimental/libbox/service.go @@ -149,33 +149,6 @@ func (w *platformInterfaceWrapper) OpenTun(options *tun.Options, platformOptions return tun.New(*options) } -func (w *platformInterfaceWrapper) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort, destination netip.AddrPort) (*process.Info, error) { - var uid int32 - if w.useProcFS { - uid = procfs.ResolveSocketByProcSearch(network, source, destination) - if uid == -1 { - return nil, E.New("procfs: not found") - } - } else { - var ipProtocol int32 - switch N.NetworkName(network) { - case N.NetworkTCP: - ipProtocol = syscall.IPPROTO_TCP - case N.NetworkUDP: - ipProtocol = syscall.IPPROTO_UDP - default: - return nil, E.New("unknown network: ", network) - } - var err error - uid, err = w.iif.FindConnectionOwner(ipProtocol, source.Addr().String(), int32(source.Port()), destination.Addr().String(), int32(destination.Port())) - if err != nil { - return nil, err - } - } - packageName, _ := w.iif.PackageNameByUid(uid) - return &process.Info{UserId: uid, PackageName: packageName}, nil -} - func (w *platformInterfaceWrapper) UsePlatformDefaultInterfaceMonitor() bool { return w.iif.UsePlatformDefaultInterfaceMonitor() } @@ -229,6 +202,33 @@ func (w *platformInterfaceWrapper) ReadWIFIState() adapter.WIFIState { return (adapter.WIFIState)(*wifiState) } +func (w *platformInterfaceWrapper) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort, destination netip.AddrPort) (*process.Info, error) { + var uid int32 + if w.useProcFS { + uid = procfs.ResolveSocketByProcSearch(network, source, destination) + if uid == -1 { + return nil, E.New("procfs: not found") + } + } else { + var ipProtocol int32 + switch N.NetworkName(network) { + case N.NetworkTCP: + ipProtocol = syscall.IPPROTO_TCP + case N.NetworkUDP: + ipProtocol = syscall.IPPROTO_UDP + default: + return nil, E.New("unknown network: ", network) + } + var err error + uid, err = w.iif.FindConnectionOwner(ipProtocol, source.Addr().String(), int32(source.Port()), destination.Addr().String(), int32(destination.Port())) + if err != nil { + return nil, err + } + } + packageName, _ := w.iif.PackageNameByUid(uid) + return &process.Info{UserId: uid, PackageName: packageName}, nil +} + func (w *platformInterfaceWrapper) DisableColors() bool { return runtime.GOOS != "android" } diff --git a/experimental/libbox/setup.go b/experimental/libbox/setup.go index ea468f39..31611354 100644 --- a/experimental/libbox/setup.go +++ b/experimental/libbox/setup.go @@ -4,10 +4,12 @@ import ( "os" "os/user" "strconv" + "time" "github.com/sagernet/sing-box/common/humanize" C "github.com/sagernet/sing-box/constant" _ "github.com/sagernet/sing-box/include" + "github.com/sagernet/sing-box/log" ) var ( @@ -59,6 +61,10 @@ func FormatMemoryBytes(length int64) string { return humanize.MemoryBytes(uint64(length)) } +func FormatDuration(duration int64) string { + return log.FormatDuration(time.Duration(duration) * time.Millisecond) +} + func ProxyDisplayType(proxyType string) string { return C.ProxyDisplayName(proxyType) } diff --git a/inbound/builder.go b/inbound/builder.go index 513b016f..ddfd361d 100644 --- a/inbound/builder.go +++ b/inbound/builder.go @@ -11,43 +11,43 @@ import ( E "github.com/sagernet/sing/common/exceptions" ) -func New(ctx context.Context, router adapter.Router, logger log.ContextLogger, options option.Inbound, platformInterface platform.Interface) (adapter.Inbound, error) { +func New(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, options option.Inbound, platformInterface platform.Interface) (adapter.Inbound, error) { if options.Type == "" { return nil, E.New("missing inbound type") } switch options.Type { case C.TypeTun: - return NewTun(ctx, router, logger, options.Tag, options.TunOptions, platformInterface) + return NewTun(ctx, router, logger, tag, options.TunOptions, platformInterface) case C.TypeRedirect: - return NewRedirect(ctx, router, logger, options.Tag, options.RedirectOptions), nil + return NewRedirect(ctx, router, logger, tag, options.RedirectOptions), nil case C.TypeTProxy: - return NewTProxy(ctx, router, logger, options.Tag, options.TProxyOptions), nil + return NewTProxy(ctx, router, logger, tag, options.TProxyOptions), nil case C.TypeDirect: - return NewDirect(ctx, router, logger, options.Tag, options.DirectOptions), nil + return NewDirect(ctx, router, logger, tag, options.DirectOptions), nil case C.TypeSOCKS: - return NewSocks(ctx, router, logger, options.Tag, options.SocksOptions), nil + return NewSocks(ctx, router, logger, tag, options.SocksOptions), nil case C.TypeHTTP: - return NewHTTP(ctx, router, logger, options.Tag, options.HTTPOptions) + return NewHTTP(ctx, router, logger, tag, options.HTTPOptions) case C.TypeMixed: - return NewMixed(ctx, router, logger, options.Tag, options.MixedOptions), nil + return NewMixed(ctx, router, logger, tag, options.MixedOptions), nil case C.TypeShadowsocks: - return NewShadowsocks(ctx, router, logger, options.Tag, options.ShadowsocksOptions) + return NewShadowsocks(ctx, router, logger, tag, options.ShadowsocksOptions) case C.TypeVMess: - return NewVMess(ctx, router, logger, options.Tag, options.VMessOptions) + return NewVMess(ctx, router, logger, tag, options.VMessOptions) case C.TypeTrojan: - return NewTrojan(ctx, router, logger, options.Tag, options.TrojanOptions) + return NewTrojan(ctx, router, logger, tag, options.TrojanOptions) case C.TypeNaive: - return NewNaive(ctx, router, logger, options.Tag, options.NaiveOptions) + return NewNaive(ctx, router, logger, tag, options.NaiveOptions) case C.TypeHysteria: - return NewHysteria(ctx, router, logger, options.Tag, options.HysteriaOptions) + return NewHysteria(ctx, router, logger, tag, options.HysteriaOptions) case C.TypeShadowTLS: - return NewShadowTLS(ctx, router, logger, options.Tag, options.ShadowTLSOptions) + return NewShadowTLS(ctx, router, logger, tag, options.ShadowTLSOptions) case C.TypeVLESS: - return NewVLESS(ctx, router, logger, options.Tag, options.VLESSOptions) + return NewVLESS(ctx, router, logger, tag, options.VLESSOptions) case C.TypeTUIC: - return NewTUIC(ctx, router, logger, options.Tag, options.TUICOptions) + return NewTUIC(ctx, router, logger, tag, options.TUICOptions) case C.TypeHysteria2: - return NewHysteria2(ctx, router, logger, options.Tag, options.Hysteria2Options) + return NewHysteria2(ctx, router, logger, tag, options.Hysteria2Options) default: return nil, E.New("unknown inbound type: ", options.Type) } diff --git a/log/format.go b/log/format.go index 6fb91d31..6f4347b1 100644 --- a/log/format.go +++ b/log/format.go @@ -43,7 +43,7 @@ func (f Formatter) Format(ctx context.Context, level Level, tag string, message id, hasId = IDFromContext(ctx) } if hasId { - activeDuration := formatDuration(time.Since(id.CreatedAt)) + activeDuration := FormatDuration(time.Since(id.CreatedAt)) if !f.DisableColors { var color aurora.Color color = aurora.Color(uint8(id.ID)) @@ -113,7 +113,7 @@ func (f Formatter) FormatWithSimple(ctx context.Context, level Level, tag string id, hasId = IDFromContext(ctx) } if hasId { - activeDuration := formatDuration(time.Since(id.CreatedAt)) + activeDuration := FormatDuration(time.Since(id.CreatedAt)) if !f.DisableColors { var color aurora.Color color = aurora.Color(uint8(id.ID)) @@ -163,7 +163,7 @@ func xd(value int, x int) string { return message } -func formatDuration(duration time.Duration) string { +func FormatDuration(duration time.Duration) string { if duration < time.Second { return F.ToString(duration.Milliseconds(), "ms") } else if duration < time.Minute {