improve: 优化可能造成流量统计异常的情况

This commit is contained in:
naiba 2024-08-11 10:35:19 +08:00
parent c18e0e420e
commit b1d77a1d27
6 changed files with 39 additions and 28 deletions

View File

@ -72,11 +72,11 @@ func execCase(t *testing.T, item testSt) {
UdpConnCount: 0, UdpConnCount: 0,
ProcessCount: 0, ProcessCount: 0,
}, },
LastActive: time.Time{}, LastActive: time.Time{},
TaskClose: nil, TaskClose: nil,
TaskStream: nil, TaskStream: nil,
PrevHourlyTransferIn: 0, PrevTransferInSnapshot: 0,
PrevHourlyTransferOut: 0, PrevTransferOutSnapshot: 0,
} }
ns := NotificationServerBundle{ ns := NotificationServerBundle{
Notification: &n, Notification: &n,

View File

@ -5,6 +5,8 @@ import (
"time" "time"
"gorm.io/gorm" "gorm.io/gorm"
"github.com/naiba/nezha/pkg/utils"
) )
const ( const (
@ -103,21 +105,21 @@ func (u *Rule) Snapshot(cycleTransferStats *CycleTransferStats, server *Server,
src = float64(server.LastActive.Unix()) src = float64(server.LastActive.Unix())
} }
case "transfer_in_cycle": case "transfer_in_cycle":
src = float64(server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn)) src = float64(utils.Uint64SubInt64(server.State.NetInTransfer, server.PrevTransferInSnapshot))
if u.CycleInterval != 0 { if u.CycleInterval != 0 {
var res NResult var res NResult
db.Model(&Transfer{}).Select("SUM(`in`) AS n").Where("datetime(`created_at`) >= datetime(?) AND server_id = ?", u.GetTransferDurationStart().UTC(), server.ID).Scan(&res) db.Model(&Transfer{}).Select("SUM(`in`) AS n").Where("datetime(`created_at`) >= datetime(?) AND server_id = ?", u.GetTransferDurationStart().UTC(), server.ID).Scan(&res)
src += float64(res.N) src += float64(res.N)
} }
case "transfer_out_cycle": case "transfer_out_cycle":
src = float64(server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut)) src = float64(utils.Uint64SubInt64(server.State.NetOutTransfer, server.PrevTransferOutSnapshot))
if u.CycleInterval != 0 { if u.CycleInterval != 0 {
var res NResult var res NResult
db.Model(&Transfer{}).Select("SUM(`out`) AS n").Where("datetime(`created_at`) >= datetime(?) AND server_id = ?", u.GetTransferDurationStart().UTC(), server.ID).Scan(&res) db.Model(&Transfer{}).Select("SUM(`out`) AS n").Where("datetime(`created_at`) >= datetime(?) AND server_id = ?", u.GetTransferDurationStart().UTC(), server.ID).Scan(&res)
src += float64(res.N) src += float64(res.N)
} }
case "transfer_all_cycle": case "transfer_all_cycle":
src = float64(server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut) + server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn)) src = float64(utils.Uint64SubInt64(server.State.NetOutTransfer, server.PrevTransferOutSnapshot) + utils.Uint64SubInt64(server.State.NetInTransfer, server.PrevTransferInSnapshot))
if u.CycleInterval != 0 { if u.CycleInterval != 0 {
var res NResult var res NResult
db.Model(&Transfer{}).Select("SUM(`in`+`out`) AS n").Where("datetime(`created_at`) >= datetime(?) AND server_id = ?", u.GetTransferDurationStart().UTC(), server.ID).Scan(&res) db.Model(&Transfer{}).Select("SUM(`in`+`out`) AS n").Where("datetime(`created_at`) >= datetime(?) AND server_id = ?", u.GetTransferDurationStart().UTC(), server.ID).Scan(&res)

View File

@ -30,8 +30,8 @@ type Server struct {
TaskClose chan error `gorm:"-" json:"-"` TaskClose chan error `gorm:"-" json:"-"`
TaskStream pb.NezhaService_RequestTaskServer `gorm:"-" json:"-"` TaskStream pb.NezhaService_RequestTaskServer `gorm:"-" json:"-"`
PrevHourlyTransferIn int64 `gorm:"-" json:"-"` // 上次数据点时的入站使用量 PrevTransferInSnapshot int64 `gorm:"-" json:"-"` // 上次数据点时的入站使用量
PrevHourlyTransferOut int64 `gorm:"-" json:"-"` // 上次数据点时的出站使用量 PrevTransferOutSnapshot int64 `gorm:"-" json:"-"` // 上次数据点时的出站使用量
} }
func (s *Server) CopyFromRunningServer(old *Server) { func (s *Server) CopyFromRunningServer(old *Server) {
@ -40,8 +40,8 @@ func (s *Server) CopyFromRunningServer(old *Server) {
s.LastActive = old.LastActive s.LastActive = old.LastActive
s.TaskClose = old.TaskClose s.TaskClose = old.TaskClose
s.TaskStream = old.TaskStream s.TaskStream = old.TaskStream
s.PrevHourlyTransferIn = old.PrevHourlyTransferIn s.PrevTransferInSnapshot = old.PrevTransferInSnapshot
s.PrevHourlyTransferOut = old.PrevHourlyTransferOut s.PrevTransferOutSnapshot = old.PrevTransferOutSnapshot
} }
func boolToString(b bool) string { func boolToString(b bool) string {

View File

@ -76,3 +76,10 @@ func GenerateRandomString(n int) (string, error) {
} }
return string(ret), nil return string(ret), nil
} }
func Uint64SubInt64(a uint64, b int64) uint64 {
if b < 0 {
return a + uint64(-b)
}
return a - uint64(b)
}

View File

@ -112,10 +112,10 @@ func (s *NezhaHandler) ReportSystemState(c context.Context, r *pb.State) (*pb.Re
singleton.ServerList[clientID].LastActive = time.Now() singleton.ServerList[clientID].LastActive = time.Now()
singleton.ServerList[clientID].State = &state singleton.ServerList[clientID].State = &state
// 如果从未记录过,先打点,等到小时时间点时入库 // 应对 dashboard 重启的情况,如果从未记录过,先打点,等到小时时间点时入库
if singleton.ServerList[clientID].PrevHourlyTransferIn == 0 || singleton.ServerList[clientID].PrevHourlyTransferOut == 0 { if singleton.ServerList[clientID].PrevTransferInSnapshot == 0 || singleton.ServerList[clientID].PrevTransferOutSnapshot == 0 {
singleton.ServerList[clientID].PrevHourlyTransferIn = int64(state.NetInTransfer) singleton.ServerList[clientID].PrevTransferInSnapshot = int64(state.NetInTransfer)
singleton.ServerList[clientID].PrevHourlyTransferOut = int64(state.NetOutTransfer) singleton.ServerList[clientID].PrevTransferOutSnapshot = int64(state.NetOutTransfer)
} }
return &pb.Receipt{Proced: true}, nil return &pb.Receipt{Proced: true}, nil
@ -135,9 +135,8 @@ func (s *NezhaHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Rece
// 检查并更新DDNS // 检查并更新DDNS
if singleton.Conf.DDNS.Enable && if singleton.Conf.DDNS.Enable &&
singleton.ServerList[clientID].EnableDDNS && singleton.ServerList[clientID].EnableDDNS &&
singleton.ServerList[clientID].Host != nil &&
host.IP != "" && host.IP != "" &&
singleton.ServerList[clientID].Host.IP != host.IP { (singleton.ServerList[clientID].Host == nil || singleton.ServerList[clientID].Host.IP != host.IP) {
serverDomain := singleton.ServerList[clientID].DDNSDomain serverDomain := singleton.ServerList[clientID].DDNSDomain
if singleton.Conf.DDNS.Provider == "" { if singleton.Conf.DDNS.Provider == "" {
provider, err = singleton.GetDDNSProviderFromProfile(singleton.ServerList[clientID].DDNSProfile) provider, err = singleton.GetDDNSProviderFromProfile(singleton.ServerList[clientID].DDNSProfile)
@ -164,10 +163,9 @@ func (s *NezhaHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Rece
} }
// 发送IP变动通知 // 发送IP变动通知
if singleton.Conf.EnableIPChangeNotification && if singleton.ServerList[clientID].Host != nil && singleton.Conf.EnableIPChangeNotification &&
((singleton.Conf.Cover == model.ConfigCoverAll && !singleton.Conf.IgnoredIPNotificationServerIDs[clientID]) || ((singleton.Conf.Cover == model.ConfigCoverAll && !singleton.Conf.IgnoredIPNotificationServerIDs[clientID]) ||
(singleton.Conf.Cover == model.ConfigCoverIgnoreAll && singleton.Conf.IgnoredIPNotificationServerIDs[clientID])) && (singleton.Conf.Cover == model.ConfigCoverIgnoreAll && singleton.Conf.IgnoredIPNotificationServerIDs[clientID])) &&
singleton.ServerList[clientID].Host != nil &&
singleton.ServerList[clientID].Host.IP != "" && singleton.ServerList[clientID].Host.IP != "" &&
host.IP != "" && host.IP != "" &&
singleton.ServerList[clientID].Host.IP != host.IP { singleton.ServerList[clientID].Host.IP != host.IP {
@ -184,10 +182,14 @@ func (s *NezhaHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Rece
nil) nil)
} }
// 判断是否是机器重启,如果是机器重启要录入最后记录的流量里面 /**
if singleton.ServerList[clientID].Host.BootTime < host.BootTime { * 这里的 singleton 中的数据都是关机前的旧数据
singleton.ServerList[clientID].PrevHourlyTransferIn = singleton.ServerList[clientID].PrevHourlyTransferIn - int64(singleton.ServerList[clientID].State.NetInTransfer) * agent 重启时bootTime 变大agent 端会先上报 host 信息然后上报 state 信息
singleton.ServerList[clientID].PrevHourlyTransferOut = singleton.ServerList[clientID].PrevHourlyTransferOut - int64(singleton.ServerList[clientID].State.NetOutTransfer) * 这是可以借助上报顺序的空档将停机前的流量统计数据标记下来加到下一个小时的数据点上
*/
if singleton.ServerList[clientID].Host != nil && singleton.ServerList[clientID].Host.BootTime < host.BootTime {
singleton.ServerList[clientID].PrevTransferInSnapshot = singleton.ServerList[clientID].PrevTransferInSnapshot - int64(singleton.ServerList[clientID].State.NetInTransfer)
singleton.ServerList[clientID].PrevTransferOutSnapshot = singleton.ServerList[clientID].PrevTransferOutSnapshot - int64(singleton.ServerList[clientID].State.NetOutTransfer)
} }
// 不要冲掉国家码 // 不要冲掉国家码

View File

@ -100,14 +100,14 @@ func RecordTransferHourlyUsage() {
for id, server := range ServerList { for id, server := range ServerList {
tx := model.Transfer{ tx := model.Transfer{
ServerID: id, ServerID: id,
In: server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn), In: utils.Uint64SubInt64(server.State.NetInTransfer, server.PrevTransferInSnapshot),
Out: server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut), Out: utils.Uint64SubInt64(server.State.NetOutTransfer, server.PrevTransferOutSnapshot),
} }
if tx.In == 0 && tx.Out == 0 { if tx.In == 0 && tx.Out == 0 {
continue continue
} }
server.PrevHourlyTransferIn = int64(server.State.NetInTransfer) server.PrevTransferInSnapshot = int64(server.State.NetInTransfer)
server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer) server.PrevTransferOutSnapshot = int64(server.State.NetOutTransfer)
tx.CreatedAt = nowTrimSeconds tx.CreatedAt = nowTrimSeconds
txs = append(txs, tx) txs = append(txs, tx)
} }