From 15c36a9580bcc7e792ea2759c560fefb26895e3b Mon Sep 17 00:00:00 2001 From: yuzuki999 Date: Tue, 16 May 2023 09:15:29 +0800 Subject: [PATCH] update refactor limiter fix getLink bug add connection limit move limit config to ControllerConfig del dynamic speed limit (next version will be re add) del online ip sync (next version will be re add) --- api/iprecoder/interface.go | 6 +- api/iprecoder/recorder.go | 6 +- api/iprecoder/redis.go | 8 +- api/iprecoder/redis_test.go | 4 +- api/panel/node.go | 8 +- api/panel/panel.go | 4 - api/panel/user.go | 9 - common/rate/rate.go | 28 +++ conf/conf.go | 2 +- conf/node.go | 21 +- core/app/dispatcher/default.go | 84 ++++--- core/app/dispatcher/limiter.go | 305 ----------------------- core/app/dispatcher/rule.go | 42 ---- core/distro/all/all.go | 3 - core/inbound.go | 26 -- core/user.go | 18 -- limiter/conn.go | 96 +++++++ limiter/conn_test.go | 38 +++ limiter/dynamic.go | 37 +++ limiter/limiter.go | 166 ++++++++++++ limiter/rule.go | 34 +++ limiter/task.go | 1 + main.go | 6 +- node/{controller => }/controller.go | 52 ++-- node/{controller => }/controller_test.go | 4 +- node/{controller => }/inbound.go | 8 +- node/{controller => }/inbound_test.go | 4 +- node/{controller => }/lego/cert.go | 0 node/{controller => }/lego/lego.go | 0 node/{controller => }/lego/lego_test.go | 0 node/{controller => }/lego/user.go | 0 node/node.go | 7 +- node/{controller => }/outbound.go | 2 +- node/{controller => }/task.go | 132 +++------- node/{controller => }/user.go | 20 +- 35 files changed, 564 insertions(+), 617 deletions(-) create mode 100644 common/rate/rate.go delete mode 100644 core/app/dispatcher/limiter.go delete mode 100644 core/app/dispatcher/rule.go create mode 100644 limiter/conn.go create mode 100644 limiter/conn_test.go create mode 100644 limiter/dynamic.go create mode 100644 limiter/limiter.go create mode 100644 limiter/rule.go create mode 100644 limiter/task.go rename node/{controller => }/controller.go (79%) rename node/{controller => }/controller_test.go (96%) rename node/{controller => }/inbound.go (98%) rename node/{controller => }/inbound_test.go (96%) rename node/{controller => }/lego/cert.go (100%) rename node/{controller => }/lego/lego.go (100%) rename node/{controller => }/lego/lego_test.go (100%) rename node/{controller => }/lego/user.go (100%) rename node/{controller => }/outbound.go (98%) rename node/{controller => }/task.go (64%) rename node/{controller => }/user.go (76%) diff --git a/api/iprecoder/interface.go b/api/iprecoder/interface.go index 570c558..6095b42 100644 --- a/api/iprecoder/interface.go +++ b/api/iprecoder/interface.go @@ -1,7 +1,9 @@ package iprecoder -import "github.com/Yuzuki616/V2bX/core/app/dispatcher" +import ( + "github.com/Yuzuki616/V2bX/limiter" +) type IpRecorder interface { - SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpList, error) + SyncOnlineIp(Ips []limiter.UserIpList) ([]limiter.UserIpList, error) } diff --git a/api/iprecoder/recorder.go b/api/iprecoder/recorder.go index ce91561..c8204b8 100644 --- a/api/iprecoder/recorder.go +++ b/api/iprecoder/recorder.go @@ -3,7 +3,7 @@ package iprecoder import ( "errors" "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" + "github.com/Yuzuki616/V2bX/limiter" "github.com/go-resty/resty/v2" "github.com/goccy/go-json" "time" @@ -21,7 +21,7 @@ func NewRecorder(c *conf.RecorderConfig) *Recorder { } } -func (r *Recorder) SyncOnlineIp(ips []dispatcher.UserIpList) ([]dispatcher.UserIpList, error) { +func (r *Recorder) SyncOnlineIp(ips []limiter.UserIpList) ([]limiter.UserIpList, error) { rsp, err := r.client.R(). SetBody(ips). Post(r.Url + "/api/v1/SyncOnlineIp?token=" + r.Token) @@ -31,7 +31,7 @@ func (r *Recorder) SyncOnlineIp(ips []dispatcher.UserIpList) ([]dispatcher.UserI if rsp.StatusCode() != 200 { return nil, errors.New(rsp.String()) } - ips = []dispatcher.UserIpList{} + ips = []limiter.UserIpList{} err = json.Unmarshal(rsp.Body(), &ips) if err != nil { return nil, err diff --git a/api/iprecoder/redis.go b/api/iprecoder/redis.go index 490d425..5222ce3 100644 --- a/api/iprecoder/redis.go +++ b/api/iprecoder/redis.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" + "github.com/Yuzuki616/V2bX/limiter" "github.com/go-redis/redis/v8" "strconv" "time" @@ -26,7 +26,7 @@ func NewRedis(c *conf.RedisConfig) *Redis { } } -func (r *Redis) SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpList, error) { +func (r *Redis) SyncOnlineIp(Ips []limiter.UserIpList) ([]limiter.UserIpList, error) { ctx := context.Background() for i := range Ips { err := r.client.SAdd(ctx, "UserList", Ips[i].Uid).Err() @@ -46,7 +46,7 @@ func (r *Redis) SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpLi if c.Err() != nil { return nil, fmt.Errorf("get user list failed: %s", c.Err()) } - Ips = make([]dispatcher.UserIpList, 0, len(c.Val())) + Ips = make([]limiter.UserIpList, 0, len(c.Val())) for _, uid := range c.Val() { uidInt, err := strconv.Atoi(uid) if err != nil { @@ -56,7 +56,7 @@ func (r *Redis) SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpLi if ips.Err() != nil { return nil, fmt.Errorf("get ip list failed: %s", ips.Err()) } - Ips = append(Ips, dispatcher.UserIpList{ + Ips = append(Ips, limiter.UserIpList{ Uid: uidInt, IpList: ips.Val(), }) diff --git a/api/iprecoder/redis_test.go b/api/iprecoder/redis_test.go index 5e08f8e..3e013be 100644 --- a/api/iprecoder/redis_test.go +++ b/api/iprecoder/redis_test.go @@ -2,7 +2,7 @@ package iprecoder import ( "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" + "github.com/Yuzuki616/V2bX/limiter" "log" "testing" ) @@ -13,7 +13,7 @@ func TestRedis_SyncOnlineIp(t *testing.T) { Password: "", Db: 0, }) - users, err := r.SyncOnlineIp([]dispatcher.UserIpList{ + users, err := r.SyncOnlineIp([]limiter.UserIpList{ {1, []string{"5.5.5.5", "4.4.4.4"}}, }) if err != nil { diff --git a/api/panel/node.go b/api/panel/node.go index 6b9f145..9dbf29b 100644 --- a/api/panel/node.go +++ b/api/panel/node.go @@ -36,12 +36,8 @@ type DestinationRule struct { Pattern *regexp.Regexp } type localNodeConfig struct { - NodeId int - NodeType string - EnableVless bool - EnableTls bool - SpeedLimit int - DeviceLimit int + NodeId int + NodeType string } func (c *Client) GetNodeInfo() (nodeInfo *NodeInfo, err error) { diff --git a/api/panel/panel.go b/api/panel/panel.go index b61678f..76643d9 100644 --- a/api/panel/panel.go +++ b/api/panel/panel.go @@ -28,8 +28,6 @@ type Client struct { Key string NodeType string NodeId int - SpeedLimit int - DeviceLimit int LocalRuleList []DestinationRule etag string } @@ -69,8 +67,6 @@ func New(c *conf.ApiConfig) (Panel, error) { Key: c.Key, APIHost: c.APIHost, NodeType: c.NodeType, - SpeedLimit: c.SpeedLimit, - DeviceLimit: c.DeviceLimit, NodeId: c.NodeID, LocalRuleList: localRuleList, }, nil diff --git a/api/panel/user.go b/api/panel/user.go index 1dd999c..fbb4992 100644 --- a/api/panel/user.go +++ b/api/panel/user.go @@ -10,18 +10,9 @@ type OnlineUser struct { IP string } -type V2RayUserInfo struct { - Uuid string `json:"uuid"` - Email string `json:"email"` - AlterId int `json:"alter_id"` -} -type TrojanUserInfo struct { - Password string `json:"password"` -} type UserInfo struct { Id int `json:"id"` Uuid string `json:"uuid"` - Email string `json:"-"` SpeedLimit int `json:"speed_limit"` Traffic int64 `json:"-"` } diff --git a/common/rate/rate.go b/common/rate/rate.go new file mode 100644 index 0000000..a41e6b6 --- /dev/null +++ b/common/rate/rate.go @@ -0,0 +1,28 @@ +package rate + +import ( + "github.com/juju/ratelimit" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/buf" +) + +type Writer struct { + writer buf.Writer + limiter *ratelimit.Bucket +} + +func NewRateLimitWriter(writer buf.Writer, limiter *ratelimit.Bucket) buf.Writer { + return &Writer{ + writer: writer, + limiter: limiter, + } +} + +func (w *Writer) Close() error { + return common.Close(w.writer) +} + +func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error { + w.limiter.Wait(int64(mb.Len())) + return w.writer.WriteMultiBuffer(mb) +} diff --git a/conf/conf.go b/conf/conf.go index e00b48b..ceab89f 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -67,7 +67,7 @@ func (p *Conf) Watch(filePath string, reload func()) error { } case err := <-watcher.Errors: if err != nil { - log.Panicf("watcher error: %s", err) + log.Printf("File watcher error: %s", err) } } } diff --git a/conf/node.go b/conf/node.go index 8e4d118..941c574 100644 --- a/conf/node.go +++ b/conf/node.go @@ -52,6 +52,11 @@ type ControllerConfig struct { SendIP string `yaml:"SendIP"` EnableDNS bool `yaml:"EnableDNS"` DNSType string `yaml:"DNSType"` + EnableVless bool `yaml:"EnableVless"` + EnableTls bool `yaml:"EnableTls"` + SpeedLimit int `yaml:"SpeedLimit"` + IPLimit int `yaml:"DeviceLimit"` + ConnLimit int `yaml:"ConnLimit"` DisableUploadTraffic bool `yaml:"DisableUploadTraffic"` DisableGetRule bool `yaml:"DisableGetRule"` EnableProxyProtocol bool `yaml:"EnableProxyProtocol"` @@ -67,16 +72,12 @@ type ControllerConfig struct { } type ApiConfig struct { - APIHost string `yaml:"ApiHost"` - NodeID int `yaml:"NodeID"` - Key string `yaml:"ApiKey"` - NodeType string `yaml:"NodeType"` - EnableVless bool `yaml:"EnableVless"` - Timeout int `yaml:"Timeout"` - SpeedLimit int `yaml:"SpeedLimit"` - DeviceLimit int `yaml:"DeviceLimit"` - RuleListPath string `yaml:"RuleListPath"` - DisableCustomConfig bool `yaml:"DisableCustomConfig"` + APIHost string `yaml:"ApiHost"` + NodeID int `yaml:"NodeID"` + Key string `yaml:"ApiKey"` + NodeType string `yaml:"NodeType"` + Timeout int `yaml:"Timeout"` + RuleListPath string `yaml:"RuleListPath"` } type NodeConfig struct { diff --git a/core/app/dispatcher/default.go b/core/app/dispatcher/default.go index daa6c29..7ef40c5 100644 --- a/core/app/dispatcher/default.go +++ b/core/app/dispatcher/default.go @@ -5,6 +5,8 @@ package dispatcher import ( "context" "fmt" + "github.com/Yuzuki616/V2bX/common/rate" + "github.com/Yuzuki616/V2bX/limiter" routingSession "github.com/xtls/xray-core/features/routing/session" "strings" "sync" @@ -89,14 +91,12 @@ func (r *cachedReader) Interrupt() { // DefaultDispatcher is a default implementation of Dispatcher. type DefaultDispatcher struct { - ohm outbound.Manager - router routing.Router - policy policy.Manager - stats stats.Manager - dns dns.Client - fdns dns.FakeDNSEngine - Limiter *Limiter - RuleManager *Rule + ohm outbound.Manager + router routing.Router + policy policy.Manager + stats stats.Manager + dns dns.Client + fdns dns.FakeDNSEngine } func init() { @@ -121,8 +121,6 @@ func (d *DefaultDispatcher) Init(config *Config, om outbound.Manager, router rou d.policy = pm d.stats = sm d.dns = dns - d.Limiter = NewLimiter() - d.RuleManager = NewRule() return nil } @@ -139,7 +137,7 @@ func (*DefaultDispatcher) Start() error { // Close implements common.Closable. func (*DefaultDispatcher) Close() error { return nil } -func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link) { +func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link, *limiter.Limiter, error) { downOpt := pipe.OptionsFromContext(ctx) upOpt := downOpt @@ -226,21 +224,31 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn if sessionInbound != nil { user = sessionInbound.User } - + var limit *limiter.Limiter if user != nil && len(user.Email) > 0 { - // Speed Limit and Device Limit - bucket, ok, reject := d.Limiter.CheckSpeedAndDeviceLimit(sessionInbound.Tag, user.Email, sessionInbound.Source.Address.IP().String()) - if reject { - newError("Devices reach the limit: ", user.Email).AtError().WriteToLog() + var err error + limit, err = limiter.GetLimiter(sessionInbound.Tag) + if err != nil { + newError("Get limit info error: ", err).AtError().WriteToLog() common.Close(outboundLink.Writer) common.Close(inboundLink.Writer) common.Interrupt(outboundLink.Reader) common.Interrupt(inboundLink.Reader) - return nil, nil + return nil, nil, nil, newError("Get limit info error: ", err) } - if ok { - inboundLink.Writer = d.Limiter.RateWriter(inboundLink.Writer, bucket) - outboundLink.Writer = d.Limiter.RateWriter(outboundLink.Writer, bucket) + // Speed Limit and Device Limit + w, reject := limit.CheckLimit(user.Email, sessionInbound.Source.Address.IP().String()) + if reject { + newError("Limited ", user.Email, " by conn or ip").AtWarning().WriteToLog() + common.Close(outboundLink.Writer) + common.Close(inboundLink.Writer) + common.Interrupt(outboundLink.Reader) + common.Interrupt(inboundLink.Reader) + return nil, nil, nil, newError("Limited ", user.Email, " by conn or ip") + } + if w != nil { + inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w) + outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w) } p := d.policy.ForLevel(user.Level) if p.Stats.UserUplink { @@ -263,7 +271,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn } } - return inboundLink, outboundLink + return inboundLink, outboundLink, limit, nil } func (d *DefaultDispatcher) shouldOverride(ctx context.Context, result SniffResult, request session.SniffingRequest, destination net.Destination) bool { @@ -313,11 +321,13 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin content = new(session.Content) ctx = session.ContextWithContent(ctx, content) } - sniffingRequest := content.SniffingRequest - inbound, outbound := d.getLink(ctx, destination.Network, sniffingRequest) + inbound, outbound, l, err := d.getLink(ctx, destination.Network, sniffingRequest) + if err != nil { + return nil, err + } if !sniffingRequest.Enabled { - go d.routedDispatch(ctx, outbound, destination, "") + go d.routedDispatch(ctx, outbound, destination, l) } else { go func() { cReader := &cachedReader{ @@ -338,7 +348,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin ob.Target = destination } } - d.routedDispatch(ctx, outbound, destination, content.Protocol) + d.routedDispatch(ctx, outbound, destination, l) }() } return inbound, nil @@ -360,7 +370,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De } sniffingRequest := content.SniffingRequest if !sniffingRequest.Enabled { - go d.routedDispatch(ctx, outbound, destination, content.Protocol) + go d.routedDispatch(ctx, outbound, destination, nil) } else { go func() { cReader := &cachedReader{ @@ -381,10 +391,9 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De ob.Target = destination } } - d.routedDispatch(ctx, outbound, destination, content.Protocol) + d.routedDispatch(ctx, outbound, destination, nil) }() } - return nil } @@ -434,7 +443,7 @@ func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, netw return contentResult, contentErr } -func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination, protocol string) { +func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination, l *limiter.Limiter) { ob := session.OutboundFromContext(ctx) if hosts, ok := d.dns.(dns.HostsLookup); ok && destination.Address.Family().IsDomain() { proxied := hosts.LookupHosts(ob.Target.String()) @@ -455,9 +464,22 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport. sessionInbound := session.InboundFromContext(ctx) // Whether the inbound connection contains a user if sessionInbound.User != nil { - if d.RuleManager.Detect(sessionInbound.Tag, destination.String(), protocol) { + if l == nil { + var err error + l, err = limiter.GetLimiter(sessionInbound.Tag) + if err != nil { + newError("Get limiter error: ", err).AtError().WriteToLog() + common.Close(link.Writer) + common.Interrupt(link.Reader) + return + } + } else { + defer func() { + l.ConnLimiter.DelConnCount(sessionInbound.User.Email, sessionInbound.Source.Address.IP().String()) + }() + } + if l.CheckDomainRule(destination.String()) { newError(fmt.Sprintf("User %s access %s reject by rule", sessionInbound.User.Email, destination.String())).AtError().WriteToLog() - newError("destination is reject by rule") common.Close(link.Writer) common.Interrupt(link.Reader) return diff --git a/core/app/dispatcher/limiter.go b/core/app/dispatcher/limiter.go deleted file mode 100644 index 0b3a9d6..0000000 --- a/core/app/dispatcher/limiter.go +++ /dev/null @@ -1,305 +0,0 @@ -package dispatcher - -import ( - "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/juju/ratelimit" - "github.com/xtls/xray-core/common" - "github.com/xtls/xray-core/common/buf" - "io" - "sync" - "time" -) - -type UserLimitInfo struct { - UID int - SpeedLimit int - DynamicSpeedLimit int - ExpireTime int64 -} - -type InboundInfo struct { - Tag string - NodeSpeedLimit int - NodeDeviceLimit int - UserLimitInfo *sync.Map // Key: Uid value: UserLimitInfo - SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket - UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool -} - -type Limiter struct { - InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo -} - -func NewLimiter() *Limiter { - return &Limiter{ - InboundInfo: new(sync.Map), - } -} - -func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, users []panel.UserInfo) error { - inboundInfo := &InboundInfo{ - Tag: tag, - NodeSpeedLimit: nodeInfo.SpeedLimit, - NodeDeviceLimit: nodeInfo.DeviceLimit, - UserLimitInfo: new(sync.Map), - SpeedLimiter: new(sync.Map), - UserOnlineIP: new(sync.Map), - } - for i := range users { - if users[i].SpeedLimit != 0 { - userLimit := &UserLimitInfo{ - UID: users[i].Id, - SpeedLimit: users[i].SpeedLimit, - ExpireTime: 0, - } - inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, users[i].Uuid, users[i].Id), userLimit) - } - } - l.InboundInfo.Store(tag, inboundInfo) // Replace the old inbound info - return nil -} - -func (l *Limiter) UpdateInboundLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - for i := range deleted { - inboundInfo.UserLimitInfo.Delete(fmt.Sprintf("%s|%s|%d", tag, - (deleted)[i].Uuid, (deleted)[i].Id)) - } - for i := range added { - if added[i].SpeedLimit != 0 { - userLimit := &UserLimitInfo{ - UID: added[i].Id, - SpeedLimit: added[i].SpeedLimit, - ExpireTime: 0, - } - inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, - (added)[i].Uuid, (added)[i].Id), userLimit) - } - } - } else { - return fmt.Errorf("no such inbound in limiter: %s", tag) - } - return nil -} - -func (l *Limiter) DeleteInboundLimiter(tag string) error { - l.InboundInfo.Delete(tag) - return nil -} - -func (l *Limiter) AddDynamicSpeedLimit(tag string, userInfo *panel.UserInfo, limit int, expire int64) error { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - userLimit := &UserLimitInfo{ - DynamicSpeedLimit: limit, - ExpireTime: time.Now().Add(time.Duration(expire) * time.Second).Unix(), - } - inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, userInfo.Uuid, userInfo.Id), userLimit) - return nil - } else { - return fmt.Errorf("no such inbound in limiter: %s", tag) - } -} - -type UserIpList struct { - Uid int `json:"Uid"` - IpList []string `json:"Ips"` -} - -func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - onlineUser := make([]UserIpList, 0) - var ipMap *sync.Map - inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { - ipMap = value.(*sync.Map) - var ip []string - ipMap.Range(func(key, v interface{}) bool { - if v.(bool) { - ip = append(ip, key.(string)) - } - return true - }) - if len(ip) > 0 { - if u, ok := inboundInfo.UserLimitInfo.Load(key.(string)); ok { - onlineUser = append(onlineUser, UserIpList{ - Uid: u.(*UserLimitInfo).UID, - IpList: ip, - }) - } - } - return true - }) - if len(onlineUser) == 0 { - return nil, nil - } - return onlineUser, nil - } else { - return nil, fmt.Errorf("no such inbound in limiter: %s", tag) - } -} - -func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIpList) { - if v, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := v.(*InboundInfo) - //Clear old IP - inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { - inboundInfo.UserOnlineIP.Delete(key) - return true - }) - // Update User Online IP - for i := range userIpList { - ipMap := new(sync.Map) - for _, userIp := range (userIpList)[i].IpList { - ipMap.Store(userIp, false) - } - inboundInfo.UserOnlineIP.Store((userIpList)[i].Uid, ipMap) - } - inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool { - if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists { - inboundInfo.SpeedLimiter.Delete(key.(string)) - } - return true - }) - } -} - -func (l *Limiter) ClearOnlineUserIpAndSpeedLimiter(tag string) { - if v, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := v.(*InboundInfo) - inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool { - if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists { - inboundInfo.SpeedLimiter.Delete(key.(string)) - } - return true - }) - inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { - inboundInfo.UserOnlineIP.Delete(key) - return true - }) - } -} - -func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) (speedLimiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - nodeLimit := inboundInfo.NodeSpeedLimit - userLimit := 0 - if v, ok := inboundInfo.UserLimitInfo.Load(email); ok { - u := v.(*UserLimitInfo) - if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 { - if u.SpeedLimit != 0 { - userLimit = u.SpeedLimit - u.DynamicSpeedLimit = 0 - u.ExpireTime = 0 - } else { - inboundInfo.UserLimitInfo.Delete(email) - } - } else { - userLimit = determineSpeedLimit(u.SpeedLimit, u.DynamicSpeedLimit) - } - } - ipMap := new(sync.Map) - ipMap.Store(ip, true) - // If any device is online - if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok { - ipMap := v.(*sync.Map) - // If this ip is a new device - if online, ok := ipMap.LoadOrStore(ip, true); !ok { - counter := 0 - ipMap.Range(func(key, value interface{}) bool { - counter++ - return true - }) - if counter > inboundInfo.NodeDeviceLimit && inboundInfo.NodeDeviceLimit > 0 { - ipMap.Delete(ip) - return nil, false, true - } - } else { - if !online.(bool) { - ipMap.Store(ip, true) - } - } - } - limit := int64(determineSpeedLimit(nodeLimit, userLimit)) * 1000000 / 8 // If you need the Speed limit - if limit > 0 { - limiter := ratelimit.NewBucketWithQuantum(time.Second, limit, limit) // Byte/s - if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok { - return v.(*ratelimit.Bucket), true, false - } else { - inboundInfo.SpeedLimiter.Store(email, limiter) - return limiter, true, false - } - } else { - return nil, false, false - } - } else { - newError("Get Inbound Limiter information failed").AtDebug().WriteToLog() - return nil, false, false - } -} - -type Writer struct { - writer buf.Writer - limiter *ratelimit.Bucket - w io.Writer -} - -func (l *Limiter) RateWriter(writer buf.Writer, limiter *ratelimit.Bucket) buf.Writer { - return &Writer{ - writer: writer, - limiter: limiter, - } -} - -func (w *Writer) Close() error { - return common.Close(w.writer) -} - -func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error { - w.limiter.Wait(int64(mb.Len())) - return w.writer.WriteMultiBuffer(mb) -} - -// determineSpeedLimit returns the minimum non-zero rate -func determineSpeedLimit(limit1, limit2 int) (limit int) { - if limit1 == 0 || limit2 == 0 { - if limit1 > limit2 { - return limit1 - } else if limit1 < limit2 { - return limit2 - } else { - return 0 - } - } else { - if limit1 > limit2 { - return limit2 - } else if limit1 < limit2 { - return limit1 - } else { - return limit1 - } - } -} - -func determineDeviceLimit(nodeLimit, userLimit int) (limit int) { - if nodeLimit == 0 || userLimit == 0 { - if nodeLimit > userLimit { - return nodeLimit - } else if nodeLimit < userLimit { - return userLimit - } else { - return 0 - } - } else { - if nodeLimit > userLimit { - return userLimit - } else if nodeLimit < userLimit { - return nodeLimit - } else { - return nodeLimit - } - } -} diff --git a/core/app/dispatcher/rule.go b/core/app/dispatcher/rule.go deleted file mode 100644 index 71e316d..0000000 --- a/core/app/dispatcher/rule.go +++ /dev/null @@ -1,42 +0,0 @@ -package dispatcher - -import ( - "github.com/Yuzuki616/V2bX/api/panel" - "reflect" - "sync" -) - -type Rule struct { - Rule *sync.Map // Key: Tag, Value: *panel.DetectRule -} - -func NewRule() *Rule { - return &Rule{ - Rule: new(sync.Map), - } -} - -func (r *Rule) UpdateRule(tag string, newRuleList []panel.DestinationRule) error { - if value, ok := r.Rule.LoadOrStore(tag, newRuleList); ok { - oldRuleList := value.([]panel.DestinationRule) - if !reflect.DeepEqual(oldRuleList, newRuleList) { - r.Rule.Store(tag, newRuleList) - } - } - return nil -} - -func (r *Rule) Detect(tag string, destination string, protocol string) (reject bool) { - reject = false - // If we have some rule for this inbound - if value, ok := r.Rule.Load(tag); ok { - ruleList := value.([]panel.DestinationRule) - for i := range ruleList { - if ruleList[i].Pattern.Match([]byte(destination)) { - reject = true - break - } - } - } - return reject -} diff --git a/core/distro/all/all.go b/core/distro/all/all.go index 9268f90..8f2186c 100644 --- a/core/distro/all/all.go +++ b/core/distro/all/all.go @@ -30,9 +30,6 @@ import ( // Fix dependency cycle caused by core import in internet package _ "github.com/xtls/xray-core/transport/internet/tagged/taggedimpl" - // Developer preview features - _ "github.com/xtls/xray-core/app/observatory" - // Inbound and outbound proxies. _ "github.com/xtls/xray-core/proxy/blackhole" _ "github.com/xtls/xray-core/proxy/dns" diff --git a/core/inbound.go b/core/inbound.go index f62b431..69af4ab 100644 --- a/core/inbound.go +++ b/core/inbound.go @@ -3,8 +3,6 @@ package core import ( "context" "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/features/inbound" ) @@ -27,27 +25,3 @@ func (p *Core) AddInbound(config *core.InboundHandlerConfig) error { } return nil } - -func (p *Core) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, users []panel.UserInfo) error { - return p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, users) -} - -func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) { - limit, ok := p.dispatcher.Limiter.InboundInfo.Load(tag) - if ok { - return limit.(*dispatcher.InboundInfo), nil - } - return nil, fmt.Errorf("not found limiter") -} - -func (p *Core) UpdateInboundLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { - return p.dispatcher.Limiter.UpdateInboundLimiter(tag, added, deleted) -} - -func (p *Core) DeleteInboundLimiter(tag string) error { - return p.dispatcher.Limiter.DeleteInboundLimiter(tag) -} - -func (p *Core) UpdateRule(tag string, newRuleList []panel.DestinationRule) error { - return p.dispatcher.RuleManager.UpdateRule(tag, newRuleList) -} diff --git a/core/user.go b/core/user.go index 236ee43..ae9fbfc 100644 --- a/core/user.go +++ b/core/user.go @@ -3,8 +3,6 @@ package core import ( "context" "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/proxy" ) @@ -79,19 +77,3 @@ func (p *Core) GetUserTraffic(email string, reset bool) (up int64, down int64) { } return up, down } - -func (p *Core) AddUserSpeedLimit(tag string, user *panel.UserInfo, speedLimit int, expire int64) error { - return p.dispatcher.Limiter.AddDynamicSpeedLimit(tag, user, speedLimit, expire) -} - -func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIpList, error) { - return p.dispatcher.Limiter.ListOnlineUserIp(tag) -} - -func (p *Core) UpdateOnlineIp(tag string, ips []dispatcher.UserIpList) { - p.dispatcher.Limiter.UpdateOnlineUserIP(tag, ips) -} - -func (p *Core) ClearOnlineIp(tag string) { - p.dispatcher.Limiter.ClearOnlineUserIpAndSpeedLimiter(tag) -} diff --git a/limiter/conn.go b/limiter/conn.go new file mode 100644 index 0000000..4c853fe --- /dev/null +++ b/limiter/conn.go @@ -0,0 +1,96 @@ +package limiter + +import ( + "sync" +) + +type ConnLimiter struct { + ipLimit int + connLimit int + count sync.Map //map[string]int + ip sync.Map //map[string]map[string]*sync.Map +} + +func NewConnLimiter(conn int, ip int) *ConnLimiter { + return &ConnLimiter{ + connLimit: conn, + ipLimit: ip, + count: sync.Map{}, + ip: sync.Map{}, + } +} + +func (c *ConnLimiter) AddConnCount(user string, ip string) (limit bool) { + if c.connLimit != 0 { + if v, ok := c.count.Load(user); ok { + if v.(int) >= c.connLimit { + return true + } else { + c.count.Store(user, v.(int)+1) + } + } else { + c.count.Store(user, 1) + } + } + if c.ipLimit == 0 { + return false + } + ipMap := new(sync.Map) + ipMap.Store(ip, 1) + if v, ok := c.ip.LoadOrStore(user, ipMap); ok { + // have online ip + ips := v.(*sync.Map) + cn := 0 + if online, ok := ips.Load(ip); !ok { + ips.Range(func(key, value interface{}) bool { + cn++ + if cn >= c.ipLimit { + limit = true + return false + } + return true + }) + if limit { + return + } + ips.Store(ip, 1) + } else { + // have this ip + ips.Store(ip, online.(int)+1) + } + } + return false +} + +func (c *ConnLimiter) DelConnCount(user string, ip string) { + if c.connLimit != 0 { + if v, ok := c.count.Load(user); ok { + if v.(int) == 1 { + c.count.Delete(user) + } else { + c.count.Store(user, v.(int)-1) + } + } + } + if c.ipLimit == 0 { + return + } + if i, ok := c.ip.Load(user); ok { + is := i.(*sync.Map) + if i, ok := is.Load(ip); ok { + if i.(int) == 1 { + is.Delete(ip) + } else { + is.Store(user, i.(int)-1) + } + notDel := false + c.ip.Range(func(_, _ any) bool { + notDel = true + return true + }) + if !notDel { + c.ip.Delete(user) + } + } + } +} diff --git a/limiter/conn_test.go b/limiter/conn_test.go new file mode 100644 index 0000000..c685910 --- /dev/null +++ b/limiter/conn_test.go @@ -0,0 +1,38 @@ +package limiter + +import ( + "sync" + "testing" +) + +var c *ConnLimiter + +func init() { + c = NewConnLimiter(1, 1) +} + +func TestConnLimiter_AddConnCount(t *testing.T) { + t.Log(c.AddConnCount("1", "1")) + t.Log(c.AddConnCount("1", "2")) +} + +func TestConnLimiter_DelConnCount(t *testing.T) { + t.Log(c.AddConnCount("1", "1")) + t.Log(c.AddConnCount("1", "2")) + c.DelConnCount("1", "1") + t.Log(c.AddConnCount("1", "2")) +} + +func BenchmarkConnLimiter(b *testing.B) { + wg := sync.WaitGroup{} + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + c.AddConnCount("1", "2") + c.DelConnCount("1", "2") + wg.Done() + }() + } + wg.Wait() + +} diff --git a/limiter/dynamic.go b/limiter/dynamic.go new file mode 100644 index 0000000..7e7d8c5 --- /dev/null +++ b/limiter/dynamic.go @@ -0,0 +1,37 @@ +package limiter + +import ( + "fmt" + "github.com/Yuzuki616/V2bX/api/panel" + "time" +) + +func (l *Limiter) AddDynamicSpeedLimit(tag string, userInfo *panel.UserInfo, limitNum int, expire int64) error { + userLimit := &UserLimitInfo{ + DynamicSpeedLimit: limitNum, + ExpireTime: time.Now().Add(time.Duration(expire) * time.Second).Unix(), + } + l.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, userInfo.Uuid, userInfo.Id), userLimit) + return nil +} + +// determineSpeedLimit returns the minimum non-zero rate +func determineSpeedLimit(limit1, limit2 int) (limit int) { + if limit1 == 0 || limit2 == 0 { + if limit1 > limit2 { + return limit1 + } else if limit1 < limit2 { + return limit2 + } else { + return 0 + } + } else { + if limit1 > limit2 { + return limit2 + } else if limit1 < limit2 { + return limit1 + } else { + return limit1 + } + } +} diff --git a/limiter/limiter.go b/limiter/limiter.go new file mode 100644 index 0000000..df3ba34 --- /dev/null +++ b/limiter/limiter.go @@ -0,0 +1,166 @@ +package limiter + +import ( + "errors" + "fmt" + "github.com/Yuzuki616/V2bX/api/panel" + "github.com/juju/ratelimit" + "sync" + "time" +) + +var limitLock sync.RWMutex +var limiter map[string]*Limiter + +func Init() { + limiter = map[string]*Limiter{} +} + +type Limiter struct { + Rules []panel.DestinationRule + ProtocolRules []string + SpeedLimit int + UserLimitInfo *sync.Map // Key: Uid value: UserLimitInfo + ConnLimiter *ConnLimiter // Key: Uid value: ConnLimiter + SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket +} + +type UserLimitInfo struct { + UID int + SpeedLimit int + DynamicSpeedLimit int + ExpireTime int64 +} + +type LimitConfig struct { + SpeedLimit int + IpLimit int + ConnLimit int +} + +func AddLimiter(tag string, l *LimitConfig, users []panel.UserInfo) *Limiter { + info := &Limiter{ + SpeedLimit: l.SpeedLimit, + UserLimitInfo: new(sync.Map), + ConnLimiter: NewConnLimiter(l.ConnLimit, l.IpLimit), + SpeedLimiter: new(sync.Map), + } + for i := range users { + if users[i].SpeedLimit != 0 { + userLimit := &UserLimitInfo{ + UID: users[i].Id, + SpeedLimit: users[i].SpeedLimit, + ExpireTime: 0, + } + info.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, users[i].Uuid, users[i].Id), userLimit) + } + } + limitLock.Lock() + limiter[tag] = info + limitLock.Unlock() + return info +} + +func GetLimiter(tag string) (info *Limiter, err error) { + limitLock.RLock() + info, ok := limiter[tag] + limitLock.RUnlock() + if !ok { + return nil, errors.New("not found") + } + return +} + +func UpdateLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { + l, err := GetLimiter(tag) + if err != nil { + return fmt.Errorf("get limit error: %s", err) + } + for i := range deleted { + l.UserLimitInfo.Delete(fmt.Sprintf("%s|%s|%d", + tag, + deleted[i].Uuid, + deleted[i].Id)) + } + for i := range added { + if added[i].SpeedLimit != 0 { + userLimit := &UserLimitInfo{ + UID: added[i].Id, + SpeedLimit: added[i].SpeedLimit, + ExpireTime: 0, + } + l.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", + tag, + added[i].Uuid, + added[i].Id), userLimit) + } + } + return nil +} + +func DeleteLimiter(tag string) { + limitLock.Lock() + delete(limiter, tag) + limitLock.Unlock() +} + +func (l *Limiter) CheckLimit(email string, ip string) (Bucket *ratelimit.Bucket, Reject bool) { + // ip and conn limiter + if l.ConnLimiter.AddConnCount(email, ip) { + return nil, true + } + // check and gen speed limit Bucket + nodeLimit := l.SpeedLimit + userLimit := 0 + if v, ok := l.UserLimitInfo.Load(email); ok { + u := v.(*UserLimitInfo) + if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 { + if u.SpeedLimit != 0 { + userLimit = u.SpeedLimit + u.DynamicSpeedLimit = 0 + u.ExpireTime = 0 + } else { + l.UserLimitInfo.Delete(email) + } + } else { + userLimit = determineSpeedLimit(u.SpeedLimit, u.DynamicSpeedLimit) + } + } + limit := int64(determineSpeedLimit(nodeLimit, userLimit)) * 1000000 / 8 // If you need the Speed limit + if limit > 0 { + Bucket = ratelimit.NewBucketWithQuantum(time.Second, limit, limit) // Byte/s + if v, ok := l.SpeedLimiter.LoadOrStore(email, Bucket); ok { + return v.(*ratelimit.Bucket), false + } else { + l.SpeedLimiter.Store(email, Bucket) + return Bucket, false + } + } else { + return nil, false + } +} + +type UserIpList struct { + Uid int `json:"Uid"` + IpList []string `json:"Ips"` +} + +func determineDeviceLimit(nodeLimit, userLimit int) (limit int) { + if nodeLimit == 0 || userLimit == 0 { + if nodeLimit > userLimit { + return nodeLimit + } else if nodeLimit < userLimit { + return userLimit + } else { + return 0 + } + } else { + if nodeLimit > userLimit { + return userLimit + } else if nodeLimit < userLimit { + return nodeLimit + } else { + return nodeLimit + } + } +} diff --git a/limiter/rule.go b/limiter/rule.go new file mode 100644 index 0000000..1e95343 --- /dev/null +++ b/limiter/rule.go @@ -0,0 +1,34 @@ +package limiter + +import ( + "github.com/Yuzuki616/V2bX/api/panel" + "reflect" +) + +func (l *Limiter) CheckDomainRule(destination string) (reject bool) { + // have rule + for i := range l.Rules { + if l.Rules[i].Pattern.MatchString(destination) { + reject = true + break + } + } + return +} + +func (l *Limiter) CheckProtocolRule(protocol string) (reject bool) { + for i := range l.ProtocolRules { + if l.ProtocolRules[i] == protocol { + reject = true + break + } + } + return +} + +func (l *Limiter) UpdateRule(newRuleList []panel.DestinationRule) error { + if !reflect.DeepEqual(l.Rules, newRuleList) { + l.Rules = newRuleList + } + return nil +} diff --git a/limiter/task.go b/limiter/task.go new file mode 100644 index 0000000..6a935d2 --- /dev/null +++ b/limiter/task.go @@ -0,0 +1 @@ +package limiter diff --git a/main.go b/main.go index 60fdc7c..e64b5a8 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" + "github.com/Yuzuki616/V2bX/limiter" "github.com/Yuzuki616/V2bX/node" "log" "os" @@ -40,6 +41,7 @@ func main() { if err != nil { log.Panicf("can't unmarshal config file: %s \n", err) } + limiter.Init() log.Println("Start V2bX...") x := core.New(config) err = x.Start() @@ -69,9 +71,9 @@ func main() { log.Panicf("watch config file error: %s", err) } } - //Explicitly triggering GC to remove garbage from config loading. + // clear memory runtime.GC() - // Running backend + // wait exit signal { osSignals := make(chan os.Signal, 1) signal.Notify(osSignals, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM) diff --git a/node/controller/controller.go b/node/controller.go similarity index 79% rename from node/controller/controller.go rename to node/controller.go index 587c08d..d5d68a9 100644 --- a/node/controller/controller.go +++ b/node/controller.go @@ -1,4 +1,4 @@ -package controller +package node import ( "errors" @@ -7,11 +7,12 @@ import ( "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" + "github.com/Yuzuki616/V2bX/limiter" "github.com/xtls/xray-core/common/task" "log" ) -type Node struct { +type Controller struct { server *core.Core clientInfo panel.ClientInfo apiClient panel.Panel @@ -27,9 +28,9 @@ type Node struct { *conf.ControllerConfig } -// New return a Node service with default parameters. -func New(server *core.Core, api panel.Panel, config *conf.ControllerConfig) *Node { - controller := &Node{ +// NewController return a Node controller with default parameters. +func NewController(server *core.Core, api panel.Panel, config *conf.ControllerConfig) *Controller { + controller := &Controller{ server: server, ControllerConfig: config, apiClient: api, @@ -38,7 +39,7 @@ func New(server *core.Core, api panel.Panel, config *conf.ControllerConfig) *Nod } // Start implement the Start() function of the service interface -func (c *Node) Start() error { +func (c *Controller) Start() error { c.clientInfo = c.apiClient.Describe() // First fetch Node Info var err error @@ -46,12 +47,6 @@ func (c *Node) Start() error { if err != nil { return fmt.Errorf("get node info failed: %s", err) } - c.Tag = c.buildNodeTag() - // Add new tag - err = c.addNewTag(c.nodeInfo) - if err != nil { - return fmt.Errorf("add new tag failed: %s", err) - } // Update user c.userList, err = c.apiClient.GetUserList() if err != nil { @@ -60,25 +55,36 @@ func (c *Node) Start() error { if len(c.userList) == 0 { return errors.New("add users failed: not have any user") } + c.Tag = c.buildNodeTag() + + // add limiter + l := limiter.AddLimiter(c.Tag, &limiter.LimitConfig{ + SpeedLimit: c.SpeedLimit, + IpLimit: c.IPLimit, + ConnLimit: c.ConnLimit, + }, c.userList) + // add rule limiter + if !c.DisableGetRule { + if err = l.UpdateRule(c.nodeInfo.Rules); err != nil { + log.Printf("Update rule filed: %s", err) + } + } + // Add new tag + err = c.addNewNode(c.nodeInfo) + if err != nil { + return fmt.Errorf("add new tag failed: %s", err) + } err = c.addNewUser(c.userList, c.nodeInfo) if err != nil { return err } - if err := c.server.AddInboundLimiter(c.Tag, c.nodeInfo, c.userList); err != nil { - return fmt.Errorf("add inbound limiter failed: %s", err) - } - // Add Rule Manager - if !c.DisableGetRule { - if err := c.server.UpdateRule(c.Tag, c.nodeInfo.Rules); err != nil { - log.Printf("Update rule filed: %s", err) - } - } c.initTask() return nil } // Close implement the Close() function of the service interface -func (c *Node) Close() error { +func (c *Controller) Close() error { + limiter.DeleteLimiter(c.Tag) if c.nodeInfoMonitorPeriodic != nil { err := c.nodeInfoMonitorPeriodic.Close() if err != nil { @@ -112,6 +118,6 @@ func (c *Node) Close() error { return nil } -func (c *Node) buildNodeTag() string { +func (c *Controller) buildNodeTag() string { return fmt.Sprintf("%s_%s_%d", c.nodeInfo.NodeType, c.ListenIP, c.nodeInfo.NodeId) } diff --git a/node/controller/controller_test.go b/node/controller_test.go similarity index 96% rename from node/controller/controller_test.go rename to node/controller_test.go index 7323fd5..a7a711a 100644 --- a/node/controller/controller_test.go +++ b/node/controller_test.go @@ -1,4 +1,4 @@ -package controller_test +package node_test import ( "fmt" @@ -6,7 +6,7 @@ import ( "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" _ "github.com/Yuzuki616/V2bX/core/distro/all" - . "github.com/Yuzuki616/V2bX/node/controller" + . "github.com/Yuzuki616/V2bX/node" xCore "github.com/xtls/xray-core/core" coreConf "github.com/xtls/xray-core/infra/conf" "os" diff --git a/node/controller/inbound.go b/node/inbound.go similarity index 98% rename from node/controller/inbound.go rename to node/inbound.go index a786d9e..35ff84c 100644 --- a/node/controller/inbound.go +++ b/node/inbound.go @@ -1,4 +1,4 @@ -package controller +package node import ( "crypto/rand" @@ -8,7 +8,7 @@ import ( "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/common/file" "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/node/controller/lego" + "github.com/Yuzuki616/V2bX/node/lego" "github.com/goccy/go-json" "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/core" @@ -65,7 +65,7 @@ func buildInbound(config *conf.ControllerConfig, nodeInfo *panel.NodeInfo, tag s AcceptProxyProtocol: config.EnableProxyProtocol} //Enable proxy protocol } // Set TLS and XTLS settings - if nodeInfo.EnableTls && config.CertConfig.CertMode != "none" { + if config.EnableTls && config.CertConfig.CertMode != "none" { inbound.StreamSetting.Security = "tls" certFile, keyFile, err := getCertFile(config.CertConfig) if err != nil { @@ -91,7 +91,7 @@ func buildInbound(config *conf.ControllerConfig, nodeInfo *panel.NodeInfo, tag s } func buildV2ray(config *conf.ControllerConfig, nodeInfo *panel.NodeInfo, inbound *coreConf.InboundDetourConfig) error { - if nodeInfo.EnableVless { + if config.EnableVless { //Set vless inbound.Protocol = "vless" if config.EnableFallback { diff --git a/node/controller/inbound_test.go b/node/inbound_test.go similarity index 96% rename from node/controller/inbound_test.go rename to node/inbound_test.go index 6ea0a2d..c80dd0a 100644 --- a/node/controller/inbound_test.go +++ b/node/inbound_test.go @@ -1,8 +1,8 @@ -package controller_test +package node_test import ( "github.com/Yuzuki616/V2bX/api/panel" - . "github.com/Yuzuki616/V2bX/node/controller" + . "github.com/Yuzuki616/V2bX/node" "testing" ) diff --git a/node/controller/lego/cert.go b/node/lego/cert.go similarity index 100% rename from node/controller/lego/cert.go rename to node/lego/cert.go diff --git a/node/controller/lego/lego.go b/node/lego/lego.go similarity index 100% rename from node/controller/lego/lego.go rename to node/lego/lego.go diff --git a/node/controller/lego/lego_test.go b/node/lego/lego_test.go similarity index 100% rename from node/controller/lego/lego_test.go rename to node/lego/lego_test.go diff --git a/node/controller/lego/user.go b/node/lego/user.go similarity index 100% rename from node/controller/lego/user.go rename to node/lego/user.go diff --git a/node/node.go b/node/node.go index 872a2d5..8c1541e 100644 --- a/node/node.go +++ b/node/node.go @@ -4,11 +4,10 @@ import ( "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" - "github.com/Yuzuki616/V2bX/node/controller" ) type Node struct { - controllers []*controller.Node + controllers []*Controller } func New() *Node { @@ -16,14 +15,14 @@ func New() *Node { } func (n *Node) Start(nodes []*conf.NodeConfig, core *core.Core) error { - n.controllers = make([]*controller.Node, len(nodes)) + n.controllers = make([]*Controller, len(nodes)) for i, c := range nodes { p, err := panel.New(c.ApiConfig) if err != nil { return err } // Register controller service - n.controllers[i] = controller.New(core, p, c.ControllerConfig) + n.controllers[i] = NewController(core, p, c.ControllerConfig) err = n.controllers[i].Start() if err != nil { return err diff --git a/node/controller/outbound.go b/node/outbound.go similarity index 98% rename from node/controller/outbound.go rename to node/outbound.go index 0e5e52e..17cc33f 100644 --- a/node/controller/outbound.go +++ b/node/outbound.go @@ -1,4 +1,4 @@ -package controller +package node import ( "fmt" diff --git a/node/controller/task.go b/node/task.go similarity index 64% rename from node/controller/task.go rename to node/task.go index de368c5..3267f44 100644 --- a/node/controller/task.go +++ b/node/task.go @@ -1,10 +1,10 @@ -package controller +package node import ( "fmt" - "github.com/Yuzuki616/V2bX/api/iprecoder" "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/node/controller/lego" + "github.com/Yuzuki616/V2bX/limiter" + "github.com/Yuzuki616/V2bX/node/lego" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/task" "log" @@ -13,7 +13,7 @@ import ( "time" ) -func (c *Node) initTask() { +func (c *Controller) initTask() { // fetch node info task c.nodeInfoMonitorPeriodic = &task.Periodic{ Interval: time.Duration(c.nodeInfo.BaseConfig.PullInterval.(int)) * time.Second, @@ -36,7 +36,7 @@ func (c *Node) initTask() { time.Sleep(time.Duration(c.nodeInfo.BaseConfig.PushInterval.(int)) * time.Second) _ = c.userReportPeriodic.Start() }() - if c.nodeInfo.EnableTls && c.CertConfig.CertMode != "none" && + if c.EnableTls && c.CertConfig.CertMode != "none" && (c.CertConfig.CertMode == "dns" || c.CertConfig.CertMode == "http") { c.renewCertPeriodic = &task.Periodic{ Interval: time.Hour * 24, @@ -48,42 +48,9 @@ func (c *Node) initTask() { _ = c.renewCertPeriodic.Start() }() } - if c.EnableDynamicSpeedLimit { - // Check dynamic speed limit task - c.dynamicSpeedLimitPeriodic = &task.Periodic{ - Interval: time.Duration(c.DynamicSpeedLimitConfig.Periodic) * time.Second, - Execute: c.dynamicSpeedLimit, - } - go func() { - time.Sleep(time.Duration(c.DynamicSpeedLimitConfig.Periodic) * time.Second) - _ = c.dynamicSpeedLimitPeriodic.Start() - }() - log.Printf("[%s: %d] Start dynamic speed limit", c.nodeInfo.NodeType, c.nodeInfo.NodeId) - } - if c.EnableIpRecorder { - switch c.IpRecorderConfig.Type { - case "Recorder": - c.ipRecorder = iprecoder.NewRecorder(c.IpRecorderConfig.RecorderConfig) - case "Redis": - c.ipRecorder = iprecoder.NewRedis(c.IpRecorderConfig.RedisConfig) - default: - log.Printf("recorder type: %s is not vail, disable recorder", c.IpRecorderConfig.Type) - return - } - // report and fetch online ip list task - c.onlineIpReportPeriodic = &task.Periodic{ - Interval: time.Duration(c.IpRecorderConfig.Periodic) * time.Second, - Execute: c.reportOnlineIp, - } - go func() { - time.Sleep(time.Duration(c.IpRecorderConfig.Periodic) * time.Second) - _ = c.onlineIpReportPeriodic.Start() - }() - log.Printf("[%s: %d] Start report online ip", c.nodeInfo.NodeType, c.nodeInfo.NodeId) - } } -func (c *Node) nodeInfoMonitor() (err error) { +func (c *Controller) nodeInfoMonitor() (err error) { // First fetch Node Info newNodeInfo, err := c.apiClient.GetNodeInfo() if err != nil { @@ -95,28 +62,22 @@ func (c *Node) nodeInfoMonitor() (err error) { if newNodeInfo != nil { // Remove old tag oldTag := c.Tag - err := c.removeOldTag(oldTag) + err := c.removeOldNode(oldTag) if err != nil { log.Print(err) return nil } + // Remove Old limiter + limiter.DeleteLimiter(oldTag) // Add new tag c.nodeInfo = newNodeInfo c.Tag = c.buildNodeTag() - err = c.addNewTag(newNodeInfo) + err = c.addNewNode(newNodeInfo) if err != nil { log.Print(err) return nil } nodeInfoChanged = true - // Remove Old limiter - if err = c.server.DeleteInboundLimiter(oldTag); err != nil { - log.Print(err) - return nil - } - if err := c.server.UpdateRule(c.Tag, newNodeInfo.Rules); err != nil { - log.Print(err) - } } // Update User newUserInfo, err := c.apiClient.GetUserList() @@ -126,15 +87,20 @@ func (c *Node) nodeInfoMonitor() (err error) { } if nodeInfoChanged { c.userList = newUserInfo - err = c.addNewUser(c.userList, newNodeInfo) + // Add new Limiter + l := limiter.AddLimiter(c.Tag, &limiter.LimitConfig{ + SpeedLimit: c.SpeedLimit, + IpLimit: c.IPLimit, + ConnLimit: c.ConnLimit, + }, newUserInfo) + err = c.addNewUser(newUserInfo, newNodeInfo) if err != nil { log.Print(err) return nil } - // Add Limiter - if err := c.server.AddInboundLimiter(c.Tag, newNodeInfo, newUserInfo); err != nil { - log.Print(err) - return nil + err = l.UpdateRule(newNodeInfo.Rules) + if err != nil { + log.Printf("Update Rule error: %s", err) } // Check interval if c.nodeInfoMonitorPeriodic.Interval != time.Duration(newNodeInfo.BaseConfig.PullInterval.(int))*time.Second { @@ -176,8 +142,9 @@ func (c *Node) nodeInfoMonitor() (err error) { } if len(added) > 0 || len(deleted) > 0 { // Update Limiter - if err := c.server.UpdateInboundLimiter(c.Tag, added, deleted); err != nil { - log.Print(err) + err = limiter.UpdateLimiter(c.Tag, added, deleted) + if err != nil { + log.Print("update limiter:", err) } } log.Printf("[%s: %d] %d user deleted, %d user added", c.nodeInfo.NodeType, c.nodeInfo.NodeId, @@ -187,7 +154,7 @@ func (c *Node) nodeInfoMonitor() (err error) { return nil } -func (c *Node) removeOldTag(oldTag string) (err error) { +func (c *Controller) removeOldNode(oldTag string) (err error) { err = c.server.RemoveInbound(oldTag) if err != nil { return err @@ -199,7 +166,7 @@ func (c *Node) removeOldTag(oldTag string) (err error) { return nil } -func (c *Node) addNewTag(newNodeInfo *panel.NodeInfo) (err error) { +func (c *Controller) addNewNode(newNodeInfo *panel.NodeInfo) (err error) { inboundConfig, err := buildInbound(c.ControllerConfig, newNodeInfo, c.Tag) if err != nil { return fmt.Errorf("build inbound error: %s", err) @@ -219,10 +186,10 @@ func (c *Node) addNewTag(newNodeInfo *panel.NodeInfo) (err error) { return nil } -func (c *Node) addNewUser(userInfo []panel.UserInfo, nodeInfo *panel.NodeInfo) (err error) { +func (c *Controller) addNewUser(userInfo []panel.UserInfo, nodeInfo *panel.NodeInfo) (err error) { users := make([]*protocol.User, 0) if nodeInfo.NodeType == "V2ray" { - if nodeInfo.EnableVless { + if c.EnableVless { users = c.buildVlessUsers(userInfo) } else { users = c.buildVmessUsers(userInfo) @@ -270,7 +237,7 @@ func compareUserList(old, new []panel.UserInfo) (deleted, added []panel.UserInfo return deleted, added } -func (c *Node) reportUserTraffic() (err error) { +func (c *Controller) reportUserTraffic() (err error) { // Get User traffic userTraffic := make([]panel.UserTraffic, 0) for i := range c.userList { @@ -294,52 +261,11 @@ func (c *Node) reportUserTraffic() (err error) { } } userTraffic = nil - if !c.EnableIpRecorder { - c.server.ClearOnlineIp(c.Tag) - } runtime.GC() return nil } -func (c *Node) reportOnlineIp() (err error) { - onlineIp, err := c.server.ListOnlineIp(c.Tag) - if err != nil { - log.Print(err) - return nil - } - onlineIp, err = c.ipRecorder.SyncOnlineIp(onlineIp) - if err != nil { - log.Print("Report online ip error: ", err) - c.server.ClearOnlineIp(c.Tag) - } - if c.IpRecorderConfig.EnableIpSync { - c.server.UpdateOnlineIp(c.Tag, onlineIp) - log.Printf("[Node: %d] Updated %d online ip", c.nodeInfo.NodeId, len(onlineIp)) - } - log.Printf("[Node: %d] Report %d online ip", c.nodeInfo.NodeId, len(onlineIp)) - return nil -} - -func (c *Node) dynamicSpeedLimit() error { - if c.EnableDynamicSpeedLimit { - for i := range c.userList { - up, down := c.server.GetUserTraffic(c.buildUserTag(&(c.userList)[i]), false) - if c.userList[i].Traffic+down+up/1024/1024 > c.DynamicSpeedLimitConfig.Traffic { - err := c.server.AddUserSpeedLimit(c.Tag, - &c.userList[i], - c.DynamicSpeedLimitConfig.SpeedLimit, - time.Now().Add(time.Second*time.Duration(c.DynamicSpeedLimitConfig.ExpireTime)).Unix()) - if err != nil { - log.Print(err) - } - } - c.userList[i].Traffic = 0 - } - } - return nil -} - -func (c *Node) RenewCert() { +func (c *Controller) RenewCert() { l, err := lego.New(c.CertConfig) if err != nil { log.Print(err) diff --git a/node/controller/user.go b/node/user.go similarity index 76% rename from node/controller/user.go rename to node/user.go index dce63bb..b8d6436 100644 --- a/node/controller/user.go +++ b/node/user.go @@ -1,4 +1,4 @@ -package controller +package node import ( "encoding/base64" @@ -14,7 +14,7 @@ import ( "strings" ) -func (c *Node) buildVmessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { +func (c *Controller) buildVmessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i, user := range userInfo { users[i] = c.buildVmessUser(&user, 0) @@ -22,7 +22,7 @@ func (c *Node) buildVmessUsers(userInfo []panel.UserInfo) (users []*protocol.Use return users } -func (c *Node) buildVmessUser(userInfo *panel.UserInfo, serverAlterID uint16) (user *protocol.User) { +func (c *Controller) buildVmessUser(userInfo *panel.UserInfo, serverAlterID uint16) (user *protocol.User) { vmessAccount := &conf.VMessAccount{ ID: userInfo.Uuid, AlterIds: serverAlterID, @@ -35,7 +35,7 @@ func (c *Node) buildVmessUser(userInfo *panel.UserInfo, serverAlterID uint16) (u } } -func (c *Node) buildVlessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { +func (c *Controller) buildVlessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i := range userInfo { users[i] = c.buildVlessUser(&(userInfo)[i]) @@ -43,7 +43,7 @@ func (c *Node) buildVlessUsers(userInfo []panel.UserInfo) (users []*protocol.Use return users } -func (c *Node) buildVlessUser(userInfo *panel.UserInfo) (user *protocol.User) { +func (c *Controller) buildVlessUser(userInfo *panel.UserInfo) (user *protocol.User) { vlessAccount := &vless.Account{ Id: userInfo.Uuid, Flow: "xtls-rprx-direct", @@ -55,7 +55,7 @@ func (c *Node) buildVlessUser(userInfo *panel.UserInfo) (user *protocol.User) { } } -func (c *Node) buildTrojanUsers(userInfo []panel.UserInfo) (users []*protocol.User) { +func (c *Controller) buildTrojanUsers(userInfo []panel.UserInfo) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i := range userInfo { users[i] = c.buildTrojanUser(&(userInfo)[i]) @@ -63,7 +63,7 @@ func (c *Node) buildTrojanUsers(userInfo []panel.UserInfo) (users []*protocol.Us return users } -func (c *Node) buildTrojanUser(userInfo *panel.UserInfo) (user *protocol.User) { +func (c *Controller) buildTrojanUser(userInfo *panel.UserInfo) (user *protocol.User) { trojanAccount := &trojan.Account{ Password: userInfo.Uuid, Flow: "xtls-rprx-direct", @@ -90,7 +90,7 @@ func getCipherFromString(c string) shadowsocks.CipherType { } } -func (c *Node) buildSSUsers(userInfo []panel.UserInfo, cypher shadowsocks.CipherType) (users []*protocol.User) { +func (c *Controller) buildSSUsers(userInfo []panel.UserInfo, cypher shadowsocks.CipherType) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i := range userInfo { users[i] = c.buildSSUser(&(userInfo)[i], cypher) @@ -98,7 +98,7 @@ func (c *Node) buildSSUsers(userInfo []panel.UserInfo, cypher shadowsocks.Cipher return users } -func (c *Node) buildSSUser(userInfo *panel.UserInfo, cypher shadowsocks.CipherType) (user *protocol.User) { +func (c *Controller) buildSSUser(userInfo *panel.UserInfo, cypher shadowsocks.CipherType) (user *protocol.User) { if c.nodeInfo.ServerKey == "" { ssAccount := &shadowsocks.Account{ Password: userInfo.Uuid, @@ -121,6 +121,6 @@ func (c *Node) buildSSUser(userInfo *panel.UserInfo, cypher shadowsocks.CipherTy } } -func (c *Node) buildUserTag(user *panel.UserInfo) string { +func (c *Controller) buildUserTag(user *panel.UserInfo) string { return fmt.Sprintf("%s|%s|%d", c.Tag, user.Uuid, user.Id) }