From cea727664c61101dd10a8d82ee7014023b096a17 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Sat, 3 Dec 2022 16:32:28 +0200 Subject: [PATCH] Wait for metrics handling to finish even when there is an error Previous to this, setup() and teardown() exceptions or test.abort() calls would have made k6 exit prematurely, without waiting for the metrics processing to finish. This also caused a data race in integration tests and undefined behavior in general. The commit also adds and improves a bunch of integration tests, particularly when it comes to validating aborted tests and --linger. --- cmd/integration_test.go | 280 ++++++++++++++++++++++++++++++++++++++-- cmd/run.go | 34 ++--- core/engine.go | 1 + core/local/local.go | 8 +- 4 files changed, 289 insertions(+), 34 deletions(-) diff --git a/cmd/integration_test.go b/cmd/integration_test.go index ad5f5a062f3c..10e6eba5f6b2 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -514,14 +514,22 @@ func TestSetupTeardownThresholds(t *testing.T) { }; `)) - ts := getSimpleCloudOutputTestState(t, script, []string{}, lib.RunStatusFinished, cloudapi.ResultStatusPassed, 0) + ts := getSimpleCloudOutputTestState(t, script, nil, lib.RunStatusFinished, cloudapi.ResultStatusPassed, 0) newRootCommand(ts.globalState).execute() - assert.Len(t, ts.loggerHook.Drain(), 0) stdOut := ts.stdOut.String() assert.Contains(t, stdOut, `✓ http_reqs......................: 7`) assert.Contains(t, stdOut, `✓ iterations.....................: 5`) assert.Contains(t, stdOut, `✓ setup_teardown.................: 2`) + + logMsgs := ts.loggerHook.Drain() + for _, msg := range logMsgs { + if msg.Level != logrus.DebugLevel { + assert.Failf(t, "unexpected log message", "level %s, msg '%s'", msg.Level, msg.Message) + } + } + assert.True(t, testutils.LogContains(logMsgs, logrus.DebugLevel, "Metrics emission of VUs and VUsMax metrics stopped")) + assert.True(t, testutils.LogContains(logMsgs, logrus.DebugLevel, "Metrics processing finished!")) } func TestThresholdsFailed(t *testing.T) { @@ -602,6 +610,8 @@ func TestAbortedByThreshold(t *testing.T) { t.Log(stdOut) assert.Contains(t, stdOut, `✗ iterations`) assert.Contains(t, stdOut, `teardown() called`) + assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) + assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`) assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=8 tainted=true`) } @@ -654,6 +664,8 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) { assert.Contains(t, stdOut, `✓ tc`) assert.Contains(t, stdOut, `✓ { group:::teardown }`) assert.Contains(t, stdOut, `Stopping k6 in response to signal`) + assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) + assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`) assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) } @@ -720,10 +732,12 @@ func TestAbortedByUserWithRestAPI(t *testing.T) { assert.Contains(t, stdOut, `teardown() called`) assert.Contains(t, stdOut, `PATCH /v1/status`) assert.Contains(t, stdOut, `run: stopped by user; exiting...`) + assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) + assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`) assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) } -func TestAbortedByScriptSetupError(t *testing.T) { +func TestAbortedByScriptSetupErrorWithDependency(t *testing.T) { t.Parallel() depScript := []byte(` export default function () { @@ -732,6 +746,7 @@ func TestAbortedByScriptSetupError(t *testing.T) { function baz() { throw new Error("baz"); } + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} `) mainScript := []byte(` import bar from "./bar.js"; @@ -740,6 +755,8 @@ func TestAbortedByScriptSetupError(t *testing.T) { bar(); }; export default function() {}; + + export { handleSummary } from "./bar.js"; `) srv := getCloudTestEndChecker(t, lib.RunStatusAbortedScriptError, cloudapi.ResultStatusPassed) @@ -754,14 +771,7 @@ func TestAbortedByScriptSetupError(t *testing.T) { newRootCommand(ts.globalState).execute() - // FIXME: remove this locking after VU initialization accepts a context and - // is properly synchronized: currently when a test is aborted during the - // init phase, some logs might be emitted after the above command returns... - // see: https://github.com/grafana/k6/issues/2790 - ts.outMutex.Lock() stdOut := ts.stdOut.String() - ts.outMutex.Unlock() - t.Log(stdOut) assert.Contains(t, stdOut, `wonky setup`) @@ -772,26 +782,271 @@ func TestAbortedByScriptSetupError(t *testing.T) { assert.Contains(t, stdOut, `level=error msg="Error: baz\n\tat baz (`+rootPath+`test/bar.js:6:9(3))\n\tat `+ rootPath+`test/bar.js:3:3(3)\n\tat setup (`+rootPath+`test/test.js:5:3(9))\n\tat native\n" hint="script exception"`) assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=7 tainted=false`) + assert.Contains(t, stdOut, "bogus summary") +} + +func runTestWithNoLinger(t *testing.T, ts *globalTestState) { + newRootCommand(ts.globalState).execute() +} + +func runTestWithLinger(t *testing.T, ts *globalTestState) { + ts.args = append(ts.args, "--linger") + + sendSignal := make(chan struct{}) + ts.globalState.signalNotify = func(c chan<- os.Signal, s ...os.Signal) { + go func() { + <-sendSignal + c <- os.Interrupt + }() + } + ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + newRootCommand(ts.globalState).execute() + }() + + testFinished := false + for i := 0; i <= 15 && testFinished == false; i++ { + time.Sleep(1 * time.Second) + ts.outMutex.Lock() + stdOut := ts.stdOut.String() + ts.outMutex.Unlock() + + if !strings.Contains(stdOut, "Linger set; waiting for Ctrl+C") { + t.Logf("test wasn't finished on try %d at t=%s", i, time.Now()) + continue + } + testFinished = true + close(sendSignal) + } + + require.True(t, testFinished) + wg.Wait() +} + +func TestAbortedByScriptSetupError(t *testing.T) { + t.Parallel() + script := []byte(` + export function setup() { + console.log('wonky setup'); + throw new Error('foo'); + } + + export function teardown() { + console.log('nice teardown'); + } + + export default function () {}; + + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} + `) + + doChecks := func(t *testing.T, ts *globalTestState) { + stdOut := ts.stdOut.String() + assert.Contains(t, stdOut, "Error: foo") + assert.Contains(t, stdOut, "wonky setup") + assert.NotContains(t, stdOut, "nice teardown") // do not execute teardown if setup failed + assert.Contains(t, stdOut, "bogus summary") + } + + t.Run("noLinger", func(t *testing.T) { + t.Parallel() + ts := testAbortedByScriptError(t, script, runTestWithNoLinger) + doChecks(t, ts) + }) + + t.Run("withLinger", func(t *testing.T) { + t.Parallel() + ts := testAbortedByScriptError(t, script, runTestWithLinger) + doChecks(t, ts) + }) +} + +func TestAbortedByScriptTeardownError(t *testing.T) { + t.Parallel() + script := []byte(` + export function setup() { + console.log('nice setup'); + } + + export function teardown() { + console.log('wonky teardown'); + throw new Error('foo'); + } + + export default function () {}; + + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} + `) + + doChecks := func(t *testing.T, ts *globalTestState) { + stdOut := ts.stdOut.String() + assert.Contains(t, stdOut, "Error: foo") + assert.Contains(t, stdOut, "nice setup") + assert.Contains(t, stdOut, "wonky teardown") + assert.Contains(t, stdOut, "bogus summary") + } + + t.Run("noLinger", func(t *testing.T) { + t.Parallel() + ts := testAbortedByScriptError(t, script, runTestWithNoLinger) + doChecks(t, ts) + }) + + t.Run("withLinger", func(t *testing.T) { + t.Parallel() + ts := testAbortedByScriptError(t, script, runTestWithLinger) + doChecks(t, ts) + }) +} + +func testAbortedByScriptError(t *testing.T, script []byte, runTest func(*testing.T, *globalTestState)) *globalTestState { + ts := getSimpleCloudOutputTestState( + t, script, nil, lib.RunStatusAbortedScriptError, cloudapi.ResultStatusPassed, int(exitcodes.ScriptException), + ) + runTest(t, ts) + + stdOut := ts.stdOut.String() + t.Log(stdOut) + assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) + assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`) + assert.Contains(t, stdOut, `level=debug msg="Everything has finished, exiting k6!"`) + assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=7 tainted=false`) + return ts +} + +func TestAbortedByTestAbortFirstInitCode(t *testing.T) { + t.Parallel() + script := []byte(` + import exec from 'k6/execution'; + exec.test.abort('foo'); + export default function () {}; + + // Should not be called, since error is in the init context + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} + `) + + ts := newGlobalTestState(t) + require.NoError(t, afero.WriteFile(ts.fs, filepath.Join(ts.cwd, "test.js"), script, 0o644)) + ts.args = []string{"k6", "run", "-v", "--log-output=stdout", "test.js"} + ts.expectedExitCode = int(exitcodes.ScriptAborted) + + newRootCommand(ts.globalState).execute() + stdOut := ts.stdOut.String() + t.Log(stdOut) + assert.Contains(t, stdOut, "test aborted: foo") + assert.NotContains(t, stdOut, "bogus summary") +} + +func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) { + t.Parallel() + script := []byte(` + import exec from 'k6/execution'; + + export const options = {vus: 3, duration: '5s'}; + + if (__VU > 1) { + exec.test.abort('foo'); + } + + export default function () {}; + + // Should not be called, since error is in the init context + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} + `) + + ts := testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger) + assert.NotContains(t, ts.stdOut.String(), "bogus summary") } -func TestAbortedByScriptAbort(t *testing.T) { +func TestAbortedByScriptAbortInVUCode(t *testing.T) { t.Parallel() script := []byte(` import exec from 'k6/execution'; export default function () { exec.test.abort('foo'); }; + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} `) + t.Run("noLinger", func(t *testing.T) { + t.Parallel() + testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger) + }) + + t.Run("withLinger", func(t *testing.T) { + t.Parallel() + testAbortedByScriptTestAbort(t, true, script, runTestWithLinger) + }) +} + +func TestAbortedByScriptAbortInSetup(t *testing.T) { + t.Parallel() + script := []byte(` + import exec from 'k6/execution'; + export function setup() { + exec.test.abort('foo'); + } + export default function () {}; + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} + `) + + t.Run("noLinger", func(t *testing.T) { + t.Parallel() + testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger) + }) + + t.Run("withLinger", func(t *testing.T) { + t.Parallel() + testAbortedByScriptTestAbort(t, true, script, runTestWithLinger) + }) +} + +func TestAbortedByScriptAbortInTeardown(t *testing.T) { + t.Parallel() + script := []byte(` + import exec from 'k6/execution'; + export function teardown() { + exec.test.abort('foo'); + } + export default function () {}; + export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} + `) + + t.Run("noLinger", func(t *testing.T) { + t.Parallel() + testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger) + }) + + t.Run("withLinger", func(t *testing.T) { + t.Parallel() + testAbortedByScriptTestAbort(t, true, script, runTestWithLinger) + }) +} + +func testAbortedByScriptTestAbort( + t *testing.T, shouldHaveMetrics bool, script []byte, runTest func(*testing.T, *globalTestState), +) *globalTestState { ts := getSimpleCloudOutputTestState( t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, int(exitcodes.ScriptAborted), ) - newRootCommand(ts.globalState).execute() + runTest(t, ts) stdOut := ts.stdOut.String() t.Log(stdOut) assert.Contains(t, stdOut, "test aborted: foo") assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) + assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) + if shouldHaveMetrics { + assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`) + assert.Contains(t, stdOut, "bogus summary") + } else { + assert.NotContains(t, stdOut, "bogus summary") + } + return ts } func TestAbortedByScriptInitError(t *testing.T) { @@ -825,6 +1080,7 @@ func TestAbortedByScriptInitError(t *testing.T) { t.Log(stdOut) assert.Contains(t, stdOut, `level=error msg="Error: oops in 2\n\tat file:///`) assert.Contains(t, stdOut, `hint="error while initializing VU #2 (script exception)"`) + assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=7 tainted=false`) } diff --git a/cmd/run.go b/cmd/run.go index 76496f984252..5e2f55a24780 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -185,35 +185,27 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error { // Start the test run initBar.Modify(pb.WithConstProgress(0, "Starting test...")) - var interrupt error err = engineRun() if err != nil { - err = common.UnwrapGojaInterruptedError(err) - if errext.IsInterruptError(err) { - // Don't return here since we need to work with --linger, - // show the end-of-test summary and exit cleanly. - interrupt = err - } - if !conf.Linger.Bool && interrupt == nil { - return errext.WithExitCodeIfNone(err, exitcodes.GenericEngine) - } + logger.WithError(err).Debug("Engine terminated with an error") + } else { + logger.Debug("Engine run terminated cleanly") } runCancel() - logger.Debug("Engine run terminated cleanly") progressCancel() progressBarWG.Wait() executionState := execScheduler.GetState() // Warn if no iterations could be completed. - if executionState.GetFullIterationCount() == 0 { + if err == nil && executionState.GetFullIterationCount() == 0 { logger.Warn("No script iterations finished, consider making the test duration longer") } // Handle the end-of-test summary. if !testRunState.RuntimeOptions.NoSummary.Bool { engine.MetricsEngine.MetricsLock.Lock() // TODO: refactor so this is not needed - summaryResult, err := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ + summaryResult, hsErr := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ Metrics: engine.MetricsEngine.ObservedMetrics, RootGroup: execScheduler.GetRunner().GetDefaultGroup(), TestRunDuration: executionState.GetCurrentTestRunDuration(), @@ -224,11 +216,11 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error { }, }) engine.MetricsEngine.MetricsLock.Unlock() - if err == nil { - err = handleSummaryResult(c.gs.fs, c.gs.stdOut, c.gs.stdErr, summaryResult) + if hsErr == nil { + hsErr = handleSummaryResult(c.gs.fs, c.gs.stdOut, c.gs.stdErr, summaryResult) } - if err != nil { - logger.WithError(err).Error("failed to handle the end-of-test summary") + if hsErr != nil { + logger.WithError(hsErr).Error("failed to handle the end-of-test summary") } } @@ -250,12 +242,12 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error { engineWait() logger.Debug("Everything has finished, exiting k6!") if test.keyLogger != nil { - if err := test.keyLogger.Close(); err != nil { - logger.WithError(err).Warn("Error while closing the SSLKEYLOGFILE") + if klErr := test.keyLogger.Close(); klErr != nil { + logger.WithError(klErr).Warn("Error while closing the SSLKEYLOGFILE") } } - if interrupt != nil { - return interrupt + if err != nil { + return errext.WithExitCodeIfNone(common.UnwrapGojaInterruptedError(err), exitcodes.GenericEngine) } if engine.IsTainted() { return errext.WithExitCodeIfNone(errors.New("some thresholds have failed"), exitcodes.ThresholdsHaveFailed) diff --git a/core/engine.go b/core/engine.go index 3a81861b7eea..6068f58085a8 100644 --- a/core/engine.go +++ b/core/engine.go @@ -263,6 +263,7 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu e.thresholdsTainted = thresholdsTainted e.thresholdsTaintedLock.Unlock() } + e.logger.Debug("Metrics processing finished!") }() ticker := time.NewTicker(collectRate) diff --git a/core/local/local.go b/core/local/local.go index 0029c1dc5607..8b043fd7292e 100644 --- a/core/local/local.go +++ b/core/local/local.go @@ -258,8 +258,14 @@ func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- me // Init concurrently initializes all of the planned VUs and then sequentially // initializes all of the configured executors. -func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) error { +func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { e.emitVUsAndVUsMax(ctx, samplesOut) + defer func() { + if err != nil { + close(e.stopVUsEmission) + <-e.vusEmissionStopped + } + }() logger := e.state.Test.Logger.WithField("phase", "local-execution-scheduler-init") vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan)