From 7228f334a5dd9aecb4d9f69d5af2ba71d0b0736b Mon Sep 17 00:00:00 2001 From: Akkia Date: Tue, 13 Sep 2022 06:14:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=A7=A6=E5=8F=91=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/dashboard/controller/member_api.go | 18 ++++++++++++++- model/cron.go | 6 ++++- service/singleton/crontask.go | 31 +++++++++++++++++++++++++- 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/cmd/dashboard/controller/member_api.go b/cmd/dashboard/controller/member_api.go index b02c49e..9e246ad 100644 --- a/cmd/dashboard/controller/member_api.go +++ b/cmd/dashboard/controller/member_api.go @@ -415,6 +415,7 @@ func (ma *memberAPI) addOrEditMonitor(c *gin.Context) { type cronForm struct { ID uint64 + TaskType uint8 // 0:计划任务 1:触发任务 Name string Scheduler string Command string @@ -429,6 +430,7 @@ func (ma *memberAPI) addOrEditCron(c *gin.Context) { var cr model.Cron err := c.ShouldBindJSON(&cf) if err == nil { + cr.TaskType = cf.TaskType cr.Name = cf.Name cr.Scheduler = cf.Scheduler cr.Command = cf.Command @@ -439,6 +441,17 @@ func (ma *memberAPI) addOrEditCron(c *gin.Context) { cr.Cover = cf.Cover err = utils.Json.Unmarshal([]byte(cf.ServersRaw), &cr.Servers) } + + // 计划任务类型不得使用触发服务器执行方式 + if cr.TaskType == model.CronTypeCronTask && cr.Cover == model.CronCoverSelf { + err = errors.New("计划任务类型不得使用触发服务器执行方式") + c.JSON(http.StatusOK, model.Response{ + Code: http.StatusBadRequest, + Message: fmt.Sprintf("请求错误:%s", err), + }) + return + } + tx := singleton.DB.Begin() if err == nil { // 保证NotificationTag不为空 @@ -452,7 +465,10 @@ func (ma *memberAPI) addOrEditCron(c *gin.Context) { } } if err == nil { - cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr)) + // 对于计划任务类型,需要更新CronJob + if cf.TaskType == model.CronTypeCronTask { + cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr)) + } } if err == nil { err = tx.Commit().Error diff --git a/model/cron.go b/model/cron.go index a8e756c..2ae4f44 100644 --- a/model/cron.go +++ b/model/cron.go @@ -11,11 +11,15 @@ import ( const ( CronCoverIgnoreAll = iota CronCoverAll + CronCoverSelf + CronTypeCronTask = 0 + CronTypeTriggerTask = 1 ) type Cron struct { Common Name string + TaskType uint8 `gorm:"default:0"` // 0:计划任务 1:触发任务 Scheduler string //分钟 小时 天 月 星期 Command string Servers []uint64 `gorm:"-"` @@ -23,7 +27,7 @@ type Cron struct { NotificationTag string // 指定通知方式的分组 LastExecutedAt time.Time // 最后一次执行时间 LastResult bool // 最后一次执行结果 - Cover uint8 // 计划任务覆盖范围 (0:仅覆盖特定服务器 1:仅忽略特定服务器) + Cover uint8 // 计划任务覆盖范围 (0:仅覆盖特定服务器 1:仅忽略特定服务器 2:由触发该计划任务的服务器执行) CronJobID cron.EntryID `gorm:"-"` ServersRaw string diff --git a/service/singleton/crontask.go b/service/singleton/crontask.go index 8cd26d7..bfba6b6 100644 --- a/service/singleton/crontask.go +++ b/service/singleton/crontask.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "github.com/jinzhu/copier" + "log" "sync" "github.com/robfig/cron/v3" @@ -32,6 +33,10 @@ func LoadCronTasks() { var notificationTagList []string notificationMsgMap := make(map[string]*bytes.Buffer) for i := 0; i < len(crons); i++ { + // 触发任务类型无需注册 + if crons[i].TaskType == model.CronTypeTriggerTask { + continue + } // 旧版本计划任务可能不存在通知组 为其添加默认通知组 if crons[i].NotificationTag == "" { crons[i].NotificationTag = "default" @@ -63,12 +68,36 @@ func ManualTrigger(c model.Cron) { CronTrigger(c)() } -func CronTrigger(cr model.Cron) func() { +func CronTrigger(cr model.Cron, triggerServer ...uint64) func() { crIgnoreMap := make(map[uint64]bool) for j := 0; j < len(cr.Servers); j++ { crIgnoreMap[cr.Servers[j]] = true } return func() { + if cr.Cover == model.CronCoverSelf { + if len(triggerServer) == 0 { + log.Println("触发任务未指定触发服务器") + return + } + ServerLock.RLock() + defer ServerLock.RUnlock() + if s, ok := ServerList[triggerServer[0]]; ok { + if s.TaskStream != nil { + s.TaskStream.Send(&pb.Task{ + Id: cr.ID, + Data: cr.Command, + Type: model.TaskTypeCommand, + }) + } else { + // 保存当前服务器状态信息 + curServer := model.Server{} + copier.Copy(&curServer, s) + SendNotification(cr.NotificationTag, fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false, &curServer) + } + } + return + } + ServerLock.RLock() defer ServerLock.RUnlock() for _, s := range ServerList {