nezha/cmd/agent/main.go

446 lines
11 KiB
Go
Raw Normal View History

2019-12-05 09:36:58 -05:00
package main
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"sync"
	"time"

	"github.com/blang/semver"
	"github.com/go-ping/ping"
	"github.com/gorilla/websocket"
	"github.com/p14yground/go-github-selfupdate/selfupdate"
	flag "github.com/spf13/pflag"
	"google.golang.org/grpc"

	"github.com/naiba/nezha/cmd/agent/monitor"
	"github.com/naiba/nezha/cmd/agent/processgroup"
	"github.com/naiba/nezha/cmd/agent/pty"
	"github.com/naiba/nezha/model"
	"github.com/naiba/nezha/pkg/utils"
	pb "github.com/naiba/nezha/proto"
	"github.com/naiba/nezha/service/rpc"
)
2021-09-27 09:18:09 -04:00
// AgentConfig holds the agent's runtime configuration; main fills it from
// command-line flags.
type AgentConfig struct {
	// Monitoring toggles passed to monitor.GetState.
	SkipConnectionCount bool // do not collect connection counts
	SkipProcsCount      bool // do not collect process counts

	// Feature switches.
	DisableAutoUpdate     bool // never run the periodic self-update
	DisableCommandExecute bool // refuse command and terminal tasks

	Debug bool // enable timestamped debug output

	Server       string // dashboard gRPC address
	ClientSecret string // secret sent to the dashboard for authentication
	ReportDelay  int    // seconds between state reports (main accepts 1-4)
}
2019-12-08 10:49:38 -05:00
var (
version string
client pb.NezhaServiceClient
inited bool
2019-12-08 10:49:38 -05:00
)
2020-11-30 01:24:00 -05:00
var (
agentConf AgentConfig
updateCh = make(chan struct{}) // Agent 自动更新间隔
httpClient = &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Timeout: time.Second * 30,
}
2020-11-30 01:24:00 -05:00
)
const (
	// delayWhenError is the pause before reconnecting to the dashboard and
	// the back-off after a failed state report.
	delayWhenError = 10 * time.Second
	// networkTimeOut bounds ordinary dial and unary RPC calls.
	networkTimeOut = 5 * time.Second
)
// init makes pflag tolerate unknown command-line flags instead of aborting,
// and caps requests made through http.DefaultClient at 5 seconds.
func init() {
	flag.CommandLine.ParseErrorsWhitelist.UnknownFlags = true
	http.DefaultClient.Timeout = 5 * time.Second
}
2019-12-05 09:36:58 -05:00
func main() {
2020-11-29 23:45:51 -05:00
// 来自于 GoReleaser 的版本号
2021-07-08 12:01:58 -04:00
monitor.Version = version
2019-12-08 10:49:38 -05:00
2021-09-27 09:18:09 -04:00
flag.BoolVarP(&agentConf.Debug, "debug", "d", false, "开启调试信息")
2021-09-27 09:26:56 -04:00
flag.StringVarP(&agentConf.Server, "server", "s", "localhost:5555", "管理面板RPC端口")
2021-09-27 09:18:09 -04:00
flag.StringVarP(&agentConf.ClientSecret, "password", "p", "", "Agent连接Secret")
flag.IntVar(&agentConf.ReportDelay, "report-delay", 1, "系统状态上报间隔")
2021-09-27 09:18:09 -04:00
flag.BoolVar(&agentConf.SkipConnectionCount, "skip-conn", false, "不监控连接数")
flag.BoolVar(&agentConf.SkipProcsCount, "skip-procs", false, "不监控进程数")
flag.BoolVar(&agentConf.DisableCommandExecute, "disable-command-execute", false, "禁止在此机器上执行命令")
2021-09-27 09:18:09 -04:00
flag.BoolVar(&agentConf.DisableAutoUpdate, "disable-auto-update", false, "禁用自动升级")
flag.Parse()
2021-09-27 09:18:09 -04:00
if agentConf.ClientSecret == "" {
flag.Usage()
return
}
2021-10-09 12:12:38 -04:00
if agentConf.ReportDelay < 1 || agentConf.ReportDelay > 4 {
println("report-delay 的区间为 1-4")
return
}
run()
}
func run() {
2019-12-09 03:02:49 -05:00
auth := rpc.AuthHandler{
2021-09-27 09:18:09 -04:00
ClientSecret: agentConf.ClientSecret,
2019-12-05 09:36:58 -05:00
}
2019-12-10 04:57:57 -05:00
2021-08-18 05:42:26 -04:00
go pty.DownloadDependency()
2019-12-10 04:57:57 -05:00
// 上报服务器信息
go reportState()
2021-03-20 11:50:16 -04:00
// 更新IP信息
go monitor.UpdateIP()
2019-12-10 04:57:57 -05:00
2021-09-27 09:18:09 -04:00
if _, err := semver.Parse(version); err == nil && !agentConf.DisableAutoUpdate {
go func() {
for range updateCh {
go func() {
defer func() {
time.Sleep(time.Minute * 20)
updateCh <- struct{}{}
}()
doSelfUpdate()
}()
}
}()
updateCh <- struct{}{}
}
2020-11-29 11:07:27 -05:00
2019-12-10 04:57:57 -05:00
var err error
var conn *grpc.ClientConn
2020-02-09 09:04:59 -05:00
retry := func() {
2021-07-16 06:09:50 -04:00
inited = false
println("Error to close connection ...")
2020-02-09 09:04:59 -05:00
if conn != nil {
conn.Close()
}
time.Sleep(delayWhenError)
println("Try to reconnect ...")
2020-02-09 09:04:59 -05:00
}
2019-12-09 05:14:31 -05:00
for {
timeOutCtx, cancel := context.WithTimeout(context.Background(), networkTimeOut)
2021-09-27 09:18:09 -04:00
conn, err = grpc.DialContext(timeOutCtx, agentConf.Server, grpc.WithInsecure(), grpc.WithPerRPCCredentials(&auth))
2019-12-09 03:02:49 -05:00
if err != nil {
println("与面板建立连接失败:", err)
cancel()
2019-12-09 05:14:31 -05:00
retry()
2019-12-09 03:02:49 -05:00
continue
}
cancel()
2019-12-09 03:02:49 -05:00
client = pb.NewNezhaServiceClient(conn)
// 第一步注册
timeOutCtx, cancel = context.WithTimeout(context.Background(), networkTimeOut)
_, err = client.ReportSystemInfo(timeOutCtx, monitor.GetHost().PB())
2019-12-09 05:14:31 -05:00
if err != nil {
println("上报系统信息失败:", err)
cancel()
2019-12-09 05:14:31 -05:00
retry()
continue
}
cancel()
2021-07-16 06:09:50 -04:00
inited = true
// 执行 Task
2021-06-11 23:51:26 -04:00
tasks, err := client.RequestTask(context.Background(), monitor.GetHost().PB())
2019-12-09 03:02:49 -05:00
if err != nil {
println("请求任务失败:", err)
2019-12-09 05:14:31 -05:00
retry()
2019-12-09 03:02:49 -05:00
continue
}
err = receiveTasks(tasks)
println("receiveTasks exit to main", err)
2019-12-09 05:14:31 -05:00
retry()
2019-12-07 05:14:40 -05:00
}
2019-12-09 03:02:49 -05:00
}
2019-12-07 05:14:40 -05:00
func receiveTasks(tasks pb.NezhaService_RequestTaskClient) error {
2019-12-09 03:02:49 -05:00
var err error
defer println("receiveTasks exit", time.Now(), "=>", err)
2019-12-09 03:02:49 -05:00
for {
var task *pb.Task
task, err = tasks.Recv()
2019-12-09 03:02:49 -05:00
if err != nil {
return err
}
2021-08-17 23:56:54 -04:00
go func() {
defer func() {
if err := recover(); err != nil {
println("task panic", task, err)
2021-08-17 23:56:54 -04:00
}
}()
doTask(task)
}()
}
}
func doTask(task *pb.Task) {
var result pb.TaskResult
result.Id = task.GetId()
result.Type = task.GetType()
switch task.GetType() {
2021-08-17 23:56:54 -04:00
case model.TaskTypeTerminal:
handleTerminalTask(task)
case model.TaskTypeHTTPGET:
2021-08-17 23:56:54 -04:00
handleHttpGetTask(task, &result)
case model.TaskTypeICMPPing:
2021-08-17 23:56:54 -04:00
handleIcmpPingTask(task, &result)
case model.TaskTypeTCPPing:
2021-08-17 23:56:54 -04:00
handleTcpPingTask(task, &result)
case model.TaskTypeCommand:
2021-08-17 23:56:54 -04:00
handleCommandTask(task, &result)
case model.TaskTypeUpgrade:
handleUpgradeTask(task, &result)
default:
println("不支持的任务:", task)
2019-12-07 05:14:40 -05:00
}
client.ReportTask(context.Background(), &result)
2019-12-09 03:02:49 -05:00
}
2019-12-05 09:36:58 -05:00
2019-12-09 03:02:49 -05:00
func reportState() {
var lastReportHostInfo time.Time
2019-12-09 03:02:49 -05:00
var err error
defer println("reportState exit", time.Now(), "=>", err)
2019-12-09 03:02:49 -05:00
for {
2021-07-16 06:09:50 -04:00
// 为了更准确的记录时段流量inited 后再上传状态信息
if client != nil && inited {
2019-12-09 10:45:23 -05:00
monitor.TrackNetworkSpeed()
timeOutCtx, cancel := context.WithTimeout(context.Background(), networkTimeOut)
_, err = client.ReportSystemState(timeOutCtx, monitor.GetState(agentConf.SkipConnectionCount, agentConf.SkipProcsCount).PB())
cancel()
2019-12-09 05:14:31 -05:00
if err != nil {
println("reportState error", err)
2019-12-09 05:14:31 -05:00
time.Sleep(delayWhenError)
}
if lastReportHostInfo.Before(time.Now().Add(-10 * time.Minute)) {
lastReportHostInfo = time.Now()
client.ReportSystemInfo(context.Background(), monitor.GetHost().PB())
}
2019-12-05 10:42:20 -05:00
}
time.Sleep(time.Second * time.Duration(agentConf.ReportDelay))
2019-12-05 10:42:20 -05:00
}
2019-12-05 09:36:58 -05:00
}
// doSelfUpdate calls selfupdate.UpdateSelf for the "naiba/nezha" repository
// with the running version. If the version reported afterwards differs from
// the running one, the process exits with status 1. Presumably an external
// supervisor then restarts it on the new binary (confirm against the
// deployment).
//
// Builds whose version is not valid semver (e.g. an empty version in local
// builds) are skipped with a log line. Previously semver.MustParse panicked
// here when reached via an upgrade task.
func doSelfUpdate() {
	v, err := semver.Parse(version)
	if err != nil {
		println("无法解析当前版本号，跳过更新:", version, err)
		return
	}
	println("检查更新:", v)
	latest, err := selfupdate.UpdateSelf(v, "naiba/nezha")
	if err != nil {
		println("更新失败:", err)
		return
	}
	if !latest.Version.Equals(v) {
		os.Exit(1)
	}
}
// handleUpgradeTask performs a self-update when the dashboard requests one,
// by delegating to doSelfUpdate. Neither the task payload nor the result is
// consulted or filled in.
func handleUpgradeTask(_ *pb.Task, _ *pb.TaskResult) {
	doSelfUpdate()
}
2021-08-17 23:56:54 -04:00
// handleTcpPingTask opens a TCP connection to the address in task.GetData()
// (10-second dial timeout), sends "ping\n" and closes it. On success it
// records the elapsed time in milliseconds; on failure it stores the dial
// error text in result.Data.
func handleTcpPingTask(task *pb.Task, result *pb.TaskResult) {
	began := time.Now()
	conn, dialErr := net.DialTimeout("tcp", task.GetData(), time.Second*10)
	if dialErr != nil {
		result.Data = dialErr.Error()
		return
	}
	conn.Write([]byte("ping\n"))
	conn.Close()
	result.Delay = float32(time.Since(began).Microseconds()) / 1000.0
	result.Successful = true
}
// handleIcmpPingTask sends 5 privileged ICMP echo requests to the host in
// task.GetData() (20-second overall timeout). On success it stores the
// average round-trip time in milliseconds.
//
// If no reply arrives, pinger.Run still returns nil and AvgRtt is zero.
// That case is therefore reported as a failure, instead of a bogus 0 ms
// success.
func handleIcmpPingTask(task *pb.Task, result *pb.TaskResult) {
	pinger, err := ping.NewPinger(task.GetData())
	if err != nil {
		result.Data = err.Error()
		return
	}
	pinger.SetPrivileged(true)
	pinger.Count = 5
	pinger.Timeout = time.Second * 20
	if err = pinger.Run(); err != nil { // Blocks until finished.
		result.Data = err.Error()
		return
	}
	stats := pinger.Statistics()
	if stats.PacketsRecv == 0 {
		result.Data = fmt.Sprintf("已发送 %d 个 ICMP 请求，未收到任何回复", stats.PacketsSent)
		return
	}
	result.Delay = float32(stats.AvgRtt.Microseconds()) / 1000.0
	result.Successful = true
}
func handleHttpGetTask(task *pb.Task, result *pb.TaskResult) {
start := time.Now()
resp, err := httpClient.Get(task.GetData())
if err == nil {
// 检查 HTTP Response 状态
result.Delay = float32(time.Since(start).Microseconds()) / 1000.0
if resp.StatusCode > 399 || resp.StatusCode < 200 {
err = errors.New("\n应用错误" + resp.Status)
}
}
if err == nil {
// 检查 SSL 证书信息
if resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 {
2021-08-29 02:41:00 -04:00
c := resp.TLS.PeerCertificates[0]
result.Data = c.Issuer.CommonName + "|" + c.NotAfter.In(time.Local).String()
2021-08-17 23:56:54 -04:00
}
2021-08-29 02:41:00 -04:00
result.Successful = true
2021-08-17 23:56:54 -04:00
} else {
// HTTP 请求失败
result.Data = err.Error()
}
}
func handleCommandTask(task *pb.Task, result *pb.TaskResult) {
if agentConf.DisableCommandExecute {
result.Data = "此 Agent 已禁止命令执行"
return
}
2021-08-17 23:56:54 -04:00
startedAt := time.Now()
var cmd *exec.Cmd
var endCh = make(chan struct{})
2021-08-18 05:42:26 -04:00
pg, err := processgroup.NewProcessExitGroup()
2021-08-17 23:56:54 -04:00
if err != nil {
// 进程组创建失败,直接退出
result.Data = err.Error()
return
}
timeout := time.NewTimer(time.Hour * 2)
if utils.IsWindows() {
2021-09-04 00:42:51 -04:00
cmd = exec.Command("cmd", "/c", task.GetData()) // #nosec
2021-08-17 23:56:54 -04:00
} else {
2021-09-04 00:42:51 -04:00
cmd = exec.Command("sh", "-c", task.GetData()) // #nosec
2021-08-17 23:56:54 -04:00
}
2021-08-19 11:57:45 -04:00
cmd.Env = os.Environ()
2021-08-17 23:56:54 -04:00
pg.AddProcess(cmd)
go func() {
select {
case <-timeout.C:
result.Data = "任务执行超时\n"
close(endCh)
pg.Dispose()
case <-endCh:
timeout.Stop()
}
}()
output, err := cmd.Output()
if err != nil {
result.Data += fmt.Sprintf("%s\n%s", string(output), err.Error())
} else {
close(endCh)
result.Data = string(output)
result.Successful = true
}
pg.Dispose()
2021-08-17 23:56:54 -04:00
result.Delay = float32(time.Since(startedAt).Seconds())
}
2021-08-18 05:42:26 -04:00
// WindowSize is the JSON body of a terminal resize message: the new width in
// columns and height in rows, passed to the pty's Setsize.
type WindowSize struct {
	Cols, Rows uint32
}
2021-08-17 23:56:54 -04:00
func handleTerminalTask(task *pb.Task) {
if agentConf.DisableCommandExecute {
println("此 Agent 已禁止命令执行")
return
}
2021-08-17 23:56:54 -04:00
var terminal model.TerminalTask
err := json.Unmarshal([]byte(task.GetData()), &terminal)
if err != nil {
println("Terminal 任务解析错误:", err)
return
}
protocol := "ws"
if terminal.UseSSL {
protocol += "s"
}
header := http.Header{}
2021-09-27 09:18:09 -04:00
header.Add("Secret", agentConf.ClientSecret)
2021-08-17 23:56:54 -04:00
conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s://%s/terminal/%s", protocol, terminal.Host, terminal.Session), header)
if err != nil {
println("Terminal 连接失败:", err)
return
}
defer conn.Close()
2021-08-18 05:42:26 -04:00
tty, err := pty.Start()
2021-08-17 23:56:54 -04:00
if err != nil {
println("Terminal pty.Start失败", err)
return
}
defer func() {
err := tty.Close()
2021-08-17 23:56:54 -04:00
conn.Close()
println("terminal exit", terminal.Session, err)
2021-08-17 23:56:54 -04:00
}()
2021-08-18 05:42:26 -04:00
println("terminal init", terminal.Session)
2021-08-17 23:56:54 -04:00
go func() {
for {
buf := make([]byte, 1024)
read, err := tty.Read(buf)
if err != nil {
conn.WriteMessage(websocket.TextMessage, []byte(err.Error()))
return
}
conn.WriteMessage(websocket.BinaryMessage, buf[:read])
}
}()
for {
messageType, reader, err := conn.NextReader()
if err != nil {
return
}
if messageType == websocket.TextMessage {
continue
}
dataTypeBuf := make([]byte, 1)
read, err := reader.Read(dataTypeBuf)
if err != nil {
conn.WriteMessage(websocket.TextMessage, []byte("Unable to read message type from reader"))
return
}
if read != 1 {
return
}
switch dataTypeBuf[0] {
case 0:
io.Copy(tty, reader)
case 1:
decoder := json.NewDecoder(reader)
2021-08-18 05:42:26 -04:00
var resizeMessage WindowSize
2021-08-17 23:56:54 -04:00
err := decoder.Decode(&resizeMessage)
if err != nil {
continue
}
2021-08-18 05:42:26 -04:00
tty.Setsize(resizeMessage.Cols, resizeMessage.Rows)
2021-08-17 23:56:54 -04:00
}
}
}
func println(v ...interface{}) {
2021-09-27 09:18:09 -04:00
if agentConf.Debug {
fmt.Printf("NEZHA@%s>> ", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(v...)
}
}