diff --git a/README.md b/README.md index 693daf8..22258bf 100644 --- a/README.md +++ b/README.md @@ -43,9 +43,44 @@ } ``` -### 通知 +### 报警通知 -正在开发,进度 0% +#### 通知到 server酱 示例 + +1. 添加通知方式 + + - 备注:server酱 + + - URL:https://sc.ftqq.com/SCUrandomkeys.send + + - 请求方式: GET + + - 请求类型: JSON/FORM 都可以,其他接入其他API时要选择其使用的类型 + + - Body: `{"text": "#NEZHA#"}` + Body 参数必须是`JSON`,格式是 `key:value` 的形式,`#NEZHA#` 是面板消息占位符,面板触发通知时会自动替换占位符到实际消息 + + 请求方式为 GET 时面板会将 `Body` 里面的参数拼接到 URL 的 query 里面 + +2. 添加一个离线报警 + + - 备注:离线通知 + - 规则:`[{"Type":"offline","Min":0,"Max":0,"Duration":10}]` + - 启用:√ + +3. 添加一个监控 CPU 持续 10s 超过 50% **且** 内存持续 20s 占用低于 20% 的报警 + + - 备注:CPU+内存 + - 规则:`[{"Type":"cpu","Min":0,"Max":50,"Duration":10},{"Type":"memory","Min":20,"Max":0,"Duration":20}]` + - 启用:√ + +#### 报警规则说明 + +- Type + - cpu、memory、swap、disk:Min/Max 数值为占用百分比 + - net_in_speed(入站网速)、net_out_speed(出站网速)、net_all_speed(双向网速)、transfer_in(入站流量)、transfer_out(出站流量)、transfer_all(双向流量):Min/Max 数值为字节(1kb=1024,1mb = 1024*1024) + - offline:不支持 Min/Max 参数 +- Duration:持续秒数,监控比较简陋,取持续时间内的 70 采样结果 ## 常见问题 diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 4f20562..a7ebfba 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -173,7 +173,7 @@ func reportState() { for { if client != nil { monitor.TrackNetworkSpeed() - _, err = client.ReportState(ctx, monitor.GetState(2).PB()) + _, err = client.ReportState(ctx, monitor.GetState(dao.ReportDelay).PB()) if err != nil { log.Printf("reportState error %v", err) time.Sleep(delayWhenError) diff --git a/cmd/dashboard/controller/member_api.go b/cmd/dashboard/controller/member_api.go index 90f35cb..efbe056 100644 --- a/cmd/dashboard/controller/member_api.go +++ b/cmd/dashboard/controller/member_api.go @@ -2,6 +2,7 @@ package controller import ( "encoding/json" + "errors" "fmt" "net/http" "strconv" @@ -12,6 +13,7 @@ import ( "github.com/naiba/nezha/model" "github.com/naiba/nezha/pkg/mygin" + "github.com/naiba/nezha/service/alertmanager" "github.com/naiba/nezha/service/dao" ) @@ -58,8 +60,14 @@ func (ma *memberAPI) delete(c *gin.Context) { } case "notification": err = dao.DB.Delete(&model.Notification{}, "id = ?", id).Error + if err == nil { + alertmanager.OnDeleteNotification(id) + } case "alert-rule": err = dao.DB.Delete(&model.AlertRule{}, "id = ?", id).Error + if err == nil { + alertmanager.OnDeleteAlert(id) + } } if err != nil { c.JSON(http.StatusOK, model.Response{ @@ -105,6 +113,8 @@ func (ma *memberAPI) addOrEditServer(c *gin.Context) { }) return } + s.Host = &model.Host{} + s.State = &model.State{} dao.ServerList[fmt.Sprintf("%d", s.ID)] = &s c.JSON(http.StatusOK, model.Response{ Code: http.StatusOK, @@ -138,6 +148,9 @@ func (ma *memberAPI) addOrEditNotification(c *gin.Context) { verifySSL := nf.VerifySSL == "on" n.VerifySSL = &verifySSL n.ID = nf.ID + err = n.Send("这是测试消息") + } + if err == nil { if n.ID == 0 { err = dao.DB.Create(&n).Error } else { @@ -151,6 +164,7 @@ func (ma *memberAPI) addOrEditNotification(c *gin.Context) { }) return } + alertmanager.OnRefreshOrAddNotification(n) c.JSON(http.StatusOK, model.Response{ Code: http.StatusOK, }) @@ -169,12 +183,17 @@ func (ma *memberAPI) addOrEditAlertRule(c *gin.Context) { err := c.ShouldBindJSON(&arf) if err == nil { err = json.Unmarshal([]byte(arf.RulesRaw), &r.Rules) - if err == nil && len(r.Rules) == 0 { - c.JSON(http.StatusOK, model.Response{ - Code: http.StatusBadRequest, - Message: fmt.Sprintf("请求错误:%s", "至少定义一条规则"), - }) - return + } + if err == nil { + if len(r.Rules) == 0 { + err = errors.New("至少定义一条规则") + } else { + for i := 0; i < len(r.Rules); i++ { + if r.Rules[i].Duration < 3 { + err = errors.New("Duration 至少为 3") + break + } + } } } if err == nil { @@ -196,6 +215,7 @@ func (ma *memberAPI) addOrEditAlertRule(c *gin.Context) { }) return } + alertmanager.OnRefreshOrAddAlert(r) c.JSON(http.StatusOK, model.Response{ Code: http.StatusOK, }) diff --git a/cmd/dashboard/main.go b/cmd/dashboard/main.go index 412fe6a..f71e767 100644 --- a/cmd/dashboard/main.go +++ b/cmd/dashboard/main.go @@ -11,6 +11,7 @@ import ( "github.com/naiba/nezha/cmd/dashboard/controller" "github.com/naiba/nezha/cmd/dashboard/rpc" "github.com/naiba/nezha/model" + "github.com/naiba/nezha/service/alertmanager" "github.com/naiba/nezha/service/dao" ) @@ -40,6 +41,8 @@ func initDB() { dao.DB.Find(&servers) for _, s := range servers { innerS := s + innerS.Host = &model.Host{} + innerS.State = &model.State{} dao.ServerList[fmt.Sprintf("%d", innerS.ID)] = &innerS } } @@ -47,5 +50,6 @@ func initDB() { func main() { go controller.ServeWeb(dao.Conf.HTTPPort) go rpc.ServeRPC(5555) + go alertmanager.Start() select {} } diff --git a/model/alertrule.go b/model/alertrule.go index 6a6e406..cc3e65c 100644 --- a/model/alertrule.go +++ b/model/alertrule.go @@ -1,18 +1,64 @@ package model import ( + "bytes" "encoding/json" + "fmt" + "time" "gorm.io/gorm" ) +const ( + RuleCheckPass = 1 + RuleCheckFail = 0 +) + type Rule struct { - Type string // 指标类型,cpu、memory、swap、disk、net_in、net_out、net_all、transfer_in、transfer_out、transfer_all、offline + // 指标类型,cpu、memory、swap、disk、net_in_speed、net_out_speed + // net_all_speed、transfer_in、transfer_out、transfer_all、offline + Type string Min uint64 // 最小阈值 (百分比、字节 kb ÷ 1024) Max uint64 // 最大阈值 (百分比、字节 kb ÷ 1024) Duration uint64 // 持续时间 (秒) } +func (u *Rule) Snapshot(server *Server) interface{} { + var src uint64 + switch u.Type { + case "cpu": + src = uint64(server.State.CPU) + case "memory": + src = uint64(server.State.MemUsed / server.Host.MemTotal * 100) + case "swap": + src = uint64(server.State.SwapUsed / server.Host.SwapTotal * 100) + case "disk": + src = uint64(server.State.DiskUsed / server.Host.DiskTotal * 100) + case "net_in_speed": + src = server.State.NetInSpeed + case "net_out_speed": + src = server.State.NetOutSpeed + case "net_all_speed": + src = server.State.NetOutSpeed + server.State.NetOutSpeed + case "transfer_in": + src = server.State.NetInTransfer + case "transfer_out": + src = server.State.NetOutTransfer + case "transfer_all": + src = server.State.NetOutTransfer + server.State.NetInTransfer + case "offline": + src = uint64(server.LastActive.Unix()) + } + if u.Type == "offline" { + if uint64(time.Now().Unix())-src > 6 { + return struct{}{} + } + } else if (u.Max > 0 && src > u.Max) || (u.Min > 0 && src < u.Min) { + return struct{}{} + } + return nil +} + type AlertRule struct { Common Name string @@ -31,5 +77,39 @@ func (r *AlertRule) BeforeSave(tx *gorm.DB) error { } func (r *AlertRule) AfterFind(tx *gorm.DB) error { - return json.Unmarshal([]byte(r.RulesRaw), r.Rules) + return json.Unmarshal([]byte(r.RulesRaw), &r.Rules) +} + +func (r *AlertRule) Snapshot(server *Server) []interface{} { + var point []interface{} + for i := 0; i < len(r.Rules); i++ { + point = append(point, r.Rules[i].Snapshot(server)) + } + return point +} + +func (r *AlertRule) Check(points [][]interface{}) (int, string) { + var dist bytes.Buffer + var max int + for i := 0; i < len(r.Rules); i++ { + total := 0.0 + fail := 0.0 + num := int(r.Rules[i].Duration / 2) // SnapshotDelay + if num > max { + max = num + } + if len(points) < num { + continue + } + for j := len(points) - 1; j >= 0; j-- { + total++ + if points[j][i] != nil { + fail++ + } + } + if fail/total > 0.3 { + dist.WriteString(fmt.Sprintf("%+v\n", r.Rules[i])) + } + } + return max, dist.String() } diff --git a/model/notification.go b/model/notification.go index 2f12fcc..b77157e 100644 --- a/model/notification.go +++ b/model/notification.go @@ -22,12 +22,6 @@ const ( NotificationRequestMethodPOST ) -type NotificatonSender struct { - Rule *Rule - Server *Server - State *State -} - type Notification struct { Common Name string @@ -38,7 +32,7 @@ type Notification struct { VerifySSL *bool } -func (n *Notification) Send(sender *NotificatonSender, message string) { +func (n *Notification) Send(message string) error { var verifySSL bool if n.VerifySSL != nil && *n.VerifySSL { @@ -57,36 +51,44 @@ func (n *Notification) Send(sender *NotificatonSender, message string) { err = json.Unmarshal([]byte(n.RequestBody), &data) } + var resp *http.Response + if err == nil { if n.RequestMethod == NotificationRequestMethodGET { + var queryValue = reqURL.Query() for k, v := range data { - reqURL.Query().Set(k, replaceParamsInString(v, sender)) + queryValue.Set(k, replaceParamsInString(v, message)) } - client.Get(reqURL.String()) + reqURL.RawQuery = queryValue.Encode() + resp, err = client.Get(reqURL.String()) } else { if n.RequestType == NotificationRequestTypeForm { params := url.Values{} for k, v := range data { - params.Add(k, replaceParamsInString(v, sender)) + params.Add(k, replaceParamsInString(v, message)) } - client.PostForm(reqURL.String(), params) + resp, err = client.PostForm(reqURL.String(), params) } else { - jsonValue := replaceParamsInJSON(n.RequestBody, sender) - if err == nil { - client.Post(reqURL.String(), "application/json", strings.NewReader(jsonValue)) - } + jsonValue := replaceParamsInJSON(n.RequestBody, message) + resp, err = client.Post(reqURL.String(), "application/json", strings.NewReader(jsonValue)) } } } + + if err == nil && (resp.StatusCode < 200 || resp.StatusCode > 299) { + err = fmt.Errorf("%d %s", resp.StatusCode, resp.Status) + } + + return err } -func replaceParamsInString(str string, sender *NotificatonSender) string { - str = strings.ReplaceAll(str, "#CPU#", fmt.Sprintf("%2f%%", sender.State.CPU)) +func replaceParamsInString(str string, message string) string { + str = strings.ReplaceAll(str, "#NEZHA#", message) return str } -func replaceParamsInJSON(str string, sender *NotificatonSender) string { - str = strings.ReplaceAll(str, "#CPU#", fmt.Sprintf("%2f%%", sender.State.CPU)) +func replaceParamsInJSON(str string, message string) string { + str = strings.ReplaceAll(str, "#NEZHA#", message) return str } diff --git a/service/alertmanager/alertmanager.go b/service/alertmanager/alertmanager.go new file mode 100644 index 0000000..2a8299b --- /dev/null +++ b/service/alertmanager/alertmanager.go @@ -0,0 +1,154 @@ +package alertmanager + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "log" + "sync" + "time" + + "github.com/naiba/nezha/model" + "github.com/naiba/nezha/service/dao" +) + +const firstNotificationDelay = time.Minute * 15 + +// 通知方式 +var notifications []model.Notification +var notificationsLock sync.RWMutex + +// 报警规则 +var alertsLock sync.RWMutex +var alerts []model.AlertRule +var alertsStore map[uint64]map[uint64][][]interface{} + +type NotificationHistory struct { + Duration time.Duration + Until time.Time +} + +func Start() { + alertsStore = make(map[uint64]map[uint64][][]interface{}) + alertsLock.Lock() + if err := dao.DB.Find(&alerts).Error; err != nil { + panic(err) + } + if err := dao.DB.Find(¬ifications).Error; err != nil { + panic(err) + } + alertsLock.Unlock() + for i := 0; i < len(alerts); i++ { + alertsStore[alerts[i].ID] = make(map[uint64][][]interface{}) + } + + time.Sleep(time.Second * 10) + go checkStatus() +} + +func OnRefreshOrAddAlert(alert model.AlertRule) { + alertsLock.Lock() + defer alertsLock.Unlock() + delete(alertsStore, alert.ID) + for i := 0; i < len(alerts); i++ { + if alerts[i].ID == alert.ID { + alerts[i] = alert + } + } + alertsStore[alert.ID] = make(map[uint64][][]interface{}) +} + +func OnDeleteAlert(id uint64) { + alertsLock.Lock() + defer alertsLock.Unlock() + delete(alertsStore, id) + for i := 0; i < len(alerts); i++ { + if alerts[i].ID == id { + alerts = append(alerts[:i], alerts[i+1:]...) + } + } +} + +func OnRefreshOrAddNotification(n model.Notification) { + notificationsLock.Lock() + defer notificationsLock.Unlock() + for i := 0; i < len(notifications); i++ { + if notifications[i].ID == n.ID { + notifications[i] = n + } + } +} + +func OnDeleteNotification(id uint64) { + notificationsLock.Lock() + defer notificationsLock.Unlock() + for i := 0; i < len(notifications); i++ { + if notifications[i].ID == id { + notifications = append(notifications[:i], notifications[i+1:]...) + } + } +} + +func checkStatus() { + startedAt := time.Now() + defer func() { + time.Sleep(time.Until(startedAt.Add(time.Second * dao.SnapshotDelay))) + checkStatus() + }() + + alertsLock.RLock() + defer alertsLock.RUnlock() + dao.ServerLock.RLock() + defer dao.ServerLock.RUnlock() + + for j := 0; j < len(alerts); j++ { + for _, server := range dao.ServerList { + // 监测点 + alertsStore[alerts[j].ID][server.ID] = append(alertsStore[alerts[j]. + ID][server.ID], alerts[j].Snapshot(server)) + // 发送通知 + max, desc := alerts[j].Check(alertsStore[alerts[j].ID][server.ID]) + if desc != "" { + nID := getNotificationHash(server, desc) + var flag bool + if cacheN, has := dao.Cache.Get(nID); has { + nHistory := cacheN.(NotificationHistory) + // 超过一天或者超过上次提醒阈值 + if time.Now().After(nHistory.Until) || nHistory.Duration >= time.Hour*24 { + flag = true + nHistory.Duration *= 2 + nHistory.Until = time.Now().Add(nHistory.Duration) + } + } else { + // 新提醒 + flag = true + dao.Cache.Set(nID, NotificationHistory{ + Duration: firstNotificationDelay, + Until: time.Now().Add(firstNotificationDelay), + }, firstNotificationDelay) + } + if flag { + message := fmt.Sprintf("逮到咯,快去看看!服务器:%s(%s),报警规则:%s,%s", server.Name, server.Host.IP, alerts[j].Name, desc) + log.Printf("通知:%s\n", message) + go sendNotification(message) + } + } + // 清理旧数据 + if max > 0 && max <= len(alertsStore[alerts[j].ID][server.ID]) { + alertsStore[alerts[j].ID][server.ID] = alertsStore[alerts[j].ID][server.ID][max:] + } + } + } +} + +func sendNotification(desc string) { + notificationsLock.RLock() + defer notificationsLock.RUnlock() + for i := 0; i < len(notifications); i++ { + notifications[i].Send(desc) + } +} + +func getNotificationHash(server *model.Server, desc string) string { + return hex.EncodeToString(md5.New().Sum([]byte(fmt.Sprintf("%d::%s", server.ID, desc)))) +} diff --git a/service/dao/dao.go b/service/dao/dao.go index 27a1663..8bcebd8 100644 --- a/service/dao/dao.go +++ b/service/dao/dao.go @@ -10,6 +10,11 @@ import ( pb "github.com/naiba/nezha/proto" ) +const ( + SnapshotDelay = 3 + ReportDelay = 2 +) + // Conf .. var Conf *model.Config