Skip to content

Commit

Permalink
chore: Optimizing queries (#836)
Browse files Browse the repository at this point in the history
* Gitops queue to notify on new deploys

* Gitops queue to notify on new deploys

* Processing may block without a buffer

* Fixing tests
  • Loading branch information
laszlocph authored Mar 12, 2024
1 parent a3b634d commit f9aef3c
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 26 deletions.
18 changes: 10 additions & 8 deletions cmd/dashboard/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,9 @@ func main() {
)
go alertStateManager.Run()

stopCh := make(chan struct{})
stopCh := make(chan os.Signal, 1)
defer close(stopCh)

gimletdStopCh := make(chan os.Signal, 1)
signal.Notify(gimletdStopCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
waitCh := make(chan struct{})
signal.Notify(stopCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

if config.GitopsRepo != "" || config.GitopsRepoDeployKeyPath != "" {
panic("GITOPS_REPO and GITOPS_REPO_DEPLOY_KEY_PATH are deprecated." +
Expand Down Expand Up @@ -148,10 +145,13 @@ func main() {
go repoCache.Run()
log.Info("Repo cache initialized")

gitopsQueue := make(chan int, 1000)

artifactsWorker := worker.NewArtifactsWorker(
repoCache,
store,
triggerArtifactGeneration,
gitopsQueue,
)
go artifactsWorker.Run()

Expand All @@ -166,7 +166,7 @@ func main() {
go weeklyReporter.Run()
}

imageBuildWorker := worker.NewImageBuildWorker(store, successfullImageBuilds)
imageBuildWorker := worker.NewImageBuildWorker(store, successfullImageBuilds, gitopsQueue)
go imageBuildWorker.Run()

chartUpdatePullRequests := map[string]interface{}{}
Expand Down Expand Up @@ -213,9 +213,9 @@ func main() {
perf,
gitUser,
config.GitHost,
gitopsQueue,
)
go gitopsWorker.Run()
log.Info("Gitops worker started")

if config.ReleaseStats == "enabled" {
releaseStateWorker := &worker.ReleaseStateWorker{
Expand Down Expand Up @@ -268,6 +268,7 @@ func main() {
logger,
gitServer,
gitUser,
gitopsQueue,
)

go func() {
Expand All @@ -285,7 +286,8 @@ func main() {
}
}

<-waitCh
<-stopCh
close(gitopsQueue)
log.Info("Successfully cleaned up resources. Stopping.")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Test_artifact(t *testing.T) {
store := store.NewTest(encryptionKey, encryptionKeyNew)
logger := logrus.Logger{}

router := server.SetupRouter(&config.Config{}, &dynamicconfig.DynamicConfig{}, nil, nil, nil, store, nil, nil, nil, nil, nil, nil, nil, &logger, nil, nil)
router := server.SetupRouter(&config.Config{}, &dynamicconfig.DynamicConfig{}, nil, nil, nil, store, nil, nil, nil, nil, nil, nil, nil, &logger, nil, nil, make(chan int, 10))
server := httptest.NewServer(router)
defer server.Close()

Expand Down
8 changes: 8 additions & 0 deletions pkg/dashboard/server/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func saveArtifact(w http.ResponseWriter, r *http.Request) {
}

savedArtifact, err := model.ToArtifact(savedEvent)
if err != nil {
logrus.Errorf("cannot serialize artifact: %s", err)
http.Error(w, http.StatusText(500), 500)
return
}
artifactStr, err := json.Marshal(savedArtifact)
if err != nil {
logrus.Errorf("cannot serialize artifact: %s", err)
Expand All @@ -47,6 +52,9 @@ func saveArtifact(w http.ResponseWriter, r *http.Request) {

w.WriteHeader(http.StatusCreated)
w.Write(artifactStr)

gitopsQueue := ctx.Value("gitopsQueue").(chan int)
gitopsQueue <- 1
}

func getArtifacts(w http.ResponseWriter, r *http.Request) {
Expand Down
1 change: 1 addition & 0 deletions pkg/dashboard/server/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Test_saveArtifact(t *testing.T) {

_, body, err := testEndpoint(saveArtifact, func(ctx context.Context) context.Context {
ctx = context.WithValue(ctx, "store", store)
ctx = context.WithValue(ctx, "gitopsQueue", make(chan int, 10))
return ctx
}, "/path")
assert.Nil(t, err)
Expand Down
6 changes: 6 additions & 0 deletions pkg/dashboard/server/releases.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ func release(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("%s - cannot save release request: %s", http.StatusText(http.StatusInternalServerError), err), http.StatusInternalServerError)
return
}

gitopsQueue := ctx.Value("gitopsQueue").(chan int)
gitopsQueue <- 1
}

eventIDBytes, _ := json.Marshal(map[string]string{
Expand Down Expand Up @@ -576,6 +579,9 @@ func performRollback(w http.ResponseWriter, r *http.Request) {

w.WriteHeader(http.StatusCreated)
w.Write(eventIDBytes)

gitopsQueue := ctx.Value("gitopsQueue").(chan int)
gitopsQueue <- 1
}

func delete(w http.ResponseWriter, r *http.Request) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/dashboard/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func SetupRouter(
logger *log.Logger,
gitServer http.Handler,
gitUser *model.User,
gitopsQueue chan int,
) *chi.Mux {
r := chi.NewRouter()

Expand All @@ -67,7 +68,7 @@ func SetupRouter(
r.Use(middleware.WithValue("gitopsUpdatePullRequests", gitopsUpdatePullRequests))
r.Use(middleware.WithValue("router", r))
r.Use(middleware.WithValue("gitUser", gitUser))

r.Use(middleware.WithValue("gitopsQueue", gitopsQueue))
r.Use(middleware.WithValue("notificationsManager", notificationsManager))
r.Use(middleware.WithValue("perf", perf))

Expand Down
1 change: 1 addition & 0 deletions pkg/dashboard/server/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func Test_MustUser(t *testing.T) {
&logger,
nil,
nil,
nil,
)
server := httptest.NewServer(router)
defer server.Close()
Expand Down
12 changes: 7 additions & 5 deletions pkg/dashboard/worker/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ type ArtifactsWorker struct {
gitRepoCache *nativeGit.RepoCache
dao *store.Store
trigger chan string
gitopsQueue chan int
}

func NewArtifactsWorker(
gitRepoCache *nativeGit.RepoCache,
dao *store.Store,
trigger chan string,
gitopsQueue chan int,
) *ArtifactsWorker {
return &ArtifactsWorker{gitRepoCache: gitRepoCache, dao: dao, trigger: trigger}
return &ArtifactsWorker{gitRepoCache: gitRepoCache, dao: dao, trigger: trigger, gitopsQueue: gitopsQueue}
}

func (a *ArtifactsWorker) Run() {
Expand All @@ -37,9 +39,6 @@ func (a *ArtifactsWorker) Run() {
}
}

// - should not trigger policy, if there is a newer artifact - geezus
// should be called thourhg a channel, with a go routine to not wait for a lock
// but then it also has to stream stuff to the gui
func (a *ArtifactsWorker) assureGimletArtifacts(repoName string) error {
err := a.gitRepoCache.PerformAction(repoName, func(repo *git.Repository) error {
var innerErr error
Expand All @@ -55,7 +54,10 @@ func (a *ArtifactsWorker) assureGimletArtifacts(repoName string) error {

slices.Reverse(hashes) //artifacts should be generated in commit creation order

return generateFakeArtifactsForCommits(repoName, headBranch, hashes, a.dao, repo)
err := generateFakeArtifactsForCommits(repoName, headBranch, hashes, a.dao, repo)
a.gitopsQueue <- 1

return err
})

return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/dashboard/worker/branchDeleteEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type BranchDeleteEventWorker struct {
tokenManager customScm.NonImpersonatedTokenManager
cachePath string
dao *store.Store
stopCh chan struct{}
stopCh chan os.Signal
}

func NewBranchDeleteEventWorker(
tokenManager customScm.NonImpersonatedTokenManager,
cachePath string,
dao *store.Store,
stopCh chan struct{},
stopCh chan os.Signal,
) *BranchDeleteEventWorker {
branchDeleteEventWorker := &BranchDeleteEventWorker{
tokenManager: tokenManager,
Expand Down
23 changes: 18 additions & 5 deletions pkg/dashboard/worker/gitops.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type GitopsWorker struct {
perf *prometheus.HistogramVec
gitUser *model.User
gitHost string
gitopsQueue chan int
}

func NewGitopsWorker(
Expand All @@ -54,6 +55,7 @@ func NewGitopsWorker(
perf *prometheus.HistogramVec,
gitUser *model.User,
gitHost string,
gitopsQueue chan int,
) *GitopsWorker {
return &GitopsWorker{
store: store,
Expand All @@ -65,18 +67,31 @@ func NewGitopsWorker(
perf: perf,
gitUser: gitUser,
gitHost: gitHost,
gitopsQueue: gitopsQueue,
}
}

func (w *GitopsWorker) Run() {
ticker := time.NewTicker(1 * time.Minute)
defer func() {
ticker.Stop()
}()

logrus.Info("Gitops worker started")
for {
select {
case _, ok := <-w.gitopsQueue:
if !ok {
logrus.Info("Gitops worker stopped")
return
}
case <-ticker.C:
}

events, err := w.store.UnprocessedEvents()
if err != nil {
logrus.Errorf("Could not fetch unprocessed events %s", err.Error())
time.Sleep(1 * time.Second)
continue
}

for _, event := range events {
w.eventsProcessed.Inc()
processEvent(w.store,
Expand All @@ -90,8 +105,6 @@ func (w *GitopsWorker) Run() {
w.gitHost,
)
}

time.Sleep(100 * time.Millisecond)
}
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/dashboard/worker/imageBuildWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ import (
type ImageBuildWorker struct {
store *store.Store
imageBuilds chan streaming.ImageBuildStatusWSMessage
gitopsQueue chan int
}

func NewImageBuildWorker(
store *store.Store,
imageBuilds chan streaming.ImageBuildStatusWSMessage,
gitopsQueue chan int,
) *ImageBuildWorker {
imageBuildWorker := &ImageBuildWorker{
store: store,
imageBuilds: imageBuilds,
gitopsQueue: gitopsQueue,
}

return imageBuildWorker
Expand All @@ -38,7 +41,7 @@ func (m *ImageBuildWorker) Run() {
}

if imageBuildStatus.Status == "success" {
go createDeployRequest(imageBuildStatus.BuildId, m.store)
go createDeployRequest(imageBuildStatus.BuildId, m.store, m.gitopsQueue)
} else if imageBuildStatus.Status != "running" {
go handleImageBuildError(imageBuildStatus.BuildId, m.store)
}
Expand Down Expand Up @@ -70,7 +73,7 @@ func handleImageBuildError(buildId string, store *store.Store) {
}
}

func createDeployRequest(buildId string, store *store.Store) {
func createDeployRequest(buildId string, store *store.Store, gitopsQueue chan int) {
event, err := store.Event(buildId)
if err != nil {
logrus.Error(err)
Expand Down Expand Up @@ -106,6 +109,8 @@ func createDeployRequest(buildId string, store *store.Store) {
return
}

gitopsQueue <- 1

event.Status = model.Success.String()
event.Results[0].TriggeredDeployRequestID = triggeredDeployRequestEvent.ID
resultsString, err := json.Marshal(event.Results)
Expand Down
4 changes: 2 additions & 2 deletions pkg/git/nativeGit/repoCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type RepoCache struct {
tokenManager customScm.NonImpersonatedTokenManager
repos map[string]*repoData
reposMapLock sync.Mutex // lock this if you add or remove items from the repos map
stopCh chan struct{}
stopCh chan os.Signal

// For webhook registration
config *dashboardConfig.Config
Expand All @@ -57,7 +57,7 @@ const BRANCH_DELETED_WORKER_SUBPATH = "branch-deleted-worker"

func NewRepoCache(
tokenManager customScm.NonImpersonatedTokenManager,
stopCh chan struct{},
stopCh chan os.Signal,
config *dashboardConfig.Config,
dynamicConfig *dynamicconfig.DynamicConfig,
clientHub *streaming.ClientHub,
Expand Down

0 comments on commit f9aef3c

Please sign in to comment.