From f9aef3ce09a3b2489dac3107a6e3fd3cb125f4cb Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Tue, 12 Mar 2024 21:10:30 +0100 Subject: [PATCH] chore: Optimizing queries (#836) * Gitops queue to notify on new deploys * Gitops queue to notify on new deploys * Processing may block without a buffer * Fixing tests --- cmd/dashboard/dashboard.go | 18 ++++++++++-------- pkg/client/client_test.go | 2 +- pkg/dashboard/server/artifacts.go | 8 ++++++++ pkg/dashboard/server/artifacts_test.go | 1 + pkg/dashboard/server/releases.go | 6 ++++++ pkg/dashboard/server/router.go | 3 ++- pkg/dashboard/server/router_test.go | 1 + pkg/dashboard/worker/artifacts.go | 12 +++++++----- pkg/dashboard/worker/branchDeleteEvent.go | 4 ++-- pkg/dashboard/worker/gitops.go | 23 ++++++++++++++++++----- pkg/dashboard/worker/imageBuildWorker.go | 9 +++++++-- pkg/git/nativeGit/repoCache.go | 4 ++-- 12 files changed, 65 insertions(+), 26 deletions(-) diff --git a/cmd/dashboard/dashboard.go b/cmd/dashboard/dashboard.go index c3e6f719e..5b218e49d 100644 --- a/cmd/dashboard/dashboard.go +++ b/cmd/dashboard/dashboard.go @@ -100,12 +100,9 @@ func main() { ) go alertStateManager.Run() - stopCh := make(chan struct{}) + stopCh := make(chan os.Signal, 1) defer close(stopCh) - - gimletdStopCh := make(chan os.Signal, 1) - signal.Notify(gimletdStopCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - waitCh := make(chan struct{}) + signal.Notify(stopCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) if config.GitopsRepo != "" || config.GitopsRepoDeployKeyPath != "" { panic("GITOPS_REPO and GITOPS_REPO_DEPLOY_KEY_PATH are deprecated." + @@ -148,10 +145,13 @@ func main() { go repoCache.Run() log.Info("Repo cache initialized") + gitopsQueue := make(chan int, 1000) + artifactsWorker := worker.NewArtifactsWorker( repoCache, store, triggerArtifactGeneration, + gitopsQueue, ) go artifactsWorker.Run() @@ -166,7 +166,7 @@ func main() { go weeklyReporter.Run() } - imageBuildWorker := worker.NewImageBuildWorker(store, successfullImageBuilds) + imageBuildWorker := worker.NewImageBuildWorker(store, successfullImageBuilds, gitopsQueue) go imageBuildWorker.Run() chartUpdatePullRequests := map[string]interface{}{} @@ -213,9 +213,9 @@ func main() { perf, gitUser, config.GitHost, + gitopsQueue, ) go gitopsWorker.Run() - log.Info("Gitops worker started") if config.ReleaseStats == "enabled" { releaseStateWorker := &worker.ReleaseStateWorker{ @@ -268,6 +268,7 @@ func main() { logger, gitServer, gitUser, + gitopsQueue, ) go func() { @@ -285,7 +286,8 @@ func main() { } } - <-waitCh + <-stopCh + close(gitopsQueue) log.Info("Successfully cleaned up resources. Stopping.") } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index f03eb8c4a..ed7d35663 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -24,7 +24,7 @@ func Test_artifact(t *testing.T) { store := store.NewTest(encryptionKey, encryptionKeyNew) logger := logrus.Logger{} - router := server.SetupRouter(&config.Config{}, &dynamicconfig.DynamicConfig{}, nil, nil, nil, store, nil, nil, nil, nil, nil, nil, nil, &logger, nil, nil) + router := server.SetupRouter(&config.Config{}, &dynamicconfig.DynamicConfig{}, nil, nil, nil, store, nil, nil, nil, nil, nil, nil, nil, &logger, nil, nil, make(chan int, 10)) server := httptest.NewServer(router) defer server.Close() diff --git a/pkg/dashboard/server/artifacts.go b/pkg/dashboard/server/artifacts.go index 6e2b19acf..58d385281 100644 --- a/pkg/dashboard/server/artifacts.go +++ b/pkg/dashboard/server/artifacts.go @@ -38,6 +38,11 @@ func saveArtifact(w http.ResponseWriter, r *http.Request) { } savedArtifact, err := model.ToArtifact(savedEvent) + if err != nil { + logrus.Errorf("cannot serialize artifact: %s", err) + http.Error(w, http.StatusText(500), 500) + return + } artifactStr, err := json.Marshal(savedArtifact) if err != nil { logrus.Errorf("cannot serialize artifact: %s", err) @@ -47,6 +52,9 @@ func saveArtifact(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusCreated) w.Write(artifactStr) + + gitopsQueue := ctx.Value("gitopsQueue").(chan int) + gitopsQueue <- 1 } func getArtifacts(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/dashboard/server/artifacts_test.go b/pkg/dashboard/server/artifacts_test.go index 2ed0e9161..a6d8e4867 100644 --- a/pkg/dashboard/server/artifacts_test.go +++ b/pkg/dashboard/server/artifacts_test.go @@ -51,6 +51,7 @@ func Test_saveArtifact(t *testing.T) { _, body, err := testEndpoint(saveArtifact, func(ctx context.Context) context.Context { ctx = context.WithValue(ctx, "store", store) + ctx = context.WithValue(ctx, "gitopsQueue", make(chan int, 10)) return ctx }, "/path") assert.Nil(t, err) diff --git a/pkg/dashboard/server/releases.go b/pkg/dashboard/server/releases.go index c1911fee3..c01832977 100644 --- a/pkg/dashboard/server/releases.go +++ b/pkg/dashboard/server/releases.go @@ -336,6 +336,9 @@ func release(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("%s - cannot save release request: %s", http.StatusText(http.StatusInternalServerError), err), http.StatusInternalServerError) return } + + gitopsQueue := ctx.Value("gitopsQueue").(chan int) + gitopsQueue <- 1 } eventIDBytes, _ := json.Marshal(map[string]string{ @@ -576,6 +579,9 @@ func performRollback(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusCreated) w.Write(eventIDBytes) + + gitopsQueue := ctx.Value("gitopsQueue").(chan int) + gitopsQueue <- 1 } func delete(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/dashboard/server/router.go b/pkg/dashboard/server/router.go index b23400c6e..9ad6cb27d 100644 --- a/pkg/dashboard/server/router.go +++ b/pkg/dashboard/server/router.go @@ -46,6 +46,7 @@ func SetupRouter( logger *log.Logger, gitServer http.Handler, gitUser *model.User, + gitopsQueue chan int, ) *chi.Mux { r := chi.NewRouter() @@ -67,7 +68,7 @@ func SetupRouter( r.Use(middleware.WithValue("gitopsUpdatePullRequests", gitopsUpdatePullRequests)) r.Use(middleware.WithValue("router", r)) r.Use(middleware.WithValue("gitUser", gitUser)) - + r.Use(middleware.WithValue("gitopsQueue", gitopsQueue)) r.Use(middleware.WithValue("notificationsManager", notificationsManager)) r.Use(middleware.WithValue("perf", perf)) diff --git a/pkg/dashboard/server/router_test.go b/pkg/dashboard/server/router_test.go index b82eb75f2..35bc8dd3d 100644 --- a/pkg/dashboard/server/router_test.go +++ b/pkg/dashboard/server/router_test.go @@ -37,6 +37,7 @@ func Test_MustUser(t *testing.T) { &logger, nil, nil, + nil, ) server := httptest.NewServer(router) defer server.Close() diff --git a/pkg/dashboard/worker/artifacts.go b/pkg/dashboard/worker/artifacts.go index 0fa83a117..f941b52c8 100644 --- a/pkg/dashboard/worker/artifacts.go +++ b/pkg/dashboard/worker/artifacts.go @@ -20,14 +20,16 @@ type ArtifactsWorker struct { gitRepoCache *nativeGit.RepoCache dao *store.Store trigger chan string + gitopsQueue chan int } func NewArtifactsWorker( gitRepoCache *nativeGit.RepoCache, dao *store.Store, trigger chan string, + gitopsQueue chan int, ) *ArtifactsWorker { - return &ArtifactsWorker{gitRepoCache: gitRepoCache, dao: dao, trigger: trigger} + return &ArtifactsWorker{gitRepoCache: gitRepoCache, dao: dao, trigger: trigger, gitopsQueue: gitopsQueue} } func (a *ArtifactsWorker) Run() { @@ -37,9 +39,6 @@ func (a *ArtifactsWorker) Run() { } } -// - should not trigger policy, if there is a newer artifact - geezus -// should be called thourhg a channel, with a go routine to not wait for a lock -// but then it also has to stream stuff to the gui func (a *ArtifactsWorker) assureGimletArtifacts(repoName string) error { err := a.gitRepoCache.PerformAction(repoName, func(repo *git.Repository) error { var innerErr error @@ -55,7 +54,10 @@ func (a *ArtifactsWorker) assureGimletArtifacts(repoName string) error { slices.Reverse(hashes) //artifacts should be generated in commit creation order - return generateFakeArtifactsForCommits(repoName, headBranch, hashes, a.dao, repo) + err := generateFakeArtifactsForCommits(repoName, headBranch, hashes, a.dao, repo) + a.gitopsQueue <- 1 + + return err }) return err diff --git a/pkg/dashboard/worker/branchDeleteEvent.go b/pkg/dashboard/worker/branchDeleteEvent.go index 8263e6a84..ea9e6da8a 100644 --- a/pkg/dashboard/worker/branchDeleteEvent.go +++ b/pkg/dashboard/worker/branchDeleteEvent.go @@ -29,14 +29,14 @@ type BranchDeleteEventWorker struct { tokenManager customScm.NonImpersonatedTokenManager cachePath string dao *store.Store - stopCh chan struct{} + stopCh chan os.Signal } func NewBranchDeleteEventWorker( tokenManager customScm.NonImpersonatedTokenManager, cachePath string, dao *store.Store, - stopCh chan struct{}, + stopCh chan os.Signal, ) *BranchDeleteEventWorker { branchDeleteEventWorker := &BranchDeleteEventWorker{ tokenManager: tokenManager, diff --git a/pkg/dashboard/worker/gitops.go b/pkg/dashboard/worker/gitops.go index feb2fe398..a0d10bbf1 100644 --- a/pkg/dashboard/worker/gitops.go +++ b/pkg/dashboard/worker/gitops.go @@ -42,6 +42,7 @@ type GitopsWorker struct { perf *prometheus.HistogramVec gitUser *model.User gitHost string + gitopsQueue chan int } func NewGitopsWorker( @@ -54,6 +55,7 @@ func NewGitopsWorker( perf *prometheus.HistogramVec, gitUser *model.User, gitHost string, + gitopsQueue chan int, ) *GitopsWorker { return &GitopsWorker{ store: store, @@ -65,18 +67,31 @@ func NewGitopsWorker( perf: perf, gitUser: gitUser, gitHost: gitHost, + gitopsQueue: gitopsQueue, } } func (w *GitopsWorker) Run() { + ticker := time.NewTicker(1 * time.Minute) + defer func() { + ticker.Stop() + }() + + logrus.Info("Gitops worker started") for { + select { + case _, ok := <-w.gitopsQueue: + if !ok { + logrus.Info("Gitops worker stopped") + return + } + case <-ticker.C: + } + events, err := w.store.UnprocessedEvents() if err != nil { logrus.Errorf("Could not fetch unprocessed events %s", err.Error()) - time.Sleep(1 * time.Second) - continue } - for _, event := range events { w.eventsProcessed.Inc() processEvent(w.store, @@ -90,8 +105,6 @@ func (w *GitopsWorker) Run() { w.gitHost, ) } - - time.Sleep(100 * time.Millisecond) } } diff --git a/pkg/dashboard/worker/imageBuildWorker.go b/pkg/dashboard/worker/imageBuildWorker.go index 999863b76..d9c4194a1 100644 --- a/pkg/dashboard/worker/imageBuildWorker.go +++ b/pkg/dashboard/worker/imageBuildWorker.go @@ -15,15 +15,18 @@ import ( type ImageBuildWorker struct { store *store.Store imageBuilds chan streaming.ImageBuildStatusWSMessage + gitopsQueue chan int } func NewImageBuildWorker( store *store.Store, imageBuilds chan streaming.ImageBuildStatusWSMessage, + gitopsQueue chan int, ) *ImageBuildWorker { imageBuildWorker := &ImageBuildWorker{ store: store, imageBuilds: imageBuilds, + gitopsQueue: gitopsQueue, } return imageBuildWorker @@ -38,7 +41,7 @@ func (m *ImageBuildWorker) Run() { } if imageBuildStatus.Status == "success" { - go createDeployRequest(imageBuildStatus.BuildId, m.store) + go createDeployRequest(imageBuildStatus.BuildId, m.store, m.gitopsQueue) } else if imageBuildStatus.Status != "running" { go handleImageBuildError(imageBuildStatus.BuildId, m.store) } @@ -70,7 +73,7 @@ func handleImageBuildError(buildId string, store *store.Store) { } } -func createDeployRequest(buildId string, store *store.Store) { +func createDeployRequest(buildId string, store *store.Store, gitopsQueue chan int) { event, err := store.Event(buildId) if err != nil { logrus.Error(err) @@ -106,6 +109,8 @@ func createDeployRequest(buildId string, store *store.Store) { return } + gitopsQueue <- 1 + event.Status = model.Success.String() event.Results[0].TriggeredDeployRequestID = triggeredDeployRequestEvent.ID resultsString, err := json.Marshal(event.Results) diff --git a/pkg/git/nativeGit/repoCache.go b/pkg/git/nativeGit/repoCache.go index 412f27696..f0d939e3e 100644 --- a/pkg/git/nativeGit/repoCache.go +++ b/pkg/git/nativeGit/repoCache.go @@ -33,7 +33,7 @@ type RepoCache struct { tokenManager customScm.NonImpersonatedTokenManager repos map[string]*repoData reposMapLock sync.Mutex // lock this if you add or remove items from the repos map - stopCh chan struct{} + stopCh chan os.Signal // For webhook registration config *dashboardConfig.Config @@ -57,7 +57,7 @@ const BRANCH_DELETED_WORKER_SUBPATH = "branch-deleted-worker" func NewRepoCache( tokenManager customScm.NonImpersonatedTokenManager, - stopCh chan struct{}, + stopCh chan os.Signal, config *dashboardConfig.Config, dynamicConfig *dynamicconfig.DynamicConfig, clientHub *streaming.ClientHub,