From aeb8554b13af6d6eed60bc5d033b6ce2c847b88d Mon Sep 17 00:00:00 2001 From: Yuzuki616 Date: Sat, 22 Jul 2023 02:38:07 +0800 Subject: [PATCH] re-support dynamic speed limit --- api/panel/node.go | 4 +-- api/panel/panel.go | 3 +- api/panel/user.go | 18 ++++++---- core/hy/server.go | 24 +++---------- core/xray/xray.go | 5 +-- limiter/limiter.go | 28 ++++++++------- node/controller.go | 32 +++++++++-------- node/task.go | 88 ++++++++++++++++++++++++++++++---------------- node/user.go | 17 +++++---- 9 files changed, 124 insertions(+), 95 deletions(-) diff --git a/api/panel/node.go b/api/panel/node.go index d902060..4661b06 100644 --- a/api/panel/node.go +++ b/api/panel/node.go @@ -100,7 +100,7 @@ func (c *Client) GetNodeInfo() (node *NodeInfo, err error) { const path = "/api/v1/server/UniProxy/config" r, err := c.client. R(). - SetHeader("If-None-Match", c.etag). + SetHeader("If-None-Match", c.nodeEtag). Get(path) if err = c.checkResponse(r, path, err); err != nil { return @@ -227,7 +227,7 @@ func (c *Client) GetNodeInfo() (node *NodeInfo, err error) { node.UpMbps = rsp.UpMbps node.HyObfs = rsp.Obfs } - c.etag = r.Header().Get("ETag") + c.nodeEtag = r.Header().Get("ETag") return } diff --git a/api/panel/panel.go b/api/panel/panel.go index c1acaa2..29aca2b 100644 --- a/api/panel/panel.go +++ b/api/panel/panel.go @@ -23,7 +23,8 @@ type Client struct { NodeType string NodeId int LocalRuleList []*regexp.Regexp - etag string + nodeEtag string + userEtag string } func New(c *conf.ApiConfig) (*Client, error) { diff --git a/api/panel/user.go b/api/panel/user.go index fbb4992..8357a7d 100644 --- a/api/panel/user.go +++ b/api/panel/user.go @@ -2,6 +2,7 @@ package panel import ( "fmt" + "github.com/goccy/go-json" ) @@ -14,7 +15,6 @@ type UserInfo struct { Id int `json:"id"` Uuid string `json:"uuid"` SpeedLimit int `json:"speed_limit"` - Traffic int64 `json:"-"` } type UserListBody struct { @@ -25,17 +25,23 @@ type UserListBody struct { // GetUserList will pull user form sspanel func (c *Client) GetUserList() (UserList []UserInfo, err error) { const path = "/api/v1/server/UniProxy/user" - res, err := c.client.R(). + r, err := c.client.R(). + SetHeader("If-None-Match", c.userEtag). Get(path) - err = c.checkResponse(res, path, err) + err = c.checkResponse(r, path, err) if err != nil { return nil, err } + err = c.checkResponse(r, path, err) + if r.StatusCode() == 304 { + return nil, nil + } var userList *UserListBody - err = json.Unmarshal(res.Body(), &userList) + err = json.Unmarshal(r.Body(), &userList) if err != nil { return nil, fmt.Errorf("unmarshal userlist error: %s", err) } + c.userEtag = r.Header().Get("ETag") return userList.Users, nil } @@ -52,11 +58,11 @@ func (c *Client) ReportUserTraffic(userTraffic []UserTraffic) error { data[userTraffic[i].UID] = []int64{userTraffic[i].Upload, userTraffic[i].Download} } const path = "/api/v1/server/UniProxy/push" - res, err := c.client.R(). + r, err := c.client.R(). SetBody(data). ForceContentType("application/json"). Post(path) - err = c.checkResponse(res, path, err) + err = c.checkResponse(r, path, err) if err != nil { return err } diff --git a/core/hy/server.go b/core/hy/server.go index 6e34831..7f919c4 100644 --- a/core/hy/server.go +++ b/core/hy/server.go @@ -3,16 +3,16 @@ package hy import ( "crypto/tls" "fmt" - "github.com/Yuzuki616/V2bX/api/panel" - "github.com/Yuzuki616/V2bX/conf" - "github.com/Yuzuki616/V2bX/limiter" - "github.com/apernet/hysteria/core/sockopt" "io" "net" "sync" "sync/atomic" "time" + "github.com/Yuzuki616/V2bX/api/panel" + "github.com/Yuzuki616/V2bX/conf" + "github.com/Yuzuki616/V2bX/limiter" + "github.com/apernet/hysteria/core/sockopt" "github.com/quic-go/quic-go" "github.com/apernet/hysteria/core/acl" @@ -121,22 +121,6 @@ func (s *Server) runServer(node *panel.NodeInfo, c *conf.ControllerConfig) error } // ACL var aclEngine *acl.Engine - /*if len(config.ACL) > 0 { - aclEngine, err = acl.LoadFromFile(config.ACL, func(addr string) (*net.IPAddr, error) { - ipAddr, _, err := transport.DefaultServerTransport.ResolveIPAddr(addr) - return ipAddr, err - }, - func() (*geoip2.Reader, error) { - return loadMMDBReader(config.MMDB) - }) - if err != nil { - logrus.WithFields(logrus.Fields{ - "error": err, - "file": config.ACL, - }).Fatal("Failed to parse ACL") - } - aclEngine.DefaultAction = acl.ActionDirect - }*/ // Prometheus s.counter = NewUserTrafficCounter() // Packet conn diff --git a/core/xray/xray.go b/core/xray/xray.go index d718094..b11baa2 100644 --- a/core/xray/xray.go +++ b/core/xray/xray.go @@ -1,6 +1,9 @@ package xray import ( + "os" + "sync" + "github.com/Yuzuki616/V2bX/conf" vCore "github.com/Yuzuki616/V2bX/core" "github.com/Yuzuki616/V2bX/core/xray/app/dispatcher" @@ -16,8 +19,6 @@ import ( "github.com/xtls/xray-core/features/routing" statsFeature "github.com/xtls/xray-core/features/stats" coreConf "github.com/xtls/xray-core/infra/conf" - "os" - "sync" ) func init() { diff --git a/limiter/limiter.go b/limiter/limiter.go index f86e92d..60973b0 100644 --- a/limiter/limiter.go +++ b/limiter/limiter.go @@ -2,7 +2,6 @@ package limiter import ( "errors" - "fmt" "regexp" "sync" "time" @@ -60,7 +59,6 @@ func AddLimiter(tag string, l *conf.LimitConfig, users []panel.UserInfo) *Limite userLimit := &UserLimitInfo{ UID: users[i].Id, SpeedLimit: users[i].SpeedLimit, - ExpireTime: 0, } info.UserLimitInfo.Store(format.UserTag(tag, users[i].Uuid), userLimit) } @@ -81,11 +79,13 @@ func GetLimiter(tag string) (info *Limiter, err error) { return } -func UpdateLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error { - l, err := GetLimiter(tag) - if err != nil { - return fmt.Errorf("get limit error: %s", err) - } +func DeleteLimiter(tag string) { + limitLock.Lock() + delete(limiter, tag) + limitLock.Unlock() +} + +func (l *Limiter) UpdateUser(tag string, added []panel.UserInfo, deleted []panel.UserInfo) { for i := range deleted { l.UserLimitInfo.Delete(format.UserTag(tag, deleted[i].Uuid)) } @@ -99,13 +99,17 @@ func UpdateLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) l.UserLimitInfo.Store(format.UserTag(tag, added[i].Uuid), userLimit) } } - return nil } -func DeleteLimiter(tag string) { - limitLock.Lock() - delete(limiter, tag) - limitLock.Unlock() +func (l *Limiter) UpdateDynamicSpeedLimit(tag, uuid string, limit int, expire time.Time) error { + if v, ok := l.UserLimitInfo.Load(format.UserTag(tag, uuid)); ok { + info := v.(*UserLimitInfo) + info.DynamicSpeedLimit = limit + info.ExpireTime = expire.Unix() + } else { + return errors.New("not found") + } + return nil } func (l *Limiter) CheckLimit(email string, ip string, isTcp bool) (Bucket *ratelimit.Bucket, Reject bool) { diff --git a/node/controller.go b/node/controller.go index f4e4853..4f389cd 100644 --- a/node/controller.go +++ b/node/controller.go @@ -16,8 +16,9 @@ import ( type Controller struct { server vCore.Core apiClient *panel.Client - nodeInfo *panel.NodeInfo - Tag string + tag string + limiter *limiter.Limiter + traffic map[string]int64 userList []panel.UserInfo ipRecorder iprecoder.IpRecorder nodeInfoMonitorPeriodic *task.Task @@ -42,7 +43,7 @@ func NewController(server vCore.Core, api *panel.Client, config *conf.Controller func (c *Controller) Start() error { // First fetch Node Info var err error - c.nodeInfo, err = c.apiClient.GetNodeInfo() + node, err := c.apiClient.GetNodeInfo() if err != nil { return fmt.Errorf("get node info error: %s", err) } @@ -54,42 +55,43 @@ func (c *Controller) Start() error { if len(c.userList) == 0 { return errors.New("add users error: not have any user") } - c.Tag = c.buildNodeTag() + c.tag = c.buildNodeTag(node) // add limiter - l := limiter.AddLimiter(c.Tag, &c.LimitConfig, c.userList) + l := limiter.AddLimiter(c.tag, &c.LimitConfig, c.userList) // add rule limiter - if err = l.UpdateRule(&c.nodeInfo.Rules); err != nil { + if err = l.UpdateRule(&node.Rules); err != nil { return fmt.Errorf("update rule error: %s", err) } - if c.nodeInfo.Tls || c.nodeInfo.Type == "hysteria" { + c.limiter = l + if node.Tls || node.Type == "hysteria" { err = c.requestCert() if err != nil { return fmt.Errorf("request cert error: %s", err) } } // Add new tag - err = c.server.AddNode(c.Tag, c.nodeInfo, c.ControllerConfig) + err = c.server.AddNode(c.tag, node, c.ControllerConfig) if err != nil { return fmt.Errorf("add new node error: %s", err) } added, err := c.server.AddUsers(&vCore.AddUsersParams{ - Tag: c.Tag, + Tag: c.tag, Config: c.ControllerConfig, UserInfo: c.userList, - NodeInfo: c.nodeInfo, + NodeInfo: node, }) if err != nil { return fmt.Errorf("add users error: %s", err) } - log.WithField("tag", c.Tag).Infof("Added %d new users", added) - c.initTask() + log.WithField("tag", c.tag).Infof("Added %d new users", added) + c.startTasks(node) return nil } // Close implement the Close() function of the service interface func (c *Controller) Close() error { - limiter.DeleteLimiter(c.Tag) + limiter.DeleteLimiter(c.tag) if c.nodeInfoMonitorPeriodic != nil { c.nodeInfoMonitorPeriodic.Close() } @@ -108,6 +110,6 @@ func (c *Controller) Close() error { return nil } -func (c *Controller) buildNodeTag() string { - return fmt.Sprintf("%s-%s-%d", c.apiClient.APIHost, c.nodeInfo.Type, c.nodeInfo.Id) +func (c *Controller) buildNodeTag(node *panel.NodeInfo) string { + return fmt.Sprintf("%s-%s-%d", c.apiClient.APIHost, node.Type, node.Id) } diff --git a/node/task.go b/node/task.go index 77f1985..441affb 100644 --- a/node/task.go +++ b/node/task.go @@ -3,29 +3,30 @@ package node import ( "time" + "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/common/task" vCore "github.com/Yuzuki616/V2bX/core" "github.com/Yuzuki616/V2bX/limiter" log "github.com/sirupsen/logrus" ) -func (c *Controller) initTask() { +func (c *Controller) startTasks(node *panel.NodeInfo) { // fetch node info task c.nodeInfoMonitorPeriodic = &task.Task{ - Interval: c.nodeInfo.PullInterval, + Interval: node.PullInterval, Execute: c.nodeInfoMonitor, } // fetch user list task c.userReportPeriodic = &task.Task{ - Interval: c.nodeInfo.PushInterval, + Interval: node.PushInterval, Execute: c.reportUserTrafficTask, } - log.WithField("tag", c.Tag).Info("Start monitor node status") + log.WithField("tag", c.tag).Info("Start monitor node status") // delay to start nodeInfoMonitor _ = c.nodeInfoMonitorPeriodic.Start(false) - log.WithField("tag", c.Tag).Info("Start report node status") + log.WithField("tag", c.tag).Info("Start report node status") _ = c.userReportPeriodic.Start(false) - if c.nodeInfo.Tls { + if node.Tls { switch c.CertConfig.CertMode { case "reality", "none", "": default: @@ -33,11 +34,18 @@ func (c *Controller) initTask() { Interval: time.Hour * 24, Execute: c.reportUserTrafficTask, } - log.WithField("tag", c.Tag).Info("Start renew cert") + log.WithField("tag", c.tag).Info("Start renew cert") // delay to start renewCert _ = c.renewCertPeriodic.Start(true) } } + if c.LimitConfig.EnableDynamicSpeedLimit { + c.traffic = make(map[string]int64) + c.renewCertPeriodic = &task.Task{ + Interval: time.Duration(c.LimitConfig.DynamicSpeedLimitConfig.Periodic) * time.Minute, + Execute: c.reportUserTrafficTask, + } + } } func (c *Controller) nodeInfoMonitor() (err error) { @@ -45,7 +53,7 @@ func (c *Controller) nodeInfoMonitor() (err error) { newNodeInfo, err := c.apiClient.GetNodeInfo() if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Get node info failed") return nil @@ -54,57 +62,58 @@ func (c *Controller) nodeInfoMonitor() (err error) { newUserInfo, err := c.apiClient.GetUserList() if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Get user list failed") return nil } if newNodeInfo != nil { // nodeInfo changed + c.traffic = make(map[string]int64) // Remove old tag - log.WithField("tag", c.Tag).Info("Node changed, reload") - err = c.server.DelNode(c.Tag) + log.WithField("tag", c.tag).Info("Node changed, reload") + err = c.server.DelNode(c.tag) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Delete node failed") return nil } // Remove Old limiter - limiter.DeleteLimiter(c.Tag) + limiter.DeleteLimiter(c.tag) // Add new Limiter - c.Tag = c.buildNodeTag() - l := limiter.AddLimiter(c.Tag, &c.LimitConfig, newUserInfo) + c.tag = c.buildNodeTag(newNodeInfo) + l := limiter.AddLimiter(c.tag, &c.LimitConfig, newUserInfo) // check cert if newNodeInfo.Tls || newNodeInfo.Type == "hysteria" { err = c.requestCert() if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Request cert failed") return nil } } // add new node - err = c.server.AddNode(c.Tag, newNodeInfo, c.ControllerConfig) + err = c.server.AddNode(c.tag, newNodeInfo, c.ControllerConfig) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Add node failed") return nil } _, err = c.server.AddUsers(&vCore.AddUsersParams{ - Tag: c.Tag, + Tag: c.tag, Config: c.ControllerConfig, UserInfo: newUserInfo, NodeInfo: newNodeInfo, }) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Add users failed") return nil @@ -112,11 +121,12 @@ func (c *Controller) nodeInfoMonitor() (err error) { err = l.UpdateRule(&newNodeInfo.Rules) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Update Rule failed") return nil } + c.limiter = l // Check interval if c.nodeInfoMonitorPeriodic.Interval != newNodeInfo.PullInterval && newNodeInfo.PullInterval != 0 { @@ -130,9 +140,8 @@ func (c *Controller) nodeInfoMonitor() (err error) { c.userReportPeriodic.Close() _ = c.userReportPeriodic.Start(false) } - c.nodeInfo = newNodeInfo c.userList = newUserInfo - log.WithField("tag", c.Tag).Infof("Added %d new users", len(newUserInfo)) + log.WithField("tag", c.tag).Infof("Added %d new users", len(newUserInfo)) // exit return nil } @@ -141,10 +150,10 @@ func (c *Controller) nodeInfoMonitor() (err error) { deleted, added := compareUserList(c.userList, newUserInfo) if len(deleted) > 0 { // have deleted users - err = c.server.DelUsers(deleted, c.Tag) + err = c.server.DelUsers(deleted, c.tag) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Delete users failed") return nil @@ -153,14 +162,13 @@ func (c *Controller) nodeInfoMonitor() (err error) { if len(added) > 0 { // have added users _, err = c.server.AddUsers(&vCore.AddUsersParams{ - Tag: c.Tag, + Tag: c.tag, Config: c.ControllerConfig, UserInfo: added, - NodeInfo: c.nodeInfo, }) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("Add users failed") return nil @@ -168,19 +176,37 @@ func (c *Controller) nodeInfoMonitor() (err error) { } if len(added) > 0 || len(deleted) > 0 { // update Limiter - err = limiter.UpdateLimiter(c.Tag, added, deleted) + c.limiter.UpdateUser(c.tag, added, deleted) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Error("limiter users failed") return nil } + // clear traffic record + if c.LimitConfig.EnableDynamicSpeedLimit { + for i := range deleted { + delete(c.traffic, deleted[i].Uuid) + } + } } c.userList = newUserInfo if len(added)+len(deleted) != 0 { - log.WithField("tag", c.Tag). + log.WithField("tag", c.tag). Infof("%d user deleted, %d user added", len(deleted), len(added)) } return nil } + +func (c *Controller) SpeedChecker() { + for u, t := range c.traffic { + if t >= c.LimitConfig.DynamicSpeedLimitConfig.Traffic { + err := c.limiter.UpdateDynamicSpeedLimit(c.tag, u, + c.LimitConfig.DynamicSpeedLimitConfig.SpeedLimit, + time.Now().Add(time.Duration(c.LimitConfig.DynamicSpeedLimitConfig.ExpireTime)*time.Minute)) + log.WithField("err", err).Error("Update dynamic speed limit failed") + delete(c.traffic, u) + } + } +} diff --git a/node/user.go b/node/user.go index 91aa5df..be17479 100644 --- a/node/user.go +++ b/node/user.go @@ -1,20 +1,25 @@ package node import ( - "github.com/Yuzuki616/V2bX/api/panel" - log "github.com/sirupsen/logrus" "runtime" "strconv" + + "github.com/Yuzuki616/V2bX/api/panel" + log "github.com/sirupsen/logrus" ) func (c *Controller) reportUserTrafficTask() (err error) { // Get User traffic userTraffic := make([]panel.UserTraffic, 0) for i := range c.userList { - up, down := c.server.GetUserTraffic(c.Tag, c.userList[i].Uuid, true) + up, down := c.server.GetUserTraffic(c.tag, c.userList[i].Uuid, true) if up > 0 || down > 0 { if c.LimitConfig.EnableDynamicSpeedLimit { - c.userList[i].Traffic += up + down + if _, ok := c.traffic[c.userList[i].Uuid]; ok { + c.traffic[c.userList[i].Uuid] += up + down + } else { + c.traffic[c.userList[i].Uuid] = up + down + } } userTraffic = append(userTraffic, panel.UserTraffic{ UID: (c.userList)[i].Id, @@ -26,11 +31,11 @@ func (c *Controller) reportUserTrafficTask() (err error) { err = c.apiClient.ReportUserTraffic(userTraffic) if err != nil { log.WithFields(log.Fields{ - "tag": c.Tag, + "tag": c.tag, "err": err, }).Info("Report user traffic failed") } else { - log.WithField("tag", c.Tag).Infof("Report %d online users", len(userTraffic)) + log.WithField("tag", c.tag).Infof("Report %d online users", len(userTraffic)) } } userTraffic = nil