From f51c12ed9a807cfd859f354fdc21d61c6740e304 Mon Sep 17 00:00:00 2001 From: Jamie Curnow Date: Fri, 15 Jul 2022 08:52:38 +1000 Subject: [PATCH] New JobQueue worker --- backend/cmd/server/main.go | 15 +++-- .../20201013035318_initial_schema.sql | 12 ++-- .../internal/entity/certificate/methods.go | 23 +++++++ backend/internal/entity/host/methods.go | 17 +++++ backend/internal/entity/host/model.go | 11 ++++ backend/internal/jobqueue/main.go | 46 ++++++++++++++ backend/internal/jobqueue/models.go | 58 +++++++++++++++++ backend/internal/jobqueue/worker.go | 36 +++++++++++ backend/internal/state/state.go | 31 --------- backend/internal/worker/certificate.go | 63 ------------------- 10 files changed, 209 insertions(+), 103 deletions(-) create mode 100644 backend/internal/jobqueue/main.go create mode 100644 backend/internal/jobqueue/models.go create mode 100644 backend/internal/jobqueue/worker.go delete mode 100644 backend/internal/state/state.go delete mode 100644 backend/internal/worker/certificate.go diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index 23e9bdc..f1e5ad3 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -8,10 +8,11 @@ import ( "npm/internal/api" "npm/internal/config" "npm/internal/database" + "npm/internal/entity/certificate" + "npm/internal/entity/host" "npm/internal/entity/setting" + "npm/internal/jobqueue" "npm/internal/logger" - "npm/internal/state" - "npm/internal/worker" ) var commit string @@ -21,13 +22,17 @@ var sentryDSN string func main() { config.InitArgs(&version, &commit) config.Init(&version, &commit, &sentryDSN) - appstate := state.NewState() database.Migrate(func() { setting.ApplySettings() database.CheckSetup() - go worker.StartCertificateWorker(appstate) + // Internal Job Queue + jobqueue.Start() + certificate.AddPendingJobs() + host.AddPendingJobs() + + // Http server api.StartServer() irqchan := make(chan os.Signal, 1) signal.Notify(irqchan, syscall.SIGINT, syscall.SIGTERM) @@ -40,6 +45,8 @@ func main() { if err != nil { logger.Error("DatabaseCloseError", err) } + // nolint + jobqueue.Shutdown() break } } diff --git a/backend/embed/migrations/20201013035318_initial_schema.sql b/backend/embed/migrations/20201013035318_initial_schema.sql index 417b540..3b95c50 100644 --- a/backend/embed/migrations/20201013035318_initial_schema.sql +++ b/backend/embed/migrations/20201013035318_initial_schema.sql @@ -180,11 +180,11 @@ CREATE TABLE IF NOT EXISTS `host` user_id INTEGER NOT NULL, type TEXT NOT NULL, host_template_id INTEGER NOT NULL, - listen_interface TEXT NOT NULL, + listen_interface TEXT NOT NULL DEFAULT "", domain_names TEXT NOT NULL, - upstream_id INTEGER NOT NULL, - certificate_id INTEGER, - access_list_id INTEGER, + upstream_id INTEGER NOT NULL DEFAULT 0, + certificate_id INTEGER NOT NULL DEFAULT 0, + access_list_id INTEGER NOT NULL DEFAULT 0, ssl_forced INTEGER NOT NULL DEFAULT 0, caching_enabled INTEGER NOT NULL DEFAULT 0, block_exploits INTEGER NOT NULL DEFAULT 0, @@ -192,9 +192,11 @@ CREATE TABLE IF NOT EXISTS `host` http2_support INTEGER NOT NULL DEFAULT 0, hsts_enabled INTEGER NOT NULL DEFAULT 0, hsts_subdomains INTEGER NOT NULL DEFAULT 0, - paths TEXT NOT NULL, + paths TEXT NOT NULL DEFAULT "", upstream_options TEXT NOT NULL DEFAULT "", advanced_config TEXT NOT NULL DEFAULT "", + status TEXT NOT NULL DEFAULT "", + error_message TEXT NOT NULL DEFAULT "", is_disabled INTEGER NOT NULL DEFAULT 0, is_deleted INTEGER NOT NULL DEFAULT 0, FOREIGN KEY (user_id) REFERENCES user (id), diff --git a/backend/internal/entity/certificate/methods.go b/backend/internal/entity/certificate/methods.go index e24181b..f08cc5c 100644 --- a/backend/internal/entity/certificate/methods.go +++ b/backend/internal/entity/certificate/methods.go @@ -8,6 +8,7 @@ import ( "npm/internal/database" "npm/internal/entity" "npm/internal/errors" + "npm/internal/jobqueue" "npm/internal/logger" "npm/internal/model" ) @@ -172,3 +173,25 @@ func GetByStatus(status string) ([]Model, error) { return models, err } + +// AddPendingJobs is intended to be used at startup to add +// anything pending to the JobQueue just once, based on +// the database row status +func AddPendingJobs() { + rows, err := GetByStatus(StatusReady) + if err != nil { + logger.Error("AddPendingJobsError", err) + return + } + + for _, row := range rows { + logger.Debug("Adding RequestCertificate job: %+v", row) + err := jobqueue.AddJob(jobqueue.Job{ + Name: "RequestCertificate", + Action: row.Request, + }) + if err != nil { + logger.Error("AddPendingJobsError", err) + } + } +} diff --git a/backend/internal/entity/host/methods.go b/backend/internal/entity/host/methods.go index ddf64dc..1de2a7d 100644 --- a/backend/internal/entity/host/methods.go +++ b/backend/internal/entity/host/methods.go @@ -50,6 +50,8 @@ func create(host *Model) (int, error) { paths, upstream_options, advanced_config, + status, + error_message, is_disabled, is_deleted ) VALUES ( @@ -73,6 +75,8 @@ func create(host *Model) (int, error) { :paths, :upstream_options, :advanced_config, + :status, + :error_message, :is_disabled, :is_deleted )`, host) @@ -86,6 +90,8 @@ func create(host *Model) (int, error) { return 0, lastErr } + logger.Debug("Created Host: %+v", host) + return int(last), nil } @@ -120,10 +126,14 @@ func update(host *Model) error { paths = :paths, upstream_options = :upstream_options, advanced_config = :advanced_config, + status = :status, + error_message = :error_message, is_disabled = :is_disabled, is_deleted = :is_deleted WHERE id = :id`, host) + logger.Debug("Updated Host: %+v", host) + return err } @@ -181,3 +191,10 @@ func List(pageInfo model.PageInfo, filters []model.Filter, expand []string) (Lis return result, nil } + +// AddPendingJobs is intended to be used at startup to add +// anything pending to the JobQueue just once, based on +// the database row status +func AddPendingJobs() { + // todo +} diff --git a/backend/internal/entity/host/model.go b/backend/internal/entity/host/model.go index e9eb88f..74dd208 100644 --- a/backend/internal/entity/host/model.go +++ b/backend/internal/entity/host/model.go @@ -20,6 +20,12 @@ const ( RedirectionHostType = "redirection" // DeadHostType is self explanatory DeadHostType = "dead" + // StatusReady means a host is ready to configure + StatusReady = "ready" + // StatusOK means a host is configured within Nginx + StatusOK = "ok" + // StatusError is self explanatory + StatusError = "error" ) // Model is the user model @@ -45,6 +51,8 @@ type Model struct { Paths string `json:"paths" db:"paths" filter:"paths,string"` UpstreamOptions string `json:"upstream_options" db:"upstream_options" filter:"upstream_options,string"` AdvancedConfig string `json:"advanced_config" db:"advanced_config" filter:"advanced_config,string"` + Status string `json:"status" db:"status" filter:"status,string"` + ErrorMessage string `json:"error_message" db:"error_message" filter:"error_message,string"` IsDisabled bool `json:"is_disabled" db:"is_disabled" filter:"is_disabled,boolean"` IsDeleted bool `json:"is_deleted,omitempty" db:"is_deleted"` // Expansions @@ -81,6 +89,9 @@ func (m *Model) Save() error { return fmt.Errorf("User ID must be specified") } + // Set this host as requiring reconfiguration + m.Status = StatusReady + if m.ID == 0 { m.ID, err = create(m) } else { diff --git a/backend/internal/jobqueue/main.go b/backend/internal/jobqueue/main.go new file mode 100644 index 0000000..4b8e4a5 --- /dev/null +++ b/backend/internal/jobqueue/main.go @@ -0,0 +1,46 @@ +package jobqueue + +import ( + "context" + "errors" +) + +var ( + ctx context.Context + cancel context.CancelFunc + worker *Worker +) + +// Start ... +func Start() { + ctx, cancel = context.WithCancel(context.Background()) + q := &Queue{ + jobs: make(chan Job), + ctx: ctx, + cancel: cancel, + } + + // Defines a queue worker, which will execute our queue. + worker = newWorker(q) + + // Execute jobs in queue. + go worker.doWork() +} + +// AddJob adds a job to the queue for processing +func AddJob(j Job) error { + if worker == nil { + return errors.New("Unable to add job, jobqueue has not been started") + } + worker.Queue.AddJob(j) + return nil +} + +// Shutdown ... +func Shutdown() error { + if cancel == nil { + return errors.New("Unable to shutdown, jobqueue has not been started") + } + cancel() + return nil +} diff --git a/backend/internal/jobqueue/models.go b/backend/internal/jobqueue/models.go new file mode 100644 index 0000000..6bf03bb --- /dev/null +++ b/backend/internal/jobqueue/models.go @@ -0,0 +1,58 @@ +package jobqueue + +import ( + "context" + "log" + "sync" +) + +// Queue holds name, list of jobs and context with cancel. +type Queue struct { + jobs chan Job + ctx context.Context + cancel context.CancelFunc +} + +// Job - holds logic to perform some operations during queue execution. +type Job struct { + Name string + Action func() error // A function that should be executed when the job is running. +} + +// AddJobs adds jobs to the queue and cancels channel. +func (q *Queue) AddJobs(jobs []Job) { + var wg sync.WaitGroup + wg.Add(len(jobs)) + + for _, job := range jobs { + // Goroutine which adds job to the queue. + go func(job Job) { + q.AddJob(job) + wg.Done() + }(job) + } + + go func() { + wg.Wait() + // Cancel queue channel, when all goroutines were done. + q.cancel() + }() +} + +// AddJob sends job to the channel. +func (q *Queue) AddJob(job Job) { + q.jobs <- job + log.Printf("New job %s added to queue", job.Name) +} + +// Run performs job execution. +func (j Job) Run() error { + log.Printf("Job running: %s", j.Name) + + err := j.Action() + if err != nil { + return err + } + + return nil +} diff --git a/backend/internal/jobqueue/worker.go b/backend/internal/jobqueue/worker.go new file mode 100644 index 0000000..c021fbe --- /dev/null +++ b/backend/internal/jobqueue/worker.go @@ -0,0 +1,36 @@ +package jobqueue + +import ( + "fmt" + "npm/internal/logger" +) + +// Worker responsible for queue serving. +type Worker struct { + Queue *Queue +} + +func newWorker(queue *Queue) *Worker { + return &Worker{ + Queue: queue, + } +} + +// doWork processes jobs from the queue (jobs channel). +func (w *Worker) doWork() bool { + for { + select { + // if context was canceled. + case <-w.Queue.ctx.Done(): + logger.Info("JobQueue worker graceful shutdown") + return true + // if job received. + case job := <-w.Queue.jobs: + err := job.Run() + if err != nil { + logger.Error(fmt.Sprintf("%sError", job.Name), err) + continue + } + } + } +} diff --git a/backend/internal/state/state.go b/backend/internal/state/state.go deleted file mode 100644 index 0fa1433..0000000 --- a/backend/internal/state/state.go +++ /dev/null @@ -1,31 +0,0 @@ -package state - -import ( - "sync" -) - -// AppState holds pointers to channels and waitGroups -// shared by all goroutines of the application -type AppState struct { - waitGroup sync.WaitGroup - termSig chan bool -} - -// NewState creates a new app state -func NewState() *AppState { - state := &AppState{ - // buffered channel - termSig: make(chan bool, 1), - } - return state -} - -// GetWaitGroup returns the state's wg -func (state *AppState) GetWaitGroup() *sync.WaitGroup { - return &state.waitGroup -} - -// GetTermSig returns the state's term signal -func (state *AppState) GetTermSig() chan bool { - return state.termSig -} diff --git a/backend/internal/worker/certificate.go b/backend/internal/worker/certificate.go deleted file mode 100644 index a108010..0000000 --- a/backend/internal/worker/certificate.go +++ /dev/null @@ -1,63 +0,0 @@ -package worker - -import ( - "time" - - "npm/internal/entity/certificate" - "npm/internal/logger" - "npm/internal/state" -) - -type certificateWorker struct { - state *state.AppState -} - -// StartCertificateWorker starts the CertificateWorker -func StartCertificateWorker(state *state.AppState) { - worker := newCertificateWorker(state) - logger.Info("CertificateWorker Started") - worker.Run() -} - -func newCertificateWorker(state *state.AppState) *certificateWorker { - return &certificateWorker{ - state: state, - } -} - -// Run the CertificateWorker -func (w *certificateWorker) Run() { - // global wait group - gwg := w.state.GetWaitGroup() - gwg.Add(1) - - ticker := time.NewTicker(15 * time.Second) -mainLoop: - for { - select { - case _, more := <-w.state.GetTermSig(): - if !more { - logger.Info("Terminating CertificateWorker ... ") - break mainLoop - } - case <-ticker.C: - // Can confirm that this will wait for completion before the next loop - requestCertificates() - } - } -} - -func requestCertificates() { - // logger.Debug("requestCertificates fired") - rows, err := certificate.GetByStatus(certificate.StatusReady) - if err != nil { - logger.Error("requestCertificatesError", err) - return - } - - for _, row := range rows { - if err := row.Request(); err != nil { - logger.Error("CertificateRequestError", err) - } - } -}