diff --git a/core/app/dispatcher/default.go b/core/app/dispatcher/default.go index 176faef..609d343 100644 --- a/core/app/dispatcher/default.go +++ b/core/app/dispatcher/default.go @@ -228,7 +228,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn if user != nil && len(user.Email) > 0 { // Speed Limit and Device Limit - bucket, ok, reject := d.Limiter.GetUserBucket(sessionInbound.Tag, user.Email, sessionInbound.Source.Address.IP().String()) + bucket, ok, reject := d.Limiter.CheckSpeedAndDeviceLimit(sessionInbound.Tag, user.Email, sessionInbound.Source.Address.IP().String()) if reject { newError("Devices reach the limit: ", user.Email).AtError().WriteToLog() common.Close(outboundLink.Writer) diff --git a/core/app/dispatcher/limiter.go b/core/app/dispatcher/limiter.go index 1996b1a..cc8edc4 100644 --- a/core/app/dispatcher/limiter.go +++ b/core/app/dispatcher/limiter.go @@ -21,7 +21,7 @@ type InboundInfo struct { Tag string NodeSpeedLimit uint64 UserInfo *sync.Map // Key: Uid value: UserInfo - BucketHub *sync.Map // key: Uid, value: *ratelimit.Bucket + SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool } @@ -39,7 +39,7 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userLi inboundInfo := &InboundInfo{ Tag: tag, NodeSpeedLimit: nodeInfo.SpeedLimit, - BucketHub: new(sync.Map), + SpeedLimiter: new(sync.Map), UserOnlineIP: new(sync.Map), } userMap := new(sync.Map) @@ -73,7 +73,7 @@ func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, upd SpeedLimit: nodeInfo.SpeedLimit, DeviceLimit: nodeInfo.DeviceLimit, }) - inboundInfo.BucketHub.Delete(fmt.Sprintf("%s|%s|%d", tag, + inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag, (updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID)) // Delete old limiter bucket } } else { @@ -87,22 +87,15 @@ func (l *Limiter) DeleteInboundLimiter(tag string) error { return nil } -type UserIp struct { - Uid int `json:"Uid"` - IPs []string `json:"Ips"` +type UserIpList struct { + Uid int `json:"Uid"` + IpList []string `json:"Ips"` } -func (l *Limiter) GetOnlineUserIp(tag string) ([]UserIp, error) { +func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) { if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) - // Clear Speed Limiter bucket for users who are not online - inboundInfo.BucketHub.Range(func(key, value interface{}) bool { - if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists { - inboundInfo.BucketHub.Delete(key.(string)) - } - return true - }) - onlineUser := make([]UserIp, 0) + onlineUser := make([]UserIpList, 0) var ipMap *sync.Map inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { ipMap = value.(*sync.Map) @@ -115,9 +108,9 @@ func (l *Limiter) GetOnlineUserIp(tag string) ([]UserIp, error) { }) if len(ip) > 0 { if u, ok := inboundInfo.UserInfo.Load(key.(string)); ok { - onlineUser = append(onlineUser, UserIp{ - Uid: u.(UserInfo).UID, - IPs: ip, + onlineUser = append(onlineUser, UserIpList{ + Uid: u.(UserInfo).UID, + IpList: ip, }) } } @@ -132,7 +125,7 @@ func (l *Limiter) GetOnlineUserIp(tag string) ([]UserIp, error) { } } -func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIp) { +func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIpList) { if v, ok := l.InboundInfo.Load(tag); ok { inboundInfo := v.(*InboundInfo) //Clear old IP @@ -143,17 +136,29 @@ func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIp) { // Update User Online IP for i := range userIpList { ipMap := new(sync.Map) - for _, userIp := range (userIpList)[i].IPs { + for _, userIp := range (userIpList)[i].IpList { ipMap.Store(userIp, false) } inboundInfo.UserOnlineIP.Store((userIpList)[i].Uid, ipMap) } + inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool { + if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists { + inboundInfo.SpeedLimiter.Delete(key.(string)) + } + return true + }) } } -func (l *Limiter) ClearOnlineUserIP(tag string) { +func (l *Limiter) ClearOnlineUserIpAndSpeedLimiter(tag string) { if v, ok := l.InboundInfo.Load(tag); ok { inboundInfo := v.(*InboundInfo) + inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool { + if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists { + inboundInfo.SpeedLimiter.Delete(key.(string)) + } + return true + }) inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { inboundInfo.UserOnlineIP.Delete(key) return true @@ -161,7 +166,7 @@ func (l *Limiter) ClearOnlineUserIP(tag string) { } } -func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) { +func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) (speedLimiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) { if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) nodeLimit := inboundInfo.NodeSpeedLimit @@ -197,7 +202,7 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r limit := determineRate(nodeLimit, userLimit) // If need the Speed limit if limit > 0 { limiter := ratelimit.NewBucketWithQuantum(time.Second, int64(limit), int64(limit)) // Byte/s - if v, ok := inboundInfo.BucketHub.LoadOrStore(email, limiter); ok { + if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok { bucket := v.(*ratelimit.Bucket) return bucket, true, false } else { diff --git a/core/user.go b/core/user.go index 14b1c4a..7231b4b 100644 --- a/core/user.go +++ b/core/user.go @@ -74,14 +74,14 @@ func (p *Core) GetUserTraffic(email string) (up int64, down int64) { return up, down } -func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIp, error) { - return p.dispatcher.Limiter.GetOnlineUserIp(tag) +func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIpList, error) { + return p.dispatcher.Limiter.ListOnlineUserIp(tag) } -func (p *Core) UpdateOnlineIp(tag string, ips []dispatcher.UserIp) { +func (p *Core) UpdateOnlineIp(tag string, ips []dispatcher.UserIpList) { p.dispatcher.Limiter.UpdateOnlineUserIP(tag, ips) } func (p *Core) ClearOnlineIp(tag string) { - p.dispatcher.Limiter.ClearOnlineUserIP(tag) + p.dispatcher.Limiter.ClearOnlineUserIpAndSpeedLimiter(tag) } diff --git a/node/node.go b/node/node.go index aec6bdb..dc806ec 100644 --- a/node/node.go +++ b/node/node.go @@ -407,7 +407,7 @@ func (c *Node) onlineIpReport() (err error) { } log.Printf("[Node: %d] Report %d online ip", c.nodeInfo.NodeId, len(onlineIp)) if rsp.StatusCode() == 200 { - onlineIp = []dispatcher.UserIp{} + onlineIp = []dispatcher.UserIpList{} err := json.Unmarshal(rsp.Body(), &onlineIp) if err != nil { log.Print(err)