diff --git a/api/panel/node.go b/api/panel/node.go index 86f311f..4661b06 100644 --- a/api/panel/node.go +++ b/api/panel/node.go @@ -4,18 +4,16 @@ import ( "bytes" "encoding/base64" "fmt" - log "github.com/sirupsen/logrus" - coreConf "github.com/xtls/xray-core/infra/conf" "os" "reflect" - "regexp" "strconv" "strings" "time" "github.com/Yuzuki616/V2bX/common/crypt" - "github.com/goccy/go-json" + log "github.com/sirupsen/logrus" + coreConf "github.com/xtls/xray-core/infra/conf" ) type CommonNodeRsp struct { @@ -58,7 +56,7 @@ type HysteriaNodeRsp struct { type NodeInfo struct { Id int Type string - Rules []*regexp.Regexp + Rules Rules Host string Port int Network string @@ -75,6 +73,11 @@ type NodeInfo struct { PullInterval time.Duration } +type Rules struct { + Regexp []string + Protocol []string +} + type V2rayExtraConfig struct { EnableVless string `json:"EnableVless"` VlessFlow string `json:"VlessFlow"` @@ -97,7 +100,7 @@ func (c *Client) GetNodeInfo() (node *NodeInfo, err error) { const path = "/api/v1/server/UniProxy/config" r, err := c.client. R(). - SetHeader("If-None-Match", c.etag). + SetHeader("If-None-Match", c.nodeEtag). Get(path) if err = c.checkResponse(r, path, err); err != nil { return @@ -131,7 +134,13 @@ func (c *Client) GetNodeInfo() (node *NodeInfo, err error) { switch common.Routes[i].Action { case "block": for _, v := range matchs { - node.Rules = append(node.Rules, regexp.MustCompile(v)) + if strings.HasPrefix(v, "protocol:") { + // protocol + node.Rules.Protocol = append(node.Rules.Protocol, strings.TrimPrefix(v, "protocol:")) + } else { + // domain + node.Rules.Regexp = append(node.Rules.Regexp, strings.TrimPrefix(v, "regexp:")) + } } case "dns": if matchs[0] != "main" { @@ -218,7 +227,7 @@ func (c *Client) GetNodeInfo() (node *NodeInfo, err error) { node.UpMbps = rsp.UpMbps node.HyObfs = rsp.Obfs } - c.etag = r.Header().Get("ETag") + c.nodeEtag = r.Header().Get("ETag") return } diff --git a/api/panel/panel.go b/api/panel/panel.go index c1acaa2..29aca2b 100644 --- a/api/panel/panel.go +++ b/api/panel/panel.go @@ -23,7 +23,8 @@ type Client struct { NodeType string NodeId int LocalRuleList []*regexp.Regexp - etag string + nodeEtag string + userEtag string } func New(c *conf.ApiConfig) (*Client, error) { diff --git a/api/panel/user.go b/api/panel/user.go index fbb4992..8357a7d 100644 --- a/api/panel/user.go +++ b/api/panel/user.go @@ -2,6 +2,7 @@ package panel import ( "fmt" + "github.com/goccy/go-json" ) @@ -14,7 +15,6 @@ type UserInfo struct { Id int `json:"id"` Uuid string `json:"uuid"` SpeedLimit int `json:"speed_limit"` - Traffic int64 `json:"-"` } type UserListBody struct { @@ -25,17 +25,23 @@ type UserListBody struct { // GetUserList will pull user form sspanel func (c *Client) GetUserList() (UserList []UserInfo, err error) { const path = "/api/v1/server/UniProxy/user" - res, err := c.client.R(). + r, err := c.client.R(). + SetHeader("If-None-Match", c.userEtag). Get(path) - err = c.checkResponse(res, path, err) + err = c.checkResponse(r, path, err) if err != nil { return nil, err } + err = c.checkResponse(r, path, err) + if r.StatusCode() == 304 { + return nil, nil + } var userList *UserListBody - err = json.Unmarshal(res.Body(), &userList) + err = json.Unmarshal(r.Body(), &userList) if err != nil { return nil, fmt.Errorf("unmarshal userlist error: %s", err) } + c.userEtag = r.Header().Get("ETag") return userList.Users, nil } @@ -52,11 +58,11 @@ func (c *Client) ReportUserTraffic(userTraffic []UserTraffic) error { data[userTraffic[i].UID] = []int64{userTraffic[i].Upload, userTraffic[i].Download} } const path = "/api/v1/server/UniProxy/push" - res, err := c.client.R(). + r, err := c.client.R(). SetBody(data). ForceContentType("application/json"). Post(path) - err = c.checkResponse(res, path, err) + err = c.checkResponse(r, path, err) if err != nil { return err } diff --git a/core/hy/server.go b/core/hy/server.go index 6e34831..7f919c4 100644 --- a/core/hy/server.go +++ b/core/hy/server.go @@ -3,16 +3,16 @@ package hy import ( "crypto/tls" "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/limiter" - "github.com/apernet/hysteria/core/sockopt" "io" "net" "sync" "sync/atomic" "time" + "github.com/Yuzuki616/V2bX/api/panel" + "github.com/Yuzuki616/V2bX/conf" + "github.com/Yuzuki616/V2bX/limiter" + "github.com/apernet/hysteria/core/sockopt" "github.com/quic-go/quic-go" "github.com/apernet/hysteria/core/acl" @@ -121,22 +121,6 @@ func (s *Server) runServer(node *panel.NodeInfo, c *conf.ControllerConfig) error } // ACL var aclEngine *acl.Engine - /*if len(config.ACL) > 0 { - aclEngine, err = acl.LoadFromFile(config.ACL, func(addr string) (*net.IPAddr, error) { - ipAddr, _, err := transport.DefaultServerTransport.ResolveIPAddr(addr) - return ipAddr, err - }, - func() (*geoip2.Reader, error) { - return loadMMDBReader(config.MMDB) - }) - if err != nil { - logrus.WithFields(logrus.Fields{ - "error": err, - "file": config.ACL, - }).Fatal("Failed to parse ACL") - } - aclEngine.DefaultAction = acl.ActionDirect - }*/ // Prometheus s.counter = NewUserTrafficCounter() // Packet conn diff --git a/core/xray/app/dispatcher/default.go b/core/xray/app/dispatcher/default.go index 7c33636..d872b7f 100644 --- a/core/xray/app/dispatcher/default.go +++ b/core/xray/app/dispatcher/default.go @@ -337,7 +337,13 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin reader: outbound.Reader.(*pipe.Reader), } outbound.Reader = cReader - result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network) + result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network, l) + if _, ok := err.(limitedError); ok { + newError(err).AtInfo().WriteToLog() + common.Close(outbound.Writer) + common.Interrupt(outbound.Reader) + return + } if err == nil { content.Protocol = result.Protocol() } @@ -380,7 +386,13 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De reader: outbound.Reader.(*pipe.Reader), } outbound.Reader = cReader - result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network) + result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network, nil) + if _, ok := err.(limitedError); ok { + newError(err).AtInfo().WriteToLog() + common.Close(outbound.Writer) + common.Interrupt(outbound.Reader) + return + } if err == nil { content.Protocol = result.Protocol() } @@ -400,18 +412,50 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De return nil } -func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, network net.Network) (SniffResult, error) { +type limitedError string + +func (l limitedError) Error() string { + return string(l) +} + +func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, network net.Network, l *limiter.Limiter) (result SniffResult, err error) { payload := buf.New() defer payload.Release() + defer func() { + if err != nil { + return + } + // Check if domain and protocol hit the rule + sessionInbound := session.InboundFromContext(ctx) + // Whether the inbound connection contains a user + if sessionInbound.User != nil { + if l == nil { + l, err = limiter.GetLimiter(sessionInbound.Tag) + if err != nil { + return + } + } + if l.CheckDomainRule(result.Domain()) { + err = limitedError(fmt.Sprintf( + "User %s access domain %s reject by rule", + sessionInbound.User.Email, + result.Domain())) + } + if l.CheckProtocolRule(result.Protocol()) { + err = limitedError(fmt.Sprintf( + "User %s access protocol %s reject by rule", + sessionInbound.User.Email, + result.Protocol())) + } + } + }() + sniffer := NewSniffer(ctx) - metaresult, metadataErr := sniffer.SniffMetadata(ctx) - if metadataOnly { return metaresult, metadataErr } - contentResult, contentErr := func() (SniffResult, error) { totalAttempt := 0 for { @@ -460,32 +504,17 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport. } } } - var handler outbound.Handler - // Check if domain and protocol hit the rule - sessionInbound := session.InboundFromContext(ctx) - // Whether the inbound connection contains a user - if sessionInbound.User != nil { - if l == nil { - var err error - l, err = limiter.GetLimiter(sessionInbound.Tag) - if err != nil { - newError("Get limiter error: ", err).AtError().WriteToLog() - common.Close(link.Writer) - common.Interrupt(link.Reader) - return + // del connect count + if l != nil { + sessionInbound := session.InboundFromContext(ctx) + if sessionInbound.User != nil { + if destination.Network == net.Network_TCP { + defer func() { + l.ConnLimiter.DelConnCount(sessionInbound.User.Email, sessionInbound.Source.Address.IP().String()) + }() } - } else if destination.Network == net.Network_TCP { - defer func() { - l.ConnLimiter.DelConnCount(sessionInbound.User.Email, sessionInbound.Source.Address.IP().String()) - }() - } - if l.CheckDomainRule(destination.Address.String()) { - newError(fmt.Sprintf("User %s access %s reject by rule", sessionInbound.User.Email, destination.String())).AtError().WriteToLog() - common.Close(link.Writer) - common.Interrupt(link.Reader) - return } } diff --git a/core/xray/xray.go b/core/xray/xray.go index d718094..b11baa2 100644 --- a/core/xray/xray.go +++ b/core/xray/xray.go @@ -1,6 +1,9 @@ package xray import ( + "os" + "sync" + "github.com/Yuzuki616/V2bX/conf" vCore "github.com/Yuzuki616/V2bX/core" "github.com/Yuzuki616/V2bX/core/xray/app/dispatcher" @@ -16,8 +19,6 @@ import ( "github.com/xtls/xray-core/features/routing" statsFeature "github.com/xtls/xray-core/features/stats" coreConf "github.com/xtls/xray-core/infra/conf" - "os" - "sync" ) func init() { diff --git a/limiter/limiter.go b/limiter/limiter.go index 149538a..60973b0 100644 --- a/limiter/limiter.go +++ b/limiter/limiter.go @@ -2,16 +2,16 @@ package limiter import ( "errors" - "fmt" + "regexp" + "sync" + "time" + "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/common/format" "github.com/Yuzuki616/V2bX/conf" "github.com/juju/ratelimit" log "github.com/sirupsen/logrus" "github.com/xtls/xray-core/common/task" - "regexp" - "sync" - "time" ) var limitLock sync.RWMutex @@ -32,7 +32,7 @@ func Init() { } type Limiter struct { - Rules []*regexp.Regexp + DomainRules []*regexp.Regexp ProtocolRules []string SpeedLimit int UserLimitInfo *sync.Map // Key: Uid value: UserLimitInfo @@ -59,7 +59,6 @@ func AddLimiter(tag string, l *conf.LimitConfig, users []panel.UserInfo) *Limite userLimit := &UserLimitInfo{ UID: users[i].Id, SpeedLimit: users[i].SpeedLimit, - ExpireTime: 0, } info.UserLimitInfo.Store(format.UserTag(tag, users[i].Uuid), userLimit) } @@ -80,11 +79,13 @@ func GetLimiter(tag string) (info *Limiter, err error) { return } -func UpdateLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { - l, err := GetLimiter(tag) - if err != nil { - return fmt.Errorf("get limit error: %s", err) - } +func DeleteLimiter(tag string) { + limitLock.Lock() + delete(limiter, tag) + limitLock.Unlock() +} + +func (l *Limiter) UpdateUser(tag string, added []panel.UserInfo, deleted []panel.UserInfo) { for i := range deleted { l.UserLimitInfo.Delete(format.UserTag(tag, deleted[i].Uuid)) } @@ -98,13 +99,17 @@ func UpdateLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) l.UserLimitInfo.Store(format.UserTag(tag, added[i].Uuid), userLimit) } } - return nil } -func DeleteLimiter(tag string) { - limitLock.Lock() - delete(limiter, tag) - limitLock.Unlock() +func (l *Limiter) UpdateDynamicSpeedLimit(tag, uuid string, limit int, expire time.Time) error { + if v, ok := l.UserLimitInfo.Load(format.UserTag(tag, uuid)); ok { + info := v.(*UserLimitInfo) + info.DynamicSpeedLimit = limit + info.ExpireTime = expire.Unix() + } else { + return errors.New("not found") + } + return nil } func (l *Limiter) CheckLimit(email string, ip string, isTcp bool) (Bucket *ratelimit.Bucket, Reject bool) { diff --git a/limiter/rule.go b/limiter/rule.go index ce4dcf5..4f6e4c8 100644 --- a/limiter/rule.go +++ b/limiter/rule.go @@ -1,14 +1,15 @@ package limiter import ( - "reflect" "regexp" + + "github.com/Yuzuki616/V2bX/api/panel" ) func (l *Limiter) CheckDomainRule(destination string) (reject bool) { // have rule - for i := range l.Rules { - if l.Rules[i].MatchString(destination) { + for i := range l.DomainRules { + if l.DomainRules[i].MatchString(destination) { reject = true break } @@ -26,9 +27,11 @@ func (l *Limiter) CheckProtocolRule(protocol string) (reject bool) { return } -func (l *Limiter) UpdateRule(newRuleList []*regexp.Regexp) error { - if !reflect.DeepEqual(l.Rules, newRuleList) { - l.Rules = newRuleList +func (l *Limiter) UpdateRule(rule *panel.Rules) error { + l.DomainRules = make([]*regexp.Regexp, len(rule.Regexp)) + for i := range rule.Regexp { + l.DomainRules[i] = regexp.MustCompile(rule.Regexp[i]) } + l.ProtocolRules = rule.Protocol return nil } diff --git a/node/controller.go b/node/controller.go index 725b2ad..4f389cd 100644 --- a/node/controller.go +++ b/node/controller.go @@ -3,6 +3,7 @@ package node import ( "errors" "fmt" + "github.com/Yuzuki616/V2bX/api/iprecoder" "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/common/task" @@ -15,8 +16,9 @@ import ( type Controller struct { server vCore.Core apiClient *panel.Client - nodeInfo *panel.NodeInfo - Tag string + tag string + limiter *limiter.Limiter + traffic map[string]int64 userList []panel.UserInfo ipRecorder iprecoder.IpRecorder nodeInfoMonitorPeriodic *task.Task @@ -41,7 +43,7 @@ func NewController(server vCore.Core, api *panel.Client, config *conf.Controller func (c *Controller) Start() error { // First fetch Node Info var err error - c.nodeInfo, err = c.apiClient.GetNodeInfo() + node, err := c.apiClient.GetNodeInfo() if err != nil { return fmt.Errorf("get node info error: %s", err) } @@ -53,42 +55,43 @@ func (c *Controller) Start() error { if len(c.userList) == 0 { return errors.New("add users error: not have any user") } - c.Tag = c.buildNodeTag() + c.tag = c.buildNodeTag(node) // add limiter - l := limiter.AddLimiter(c.Tag, &c.LimitConfig, c.userList) + l := limiter.AddLimiter(c.tag, &c.LimitConfig, c.userList) // add rule limiter - if err = l.UpdateRule(c.nodeInfo.Rules); err != nil { + if err = l.UpdateRule(&node.Rules); err != nil { return fmt.Errorf("update rule error: %s", err) } - if c.nodeInfo.Tls || c.nodeInfo.Type == "hysteria" { + c.limiter = l + if node.Tls || node.Type == "hysteria" { err = c.requestCert() if err != nil { return fmt.Errorf("request cert error: %s", err) } } // Add new tag - err = c.server.AddNode(c.Tag, c.nodeInfo, c.ControllerConfig) + err = c.server.AddNode(c.tag, node, c.ControllerConfig) if err != nil { return fmt.Errorf("add new node error: %s", err) } added, err := c.server.AddUsers(&vCore.AddUsersParams{ - Tag: c.Tag, + Tag: c.tag, Config: c.ControllerConfig, UserInfo: c.userList, - NodeInfo: c.nodeInfo, + NodeInfo: node, }) if err != nil { return fmt.Errorf("add users error: %s", err) } - log.WithField("tag", c.Tag).Infof("Added %d new users", added) - c.initTask() + log.WithField("tag", c.tag).Infof("Added %d new users", added) + c.startTasks(node) return nil } // Close implement the Close() function of the service interface func (c *Controller) Close() error { - limiter.DeleteLimiter(c.Tag) + limiter.DeleteLimiter(c.tag) if c.nodeInfoMonitorPeriodic != nil { c.nodeInfoMonitorPeriodic.Close() } @@ -107,6 +110,6 @@ func (c *Controller) Close() error { return nil } -func (c *Controller) buildNodeTag() string { - return fmt.Sprintf("%s-%s-%d", c.apiClient.APIHost, c.nodeInfo.Type, c.nodeInfo.Id) +func (c *Controller) buildNodeTag(node *panel.NodeInfo) string { + return fmt.Sprintf("%s-%s-%d", c.apiClient.APIHost, node.Type, node.Id) } diff --git a/node/task.go b/node/task.go index 550f11d..8940dd1 100644 --- a/node/task.go +++ b/node/task.go @@ -1,30 +1,32 @@ package node import ( + "time" + + "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/common/task" vCore "github.com/Yuzuki616/V2bX/core" "github.com/Yuzuki616/V2bX/limiter" log "github.com/sirupsen/logrus" - "time" ) -func (c *Controller) initTask() { +func (c *Controller) startTasks(node *panel.NodeInfo) { // fetch node info task c.nodeInfoMonitorPeriodic = &task.Task{ - Interval: c.nodeInfo.PullInterval, + Interval: node.PullInterval, Execute: c.nodeInfoMonitor, } // fetch user list task c.userReportPeriodic = &task.Task{ - Interval: c.nodeInfo.PushInterval, + Interval: node.PushInterval, Execute: c.reportUserTrafficTask, } - log.WithField("tag", c.Tag).Info("Start monitor node status") + log.WithField("tag", c.tag).Info("Start monitor node status") // delay to start nodeInfoMonitor _ = c.nodeInfoMonitorPeriodic.Start(false) - log.WithField("tag", c.Tag).Info("Start report node status") + log.WithField("tag", c.tag).Info("Start report node status") _ = c.userReportPeriodic.Start(false) - if c.nodeInfo.Tls { + if node.Tls { switch c.CertConfig.CertMode { case "reality", "none", "": default: @@ -32,11 +34,18 @@ func (c *Controller) initTask() { Interval: time.Hour * 24, Execute: c.reportUserTrafficTask, } - log.WithField("tag", c.Tag).Info("Start renew cert") + log.WithField("tag", c.tag).Info("Start renew cert") // delay to start renewCert _ = c.renewCertPeriodic.Start(true) } } + if c.LimitConfig.EnableDynamicSpeedLimit { + c.traffic = make(map[string]int64) + c.renewCertPeriodic = &task.Task{ + Interval: time.Duration(c.LimitConfig.DynamicSpeedLimitConfig.Periodic) * time.Minute, + Execute: c.reportUserTrafficTask, + } + } } func (c *Controller) nodeInfoMonitor() (err error) { @@ -44,7 +53,7 @@ func (c *Controller) nodeInfoMonitor() (err error) { newNodeInfo, err := c.apiClient.GetNodeInfo() if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Get node info failed") return nil @@ -53,69 +62,74 @@ func (c *Controller) nodeInfoMonitor() (err error) { newUserInfo, err := c.apiClient.GetUserList() if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Get user list failed") return nil } if newNodeInfo != nil { // nodeInfo changed + if newUserInfo != nil { + c.userList = newUserInfo + } + c.traffic = make(map[string]int64) // Remove old tag - log.WithField("tag", c.Tag).Info("Node changed, reload") - err = c.server.DelNode(c.Tag) + log.WithField("tag", c.tag).Info("Node changed, reload") + err = c.server.DelNode(c.tag) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Delete node failed") return nil } // Remove Old limiter - limiter.DeleteLimiter(c.Tag) + limiter.DeleteLimiter(c.tag) // Add new Limiter - c.Tag = c.buildNodeTag() - l := limiter.AddLimiter(c.Tag, &c.LimitConfig, newUserInfo) + c.tag = c.buildNodeTag(newNodeInfo) + l := limiter.AddLimiter(c.tag, &c.LimitConfig, c.userList) // check cert if newNodeInfo.Tls || newNodeInfo.Type == "hysteria" { err = c.requestCert() if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Request cert failed") return nil } } // add new node - err = c.server.AddNode(c.Tag, newNodeInfo, c.ControllerConfig) + err = c.server.AddNode(c.tag, newNodeInfo, c.ControllerConfig) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Add node failed") return nil } _, err = c.server.AddUsers(&vCore.AddUsersParams{ - Tag: c.Tag, + Tag: c.tag, Config: c.ControllerConfig, - UserInfo: newUserInfo, + UserInfo: c.userList, NodeInfo: newNodeInfo, }) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Add users failed") return nil } - err = l.UpdateRule(newNodeInfo.Rules) + err = l.UpdateRule(&newNodeInfo.Rules) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Update Rule failed") return nil } + c.limiter = l // Check interval if c.nodeInfoMonitorPeriodic.Interval != newNodeInfo.PullInterval && newNodeInfo.PullInterval != 0 { @@ -129,21 +143,22 @@ func (c *Controller) nodeInfoMonitor() (err error) { c.userReportPeriodic.Close() _ = c.userReportPeriodic.Start(false) } - c.nodeInfo = newNodeInfo - c.userList = newUserInfo - log.WithField("tag", c.Tag).Infof("Added %d new users", len(newUserInfo)) + log.WithField("tag", c.tag).Infof("Added %d new users", len(c.userList)) // exit return nil } // node no changed, check users + if len(newUserInfo) == 0 { + return nil + } deleted, added := compareUserList(c.userList, newUserInfo) if len(deleted) > 0 { // have deleted users - err = c.server.DelUsers(deleted, c.Tag) + err = c.server.DelUsers(deleted, c.tag) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Delete users failed") return nil @@ -152,14 +167,13 @@ func (c *Controller) nodeInfoMonitor() (err error) { if len(added) > 0 { // have added users _, err = c.server.AddUsers(&vCore.AddUsersParams{ - Tag: c.Tag, + Tag: c.tag, Config: c.ControllerConfig, UserInfo: added, - NodeInfo: c.nodeInfo, }) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Add users failed") return nil @@ -167,19 +181,37 @@ func (c *Controller) nodeInfoMonitor() (err error) { } if len(added) > 0 || len(deleted) > 0 { // update Limiter - err = limiter.UpdateLimiter(c.Tag, added, deleted) + c.limiter.UpdateUser(c.tag, added, deleted) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("limiter users failed") return nil } + // clear traffic record + if c.LimitConfig.EnableDynamicSpeedLimit { + for i := range deleted { + delete(c.traffic, deleted[i].Uuid) + } + } } c.userList = newUserInfo if len(added)+len(deleted) != 0 { - log.WithField("tag", c.Tag). + log.WithField("tag", c.tag). Infof("%d user deleted, %d user added", len(deleted), len(added)) } return nil } + +func (c *Controller) SpeedChecker() { + for u, t := range c.traffic { + if t >= c.LimitConfig.DynamicSpeedLimitConfig.Traffic { + err := c.limiter.UpdateDynamicSpeedLimit(c.tag, u, + c.LimitConfig.DynamicSpeedLimitConfig.SpeedLimit, + time.Now().Add(time.Duration(c.LimitConfig.DynamicSpeedLimitConfig.ExpireTime)*time.Minute)) + log.WithField("err", err).Error("Update dynamic speed limit failed") + delete(c.traffic, u) + } + } +} diff --git a/node/user.go b/node/user.go index 91aa5df..be17479 100644 --- a/node/user.go +++ b/node/user.go @@ -1,20 +1,25 @@ package node import ( - "github.com/Yuzuki616/V2bX/api/panel" - log "github.com/sirupsen/logrus" "runtime" "strconv" + + "github.com/Yuzuki616/V2bX/api/panel" + log "github.com/sirupsen/logrus" ) func (c *Controller) reportUserTrafficTask() (err error) { // Get User traffic userTraffic := make([]panel.UserTraffic, 0) for i := range c.userList { - up, down := c.server.GetUserTraffic(c.Tag, c.userList[i].Uuid, true) + up, down := c.server.GetUserTraffic(c.tag, c.userList[i].Uuid, true) if up > 0 || down > 0 { if c.LimitConfig.EnableDynamicSpeedLimit { - c.userList[i].Traffic += up + down + if _, ok := c.traffic[c.userList[i].Uuid]; ok { + c.traffic[c.userList[i].Uuid] += up + down + } else { + c.traffic[c.userList[i].Uuid] = up + down + } } userTraffic = append(userTraffic, panel.UserTraffic{ UID: (c.userList)[i].Id, @@ -26,11 +31,11 @@ func (c *Controller) reportUserTrafficTask() (err error) { err = c.apiClient.ReportUserTraffic(userTraffic) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Info("Report user traffic failed") } else { - log.WithField("tag", c.Tag).Infof("Report %d online users", len(userTraffic)) + log.WithField("tag", c.tag).Infof("Report %d online users", len(userTraffic)) } } userTraffic = nil