diff --git a/cmd/integration_test.go b/cmd/integration_test.go
index 1accfd86949..d63bb8fa7ae 100644
--- a/cmd/integration_test.go
+++ b/cmd/integration_test.go
@@ -976,11 +976,18 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) {
 
         export default function () {};
 
-        // Should not be called, since error is in the init context
         export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};}
     `
 
-    testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger)
+    t.Run("noLinger", func(t *testing.T) {
+        t.Parallel()
+        testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
+    })
+
+    t.Run("withLinger", func(t *testing.T) {
+        t.Parallel()
+        testAbortedByScriptTestAbort(t, script, runTestWithLinger)
+    })
 }
 
 func TestAbortedByScriptAbortInVUCode(t *testing.T) {
@@ -995,12 +1002,12 @@ func TestAbortedByScriptAbortInVUCode(t *testing.T) {
 
     t.Run("noLinger", func(t *testing.T) {
         t.Parallel()
-        testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+        testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
     })
 
     t.Run("withLinger", func(t *testing.T) {
         t.Parallel()
-        testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+        testAbortedByScriptTestAbort(t, script, runTestWithLinger)
     })
 }
 
@@ -1041,12 +1048,12 @@ func TestAbortedByScriptAbortInSetup(t *testing.T) {
 
     t.Run("noLinger", func(t *testing.T) {
         t.Parallel()
-        testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+        testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
     })
 
     t.Run("withLinger", func(t *testing.T) {
         t.Parallel()
-        testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+        testAbortedByScriptTestAbort(t, script, runTestWithLinger)
     })
 }
 
@@ -1063,18 +1070,16 @@ func TestAbortedByScriptAbortInTeardown(t *testing.T) {
 
     t.Run("noLinger", func(t *testing.T) {
         t.Parallel()
-        testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+        testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
     })
 
     t.Run("withLinger", func(t *testing.T) {
         t.Parallel()
-        testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+        testAbortedByScriptTestAbort(t, script, runTestWithLinger)
     })
 }
 
-func testAbortedByScriptTestAbort(
-    t *testing.T, shouldHaveMetrics bool, script string, runTest func(*testing.T, *globalTestState),
-) *globalTestState {
+func testAbortedByScriptTestAbort(t *testing.T, script string, runTest func(*testing.T, *globalTestState)) {
     ts := getSimpleCloudOutputTestState(
         t, script, nil, cloudapi.RunStatusAbortedUser, cloudapi.ResultStatusPassed, exitcodes.ScriptAborted,
     )
@@ -1085,13 +1090,8 @@ func testAbortedByScriptTestAbort(
     assert.Contains(t, stdOut, "test aborted: foo")
     assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
     assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
-    if shouldHaveMetrics {
-        assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`)
-        assert.Contains(t, stdOut, "bogus summary")
-    } else {
-        assert.NotContains(t, stdOut, "bogus summary")
-    }
-    return ts
+    assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`)
+    assert.Contains(t, stdOut, "bogus summary")
 }
 
 func TestAbortedByInterruptDuringVUInit(t *testing.T) {
diff --git a/cmd/integration_tests/eventloop/eventloop_test.go b/cmd/integration_tests/eventloop/eventloop_test.go
index 7cd359a4819..efbaf600ca0 100644
--- a/cmd/integration_tests/eventloop/eventloop_test.go
+++ b/cmd/integration_tests/eventloop/eventloop_test.go
@@ -71,8 +71,6 @@ func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context,
         }
     }()
 
-    require.NoError(t, execScheduler.Init(ctx, samples))
-
     errCh := make(chan error, 1)
     go func() { errCh <- execScheduler.Run(ctx, ctx, samples) }()
 
diff --git a/core/engine.go b/core/engine.go
index 5e7e1e75c57..ef8366f12c6 100644
--- a/core/engine.go
+++ b/core/engine.go
@@ -95,11 +95,6 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o
 // - The second returned lambda can be used to wait for that process to finish.
 func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait func(), err error) {
     e.logger.Debug("Initialization starting...")
-    // TODO: if we ever need metrics processing in the init context, we can move
-    // this below the other components... or even start them concurrently?
-    if err := e.ExecutionScheduler.Init(runCtx, e.Samples); err != nil {
-        return nil, nil, err
-    }
 
     // TODO: move all of this in a separate struct? see main TODO above
     processMetricsAfterRun := make(chan struct{})
diff --git a/core/engine_test.go b/core/engine_test.go
index 2152a8c1aec..5dfa333b999 100644
--- a/core/engine_test.go
+++ b/core/engine_test.go
@@ -893,57 +893,6 @@ func TestSetupException(t *testing.T) {
     }
 }
 
-// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the
-// test functionality was duplicated in cmd/integration_test.go
-func TestVuInitException(t *testing.T) {
-    t.Parallel()
-
-    script := []byte(`
-        export let options = {
-            vus: 3,
-            iterations: 5,
-        };
-
-        export default function() {};
-
-        if (__VU == 2) {
-            throw new Error('oops in ' + __VU);
-        }
-    `)
-
-    piState := getTestPreInitState(t)
-    runner, err := js.New(
-        piState,
-        &loader.SourceData{URL: &url.URL{Scheme: "file", Path: "/script.js"}, Data: script},
-        nil,
-    )
-    require.NoError(t, err)
-
-    opts, err := executor.DeriveScenariosFromShortcuts(runner.GetOptions(), nil)
-    require.NoError(t, err)
-
-    testState := getTestRunState(t, piState, opts, runner)
-
-    execScheduler, err := execution.NewScheduler(testState)
-    require.NoError(t, err)
-    engine, err := NewEngine(testState, execScheduler, nil)
-    require.NoError(t, err)
-
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    _, _, err = engine.Init(ctx, ctx) // no need for 2 different contexts
-
-    require.Error(t, err)
-
-    var exception errext.Exception
-    require.ErrorAs(t, err, &exception)
-    assert.Equal(t, "Error: oops in 2\n\tat file:///script.js:10:9(29)\n", err.Error())
-
-    var errWithHint errext.HasHint
-    require.ErrorAs(t, err, &errWithHint)
-    assert.Equal(t, "error while initializing VU #2 (script exception)", errWithHint.Hint())
-}
-
 func TestEmittedMetricsWhenScalingDown(t *testing.T) {
     t.Parallel()
     tb := httpmultibin.NewHTTPMultiBin(t)
diff --git a/execution/scheduler.go b/execution/scheduler.go
index 2ebb442bc50..ee75b05b2f5 100644
--- a/execution/scheduler.go
+++ b/execution/scheduler.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "runtime"
+    "sync"
     "sync/atomic"
     "time"
 
@@ -26,10 +27,6 @@ type Scheduler struct {
     maxDuration    time.Duration // cached value derived from the execution plan
     maxPossibleVUs uint64        // cached value derived from the execution plan
     state          *lib.ExecutionState
-
-    // TODO: remove these when we don't have separate Init() and Run() methods
-    // and can use a context + a WaitGroup (or something like that)
-    stopVUsEmission, vusEmissionStopped chan struct{}
 }
 
 // NewScheduler creates and returns a new Scheduler instance, without
@@ -84,9 +81,6 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
         maxDuration:    maxDuration,
         maxPossibleVUs: maxPossibleVUs,
         state:          executionState,
-
-        stopVUsEmission:    make(chan struct{}),
-        vusEmissionStopped: make(chan struct{}),
     }, nil
 }
 
@@ -199,9 +193,11 @@ func (e *Scheduler) initVUsConcurrently(
     return doneInits
 }
 
-func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) {
+func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) func() {
     e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...")
     tags := e.state.Test.RunTags
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
 
     emitMetrics := func() {
         t := time.Now()
@@ -234,7 +230,7 @@ func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.Sam
         defer func() {
             ticker.Stop()
             e.state.Test.Logger.Debug("Metrics emission of VUs and VUsMax metrics stopped")
-            close(e.vusEmissionStopped)
+            wg.Done()
         }()
 
         for {
@@ -243,23 +239,17 @@
                 emitMetrics()
             case <-ctx.Done():
                 return
-            case <-e.stopVUsEmission:
-                return
             }
         }
     }()
+
+    return wg.Wait
 }
 
-// Init concurrently initializes all of the planned VUs and then sequentially
-// initializes all of the configured executors.
-func (e *Scheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
-    e.emitVUsAndVUsMax(ctx, samplesOut)
-    defer func() {
-        if err != nil {
-            close(e.stopVUsEmission)
-            <-e.vusEmissionStopped
-        }
-    }()
+// initVusAndExecutors concurrently initializes all of the planned VUs and then
+// sequentially initializes all of the configured executors.
+func (e *Scheduler) initVusAndExecutors(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
+    e.initProgress.Modify(pb.WithConstProgress(0, "Init VUs..."))
 
     logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")
     vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan)
@@ -386,15 +376,19 @@ func (e *Scheduler) runExecutor(
 // out channel.
 //
 //nolint:funlen
-func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error {
-    defer func() {
-        close(e.stopVUsEmission)
-        <-e.vusEmissionStopped
-    }()
+func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) error {
+    execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
+    waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
+    defer waitForVUsMetricPush()
+    defer execSchedRunCancel()
+
+    if err := e.initVusAndExecutors(execSchedRunCtx, samplesOut); err != nil {
+        return err
+    }
 
     executorsCount := len(e.executors)
     logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")
-    e.initProgress.Modify(pb.WithConstLeft("Run"))
+    e.initProgress.Modify(pb.WithConstLeft("Run"), pb.WithConstProgress(0, "Starting test..."))
     var interrupted bool
     defer func() {
         e.state.MarkEnded()
@@ -410,7 +404,7 @@
         select {
         case <-e.state.ResumeNotify():
             // continue
-        case <-runCtx.Done():
+        case <-execSchedRunCtx.Done():
             return nil
         }
     }
@@ -422,15 +416,16 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 
     runResults := make(chan error, executorsCount) // nil values are successful runs
 
-    runCtx = lib.WithExecutionState(runCtx, e.state)
-    runSubCtx, cancel := context.WithCancel(runCtx)
-    defer cancel() // just in case, and to shut up go vet...
+    // TODO: get rid of this context, pass the e.state directly to VUs when they
+    // are initialized by e.initVusAndExecutors(). This will also give access to
+    // its properties in their init context executions.
+    withExecStateCtx := lib.WithExecutionState(execSchedRunCtx, e.state)
 
     // Run setup() before any executors, if it's not disabled
     if !e.state.Test.Options.NoSetup.Bool {
         e.state.SetExecutionStatus(lib.ExecutionStatusSetup)
         e.initProgress.Modify(pb.WithConstProgress(1, "setup()"))
-        if err := e.state.Test.Runner.Setup(runSubCtx, engineOut); err != nil {
+        if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
             logger.WithField("error", err).Debug("setup() aborted by error")
             return err
         }
@@ -441,8 +436,10 @@
 
     logger.Debug("Start all executors...")
     e.state.SetExecutionStatus(lib.ExecutionStatusRunning)
+    executorsRunCtx, executorsRunCancel := context.WithCancel(withExecStateCtx)
+    defer executorsRunCancel()
     for _, exec := range e.executors {
-        go e.runExecutor(runSubCtx, runResults, engineOut, exec)
+        go e.runExecutor(executorsRunCtx, runResults, samplesOut, exec)
     }
 
     // Wait for all executors to finish
@@ -452,7 +449,7 @@
         if err != nil && firstErr == nil {
             logger.WithError(err).Debug("Executor returned with an error, cancelling test run...")
             firstErr = err
-            cancel()
+            executorsRunCancel()
         }
     }
 
@@ -462,13 +459,13 @@
         e.initProgress.Modify(pb.WithConstProgress(1, "teardown()"))
 
         // We run teardown() with the global context, so it isn't interrupted by
-        // aborts caused by thresholds or even Ctrl+C (unless used twice).
-        if err := e.state.Test.Runner.Teardown(globalCtx, engineOut); err != nil {
+        // thresholds or test.abort() or even Ctrl+C (unless used twice).
+        if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
             logger.WithField("error", err).Debug("teardown() aborted by error")
             return err
         }
     }
-    if err := GetCancelReasonIfTestAborted(runSubCtx); err != nil {
+    if err := GetCancelReasonIfTestAborted(executorsRunCtx); err != nil {
         interrupted = true
         return err
     }
diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go
index 43dfc9b69ac..6e9f37c048c 100644
--- a/execution/scheduler_ext_test.go
+++ b/execution/scheduler_ext_test.go
@@ -88,8 +88,6 @@ func newTestScheduler(
         }
     }()
 
-    require.NoError(t, execScheduler.Init(ctx, samples))
-
     return ctx, cancel, execScheduler, samples
 }
 
@@ -107,9 +105,9 @@ func TestSchedulerRunNonDefault(t *testing.T) {
     t.Parallel()
 
     testCases := []struct {
-        name, script, expErr string
+        name, script string
     }{
-        {"defaultOK", `export default function () {}`, ""},
+        {"defaultOK", `export default function () {}`},
         {"nonDefaultOK", `
     export let options = {
         scenarios: {
@@ -121,7 +119,7 @@ func TestSchedulerRunNonDefault(t *testing.T) {
             },
         }
     }
-    export function nonDefault() {}`, ""},
+    export function nonDefault() {}`},
     }
 
     for _, tc := range testCases {
@@ -146,13 +144,7 @@ func TestSchedulerRunNonDefault(t *testing.T) {
             done := make(chan struct{})
             samples := make(chan metrics.SampleContainer)
             go func() {
-                err := execScheduler.Init(ctx, samples)
-                if tc.expErr != "" {
-                    assert.EqualError(t, err, tc.expErr)
-                } else {
-                    assert.NoError(t, err)
-                    assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
-                }
+                assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
                 close(done)
             }()
             for {
@@ -263,7 +255,6 @@ func TestSchedulerRunEnv(t *testing.T) {
             done := make(chan struct{})
             samples := make(chan metrics.SampleContainer)
             go func() {
-                assert.NoError(t, execScheduler.Init(ctx, samples))
                 assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
                 close(done)
             }()
@@ -333,7 +324,6 @@ func TestSchedulerSystemTags(t *testing.T) {
     done := make(chan struct{})
     go func() {
         defer close(done)
-        require.NoError(t, execScheduler.Init(ctx, samples))
         require.NoError(t, execScheduler.Run(ctx, ctx, samples))
     }()
 
@@ -464,7 +454,6 @@ func TestSchedulerRunCustomTags(t *testing.T) {
     samples := make(chan metrics.SampleContainer)
     go func() {
         defer close(done)
-        require.NoError(t, execScheduler.Init(ctx, samples))
         require.NoError(t, execScheduler.Run(ctx, ctx, samples))
     }()
     var gotTrailTag, gotNetTrailTag bool
@@ -626,7 +615,6 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) {
 
     samples := make(chan metrics.SampleContainer)
     go func() {
-        assert.NoError(t, execScheduler.Init(ctx, samples))
         assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
         close(samples)
     }()
@@ -959,7 +947,6 @@ func TestSchedulerEndIterations(t *testing.T) {
     require.NoError(t, err)
 
     samples := make(chan metrics.SampleContainer, 300)
-    require.NoError(t, execScheduler.Init(ctx, samples))
     require.NoError(t, execScheduler.Run(ctx, ctx, samples))
 
     assert.Equal(t, uint64(100), execScheduler.GetState().GetFullIterationCount())
@@ -1170,7 +1157,6 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
     done := make(chan struct{})
     sampleContainers := make(chan metrics.SampleContainer)
     go func() {
-        require.NoError(t, execScheduler.Init(ctx, sampleContainers))
         assert.NoError(t, execScheduler.Run(ctx, ctx, sampleContainers))
         close(done)
     }()
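For reference, a minimal sketch (not part of the patch) of how a caller drives the scheduler after this change, mirroring the updated tests above: VU and executor initialization now happens inside Run(), so there is no separate Init() call, and Run() also stops the VUs/VUsMax metric emission itself. The runAndDrain helper name is illustrative; the import paths (go.k6.io/k6/execution, go.k6.io/k6/metrics) and the testify usage are assumed from the surrounding test files.

package execution_test

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"

    "go.k6.io/k6/execution"
    "go.k6.io/k6/metrics"
)

// runAndDrain is a hypothetical helper that starts the scheduler and drains
// the samples channel until the run finishes, following the pattern used by
// the updated tests in execution/scheduler_ext_test.go.
func runAndDrain(t *testing.T, ctx context.Context, sched *execution.Scheduler) {
    samples := make(chan metrics.SampleContainer)
    done := make(chan struct{})

    go func() {
        defer close(done)
        // Run() now also initializes VUs and executors; no separate Init() step.
        assert.NoError(t, sched.Run(ctx, ctx, samples))
    }()

    // Drain the unbuffered samples channel so metric emission never blocks.
    for {
        select {
        case <-samples:
        case <-done:
            return
        }
    }
}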