🍻 报警通知上线

This commit is contained in:
naiba 2020-12-20 12:18:27 +08:00
parent ad4fedaa9b
commit 2ae832f509
8 changed files with 330 additions and 30 deletions

View File

@ -43,9 +43,44 @@
}
```
### 通知
### 报警通知
正在开发,进度 0%
#### 通知到 server酱 示例
1. 添加通知方式
- 备注server酱
- URLhttps://sc.ftqq.com/SCUrandomkeys.send
- 请求方式: GET
- 请求类型: JSON/FORM 都可以其他接入其他API时要选择其使用的类型
- Body: `{"text": "#NEZHA#"}`
Body 参数必须是`JSON`,格式是 `key:value` 的形式,`#NEZHA#` 是面板消息占位符,面板触发通知时会自动替换占位符到实际消息
请求方式为 GET 时面板会将 `Body` 里面的参数拼接到 URL 的 query 里面
2. 添加一个离线报警
- 备注:离线通知
- 规则:`[{"Type":"offline","Min":0,"Max":0,"Duration":10}]`
- 启用:√
3. 添加一个监控 CPU 持续 10s 超过 50% **且** 内存持续 20s 占用低于 20% 的报警
- 备注CPU+内存
- 规则:`[{"Type":"cpu","Min":0,"Max":50,"Duration":10},{"Type":"memory","Min":20,"Max":0,"Duration":20}]`
- 启用:√
#### 报警规则说明
- Type
- cpu、memory、swap、diskMin/Max 数值为占用百分比
- net_in_speed(入站网速)、net_out_speed(出站网速)、net_all_speed(双向网速)、transfer_in(入站流量)、transfer_out(出站流量)、transfer_all(双向流量)Min/Max 数值为字节1kb=10241mb = 1024*1024
- offline不支持 Min/Max 参数
- Duration持续秒数监控比较简陋取持续时间内的 70 采样结果
## 常见问题

View File

@ -173,7 +173,7 @@ func reportState() {
for {
if client != nil {
monitor.TrackNetworkSpeed()
_, err = client.ReportState(ctx, monitor.GetState(2).PB())
_, err = client.ReportState(ctx, monitor.GetState(dao.ReportDelay).PB())
if err != nil {
log.Printf("reportState error %v", err)
time.Sleep(delayWhenError)

View File

@ -2,6 +2,7 @@ package controller
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
@ -12,6 +13,7 @@ import (
"github.com/naiba/nezha/model"
"github.com/naiba/nezha/pkg/mygin"
"github.com/naiba/nezha/service/alertmanager"
"github.com/naiba/nezha/service/dao"
)
@ -58,8 +60,14 @@ func (ma *memberAPI) delete(c *gin.Context) {
}
case "notification":
err = dao.DB.Delete(&model.Notification{}, "id = ?", id).Error
if err == nil {
alertmanager.OnDeleteNotification(id)
}
case "alert-rule":
err = dao.DB.Delete(&model.AlertRule{}, "id = ?", id).Error
if err == nil {
alertmanager.OnDeleteAlert(id)
}
}
if err != nil {
c.JSON(http.StatusOK, model.Response{
@ -105,6 +113,8 @@ func (ma *memberAPI) addOrEditServer(c *gin.Context) {
})
return
}
s.Host = &model.Host{}
s.State = &model.State{}
dao.ServerList[fmt.Sprintf("%d", s.ID)] = &s
c.JSON(http.StatusOK, model.Response{
Code: http.StatusOK,
@ -138,6 +148,9 @@ func (ma *memberAPI) addOrEditNotification(c *gin.Context) {
verifySSL := nf.VerifySSL == "on"
n.VerifySSL = &verifySSL
n.ID = nf.ID
err = n.Send("这是测试消息")
}
if err == nil {
if n.ID == 0 {
err = dao.DB.Create(&n).Error
} else {
@ -151,6 +164,7 @@ func (ma *memberAPI) addOrEditNotification(c *gin.Context) {
})
return
}
alertmanager.OnRefreshOrAddNotification(n)
c.JSON(http.StatusOK, model.Response{
Code: http.StatusOK,
})
@ -169,12 +183,17 @@ func (ma *memberAPI) addOrEditAlertRule(c *gin.Context) {
err := c.ShouldBindJSON(&arf)
if err == nil {
err = json.Unmarshal([]byte(arf.RulesRaw), &r.Rules)
if err == nil && len(r.Rules) == 0 {
c.JSON(http.StatusOK, model.Response{
Code: http.StatusBadRequest,
Message: fmt.Sprintf("请求错误:%s", "至少定义一条规则"),
})
return
}
if err == nil {
if len(r.Rules) == 0 {
err = errors.New("至少定义一条规则")
} else {
for i := 0; i < len(r.Rules); i++ {
if r.Rules[i].Duration < 3 {
err = errors.New("Duration 至少为 3")
break
}
}
}
}
if err == nil {
@ -196,6 +215,7 @@ func (ma *memberAPI) addOrEditAlertRule(c *gin.Context) {
})
return
}
alertmanager.OnRefreshOrAddAlert(r)
c.JSON(http.StatusOK, model.Response{
Code: http.StatusOK,
})

View File

@ -11,6 +11,7 @@ import (
"github.com/naiba/nezha/cmd/dashboard/controller"
"github.com/naiba/nezha/cmd/dashboard/rpc"
"github.com/naiba/nezha/model"
"github.com/naiba/nezha/service/alertmanager"
"github.com/naiba/nezha/service/dao"
)
@ -40,6 +41,8 @@ func initDB() {
dao.DB.Find(&servers)
for _, s := range servers {
innerS := s
innerS.Host = &model.Host{}
innerS.State = &model.State{}
dao.ServerList[fmt.Sprintf("%d", innerS.ID)] = &innerS
}
}
@ -47,5 +50,6 @@ func initDB() {
func main() {
go controller.ServeWeb(dao.Conf.HTTPPort)
go rpc.ServeRPC(5555)
go alertmanager.Start()
select {}
}

View File

@ -1,18 +1,64 @@
package model
import (
"bytes"
"encoding/json"
"fmt"
"time"
"gorm.io/gorm"
)
const (
RuleCheckPass = 1
RuleCheckFail = 0
)
type Rule struct {
Type string // 指标类型cpu、memory、swap、disk、net_in、net_out、net_all、transfer_in、transfer_out、transfer_all、offline
// 指标类型cpu、memory、swap、disk、net_in_speed、net_out_speed
// net_all_speed、transfer_in、transfer_out、transfer_all、offline
Type string
Min uint64 // 最小阈值 (百分比、字节 kb ÷ 1024)
Max uint64 // 最大阈值 (百分比、字节 kb ÷ 1024)
Duration uint64 // 持续时间 (秒)
}
func (u *Rule) Snapshot(server *Server) interface{} {
var src uint64
switch u.Type {
case "cpu":
src = uint64(server.State.CPU)
case "memory":
src = uint64(server.State.MemUsed / server.Host.MemTotal * 100)
case "swap":
src = uint64(server.State.SwapUsed / server.Host.SwapTotal * 100)
case "disk":
src = uint64(server.State.DiskUsed / server.Host.DiskTotal * 100)
case "net_in_speed":
src = server.State.NetInSpeed
case "net_out_speed":
src = server.State.NetOutSpeed
case "net_all_speed":
src = server.State.NetOutSpeed + server.State.NetOutSpeed
case "transfer_in":
src = server.State.NetInTransfer
case "transfer_out":
src = server.State.NetOutTransfer
case "transfer_all":
src = server.State.NetOutTransfer + server.State.NetInTransfer
case "offline":
src = uint64(server.LastActive.Unix())
}
if u.Type == "offline" {
if uint64(time.Now().Unix())-src > 6 {
return struct{}{}
}
} else if (u.Max > 0 && src > u.Max) || (u.Min > 0 && src < u.Min) {
return struct{}{}
}
return nil
}
type AlertRule struct {
Common
Name string
@ -31,5 +77,39 @@ func (r *AlertRule) BeforeSave(tx *gorm.DB) error {
}
func (r *AlertRule) AfterFind(tx *gorm.DB) error {
return json.Unmarshal([]byte(r.RulesRaw), r.Rules)
return json.Unmarshal([]byte(r.RulesRaw), &r.Rules)
}
func (r *AlertRule) Snapshot(server *Server) []interface{} {
var point []interface{}
for i := 0; i < len(r.Rules); i++ {
point = append(point, r.Rules[i].Snapshot(server))
}
return point
}
func (r *AlertRule) Check(points [][]interface{}) (int, string) {
var dist bytes.Buffer
var max int
for i := 0; i < len(r.Rules); i++ {
total := 0.0
fail := 0.0
num := int(r.Rules[i].Duration / 2) // SnapshotDelay
if num > max {
max = num
}
if len(points) < num {
continue
}
for j := len(points) - 1; j >= 0; j-- {
total++
if points[j][i] != nil {
fail++
}
}
if fail/total > 0.3 {
dist.WriteString(fmt.Sprintf("%+v\n", r.Rules[i]))
}
}
return max, dist.String()
}

View File

@ -22,12 +22,6 @@ const (
NotificationRequestMethodPOST
)
type NotificatonSender struct {
Rule *Rule
Server *Server
State *State
}
type Notification struct {
Common
Name string
@ -38,7 +32,7 @@ type Notification struct {
VerifySSL *bool
}
func (n *Notification) Send(sender *NotificatonSender, message string) {
func (n *Notification) Send(message string) error {
var verifySSL bool
if n.VerifySSL != nil && *n.VerifySSL {
@ -57,36 +51,44 @@ func (n *Notification) Send(sender *NotificatonSender, message string) {
err = json.Unmarshal([]byte(n.RequestBody), &data)
}
var resp *http.Response
if err == nil {
if n.RequestMethod == NotificationRequestMethodGET {
var queryValue = reqURL.Query()
for k, v := range data {
reqURL.Query().Set(k, replaceParamsInString(v, sender))
queryValue.Set(k, replaceParamsInString(v, message))
}
client.Get(reqURL.String())
reqURL.RawQuery = queryValue.Encode()
resp, err = client.Get(reqURL.String())
} else {
if n.RequestType == NotificationRequestTypeForm {
params := url.Values{}
for k, v := range data {
params.Add(k, replaceParamsInString(v, sender))
params.Add(k, replaceParamsInString(v, message))
}
client.PostForm(reqURL.String(), params)
resp, err = client.PostForm(reqURL.String(), params)
} else {
jsonValue := replaceParamsInJSON(n.RequestBody, sender)
if err == nil {
client.Post(reqURL.String(), "application/json", strings.NewReader(jsonValue))
}
jsonValue := replaceParamsInJSON(n.RequestBody, message)
resp, err = client.Post(reqURL.String(), "application/json", strings.NewReader(jsonValue))
}
}
}
if err == nil && (resp.StatusCode < 200 || resp.StatusCode > 299) {
err = fmt.Errorf("%d %s", resp.StatusCode, resp.Status)
}
return err
}
func replaceParamsInString(str string, sender *NotificatonSender) string {
str = strings.ReplaceAll(str, "#CPU#", fmt.Sprintf("%2f%%", sender.State.CPU))
func replaceParamsInString(str string, message string) string {
str = strings.ReplaceAll(str, "#NEZHA#", message)
return str
}
func replaceParamsInJSON(str string, sender *NotificatonSender) string {
str = strings.ReplaceAll(str, "#CPU#", fmt.Sprintf("%2f%%", sender.State.CPU))
func replaceParamsInJSON(str string, message string) string {
str = strings.ReplaceAll(str, "#NEZHA#", message)
return str
}

View File

@ -0,0 +1,154 @@
package alertmanager
import (
"crypto/md5"
"encoding/hex"
"fmt"
"log"
"sync"
"time"
"github.com/naiba/nezha/model"
"github.com/naiba/nezha/service/dao"
)
const firstNotificationDelay = time.Minute * 15
// 通知方式
var notifications []model.Notification
var notificationsLock sync.RWMutex
// 报警规则
var alertsLock sync.RWMutex
var alerts []model.AlertRule
var alertsStore map[uint64]map[uint64][][]interface{}
type NotificationHistory struct {
Duration time.Duration
Until time.Time
}
func Start() {
alertsStore = make(map[uint64]map[uint64][][]interface{})
alertsLock.Lock()
if err := dao.DB.Find(&alerts).Error; err != nil {
panic(err)
}
if err := dao.DB.Find(&notifications).Error; err != nil {
panic(err)
}
alertsLock.Unlock()
for i := 0; i < len(alerts); i++ {
alertsStore[alerts[i].ID] = make(map[uint64][][]interface{})
}
time.Sleep(time.Second * 10)
go checkStatus()
}
func OnRefreshOrAddAlert(alert model.AlertRule) {
alertsLock.Lock()
defer alertsLock.Unlock()
delete(alertsStore, alert.ID)
for i := 0; i < len(alerts); i++ {
if alerts[i].ID == alert.ID {
alerts[i] = alert
}
}
alertsStore[alert.ID] = make(map[uint64][][]interface{})
}
func OnDeleteAlert(id uint64) {
alertsLock.Lock()
defer alertsLock.Unlock()
delete(alertsStore, id)
for i := 0; i < len(alerts); i++ {
if alerts[i].ID == id {
alerts = append(alerts[:i], alerts[i+1:]...)
}
}
}
func OnRefreshOrAddNotification(n model.Notification) {
notificationsLock.Lock()
defer notificationsLock.Unlock()
for i := 0; i < len(notifications); i++ {
if notifications[i].ID == n.ID {
notifications[i] = n
}
}
}
func OnDeleteNotification(id uint64) {
notificationsLock.Lock()
defer notificationsLock.Unlock()
for i := 0; i < len(notifications); i++ {
if notifications[i].ID == id {
notifications = append(notifications[:i], notifications[i+1:]...)
}
}
}
func checkStatus() {
startedAt := time.Now()
defer func() {
time.Sleep(time.Until(startedAt.Add(time.Second * dao.SnapshotDelay)))
checkStatus()
}()
alertsLock.RLock()
defer alertsLock.RUnlock()
dao.ServerLock.RLock()
defer dao.ServerLock.RUnlock()
for j := 0; j < len(alerts); j++ {
for _, server := range dao.ServerList {
// 监测点
alertsStore[alerts[j].ID][server.ID] = append(alertsStore[alerts[j].
ID][server.ID], alerts[j].Snapshot(server))
// 发送通知
max, desc := alerts[j].Check(alertsStore[alerts[j].ID][server.ID])
if desc != "" {
nID := getNotificationHash(server, desc)
var flag bool
if cacheN, has := dao.Cache.Get(nID); has {
nHistory := cacheN.(NotificationHistory)
// 超过一天或者超过上次提醒阈值
if time.Now().After(nHistory.Until) || nHistory.Duration >= time.Hour*24 {
flag = true
nHistory.Duration *= 2
nHistory.Until = time.Now().Add(nHistory.Duration)
}
} else {
// 新提醒
flag = true
dao.Cache.Set(nID, NotificationHistory{
Duration: firstNotificationDelay,
Until: time.Now().Add(firstNotificationDelay),
}, firstNotificationDelay)
}
if flag {
message := fmt.Sprintf("逮到咯,快去看看!服务器:%s(%s),报警规则:%s%s", server.Name, server.Host.IP, alerts[j].Name, desc)
log.Printf("通知:%s\n", message)
go sendNotification(message)
}
}
// 清理旧数据
if max > 0 && max <= len(alertsStore[alerts[j].ID][server.ID]) {
alertsStore[alerts[j].ID][server.ID] = alertsStore[alerts[j].ID][server.ID][max:]
}
}
}
}
func sendNotification(desc string) {
notificationsLock.RLock()
defer notificationsLock.RUnlock()
for i := 0; i < len(notifications); i++ {
notifications[i].Send(desc)
}
}
func getNotificationHash(server *model.Server, desc string) string {
return hex.EncodeToString(md5.New().Sum([]byte(fmt.Sprintf("%d::%s", server.ID, desc))))
}

View File

@ -10,6 +10,11 @@ import (
pb "github.com/naiba/nezha/proto"
)
const (
SnapshotDelay = 3
ReportDelay = 2
)
// Conf ..
var Conf *model.Config