diff --git a/api/iprecoder/interface.go b/api/iprecoder/interface.go index 570c558..6095b42 100644 --- a/api/iprecoder/interface.go +++ b/api/iprecoder/interface.go @@ -1,7 +1,9 @@ package iprecoder -import "github.com/Yuzuki616/V2bX/core/app/dispatcher" +import ( + "github.com/Yuzuki616/V2bX/limiter" +) type IpRecorder interface { - SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpList, error) + SyncOnlineIp(Ips []limiter.UserIpList) ([]limiter.UserIpList, error) } diff --git a/api/iprecoder/recorder.go b/api/iprecoder/recorder.go index ce91561..c8204b8 100644 --- a/api/iprecoder/recorder.go +++ b/api/iprecoder/recorder.go @@ -3,7 +3,7 @@ package iprecoder import ( "errors" "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" + "github.com/Yuzuki616/V2bX/limiter" "github.com/go-resty/resty/v2" "github.com/goccy/go-json" "time" @@ -21,7 +21,7 @@ func NewRecorder(c *conf.RecorderConfig) *Recorder { } } -func (r *Recorder) SyncOnlineIp(ips []dispatcher.UserIpList) ([]dispatcher.UserIpList, error) { +func (r *Recorder) SyncOnlineIp(ips []limiter.UserIpList) ([]limiter.UserIpList, error) { rsp, err := r.client.R(). SetBody(ips). Post(r.Url + "/api/v1/SyncOnlineIp?token=" + r.Token) @@ -31,7 +31,7 @@ func (r *Recorder) SyncOnlineIp(ips []dispatcher.UserIpList) ([]dispatcher.UserI if rsp.StatusCode() != 200 { return nil, errors.New(rsp.String()) } - ips = []dispatcher.UserIpList{} + ips = []limiter.UserIpList{} err = json.Unmarshal(rsp.Body(), &ips) if err != nil { return nil, err diff --git a/api/iprecoder/redis.go b/api/iprecoder/redis.go index 490d425..5222ce3 100644 --- a/api/iprecoder/redis.go +++ b/api/iprecoder/redis.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" + "github.com/Yuzuki616/V2bX/limiter" "github.com/go-redis/redis/v8" "strconv" "time" @@ -26,7 +26,7 @@ func NewRedis(c *conf.RedisConfig) *Redis { } } -func (r *Redis) SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpList, error) { +func (r *Redis) SyncOnlineIp(Ips []limiter.UserIpList) ([]limiter.UserIpList, error) { ctx := context.Background() for i := range Ips { err := r.client.SAdd(ctx, "UserList", Ips[i].Uid).Err() @@ -46,7 +46,7 @@ func (r *Redis) SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpLi if c.Err() != nil { return nil, fmt.Errorf("get user list failed: %s", c.Err()) } - Ips = make([]dispatcher.UserIpList, 0, len(c.Val())) + Ips = make([]limiter.UserIpList, 0, len(c.Val())) for _, uid := range c.Val() { uidInt, err := strconv.Atoi(uid) if err != nil { @@ -56,7 +56,7 @@ func (r *Redis) SyncOnlineIp(Ips []dispatcher.UserIpList) ([]dispatcher.UserIpLi if ips.Err() != nil { return nil, fmt.Errorf("get ip list failed: %s", ips.Err()) } - Ips = append(Ips, dispatcher.UserIpList{ + Ips = append(Ips, limiter.UserIpList{ Uid: uidInt, IpList: ips.Val(), }) diff --git a/api/iprecoder/redis_test.go b/api/iprecoder/redis_test.go index 5e08f8e..3e013be 100644 --- a/api/iprecoder/redis_test.go +++ b/api/iprecoder/redis_test.go @@ -2,7 +2,7 @@ package iprecoder import ( "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" + "github.com/Yuzuki616/V2bX/limiter" "log" "testing" ) @@ -13,7 +13,7 @@ func TestRedis_SyncOnlineIp(t *testing.T) { Password: "", Db: 0, }) - users, err := r.SyncOnlineIp([]dispatcher.UserIpList{ + users, err := r.SyncOnlineIp([]limiter.UserIpList{ {1, []string{"5.5.5.5", "4.4.4.4"}}, }) if err != nil { diff --git a/api/panel/node.go b/api/panel/node.go index 6b9f145..9dbf29b 100644 --- a/api/panel/node.go +++ b/api/panel/node.go @@ -36,12 +36,8 @@ type DestinationRule struct { Pattern *regexp.Regexp } type localNodeConfig struct { - NodeId int - NodeType string - EnableVless bool - EnableTls bool - SpeedLimit int - DeviceLimit int + NodeId int + NodeType string } func (c *Client) GetNodeInfo() (nodeInfo *NodeInfo, err error) { diff --git a/api/panel/panel.go b/api/panel/panel.go index b61678f..76643d9 100644 --- a/api/panel/panel.go +++ b/api/panel/panel.go @@ -28,8 +28,6 @@ type Client struct { Key string NodeType string NodeId int - SpeedLimit int - DeviceLimit int LocalRuleList []DestinationRule etag string } @@ -69,8 +67,6 @@ func New(c *conf.ApiConfig) (Panel, error) { Key: c.Key, APIHost: c.APIHost, NodeType: c.NodeType, - SpeedLimit: c.SpeedLimit, - DeviceLimit: c.DeviceLimit, NodeId: c.NodeID, LocalRuleList: localRuleList, }, nil diff --git a/api/panel/user.go b/api/panel/user.go index 1dd999c..fbb4992 100644 --- a/api/panel/user.go +++ b/api/panel/user.go @@ -10,18 +10,9 @@ type OnlineUser struct { IP string } -type V2RayUserInfo struct { - Uuid string `json:"uuid"` - Email string `json:"email"` - AlterId int `json:"alter_id"` -} -type TrojanUserInfo struct { - Password string `json:"password"` -} type UserInfo struct { Id int `json:"id"` Uuid string `json:"uuid"` - Email string `json:"-"` SpeedLimit int `json:"speed_limit"` Traffic int64 `json:"-"` } diff --git a/common/rate/rate.go b/common/rate/rate.go new file mode 100644 index 0000000..a41e6b6 --- /dev/null +++ b/common/rate/rate.go @@ -0,0 +1,28 @@ +package rate + +import ( + "github.com/juju/ratelimit" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/buf" +) + +type Writer struct { + writer buf.Writer + limiter *ratelimit.Bucket +} + +func NewRateLimitWriter(writer buf.Writer, limiter *ratelimit.Bucket) buf.Writer { + return &Writer{ + writer: writer, + limiter: limiter, + } +} + +func (w *Writer) Close() error { + return common.Close(w.writer) +} + +func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error { + w.limiter.Wait(int64(mb.Len())) + return w.writer.WriteMultiBuffer(mb) +} diff --git a/conf/conf.go b/conf/conf.go index e00b48b..ceab89f 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -67,7 +67,7 @@ func (p *Conf) Watch(filePath string, reload func()) error { } case err := <-watcher.Errors: if err != nil { - log.Panicf("watcher error: %s", err) + log.Printf("File watcher error: %s", err) } } } diff --git a/conf/node.go b/conf/node.go index 8e4d118..941c574 100644 --- a/conf/node.go +++ b/conf/node.go @@ -52,6 +52,11 @@ type ControllerConfig struct { SendIP string `yaml:"SendIP"` EnableDNS bool `yaml:"EnableDNS"` DNSType string `yaml:"DNSType"` + EnableVless bool `yaml:"EnableVless"` + EnableTls bool `yaml:"EnableTls"` + SpeedLimit int `yaml:"SpeedLimit"` + IPLimit int `yaml:"DeviceLimit"` + ConnLimit int `yaml:"ConnLimit"` DisableUploadTraffic bool `yaml:"DisableUploadTraffic"` DisableGetRule bool `yaml:"DisableGetRule"` EnableProxyProtocol bool `yaml:"EnableProxyProtocol"` @@ -67,16 +72,12 @@ type ControllerConfig struct { } type ApiConfig struct { - APIHost string `yaml:"ApiHost"` - NodeID int `yaml:"NodeID"` - Key string `yaml:"ApiKey"` - NodeType string `yaml:"NodeType"` - EnableVless bool `yaml:"EnableVless"` - Timeout int `yaml:"Timeout"` - SpeedLimit int `yaml:"SpeedLimit"` - DeviceLimit int `yaml:"DeviceLimit"` - RuleListPath string `yaml:"RuleListPath"` - DisableCustomConfig bool `yaml:"DisableCustomConfig"` + APIHost string `yaml:"ApiHost"` + NodeID int `yaml:"NodeID"` + Key string `yaml:"ApiKey"` + NodeType string `yaml:"NodeType"` + Timeout int `yaml:"Timeout"` + RuleListPath string `yaml:"RuleListPath"` } type NodeConfig struct { diff --git a/core/app/dispatcher/default.go b/core/app/dispatcher/default.go index daa6c29..7ef40c5 100644 --- a/core/app/dispatcher/default.go +++ b/core/app/dispatcher/default.go @@ -5,6 +5,8 @@ package dispatcher import ( "context" "fmt" + "github.com/Yuzuki616/V2bX/common/rate" + "github.com/Yuzuki616/V2bX/limiter" routingSession "github.com/xtls/xray-core/features/routing/session" "strings" "sync" @@ -89,14 +91,12 @@ func (r *cachedReader) Interrupt() { // DefaultDispatcher is a default implementation of Dispatcher. type DefaultDispatcher struct { - ohm outbound.Manager - router routing.Router - policy policy.Manager - stats stats.Manager - dns dns.Client - fdns dns.FakeDNSEngine - Limiter *Limiter - RuleManager *Rule + ohm outbound.Manager + router routing.Router + policy policy.Manager + stats stats.Manager + dns dns.Client + fdns dns.FakeDNSEngine } func init() { @@ -121,8 +121,6 @@ func (d *DefaultDispatcher) Init(config *Config, om outbound.Manager, router rou d.policy = pm d.stats = sm d.dns = dns - d.Limiter = NewLimiter() - d.RuleManager = NewRule() return nil } @@ -139,7 +137,7 @@ func (*DefaultDispatcher) Start() error { // Close implements common.Closable. func (*DefaultDispatcher) Close() error { return nil } -func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link) { +func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link, *limiter.Limiter, error) { downOpt := pipe.OptionsFromContext(ctx) upOpt := downOpt @@ -226,21 +224,31 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn if sessionInbound != nil { user = sessionInbound.User } - + var limit *limiter.Limiter if user != nil && len(user.Email) > 0 { - // Speed Limit and Device Limit - bucket, ok, reject := d.Limiter.CheckSpeedAndDeviceLimit(sessionInbound.Tag, user.Email, sessionInbound.Source.Address.IP().String()) - if reject { - newError("Devices reach the limit: ", user.Email).AtError().WriteToLog() + var err error + limit, err = limiter.GetLimiter(sessionInbound.Tag) + if err != nil { + newError("Get limit info error: ", err).AtError().WriteToLog() common.Close(outboundLink.Writer) common.Close(inboundLink.Writer) common.Interrupt(outboundLink.Reader) common.Interrupt(inboundLink.Reader) - return nil, nil + return nil, nil, nil, newError("Get limit info error: ", err) } - if ok { - inboundLink.Writer = d.Limiter.RateWriter(inboundLink.Writer, bucket) - outboundLink.Writer = d.Limiter.RateWriter(outboundLink.Writer, bucket) + // Speed Limit and Device Limit + w, reject := limit.CheckLimit(user.Email, sessionInbound.Source.Address.IP().String()) + if reject { + newError("Limited ", user.Email, " by conn or ip").AtWarning().WriteToLog() + common.Close(outboundLink.Writer) + common.Close(inboundLink.Writer) + common.Interrupt(outboundLink.Reader) + common.Interrupt(inboundLink.Reader) + return nil, nil, nil, newError("Limited ", user.Email, " by conn or ip") + } + if w != nil { + inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w) + outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w) } p := d.policy.ForLevel(user.Level) if p.Stats.UserUplink { @@ -263,7 +271,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn } } - return inboundLink, outboundLink + return inboundLink, outboundLink, limit, nil } func (d *DefaultDispatcher) shouldOverride(ctx context.Context, result SniffResult, request session.SniffingRequest, destination net.Destination) bool { @@ -313,11 +321,13 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin content = new(session.Content) ctx = session.ContextWithContent(ctx, content) } - sniffingRequest := content.SniffingRequest - inbound, outbound := d.getLink(ctx, destination.Network, sniffingRequest) + inbound, outbound, l, err := d.getLink(ctx, destination.Network, sniffingRequest) + if err != nil { + return nil, err + } if !sniffingRequest.Enabled { - go d.routedDispatch(ctx, outbound, destination, "") + go d.routedDispatch(ctx, outbound, destination, l) } else { go func() { cReader := &cachedReader{ @@ -338,7 +348,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin ob.Target = destination } } - d.routedDispatch(ctx, outbound, destination, content.Protocol) + d.routedDispatch(ctx, outbound, destination, l) }() } return inbound, nil @@ -360,7 +370,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De } sniffingRequest := content.SniffingRequest if !sniffingRequest.Enabled { - go d.routedDispatch(ctx, outbound, destination, content.Protocol) + go d.routedDispatch(ctx, outbound, destination, nil) } else { go func() { cReader := &cachedReader{ @@ -381,10 +391,9 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De ob.Target = destination } } - d.routedDispatch(ctx, outbound, destination, content.Protocol) + d.routedDispatch(ctx, outbound, destination, nil) }() } - return nil } @@ -434,7 +443,7 @@ func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, netw return contentResult, contentErr } -func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination, protocol string) { +func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination, l *limiter.Limiter) { ob := session.OutboundFromContext(ctx) if hosts, ok := d.dns.(dns.HostsLookup); ok && destination.Address.Family().IsDomain() { proxied := hosts.LookupHosts(ob.Target.String()) @@ -455,9 +464,22 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport. sessionInbound := session.InboundFromContext(ctx) // Whether the inbound connection contains a user if sessionInbound.User != nil { - if d.RuleManager.Detect(sessionInbound.Tag, destination.String(), protocol) { + if l == nil { + var err error + l, err = limiter.GetLimiter(sessionInbound.Tag) + if err != nil { + newError("Get limiter error: ", err).AtError().WriteToLog() + common.Close(link.Writer) + common.Interrupt(link.Reader) + return + } + } else { + defer func() { + l.ConnLimiter.DelConnCount(sessionInbound.User.Email, sessionInbound.Source.Address.IP().String()) + }() + } + if l.CheckDomainRule(destination.String()) { newError(fmt.Sprintf("User %s access %s reject by rule", sessionInbound.User.Email, destination.String())).AtError().WriteToLog() - newError("destination is reject by rule") common.Close(link.Writer) common.Interrupt(link.Reader) return diff --git a/core/app/dispatcher/limiter.go b/core/app/dispatcher/limiter.go deleted file mode 100644 index 0b3a9d6..0000000 --- a/core/app/dispatcher/limiter.go +++ /dev/null @@ -1,305 +0,0 @@ -package dispatcher - -import ( - "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/juju/ratelimit" - "github.com/xtls/xray-core/common" - "github.com/xtls/xray-core/common/buf" - "io" - "sync" - "time" -) - -type UserLimitInfo struct { - UID int - SpeedLimit int - DynamicSpeedLimit int - ExpireTime int64 -} - -type InboundInfo struct { - Tag string - NodeSpeedLimit int - NodeDeviceLimit int - UserLimitInfo *sync.Map // Key: Uid value: UserLimitInfo - SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket - UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool -} - -type Limiter struct { - InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo -} - -func NewLimiter() *Limiter { - return &Limiter{ - InboundInfo: new(sync.Map), - } -} - -func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, users []panel.UserInfo) error { - inboundInfo := &InboundInfo{ - Tag: tag, - NodeSpeedLimit: nodeInfo.SpeedLimit, - NodeDeviceLimit: nodeInfo.DeviceLimit, - UserLimitInfo: new(sync.Map), - SpeedLimiter: new(sync.Map), - UserOnlineIP: new(sync.Map), - } - for i := range users { - if users[i].SpeedLimit != 0 { - userLimit := &UserLimitInfo{ - UID: users[i].Id, - SpeedLimit: users[i].SpeedLimit, - ExpireTime: 0, - } - inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, users[i].Uuid, users[i].Id), userLimit) - } - } - l.InboundInfo.Store(tag, inboundInfo) // Replace the old inbound info - return nil -} - -func (l *Limiter) UpdateInboundLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - for i := range deleted { - inboundInfo.UserLimitInfo.Delete(fmt.Sprintf("%s|%s|%d", tag, - (deleted)[i].Uuid, (deleted)[i].Id)) - } - for i := range added { - if added[i].SpeedLimit != 0 { - userLimit := &UserLimitInfo{ - UID: added[i].Id, - SpeedLimit: added[i].SpeedLimit, - ExpireTime: 0, - } - inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, - (added)[i].Uuid, (added)[i].Id), userLimit) - } - } - } else { - return fmt.Errorf("no such inbound in limiter: %s", tag) - } - return nil -} - -func (l *Limiter) DeleteInboundLimiter(tag string) error { - l.InboundInfo.Delete(tag) - return nil -} - -func (l *Limiter) AddDynamicSpeedLimit(tag string, userInfo *panel.UserInfo, limit int, expire int64) error { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - userLimit := &UserLimitInfo{ - DynamicSpeedLimit: limit, - ExpireTime: time.Now().Add(time.Duration(expire) * time.Second).Unix(), - } - inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, userInfo.Uuid, userInfo.Id), userLimit) - return nil - } else { - return fmt.Errorf("no such inbound in limiter: %s", tag) - } -} - -type UserIpList struct { - Uid int `json:"Uid"` - IpList []string `json:"Ips"` -} - -func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - onlineUser := make([]UserIpList, 0) - var ipMap *sync.Map - inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { - ipMap = value.(*sync.Map) - var ip []string - ipMap.Range(func(key, v interface{}) bool { - if v.(bool) { - ip = append(ip, key.(string)) - } - return true - }) - if len(ip) > 0 { - if u, ok := inboundInfo.UserLimitInfo.Load(key.(string)); ok { - onlineUser = append(onlineUser, UserIpList{ - Uid: u.(*UserLimitInfo).UID, - IpList: ip, - }) - } - } - return true - }) - if len(onlineUser) == 0 { - return nil, nil - } - return onlineUser, nil - } else { - return nil, fmt.Errorf("no such inbound in limiter: %s", tag) - } -} - -func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIpList) { - if v, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := v.(*InboundInfo) - //Clear old IP - inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { - inboundInfo.UserOnlineIP.Delete(key) - return true - }) - // Update User Online IP - for i := range userIpList { - ipMap := new(sync.Map) - for _, userIp := range (userIpList)[i].IpList { - ipMap.Store(userIp, false) - } - inboundInfo.UserOnlineIP.Store((userIpList)[i].Uid, ipMap) - } - inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool { - if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists { - inboundInfo.SpeedLimiter.Delete(key.(string)) - } - return true - }) - } -} - -func (l *Limiter) ClearOnlineUserIpAndSpeedLimiter(tag string) { - if v, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := v.(*InboundInfo) - inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool { - if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists { - inboundInfo.SpeedLimiter.Delete(key.(string)) - } - return true - }) - inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { - inboundInfo.UserOnlineIP.Delete(key) - return true - }) - } -} - -func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) (speedLimiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) { - if value, ok := l.InboundInfo.Load(tag); ok { - inboundInfo := value.(*InboundInfo) - nodeLimit := inboundInfo.NodeSpeedLimit - userLimit := 0 - if v, ok := inboundInfo.UserLimitInfo.Load(email); ok { - u := v.(*UserLimitInfo) - if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 { - if u.SpeedLimit != 0 { - userLimit = u.SpeedLimit - u.DynamicSpeedLimit = 0 - u.ExpireTime = 0 - } else { - inboundInfo.UserLimitInfo.Delete(email) - } - } else { - userLimit = determineSpeedLimit(u.SpeedLimit, u.DynamicSpeedLimit) - } - } - ipMap := new(sync.Map) - ipMap.Store(ip, true) - // If any device is online - if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok { - ipMap := v.(*sync.Map) - // If this ip is a new device - if online, ok := ipMap.LoadOrStore(ip, true); !ok { - counter := 0 - ipMap.Range(func(key, value interface{}) bool { - counter++ - return true - }) - if counter > inboundInfo.NodeDeviceLimit && inboundInfo.NodeDeviceLimit > 0 { - ipMap.Delete(ip) - return nil, false, true - } - } else { - if !online.(bool) { - ipMap.Store(ip, true) - } - } - } - limit := int64(determineSpeedLimit(nodeLimit, userLimit)) * 1000000 / 8 // If you need the Speed limit - if limit > 0 { - limiter := ratelimit.NewBucketWithQuantum(time.Second, limit, limit) // Byte/s - if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok { - return v.(*ratelimit.Bucket), true, false - } else { - inboundInfo.SpeedLimiter.Store(email, limiter) - return limiter, true, false - } - } else { - return nil, false, false - } - } else { - newError("Get Inbound Limiter information failed").AtDebug().WriteToLog() - return nil, false, false - } -} - -type Writer struct { - writer buf.Writer - limiter *ratelimit.Bucket - w io.Writer -} - -func (l *Limiter) RateWriter(writer buf.Writer, limiter *ratelimit.Bucket) buf.Writer { - return &Writer{ - writer: writer, - limiter: limiter, - } -} - -func (w *Writer) Close() error { - return common.Close(w.writer) -} - -func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error { - w.limiter.Wait(int64(mb.Len())) - return w.writer.WriteMultiBuffer(mb) -} - -// determineSpeedLimit returns the minimum non-zero rate -func determineSpeedLimit(limit1, limit2 int) (limit int) { - if limit1 == 0 || limit2 == 0 { - if limit1 > limit2 { - return limit1 - } else if limit1 < limit2 { - return limit2 - } else { - return 0 - } - } else { - if limit1 > limit2 { - return limit2 - } else if limit1 < limit2 { - return limit1 - } else { - return limit1 - } - } -} - -func determineDeviceLimit(nodeLimit, userLimit int) (limit int) { - if nodeLimit == 0 || userLimit == 0 { - if nodeLimit > userLimit { - return nodeLimit - } else if nodeLimit < userLimit { - return userLimit - } else { - return 0 - } - } else { - if nodeLimit > userLimit { - return userLimit - } else if nodeLimit < userLimit { - return nodeLimit - } else { - return nodeLimit - } - } -} diff --git a/core/app/dispatcher/rule.go b/core/app/dispatcher/rule.go deleted file mode 100644 index 71e316d..0000000 --- a/core/app/dispatcher/rule.go +++ /dev/null @@ -1,42 +0,0 @@ -package dispatcher - -import ( - "github.com/Yuzuki616/V2bX/api/panel" - "reflect" - "sync" -) - -type Rule struct { - Rule *sync.Map // Key: Tag, Value: *panel.DetectRule -} - -func NewRule() *Rule { - return &Rule{ - Rule: new(sync.Map), - } -} - -func (r *Rule) UpdateRule(tag string, newRuleList []panel.DestinationRule) error { - if value, ok := r.Rule.LoadOrStore(tag, newRuleList); ok { - oldRuleList := value.([]panel.DestinationRule) - if !reflect.DeepEqual(oldRuleList, newRuleList) { - r.Rule.Store(tag, newRuleList) - } - } - return nil -} - -func (r *Rule) Detect(tag string, destination string, protocol string) (reject bool) { - reject = false - // If we have some rule for this inbound - if value, ok := r.Rule.Load(tag); ok { - ruleList := value.([]panel.DestinationRule) - for i := range ruleList { - if ruleList[i].Pattern.Match([]byte(destination)) { - reject = true - break - } - } - } - return reject -} diff --git a/core/distro/all/all.go b/core/distro/all/all.go index 9268f90..8f2186c 100644 --- a/core/distro/all/all.go +++ b/core/distro/all/all.go @@ -30,9 +30,6 @@ import ( // Fix dependency cycle caused by core import in internet package _ "github.com/xtls/xray-core/transport/internet/tagged/taggedimpl" - // Developer preview features - _ "github.com/xtls/xray-core/app/observatory" - // Inbound and outbound proxies. _ "github.com/xtls/xray-core/proxy/blackhole" _ "github.com/xtls/xray-core/proxy/dns" diff --git a/core/inbound.go b/core/inbound.go index f62b431..69af4ab 100644 --- a/core/inbound.go +++ b/core/inbound.go @@ -3,8 +3,6 @@ package core import ( "context" "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/features/inbound" ) @@ -27,27 +25,3 @@ func (p *Core) AddInbound(config *core.InboundHandlerConfig) error { } return nil } - -func (p *Core) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, users []panel.UserInfo) error { - return p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, users) -} - -func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) { - limit, ok := p.dispatcher.Limiter.InboundInfo.Load(tag) - if ok { - return limit.(*dispatcher.InboundInfo), nil - } - return nil, fmt.Errorf("not found limiter") -} - -func (p *Core) UpdateInboundLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { - return p.dispatcher.Limiter.UpdateInboundLimiter(tag, added, deleted) -} - -func (p *Core) DeleteInboundLimiter(tag string) error { - return p.dispatcher.Limiter.DeleteInboundLimiter(tag) -} - -func (p *Core) UpdateRule(tag string, newRuleList []panel.DestinationRule) error { - return p.dispatcher.RuleManager.UpdateRule(tag, newRuleList) -} diff --git a/core/user.go b/core/user.go index 236ee43..ae9fbfc 100644 --- a/core/user.go +++ b/core/user.go @@ -3,8 +3,6 @@ package core import ( "context" "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/core/app/dispatcher" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/proxy" ) @@ -79,19 +77,3 @@ func (p *Core) GetUserTraffic(email string, reset bool) (up int64, down int64) { } return up, down } - -func (p *Core) AddUserSpeedLimit(tag string, user *panel.UserInfo, speedLimit int, expire int64) error { - return p.dispatcher.Limiter.AddDynamicSpeedLimit(tag, user, speedLimit, expire) -} - -func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIpList, error) { - return p.dispatcher.Limiter.ListOnlineUserIp(tag) -} - -func (p *Core) UpdateOnlineIp(tag string, ips []dispatcher.UserIpList) { - p.dispatcher.Limiter.UpdateOnlineUserIP(tag, ips) -} - -func (p *Core) ClearOnlineIp(tag string) { - p.dispatcher.Limiter.ClearOnlineUserIpAndSpeedLimiter(tag) -} diff --git a/limiter/conn.go b/limiter/conn.go new file mode 100644 index 0000000..4c853fe --- /dev/null +++ b/limiter/conn.go @@ -0,0 +1,96 @@ +package limiter + +import ( + "sync" +) + +type ConnLimiter struct { + ipLimit int + connLimit int + count sync.Map //map[string]int + ip sync.Map //map[string]map[string]*sync.Map +} + +func NewConnLimiter(conn int, ip int) *ConnLimiter { + return &ConnLimiter{ + connLimit: conn, + ipLimit: ip, + count: sync.Map{}, + ip: sync.Map{}, + } +} + +func (c *ConnLimiter) AddConnCount(user string, ip string) (limit bool) { + if c.connLimit != 0 { + if v, ok := c.count.Load(user); ok { + if v.(int) >= c.connLimit { + return true + } else { + c.count.Store(user, v.(int)+1) + } + } else { + c.count.Store(user, 1) + } + } + if c.ipLimit == 0 { + return false + } + ipMap := new(sync.Map) + ipMap.Store(ip, 1) + if v, ok := c.ip.LoadOrStore(user, ipMap); ok { + // have online ip + ips := v.(*sync.Map) + cn := 0 + if online, ok := ips.Load(ip); !ok { + ips.Range(func(key, value interface{}) bool { + cn++ + if cn >= c.ipLimit { + limit = true + return false + } + return true + }) + if limit { + return + } + ips.Store(ip, 1) + } else { + // have this ip + ips.Store(ip, online.(int)+1) + } + } + return false +} + +func (c *ConnLimiter) DelConnCount(user string, ip string) { + if c.connLimit != 0 { + if v, ok := c.count.Load(user); ok { + if v.(int) == 1 { + c.count.Delete(user) + } else { + c.count.Store(user, v.(int)-1) + } + } + } + if c.ipLimit == 0 { + return + } + if i, ok := c.ip.Load(user); ok { + is := i.(*sync.Map) + if i, ok := is.Load(ip); ok { + if i.(int) == 1 { + is.Delete(ip) + } else { + is.Store(user, i.(int)-1) + } + notDel := false + c.ip.Range(func(_, _ any) bool { + notDel = true + return true + }) + if !notDel { + c.ip.Delete(user) + } + } + } +} diff --git a/limiter/conn_test.go b/limiter/conn_test.go new file mode 100644 index 0000000..c685910 --- /dev/null +++ b/limiter/conn_test.go @@ -0,0 +1,38 @@ +package limiter + +import ( + "sync" + "testing" +) + +var c *ConnLimiter + +func init() { + c = NewConnLimiter(1, 1) +} + +func TestConnLimiter_AddConnCount(t *testing.T) { + t.Log(c.AddConnCount("1", "1")) + t.Log(c.AddConnCount("1", "2")) +} + +func TestConnLimiter_DelConnCount(t *testing.T) { + t.Log(c.AddConnCount("1", "1")) + t.Log(c.AddConnCount("1", "2")) + c.DelConnCount("1", "1") + t.Log(c.AddConnCount("1", "2")) +} + +func BenchmarkConnLimiter(b *testing.B) { + wg := sync.WaitGroup{} + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + c.AddConnCount("1", "2") + c.DelConnCount("1", "2") + wg.Done() + }() + } + wg.Wait() + +} diff --git a/limiter/dynamic.go b/limiter/dynamic.go new file mode 100644 index 0000000..7e7d8c5 --- /dev/null +++ b/limiter/dynamic.go @@ -0,0 +1,37 @@ +package limiter + +import ( + "fmt" + "github.com/Yuzuki616/V2bX/api/panel" + "time" +) + +func (l *Limiter) AddDynamicSpeedLimit(tag string, userInfo *panel.UserInfo, limitNum int, expire int64) error { + userLimit := &UserLimitInfo{ + DynamicSpeedLimit: limitNum, + ExpireTime: time.Now().Add(time.Duration(expire) * time.Second).Unix(), + } + l.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, userInfo.Uuid, userInfo.Id), userLimit) + return nil +} + +// determineSpeedLimit returns the minimum non-zero rate +func determineSpeedLimit(limit1, limit2 int) (limit int) { + if limit1 == 0 || limit2 == 0 { + if limit1 > limit2 { + return limit1 + } else if limit1 < limit2 { + return limit2 + } else { + return 0 + } + } else { + if limit1 > limit2 { + return limit2 + } else if limit1 < limit2 { + return limit1 + } else { + return limit1 + } + } +} diff --git a/limiter/limiter.go b/limiter/limiter.go new file mode 100644 index 0000000..df3ba34 --- /dev/null +++ b/limiter/limiter.go @@ -0,0 +1,166 @@ +package limiter + +import ( + "errors" + "fmt" + "github.com/Yuzuki616/V2bX/api/panel" + "github.com/juju/ratelimit" + "sync" + "time" +) + +var limitLock sync.RWMutex +var limiter map[string]*Limiter + +func Init() { + limiter = map[string]*Limiter{} +} + +type Limiter struct { + Rules []panel.DestinationRule + ProtocolRules []string + SpeedLimit int + UserLimitInfo *sync.Map // Key: Uid value: UserLimitInfo + ConnLimiter *ConnLimiter // Key: Uid value: ConnLimiter + SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket +} + +type UserLimitInfo struct { + UID int + SpeedLimit int + DynamicSpeedLimit int + ExpireTime int64 +} + +type LimitConfig struct { + SpeedLimit int + IpLimit int + ConnLimit int +} + +func AddLimiter(tag string, l *LimitConfig, users []panel.UserInfo) *Limiter { + info := &Limiter{ + SpeedLimit: l.SpeedLimit, + UserLimitInfo: new(sync.Map), + ConnLimiter: NewConnLimiter(l.ConnLimit, l.IpLimit), + SpeedLimiter: new(sync.Map), + } + for i := range users { + if users[i].SpeedLimit != 0 { + userLimit := &UserLimitInfo{ + UID: users[i].Id, + SpeedLimit: users[i].SpeedLimit, + ExpireTime: 0, + } + info.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, users[i].Uuid, users[i].Id), userLimit) + } + } + limitLock.Lock() + limiter[tag] = info + limitLock.Unlock() + return info +} + +func GetLimiter(tag string) (info *Limiter, err error) { + limitLock.RLock() + info, ok := limiter[tag] + limitLock.RUnlock() + if !ok { + return nil, errors.New("not found") + } + return +} + +func UpdateLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { + l, err := GetLimiter(tag) + if err != nil { + return fmt.Errorf("get limit error: %s", err) + } + for i := range deleted { + l.UserLimitInfo.Delete(fmt.Sprintf("%s|%s|%d", + tag, + deleted[i].Uuid, + deleted[i].Id)) + } + for i := range added { + if added[i].SpeedLimit != 0 { + userLimit := &UserLimitInfo{ + UID: added[i].Id, + SpeedLimit: added[i].SpeedLimit, + ExpireTime: 0, + } + l.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", + tag, + added[i].Uuid, + added[i].Id), userLimit) + } + } + return nil +} + +func DeleteLimiter(tag string) { + limitLock.Lock() + delete(limiter, tag) + limitLock.Unlock() +} + +func (l *Limiter) CheckLimit(email string, ip string) (Bucket *ratelimit.Bucket, Reject bool) { + // ip and conn limiter + if l.ConnLimiter.AddConnCount(email, ip) { + return nil, true + } + // check and gen speed limit Bucket + nodeLimit := l.SpeedLimit + userLimit := 0 + if v, ok := l.UserLimitInfo.Load(email); ok { + u := v.(*UserLimitInfo) + if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 { + if u.SpeedLimit != 0 { + userLimit = u.SpeedLimit + u.DynamicSpeedLimit = 0 + u.ExpireTime = 0 + } else { + l.UserLimitInfo.Delete(email) + } + } else { + userLimit = determineSpeedLimit(u.SpeedLimit, u.DynamicSpeedLimit) + } + } + limit := int64(determineSpeedLimit(nodeLimit, userLimit)) * 1000000 / 8 // If you need the Speed limit + if limit > 0 { + Bucket = ratelimit.NewBucketWithQuantum(time.Second, limit, limit) // Byte/s + if v, ok := l.SpeedLimiter.LoadOrStore(email, Bucket); ok { + return v.(*ratelimit.Bucket), false + } else { + l.SpeedLimiter.Store(email, Bucket) + return Bucket, false + } + } else { + return nil, false + } +} + +type UserIpList struct { + Uid int `json:"Uid"` + IpList []string `json:"Ips"` +} + +func determineDeviceLimit(nodeLimit, userLimit int) (limit int) { + if nodeLimit == 0 || userLimit == 0 { + if nodeLimit > userLimit { + return nodeLimit + } else if nodeLimit < userLimit { + return userLimit + } else { + return 0 + } + } else { + if nodeLimit > userLimit { + return userLimit + } else if nodeLimit < userLimit { + return nodeLimit + } else { + return nodeLimit + } + } +} diff --git a/limiter/rule.go b/limiter/rule.go new file mode 100644 index 0000000..1e95343 --- /dev/null +++ b/limiter/rule.go @@ -0,0 +1,34 @@ +package limiter + +import ( + "github.com/Yuzuki616/V2bX/api/panel" + "reflect" +) + +func (l *Limiter) CheckDomainRule(destination string) (reject bool) { + // have rule + for i := range l.Rules { + if l.Rules[i].Pattern.MatchString(destination) { + reject = true + break + } + } + return +} + +func (l *Limiter) CheckProtocolRule(protocol string) (reject bool) { + for i := range l.ProtocolRules { + if l.ProtocolRules[i] == protocol { + reject = true + break + } + } + return +} + +func (l *Limiter) UpdateRule(newRuleList []panel.DestinationRule) error { + if !reflect.DeepEqual(l.Rules, newRuleList) { + l.Rules = newRuleList + } + return nil +} diff --git a/limiter/task.go b/limiter/task.go new file mode 100644 index 0000000..6a935d2 --- /dev/null +++ b/limiter/task.go @@ -0,0 +1 @@ +package limiter diff --git a/main.go b/main.go index 60fdc7c..e64b5a8 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" + "github.com/Yuzuki616/V2bX/limiter" "github.com/Yuzuki616/V2bX/node" "log" "os" @@ -40,6 +41,7 @@ func main() { if err != nil { log.Panicf("can't unmarshal config file: %s \n", err) } + limiter.Init() log.Println("Start V2bX...") x := core.New(config) err = x.Start() @@ -69,9 +71,9 @@ func main() { log.Panicf("watch config file error: %s", err) } } - //Explicitly triggering GC to remove garbage from config loading. + // clear memory runtime.GC() - // Running backend + // wait exit signal { osSignals := make(chan os.Signal, 1) signal.Notify(osSignals, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM) diff --git a/node/controller/controller.go b/node/controller.go similarity index 79% rename from node/controller/controller.go rename to node/controller.go index 587c08d..d5d68a9 100644 --- a/node/controller/controller.go +++ b/node/controller.go @@ -1,4 +1,4 @@ -package controller +package node import ( "errors" @@ -7,11 +7,12 @@ import ( "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" + "github.com/Yuzuki616/V2bX/limiter" "github.com/xtls/xray-core/common/task" "log" ) -type Node struct { +type Controller struct { server *core.Core clientInfo panel.ClientInfo apiClient panel.Panel @@ -27,9 +28,9 @@ type Node struct { *conf.ControllerConfig } -// New return a Node service with default parameters. -func New(server *core.Core, api panel.Panel, config *conf.ControllerConfig) *Node { - controller := &Node{ +// NewController return a Node controller with default parameters. +func NewController(server *core.Core, api panel.Panel, config *conf.ControllerConfig) *Controller { + controller := &Controller{ server: server, ControllerConfig: config, apiClient: api, @@ -38,7 +39,7 @@ func New(server *core.Core, api panel.Panel, config *conf.ControllerConfig) *Nod } // Start implement the Start() function of the service interface -func (c *Node) Start() error { +func (c *Controller) Start() error { c.clientInfo = c.apiClient.Describe() // First fetch Node Info var err error @@ -46,12 +47,6 @@ func (c *Node) Start() error { if err != nil { return fmt.Errorf("get node info failed: %s", err) } - c.Tag = c.buildNodeTag() - // Add new tag - err = c.addNewTag(c.nodeInfo) - if err != nil { - return fmt.Errorf("add new tag failed: %s", err) - } // Update user c.userList, err = c.apiClient.GetUserList() if err != nil { @@ -60,25 +55,36 @@ func (c *Node) Start() error { if len(c.userList) == 0 { return errors.New("add users failed: not have any user") } + c.Tag = c.buildNodeTag() + + // add limiter + l := limiter.AddLimiter(c.Tag, &limiter.LimitConfig{ + SpeedLimit: c.SpeedLimit, + IpLimit: c.IPLimit, + ConnLimit: c.ConnLimit, + }, c.userList) + // add rule limiter + if !c.DisableGetRule { + if err = l.UpdateRule(c.nodeInfo.Rules); err != nil { + log.Printf("Update rule filed: %s", err) + } + } + // Add new tag + err = c.addNewNode(c.nodeInfo) + if err != nil { + return fmt.Errorf("add new tag failed: %s", err) + } err = c.addNewUser(c.userList, c.nodeInfo) if err != nil { return err } - if err := c.server.AddInboundLimiter(c.Tag, c.nodeInfo, c.userList); err != nil { - return fmt.Errorf("add inbound limiter failed: %s", err) - } - // Add Rule Manager - if !c.DisableGetRule { - if err := c.server.UpdateRule(c.Tag, c.nodeInfo.Rules); err != nil { - log.Printf("Update rule filed: %s", err) - } - } c.initTask() return nil } // Close implement the Close() function of the service interface -func (c *Node) Close() error { +func (c *Controller) Close() error { + limiter.DeleteLimiter(c.Tag) if c.nodeInfoMonitorPeriodic != nil { err := c.nodeInfoMonitorPeriodic.Close() if err != nil { @@ -112,6 +118,6 @@ func (c *Node) Close() error { return nil } -func (c *Node) buildNodeTag() string { +func (c *Controller) buildNodeTag() string { return fmt.Sprintf("%s_%s_%d", c.nodeInfo.NodeType, c.ListenIP, c.nodeInfo.NodeId) } diff --git a/node/controller/controller_test.go b/node/controller_test.go similarity index 96% rename from node/controller/controller_test.go rename to node/controller_test.go index 7323fd5..a7a711a 100644 --- a/node/controller/controller_test.go +++ b/node/controller_test.go @@ -1,4 +1,4 @@ -package controller_test +package node_test import ( "fmt" @@ -6,7 +6,7 @@ import ( "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" _ "github.com/Yuzuki616/V2bX/core/distro/all" - . "github.com/Yuzuki616/V2bX/node/controller" + . "github.com/Yuzuki616/V2bX/node" xCore "github.com/xtls/xray-core/core" coreConf "github.com/xtls/xray-core/infra/conf" "os" diff --git a/node/controller/inbound.go b/node/inbound.go similarity index 98% rename from node/controller/inbound.go rename to node/inbound.go index a786d9e..35ff84c 100644 --- a/node/controller/inbound.go +++ b/node/inbound.go @@ -1,4 +1,4 @@ -package controller +package node import ( "crypto/rand" @@ -8,7 +8,7 @@ import ( "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/common/file" "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/node/controller/lego" + "github.com/Yuzuki616/V2bX/node/lego" "github.com/goccy/go-json" "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/core" @@ -65,7 +65,7 @@ func buildInbound(config *conf.ControllerConfig, nodeInfo *panel.NodeInfo, tag s AcceptProxyProtocol: config.EnableProxyProtocol} //Enable proxy protocol } // Set TLS and XTLS settings - if nodeInfo.EnableTls && config.CertConfig.CertMode != "none" { + if config.EnableTls && config.CertConfig.CertMode != "none" { inbound.StreamSetting.Security = "tls" certFile, keyFile, err := getCertFile(config.CertConfig) if err != nil { @@ -91,7 +91,7 @@ func buildInbound(config *conf.ControllerConfig, nodeInfo *panel.NodeInfo, tag s } func buildV2ray(config *conf.ControllerConfig, nodeInfo *panel.NodeInfo, inbound *coreConf.InboundDetourConfig) error { - if nodeInfo.EnableVless { + if config.EnableVless { //Set vless inbound.Protocol = "vless" if config.EnableFallback { diff --git a/node/controller/inbound_test.go b/node/inbound_test.go similarity index 96% rename from node/controller/inbound_test.go rename to node/inbound_test.go index 6ea0a2d..c80dd0a 100644 --- a/node/controller/inbound_test.go +++ b/node/inbound_test.go @@ -1,8 +1,8 @@ -package controller_test +package node_test import ( "github.com/Yuzuki616/V2bX/api/panel" - . "github.com/Yuzuki616/V2bX/node/controller" + . "github.com/Yuzuki616/V2bX/node" "testing" ) diff --git a/node/controller/lego/cert.go b/node/lego/cert.go similarity index 100% rename from node/controller/lego/cert.go rename to node/lego/cert.go diff --git a/node/controller/lego/lego.go b/node/lego/lego.go similarity index 100% rename from node/controller/lego/lego.go rename to node/lego/lego.go diff --git a/node/controller/lego/lego_test.go b/node/lego/lego_test.go similarity index 100% rename from node/controller/lego/lego_test.go rename to node/lego/lego_test.go diff --git a/node/controller/lego/user.go b/node/lego/user.go similarity index 100% rename from node/controller/lego/user.go rename to node/lego/user.go diff --git a/node/node.go b/node/node.go index 872a2d5..8c1541e 100644 --- a/node/node.go +++ b/node/node.go @@ -4,11 +4,10 @@ import ( "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/conf" "github.com/Yuzuki616/V2bX/core" - "github.com/Yuzuki616/V2bX/node/controller" ) type Node struct { - controllers []*controller.Node + controllers []*Controller } func New() *Node { @@ -16,14 +15,14 @@ func New() *Node { } func (n *Node) Start(nodes []*conf.NodeConfig, core *core.Core) error { - n.controllers = make([]*controller.Node, len(nodes)) + n.controllers = make([]*Controller, len(nodes)) for i, c := range nodes { p, err := panel.New(c.ApiConfig) if err != nil { return err } // Register controller service - n.controllers[i] = controller.New(core, p, c.ControllerConfig) + n.controllers[i] = NewController(core, p, c.ControllerConfig) err = n.controllers[i].Start() if err != nil { return err diff --git a/node/controller/outbound.go b/node/outbound.go similarity index 98% rename from node/controller/outbound.go rename to node/outbound.go index 0e5e52e..17cc33f 100644 --- a/node/controller/outbound.go +++ b/node/outbound.go @@ -1,4 +1,4 @@ -package controller +package node import ( "fmt" diff --git a/node/controller/task.go b/node/task.go similarity index 64% rename from node/controller/task.go rename to node/task.go index de368c5..3267f44 100644 --- a/node/controller/task.go +++ b/node/task.go @@ -1,10 +1,10 @@ -package controller +package node import ( "fmt" - "github.com/Yuzuki616/V2bX/api/iprecoder" "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/node/controller/lego" + "github.com/Yuzuki616/V2bX/limiter" + "github.com/Yuzuki616/V2bX/node/lego" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/task" "log" @@ -13,7 +13,7 @@ import ( "time" ) -func (c *Node) initTask() { +func (c *Controller) initTask() { // fetch node info task c.nodeInfoMonitorPeriodic = &task.Periodic{ Interval: time.Duration(c.nodeInfo.BaseConfig.PullInterval.(int)) * time.Second, @@ -36,7 +36,7 @@ func (c *Node) initTask() { time.Sleep(time.Duration(c.nodeInfo.BaseConfig.PushInterval.(int)) * time.Second) _ = c.userReportPeriodic.Start() }() - if c.nodeInfo.EnableTls && c.CertConfig.CertMode != "none" && + if c.EnableTls && c.CertConfig.CertMode != "none" && (c.CertConfig.CertMode == "dns" || c.CertConfig.CertMode == "http") { c.renewCertPeriodic = &task.Periodic{ Interval: time.Hour * 24, @@ -48,42 +48,9 @@ func (c *Node) initTask() { _ = c.renewCertPeriodic.Start() }() } - if c.EnableDynamicSpeedLimit { - // Check dynamic speed limit task - c.dynamicSpeedLimitPeriodic = &task.Periodic{ - Interval: time.Duration(c.DynamicSpeedLimitConfig.Periodic) * time.Second, - Execute: c.dynamicSpeedLimit, - } - go func() { - time.Sleep(time.Duration(c.DynamicSpeedLimitConfig.Periodic) * time.Second) - _ = c.dynamicSpeedLimitPeriodic.Start() - }() - log.Printf("[%s: %d] Start dynamic speed limit", c.nodeInfo.NodeType, c.nodeInfo.NodeId) - } - if c.EnableIpRecorder { - switch c.IpRecorderConfig.Type { - case "Recorder": - c.ipRecorder = iprecoder.NewRecorder(c.IpRecorderConfig.RecorderConfig) - case "Redis": - c.ipRecorder = iprecoder.NewRedis(c.IpRecorderConfig.RedisConfig) - default: - log.Printf("recorder type: %s is not vail, disable recorder", c.IpRecorderConfig.Type) - return - } - // report and fetch online ip list task - c.onlineIpReportPeriodic = &task.Periodic{ - Interval: time.Duration(c.IpRecorderConfig.Periodic) * time.Second, - Execute: c.reportOnlineIp, - } - go func() { - time.Sleep(time.Duration(c.IpRecorderConfig.Periodic) * time.Second) - _ = c.onlineIpReportPeriodic.Start() - }() - log.Printf("[%s: %d] Start report online ip", c.nodeInfo.NodeType, c.nodeInfo.NodeId) - } } -func (c *Node) nodeInfoMonitor() (err error) { +func (c *Controller) nodeInfoMonitor() (err error) { // First fetch Node Info newNodeInfo, err := c.apiClient.GetNodeInfo() if err != nil { @@ -95,28 +62,22 @@ func (c *Node) nodeInfoMonitor() (err error) { if newNodeInfo != nil { // Remove old tag oldTag := c.Tag - err := c.removeOldTag(oldTag) + err := c.removeOldNode(oldTag) if err != nil { log.Print(err) return nil } + // Remove Old limiter + limiter.DeleteLimiter(oldTag) // Add new tag c.nodeInfo = newNodeInfo c.Tag = c.buildNodeTag() - err = c.addNewTag(newNodeInfo) + err = c.addNewNode(newNodeInfo) if err != nil { log.Print(err) return nil } nodeInfoChanged = true - // Remove Old limiter - if err = c.server.DeleteInboundLimiter(oldTag); err != nil { - log.Print(err) - return nil - } - if err := c.server.UpdateRule(c.Tag, newNodeInfo.Rules); err != nil { - log.Print(err) - } } // Update User newUserInfo, err := c.apiClient.GetUserList() @@ -126,15 +87,20 @@ func (c *Node) nodeInfoMonitor() (err error) { } if nodeInfoChanged { c.userList = newUserInfo - err = c.addNewUser(c.userList, newNodeInfo) + // Add new Limiter + l := limiter.AddLimiter(c.Tag, &limiter.LimitConfig{ + SpeedLimit: c.SpeedLimit, + IpLimit: c.IPLimit, + ConnLimit: c.ConnLimit, + }, newUserInfo) + err = c.addNewUser(newUserInfo, newNodeInfo) if err != nil { log.Print(err) return nil } - // Add Limiter - if err := c.server.AddInboundLimiter(c.Tag, newNodeInfo, newUserInfo); err != nil { - log.Print(err) - return nil + err = l.UpdateRule(newNodeInfo.Rules) + if err != nil { + log.Printf("Update Rule error: %s", err) } // Check interval if c.nodeInfoMonitorPeriodic.Interval != time.Duration(newNodeInfo.BaseConfig.PullInterval.(int))*time.Second { @@ -176,8 +142,9 @@ func (c *Node) nodeInfoMonitor() (err error) { } if len(added) > 0 || len(deleted) > 0 { // Update Limiter - if err := c.server.UpdateInboundLimiter(c.Tag, added, deleted); err != nil { - log.Print(err) + err = limiter.UpdateLimiter(c.Tag, added, deleted) + if err != nil { + log.Print("update limiter:", err) } } log.Printf("[%s: %d] %d user deleted, %d user added", c.nodeInfo.NodeType, c.nodeInfo.NodeId, @@ -187,7 +154,7 @@ func (c *Node) nodeInfoMonitor() (err error) { return nil } -func (c *Node) removeOldTag(oldTag string) (err error) { +func (c *Controller) removeOldNode(oldTag string) (err error) { err = c.server.RemoveInbound(oldTag) if err != nil { return err @@ -199,7 +166,7 @@ func (c *Node) removeOldTag(oldTag string) (err error) { return nil } -func (c *Node) addNewTag(newNodeInfo *panel.NodeInfo) (err error) { +func (c *Controller) addNewNode(newNodeInfo *panel.NodeInfo) (err error) { inboundConfig, err := buildInbound(c.ControllerConfig, newNodeInfo, c.Tag) if err != nil { return fmt.Errorf("build inbound error: %s", err) @@ -219,10 +186,10 @@ func (c *Node) addNewTag(newNodeInfo *panel.NodeInfo) (err error) { return nil } -func (c *Node) addNewUser(userInfo []panel.UserInfo, nodeInfo *panel.NodeInfo) (err error) { +func (c *Controller) addNewUser(userInfo []panel.UserInfo, nodeInfo *panel.NodeInfo) (err error) { users := make([]*protocol.User, 0) if nodeInfo.NodeType == "V2ray" { - if nodeInfo.EnableVless { + if c.EnableVless { users = c.buildVlessUsers(userInfo) } else { users = c.buildVmessUsers(userInfo) @@ -270,7 +237,7 @@ func compareUserList(old, new []panel.UserInfo) (deleted, added []panel.UserInfo return deleted, added } -func (c *Node) reportUserTraffic() (err error) { +func (c *Controller) reportUserTraffic() (err error) { // Get User traffic userTraffic := make([]panel.UserTraffic, 0) for i := range c.userList { @@ -294,52 +261,11 @@ func (c *Node) reportUserTraffic() (err error) { } } userTraffic = nil - if !c.EnableIpRecorder { - c.server.ClearOnlineIp(c.Tag) - } runtime.GC() return nil } -func (c *Node) reportOnlineIp() (err error) { - onlineIp, err := c.server.ListOnlineIp(c.Tag) - if err != nil { - log.Print(err) - return nil - } - onlineIp, err = c.ipRecorder.SyncOnlineIp(onlineIp) - if err != nil { - log.Print("Report online ip error: ", err) - c.server.ClearOnlineIp(c.Tag) - } - if c.IpRecorderConfig.EnableIpSync { - c.server.UpdateOnlineIp(c.Tag, onlineIp) - log.Printf("[Node: %d] Updated %d online ip", c.nodeInfo.NodeId, len(onlineIp)) - } - log.Printf("[Node: %d] Report %d online ip", c.nodeInfo.NodeId, len(onlineIp)) - return nil -} - -func (c *Node) dynamicSpeedLimit() error { - if c.EnableDynamicSpeedLimit { - for i := range c.userList { - up, down := c.server.GetUserTraffic(c.buildUserTag(&(c.userList)[i]), false) - if c.userList[i].Traffic+down+up/1024/1024 > c.DynamicSpeedLimitConfig.Traffic { - err := c.server.AddUserSpeedLimit(c.Tag, - &c.userList[i], - c.DynamicSpeedLimitConfig.SpeedLimit, - time.Now().Add(time.Second*time.Duration(c.DynamicSpeedLimitConfig.ExpireTime)).Unix()) - if err != nil { - log.Print(err) - } - } - c.userList[i].Traffic = 0 - } - } - return nil -} - -func (c *Node) RenewCert() { +func (c *Controller) RenewCert() { l, err := lego.New(c.CertConfig) if err != nil { log.Print(err) diff --git a/node/controller/user.go b/node/user.go similarity index 76% rename from node/controller/user.go rename to node/user.go index dce63bb..b8d6436 100644 --- a/node/controller/user.go +++ b/node/user.go @@ -1,4 +1,4 @@ -package controller +package node import ( "encoding/base64" @@ -14,7 +14,7 @@ import ( "strings" ) -func (c *Node) buildVmessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { +func (c *Controller) buildVmessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i, user := range userInfo { users[i] = c.buildVmessUser(&user, 0) @@ -22,7 +22,7 @@ func (c *Node) buildVmessUsers(userInfo []panel.UserInfo) (users []*protocol.Use return users } -func (c *Node) buildVmessUser(userInfo *panel.UserInfo, serverAlterID uint16) (user *protocol.User) { +func (c *Controller) buildVmessUser(userInfo *panel.UserInfo, serverAlterID uint16) (user *protocol.User) { vmessAccount := &conf.VMessAccount{ ID: userInfo.Uuid, AlterIds: serverAlterID, @@ -35,7 +35,7 @@ func (c *Node) buildVmessUser(userInfo *panel.UserInfo, serverAlterID uint16) (u } } -func (c *Node) buildVlessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { +func (c *Controller) buildVlessUsers(userInfo []panel.UserInfo) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i := range userInfo { users[i] = c.buildVlessUser(&(userInfo)[i]) @@ -43,7 +43,7 @@ func (c *Node) buildVlessUsers(userInfo []panel.UserInfo) (users []*protocol.Use return users } -func (c *Node) buildVlessUser(userInfo *panel.UserInfo) (user *protocol.User) { +func (c *Controller) buildVlessUser(userInfo *panel.UserInfo) (user *protocol.User) { vlessAccount := &vless.Account{ Id: userInfo.Uuid, Flow: "xtls-rprx-direct", @@ -55,7 +55,7 @@ func (c *Node) buildVlessUser(userInfo *panel.UserInfo) (user *protocol.User) { } } -func (c *Node) buildTrojanUsers(userInfo []panel.UserInfo) (users []*protocol.User) { +func (c *Controller) buildTrojanUsers(userInfo []panel.UserInfo) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i := range userInfo { users[i] = c.buildTrojanUser(&(userInfo)[i]) @@ -63,7 +63,7 @@ func (c *Node) buildTrojanUsers(userInfo []panel.UserInfo) (users []*protocol.Us return users } -func (c *Node) buildTrojanUser(userInfo *panel.UserInfo) (user *protocol.User) { +func (c *Controller) buildTrojanUser(userInfo *panel.UserInfo) (user *protocol.User) { trojanAccount := &trojan.Account{ Password: userInfo.Uuid, Flow: "xtls-rprx-direct", @@ -90,7 +90,7 @@ func getCipherFromString(c string) shadowsocks.CipherType { } } -func (c *Node) buildSSUsers(userInfo []panel.UserInfo, cypher shadowsocks.CipherType) (users []*protocol.User) { +func (c *Controller) buildSSUsers(userInfo []panel.UserInfo, cypher shadowsocks.CipherType) (users []*protocol.User) { users = make([]*protocol.User, len(userInfo)) for i := range userInfo { users[i] = c.buildSSUser(&(userInfo)[i], cypher) @@ -98,7 +98,7 @@ func (c *Node) buildSSUsers(userInfo []panel.UserInfo, cypher shadowsocks.Cipher return users } -func (c *Node) buildSSUser(userInfo *panel.UserInfo, cypher shadowsocks.CipherType) (user *protocol.User) { +func (c *Controller) buildSSUser(userInfo *panel.UserInfo, cypher shadowsocks.CipherType) (user *protocol.User) { if c.nodeInfo.ServerKey == "" { ssAccount := &shadowsocks.Account{ Password: userInfo.Uuid, @@ -121,6 +121,6 @@ func (c *Node) buildSSUser(userInfo *panel.UserInfo, cypher shadowsocks.CipherTy } } -func (c *Node) buildUserTag(user *panel.UserInfo) string { +func (c *Controller) buildUserTag(user *panel.UserInfo) string { return fmt.Sprintf("%s|%s|%d", c.Tag, user.Uuid, user.Id) }