diff --git a/api/panel/api.go b/api/panel/api.go deleted file mode 100644 index 64251ff..0000000 --- a/api/panel/api.go +++ /dev/null @@ -1,78 +0,0 @@ -// Package api contains all the api used by XrayR -// To implement an api , one needs to implement the interface below. - -package panel - -import ( - "github.com/Yuzuki616/V2bX/conf" - "github.com/go-resty/resty/v2" - "log" - "strconv" - "sync" - "time" -) - -// Panel is the interface for different panel's api. - -type ClientInfo struct { - APIHost string - NodeID int - Key string - NodeType string -} - -type Client struct { - client *resty.Client - APIHost string - NodeID int - Key string - NodeType string - //EnableSS2022 bool - EnableVless bool - EnableXTLS bool - SpeedLimit float64 - DeviceLimit int - LocalRuleList []DetectRule - RemoteRuleCache *[]Rule - access sync.Mutex - NodeInfoRspMd5 [16]byte - NodeRuleRspMd5 [16]byte -} - -func New(apiConfig *conf.ApiConfig) Panel { - client := resty.New() - client.SetRetryCount(3) - if apiConfig.Timeout > 0 { - client.SetTimeout(time.Duration(apiConfig.Timeout) * time.Second) - } else { - client.SetTimeout(5 * time.Second) - } - client.OnError(func(req *resty.Request, err error) { - if v, ok := err.(*resty.ResponseError); ok { - // v.Response contains the last response from the server - // v.Err contains the original error - log.Print(v.Err) - } - }) - client.SetBaseURL(apiConfig.APIHost) - // Create Key for each requests - client.SetQueryParams(map[string]string{ - "node_id": strconv.Itoa(apiConfig.NodeID), - "token": apiConfig.Key, - }) - // Read local rule list - localRuleList := readLocalRuleList(apiConfig.RuleListPath) - return &Client{ - client: client, - NodeID: apiConfig.NodeID, - Key: apiConfig.Key, - APIHost: apiConfig.APIHost, - NodeType: apiConfig.NodeType, - //EnableSS2022: apiConfig.EnableSS2022, - EnableVless: apiConfig.EnableVless, - EnableXTLS: apiConfig.EnableXTLS, - SpeedLimit: apiConfig.SpeedLimit, - DeviceLimit: apiConfig.DeviceLimit, - LocalRuleList: localRuleList, - } -} diff --git a/api/panel/interface.go b/api/panel/interface.go new file mode 100644 index 0000000..d6acb78 --- /dev/null +++ b/api/panel/interface.go @@ -0,0 +1,10 @@ +package panel + +type Panel interface { + GetNodeInfo() (nodeInfo *NodeInfo, err error) + GetUserList() (userList []UserInfo, err error) + ReportUserTraffic(userTraffic []UserTraffic) (err error) + Describe() ClientInfo + GetNodeRule() (ruleList []DetectRule, protocolList []string, err error) + Debug() +} diff --git a/api/panel/panel.go b/api/panel/panel.go index d6acb78..072409a 100644 --- a/api/panel/panel.go +++ b/api/panel/panel.go @@ -1,10 +1,75 @@ package panel -type Panel interface { - GetNodeInfo() (nodeInfo *NodeInfo, err error) - GetUserList() (userList []UserInfo, err error) - ReportUserTraffic(userTraffic []UserTraffic) (err error) - Describe() ClientInfo - GetNodeRule() (ruleList []DetectRule, protocolList []string, err error) - Debug() +import ( + "github.com/Yuzuki616/V2bX/conf" + "github.com/go-resty/resty/v2" + "log" + "strconv" + "sync" + "time" +) + +// Panel is the interface for different panel's api. + +type ClientInfo struct { + APIHost string + NodeID int + Key string + NodeType string +} + +type Client struct { + client *resty.Client + APIHost string + NodeID int + Key string + NodeType string + //EnableSS2022 bool + EnableVless bool + EnableXTLS bool + SpeedLimit float64 + DeviceLimit int + LocalRuleList []DetectRule + RemoteRuleCache *[]Rule + access sync.Mutex + NodeInfoRspMd5 [16]byte + NodeRuleRspMd5 [16]byte +} + +func New(apiConfig *conf.ApiConfig) Panel { + client := resty.New() + client.SetRetryCount(3) + if apiConfig.Timeout > 0 { + client.SetTimeout(time.Duration(apiConfig.Timeout) * time.Second) + } else { + client.SetTimeout(5 * time.Second) + } + client.OnError(func(req *resty.Request, err error) { + if v, ok := err.(*resty.ResponseError); ok { + // v.Response contains the last response from the server + // v.Err contains the original error + log.Print(v.Err) + } + }) + client.SetBaseURL(apiConfig.APIHost) + // Create Key for each requests + client.SetQueryParams(map[string]string{ + "node_id": strconv.Itoa(apiConfig.NodeID), + "token": apiConfig.Key, + }) + // Read local rule list + localRuleList := readLocalRuleList(apiConfig.RuleListPath) + return &Client{ + client: client, + NodeID: apiConfig.NodeID, + Key: apiConfig.Key, + APIHost: apiConfig.APIHost, + NodeType: apiConfig.NodeType, + //EnableSS2022: apiConfig.EnableSS2022, + EnableVless: apiConfig.EnableVless, + EnableXTLS: apiConfig.EnableXTLS, + SpeedLimit: apiConfig.SpeedLimit, + DeviceLimit: apiConfig.DeviceLimit, + LocalRuleList: localRuleList, + } } diff --git a/api/panel/user.go b/api/panel/user.go index e2c5075..ac25089 100644 --- a/api/panel/user.go +++ b/api/panel/user.go @@ -23,6 +23,7 @@ type UserInfo struct { /*DeviceLimit int `json:"device_limit"` SpeedLimit uint64 `json:"speed_limit"`*/ UID int `json:"id"` + Traffic int64 `json:"-"` Port int `json:"port"` Cipher string `json:"cipher"` Secret string `json:"secret"` diff --git a/conf/node.go b/conf/node.go index d78bc28..3348e2b 100644 --- a/conf/node.go +++ b/conf/node.go @@ -27,22 +27,31 @@ type IpReportConfig struct { EnableIpSync bool `mapstructure:"EnableIpSync"` } +type DynamicSpeedLimitConfig struct { + Periodic int `mapstructure:"Periodic"` + Traffic int64 `mapstructure:"Traffic"` + SpeedLimit uint64 `mapstructure:"SpeedLimit"` + ExpireTime int `mapstructure:"ExpireTime"` +} + type ControllerConfig struct { - ListenIP string `mapstructure:"ListenIP"` - SendIP string `mapstructure:"SendIP"` - UpdatePeriodic int `mapstructure:"UpdatePeriodic"` - EnableDNS bool `mapstructure:"EnableDNS"` - DNSType string `mapstructure:"DNSType"` - DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"` - DisableGetRule bool `mapstructure:"DisableGetRule"` - EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"` - EnableFallback bool `mapstructure:"EnableFallback"` - DisableIVCheck bool `mapstructure:"DisableIVCheck"` - DisableSniffing bool `mapstructure:"DisableSniffing"` - FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"` - EnableIpRecorder bool `mapstructure:"EnableIpRecorder"` - IpRecorderConfig *IpReportConfig `mapstructure:"IpRecorderConfig"` - CertConfig *CertConfig `mapstructure:"CertConfig"` + ListenIP string `mapstructure:"ListenIP"` + SendIP string `mapstructure:"SendIP"` + UpdatePeriodic int `mapstructure:"UpdatePeriodic"` + EnableDNS bool `mapstructure:"EnableDNS"` + DNSType string `mapstructure:"DNSType"` + DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"` + DisableGetRule bool `mapstructure:"DisableGetRule"` + EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"` + EnableFallback bool `mapstructure:"EnableFallback"` + DisableIVCheck bool `mapstructure:"DisableIVCheck"` + DisableSniffing bool `mapstructure:"DisableSniffing"` + FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"` + EnableIpRecorder bool `mapstructure:"EnableIpRecorder"` + IpRecorderConfig *IpReportConfig `mapstructure:"IpRecorderConfig"` + EnableDynamicSpeedLimit bool `mapstructure:"EnableDynamicSpeedLimit"` + DynamicSpeedLimitConfig *DynamicSpeedLimitConfig `mapstructure:"DynamicSpeedLimitConfig"` + CertConfig *CertConfig `mapstructure:"CertConfig"` } type ApiConfig struct { diff --git a/core/app/dispatcher/limiter.go b/core/app/dispatcher/limiter.go index cc8edc4..d4d3f2d 100644 --- a/core/app/dispatcher/limiter.go +++ b/core/app/dispatcher/limiter.go @@ -14,6 +14,7 @@ import ( type UserInfo struct { UID int SpeedLimit uint64 + ExpireTime int64 DeviceLimit int } @@ -51,7 +52,7 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userLi (*userList)[i].DeviceLimit = nodeInfo.DeviceLimit }*/ userMap.Store(fmt.Sprintf("%s|%s|%d", tag, (userList)[i].V2rayUser.Email, (userList)[i].UID), - UserInfo{ + &UserInfo{ UID: (userList)[i].UID, SpeedLimit: nodeInfo.SpeedLimit, DeviceLimit: nodeInfo.DeviceLimit, @@ -62,19 +63,23 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userLi return nil } -func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, updatedUserList []panel.UserInfo) error { +func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, added, deleted []panel.UserInfo) error { if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) // Update User info - for i := range updatedUserList { + for i := range added { inboundInfo.UserInfo.Store(fmt.Sprintf("%s|%s|%d", tag, - (updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID), UserInfo{ - UID: (updatedUserList)[i].UID, + (added)[i].V2rayUser.Email, (added)[i].UID), &UserInfo{ + UID: (added)[i].UID, SpeedLimit: nodeInfo.SpeedLimit, DeviceLimit: nodeInfo.DeviceLimit, }) + } + for i := range deleted { + inboundInfo.UserInfo.Delete(fmt.Sprintf("%s|%s|%d", tag, + (deleted)[i].V2rayUser.Email, (deleted)[i].UID)) inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag, - (updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID)) // Delete old limiter bucket + (deleted)[i].V2rayUser.Email, (deleted)[i].UID)) // Delete limiter bucket } } else { return fmt.Errorf("no such inbound in limiter: %s", tag) @@ -87,6 +92,22 @@ func (l *Limiter) DeleteInboundLimiter(tag string) error { return nil } +func (l *Limiter) UpdateUserSpeedLimit(tag string, userInfo *panel.UserInfo, limit uint64, expire int64) error { + if value, ok := l.InboundInfo.Load(tag); ok { + inboundInfo := value.(*InboundInfo) + if user, ok := inboundInfo.UserInfo.Load(fmt.Sprintf("%s|%s|%d", tag, userInfo.GetUserEmail(), userInfo.UID)); ok { + user.(*UserInfo).SpeedLimit = limit + user.(*UserInfo).ExpireTime = time.Now().Add(time.Duration(expire) * time.Second).Unix() + inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag, userInfo.GetUserEmail(), userInfo.UID)) + } else { + return fmt.Errorf("no such user in limiter: %s", userInfo.GetUserEmail()) + } + return nil + } else { + return fmt.Errorf("no such inbound in limiter: %s", tag) + } +} + type UserIpList struct { Uid int `json:"Uid"` IpList []string `json:"Ips"` @@ -109,7 +130,7 @@ func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) { if len(ip) > 0 { if u, ok := inboundInfo.UserInfo.Load(key.(string)); ok { onlineUser = append(onlineUser, UserIpList{ - Uid: u.(UserInfo).UID, + Uid: u.(*UserInfo).UID, IpList: ip, }) } @@ -172,9 +193,15 @@ func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) nodeLimit := inboundInfo.NodeSpeedLimit var userLimit uint64 = 0 var deviceLimit = 0 + expired := false if v, ok := inboundInfo.UserInfo.Load(email); ok { - u := v.(UserInfo) - userLimit = u.SpeedLimit + u := v.(*UserInfo) + if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 { + userLimit = 0 + expired = true + } else { + userLimit = u.SpeedLimit + } deviceLimit = u.DeviceLimit } ipMap := new(sync.Map) @@ -203,6 +230,10 @@ func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) if limit > 0 { limiter := ratelimit.NewBucketWithQuantum(time.Second, int64(limit), int64(limit)) // Byte/s if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok { + if expired { + inboundInfo.SpeedLimiter.Store(email, limiter) + return limiter, true, false + } bucket := v.(*ratelimit.Bucket) return bucket, true, false } else { diff --git a/core/inbound.go b/core/inbound.go index dfd9044..5987568 100644 --- a/core/inbound.go +++ b/core/inbound.go @@ -10,8 +10,7 @@ import ( ) func (p *Core) RemoveInbound(tag string) error { - err := p.ihm.RemoveHandler(context.Background(), tag) - return err + return p.ihm.RemoveHandler(context.Background(), tag) } func (p *Core) AddInbound(config *core.InboundHandlerConfig) error { @@ -30,8 +29,7 @@ func (p *Core) AddInbound(config *core.InboundHandlerConfig) error { } func (p *Core) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userList []panel.UserInfo) error { - err := p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, userList) - return err + return p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, userList) } func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) { @@ -42,12 +40,10 @@ func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) { return nil, fmt.Errorf("not found limiter") } -func (p *Core) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, updatedUserList []panel.UserInfo) error { - err := p.dispatcher.Limiter.UpdateInboundLimiter(tag, nodeInfo, updatedUserList) - return err +func (p *Core) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, added, deleted []panel.UserInfo) error { + return p.dispatcher.Limiter.UpdateInboundLimiter(tag, nodeInfo, added, deleted) } func (p *Core) DeleteInboundLimiter(tag string) error { - err := p.dispatcher.Limiter.DeleteInboundLimiter(tag) - return err + return p.dispatcher.Limiter.DeleteInboundLimiter(tag) } diff --git a/core/user.go b/core/user.go index f6364f1..01f68a8 100644 --- a/core/user.go +++ b/core/user.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "github.com/Yuzuki616/V2bX/api/panel" "github.com/Yuzuki616/V2bX/core/app/dispatcher" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/proxy" @@ -79,6 +80,10 @@ func (p *Core) GetUserTraffic(email string, reset bool) (up int64, down int64) { return up, down } +func (p *Core) UpdateUserSpeedLimit(tag string, user *panel.UserInfo, speedLimit uint64, expire int64) error { + return p.dispatcher.Limiter.UpdateUserSpeedLimit(tag, user, speedLimit, expire) +} + func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIpList, error) { return p.dispatcher.Limiter.ListOnlineUserIp(tag) } diff --git a/example/config.yml.example b/example/config.yml.example index 008f3e0..0158fc5 100644 --- a/example/config.yml.example +++ b/example/config.yml.example @@ -47,6 +47,12 @@ Nodes: Periodic: 60 # Report interval, sec. Timeout: 10 # Report timeout, sec. EnableIpSync: false # Enable online ip sync + EnableDynamicSpeedLimit: false # Enable dynamic speed limit + DynamicSpeedLimitConfig: + Periodic: 60 # Time to check the user traffic , sec. + Traffic: 0 # Traffic limit, MB + SpeedLimit: 0 # Speed limit, Mbps + ExpireTime: 0 # Time limit, sec. CertConfig: CertMode: dns # Option about how to get certificate: none, file, http, dns. Choose "none" will forcedly disable the tls config. CertDomain: "node1.test.com" # Domain to cert diff --git a/node/node.go b/node/node.go index 62c339d..7bcce38 100644 --- a/node/node.go +++ b/node/node.go @@ -19,16 +19,17 @@ import ( ) type Node struct { - server *core.Core - config *conf.ControllerConfig - clientInfo panel.ClientInfo - apiClient panel.Panel - nodeInfo *panel.NodeInfo - Tag string - userList []panel.UserInfo - nodeInfoMonitorPeriodic *task.Periodic - userReportPeriodic *task.Periodic - onlineIpReportPeriodic *task.Periodic + server *core.Core + config *conf.ControllerConfig + clientInfo panel.ClientInfo + apiClient panel.Panel + nodeInfo *panel.NodeInfo + Tag string + userList []panel.UserInfo + nodeInfoMonitorPeriodic *task.Periodic + userReportPeriodic *task.Periodic + onlineIpReportPeriodic *task.Periodic + DynamicSpeedLimitPeriodic *task.Periodic } // New return a Node service with default parameters. @@ -113,11 +114,22 @@ func (c *Node) Start() error { Execute: c.onlineIpReport, } go func() { - time.Sleep(time.Duration(c.config.UpdatePeriodic) * time.Second) + time.Sleep(time.Duration(c.config.IpRecorderConfig.Periodic) * time.Second) _ = c.onlineIpReportPeriodic.Start() }() log.Printf("[%s: %d] Start report online ip", c.nodeInfo.NodeType, c.nodeInfo.NodeId) } + if c.config.EnableDynamicSpeedLimit { + c.DynamicSpeedLimitPeriodic = &task.Periodic{ + Interval: time.Duration(c.config.DynamicSpeedLimitConfig.Periodic) * time.Second, + Execute: c.DynamicSpeedLimit, + } + go func() { + time.Sleep(time.Duration(c.config.DynamicSpeedLimitConfig.Periodic) * time.Second) + _ = c.DynamicSpeedLimitPeriodic.Start() + }() + log.Printf("[%s: %d] Start dynamic speed limit", c.nodeInfo.NodeType, c.nodeInfo.NodeId) + } runtime.GC() return nil } @@ -252,8 +264,10 @@ func (c *Node) nodeInfoMonitor() (err error) { if err != nil { log.Print(err) } + } + if len(added) > 0 || len(deleted) > 0 { // Update Limiter - if err := c.server.UpdateInboundLimiter(c.Tag, c.nodeInfo, added); err != nil { + if err := c.server.UpdateInboundLimiter(c.Tag, c.nodeInfo, added, deleted); err != nil { log.Print(err) } } @@ -365,6 +379,9 @@ func (c *Node) userInfoMonitor() (err error) { for i := range c.userList { up, down := c.server.GetUserTraffic(c.buildUserTag(&(c.userList)[i]), true) if up > 0 || down > 0 { + if c.config.EnableDynamicSpeedLimit { + c.userList[i].Traffic += up + down + } userTraffic = append(userTraffic, panel.UserTraffic{ UID: (c.userList)[i].UID, Upload: up, @@ -423,6 +440,25 @@ func (c *Node) onlineIpReport() (err error) { return nil } +func (c *Node) DynamicSpeedLimit() error { + if c.config.EnableDynamicSpeedLimit { + for i := range c.userList { + up, down := c.server.GetUserTraffic(c.buildUserTag(&(c.userList)[i]), false) + if c.userList[i].Traffic+down+up/1024/1024 > c.config.DynamicSpeedLimitConfig.Traffic { + err := c.server.UpdateUserSpeedLimit(c.Tag, + &c.userList[i], + c.config.DynamicSpeedLimitConfig.SpeedLimit, + time.Now().Add(time.Second*time.Duration(c.config.DynamicSpeedLimitConfig.ExpireTime)).Unix()) + if err != nil { + log.Print(err) + } + } + c.userList[i].Traffic = 0 + } + } + return nil +} + func (c *Node) buildNodeTag() string { return fmt.Sprintf("%s_%s_%d", c.nodeInfo.NodeType, c.config.ListenIP, c.nodeInfo.NodeId) }