update: 发送消息前对服务器状态进行拷贝 保证消息发送的状态指标与触发该请求的状态指标相同

This commit is contained in:
Akkia 2022-04-18 19:59:42 +08:00
parent 61630a41db
commit 1f08b579a4
No known key found for this signature in database
GPG Key ID: DABE9A4AB2DD7EF3
4 changed files with 24 additions and 26 deletions

View File

@ -3,6 +3,7 @@ package rpc
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/jinzhu/copier"
"time" "time"
"github.com/naiba/nezha/model" "github.com/naiba/nezha/model"
@ -28,11 +29,14 @@ func (s *NezhaHandler) ReportTask(c context.Context, r *pb.TaskResult) (*pb.Rece
if cr != nil { if cr != nil {
singleton.ServerLock.RLock() singleton.ServerLock.RLock()
defer singleton.ServerLock.RUnlock() defer singleton.ServerLock.RUnlock()
// 保存当前服务器状态信息
curServer := model.Server{}
copier.Copy(&curServer, singleton.ServerList[clientID])
if cr.PushSuccessful && r.GetSuccessful() { if cr.PushSuccessful && r.GetSuccessful() {
singleton.SendNotification(cr.NotificationTag, fmt.Sprintf("$%d$[任务成功] %s ,服务器:%s日志\n%s", singleton.ServerList[clientID].ID, cr.Name, singleton.ServerList[clientID].Name, r.GetData()), false) singleton.SendNotification(cr.NotificationTag, fmt.Sprintf("[任务成功] %s ,服务器:%s日志\n%s", cr.Name, singleton.ServerList[clientID].Name, r.GetData()), false, &curServer)
} }
if !r.GetSuccessful() { if !r.GetSuccessful() {
singleton.SendNotification(cr.NotificationTag, fmt.Sprintf("$%d$[任务失败] %s ,服务器:%s日志\n%s", singleton.ServerList[clientID].ID, cr.Name, singleton.ServerList[clientID].Name, r.GetData()), false) singleton.SendNotification(cr.NotificationTag, fmt.Sprintf("[任务失败] %s ,服务器:%s日志\n%s", cr.Name, singleton.ServerList[clientID].Name, r.GetData()), false, &curServer)
} }
singleton.DB.Model(cr).Updates(model.Cron{ singleton.DB.Model(cr).Updates(model.Cron{
LastExecutedAt: time.Now().Add(time.Second * -1 * time.Duration(r.GetDelay())), LastExecutedAt: time.Now().Add(time.Second * -1 * time.Duration(r.GetDelay())),

View File

@ -2,6 +2,7 @@ package singleton
import ( import (
"fmt" "fmt"
"github.com/jinzhu/copier"
"log" "log"
"sync" "sync"
"time" "time"
@ -147,14 +148,17 @@ func checkStatus() {
ID][server.ID], alert.Snapshot(AlertsCycleTransferStatsStore[alert.ID], server, DB)) ID][server.ID], alert.Snapshot(AlertsCycleTransferStatsStore[alert.ID], server, DB))
// 发送通知,分为触发报警和恢复通知 // 发送通知,分为触发报警和恢复通知
max, passed := alert.Check(alertsStore[alert.ID][server.ID]) max, passed := alert.Check(alertsStore[alert.ID][server.ID])
// 保存当前服务器状态信息
curServer := model.Server{}
copier.Copy(&curServer, server)
if !passed { if !passed {
alertsPrevState[alert.ID][server.ID] = _RuleCheckFail alertsPrevState[alert.ID][server.ID] = _RuleCheckFail
message := fmt.Sprintf("$%d$[主机故障] %s(%s) 规则:%s", server.ID, server.Name, IPDesensitize(server.Host.IP), alert.Name) message := fmt.Sprintf("[主机故障] %s(%s) 规则:%s", server.Name, IPDesensitize(server.Host.IP), alert.Name)
go SendNotification(alert.NotificationTag, message, true) go SendNotification(alert.NotificationTag, message, true, &curServer)
} else { } else {
if alertsPrevState[alert.ID][server.ID] == _RuleCheckFail { if alertsPrevState[alert.ID][server.ID] == _RuleCheckFail {
message := fmt.Sprintf("$%d$[主机恢复] %s(%s) 规则:%s", server.ID, server.Name, IPDesensitize(server.Host.IP), alert.Name) message := fmt.Sprintf("[主机恢复] %s(%s) 规则:%s", server.Name, IPDesensitize(server.Host.IP), alert.Name)
go SendNotification(alert.NotificationTag, message, true) go SendNotification(alert.NotificationTag, message, true, &curServer)
} }
alertsPrevState[alert.ID][server.ID] = _RuleCheckPass alertsPrevState[alert.ID][server.ID] = _RuleCheckPass
} }

View File

@ -3,6 +3,7 @@ package singleton
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/jinzhu/copier"
"sync" "sync"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
@ -84,7 +85,10 @@ func CronTrigger(cr model.Cron) func() {
Type: model.TaskTypeCommand, Type: model.TaskTypeCommand,
}) })
} else { } else {
SendNotification(cr.NotificationTag, fmt.Sprintf("$%d$[任务失败] %s服务器 %s 离线,无法执行。", s.ID, cr.Name, s.Name), false) // 保存当前服务器状态信息
curServer := model.Server{}
copier.Copy(&curServer, s)
SendNotification(cr.NotificationTag, fmt.Sprintf("[任务失败] %s服务器 %s 离线,无法执行。", cr.Name, s.Name), false, &curServer)
} }
} }
} }

View File

@ -4,9 +4,6 @@ import (
"crypto/md5" // #nosec "crypto/md5" // #nosec
"encoding/hex" "encoding/hex"
"log" "log"
"regexp"
"strconv"
"strings"
"sync" "sync"
"time" "time"
@ -105,7 +102,7 @@ func OnDeleteNotification(id uint64) {
} }
// SendNotification 向指定的通知方式组的所有通知方式发送通知 // SendNotification 向指定的通知方式组的所有通知方式发送通知
func SendNotification(notificationTag string, desc string, mutable bool) { func SendNotification(notificationTag string, desc string, mutable bool, ext ...*model.Server) {
if mutable { if mutable {
// 通知防骚扰策略 // 通知防骚扰策略
nID := hex.EncodeToString(md5.New().Sum([]byte(desc))) // #nosec nID := hex.EncodeToString(md5.New().Sum([]byte(desc))) // #nosec
@ -146,10 +143,12 @@ func SendNotification(notificationTag string, desc string, mutable bool) {
log.Println("尝试通知", n.Name) log.Println("尝试通知", n.Name)
} }
for _, n := range NotificationList[notificationTag] { for _, n := range NotificationList[notificationTag] {
desc, server := findServerInMsg(desc)
ns := model.NotificationServerBundle{ ns := model.NotificationServerBundle{
Notification: n, Notification: n,
Server: server, Server: nil,
}
if len(ext) > 0 {
ns.Server = ext[0]
} }
if err := ns.Send(desc); err != nil { if err := ns.Send(desc); err != nil {
log.Println("NEZHA>> 向 ", n.Name, " 发送通知失败:", err) log.Println("NEZHA>> 向 ", n.Name, " 发送通知失败:", err)
@ -158,16 +157,3 @@ func SendNotification(notificationTag string, desc string, mutable bool) {
} }
} }
} }
// findServerInMsg 通过msg字符串中的$ServerID$ 返回修改后的字符串与Server对象
func findServerInMsg(msg string) (string, *model.Server) {
reg1 := regexp.MustCompile(`^\$\d+`)
reg2 := regexp.MustCompile(`[^$]+`)
ServerIDStr := reg2.FindString(reg1.FindString(msg))
ServerID, _ := strconv.ParseUint(ServerIDStr, 10, 64)
// 将原字符串的ServerID标识去除
if ServerIDStr != "" {
msg = strings.Replace(msg, "$"+ServerIDStr+"$", "", 1)
}
return msg, ServerList[ServerID]
}