Websocket 实时拉取

This commit is contained in:
奶爸 2019-12-10 17:57:57 +08:00
parent be57af065d
commit 3c39e1537d
11 changed files with 197 additions and 32 deletions

View File

@ -59,14 +59,18 @@ func run(cmd *cobra.Command, args []string) {
ClientID: clientID,
ClientSecret: clientSecret,
}
go reportState()
var err error
var conn *grpc.ClientConn
var hc pb.NezhaService_HeartbeatClient
retry := func() {
time.Sleep(delayWhenError)
log.Println("Try to reconnect ...")
}
// 上报服务器信息
go reportState()
var err error
var conn *grpc.ClientConn
var hc pb.NezhaService_HeartbeatClient
for {
conn, err = grpc.Dial(server, grpc.WithInsecure(), grpc.WithPerRPCCredentials(&auth))
if err != nil {
@ -82,6 +86,7 @@ func run(cmd *cobra.Command, args []string) {
retry()
continue
}
// 心跳接收控制命令
hc, err = client.Heartbeat(ctx, &pb.Beat{
Timestamp: fmt.Sprintf("%v", time.Now()),
})

View File

@ -2,11 +2,15 @@ package controller
import (
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/p14yground/nezha/model"
"github.com/p14yground/nezha/pkg/mygin"
pb "github.com/p14yground/nezha/proto"
"github.com/p14yground/nezha/service/dao"
)
@ -18,6 +22,7 @@ func (cp *commonPage) serve() {
cr := cp.r.Group("")
cr.Use(mygin.Authorize(mygin.AuthorizeOption{}))
cr.GET("/", cp.home)
cr.GET("/ws", cp.ws)
}
func (cp *commonPage) home(c *gin.Context) {
@ -33,3 +38,51 @@ func (cp *commonPage) home(c *gin.Context) {
"Servers": dao.ServerList,
}))
}
var upgrader = websocket.Upgrader{}
func (cp *commonPage) ws(c *gin.Context) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
mygin.ShowErrorPage(c, mygin.ErrInfo{
Code: http.StatusInternalServerError,
Title: "网络错误",
Msg: "Websocket协议切换失败",
Link: "/",
Btn: "返回首页",
}, true)
return
}
defer conn.Close()
var wg sync.WaitGroup
wg.Add(2)
go func() {
var mt int
var message []byte
for {
mt, message, err = conn.ReadMessage()
if err != nil {
wg.Done()
break
}
if mt == websocket.TextMessage && string(message) == "track" {
dao.SendCommand(&pb.Command{
Type: model.MTReportState,
})
}
}
}()
go func() {
for {
dao.ServerLock.RLock()
err = conn.WriteJSON(dao.ServerList)
dao.ServerLock.RUnlock()
if err != nil {
wg.Done()
break
}
time.Sleep(time.Second * 2)
}
}()
wg.Wait()
}

1
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/golang/protobuf v1.3.2
github.com/google/go-github/v28 v28.1.1
github.com/gorilla/websocket v1.4.0
github.com/jinzhu/gorm v1.9.11
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect

1
go.sum
View File

@ -90,6 +90,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=

View File

@ -10,6 +10,7 @@ type Config struct {
Debug bool
Site struct {
Brand string // 站点名称
Domain string // 站点域名
CookieName string // 浏览器 Cookie 名称
}
GitHub struct {

View File

@ -68,7 +68,7 @@ type Host struct {
Arch string
Virtualization string
BootTime uint64
IP string
IP string `json:"-"`
CountryCode string
Version string
}

View File

@ -1,5 +1,9 @@
package model
import (
pb "github.com/p14yground/nezha/proto"
)
// Server ..
type Server struct {
Common
@ -8,4 +12,7 @@ type Server struct {
Host Host
State State
Stream pb.NezhaService_HeartbeatServer `gorm:"-" json:"-"`
StreamClose chan<- error `gorm:"-" json:"-"`
}

View File

@ -1,3 +1,15 @@
/**
* Converts a long string of bytes into a readable format e.g KB, MB, GB, TB, YB
*
* @param {Int} num The number of bytes.
*/
function readableBytes(bytes) {
var i = Math.floor(Math.log(bytes) / Math.log(1024)),
sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
return (bytes / Math.pow(1024, i)).toFixed(2) * 1 + ' ' + sizes[i];
}
const confirmBtn = $('.mini.confirm.modal .positive.button')
function showConfirm(title, content, callFn, extData) {

View File

@ -4,7 +4,7 @@
<div class="nb-container">
<div class="ui container">
<div class="ui four status cards">
<div v-for='server in servers' class="card">
<div v-for='server in servers' :id="server.ID" class="card">
<div class="content">
<div class="header"><i v-if='server.Host.CountryCode'
:class="server.Host.CountryCode + ' flag'"></i> @#server.Name#@
@ -13,12 +13,14 @@
系统:@#server.Host.Platform#@-@#server.Host.PlatformVersion#@ [<span
v-if='server.Host.Virtualization'>@#server.Host.Virtualization#@:</span>@#server.Host.Arch#@]<br>
CPU@#server.Host.CPU#@<br>
硬盘:@#server.State.DiskUsed#@/@#server.State.DiskTotal#@<br>
内存:@#server.State.MemUsed#@/@#server.State.MemTotal#@<br>
交换:@#server.State.SwapUsed#@/@#server.State.SwapTotal#@<br>
流量:<i class='arrow alternate circle down outline icon'></i>@#server.State.NetInTransfer#@ <i
class='arrow alternate circle up outline icon'></i>@#server.State.NetOutTransfer#@<br>
启动:@#server.Host.BootTime#@<br>
硬盘:@#formatByteSize(server.State.DiskUsed)#@/@#formatByteSize(server.State.DiskTotal)#@<br>
内存:@#formatByteSize(server.State.MemUsed)#@/@#formatByteSize(server.State.MemTotal)#@<br>
交换:@#formatByteSize(server.State.SwapUsed)#@/@#formatByteSize(server.State.SwapTotal)#@<br>
流量:<i
class='arrow alternate circle down outline icon'></i>@#formatByteSize(server.State.NetInTransfer)#@
<i
class='arrow alternate circle up outline icon'></i>@#formatByteSize(server.State.NetOutTransfer)#@<br>
启动:@# formatTimestamp(server.Host.BootTime) #@<br>
版本:@#'v'+server.Host.Version#@<br>
</div>
</div>
@ -26,7 +28,7 @@
<div class="ui grid">
<div class="three wide column">CPU</div>
<div class="thirteen wide column">
<div class="ui active progress" :data-value="server.State.CPU" data-total="100">
<div class="ui cpu progress" :data-value="server.State.CPU" data-total="100">
<div class="bar">
<div class="progress"></div>
</div>
@ -34,7 +36,7 @@
</div>
<div class="three wide column">内存</div>
<div class="thirteen wide column">
<div class="ui active progress" :data-value="server.State.MemUsed"
<div class="ui mem progress" :data-value="server.State.MemUsed"
:data-total="server.State.MemTotal">
<div class="bar">
<div class="progress"></div>
@ -43,7 +45,7 @@
</div>
<div class="three wide column">交换</div>
<div class="thirteen wide column">
<div class="ui active progress" :data-value="server.State.SwapUsed"
<div class="ui swap progress" :data-value="server.State.SwapUsed"
:data-total="server.State.SwapTotal">
<div class="bar">
<div class="progress"></div>
@ -53,13 +55,13 @@
<div class="three wide column">网络</div>
<div class="thirteen wide column">
<i class="arrow alternate circle down outline icon"></i>
@#server.State.NetInSpeed#@/s
@#formatByteSize(server.State.NetInSpeed)#@/s
<i class="arrow alternate circle up outline icon"></i>
@#server.State.NetOutSpeed#@/s
@#formatByteSize(server.State.NetOutSpeed)#@/s
</div>
<div class="three wide column">硬盘</div>
<div class="thirteen wide column">
<div class="ui active progress" :data-value="server.State.DiskUsed"
<div class="ui disk progress" :data-value="server.State.DiskUsed"
:data-total="server.State.DiskTotal">
<div class="bar">
<div class="progress"></div>
@ -68,7 +70,7 @@
</div>
<div class="three wide column">在线</div>
<div class="thirteen wide column">
<i class="clock icon"></i>@#server.State.Uptime#@
<i class="clock icon"></i>@#secondToDate(server.State.Uptime)#@
</div>
</div>
</div>
@ -79,12 +81,12 @@
</div>
{{template "common/footer" .}}
<script>
const servers = {{.Servers }};
const initData = {{.Servers }};
var statusCards = new Vue({
el: 'div.status.cards',
delimiters: ['@#', '#@'],
data: {
servers,
servers: initData,
},
mounted() {
$('.progress').progress();
@ -93,7 +95,71 @@
});
},
methods: {
secondToDate(s) {
var d = Math.floor(s / 3600 / 24);
var h = Math.floor(s / 3600);
var m = Math.floor((s / 60 % 60));
var s = Math.floor((s % 60));
return result = d + "天" + h + "小时" + m + "分钟" + s + "秒";
},
formatTimestamp(t) {
return new Date(t * 1000).toLocaleString()
},
formatByteSize(bs) {
const x = readableBytes(bs)
return x != "NaN undefined" ? x : '0 KB'
}
}
});
})
const ws = new WebSocket('ws://localhost:8080/ws');
ws.onopen = function (evt) {
$.suiAlert({
title: '实时通道建立',
description: '可以实时获取最新监控数据啦',
type: 'success',
time: '2',
position: 'top-center',
});
ws.send('track');
setInterval(() => {
ws.send('track');
console.log('追加监控时间')
}, 1000 * 60 * 6);
}
ws.onmessage = function (evt) {
const oldServers = statusCards.servers
statusCards.servers = JSON.parse(evt.data)
const keys = Object.keys(statusCards.servers)
for (let i = 0; i < keys.length; i++) {
const ns = statusCards.servers[keys[i]];
for (let j = 0; j < oldServers.length; j++) {
const os = oldServers[j];
if (ns.ID == os.ID) {
break
}
// 新加入的仔
$('#' + ns.ID + ' .yellow.info.icon').popup({
popup: '.ui.content.popup'
});
}
// 刷新进度条
const bars = [$('#' + ns.ID + ' .cpu.progress'),
$('#' + ns.ID + ' .mem.progress'),
$('#' + ns.ID + ' .swap.progress'),
$('#' + ns.ID + ' .disk.progress')]
bars.forEach(b => {
b.progress('update progress', b[0].dataset.value);
})
}
}
ws.onclose = function () {
$.suiAlert({
title: '实时通道断开',
description: '无法实时获取最新监控数据咯',
type: 'warning',
time: '2',
position: 'top-center',
});
}
</script>
{{end}}

View File

@ -7,6 +7,7 @@ import (
"github.com/patrickmn/go-cache"
"github.com/p14yground/nezha/model"
pb "github.com/p14yground/nezha/proto"
)
// Conf ..
@ -29,3 +30,19 @@ var ServerLock sync.RWMutex
// Version ..
var Version = "debug"
// SendCommand ..
func SendCommand(cmd *pb.Command) {
ServerLock.RLock()
defer ServerLock.RUnlock()
var err error
for _, server := range ServerList {
if server.Stream != nil {
err = server.Stream.Send(cmd)
if err != nil {
close(server.StreamClose)
server.Stream = nil
}
}
}
}

View File

@ -21,8 +21,8 @@ func (s *NezhaHandler) ReportState(c context.Context, r *pb.State) (*pb.Receipt,
if clientID, err = s.Auth.Check(c); err != nil {
return nil, err
}
dao.ServerLock.Lock()
defer dao.ServerLock.Unlock()
dao.ServerLock.RLock()
defer dao.ServerLock.RUnlock()
dao.ServerList[clientID].State = model.PB2State(r)
return &pb.Receipt{Proced: true}, nil
}
@ -35,13 +35,15 @@ func (s *NezhaHandler) Heartbeat(r *pb.Beat, stream pb.NezhaService_HeartbeatSer
if clientID, err = s.Auth.Check(stream.Context()); err != nil {
return err
}
err = stream.Send(&pb.Command{
Type: model.MTReportState,
})
if err != nil {
log.Printf("Heartbeat stream.Send err:%v", err)
dao.ServerLock.RLock()
defer dao.ServerLock.RUnlock()
closeCh := make(chan error)
dao.ServerList[clientID].StreamClose = closeCh
dao.ServerList[clientID].Stream = stream
select {
case err = <-closeCh:
return err
}
select {}
}
// Register ..
@ -51,8 +53,8 @@ func (s *NezhaHandler) Register(c context.Context, r *pb.Host) (*pb.Receipt, err
if clientID, err = s.Auth.Check(c); err != nil {
return nil, err
}
dao.ServerLock.Lock()
defer dao.ServerLock.Unlock()
dao.ServerLock.RLock()
defer dao.ServerLock.RUnlock()
dao.ServerList[clientID].Host = model.PB2Host(r)
return &pb.Receipt{Proced: true}, nil
}