From d457a7c5cdc060987ff566ff9278a7c3ee05f489 Mon Sep 17 00:00:00 2001 From: UUBulb <35923940+uubulb@users.noreply.github.com> Date: Wed, 26 Feb 2025 20:48:54 +0800 Subject: [PATCH] fix: ConfigCache not copied affer server updates (#1008) * fix: ConfigCache not copied affer server updates * fix: server list not updated when dispatching tasks * improve * reuse logic --- cmd/dashboard/rpc/rpc.go | 82 +++++++++++----------------- model/server.go | 1 + service/singleton/servicesentinel.go | 72 +++++++++++++----------- 3 files changed, 73 insertions(+), 82 deletions(-) diff --git a/cmd/dashboard/rpc/rpc.go b/cmd/dashboard/rpc/rpc.go index 79157f8..aa84398 100644 --- a/cmd/dashboard/rpc/rpc.go +++ b/cmd/dashboard/rpc/rpc.go @@ -75,65 +75,36 @@ func getRealIp(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, } func DispatchTask(serviceSentinelDispatchBus <-chan *model.Service) { - workedServerIndex := 0 - list := singleton.ServerShared.GetSortedList() for task := range serviceSentinelDispatchBus { if task == nil { continue } - round := 0 - endIndex := workedServerIndex - // 如果已经轮了一整圈又轮到自己,没有合适机器去请求,跳出循环 - for round < 1 || workedServerIndex < endIndex { - // 如果到了圈尾,再回到圈头,圈数加一,游标重置 - if workedServerIndex >= len(list) { - workedServerIndex = 0 - round++ - continue - } - // 如果服务器不在线,跳过这个服务器 - if list[workedServerIndex].TaskStream == nil { - workedServerIndex++ - continue - } - // 如果此任务不可使用此服务器请求,跳过这个服务器(有些 IPv6 only 开了 NAT64 的机器请求 IPv4 总会出问题) - if (task.Cover == model.ServiceCoverAll && task.SkipServers[list[workedServerIndex].ID]) || - (task.Cover == model.ServiceCoverIgnoreAll && !task.SkipServers[list[workedServerIndex].ID]) { - workedServerIndex++ - continue - } - if task.Cover == model.ServiceCoverIgnoreAll && task.SkipServers[list[workedServerIndex].ID] { - server := list[workedServerIndex] - singleton.UserLock.RLock() - var role uint8 - if u, ok := singleton.UserInfoMap[server.UserID]; !ok { - role = model.RoleMember - } else { - role = u.Role + switch task.Cover { + case model.ServiceCoverIgnoreAll: + for id, enabled := range task.SkipServers { + if !enabled { + continue } - singleton.UserLock.RUnlock() - if task.UserID == server.UserID || role == model.RoleAdmin { - list[workedServerIndex].TaskStream.Send(task.PB()) + + server, _ := singleton.ServerShared.Get(id) + if server == nil || server.TaskStream == nil { + continue + } + + if canSendTaskToServer(task, server) { + server.TaskStream.Send(task.PB()) } - workedServerIndex++ - continue } - if task.Cover == model.ServiceCoverAll && !task.SkipServers[list[workedServerIndex].ID] { - server := list[workedServerIndex] - singleton.UserLock.RLock() - var role uint8 - if u, ok := singleton.UserInfoMap[server.UserID]; !ok { - role = model.RoleMember - } else { - role = u.Role + case model.ServiceCoverAll: + for id, server := range singleton.ServerShared.Range { + if server == nil || server.TaskStream == nil || task.SkipServers[id] { + continue } - singleton.UserLock.RUnlock() - if task.UserID == server.UserID || role == model.RoleAdmin { - list[workedServerIndex].TaskStream.Send(task.PB()) + + if canSendTaskToServer(task, server) { + server.TaskStream.Send(task.PB()) } - workedServerIndex++ - continue } } } @@ -203,3 +174,16 @@ func ServeNAT(w http.ResponseWriter, r *http.Request, natConfig *model.NAT) { rpcService.NezhaHandlerSingleton.StartStream(streamId, time.Second*10) } + +func canSendTaskToServer(task *model.Service, server *model.Server) bool { + var role uint8 + singleton.UserLock.RLock() + if u, ok := singleton.UserInfoMap[server.UserID]; !ok { + role = model.RoleMember + } else { + role = u.Role + } + singleton.UserLock.RUnlock() + + return task.UserID == server.UserID || role == model.RoleAdmin +} diff --git a/model/server.go b/model/server.go index 67f655e..ffd54e9 100644 --- a/model/server.go +++ b/model/server.go @@ -52,6 +52,7 @@ func (s *Server) CopyFromRunningServer(old *Server) { s.GeoIP = old.GeoIP s.LastActive = old.LastActive s.TaskStream = old.TaskStream + s.ConfigCache = old.ConfigCache s.PrevTransferInSnapshot = old.PrevTransferInSnapshot s.PrevTransferOutSnapshot = old.PrevTransferOutSnapshot } diff --git a/service/singleton/servicesentinel.go b/service/singleton/servicesentinel.go index c583725..ae1d07e 100644 --- a/service/singleton/servicesentinel.go +++ b/service/singleton/servicesentinel.go @@ -101,8 +101,12 @@ func NewServiceSentinel(serviceSentinelDispatchBus chan<- *model.Service, sc *Se notificationc: nc, crc: crc, } + // 加载历史记录 - ss.loadServiceHistory() + err := ss.loadServiceHistory() + if err != nil { + return nil, err + } year, month, day := time.Now().Date() today := time.Date(year, month, day, 0, 0, 0, 0, Loc) @@ -112,13 +116,13 @@ func NewServiceSentinel(serviceSentinelDispatchBus chan<- *model.Service, sc *Se DB.Where("created_at >= ?", today).Find(&mhs) totalDelay := make(map[uint64]float32) totalDelayCount := make(map[uint64]float32) - for i := 0; i < len(mhs); i++ { - totalDelay[mhs[i].ServiceID] += mhs[i].AvgDelay - totalDelayCount[mhs[i].ServiceID]++ - ss.serviceStatusToday[mhs[i].ServiceID].Up += int(mhs[i].Up) - ss.monthlyStatus[mhs[i].ServiceID].TotalUp += mhs[i].Up - ss.serviceStatusToday[mhs[i].ServiceID].Down += int(mhs[i].Down) - ss.monthlyStatus[mhs[i].ServiceID].TotalDown += mhs[i].Down + for _, mh := range mhs { + totalDelay[mh.ServiceID] += mh.AvgDelay + totalDelayCount[mh.ServiceID]++ + ss.serviceStatusToday[mh.ServiceID].Up += int(mh.Up) + ss.monthlyStatus[mh.ServiceID].TotalUp += mh.Up + ss.serviceStatusToday[mh.ServiceID].Down += int(mh.Down) + ss.monthlyStatus[mh.ServiceID].TotalDown += mh.Down } for id, delay := range totalDelay { ss.serviceStatusToday[id].Delay = delay / float32(totalDelayCount[id]) @@ -128,7 +132,7 @@ func NewServiceSentinel(serviceSentinelDispatchBus chan<- *model.Service, sc *Se go ss.worker() // 每日将游标往后推一天 - _, err := crc.AddFunc("0 0 0 * * *", ss.refreshMonthlyServiceStatus) + _, err = crc.AddFunc("0 0 0 * * *", ss.refreshMonthlyServiceStatus) if err != nil { return nil, err } @@ -155,7 +159,7 @@ func (ss *ServiceSentinel) refreshMonthlyServiceStatus() { ss.monthlyStatusLock.Lock() defer ss.monthlyStatusLock.Unlock() for k, v := range ss.monthlyStatus { - for i := 0; i < len(v.Up)-1; i++ { + for i := range len(v.Up) - 1 { if i == 0 { // 30 天在线率,减去已经出30天之外的数据 v.TotalDown -= uint64(v.Down[i]) @@ -195,34 +199,34 @@ func (ss *ServiceSentinel) UpdateServiceList() { } // loadServiceHistory 加载服务监控器的历史状态信息 -func (ss *ServiceSentinel) loadServiceHistory() { +func (ss *ServiceSentinel) loadServiceHistory() error { var services []*model.Service err := DB.Find(&services).Error if err != nil { - panic(err) + return err } - for i := 0; i < len(services); i++ { - task := services[i] + for _, service := range services { + task := service // 通过cron定时将服务监控任务传递给任务调度管道 - services[i].CronJobID, err = ss.crc.AddFunc(task.CronSpec(), func() { + service.CronJobID, err = ss.crc.AddFunc(task.CronSpec(), func() { ss.dispatchBus <- task }) if err != nil { - panic(err) + return err } - ss.services[services[i].ID] = services[i] - ss.serviceCurrentStatusData[services[i].ID] = make([]*pb.TaskResult, _CurrentStatusSize) - ss.serviceStatusToday[services[i].ID] = &_TodayStatsOfService{} + ss.services[service.ID] = service + ss.serviceCurrentStatusData[service.ID] = make([]*pb.TaskResult, _CurrentStatusSize) + ss.serviceStatusToday[service.ID] = &_TodayStatsOfService{} } ss.serviceList = services year, month, day := time.Now().Date() today := time.Date(year, month, day, 0, 0, 0, 0, Loc) - for i := 0; i < len(services); i++ { - ss.monthlyStatus[services[i].ID] = &serviceResponseItem{ - service: services[i], + for _, service := range services { + ss.monthlyStatus[service.ID] = &serviceResponseItem{ + service: service, ServiceResponseItem: model.ServiceResponseItem{ Delay: &[30]float32{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Up: &[30]int{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -235,18 +239,20 @@ func (ss *ServiceSentinel) loadServiceHistory() { var mhs []model.ServiceHistory DB.Where("created_at > ? AND created_at < ?", today.AddDate(0, 0, -29), today).Find(&mhs) var delayCount = make(map[int]int) - for i := 0; i < len(mhs); i++ { - dayIndex := 28 - (int(today.Sub(mhs[i].CreatedAt).Hours()) / 24) + for _, mh := range mhs { + dayIndex := 28 - (int(today.Sub(mh.CreatedAt).Hours()) / 24) if dayIndex < 0 { continue } - ss.monthlyStatus[mhs[i].ServiceID].Delay[dayIndex] = (ss.monthlyStatus[mhs[i].ServiceID].Delay[dayIndex]*float32(delayCount[dayIndex]) + mhs[i].AvgDelay) / float32(delayCount[dayIndex]+1) + ss.monthlyStatus[mh.ServiceID].Delay[dayIndex] = (ss.monthlyStatus[mh.ServiceID].Delay[dayIndex]*float32(delayCount[dayIndex]) + mh.AvgDelay) / float32(delayCount[dayIndex]+1) delayCount[dayIndex]++ - ss.monthlyStatus[mhs[i].ServiceID].Up[dayIndex] += int(mhs[i].Up) - ss.monthlyStatus[mhs[i].ServiceID].TotalUp += mhs[i].Up - ss.monthlyStatus[mhs[i].ServiceID].Down[dayIndex] += int(mhs[i].Down) - ss.monthlyStatus[mhs[i].ServiceID].TotalDown += mhs[i].Down + ss.monthlyStatus[mh.ServiceID].Up[dayIndex] += int(mh.Up) + ss.monthlyStatus[mh.ServiceID].TotalUp += mh.Up + ss.monthlyStatus[mh.ServiceID].Down[dayIndex] += int(mh.Down) + ss.monthlyStatus[mh.ServiceID].TotalDown += mh.Down } + + return nil } func (ss *ServiceSentinel) Update(m *model.Service) error { @@ -470,11 +476,11 @@ func (ss *ServiceSentinel) worker() { ss.serviceResponseDataStoreCurrentAvgDelay[mh.GetId()] = 0 // 永远是最新的 30 个数据的状态 [01:00, 02:00, 03:00] -> [04:00, 02:00, 03: 00] - for i := 0; i < len(ss.serviceCurrentStatusData[mh.GetId()]); i++ { - if ss.serviceCurrentStatusData[mh.GetId()][i].GetId() > 0 { - if ss.serviceCurrentStatusData[mh.GetId()][i].Successful { + for _, cs := range ss.serviceCurrentStatusData[mh.GetId()] { + if cs.GetId() > 0 { + if cs.Successful { ss.serviceResponseDataStoreCurrentUp[mh.GetId()]++ - ss.serviceResponseDataStoreCurrentAvgDelay[mh.GetId()] = (ss.serviceResponseDataStoreCurrentAvgDelay[mh.GetId()]*float32(ss.serviceResponseDataStoreCurrentUp[mh.GetId()]-1) + ss.serviceCurrentStatusData[mh.GetId()][i].Delay) / float32(ss.serviceResponseDataStoreCurrentUp[mh.GetId()]) + ss.serviceResponseDataStoreCurrentAvgDelay[mh.GetId()] = (ss.serviceResponseDataStoreCurrentAvgDelay[mh.GetId()]*float32(ss.serviceResponseDataStoreCurrentUp[mh.GetId()]-1) + cs.Delay) / float32(ss.serviceResponseDataStoreCurrentUp[mh.GetId()]) } else { ss.serviceResponseDataStoreCurrentDown[mh.GetId()]++ }