diff --git a/core/app/dispatcher/limiter.go b/core/app/dispatcher/limiter.go index d4d3f2d..5b8e840 100644 --- a/core/app/dispatcher/limiter.go +++ b/core/app/dispatcher/limiter.go @@ -11,19 +11,20 @@ import ( "time" ) -type UserInfo struct { - UID int - SpeedLimit uint64 - ExpireTime int64 - DeviceLimit int +type UserLimitInfo struct { + UID int + SpeedLimit uint64 + ExpireTime int64 + //DeviceLimit int } type InboundInfo struct { - Tag string - NodeSpeedLimit uint64 - UserInfo *sync.Map // Key: Uid value: UserInfo - SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket - UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool + Tag string + NodeSpeedLimit uint64 + NodeDeviceLimit int + UserLimitInfo *sync.Map // Key: Uid value: UserLimitInfo + SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket + UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool } type Limiter struct { @@ -36,47 +37,24 @@ func NewLimiter() *Limiter { } } -func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userList []panel.UserInfo) error { +func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo) error { inboundInfo := &InboundInfo{ - Tag: tag, - NodeSpeedLimit: nodeInfo.SpeedLimit, - SpeedLimiter: new(sync.Map), - UserOnlineIP: new(sync.Map), + Tag: tag, + NodeSpeedLimit: nodeInfo.SpeedLimit, + NodeDeviceLimit: nodeInfo.DeviceLimit, + SpeedLimiter: new(sync.Map), + UserOnlineIP: new(sync.Map), } - userMap := new(sync.Map) - for i := range userList { - /*if (*userList)[i].SpeedLimit == 0 { - (*userList)[i].SpeedLimit = nodeInfo.SpeedLimit - } - if (*userList)[i].DeviceLimit == 0 { - (*userList)[i].DeviceLimit = nodeInfo.DeviceLimit - }*/ - userMap.Store(fmt.Sprintf("%s|%s|%d", tag, (userList)[i].V2rayUser.Email, (userList)[i].UID), - &UserInfo{ - UID: (userList)[i].UID, - SpeedLimit: nodeInfo.SpeedLimit, - DeviceLimit: nodeInfo.DeviceLimit, - }) - } - inboundInfo.UserInfo = userMap + inboundInfo.UserLimitInfo = new(sync.Map) l.InboundInfo.Store(tag, inboundInfo) // Replace the old inbound info return nil } -func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, added, deleted []panel.UserInfo) error { +func (l *Limiter) UpdateInboundLimiter(tag string, deleted []panel.UserInfo) error { if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) - // Update User info - for i := range added { - inboundInfo.UserInfo.Store(fmt.Sprintf("%s|%s|%d", tag, - (added)[i].V2rayUser.Email, (added)[i].UID), &UserInfo{ - UID: (added)[i].UID, - SpeedLimit: nodeInfo.SpeedLimit, - DeviceLimit: nodeInfo.DeviceLimit, - }) - } for i := range deleted { - inboundInfo.UserInfo.Delete(fmt.Sprintf("%s|%s|%d", tag, + inboundInfo.UserLimitInfo.Delete(fmt.Sprintf("%s|%s|%d", tag, (deleted)[i].V2rayUser.Email, (deleted)[i].UID)) inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag, (deleted)[i].V2rayUser.Email, (deleted)[i].UID)) // Delete limiter bucket @@ -92,16 +70,14 @@ func (l *Limiter) DeleteInboundLimiter(tag string) error { return nil } -func (l *Limiter) UpdateUserSpeedLimit(tag string, userInfo *panel.UserInfo, limit uint64, expire int64) error { +func (l *Limiter) AddUserSpeedLimit(tag string, userInfo *panel.UserInfo, limit uint64, expire int64) error { if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) - if user, ok := inboundInfo.UserInfo.Load(fmt.Sprintf("%s|%s|%d", tag, userInfo.GetUserEmail(), userInfo.UID)); ok { - user.(*UserInfo).SpeedLimit = limit - user.(*UserInfo).ExpireTime = time.Now().Add(time.Duration(expire) * time.Second).Unix() - inboundInfo.SpeedLimiter.Delete(fmt.Sprintf("%s|%s|%d", tag, userInfo.GetUserEmail(), userInfo.UID)) - } else { - return fmt.Errorf("no such user in limiter: %s", userInfo.GetUserEmail()) + userLimit := &UserLimitInfo{ + SpeedLimit: limit, + ExpireTime: time.Now().Add(time.Duration(expire) * time.Second).Unix(), } + inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, userInfo.GetUserEmail(), userInfo.UID), userLimit) return nil } else { return fmt.Errorf("no such inbound in limiter: %s", tag) @@ -128,9 +104,9 @@ func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) { return true }) if len(ip) > 0 { - if u, ok := inboundInfo.UserInfo.Load(key.(string)); ok { + if u, ok := inboundInfo.UserLimitInfo.Load(key.(string)); ok { onlineUser = append(onlineUser, UserIpList{ - Uid: u.(*UserInfo).UID, + Uid: u.(*UserLimitInfo).UID, IpList: ip, }) } @@ -192,17 +168,15 @@ func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) inboundInfo := value.(*InboundInfo) nodeLimit := inboundInfo.NodeSpeedLimit var userLimit uint64 = 0 - var deviceLimit = 0 expired := false - if v, ok := inboundInfo.UserInfo.Load(email); ok { - u := v.(*UserInfo) + if v, ok := inboundInfo.UserLimitInfo.Load(email); ok { + u := v.(*UserLimitInfo) if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 { userLimit = 0 expired = true } else { userLimit = u.SpeedLimit } - deviceLimit = u.DeviceLimit } ipMap := new(sync.Map) ipMap.Store(ip, true) @@ -216,7 +190,7 @@ func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) counter++ return true }) - if counter > deviceLimit && deviceLimit > 0 { + if counter > inboundInfo.NodeDeviceLimit && inboundInfo.NodeDeviceLimit > 0 { ipMap.Delete(ip) return nil, false, true } @@ -226,12 +200,13 @@ func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) } } } - limit := determineRate(nodeLimit, userLimit) // If need the Speed limit + limit := determineSpeedLimit(nodeLimit, userLimit) // If you need the Speed limit if limit > 0 { limiter := ratelimit.NewBucketWithQuantum(time.Second, int64(limit), int64(limit)) // Byte/s if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok { if expired { inboundInfo.SpeedLimiter.Store(email, limiter) + inboundInfo.UserLimitInfo.Delete(email) return limiter, true, false } bucket := v.(*ratelimit.Bucket) @@ -270,8 +245,28 @@ func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error { return w.writer.WriteMultiBuffer(mb) } -// determineRate returns the minimum non-zero rate -func determineRate(nodeLimit, userLimit uint64) (limit uint64) { +// determineSpeedLimit returns the minimum non-zero rate +func determineSpeedLimit(nodeLimit, userLimit uint64) (limit uint64) { + if nodeLimit == 0 || userLimit == 0 { + if nodeLimit > userLimit { + return nodeLimit + } else if nodeLimit < userLimit { + return userLimit + } else { + return 0 + } + } else { + if nodeLimit > userLimit { + return userLimit + } else if nodeLimit < userLimit { + return nodeLimit + } else { + return nodeLimit + } + } +} + +func determineDeviceLimit(nodeLimit, userLimit int) (limit int) { if nodeLimit == 0 || userLimit == 0 { if nodeLimit > userLimit { return nodeLimit diff --git a/core/inbound.go b/core/inbound.go index 5987568..6b3e1fd 100644 --- a/core/inbound.go +++ b/core/inbound.go @@ -28,8 +28,8 @@ func (p *Core) AddInbound(config *core.InboundHandlerConfig) error { return nil } -func (p *Core) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userList []panel.UserInfo) error { - return p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, userList) +func (p *Core) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo) error { + return p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo) } func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) { @@ -40,8 +40,8 @@ func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) { return nil, fmt.Errorf("not found limiter") } -func (p *Core) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, added, deleted []panel.UserInfo) error { - return p.dispatcher.Limiter.UpdateInboundLimiter(tag, nodeInfo, added, deleted) +func (p *Core) UpdateInboundLimiter(tag string, deleted []panel.UserInfo) error { + return p.dispatcher.Limiter.UpdateInboundLimiter(tag, deleted) } func (p *Core) DeleteInboundLimiter(tag string) error { diff --git a/core/user.go b/core/user.go index 01f68a8..6a60994 100644 --- a/core/user.go +++ b/core/user.go @@ -80,8 +80,8 @@ func (p *Core) GetUserTraffic(email string, reset bool) (up int64, down int64) { return up, down } -func (p *Core) UpdateUserSpeedLimit(tag string, user *panel.UserInfo, speedLimit uint64, expire int64) error { - return p.dispatcher.Limiter.UpdateUserSpeedLimit(tag, user, speedLimit, expire) +func (p *Core) AddUserSpeedLimit(tag string, user *panel.UserInfo, speedLimit uint64, expire int64) error { + return p.dispatcher.Limiter.AddUserSpeedLimit(tag, user, speedLimit, expire) } func (p *Core) ListOnlineIp(tag string) ([]dispatcher.UserIpList, error) { diff --git a/node/node.go b/node/node.go index 7bcce38..b89a1c4 100644 --- a/node/node.go +++ b/node/node.go @@ -59,17 +59,15 @@ func (c *Node) Start() error { return err } // Update user - userInfo, err := c.apiClient.GetUserList() + c.userList, err = c.apiClient.GetUserList() if err != nil { return err } - err = c.addNewUser(userInfo, newNodeInfo) + err = c.addNewUser(c.userList, newNodeInfo) if err != nil { return err } - //sync controller userList - c.userList = userInfo - if err := c.server.AddInboundLimiter(c.Tag, c.nodeInfo, userInfo); err != nil { + if err := c.server.AddInboundLimiter(c.Tag, c.nodeInfo); err != nil { log.Print(err) } // Add Rule Manager @@ -240,7 +238,7 @@ func (c *Node) nodeInfoMonitor() (err error) { } newNodeInfo = nil // Add Limiter - if err := c.server.AddInboundLimiter(c.Tag, c.nodeInfo, c.userList); err != nil { + if err := c.server.AddInboundLimiter(c.Tag, c.nodeInfo); err != nil { log.Print(err) return nil } @@ -267,7 +265,7 @@ func (c *Node) nodeInfoMonitor() (err error) { } if len(added) > 0 || len(deleted) > 0 { // Update Limiter - if err := c.server.UpdateInboundLimiter(c.Tag, c.nodeInfo, added, deleted); err != nil { + if err := c.server.UpdateInboundLimiter(c.Tag, deleted); err != nil { log.Print(err) } } @@ -445,7 +443,7 @@ func (c *Node) DynamicSpeedLimit() error { for i := range c.userList { up, down := c.server.GetUserTraffic(c.buildUserTag(&(c.userList)[i]), false) if c.userList[i].Traffic+down+up/1024/1024 > c.config.DynamicSpeedLimitConfig.Traffic { - err := c.server.UpdateUserSpeedLimit(c.Tag, + err := c.server.AddUserSpeedLimit(c.Tag, &c.userList[i], c.config.DynamicSpeedLimitConfig.SpeedLimit, time.Now().Add(time.Second*time.Duration(c.config.DynamicSpeedLimitConfig.ExpireTime)).Unix())