mirror of
https://github.com/nezhahq/nezha.git
synced 2025-01-22 20:58:14 -05:00
68d7e16773
* add cron api & refactor alert rule * add nat api * fix swagger * remove unnecessary steps
180 lines
4.5 KiB
Go
180 lines
4.5 KiB
Go
package singleton
|
||
|
||
import (
|
||
"bytes"
|
||
"fmt"
|
||
"slices"
|
||
"sync"
|
||
|
||
"github.com/jinzhu/copier"
|
||
|
||
"github.com/robfig/cron/v3"
|
||
|
||
"github.com/naiba/nezha/model"
|
||
pb "github.com/naiba/nezha/proto"
|
||
)
|
||
|
||
var (
|
||
Cron *cron.Cron
|
||
Crons map[uint64]*model.Cron // [CronID] -> *model.Cron
|
||
CronLock sync.RWMutex
|
||
|
||
CronList []*model.Cron
|
||
)
|
||
|
||
func InitCronTask() {
|
||
Cron = cron.New(cron.WithSeconds(), cron.WithLocation(Loc))
|
||
Crons = make(map[uint64]*model.Cron)
|
||
}
|
||
|
||
// loadCronTasks 加载计划任务
|
||
func loadCronTasks() {
|
||
InitCronTask()
|
||
DB.Find(&CronList)
|
||
var err error
|
||
var notificationGroupList []uint64
|
||
notificationMsgMap := make(map[uint64]*bytes.Buffer)
|
||
for i := 0; i < len(CronList); i++ {
|
||
// 触发任务类型无需注册
|
||
if CronList[i].TaskType == model.CronTypeTriggerTask {
|
||
Crons[CronList[i].ID] = CronList[i]
|
||
continue
|
||
}
|
||
// 注册计划任务
|
||
CronList[i].CronJobID, err = Cron.AddFunc(CronList[i].Scheduler, CronTrigger(CronList[i]))
|
||
if err == nil {
|
||
Crons[CronList[i].ID] = CronList[i]
|
||
} else {
|
||
// 当前通知组首次出现 将其加入通知组列表并初始化通知组消息缓存
|
||
if _, ok := notificationMsgMap[CronList[i].NotificationGroupID]; !ok {
|
||
notificationGroupList = append(notificationGroupList, CronList[i].NotificationGroupID)
|
||
notificationMsgMap[CronList[i].NotificationGroupID] = bytes.NewBufferString("")
|
||
notificationMsgMap[CronList[i].NotificationGroupID].WriteString("调度失败的计划任务:[")
|
||
}
|
||
notificationMsgMap[CronList[i].NotificationGroupID].WriteString(fmt.Sprintf("%d,", CronList[i].ID))
|
||
}
|
||
}
|
||
// 向注册错误的计划任务所在通知组发送通知
|
||
for _, gid := range notificationGroupList {
|
||
notificationMsgMap[gid].WriteString("] 这些任务将无法正常执行,请进入后点重新修改保存。")
|
||
SendNotification(gid, notificationMsgMap[gid].String(), nil)
|
||
}
|
||
Cron.Start()
|
||
}
|
||
|
||
func OnRefreshOrAddCron(c *model.Cron) {
|
||
CronLock.Lock()
|
||
defer CronLock.Unlock()
|
||
crOld := Crons[c.ID]
|
||
if crOld != nil && crOld.CronJobID != 0 {
|
||
Cron.Remove(crOld.CronJobID)
|
||
}
|
||
|
||
delete(Crons, c.ID)
|
||
Crons[c.ID] = c
|
||
}
|
||
|
||
func UpdateCronList() {
|
||
CronLock.RLock()
|
||
defer CronLock.RUnlock()
|
||
|
||
CronList = make([]*model.Cron, 0, len(Crons))
|
||
for _, c := range Crons {
|
||
CronList = append(CronList, c)
|
||
}
|
||
slices.SortFunc(CronList, func(a, b *model.Cron) int {
|
||
if a.ID < b.ID {
|
||
return -1
|
||
} else if a.ID == b.ID {
|
||
return 0
|
||
}
|
||
return 1
|
||
})
|
||
}
|
||
|
||
func OnDeleteCron(id []uint64) {
|
||
CronLock.Lock()
|
||
defer CronLock.Unlock()
|
||
for _, i := range id {
|
||
cr := Crons[i]
|
||
if cr != nil && cr.CronJobID != 0 {
|
||
Cron.Remove(cr.CronJobID)
|
||
}
|
||
delete(Crons, i)
|
||
}
|
||
}
|
||
|
||
func ManualTrigger(c *model.Cron) {
|
||
CronTrigger(c)()
|
||
}
|
||
|
||
func SendTriggerTasks(taskIDs []uint64, triggerServer uint64) {
|
||
CronLock.RLock()
|
||
var cronLists []*model.Cron
|
||
for _, taskID := range taskIDs {
|
||
if c, ok := Crons[taskID]; ok {
|
||
cronLists = append(cronLists, c)
|
||
}
|
||
}
|
||
CronLock.RUnlock()
|
||
|
||
// 依次调用CronTrigger发送任务
|
||
for _, c := range cronLists {
|
||
go CronTrigger(c, triggerServer)()
|
||
}
|
||
}
|
||
|
||
func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
|
||
crIgnoreMap := make(map[uint64]bool)
|
||
for j := 0; j < len(cr.Servers); j++ {
|
||
crIgnoreMap[cr.Servers[j]] = true
|
||
}
|
||
return func() {
|
||
if cr.Cover == model.CronCoverAlertTrigger {
|
||
if len(triggerServer) == 0 {
|
||
return
|
||
}
|
||
ServerLock.RLock()
|
||
defer ServerLock.RUnlock()
|
||
if s, ok := ServerList[triggerServer[0]]; ok {
|
||
if s.TaskStream != nil {
|
||
s.TaskStream.Send(&pb.Task{
|
||
Id: cr.ID,
|
||
Data: cr.Command,
|
||
Type: model.TaskTypeCommand,
|
||
})
|
||
} else {
|
||
// 保存当前服务器状态信息
|
||
curServer := model.Server{}
|
||
copier.Copy(&curServer, s)
|
||
SendNotification(cr.NotificationGroupID, fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), nil, &curServer)
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
ServerLock.RLock()
|
||
defer ServerLock.RUnlock()
|
||
for _, s := range ServerList {
|
||
if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] {
|
||
continue
|
||
}
|
||
if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] {
|
||
continue
|
||
}
|
||
if s.TaskStream != nil {
|
||
s.TaskStream.Send(&pb.Task{
|
||
Id: cr.ID,
|
||
Data: cr.Command,
|
||
Type: model.TaskTypeCommand,
|
||
})
|
||
} else {
|
||
// 保存当前服务器状态信息
|
||
curServer := model.Server{}
|
||
copier.Copy(&curServer, s)
|
||
SendNotification(cr.NotificationGroupID, fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), nil, &curServer)
|
||
}
|
||
}
|
||
}
|
||
}
|