mirror of
https://github.com/nezhahq/nezha.git
synced 2025-02-08 12:38:13 -05:00
添加触发任务类型
This commit is contained in:
parent
4f26f36c8e
commit
7228f334a5
@ -415,6 +415,7 @@ func (ma *memberAPI) addOrEditMonitor(c *gin.Context) {
|
|||||||
|
|
||||||
type cronForm struct {
|
type cronForm struct {
|
||||||
ID uint64
|
ID uint64
|
||||||
|
TaskType uint8 // 0:计划任务 1:触发任务
|
||||||
Name string
|
Name string
|
||||||
Scheduler string
|
Scheduler string
|
||||||
Command string
|
Command string
|
||||||
@ -429,6 +430,7 @@ func (ma *memberAPI) addOrEditCron(c *gin.Context) {
|
|||||||
var cr model.Cron
|
var cr model.Cron
|
||||||
err := c.ShouldBindJSON(&cf)
|
err := c.ShouldBindJSON(&cf)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
cr.TaskType = cf.TaskType
|
||||||
cr.Name = cf.Name
|
cr.Name = cf.Name
|
||||||
cr.Scheduler = cf.Scheduler
|
cr.Scheduler = cf.Scheduler
|
||||||
cr.Command = cf.Command
|
cr.Command = cf.Command
|
||||||
@ -439,6 +441,17 @@ func (ma *memberAPI) addOrEditCron(c *gin.Context) {
|
|||||||
cr.Cover = cf.Cover
|
cr.Cover = cf.Cover
|
||||||
err = utils.Json.Unmarshal([]byte(cf.ServersRaw), &cr.Servers)
|
err = utils.Json.Unmarshal([]byte(cf.ServersRaw), &cr.Servers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 计划任务类型不得使用触发服务器执行方式
|
||||||
|
if cr.TaskType == model.CronTypeCronTask && cr.Cover == model.CronCoverSelf {
|
||||||
|
err = errors.New("计划任务类型不得使用触发服务器执行方式")
|
||||||
|
c.JSON(http.StatusOK, model.Response{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Message: fmt.Sprintf("请求错误:%s", err),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
tx := singleton.DB.Begin()
|
tx := singleton.DB.Begin()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// 保证NotificationTag不为空
|
// 保证NotificationTag不为空
|
||||||
@ -452,7 +465,10 @@ func (ma *memberAPI) addOrEditCron(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr))
|
// 对于计划任务类型,需要更新CronJob
|
||||||
|
if cf.TaskType == model.CronTypeCronTask {
|
||||||
|
cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = tx.Commit().Error
|
err = tx.Commit().Error
|
||||||
|
@ -11,11 +11,15 @@ import (
|
|||||||
const (
|
const (
|
||||||
CronCoverIgnoreAll = iota
|
CronCoverIgnoreAll = iota
|
||||||
CronCoverAll
|
CronCoverAll
|
||||||
|
CronCoverSelf
|
||||||
|
CronTypeCronTask = 0
|
||||||
|
CronTypeTriggerTask = 1
|
||||||
)
|
)
|
||||||
|
|
||||||
type Cron struct {
|
type Cron struct {
|
||||||
Common
|
Common
|
||||||
Name string
|
Name string
|
||||||
|
TaskType uint8 `gorm:"default:0"` // 0:计划任务 1:触发任务
|
||||||
Scheduler string //分钟 小时 天 月 星期
|
Scheduler string //分钟 小时 天 月 星期
|
||||||
Command string
|
Command string
|
||||||
Servers []uint64 `gorm:"-"`
|
Servers []uint64 `gorm:"-"`
|
||||||
@ -23,7 +27,7 @@ type Cron struct {
|
|||||||
NotificationTag string // 指定通知方式的分组
|
NotificationTag string // 指定通知方式的分组
|
||||||
LastExecutedAt time.Time // 最后一次执行时间
|
LastExecutedAt time.Time // 最后一次执行时间
|
||||||
LastResult bool // 最后一次执行结果
|
LastResult bool // 最后一次执行结果
|
||||||
Cover uint8 // 计划任务覆盖范围 (0:仅覆盖特定服务器 1:仅忽略特定服务器)
|
Cover uint8 // 计划任务覆盖范围 (0:仅覆盖特定服务器 1:仅忽略特定服务器 2:由触发该计划任务的服务器执行)
|
||||||
|
|
||||||
CronJobID cron.EntryID `gorm:"-"`
|
CronJobID cron.EntryID `gorm:"-"`
|
||||||
ServersRaw string
|
ServersRaw string
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/jinzhu/copier"
|
"github.com/jinzhu/copier"
|
||||||
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/robfig/cron/v3"
|
"github.com/robfig/cron/v3"
|
||||||
@ -32,6 +33,10 @@ func LoadCronTasks() {
|
|||||||
var notificationTagList []string
|
var notificationTagList []string
|
||||||
notificationMsgMap := make(map[string]*bytes.Buffer)
|
notificationMsgMap := make(map[string]*bytes.Buffer)
|
||||||
for i := 0; i < len(crons); i++ {
|
for i := 0; i < len(crons); i++ {
|
||||||
|
// 触发任务类型无需注册
|
||||||
|
if crons[i].TaskType == model.CronTypeTriggerTask {
|
||||||
|
continue
|
||||||
|
}
|
||||||
// 旧版本计划任务可能不存在通知组 为其添加默认通知组
|
// 旧版本计划任务可能不存在通知组 为其添加默认通知组
|
||||||
if crons[i].NotificationTag == "" {
|
if crons[i].NotificationTag == "" {
|
||||||
crons[i].NotificationTag = "default"
|
crons[i].NotificationTag = "default"
|
||||||
@ -63,12 +68,36 @@ func ManualTrigger(c model.Cron) {
|
|||||||
CronTrigger(c)()
|
CronTrigger(c)()
|
||||||
}
|
}
|
||||||
|
|
||||||
func CronTrigger(cr model.Cron) func() {
|
func CronTrigger(cr model.Cron, triggerServer ...uint64) func() {
|
||||||
crIgnoreMap := make(map[uint64]bool)
|
crIgnoreMap := make(map[uint64]bool)
|
||||||
for j := 0; j < len(cr.Servers); j++ {
|
for j := 0; j < len(cr.Servers); j++ {
|
||||||
crIgnoreMap[cr.Servers[j]] = true
|
crIgnoreMap[cr.Servers[j]] = true
|
||||||
}
|
}
|
||||||
return func() {
|
return func() {
|
||||||
|
if cr.Cover == model.CronCoverSelf {
|
||||||
|
if len(triggerServer) == 0 {
|
||||||
|
log.Println("触发任务未指定触发服务器")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ServerLock.RLock()
|
||||||
|
defer ServerLock.RUnlock()
|
||||||
|
if s, ok := ServerList[triggerServer[0]]; ok {
|
||||||
|
if s.TaskStream != nil {
|
||||||
|
s.TaskStream.Send(&pb.Task{
|
||||||
|
Id: cr.ID,
|
||||||
|
Data: cr.Command,
|
||||||
|
Type: model.TaskTypeCommand,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// 保存当前服务器状态信息
|
||||||
|
curServer := model.Server{}
|
||||||
|
copier.Copy(&curServer, s)
|
||||||
|
SendNotification(cr.NotificationTag, fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false, &curServer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
ServerLock.RLock()
|
ServerLock.RLock()
|
||||||
defer ServerLock.RUnlock()
|
defer ServerLock.RUnlock()
|
||||||
for _, s := range ServerList {
|
for _, s := range ServerList {
|
||||||
|
Loading…
Reference in New Issue
Block a user