add dynamic speed limit

fix old user limit info not clear
fix some wrong names
This commit is contained in:
yuzuki999 2022-09-07 23:02:02 +08:00
parent 52134c6e4e
commit 5fd09079e3
10 changed files with 211 additions and 130 deletions

View File

@ -1,78 +0,0 @@
// Package api contains all the api used by XrayR
// To implement an api , one needs to implement the interface below.
package panel
import (
"github.com/Yuzuki616/V2bX/conf"
"github.com/go-resty/resty/v2"
"log"
"strconv"
"sync"
"time"
)
// Panel is the interface for different panel's api.
type ClientInfo struct {
APIHost string
NodeID int
Key string
NodeType string
}
type Client struct {
client *resty.Client
APIHost string
NodeID int
Key string
NodeType string
//EnableSS2022 bool
EnableVless bool
EnableXTLS bool
SpeedLimit float64
DeviceLimit int
LocalRuleList []DetectRule
RemoteRuleCache *[]Rule
access sync.Mutex
NodeInfoRspMd5 [16]byte
NodeRuleRspMd5 [16]byte
}
func New(apiConfig *conf.ApiConfig) Panel {
client := resty.New()
client.SetRetryCount(3)
if apiConfig.Timeout > 0 {
client.SetTimeout(time.Duration(apiConfig.Timeout) * time.Second)
} else {
client.SetTimeout(5 * time.Second)
}
client.OnError(func(req *resty.Request, err error) {
if v, ok := err.(*resty.ResponseError); ok {
// v.Response contains the last response from the server
// v.Err contains the original error
log.Print(v.Err)
}
})
client.SetBaseURL(apiConfig.APIHost)
// Create Key for each requests
client.SetQueryParams(map[string]string{
"node_id": strconv.Itoa(apiConfig.NodeID),
"token": apiConfig.Key,
})
// Read local rule list
localRuleList := readLocalRuleList(apiConfig.RuleListPath)
return &Client{
client: client,
NodeID: apiConfig.NodeID,
Key: apiConfig.Key,
APIHost: apiConfig.APIHost,
NodeType: apiConfig.NodeType,
//EnableSS2022: apiConfig.EnableSS2022,
EnableVless: apiConfig.EnableVless,
EnableXTLS: apiConfig.EnableXTLS,
SpeedLimit: apiConfig.SpeedLimit,
DeviceLimit: apiConfig.DeviceLimit,
LocalRuleList: localRuleList,
}
}

10
api/panel/interface.go Normal file
View File

@ -0,0 +1,10 @@
package panel
type Panel interface {
GetNodeInfo() (nodeInfo *NodeInfo, err error)
GetUserList() (userList []UserInfo, err error)
ReportUserTraffic(userTraffic []UserTraffic) (err error)
Describe() ClientInfo
GetNodeRule() (ruleList []DetectRule, protocolList []string, err error)
Debug()
}

View File

@ -1,10 +1,75 @@
package panel
type Panel interface {
GetNodeInfo() (nodeInfo *NodeInfo, err error)
GetUserList() (userList []UserInfo, err error)
ReportUserTraffic(userTraffic []UserTraffic) (err error)
Describe() ClientInfo
GetNodeRule() (ruleList []DetectRule, protocolList []string, err error)
Debug()
import (
"github.com/Yuzuki616/V2bX/conf"
"github.com/go-resty/resty/v2"
"log"
"strconv"
"sync"
"time"
)
// Panel is the interface for different panel's api.
type ClientInfo struct {
APIHost string
NodeID int
Key string
NodeType string
}
type Client struct {
client *resty.Client
APIHost string
NodeID int
Key string
NodeType string
//EnableSS2022 bool
EnableVless bool
EnableXTLS bool
SpeedLimit float64
DeviceLimit int
LocalRuleList []DetectRule
RemoteRuleCache *[]Rule
access sync.Mutex
NodeInfoRspMd5 [16]byte
NodeRuleRspMd5 [16]byte
}
func New(apiConfig *conf.ApiConfig) Panel {
client := resty.New()
client.SetRetryCount(3)
if apiConfig.Timeout > 0 {
client.SetTimeout(time.Duration(apiConfig.Timeout) * time.Second)
} else {
client.SetTimeout(5 * time.Second)
}
client.OnError(func(req *resty.Request, err error) {
if v, ok := err.(*resty.ResponseError); ok {
// v.Response contains the last response from the server
// v.Err contains the original error
log.Print(v.Err)
}
})
client.SetBaseURL(apiConfig.APIHost)
// Create Key for each requests
client.SetQueryParams(map[string]string{
"node_id": strconv.Itoa(apiConfig.NodeID),
"token": apiConfig.Key,
})
// Read local rule list
localRuleList := readLocalRuleList(apiConfig.RuleListPath)
return &Client{
client: client,
NodeID: apiConfig.NodeID,
Key: apiConfig.Key,
APIHost: apiConfig.APIHost,
NodeType: apiConfig.NodeType,
//EnableSS2022: apiConfig.EnableSS2022,
EnableVless: apiConfig.EnableVless,
EnableXTLS: apiConfig.EnableXTLS,
SpeedLimit: apiConfig.SpeedLimit,
DeviceLimit: apiConfig.DeviceLimit,
LocalRuleList: localRuleList,
}
}

View File

@ -23,6 +23,7 @@ type UserInfo struct {
/*DeviceLimit int `json:"device_limit"`
SpeedLimit uint64 `json:"speed_limit"`*/
UID int `json:"id"`
Traffic int64 `json:"-"`
Port int `json:"port"`
Cipher string `json:"cipher"`
Secret string `json:"secret"`

View File

@ -27,22 +27,31 @@ type IpReportConfig struct {
EnableIpSync bool `mapstructure:"EnableIpSync"`
}
type DynamicSpeedLimitConfig struct {
Periodic int `mapstructure:"Periodic"`
Traffic int64 `mapstructure:"Traffic"`
SpeedLimit uint64 `mapstructure:"SpeedLimit"`
ExpireTime int `mapstructure:"ExpireTime"`
}
type ControllerConfig struct {
ListenIP string `mapstructure:"ListenIP"`
SendIP string `mapstructure:"SendIP"`
UpdatePeriodic int `mapstructure:"UpdatePeriodic"`
EnableDNS bool `mapstructure:"EnableDNS"`
DNSType string `mapstructure:"DNSType"`
DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"`
DisableGetRule bool `mapstructure:"DisableGetRule"`
EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"`
EnableFallback bool `mapstructure:"EnableFallback"`
DisableIVCheck bool `mapstructure:"DisableIVCheck"`
DisableSniffing bool `mapstructure:"DisableSniffing"`
FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"`
EnableIpRecorder bool `mapstructure:"EnableIpRecorder"`
IpRecorderConfig *IpReportConfig `mapstructure:"IpRecorderConfig"`
CertConfig *CertConfig `mapstructure:"CertConfig"`
ListenIP string `mapstructure:"ListenIP"`
SendIP string `mapstructure:"SendIP"`
UpdatePeriodic int `mapstructure:"UpdatePeriodic"`
EnableDNS bool `mapstructure:"EnableDNS"`
DNSType string `mapstructure:"DNSType"`
DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"`
DisableGetRule bool `mapstructure:"DisableGetRule"`
EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"`
EnableFallback bool `mapstructure:"EnableFallback"`
DisableIVCheck bool `mapstructure:"DisableIVCheck"`
DisableSniffing bool `mapstructure:"DisableSniffing"`
FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"`
EnableIpRecorder bool `mapstructure:"EnableIpRecorder"`
IpRecorderConfig *IpReportConfig `mapstructure:"IpRecorderConfig"`
EnableDynamicSpeedLimit bool `mapstructure:"EnableDynamicSpeedLimit"`
DynamicSpeedLimitConfig *DynamicSpeedLimitConfig `mapstructure:"DynamicSpeedLimitConfig"`
CertConfig *CertConfig `mapstructure:"CertConfig"`
}
type ApiConfig struct {

View File

@ -14,6 +14,7 @@ import (
type UserInfo struct {
UID int
SpeedLimit uint64
ExpireTime int64
DeviceLimit int
}
@ -51,7 +52,7 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userLi
(*userList)[i].DeviceLimit = nodeInfo.DeviceLimit
}*/
userMap.Store(fmt.Sprintf("%s|%s|%d", tag, (userList)[i].V2rayUser.Email, (userList)[i].UID),
UserInfo{
&UserInfo{
UID: (userList)[i].UID,
SpeedLimit: nodeInfo.SpeedLimit,
DeviceLimit: nodeInfo.DeviceLimit,
@ -62,19 +63,23 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userLi
return nil
}
func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, updatedUserList []panel.UserInfo) error {
func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, added, deleted []panel.UserInfo) error {
if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
// Update User info
for i := range updatedUserList {
for i := range added {
inboundInfo.UserInfo.Store(fmt.Sprintf("%s|%s|%d", tag,
(updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID), UserInfo{
UID: (updatedUserList)[i].UID,
(added)[i].V2rayUser.Email, (added)[i].UID), &UserInfo{
UID: (added)[i].UID,
SpeedLimit: nodeInfo.SpeedLimit,
DeviceLimit: nodeInfo.DeviceLimit,
})
}
for i := range deleted {
inboundInfo.UserInfo.Delete(fmt.Sprintf("%s|%s|%d", tag,
(deleted)[i].V2rayUser.Email, (deleted)[i].UID))
inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag,
(updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID)) // Delete old limiter bucket
(deleted)[i].V2rayUser.Email, (deleted)[i].UID)) // Delete limiter bucket
}
} else {
return fmt.Errorf("no such inbound in limiter: %s", tag)
@ -87,6 +92,22 @@ func (l *Limiter) DeleteInboundLimiter(tag string) error {
return nil
}
func (l *Limiter) UpdateUserSpeedLimit(tag string, userInfo *panel.UserInfo, limit uint64, expire int64) error {
if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
if user, ok := inboundInfo.UserInfo.Load(fmt.Sprintf("%s|%s|%d", tag, userInfo.GetUserEmail(), userInfo.UID)); ok {
user.(*UserInfo).SpeedLimit = limit
user.(*UserInfo).ExpireTime = time.Now().Add(time.Duration(expire) * time.Second).Unix()
inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag, userInfo.GetUserEmail(), userInfo.UID))
} else {
return fmt.Errorf("no such user in limiter: %s", userInfo.GetUserEmail())
}
return nil
} else {
return fmt.Errorf("no such inbound in limiter: %s", tag)
}
}
type UserIpList struct {
Uid int `json:"Uid"`
IpList []string `json:"Ips"`
@ -109,7 +130,7 @@ func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) {
if len(ip) > 0 {
if u, ok := inboundInfo.UserInfo.Load(key.(string)); ok {
onlineUser = append(onlineUser, UserIpList{
Uid: u.(UserInfo).UID,
Uid: u.(*UserInfo).UID,
IpList: ip,
})
}
@ -172,9 +193,15 @@ func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string)
nodeLimit := inboundInfo.NodeSpeedLimit
var userLimit uint64 = 0
var deviceLimit = 0
expired := false
if v, ok := inboundInfo.UserInfo.Load(email); ok {
u := v.(UserInfo)
userLimit = u.SpeedLimit
u := v.(*UserInfo)
if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 {
userLimit = 0
expired = true
} else {
userLimit = u.SpeedLimit
}
deviceLimit = u.DeviceLimit
}
ipMap := new(sync.Map)
@ -203,6 +230,10 @@ func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string)
if limit > 0 {
limiter := ratelimit.NewBucketWithQuantum(time.Second, int64(limit), int64(limit)) // Byte/s
if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok {
if expired {
inboundInfo.SpeedLimiter.Store(email, limiter)
return limiter, true, false
}
bucket := v.(*ratelimit.Bucket)
return bucket, true, false
} else {

View File

@ -10,8 +10,7 @@ import (
)
func (p *Core) RemoveInbound(tag string) error {
err := p.ihm.RemoveHandler(context.Background(), tag)
return err
return p.ihm.RemoveHandler(context.Background(), tag)
}
func (p *Core) AddInbound(config *core.InboundHandlerConfig) error {
@ -30,8 +29,7 @@ func (p *Core) AddInbound(config *core.InboundHandlerConfig) error {
}
func (p *Core) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userList []panel.UserInfo) error {
err := p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, userList)
return err
return p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, userList)
}
func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) {
@ -42,12 +40,10 @@ func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) {
return nil, fmt.Errorf("not found limiter")
}
func (p *Core) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, updatedUserList []panel.UserInfo) error {
err := p.dispatcher.Limiter.UpdateInboundLimiter(tag, nodeInfo, updatedUserList)
return err
func (p *Core) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, added, deleted []panel.UserInfo) error {
return p.dispatcher.Limiter.UpdateInboundLimiter(tag, nodeInfo, added, deleted)
}
func (p *Core) DeleteInboundLimiter(tag string) error {
err := p.dispatcher.Limiter.DeleteInboundLimiter(tag)
return err
return p.dispatcher.Limiter.DeleteInboundLimiter(tag)
}

View File

@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
"github.com/Yuzuki616/V2bX/api/panel"
"github.com/Yuzuki616/V2bX/core/app/dispatcher"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/proxy"
@ -79,6 +80,10 @@ func (p *Core) GetUserTraffic(email string, reset bool) (up int64, down int64) {
return up, down
}
func (p *Core) UpdateUserSpeedLimit(tag string, user *panel.UserInfo, speedLimit uint64, expire int64) error {
return p.dispatcher.Limiter.UpdateUserSpeedLimit(tag, user, speedLimit, expire)
}
func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIpList, error) {
return p.dispatcher.Limiter.ListOnlineUserIp(tag)
}

View File

@ -47,6 +47,12 @@ Nodes:
Periodic: 60 # Report interval, sec.
Timeout: 10 # Report timeout, sec.
EnableIpSync: false # Enable online ip sync
EnableDynamicSpeedLimit: false # Enable dynamic speed limit
DynamicSpeedLimitConfig:
Periodic: 60 # Time to check the user traffic , sec.
Traffic: 0 # Traffic limit, MB
SpeedLimit: 0 # Speed limit, Mbps
ExpireTime: 0 # Time limit, sec.
CertConfig:
CertMode: dns # Option about how to get certificate: none, file, http, dns. Choose "none" will forcedly disable the tls config.
CertDomain: "node1.test.com" # Domain to cert

View File

@ -19,16 +19,17 @@ import (
)
type Node struct {
server *core.Core
config *conf.ControllerConfig
clientInfo panel.ClientInfo
apiClient panel.Panel
nodeInfo *panel.NodeInfo
Tag string
userList []panel.UserInfo
nodeInfoMonitorPeriodic *task.Periodic
userReportPeriodic *task.Periodic
onlineIpReportPeriodic *task.Periodic
server *core.Core
config *conf.ControllerConfig
clientInfo panel.ClientInfo
apiClient panel.Panel
nodeInfo *panel.NodeInfo
Tag string
userList []panel.UserInfo
nodeInfoMonitorPeriodic *task.Periodic
userReportPeriodic *task.Periodic
onlineIpReportPeriodic *task.Periodic
DynamicSpeedLimitPeriodic *task.Periodic
}
// New return a Node service with default parameters.
@ -113,11 +114,22 @@ func (c *Node) Start() error {
Execute: c.onlineIpReport,
}
go func() {
time.Sleep(time.Duration(c.config.UpdatePeriodic) * time.Second)
time.Sleep(time.Duration(c.config.IpRecorderConfig.Periodic) * time.Second)
_ = c.onlineIpReportPeriodic.Start()
}()
log.Printf("[%s: %d] Start report online ip", c.nodeInfo.NodeType, c.nodeInfo.NodeId)
}
if c.config.EnableDynamicSpeedLimit {
c.DynamicSpeedLimitPeriodic = &task.Periodic{
Interval: time.Duration(c.config.DynamicSpeedLimitConfig.Periodic) * time.Second,
Execute: c.DynamicSpeedLimit,
}
go func() {
time.Sleep(time.Duration(c.config.DynamicSpeedLimitConfig.Periodic) * time.Second)
_ = c.DynamicSpeedLimitPeriodic.Start()
}()
log.Printf("[%s: %d] Start dynamic speed limit", c.nodeInfo.NodeType, c.nodeInfo.NodeId)
}
runtime.GC()
return nil
}
@ -252,8 +264,10 @@ func (c *Node) nodeInfoMonitor() (err error) {
if err != nil {
log.Print(err)
}
}
if len(added) > 0 || len(deleted) > 0 {
// Update Limiter
if err := c.server.UpdateInboundLimiter(c.Tag, c.nodeInfo, added); err != nil {
if err := c.server.UpdateInboundLimiter(c.Tag, c.nodeInfo, added, deleted); err != nil {
log.Print(err)
}
}
@ -365,6 +379,9 @@ func (c *Node) userInfoMonitor() (err error) {
for i := range c.userList {
up, down := c.server.GetUserTraffic(c.buildUserTag(&(c.userList)[i]), true)
if up > 0 || down > 0 {
if c.config.EnableDynamicSpeedLimit {
c.userList[i].Traffic += up + down
}
userTraffic = append(userTraffic, panel.UserTraffic{
UID: (c.userList)[i].UID,
Upload: up,
@ -423,6 +440,25 @@ func (c *Node) onlineIpReport() (err error) {
return nil
}
func (c *Node) DynamicSpeedLimit() error {
if c.config.EnableDynamicSpeedLimit {
for i := range c.userList {
up, down := c.server.GetUserTraffic(c.buildUserTag(&(c.userList)[i]), false)
if c.userList[i].Traffic+down+up/1024/1024 > c.config.DynamicSpeedLimitConfig.Traffic {
err := c.server.UpdateUserSpeedLimit(c.Tag,
&c.userList[i],
c.config.DynamicSpeedLimitConfig.SpeedLimit,
time.Now().Add(time.Second*time.Duration(c.config.DynamicSpeedLimitConfig.ExpireTime)).Unix())
if err != nil {
log.Print(err)
}
}
c.userList[i].Traffic = 0
}
}
return nil
}
func (c *Node) buildNodeTag() string {
return fmt.Sprintf("%s_%s_%d", c.nodeInfo.NodeType, c.config.ListenIP, c.nodeInfo.NodeId)
}