fix speed limiter not clear

This commit is contained in:
yuzuki999 2022-09-06 08:28:57 +08:00
parent e92a872738
commit cef8ff1b70
4 changed files with 34 additions and 29 deletions

View File

@ -228,7 +228,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn
if user != nil && len(user.Email) > 0 { if user != nil && len(user.Email) > 0 {
// Speed Limit and Device Limit // Speed Limit and Device Limit
bucket, ok, reject := d.Limiter.GetUserBucket(sessionInbound.Tag, user.Email, sessionInbound.Source.Address.IP().String()) bucket, ok, reject := d.Limiter.CheckSpeedAndDeviceLimit(sessionInbound.Tag, user.Email, sessionInbound.Source.Address.IP().String())
if reject { if reject {
newError("Devices reach the limit: ", user.Email).AtError().WriteToLog() newError("Devices reach the limit: ", user.Email).AtError().WriteToLog()
common.Close(outboundLink.Writer) common.Close(outboundLink.Writer)

View File

@ -21,7 +21,7 @@ type InboundInfo struct {
Tag string Tag string
NodeSpeedLimit uint64 NodeSpeedLimit uint64
UserInfo *sync.Map // Key: Uid value: UserInfo UserInfo *sync.Map // Key: Uid value: UserInfo
BucketHub *sync.Map // key: Uid, value: *ratelimit.Bucket SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket
UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool
} }
@ -39,7 +39,7 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userLi
inboundInfo := &InboundInfo{ inboundInfo := &InboundInfo{
Tag: tag, Tag: tag,
NodeSpeedLimit: nodeInfo.SpeedLimit, NodeSpeedLimit: nodeInfo.SpeedLimit,
BucketHub: new(sync.Map), SpeedLimiter: new(sync.Map),
UserOnlineIP: new(sync.Map), UserOnlineIP: new(sync.Map),
} }
userMap := new(sync.Map) userMap := new(sync.Map)
@ -73,7 +73,7 @@ func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, upd
SpeedLimit: nodeInfo.SpeedLimit, SpeedLimit: nodeInfo.SpeedLimit,
DeviceLimit: nodeInfo.DeviceLimit, DeviceLimit: nodeInfo.DeviceLimit,
}) })
inboundInfo.BucketHub.Delete(fmt.Sprintf("%s|%s|%d", tag, inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag,
(updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID)) // Delete old limiter bucket (updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID)) // Delete old limiter bucket
} }
} else { } else {
@ -87,22 +87,15 @@ func (l *Limiter) DeleteInboundLimiter(tag string) error {
return nil return nil
} }
type UserIp struct { type UserIpList struct {
Uid int `json:"Uid"` Uid int `json:"Uid"`
IPs []string `json:"Ips"` IpList []string `json:"Ips"`
} }
func (l *Limiter) GetOnlineUserIp(tag string) ([]UserIp, error) { func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) {
if value, ok := l.InboundInfo.Load(tag); ok { if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo) inboundInfo := value.(*InboundInfo)
// Clear Speed Limiter bucket for users who are not online onlineUser := make([]UserIpList, 0)
inboundInfo.BucketHub.Range(func(key, value interface{}) bool {
if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists {
inboundInfo.BucketHub.Delete(key.(string))
}
return true
})
onlineUser := make([]UserIp, 0)
var ipMap *sync.Map var ipMap *sync.Map
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
ipMap = value.(*sync.Map) ipMap = value.(*sync.Map)
@ -115,9 +108,9 @@ func (l *Limiter) GetOnlineUserIp(tag string) ([]UserIp, error) {
}) })
if len(ip) > 0 { if len(ip) > 0 {
if u, ok := inboundInfo.UserInfo.Load(key.(string)); ok { if u, ok := inboundInfo.UserInfo.Load(key.(string)); ok {
onlineUser = append(onlineUser, UserIp{ onlineUser = append(onlineUser, UserIpList{
Uid: u.(UserInfo).UID, Uid: u.(UserInfo).UID,
IPs: ip, IpList: ip,
}) })
} }
} }
@ -132,7 +125,7 @@ func (l *Limiter) GetOnlineUserIp(tag string) ([]UserIp, error) {
} }
} }
func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIp) { func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIpList) {
if v, ok := l.InboundInfo.Load(tag); ok { if v, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := v.(*InboundInfo) inboundInfo := v.(*InboundInfo)
//Clear old IP //Clear old IP
@ -143,17 +136,29 @@ func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIp) {
// Update User Online IP // Update User Online IP
for i := range userIpList { for i := range userIpList {
ipMap := new(sync.Map) ipMap := new(sync.Map)
for _, userIp := range (userIpList)[i].IPs { for _, userIp := range (userIpList)[i].IpList {
ipMap.Store(userIp, false) ipMap.Store(userIp, false)
} }
inboundInfo.UserOnlineIP.Store((userIpList)[i].Uid, ipMap) inboundInfo.UserOnlineIP.Store((userIpList)[i].Uid, ipMap)
} }
inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool {
if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists {
inboundInfo.SpeedLimiter.Delete(key.(string))
}
return true
})
} }
} }
func (l *Limiter) ClearOnlineUserIP(tag string) { func (l *Limiter) ClearOnlineUserIpAndSpeedLimiter(tag string) {
if v, ok := l.InboundInfo.Load(tag); ok { if v, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := v.(*InboundInfo) inboundInfo := v.(*InboundInfo)
inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool {
if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists {
inboundInfo.SpeedLimiter.Delete(key.(string))
}
return true
})
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
inboundInfo.UserOnlineIP.Delete(key) inboundInfo.UserOnlineIP.Delete(key)
return true return true
@ -161,7 +166,7 @@ func (l *Limiter) ClearOnlineUserIP(tag string) {
} }
} }
func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) { func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) (speedLimiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) {
if value, ok := l.InboundInfo.Load(tag); ok { if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo) inboundInfo := value.(*InboundInfo)
nodeLimit := inboundInfo.NodeSpeedLimit nodeLimit := inboundInfo.NodeSpeedLimit
@ -197,7 +202,7 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
limit := determineRate(nodeLimit, userLimit) // If need the Speed limit limit := determineRate(nodeLimit, userLimit) // If need the Speed limit
if limit > 0 { if limit > 0 {
limiter := ratelimit.NewBucketWithQuantum(time.Second, int64(limit), int64(limit)) // Byte/s limiter := ratelimit.NewBucketWithQuantum(time.Second, int64(limit), int64(limit)) // Byte/s
if v, ok := inboundInfo.BucketHub.LoadOrStore(email, limiter); ok { if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok {
bucket := v.(*ratelimit.Bucket) bucket := v.(*ratelimit.Bucket)
return bucket, true, false return bucket, true, false
} else { } else {

View File

@ -74,14 +74,14 @@ func (p *Core) GetUserTraffic(email string) (up int64, down int64) {
return up, down return up, down
} }
func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIp, error) { func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIpList, error) {
return p.dispatcher.Limiter.GetOnlineUserIp(tag) return p.dispatcher.Limiter.ListOnlineUserIp(tag)
} }
func (p *Core) UpdateOnlineIp(tag string, ips []dispatcher.UserIp) { func (p *Core) UpdateOnlineIp(tag string, ips []dispatcher.UserIpList) {
p.dispatcher.Limiter.UpdateOnlineUserIP(tag, ips) p.dispatcher.Limiter.UpdateOnlineUserIP(tag, ips)
} }
func (p *Core) ClearOnlineIp(tag string) { func (p *Core) ClearOnlineIp(tag string) {
p.dispatcher.Limiter.ClearOnlineUserIP(tag) p.dispatcher.Limiter.ClearOnlineUserIpAndSpeedLimiter(tag)
} }

View File

@ -407,7 +407,7 @@ func (c *Node) onlineIpReport() (err error) {
} }
log.Printf("[Node: %d] Report %d online ip", c.nodeInfo.NodeId, len(onlineIp)) log.Printf("[Node: %d] Report %d online ip", c.nodeInfo.NodeId, len(onlineIp))
if rsp.StatusCode() == 200 { if rsp.StatusCode() == 200 {
onlineIp = []dispatcher.UserIp{} onlineIp = []dispatcher.UserIpList{}
err := json.Unmarshal(rsp.Body(), &onlineIp) err := json.Unmarshal(rsp.Body(), &onlineIp)
if err != nil { if err != nil {
log.Print(err) log.Print(err)