diff --git a/cmd/agent/main.go b/cmd/agent/main.go index d33581e..6cb2562 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -59,14 +59,18 @@ func run(cmd *cobra.Command, args []string) { ClientID: clientID, ClientSecret: clientSecret, } - go reportState() - var err error - var conn *grpc.ClientConn - var hc pb.NezhaService_HeartbeatClient retry := func() { time.Sleep(delayWhenError) log.Println("Try to reconnect ...") } + + // 上报服务器信息 + go reportState() + + var err error + var conn *grpc.ClientConn + var hc pb.NezhaService_HeartbeatClient + for { conn, err = grpc.Dial(server, grpc.WithInsecure(), grpc.WithPerRPCCredentials(&auth)) if err != nil { @@ -82,6 +86,7 @@ func run(cmd *cobra.Command, args []string) { retry() continue } + // 心跳接收控制命令 hc, err = client.Heartbeat(ctx, &pb.Beat{ Timestamp: fmt.Sprintf("%v", time.Now()), }) diff --git a/cmd/dashboard/controller/common_page.go b/cmd/dashboard/controller/common_page.go index 20b612b..b041b3a 100644 --- a/cmd/dashboard/controller/common_page.go +++ b/cmd/dashboard/controller/common_page.go @@ -2,11 +2,15 @@ package controller import ( "net/http" + "sync" + "time" "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" "github.com/p14yground/nezha/model" "github.com/p14yground/nezha/pkg/mygin" + pb "github.com/p14yground/nezha/proto" "github.com/p14yground/nezha/service/dao" ) @@ -18,6 +22,7 @@ func (cp *commonPage) serve() { cr := cp.r.Group("") cr.Use(mygin.Authorize(mygin.AuthorizeOption{})) cr.GET("/", cp.home) + cr.GET("/ws", cp.ws) } func (cp *commonPage) home(c *gin.Context) { @@ -33,3 +38,51 @@ func (cp *commonPage) home(c *gin.Context) { "Servers": dao.ServerList, })) } + +var upgrader = websocket.Upgrader{} + +func (cp *commonPage) ws(c *gin.Context) { + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + mygin.ShowErrorPage(c, mygin.ErrInfo{ + Code: http.StatusInternalServerError, + Title: "网络错误", + Msg: "Websocket协议切换失败", + Link: "/", + Btn: "返回首页", + }, true) + return + } + defer conn.Close() + var wg sync.WaitGroup + wg.Add(2) + go func() { + var mt int + var message []byte + for { + mt, message, err = conn.ReadMessage() + if err != nil { + wg.Done() + break + } + if mt == websocket.TextMessage && string(message) == "track" { + dao.SendCommand(&pb.Command{ + Type: model.MTReportState, + }) + } + } + }() + go func() { + for { + dao.ServerLock.RLock() + err = conn.WriteJSON(dao.ServerList) + dao.ServerLock.RUnlock() + if err != nil { + wg.Done() + break + } + time.Sleep(time.Second * 2) + } + }() + wg.Wait() +} diff --git a/go.mod b/go.mod index 2fa5787..912e532 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/go-ole/go-ole v1.2.4 // indirect github.com/golang/protobuf v1.3.2 github.com/google/go-github/v28 v28.1.1 + github.com/gorilla/websocket v1.4.0 github.com/jinzhu/gorm v1.9.11 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect diff --git a/go.sum b/go.sum index d40d8ae..6fb479a 100644 --- a/go.sum +++ b/go.sum @@ -90,6 +90,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= diff --git a/model/config.go b/model/config.go index 97d33e4..80f88da 100644 --- a/model/config.go +++ b/model/config.go @@ -10,6 +10,7 @@ type Config struct { Debug bool Site struct { Brand string // 站点名称 + Domain string // 站点域名 CookieName string // 浏览器 Cookie 名称 } GitHub struct { diff --git a/model/monitor.go b/model/monitor.go index 746db7d..fb0dd24 100644 --- a/model/monitor.go +++ b/model/monitor.go @@ -68,7 +68,7 @@ type Host struct { Arch string Virtualization string BootTime uint64 - IP string + IP string `json:"-"` CountryCode string Version string } diff --git a/model/server.go b/model/server.go index f4f3a97..af9eb2e 100644 --- a/model/server.go +++ b/model/server.go @@ -1,5 +1,9 @@ package model +import ( + pb "github.com/p14yground/nezha/proto" +) + // Server .. type Server struct { Common @@ -8,4 +12,7 @@ type Server struct { Host Host State State + + Stream pb.NezhaService_HeartbeatServer `gorm:"-" json:"-"` + StreamClose chan<- error `gorm:"-" json:"-"` } diff --git a/resource/static/main.js b/resource/static/main.js index 6f2ff03..30f3195 100644 --- a/resource/static/main.js +++ b/resource/static/main.js @@ -1,3 +1,15 @@ +/** + * Converts a long string of bytes into a readable format e.g KB, MB, GB, TB, YB + * + * @param {Int} num The number of bytes. + */ +function readableBytes(bytes) { + var i = Math.floor(Math.log(bytes) / Math.log(1024)), + sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']; + + return (bytes / Math.pow(1024, i)).toFixed(2) * 1 + ' ' + sizes[i]; +} + const confirmBtn = $('.mini.confirm.modal .positive.button') function showConfirm(title, content, callFn, extData) { diff --git a/resource/template/page/home.html b/resource/template/page/home.html index c9bff81..45c9740 100644 --- a/resource/template/page/home.html +++ b/resource/template/page/home.html @@ -4,7 +4,7 @@
-
+
@#server.Name#@ @@ -13,12 +13,14 @@ 系统:@#server.Host.Platform#@-@#server.Host.PlatformVersion#@ [@#server.Host.Virtualization#@:@#server.Host.Arch#@]
CPU:@#server.Host.CPU#@
- 硬盘:@#server.State.DiskUsed#@/@#server.State.DiskTotal#@
- 内存:@#server.State.MemUsed#@/@#server.State.MemTotal#@
- 交换:@#server.State.SwapUsed#@/@#server.State.SwapTotal#@
- 流量:@#server.State.NetInTransfer#@ @#server.State.NetOutTransfer#@
- 启动:@#server.Host.BootTime#@
+ 硬盘:@#formatByteSize(server.State.DiskUsed)#@/@#formatByteSize(server.State.DiskTotal)#@
+ 内存:@#formatByteSize(server.State.MemUsed)#@/@#formatByteSize(server.State.MemTotal)#@
+ 交换:@#formatByteSize(server.State.SwapUsed)#@/@#formatByteSize(server.State.SwapTotal)#@
+ 流量:@#formatByteSize(server.State.NetInTransfer)#@ + @#formatByteSize(server.State.NetOutTransfer)#@
+ 启动:@# formatTimestamp(server.Host.BootTime) #@
版本:@#'v'+server.Host.Version#@
@@ -26,7 +28,7 @@
CPU
-
+
@@ -34,7 +36,7 @@
内存
-
@@ -43,7 +45,7 @@
交换
-
@@ -53,13 +55,13 @@
网络
- @#server.State.NetInSpeed#@/s + @#formatByteSize(server.State.NetInSpeed)#@/s - @#server.State.NetOutSpeed#@/s + @#formatByteSize(server.State.NetOutSpeed)#@/s
硬盘
-
@@ -68,7 +70,7 @@
在线
- @#server.State.Uptime#@ + @#secondToDate(server.State.Uptime)#@
@@ -79,12 +81,12 @@
{{template "common/footer" .}} {{end}} \ No newline at end of file diff --git a/service/dao/dao.go b/service/dao/dao.go index 5727b0d..47d2006 100644 --- a/service/dao/dao.go +++ b/service/dao/dao.go @@ -7,6 +7,7 @@ import ( "github.com/patrickmn/go-cache" "github.com/p14yground/nezha/model" + pb "github.com/p14yground/nezha/proto" ) // Conf .. @@ -29,3 +30,19 @@ var ServerLock sync.RWMutex // Version .. var Version = "debug" + +// SendCommand .. +func SendCommand(cmd *pb.Command) { + ServerLock.RLock() + defer ServerLock.RUnlock() + var err error + for _, server := range ServerList { + if server.Stream != nil { + err = server.Stream.Send(cmd) + if err != nil { + close(server.StreamClose) + server.Stream = nil + } + } + } +} diff --git a/service/rpc/nezha.go b/service/rpc/nezha.go index fb753e1..a8eca69 100644 --- a/service/rpc/nezha.go +++ b/service/rpc/nezha.go @@ -21,8 +21,8 @@ func (s *NezhaHandler) ReportState(c context.Context, r *pb.State) (*pb.Receipt, if clientID, err = s.Auth.Check(c); err != nil { return nil, err } - dao.ServerLock.Lock() - defer dao.ServerLock.Unlock() + dao.ServerLock.RLock() + defer dao.ServerLock.RUnlock() dao.ServerList[clientID].State = model.PB2State(r) return &pb.Receipt{Proced: true}, nil } @@ -35,13 +35,15 @@ func (s *NezhaHandler) Heartbeat(r *pb.Beat, stream pb.NezhaService_HeartbeatSer if clientID, err = s.Auth.Check(stream.Context()); err != nil { return err } - err = stream.Send(&pb.Command{ - Type: model.MTReportState, - }) - if err != nil { - log.Printf("Heartbeat stream.Send err:%v", err) + dao.ServerLock.RLock() + defer dao.ServerLock.RUnlock() + closeCh := make(chan error) + dao.ServerList[clientID].StreamClose = closeCh + dao.ServerList[clientID].Stream = stream + select { + case err = <-closeCh: + return err } - select {} } // Register .. @@ -51,8 +53,8 @@ func (s *NezhaHandler) Register(c context.Context, r *pb.Host) (*pb.Receipt, err if clientID, err = s.Auth.Check(c); err != nil { return nil, err } - dao.ServerLock.Lock() - defer dao.ServerLock.Unlock() + dao.ServerLock.RLock() + defer dao.ServerLock.RUnlock() dao.ServerList[clientID].Host = model.PB2Host(r) return &pb.Receipt{Proced: true}, nil }