nezha/cmd/agent/main.go

316 lines
7.3 KiB
Go
Raw Normal View History

2019-12-05 09:36:58 -05:00
package main
import (
"context"
2021-01-16 01:11:51 -05:00
"crypto/tls"
"errors"
"flag"
2019-12-07 05:14:40 -05:00
"fmt"
2019-12-05 09:36:58 -05:00
"log"
"net"
"net/http"
"net/url"
2019-12-08 10:49:38 -05:00
"os"
"os/exec"
2019-12-07 05:14:40 -05:00
"time"
2019-12-05 09:36:58 -05:00
2020-11-29 11:07:27 -05:00
"github.com/blang/semver"
"github.com/genkiroid/cert"
"github.com/go-ping/ping"
2020-12-12 12:31:22 -05:00
"github.com/p14yground/go-github-selfupdate/selfupdate"
2019-12-05 09:36:58 -05:00
"google.golang.org/grpc"
2019-12-05 10:42:20 -05:00
2021-03-20 02:53:10 -04:00
"github.com/naiba/nezha/cmd/agent/monitor"
2020-11-10 21:07:45 -05:00
"github.com/naiba/nezha/model"
"github.com/naiba/nezha/pkg/utils"
2020-11-10 21:07:45 -05:00
pb "github.com/naiba/nezha/proto"
"github.com/naiba/nezha/service/rpc"
2019-12-05 09:36:58 -05:00
)
func init() {
cert.TimeoutSeconds = 30
http.DefaultClient.Timeout = time.Second * 5
}
2019-12-08 10:49:38 -05:00
var (
2020-11-29 11:07:27 -05:00
server string
clientSecret string
2020-12-18 23:43:03 -05:00
version string
2021-07-08 12:01:58 -04:00
debug bool
2019-12-08 10:49:38 -05:00
)
2020-11-30 01:24:00 -05:00
var (
client pb.NezhaServiceClient
2021-07-16 06:09:50 -04:00
inited bool
updateCh = make(chan struct{}) // Agent 自动更新间隔
httpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Timeout: time.Second * 30,
}
2020-11-30 01:24:00 -05:00
)
const (
delayWhenError = time.Second * 10 // Agent 重连间隔
networkTimeOut = time.Second * 5 // 普通网络超时
)
2019-12-05 09:36:58 -05:00
func main() {
2020-11-29 23:45:51 -05:00
// 来自于 GoReleaser 的版本号
2021-07-08 12:01:58 -04:00
monitor.Version = version
2019-12-08 10:49:38 -05:00
flag.String("i", "", "unused 旧Agent配置兼容")
flag.BoolVar(&debug, "d", false, "开启调试信息")
flag.StringVar(&server, "s", "localhost:5555", "管理面板RPC端口")
flag.StringVar(&clientSecret, "p", "", "Agent连接Secret")
flag.Parse()
if server == "" || clientSecret == "" {
flag.Usage()
return
}
run()
}
func run() {
2019-12-09 03:02:49 -05:00
auth := rpc.AuthHandler{
ClientSecret: clientSecret,
2019-12-05 09:36:58 -05:00
}
2019-12-10 04:57:57 -05:00
// 上报服务器信息
go reportState()
2021-03-20 11:50:16 -04:00
// 更新IP信息
go monitor.UpdateIP()
2019-12-10 04:57:57 -05:00
if _, err := semver.Parse(version); err == nil {
go func() {
for range updateCh {
go doSelfUpdate()
}
}()
updateCh <- struct{}{}
}
2020-11-29 11:07:27 -05:00
2019-12-10 04:57:57 -05:00
var err error
var conn *grpc.ClientConn
2020-02-09 09:04:59 -05:00
retry := func() {
2021-07-16 06:09:50 -04:00
inited = false
println("Error to close connection ...")
2020-02-09 09:04:59 -05:00
if conn != nil {
conn.Close()
}
time.Sleep(delayWhenError)
println("Try to reconnect ...")
2020-02-09 09:04:59 -05:00
}
2019-12-09 05:14:31 -05:00
for {
timeOutCtx, cancel := context.WithTimeout(context.Background(), networkTimeOut)
conn, err = grpc.DialContext(timeOutCtx, server, grpc.WithInsecure(), grpc.WithPerRPCCredentials(&auth))
2019-12-09 03:02:49 -05:00
if err != nil {
println("grpc.Dial err: ", err)
cancel()
2019-12-09 05:14:31 -05:00
retry()
2019-12-09 03:02:49 -05:00
continue
}
cancel()
2019-12-09 03:02:49 -05:00
client = pb.NewNezhaServiceClient(conn)
// 第一步注册
timeOutCtx, cancel = context.WithTimeout(context.Background(), networkTimeOut)
_, err = client.ReportSystemInfo(timeOutCtx, monitor.GetHost().PB())
2019-12-09 05:14:31 -05:00
if err != nil {
println("client.ReportSystemInfo err: ", err)
cancel()
2019-12-09 05:14:31 -05:00
retry()
continue
}
cancel()
2021-07-16 06:09:50 -04:00
inited = true
// 执行 Task
2021-06-11 23:51:26 -04:00
tasks, err := client.RequestTask(context.Background(), monitor.GetHost().PB())
2019-12-09 03:02:49 -05:00
if err != nil {
println("client.RequestTask err: ", err)
2019-12-09 05:14:31 -05:00
retry()
2019-12-09 03:02:49 -05:00
continue
}
err = receiveTasks(tasks)
println("receiveTasks exit to main: ", err)
2019-12-09 05:14:31 -05:00
retry()
2019-12-07 05:14:40 -05:00
}
2019-12-09 03:02:49 -05:00
}
2019-12-07 05:14:40 -05:00
func receiveTasks(tasks pb.NezhaService_RequestTaskClient) error {
2019-12-09 03:02:49 -05:00
var err error
defer println("receiveTasks exit", time.Now(), "=>", err)
2019-12-09 03:02:49 -05:00
for {
var task *pb.Task
task, err = tasks.Recv()
2019-12-09 03:02:49 -05:00
if err != nil {
return err
}
go doTask(task)
}
}
func doTask(task *pb.Task) {
var result pb.TaskResult
result.Id = task.GetId()
result.Type = task.GetType()
switch task.GetType() {
case model.TaskTypeHTTPGET:
start := time.Now()
resp, err := httpClient.Get(task.GetData())
if err == nil {
// 检查 HTTP Response 状态
result.Delay = float32(time.Since(start).Microseconds()) / 1000.0
if resp.StatusCode > 399 || resp.StatusCode < 200 {
err = errors.New("\n应用错误" + resp.Status)
}
}
if err == nil {
// 检查 SSL 证书信息
serviceUrl, err := url.Parse(task.GetData())
if err == nil {
if serviceUrl.Scheme == "https" {
c := cert.NewCert(serviceUrl.Host)
if c.Error != "" {
result.Data = "SSL证书错误" + c.Error
} else {
result.Data = c.Issuer + "|" + c.NotAfter
result.Successful = true
}
} else {
result.Successful = true
}
} else {
result.Data = "URL解析错误" + err.Error()
}
} else {
// HTTP 请求失败
result.Data = err.Error()
}
case model.TaskTypeICMPPing:
pinger, err := ping.NewPinger(task.GetData())
if err == nil {
2021-01-18 00:45:06 -05:00
pinger.SetPrivileged(true)
pinger.Count = 5
pinger.Timeout = time.Second * 20
err = pinger.Run() // Blocks until finished.
}
if err == nil {
result.Delay = float32(pinger.Statistics().AvgRtt.Microseconds()) / 1000.0
result.Successful = true
} else {
result.Data = err.Error()
}
case model.TaskTypeTCPPing:
start := time.Now()
conn, err := net.DialTimeout("tcp", task.GetData(), time.Second*10)
if err == nil {
conn.Write([]byte("ping\n"))
conn.Close()
result.Delay = float32(time.Since(start).Microseconds()) / 1000.0
result.Successful = true
} else {
result.Data = err.Error()
2019-12-09 03:02:49 -05:00
}
case model.TaskTypeCommand:
startedAt := time.Now()
var cmd *exec.Cmd
2021-01-28 21:40:57 -05:00
var endCh = make(chan struct{})
2021-01-28 22:59:35 -05:00
pg, err := utils.NewProcessExitGroup()
if err != nil {
// 进程组创建失败,直接退出
result.Data = err.Error()
client.ReportTask(context.Background(), &result)
2021-01-28 22:59:35 -05:00
return
}
2021-01-28 21:40:57 -05:00
timeout := time.NewTimer(time.Hour * 2)
if utils.IsWindows() {
cmd = exec.Command("cmd", "/c", task.GetData())
} else {
cmd = exec.Command("sh", "-c", task.GetData())
}
2021-01-28 22:59:35 -05:00
pg.AddProcess(cmd)
2021-01-28 21:40:57 -05:00
go func() {
select {
case <-timeout.C:
result.Data = "任务执行超时\n"
close(endCh)
2021-01-29 01:29:31 -05:00
pg.Dispose()
2021-01-28 21:40:57 -05:00
case <-endCh:
2021-01-29 01:29:31 -05:00
timeout.Stop()
}
2021-01-28 21:40:57 -05:00
}()
output, err := cmd.Output()
if err != nil {
result.Data += fmt.Sprintf("%s\n%s", string(output), err.Error())
} else {
close(endCh)
result.Data = string(output)
result.Successful = true
}
result.Delay = float32(time.Since(startedAt).Seconds())
default:
println("Unknown action: ", task)
2019-12-07 05:14:40 -05:00
}
client.ReportTask(context.Background(), &result)
2019-12-09 03:02:49 -05:00
}
2019-12-05 09:36:58 -05:00
2019-12-09 03:02:49 -05:00
func reportState() {
var lastReportHostInfo time.Time
2019-12-09 03:02:49 -05:00
var err error
var now time.Time
defer println("reportState exit", time.Now(), "=>", err)
2019-12-09 03:02:49 -05:00
for {
now = time.Now()
2021-07-16 06:09:50 -04:00
// 为了更准确的记录时段流量inited 后再上传状态信息
if client != nil && inited {
2019-12-09 10:45:23 -05:00
monitor.TrackNetworkSpeed()
timeOutCtx, cancel := context.WithTimeout(context.Background(), networkTimeOut)
_, err = client.ReportSystemState(timeOutCtx, monitor.GetState().PB())
cancel()
2019-12-09 05:14:31 -05:00
if err != nil {
println("reportState error", err)
2019-12-09 05:14:31 -05:00
time.Sleep(delayWhenError)
}
if lastReportHostInfo.Before(time.Now().Add(-10 * time.Minute)) {
lastReportHostInfo = time.Now()
client.ReportSystemInfo(context.Background(), monitor.GetHost().PB())
}
2019-12-05 10:42:20 -05:00
}
time.Sleep(time.Until(now.Add(time.Second)))
2019-12-05 10:42:20 -05:00
}
2019-12-05 09:36:58 -05:00
}
func doSelfUpdate() {
defer func() {
time.Sleep(time.Minute * 20)
updateCh <- struct{}{}
}()
v := semver.MustParse(version)
println("Check update", v)
latest, err := selfupdate.UpdateSelf(v, "naiba/nezha")
if err != nil {
println("Binary update failed:", err)
return
}
if latest.Version.Equals(v) {
println("Current binary is up to date", version)
} else {
println("Upgrade successfully", latest.Version)
os.Exit(1)
}
}
func println(v ...interface{}) {
2021-07-08 12:01:58 -04:00
if debug {
log.Println(v...)
}
}