From 5df32d65c55e6ca3e5d2513d0055875ad16ca2e2 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Fri, 6 Oct 2023 13:20:11 -0400 Subject: [PATCH 1/5] test(server): Fix flakiness in TestServer_ReloadWorkerTags This attempts to address some flakiness by replacing hard-coded sleeps with some smarter polling similarly used in TestServer_ReloadInitialUpstreams. --- .../worker_initial_upstreams_reload_test.go | 4 +- .../server/worker_tags_reload_test.go | 85 ++++++++++++++++--- 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go b/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go index 24b7b36fb1..7bf6b323e2 100644 --- a/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go +++ b/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go @@ -97,7 +97,7 @@ pollFirstController: for { select { case <-timeout.C: - t.Fatalf("timeout wait for worker to connect to first controller") + t.Fatalf("timeout waiting for worker to connect to first controller") case <-poll.C: w, err = serversRepo.LookupWorkerByName(testController.Context(), "test") require.NoError(err) @@ -168,7 +168,7 @@ pollSecondController: for { select { case <-timeout.C: - t.Fatalf("timeout wait for worker to connect to second controller") + t.Fatalf("timeout waiting for worker to connect to second controller") case <-poll.C: w, err = serversRepo.LookupWorkerByName(testController2.Context(), "test") require.NoError(err) diff --git a/internal/cmd/commands/server/worker_tags_reload_test.go b/internal/cmd/commands/server/worker_tags_reload_test.go index 2fc87e59c0..085dbbc462 100644 --- a/internal/cmd/commands/server/worker_tags_reload_test.go +++ b/internal/cmd/commands/server/worker_tags_reload_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/hashicorp/boundary/internal/server" "github.com/hashicorp/boundary/testing/controller" wrapping "github.com/hashicorp/go-kms-wrapping/v2" "github.com/hashicorp/go-kms-wrapping/v2/aead" @@ -74,8 +75,13 @@ func TestServer_ReloadWorkerTags(t *testing.T) { rootWrapper, _ := wrapperWithKey(t) recoveryWrapper, _ := wrapperWithKey(t) workerAuthWrapper, key := wrapperWithKey(t) - testController := controller.NewTestController(t, controller.WithWorkerAuthKms(workerAuthWrapper), controller.WithRootKms(rootWrapper), controller.WithRecoveryKms(recoveryWrapper)) - defer testController.Shutdown() + testController := controller.NewTestController( + t, + controller.WithWorkerAuthKms(workerAuthWrapper), + controller.WithRootKms(rootWrapper), + controller.WithRecoveryKms(recoveryWrapper), + ) + t.Cleanup(testController.Shutdown) wg := &sync.WaitGroup{} @@ -94,7 +100,7 @@ func TestServer_ReloadWorkerTags(t *testing.T) { select { case <-cmd.startedCh: case <-time.After(15 * time.Second): - t.Fatalf("timeout") + t.Fatalf("timeout waiting for worker start") } fetchWorkerTags := func(name string, key string, values []string) { @@ -104,29 +110,88 @@ func TestServer_ReloadWorkerTags(t *testing.T) { w, err := serversRepo.LookupWorkerByName(testController.Context(), name) require.NoError(err) require.NotNil(w) - v, ok := w.CanonicalTags()[key] - require.True(ok) + tags := w.CanonicalTags() + v, ok := tags[key] + require.True(ok, fmt.Sprintf("EXPECTED: map[%s:%s], ACTUAL: %+v", key, values, tags)) require.ElementsMatch(values, v) } // Give time to populate up to the controller - time.Sleep(10 * time.Second) + // Wait until the worker has connected to the controller as seen via + // two status updates + timeout := time.NewTimer(15 * time.Second) + poll := time.NewTimer(0) + var w *server.Worker + var lastStatusTime time.Time + serversRepo, err := testController.Controller().ServersRepoFn() + require.NoError(err) +pollController: + for { + select { + case <-timeout.C: + t.Fatalf("timeout waiting for worker to connect to controller") + case <-poll.C: + w, err = serversRepo.LookupWorkerByName(testController.Context(), "test") + require.NoError(err) + if w != nil { + switch { + case lastStatusTime.IsZero(): + lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) + default: + if !lastStatusTime.Equal(w.GetLastStatusTime().AsTime().Round(time.Second)) { + timeout.Stop() + break pollController + } + } + } + poll.Reset(time.Second) + } + } + fetchWorkerTags("test", "type", []string{"dev", "local"}) + // Set new tags on worker cmd.presetConfig.Store(fmt.Sprintf(workerBaseConfig+tag2Config, key, testController.ClusterAddrs()[0])) - cmd.SighupCh <- struct{}{} select { case <-cmd.reloadedCh: case <-time.After(15 * time.Second): - t.Fatalf("timeout") + t.Fatalf("timeout waiting for worker start") + } + + // Give time to populate up to the controller + timeout.Reset(15 * time.Second) + poll.Reset(time.Second) + lastStatusTime = time.Time{} + startTime := time.Now() +pollControllerAgain: + for { + select { + case <-timeout.C: + t.Fatalf("timeout waiting for worker to connect to controller") + case <-poll.C: + w, err = serversRepo.LookupWorkerByName(testController.Context(), "test") + require.NoError(err) + if w != nil { + switch { + case lastStatusTime.IsZero(): + if w.GetLastStatusTime().AsTime().Round(time.Second).After(startTime) { + lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) + } + default: + if !lastStatusTime.Round(time.Second).Equal(w.GetLastStatusTime().AsTime().Round(time.Second)) { + timeout.Stop() + break pollControllerAgain + } + } + } + poll.Reset(time.Second) + } } - time.Sleep(10 * time.Second) fetchWorkerTags("test", "foo", []string{"bar", "baz"}) cmd.ShutdownCh <- struct{}{} - wg.Wait() } From 63b70ef2dad123f089b0dc050aa3aadf70600446 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Mon, 9 Oct 2023 11:26:52 -0400 Subject: [PATCH 2/5] wip --- .../server/worker_tags_reload_test.go | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/internal/cmd/commands/server/worker_tags_reload_test.go b/internal/cmd/commands/server/worker_tags_reload_test.go index 085dbbc462..1d1e364efc 100644 --- a/internal/cmd/commands/server/worker_tags_reload_test.go +++ b/internal/cmd/commands/server/worker_tags_reload_test.go @@ -119,6 +119,7 @@ func TestServer_ReloadWorkerTags(t *testing.T) { // Give time to populate up to the controller // Wait until the worker has connected to the controller as seen via // two status updates + t.Log("Waiting for worker to connect to controller...") timeout := time.NewTimer(15 * time.Second) poll := time.NewTimer(0) var w *server.Worker @@ -151,6 +152,7 @@ pollController: fetchWorkerTags("test", "type", []string{"dev", "local"}) // Set new tags on worker + t.Log("Restart worker with new tags...") cmd.presetConfig.Store(fmt.Sprintf(workerBaseConfig+tag2Config, key, testController.ClusterAddrs()[0])) cmd.SighupCh <- struct{}{} select { @@ -160,10 +162,11 @@ pollController: } // Give time to populate up to the controller + t.Log("Waiting for worker to connect to controller again...") timeout.Reset(15 * time.Second) poll.Reset(time.Second) lastStatusTime = time.Time{} - startTime := time.Now() + i := 0 pollControllerAgain: for { select { @@ -175,13 +178,18 @@ pollControllerAgain: if w != nil { switch { case lastStatusTime.IsZero(): - if w.GetLastStatusTime().AsTime().Round(time.Second).After(startTime) { - lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) - } + lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) + t.Log(lastStatusTime) default: + t.Log(w.GetLastStatusTime().AsTime().Round(time.Second)) + if !lastStatusTime.Round(time.Second).Equal(w.GetLastStatusTime().AsTime().Round(time.Second)) { - timeout.Stop() - break pollControllerAgain + if i == 3 { + timeout.Stop() + break pollControllerAgain + } else { + i += 1 + } } } } From 11035996830e131a74e9080e17dfd6abdeb8594d Mon Sep 17 00:00:00 2001 From: Michael Li Date: Thu, 12 Oct 2023 15:00:01 -0400 Subject: [PATCH 3/5] doc(controller): Fix typo in comment --- testing/controller/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing/controller/controller.go b/testing/controller/controller.go index ec5d1e2936..2ceebf27c3 100644 --- a/testing/controller/controller.go +++ b/testing/controller/controller.go @@ -124,8 +124,8 @@ func DisableDatabaseCreation() Option { } } -// DisableDatabaseCreation skips creating a database in docker and allows one to -// be provided through a tcOptions. +// DisableDatabaseDestruction skips destoying the database in docker and allows +// it to be examined after-the-fact func DisableDatabaseDestruction() Option { return func(c *option) error { c.setDisableDatabaseDestruction = true From 3151c6515411fd07a75d834f8a673084dfad5aa9 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Mon, 16 Oct 2023 09:36:56 -0400 Subject: [PATCH 4/5] DEBUG: Log tags --- internal/cmd/commands/server/worker_tags_reload_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/cmd/commands/server/worker_tags_reload_test.go b/internal/cmd/commands/server/worker_tags_reload_test.go index 1d1e364efc..e975bd7cca 100644 --- a/internal/cmd/commands/server/worker_tags_reload_test.go +++ b/internal/cmd/commands/server/worker_tags_reload_test.go @@ -176,13 +176,11 @@ pollControllerAgain: w, err = serversRepo.LookupWorkerByName(testController.Context(), "test") require.NoError(err) if w != nil { + t.Logf("%s, %+v", w.GetLastStatusTime().AsTime().Round(time.Second), w.GetConfigTags()) switch { case lastStatusTime.IsZero(): lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) - t.Log(lastStatusTime) default: - t.Log(w.GetLastStatusTime().AsTime().Round(time.Second)) - if !lastStatusTime.Round(time.Second).Equal(w.GetLastStatusTime().AsTime().Round(time.Second)) { if i == 3 { timeout.Stop() From 3d5b907bb60ae38cbf6736f174f31aa58e49e51d Mon Sep 17 00:00:00 2001 From: Michael Li Date: Mon, 16 Oct 2023 14:30:47 -0400 Subject: [PATCH 5/5] wip2 --- internal/cmd/commands/server/worker_tags_reload_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/cmd/commands/server/worker_tags_reload_test.go b/internal/cmd/commands/server/worker_tags_reload_test.go index e975bd7cca..e0595b1466 100644 --- a/internal/cmd/commands/server/worker_tags_reload_test.go +++ b/internal/cmd/commands/server/worker_tags_reload_test.go @@ -80,6 +80,7 @@ func TestServer_ReloadWorkerTags(t *testing.T) { controller.WithWorkerAuthKms(workerAuthWrapper), controller.WithRootKms(rootWrapper), controller.WithRecoveryKms(recoveryWrapper), + controller.WithEnableEventing(), ) t.Cleanup(testController.Shutdown) @@ -154,6 +155,7 @@ pollController: // Set new tags on worker t.Log("Restart worker with new tags...") cmd.presetConfig.Store(fmt.Sprintf(workerBaseConfig+tag2Config, key, testController.ClusterAddrs()[0])) + t.Logf("%s", workerBaseConfig+tag2Config) cmd.SighupCh <- struct{}{} select { case <-cmd.reloadedCh: