From a41f623dd2afa4b076dcab715db5e9b9b0aca08a Mon Sep 17 00:00:00 2001 From: UUBulb <35923940+uubulb@users.noreply.github.com> Date: Tue, 4 Feb 2025 11:25:49 +0800 Subject: [PATCH] feat: batch set server config (#983) * feat: batch set server config * run in parallel * fix route * fix * return some information * fix order --- cmd/dashboard/controller/controller.go | 4 +- cmd/dashboard/controller/server.go | 87 +++++++++++++++++--------- model/server_api.go | 7 ++- pkg/ddns/ddns.go | 2 +- service/rpc/nezha.go | 4 +- service/singleton/notification.go | 2 +- 6 files changed, 69 insertions(+), 37 deletions(-) diff --git a/cmd/dashboard/controller/controller.go b/cmd/dashboard/controller/controller.go index b883c0f..ba268f9 100644 --- a/cmd/dashboard/controller/controller.go +++ b/cmd/dashboard/controller/controller.go @@ -110,8 +110,8 @@ func routers(r *gin.Engine, frontendDist fs.FS) { auth.GET("/server", listHandler(listServer)) auth.PATCH("/server/:id", commonHandler(updateServer)) - auth.GET("/server/:id/config", commonHandler(getServerConfig)) - auth.POST("/server/:id/config", commonHandler(setServerConfig)) + auth.GET("/server/config/:id", commonHandler(getServerConfig)) + auth.POST("/server/config", commonHandler(setServerConfig)) auth.POST("/batch-delete/server", commonHandler(batchDeleteServer)) auth.POST("/force-update/server", commonHandler(forceUpdateServer)) diff --git a/cmd/dashboard/controller/server.go b/cmd/dashboard/controller/server.go index f8a208c..553f575 100644 --- a/cmd/dashboard/controller/server.go +++ b/cmd/dashboard/controller/server.go @@ -2,6 +2,7 @@ package controller import ( "strconv" + "sync" "time" "github.com/gin-gonic/gin" @@ -182,15 +183,15 @@ func batchDeleteServer(c *gin.Context) (any, error) { // @Accept json // @param request body []uint64 true "id list" // @Produce json -// @Success 200 {object} model.CommonResponse[model.ForceUpdateResponse] +// @Success 200 {object} model.CommonResponse[model.ServerTaskResponse] // @Router /force-update/server [post] -func forceUpdateServer(c *gin.Context) (*model.ForceUpdateResponse, error) { +func forceUpdateServer(c *gin.Context) (*model.ServerTaskResponse, error) { var forceUpdateServers []uint64 if err := c.ShouldBindJSON(&forceUpdateServers); err != nil { return nil, err } - forceUpdateResp := new(model.ForceUpdateResponse) + forceUpdateResp := new(model.ServerTaskResponse) for _, sid := range forceUpdateServers { singleton.ServerLock.RLock() @@ -223,7 +224,7 @@ func forceUpdateServer(c *gin.Context) (*model.ForceUpdateResponse, error) { // @Tags auth required // @Produce json // @Success 200 {object} model.CommonResponse[string] -// @Router /server/{id}/config [get] +// @Router /server/config/{id} [get] func getServerConfig(c *gin.Context) (string, error) { idStr := c.Param("id") id, err := strconv.ParseUint(idStr, 10, 64) @@ -273,40 +274,66 @@ func getServerConfig(c *gin.Context) (string, error) { // @Description Set server config // @Tags auth required // @Accept json -// @param request body string true "config" +// @Param body body model.ServerConfigForm true "ServerConfigForm" // @Produce json -// @Success 200 {object} model.CommonResponse[any] -// @Router /server/{id}/config [post] -func setServerConfig(c *gin.Context) (any, error) { - idStr := c.Param("id") - id, err := strconv.ParseUint(idStr, 10, 64) - if err != nil { - return "", err - } - - var configRaw string - if err := c.ShouldBindJSON(&configRaw); err != nil { +// @Success 200 {object} model.CommonResponse[model.ServerTaskResponse] +// @Router /server/config [post] +func setServerConfig(c *gin.Context) (*model.ServerTaskResponse, error) { + var configForm model.ServerConfigForm + if err := c.ShouldBindJSON(&configForm); err != nil { return nil, err } + var resp model.ServerTaskResponse singleton.ServerLock.RLock() - s, ok := singleton.ServerList[id] - if !ok || s.TaskStream == nil { - singleton.ServerLock.RUnlock() - return "", nil + servers := make([]*model.Server, 0, len(configForm.Servers)) + for _, sid := range configForm.Servers { + if s, ok := singleton.ServerList[sid]; ok { + if !s.HasPermission(c) { + singleton.ServerLock.RUnlock() + return nil, singleton.Localizer.ErrorT("permission denied") + } + if s.TaskStream == nil { + resp.Offline = append(resp.Offline, s.ID) + continue + } + servers = append(servers, s) + } } singleton.ServerLock.RUnlock() - if !s.HasPermission(c) { - return "", singleton.Localizer.ErrorT("permission denied") + var wg sync.WaitGroup + var respMu sync.Mutex + + for i := 0; i < len(servers); i += 10 { + end := i + 10 + if end > len(servers) { + end = len(servers) + } + group := servers[i:end] + + wg.Add(1) + go func(srvGroup []*model.Server) { + defer wg.Done() + for _, s := range srvGroup { + // Create and send the task. + task := &pb.Task{ + Type: model.TaskTypeApplyConfig, + Data: configForm.Config, + } + if err := s.TaskStream.Send(task); err != nil { + respMu.Lock() + resp.Failure = append(resp.Failure, s.ID) + respMu.Unlock() + continue + } + respMu.Lock() + resp.Success = append(resp.Success, s.ID) + respMu.Unlock() + } + }(group) } - if err := s.TaskStream.Send(&pb.Task{ - Type: model.TaskTypeApplyConfig, - Data: configRaw, - }); err != nil { - return "", err - } - - return nil, nil + wg.Wait() + return &resp, nil } diff --git a/model/server_api.go b/model/server_api.go index 3155637..846a492 100644 --- a/model/server_api.go +++ b/model/server_api.go @@ -31,7 +31,12 @@ type ServerForm struct { OverrideDDNSDomains map[uint64][]string `json:"override_ddns_domains,omitempty" validate:"optional"` } -type ForceUpdateResponse struct { +type ServerConfigForm struct { + Servers []uint64 `json:"servers,omitempty"` + Config string `json:"config,omitempty"` +} + +type ServerTaskResponse struct { Success []uint64 `json:"success,omitempty" validate:"optional"` Failure []uint64 `json:"failure,omitempty" validate:"optional"` Offline []uint64 `json:"offline,omitempty" validate:"optional"` diff --git a/pkg/ddns/ddns.go b/pkg/ddns/ddns.go index ce566d1..6088103 100644 --- a/pkg/ddns/ddns.go +++ b/pkg/ddns/ddns.go @@ -54,7 +54,7 @@ func (provider *Provider) UpdateDomain(ctx context.Context, overrideDomains ...s if err := provider.updateDomain(domain); err != nil { log.Printf("NEZHA>> Failed to update DNS record of domain %s: %v", domain, err) } else { - log.Printf("NEZHA>> Update DNS record of domain %s succeed", domain) + log.Printf("NEZHA>> Update DNS record of domain %s succeeded", domain) break } } diff --git a/service/rpc/nezha.go b/service/rpc/nezha.go index 9c3e0ab..7f3b73a 100644 --- a/service/rpc/nezha.go +++ b/service/rpc/nezha.go @@ -44,9 +44,9 @@ func (s *NezhaHandler) RequestTask(stream pb.NezhaService_RequestTaskServer) err return err } - singleton.ServerLock.RLock() + singleton.ServerLock.Lock() singleton.ServerList[clientID].TaskStream = stream - singleton.ServerLock.RUnlock() + singleton.ServerLock.Unlock() var result *pb.TaskResult for { diff --git a/service/singleton/notification.go b/service/singleton/notification.go index b040214..9029996 100644 --- a/service/singleton/notification.go +++ b/service/singleton/notification.go @@ -270,7 +270,7 @@ func SendNotification(notificationGroupID uint64, desc string, muteLabel *string if err := ns.Send(desc); err != nil { log.Printf("NEZHA>> Sending notification to %s failed: %v", n.Name, err) } else { - log.Printf("NEZHA>> Sending notification to %s succeed", n.Name) + log.Printf("NEZHA>> Sending notification to %s succeeded", n.Name) } } }