nezha/service/singleton/crontask.go
UUBulb 68d7e16773
add cron, nat api & refactor alert rule (#459)
* add cron api & refactor alert rule

* add nat api

* fix swagger

* remove unnecessary steps
2024-10-26 23:57:47 +08:00

180 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package singleton
import (
"bytes"
"fmt"
"slices"
"sync"
"github.com/jinzhu/copier"
"github.com/robfig/cron/v3"
"github.com/naiba/nezha/model"
pb "github.com/naiba/nezha/proto"
)
// Package-level state of the cron task service.
var (
// Cron is the scheduler instance; InitCronTask replaces it with a fresh one.
Cron *cron.Cron
// Crons indexes the known tasks by their ID.
Crons map[uint64]*model.Cron // [CronID] -> *model.Cron
// CronLock is held by the functions in this file while they read or modify Crons.
CronLock sync.RWMutex
// CronList is a slice view of the tasks: loadCronTasks fills it from the
// database and UpdateCronList rebuilds it from Crons sorted by ID.
CronList []*model.Cron
)
func InitCronTask() {
Cron = cron.New(cron.WithSeconds(), cron.WithLocation(Loc))
Crons = make(map[uint64]*model.Cron)
}
// loadCronTasks resets the scheduler state, loads every task from the
// database into CronList, registers the scheduled ones and starts the
// scheduler.
//
// Tasks of type model.CronTypeTriggerTask are only indexed in Crons; they are
// never registered with the scheduler. Every other task is registered under
// its Scheduler spec. A task whose registration fails is left out of Crons,
// and one notification per affected notification group is sent listing the
// IDs of the failed tasks.
func loadCronTasks() {
	InitCronTask()
	// NOTE(review): the result of DB.Find is not inspected, so a failed query
	// is indistinguishable from an empty task table here — confirm whether
	// that is intended.
	DB.Find(&CronList)

	var err error
	// failedGroups keeps the groups in first-failure order so notifications
	// go out in a deterministic order; failedMsgs holds the message being
	// built for each of those groups.
	var failedGroups []uint64
	failedMsgs := make(map[uint64]*bytes.Buffer)

	for _, task := range CronList {
		// Trigger-type tasks do not need to be registered with the scheduler.
		if task.TaskType == model.CronTypeTriggerTask {
			Crons[task.ID] = task
			continue
		}

		// Register the scheduled task.
		task.CronJobID, err = Cron.AddFunc(task.Scheduler, CronTrigger(task))
		if err == nil {
			Crons[task.ID] = task
			continue
		}

		gid := task.NotificationGroupID
		msg, seen := failedMsgs[gid]
		if !seen {
			// First failure for this group: remember the group and start its message.
			failedGroups = append(failedGroups, gid)
			msg = bytes.NewBufferString("调度失败的计划任务:[")
			failedMsgs[gid] = msg
		} else {
			// Separate IDs with commas instead of appending one after every
			// ID, which used to leave a dangling comma before the bracket.
			msg.WriteByte(',')
		}
		fmt.Fprintf(msg, "%d", task.ID)
	}

	// Notify each group that owns at least one task that failed to register.
	for _, gid := range failedGroups {
		msg := failedMsgs[gid]
		msg.WriteString("] 这些任务将无法正常执行,请进入后台重新修改保存。")
		SendNotification(gid, msg.String(), nil)
	}

	Cron.Start()
}
// OnRefreshOrAddCron stores c in Crons under c.ID, replacing any entry that
// was there before. If the replaced entry has a non-zero CronJobID, that job
// is removed from the scheduler first. This function does not register a
// scheduler job for c itself.
func OnRefreshOrAddCron(c *model.Cron) {
	CronLock.Lock()
	defer CronLock.Unlock()

	if prev := Crons[c.ID]; prev != nil && prev.CronJobID != 0 {
		Cron.Remove(prev.CronJobID)
	}
	// Assigning to the key overwrites whatever was stored under this ID.
	Crons[c.ID] = c
}
// UpdateCronList rebuilds CronList from the current contents of Crons,
// ordered by ascending task ID.
//
// The write lock is taken (not the read lock) because this function replaces
// the shared CronList: under RLock, concurrent callers of UpdateCronList and
// readers of CronList holding the read lock could run at the same time as
// the reassignment and the in-place sort.
func UpdateCronList() {
	CronLock.Lock()
	defer CronLock.Unlock()

	// Build and sort a private slice first so CronList is only ever replaced
	// by a complete, ordered list.
	sorted := make([]*model.Cron, 0, len(Crons))
	for _, c := range Crons {
		sorted = append(sorted, c)
	}
	slices.SortFunc(sorted, func(a, b *model.Cron) int {
		switch {
		case a.ID < b.ID:
			return -1
		case a.ID > b.ID:
			return 1
		default:
			return 0
		}
	})
	CronList = sorted
}
// OnDeleteCron drops the tasks with the given IDs from Crons. For each task
// found with a non-zero CronJobID, the corresponding scheduler job is removed
// as well. IDs that are not present in Crons are ignored.
func OnDeleteCron(id []uint64) {
	CronLock.Lock()
	defer CronLock.Unlock()

	for idx := range id {
		taskID := id[idx]
		if existing := Crons[taskID]; existing != nil && existing.CronJobID != 0 {
			Cron.Remove(existing.CronJobID)
		}
		delete(Crons, taskID)
	}
}
// ManualTrigger builds the trigger function for c and runs it synchronously
// in the calling goroutine. No trigger server is supplied, so a task whose
// Cover is model.CronCoverAlertTrigger does nothing when run this way.
func ManualTrigger(c *model.Cron) {
	run := CronTrigger(c)
	run()
}
func SendTriggerTasks(taskIDs []uint64, triggerServer uint64) {
CronLock.RLock()
var cronLists []*model.Cron
for _, taskID := range taskIDs {
if c, ok := Crons[taskID]; ok {
cronLists = append(cronLists, c)
}
}
CronLock.RUnlock()
// 依次调用CronTrigger发送任务
for _, c := range cronLists {
go CronTrigger(c, triggerServer)()
}
}
// CronTrigger returns a function that dispatches cr's command to servers.
//
// The set of IDs in cr.Servers is captured when CronTrigger is called, not
// when the returned function runs. What the returned function does depends
// on cr.Cover:
//
//   - model.CronCoverAlertTrigger: the command goes only to the server whose
//     ID is triggerServer[0]. Nothing happens if no triggerServer was given
//     or that ID is not in ServerList.
//   - model.CronCoverAll: the command goes to every server in ServerList
//     except those listed in cr.Servers.
//   - model.CronCoverIgnoreAll: the command goes only to the servers listed
//     in cr.Servers.
//
// A server with a nil TaskStream does not receive the command; instead a
// notification is sent to cr.NotificationGroupID together with a copy of
// that server. ServerLock is read-locked for the whole dispatch.
func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
	listed := make(map[uint64]bool)
	for _, serverID := range cr.Servers {
		listed[serverID] = true
	}

	return func() {
		alertTriggered := cr.Cover == model.CronCoverAlertTrigger
		if alertTriggered && len(triggerServer) == 0 {
			return
		}

		ServerLock.RLock()
		defer ServerLock.RUnlock()

		if alertTriggered {
			target, found := ServerList[triggerServer[0]]
			if !found {
				return
			}
			if target.TaskStream != nil {
				target.TaskStream.Send(&pb.Task{
					Id:   cr.ID,
					Data: cr.Command,
					Type: model.TaskTypeCommand,
				})
				return
			}
			// Copy the server's current state to attach to the notification.
			snapshot := model.Server{}
			copier.Copy(&snapshot, target)
			SendNotification(cr.NotificationGroupID, fmt.Sprintf("[任务失败] %s服务器 %s 离线,无法执行。", cr.Name, target.Name), nil, &snapshot)
			return
		}

		for _, srv := range ServerList {
			switch {
			case cr.Cover == model.CronCoverAll && listed[srv.ID]:
				// Listed servers are excluded when covering all servers.
				continue
			case cr.Cover == model.CronCoverIgnoreAll && !listed[srv.ID]:
				// Only listed servers are included when ignoring all servers.
				continue
			}

			if srv.TaskStream == nil {
				// Copy the server's current state to attach to the notification.
				snapshot := model.Server{}
				copier.Copy(&snapshot, srv)
				SendNotification(cr.NotificationGroupID, fmt.Sprintf("[任务失败] %s服务器 %s 离线,无法执行。", cr.Name, srv.Name), nil, &snapshot)
				continue
			}
			srv.TaskStream.Send(&pb.Task{
				Id:   cr.ID,
				Data: cr.Command,
				Type: model.TaskTypeCommand,
			})
		}
	}
}