2022-04-12 01:16:33 -04:00
package singleton
import (
2024-12-08 07:21:35 -05:00
"cmp"
2022-04-12 01:16:33 -04:00
"fmt"
2024-10-26 11:57:47 -04:00
"slices"
2024-11-20 08:36:21 -05:00
"strings"
2022-04-14 04:41:34 -04:00
"sync"
2022-09-14 10:14:47 -04:00
"github.com/jinzhu/copier"
2022-04-14 04:41:34 -04:00
"github.com/robfig/cron/v3"
2024-11-28 06:38:54 -05:00
"github.com/nezhahq/nezha/model"
2024-12-19 12:02:10 -05:00
"github.com/nezhahq/nezha/pkg/utils"
2024-11-28 06:38:54 -05:00
pb "github.com/nezhahq/nezha/proto"
2022-04-12 01:16:33 -04:00
)
var (
// Cron is the scheduler instance. InitCronTask creates it and
// loadCronTasks starts it after registering the stored tasks.
Cron * cron . Cron
2024-10-26 11:57:47 -04:00
// Crons indexes the known tasks by their ID.
Crons map [ uint64 ] * model . Cron // [CronID] -> *model.Cron
2022-04-12 01:16:33 -04:00
// CronLock is taken by the functions in this file that read or modify Crons.
CronLock sync . RWMutex
2024-10-26 11:57:47 -04:00
// CronList is the slice form of the tasks: filled from the database by
// loadCronTasks and rebuilt from Crons, sorted by ID, by UpdateCronList.
CronList [ ] * model . Cron
2022-04-12 01:16:33 -04:00
)
func InitCronTask ( ) {
Cron = cron . New ( cron . WithSeconds ( ) , cron . WithLocation ( Loc ) )
Crons = make ( map [ uint64 ] * model . Cron )
}
2024-07-14 07:41:50 -04:00
// loadCronTasks 加载计划任务
func loadCronTasks ( ) {
2022-04-12 01:16:33 -04:00
InitCronTask ( )
2024-10-26 11:57:47 -04:00
DB . Find ( & CronList )
2022-04-12 01:16:33 -04:00
var err error
2024-10-23 09:55:12 -04:00
var notificationGroupList [ ] uint64
2024-11-20 08:36:21 -05:00
notificationMsgMap := make ( map [ uint64 ] * strings . Builder )
2024-10-31 17:07:04 -04:00
for _ , cron := range CronList {
2022-09-12 18:14:47 -04:00
// 触发任务类型无需注册
2024-10-31 17:07:04 -04:00
if cron . TaskType == model . CronTypeTriggerTask {
Crons [ cron . ID ] = cron
2022-09-12 18:14:47 -04:00
continue
}
2022-04-12 01:16:33 -04:00
// 注册计划任务
2024-10-31 17:07:04 -04:00
cron . CronJobID , err = Cron . AddFunc ( cron . Scheduler , CronTrigger ( cron ) )
2022-04-12 01:16:33 -04:00
if err == nil {
2024-10-31 17:07:04 -04:00
Crons [ cron . ID ] = cron
2022-04-12 01:16:33 -04:00
} else {
2022-04-14 22:55:21 -04:00
// 当前通知组首次出现 将其加入通知组列表并初始化通知组消息缓存
2024-10-31 17:07:04 -04:00
if _ , ok := notificationMsgMap [ cron . NotificationGroupID ] ; ! ok {
notificationGroupList = append ( notificationGroupList , cron . NotificationGroupID )
2024-11-20 08:36:21 -05:00
notificationMsgMap [ cron . NotificationGroupID ] = new ( strings . Builder )
2024-10-31 17:07:04 -04:00
notificationMsgMap [ cron . NotificationGroupID ] . WriteString ( Localizer . T ( "Tasks failed to register: [" ) )
2022-04-12 01:16:33 -04:00
}
2024-10-31 17:07:04 -04:00
notificationMsgMap [ cron . NotificationGroupID ] . WriteString ( fmt . Sprintf ( "%d," , cron . ID ) )
2022-04-12 01:16:33 -04:00
}
}
2022-04-14 22:55:21 -04:00
// 向注册错误的计划任务所在通知组发送通知
2024-10-23 09:55:12 -04:00
for _ , gid := range notificationGroupList {
2024-10-31 17:07:04 -04:00
notificationMsgMap [ gid ] . WriteString ( Localizer . T ( "] These tasks will not execute properly. Fix them in the admin dashboard." ) )
2024-10-23 09:55:12 -04:00
SendNotification ( gid , notificationMsgMap [ gid ] . String ( ) , nil )
2022-04-12 01:16:33 -04:00
}
Cron . Start ( )
}
2024-10-26 11:57:47 -04:00
// OnRefreshOrAddCron stores c in Crons under its ID, replacing any task
// previously stored under that ID. If the previous task had a non-zero
// CronJobID, that job is removed from the scheduler first. The whole update
// runs under the CronLock write lock.
func OnRefreshOrAddCron(c *model.Cron) {
	CronLock.Lock()
	defer CronLock.Unlock()

	if prev, found := Crons[c.ID]; found && prev != nil && prev.CronJobID != 0 {
		Cron.Remove(prev.CronJobID)
	}

	// A plain assignment overwrites whatever entry was there before.
	Crons[c.ID] = c
}
func UpdateCronList ( ) {
CronLock . RLock ( )
defer CronLock . RUnlock ( )
2024-12-19 12:02:10 -05:00
CronList = utils . MapValuesToSlice ( Crons )
2024-10-26 11:57:47 -04:00
slices . SortFunc ( CronList , func ( a , b * model . Cron ) int {
2024-12-08 07:21:35 -05:00
return cmp . Compare ( a . ID , b . ID )
2024-10-26 11:57:47 -04:00
} )
}
// OnDeleteCron drops the tasks with the given IDs from Crons. For each task
// that is present and has a non-zero CronJobID, the corresponding job is also
// removed from the scheduler. IDs that are not in Crons are ignored. The whole
// operation runs under the CronLock write lock.
func OnDeleteCron(id []uint64) {
	CronLock.Lock()
	defer CronLock.Unlock()

	for idx := range id {
		taskID := id[idx]
		if task, found := Crons[taskID]; found && task != nil && task.CronJobID != 0 {
			Cron.Remove(task.CronJobID)
		}
		delete(Crons, taskID)
	}
}
func ManualTrigger ( c * model . Cron ) {
2022-04-12 01:16:33 -04:00
CronTrigger ( c ) ( )
}
2022-09-13 23:14:23 -04:00
func SendTriggerTasks ( taskIDs [ ] uint64 , triggerServer uint64 ) {
CronLock . RLock ( )
var cronLists [ ] * model . Cron
for _ , taskID := range taskIDs {
if c , ok := Crons [ taskID ] ; ok {
cronLists = append ( cronLists , c )
}
}
CronLock . RUnlock ( )
// 依次调用CronTrigger发送任务
for _ , c := range cronLists {
2024-10-26 11:57:47 -04:00
go CronTrigger ( c , triggerServer ) ( )
2022-09-13 23:14:23 -04:00
}
}
2024-10-26 11:57:47 -04:00
func CronTrigger ( cr * model . Cron , triggerServer ... uint64 ) func ( ) {
2022-04-12 01:16:33 -04:00
crIgnoreMap := make ( map [ uint64 ] bool )
for j := 0 ; j < len ( cr . Servers ) ; j ++ {
crIgnoreMap [ cr . Servers [ j ] ] = true
}
return func ( ) {
2022-09-14 10:14:47 -04:00
if cr . Cover == model . CronCoverAlertTrigger {
2022-09-12 18:14:47 -04:00
if len ( triggerServer ) == 0 {
return
}
ServerLock . RLock ( )
defer ServerLock . RUnlock ( )
if s , ok := ServerList [ triggerServer [ 0 ] ] ; ok {
if s . TaskStream != nil {
s . TaskStream . Send ( & pb . Task {
Id : cr . ID ,
Data : cr . Command ,
Type : model . TaskTypeCommand ,
} )
} else {
// 保存当前服务器状态信息
curServer := model . Server { }
copier . Copy ( & curServer , s )
2024-10-31 17:07:04 -04:00
SendNotification ( cr . NotificationGroupID , Localizer . Tf ( "[Task failed] %s: server %s is offline and cannot execute the task" , cr . Name , s . Name ) , nil , & curServer )
2022-09-12 18:14:47 -04:00
}
}
return
}
2022-04-12 01:16:33 -04:00
ServerLock . RLock ( )
defer ServerLock . RUnlock ( )
for _ , s := range ServerList {
if cr . Cover == model . CronCoverAll && crIgnoreMap [ s . ID ] {
continue
}
if cr . Cover == model . CronCoverIgnoreAll && ! crIgnoreMap [ s . ID ] {
continue
}
if s . TaskStream != nil {
s . TaskStream . Send ( & pb . Task {
Id : cr . ID ,
Data : cr . Command ,
Type : model . TaskTypeCommand ,
} )
} else {
2022-04-18 07:59:42 -04:00
// 保存当前服务器状态信息
curServer := model . Server { }
copier . Copy ( & curServer , s )
2024-10-31 17:07:04 -04:00
SendNotification ( cr . NotificationGroupID , Localizer . Tf ( "[Task failed] %s: server %s is offline and cannot execute the task" , cr . Name , s . Name ) , nil , & curServer )
2022-04-12 01:16:33 -04:00
}
}
}
}