From c65028188ce79807c0250d24c065ccfc2e56460e Mon Sep 17 00:00:00 2001 From: Akkia Date: Tue, 12 Apr 2022 13:16:33 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E7=BB=84=E7=BB=87=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/dashboard/main.go | 176 ++---------------------------- model/config.go | 3 + service/singleton/crontask.go | 81 ++++++++++++++ service/singleton/notification.go | 2 +- service/singleton/server.go | 58 ++++++++++ service/singleton/singleton.go | 105 ++++++------------ service/singleton/toolfunc.go | 95 ++++++++++++++++ 7 files changed, 281 insertions(+), 239 deletions(-) create mode 100644 service/singleton/crontask.go create mode 100644 service/singleton/server.go create mode 100644 service/singleton/toolfunc.go diff --git a/cmd/dashboard/main.go b/cmd/dashboard/main.go index 3bfeb2a..540c2cf 100644 --- a/cmd/dashboard/main.go +++ b/cmd/dashboard/main.go @@ -1,196 +1,40 @@ package main import ( - "bytes" "context" - "fmt" - "log" - "time" - - "github.com/ory/graceful" - "github.com/patrickmn/go-cache" - "github.com/robfig/cron/v3" - "gorm.io/driver/sqlite" - "gorm.io/gorm" - "github.com/naiba/nezha/cmd/dashboard/controller" "github.com/naiba/nezha/cmd/dashboard/rpc" "github.com/naiba/nezha/model" "github.com/naiba/nezha/service/singleton" + "github.com/ory/graceful" + "log" ) func init() { - shanghai, err := time.LoadLocation("Asia/Shanghai") - if err != nil { - panic(err) - } - // 初始化 dao 包 singleton.Init() - singleton.Conf = &model.Config{} - singleton.Cron = cron.New(cron.WithSeconds(), cron.WithLocation(shanghai)) - singleton.Crons = make(map[uint64]*model.Cron) - singleton.ServerList = make(map[uint64]*model.Server) - singleton.SecretToID = make(map[string]uint64) - - err = singleton.Conf.Read("data/config.yaml") - if err != nil { - panic(err) - } - singleton.DB, err = gorm.Open(sqlite.Open("data/sqlite.db"), &gorm.Config{ - CreateBatchSize: 200, - }) - if err != nil { - panic(err) - } - if singleton.Conf.Debug { - singleton.DB = singleton.DB.Debug() - } - if singleton.Conf.GRPCPort == 0 { - singleton.Conf.GRPCPort = 5555 - } - singleton.Cache = cache.New(5*time.Minute, 10*time.Minute) - + singleton.InitConfigFromPath("data/config.yaml") + singleton.InitDBFromPath("data/sqlite.db") initSystem() } func initSystem() { - singleton.DB.AutoMigrate(model.Server{}, model.User{}, - model.Notification{}, model.AlertRule{}, model.Monitor{}, - model.MonitorHistory{}, model.Cron{}, model.Transfer{}) - - singleton.LoadNotifications() - loadServers() //加载服务器列表 - loadCrons() //加载计划任务 + // 启动 singleton 包下的所有服务 + singleton.LoadSingleton() // 每天的3:30 对 监控记录 和 流量记录 进行清理 - _, err := singleton.Cron.AddFunc("0 30 3 * * *", cleanMonitorHistory) - if err != nil { + if _, err := singleton.Cron.AddFunc("0 30 3 * * *", singleton.CleanMonitorHistory); err != nil { panic(err) } // 每小时对流量记录进行打点 - _, err = singleton.Cron.AddFunc("0 0 * * * *", recordTransferHourlyUsage) - if err != nil { + if _, err := singleton.Cron.AddFunc("0 0 * * * *", singleton.RecordTransferHourlyUsage); err != nil { panic(err) } } -// recordTransferHourlyUsage 对流量记录进行打点 -func recordTransferHourlyUsage() { - singleton.ServerLock.Lock() - defer singleton.ServerLock.Unlock() - now := time.Now() - nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) - var txs []model.Transfer - for id, server := range singleton.ServerList { - tx := model.Transfer{ - ServerID: id, - In: server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn), - Out: server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut), - } - if tx.In == 0 && tx.Out == 0 { - continue - } - server.PrevHourlyTransferIn = int64(server.State.NetInTransfer) - server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer) - tx.CreatedAt = nowTrimSeconds - txs = append(txs, tx) - } - if len(txs) == 0 { - return - } - log.Println("NEZHA>> Cron 流量统计入库", len(txs), singleton.DB.Create(txs).Error) -} - -// cleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录 -func cleanMonitorHistory() { - // 清理已被删除的服务器的监控记录与流量记录 - singleton.DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30)) - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)") - // 计算可清理流量记录的时长 - var allServerKeep time.Time - specialServerKeep := make(map[uint64]time.Time) - var specialServerIDs []uint64 - var alerts []model.AlertRule - singleton.DB.Find(&alerts) - for i := 0; i < len(alerts); i++ { - for j := 0; j < len(alerts[i].Rules); j++ { - // 是不是流量记录规则 - if !alerts[i].Rules[j].IsTransferDurationRule() { - continue - } - dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart() - // 判断规则影响的机器范围 - if alerts[i].Rules[j].Cover == model.RuleCoverAll { - // 更新全局可以清理的数据点 - if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) { - allServerKeep = dataCouldRemoveBefore - } - } else { - // 更新特定机器可以清理数据点 - for id := range alerts[i].Rules[j].Ignore { - if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) { - specialServerKeep[id] = dataCouldRemoveBefore - specialServerIDs = append(specialServerIDs, id) - } - } - } - } - } - for id, couldRemove := range specialServerKeep { - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove) - } - if allServerKeep.IsZero() { - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs) - } else { - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep) - } -} - -//loadServers 加载服务器列表并根据ID排序 -func loadServers() { - var servers []model.Server - singleton.DB.Find(&servers) - for _, s := range servers { - innerS := s - innerS.Host = &model.Host{} - innerS.State = &model.HostState{} - singleton.ServerList[innerS.ID] = &innerS - singleton.SecretToID[innerS.Secret] = innerS.ID - } - singleton.ReSortServer() -} - -// loadCrons 加载计划任务 -func loadCrons() { - var crons []model.Cron - singleton.DB.Find(&crons) - var err error - errMsg := new(bytes.Buffer) - for i := 0; i < len(crons); i++ { - cr := crons[i] - - // 注册计划任务 - cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr)) - if err == nil { - singleton.Crons[cr.ID] = &cr - } else { - if errMsg.Len() == 0 { - errMsg.WriteString("调度失败的计划任务:[") - } - errMsg.WriteString(fmt.Sprintf("%d,", cr.ID)) - } - } - if errMsg.Len() > 0 { - msg := errMsg.String() - singleton.SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false) - } - singleton.Cron.Start() -} - func main() { - cleanMonitorHistory() + singleton.CleanMonitorHistory() go rpc.ServeRPC(singleton.Conf.GRPCPort) serviceSentinelDispatchBus := make(chan model.Monitor) // 用于传递服务监控任务信息的channel go rpc.DispatchTask(serviceSentinelDispatchBus) @@ -202,7 +46,7 @@ func main() { return srv.ListenAndServe() }, func(c context.Context) error { log.Println("NEZHA>> Graceful::START") - recordTransferHourlyUsage() + singleton.RecordTransferHourlyUsage() log.Println("NEZHA>> Graceful::END") srv.Shutdown(c) return nil diff --git a/model/config.go b/model/config.go index 92e04ba..dd7c655 100644 --- a/model/config.go +++ b/model/config.go @@ -98,6 +98,9 @@ func (c *Config) Read(path string) error { if c.Site.Theme == "" { c.Site.Theme = "default" } + if c.GRPCPort == 0 { + c.GRPCPort = 5555 + } c.updateIgnoredIPNotificationID() return nil diff --git a/service/singleton/crontask.go b/service/singleton/crontask.go new file mode 100644 index 0000000..b4aafe1 --- /dev/null +++ b/service/singleton/crontask.go @@ -0,0 +1,81 @@ +package singleton + +import ( + "bytes" + "fmt" + "github.com/naiba/nezha/model" + pb "github.com/naiba/nezha/proto" + "github.com/robfig/cron/v3" + "sync" +) + +var ( + Cron *cron.Cron + Crons map[uint64]*model.Cron + CronLock sync.RWMutex +) + +func InitCronTask() { + Cron = cron.New(cron.WithSeconds(), cron.WithLocation(Loc)) + Crons = make(map[uint64]*model.Cron) +} + +// LoadCronTasks 加载计划任务 +func LoadCronTasks() { + InitCronTask() + var crons []model.Cron + DB.Find(&crons) + var err error + errMsg := new(bytes.Buffer) + for i := 0; i < len(crons); i++ { + cr := crons[i] + + // 注册计划任务 + cr.CronJobID, err = Cron.AddFunc(cr.Scheduler, CronTrigger(cr)) + if err == nil { + Crons[cr.ID] = &cr + } else { + if errMsg.Len() == 0 { + errMsg.WriteString("调度失败的计划任务:[") + } + errMsg.WriteString(fmt.Sprintf("%d,", cr.ID)) + } + } + if errMsg.Len() > 0 { + msg := errMsg.String() + SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false) + } + Cron.Start() +} + +func ManualTrigger(c model.Cron) { + CronTrigger(c)() +} + +func CronTrigger(cr model.Cron) func() { + crIgnoreMap := make(map[uint64]bool) + for j := 0; j < len(cr.Servers); j++ { + crIgnoreMap[cr.Servers[j]] = true + } + return func() { + ServerLock.RLock() + defer ServerLock.RUnlock() + for _, s := range ServerList { + if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] { + continue + } + if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] { + continue + } + if s.TaskStream != nil { + s.TaskStream.Send(&pb.Task{ + Id: cr.ID, + Data: cr.Command, + Type: model.TaskTypeCommand, + }) + } else { + SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false) + } + } + } +} diff --git a/service/singleton/notification.go b/service/singleton/notification.go index a7783d4..476403f 100644 --- a/service/singleton/notification.go +++ b/service/singleton/notification.go @@ -16,7 +16,7 @@ const firstNotificationDelay = time.Minute * 15 var notifications []model.Notification var notificationsLock sync.RWMutex -// LoadNotifications 加载通知方式到 singleton.notifications 变量 +// LoadNotifications 从 DB 加载通知方式到 singleton.notifications 变量 func LoadNotifications() { notificationsLock.Lock() if err := DB.Find(¬ifications).Error; err != nil { diff --git a/service/singleton/server.go b/service/singleton/server.go new file mode 100644 index 0000000..10c228e --- /dev/null +++ b/service/singleton/server.go @@ -0,0 +1,58 @@ +package singleton + +import ( + "github.com/naiba/nezha/model" + "sort" + "sync" +) + +var ( + ServerList map[uint64]*model.Server // [ServerID] -> model.Server + SecretToID map[string]uint64 // [ServerSecret] -> ServerID + ServerLock sync.RWMutex + + SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序 + SortedServerLock sync.RWMutex +) + +// InitServer 初始化 ServerID <-> Secret 的映射 +func InitServer() { + ServerList = make(map[uint64]*model.Server) + SecretToID = make(map[string]uint64) +} + +//LoadServers 加载服务器列表并根据ID排序 +func LoadServers() { + InitServer() + var servers []model.Server + DB.Find(&servers) + for _, s := range servers { + innerS := s + innerS.Host = &model.Host{} + innerS.State = &model.HostState{} + ServerList[innerS.ID] = &innerS + SecretToID[innerS.Secret] = innerS.ID + } + ReSortServer() +} + +// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前) +func ReSortServer() { + ServerLock.RLock() + defer ServerLock.RUnlock() + SortedServerLock.Lock() + defer SortedServerLock.Unlock() + + SortedServerList = []*model.Server{} + for _, s := range ServerList { + SortedServerList = append(SortedServerList, s) + } + + // 按照服务器 ID 排序的具体实现(ID越大越靠前) + sort.SliceStable(SortedServerList, func(i, j int) bool { + if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex { + return SortedServerList[i].ID < SortedServerList[j].ID + } + return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex + }) +} diff --git a/service/singleton/singleton.go b/service/singleton/singleton.go index 531ea5a..2de0947 100644 --- a/service/singleton/singleton.go +++ b/service/singleton/singleton.go @@ -1,18 +1,13 @@ package singleton import ( - "fmt" - "sort" - "sync" + "gorm.io/driver/sqlite" "time" "github.com/patrickmn/go-cache" - "github.com/robfig/cron/v3" "gorm.io/gorm" "github.com/naiba/nezha/model" - "github.com/naiba/nezha/pkg/utils" - pb "github.com/naiba/nezha/proto" ) var Version = "v0.12.18" // !!记得修改 README 中的 badge 版本!! @@ -22,86 +17,52 @@ var ( Cache *cache.Cache DB *gorm.DB Loc *time.Location - - ServerList map[uint64]*model.Server // [ServerID] -> model.Server - SecretToID map[string]uint64 // [ServerSecret] -> ServerID - ServerLock sync.RWMutex - - SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序 - SortedServerLock sync.RWMutex ) -// Init 初始化时区为上海时区 +// Init 初始化singleton func Init() { + // 初始化时区至上海 UTF+8 var err error Loc, err = time.LoadLocation("Asia/Shanghai") if err != nil { panic(err) } + + Conf = &model.Config{} + Cache = cache.New(5*time.Minute, 10*time.Minute) } -// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前) -func ReSortServer() { - ServerLock.RLock() - defer ServerLock.RUnlock() - SortedServerLock.Lock() - defer SortedServerLock.Unlock() +// LoadSingleton 加载子服务并执行 +func LoadSingleton() { + LoadNotifications() // 加载通知服务 + LoadServers() // 加载服务器列表 + LoadCronTasks() // 加载定时任务 +} - SortedServerList = []*model.Server{} - for _, s := range ServerList { - SortedServerList = append(SortedServerList, s) +// InitConfigFromPath 从给出的文件路径中加载配置 +func InitConfigFromPath(path string) { + err := Conf.Read(path) + if err != nil { + panic(err) } +} - // 按照服务器 ID 排序的具体实现(ID越大越靠前) - sort.SliceStable(SortedServerList, func(i, j int) bool { - if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex { - return SortedServerList[i].ID < SortedServerList[j].ID - } - return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex +// InitDBFromPath 从给出的文件路径中加载数据库 +func InitDBFromPath(path string) { + var err error + DB, err = gorm.Open(sqlite.Open(path), &gorm.Config{ + CreateBatchSize: 200, }) -} - -// =============== Cron Mixin =============== - -var CronLock sync.RWMutex -var Crons map[uint64]*model.Cron -var Cron *cron.Cron - -func ManualTrigger(c model.Cron) { - CronTrigger(c)() -} - -func CronTrigger(cr model.Cron) func() { - crIgnoreMap := make(map[uint64]bool) - for j := 0; j < len(cr.Servers); j++ { - crIgnoreMap[cr.Servers[j]] = true + if err != nil { + panic(err) } - return func() { - ServerLock.RLock() - defer ServerLock.RUnlock() - for _, s := range ServerList { - if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] { - continue - } - if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] { - continue - } - if s.TaskStream != nil { - s.TaskStream.Send(&pb.Task{ - Id: cr.ID, - Data: cr.Command, - Type: model.TaskTypeCommand, - }) - } else { - SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false) - } - } + if Conf.Debug { + DB = DB.Debug() + } + err = DB.AutoMigrate(model.Server{}, model.User{}, + model.Notification{}, model.AlertRule{}, model.Monitor{}, + model.MonitorHistory{}, model.Cron{}, model.Transfer{}) + if err != nil { + panic(err) } } - -func IPDesensitize(ip string) string { - if Conf.EnablePlainIPInNotification { - return ip - } - return utils.IPDesensitize(ip) -} diff --git a/service/singleton/toolfunc.go b/service/singleton/toolfunc.go new file mode 100644 index 0000000..e6a3340 --- /dev/null +++ b/service/singleton/toolfunc.go @@ -0,0 +1,95 @@ +package singleton + +import ( + "github.com/naiba/nezha/model" + "github.com/naiba/nezha/pkg/utils" + "log" + "time" +) + +/* + 该文件保存了一些工具函数 + RecordTransferHourlyUsage 对流量记录进行打点 + CleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录 + IPDesensitize 根据用户设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP) +*/ + +// RecordTransferHourlyUsage 对流量记录进行打点 +func RecordTransferHourlyUsage() { + ServerLock.Lock() + defer ServerLock.Unlock() + now := time.Now() + nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) + var txs []model.Transfer + for id, server := range ServerList { + tx := model.Transfer{ + ServerID: id, + In: server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn), + Out: server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut), + } + if tx.In == 0 && tx.Out == 0 { + continue + } + server.PrevHourlyTransferIn = int64(server.State.NetInTransfer) + server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer) + tx.CreatedAt = nowTrimSeconds + txs = append(txs, tx) + } + if len(txs) == 0 { + return + } + log.Println("NEZHA>> Cron 流量统计入库", len(txs), DB.Create(txs).Error) +} + +// CleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录 +func CleanMonitorHistory() { + // 清理已被删除的服务器的监控记录与流量记录 + DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30)) + DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)") + // 计算可清理流量记录的时长 + var allServerKeep time.Time + specialServerKeep := make(map[uint64]time.Time) + var specialServerIDs []uint64 + var alerts []model.AlertRule + DB.Find(&alerts) + for i := 0; i < len(alerts); i++ { + for j := 0; j < len(alerts[i].Rules); j++ { + // 是不是流量记录规则 + if !alerts[i].Rules[j].IsTransferDurationRule() { + continue + } + dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart() + // 判断规则影响的机器范围 + if alerts[i].Rules[j].Cover == model.RuleCoverAll { + // 更新全局可以清理的数据点 + if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) { + allServerKeep = dataCouldRemoveBefore + } + } else { + // 更新特定机器可以清理数据点 + for id := range alerts[i].Rules[j].Ignore { + if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) { + specialServerKeep[id] = dataCouldRemoveBefore + specialServerIDs = append(specialServerIDs, id) + } + } + } + } + } + for id, couldRemove := range specialServerKeep { + DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove) + } + if allServerKeep.IsZero() { + DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs) + } else { + DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep) + } +} + +// IPDesensitize 根据设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP) +func IPDesensitize(ip string) string { + if Conf.EnablePlainIPInNotification { + return ip + } + return utils.IPDesensitize(ip) +}