Skip to content

Commit

Permalink
Merge pull request #3204 from grafana/local-execution-controller
Browse files Browse the repository at this point in the history
Introduce `execution.Controller` with a `local` no-op implementation
  • Loading branch information
na-- authored Jan 11, 2024
2 parents d814297 + 19a30e2 commit 033d768
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 31 deletions.
3 changes: 2 additions & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
Expand Down Expand Up @@ -41,7 +42,7 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
}

func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
Expand Down
3 changes: 2 additions & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
Expand Down Expand Up @@ -138,7 +139,7 @@ func TestSetupData(t *testing.T) {
TeardownTimeout: types.NullDurationFrom(5 * time.Second),
}, runner)

execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
Expand Down Expand Up @@ -115,7 +116,7 @@ func TestPatchStatus(t *testing.T) {
require.NoError(t, err)

testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
Expand Down
2 changes: 1 addition & 1 deletion cmd/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type cmdArchive struct {
}

func (c *cmdArchive) run(cmd *cobra.Command, args []string) error {
test, err := loadAndConfigureTest(c.gs, cmd, args, getPartialConfig)
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error {
)
printBar(c.gs, progressBar)

test, err := loadAndConfigureTest(c.gs, cmd, args, getPartialConfig)
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func getCmdInspect(gs *state.GlobalState) *cobra.Command {
Long: `Inspect a script or archive.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
test, err := loadTest(gs, cmd, args)
test, err := loadLocalTest(gs, cmd, args)
if err != nil {
return err
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js/common"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/fsext"
Expand All @@ -35,6 +36,9 @@ import (
// cmdRun handles the `k6 run` sub-command
type cmdRun struct {
gs *state.GlobalState

// TODO: figure out something more elegant?
loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
}

const (
Expand Down Expand Up @@ -96,7 +100,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
c.gs.Events.UnsubscribeAll()
}()

test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig)
test, controller, err := c.loadConfiguredTest(cmd, args)
if err != nil {
return err
}
Expand Down Expand Up @@ -128,7 +132,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {

// Create a local execution scheduler wrapping the runner.
logger.Debug("Initializing the execution scheduler...")
execScheduler, err := execution.NewScheduler(testRunState)
execScheduler, err := execution.NewScheduler(testRunState, controller)
if err != nil {
return err
}
Expand Down Expand Up @@ -456,6 +460,10 @@ func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfigu
func getCmdRun(gs *state.GlobalState) *cobra.Command {
c := &cmdRun{
gs: gs,
loadConfiguredTest: func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) {
test, err := loadAndConfigureLocalTest(gs, cmd, args, getConfig)
return test, local.NewController(), err
},
}

exampleText := getExampleText(gs, `
Expand Down
6 changes: 3 additions & 3 deletions cmd/test_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type loadedTest struct {
moduleResolver *modules.ModuleResolver
}

func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loadedTest, error) {
func loadLocalTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loadedTest, error) {
if len(args) < 1 {
return nil, fmt.Errorf("k6 needs at least one argument to load the test")
}
Expand Down Expand Up @@ -241,11 +241,11 @@ type loadedAndConfiguredTest struct {
derivedConfig Config
}

func loadAndConfigureTest(
func loadAndConfigureLocalTest(
gs *state.GlobalState, cmd *cobra.Command, args []string,
cliConfigGetter func(flags *pflag.FlagSet) (Config, error),
) (*loadedAndConfiguredTest, error) {
test, err := loadTest(gs, cmd, args)
test, err := loadLocalTest(gs, cmd, args)
if err != nil {
return nil, err
}
Expand Down
53 changes: 53 additions & 0 deletions execution/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package execution

// Controller implementations are used to control the k6 execution of a test or
// test suite, either locally or in a distributed environment.
type Controller interface {
// GetOrCreateData requests the data chunk with the given ID, if it already
// exists. If it doesn't (i.e. this was the first time this function was
// called with that ID), the given callback is called and its result and
// error are saved for the ID and returned for all other calls with it.
//
// This is an atomic and single-flight function, so any calls to it while the callback is
// being executed the the same ID will wait for the first call to to finish
// and receive its result.
//
// TODO: split apart into `Once()`, `SetData(), `GetData()` and implement
// the GetOrCreateData() behavior in a helper like the ones below?
GetOrCreateData(ID string, callback func() ([]byte, error)) ([]byte, error)

// Signal is used to notify that the current instance has reached the given
// event ID, or that it has had an error.
Signal(eventID string, err error) error

// Subscribe creates a listener for the specified event ID and returns a
// callback that can wait until all other instances have reached it.
Subscribe(eventID string) (wait func() error)
}

// SignalAndWait implements a rendezvous point / barrier, a way for all
// instances to reach the same execution point and wait for each other, before
// they all ~simultaneously continue with the execution.
//
// It subscribes for the given event ID, signals that the current instance has
// reached it without an error, and then waits until all other instances have
// reached it or until there is an error in one of them.
func SignalAndWait(c Controller, eventID string) error {
wait := c.Subscribe(eventID)

if err := c.Signal(eventID, nil); err != nil {
return err
}
return wait()
}

// SignalErrorOrWait is a helper method that either immediately signals the
// given error and returns it, or it signals nominal completion and waits for
// all other instances to do the same (or signal an error themselves).
func SignalErrorOrWait(c Controller, eventID string, err error) error {
if err != nil {
_ = c.Signal(eventID, err)
return err // return the same error we got
}
return SignalAndWait(c, eventID)
}
45 changes: 45 additions & 0 deletions execution/local/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Package local implements the execution.Controller interface for local
// (single-machine) k6 execution.
package local

// Controller "controls" local tests. It doesn't actually do anything, it just
// implements the execution.Controller interface with no-op operations. The
// methods don't do anything because local tests have only a single instance.
//
// However, for test suites (https://github.com/grafana/k6/issues/1342) in the
// future, we will probably need to actually implement some of these methods and
// introduce simple synchronization primitives even for a single machine...
type Controller struct{}

// NewController creates a new local execution Controller.
func NewController() *Controller {
return &Controller{}
}

// GetOrCreateData immediately calls the given callback and returns its results.
func (c *Controller) GetOrCreateData(_ string, callback func() ([]byte, error)) ([]byte, error) {
return callback()
}

// Subscribe is a no-op, it doesn't actually wait for anything, because there is
// nothing to wait on - we only have one instance in local tests.
//
// TODO: actually use waitgroups, since this may actually matter for test
// suites, even for local test runs. That's because multiple tests might be
// executed even by a single instance, and if we have complicated flows (e.g.
// "test C is executed only after test A and test B finish"), the easiest way
// would be for different tests in the suite to reuse this Controller API *both*
// local and distributed runs.
func (c *Controller) Subscribe(_ string) func() error {
return func() error {
return nil
}
}

// Signal is a no-op, it doesn't actually do anything for local test runs.
//
// TODO: similar to Wait() above, this might actually be required for
// complex/branching test suites, even during local non-distributed execution.
func (c *Controller) Signal(_ string, _ error) error {
return nil
}
61 changes: 56 additions & 5 deletions execution/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
// executors, running setup() and teardown(), and actually starting the
// executors for the different scenarios at the appropriate times.
type Scheduler struct {
controller Controller

initProgress *pb.ProgressBar
executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID)
executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work
Expand All @@ -33,7 +35,7 @@ type Scheduler struct {
// initializing it beyond the bare minimum. Specifically, it creates the needed
// executor instances and a lot of state placeholders, but it doesn't initialize
// the executors and it doesn't initialize or run VUs.
func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
func NewScheduler(trs *lib.TestRunState, controller Controller) (*Scheduler, error) {
options := trs.Options
et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence)
if err != nil {
Expand Down Expand Up @@ -81,6 +83,7 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
maxDuration: maxDuration,
maxPossibleVUs: maxPossibleVUs,
state: executionState,
controller: controller,
}, nil
}

Expand Down Expand Up @@ -380,6 +383,13 @@ func (e *Scheduler) Init(
) (stopVUEmission func(), initErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")

if err := SignalAndWait(e.controller, "scheduler-init-start"); err != nil {
return nil, err
}
defer func() {
initErr = SignalErrorOrWait(e.controller, "scheduler-init-done", initErr)
}()

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
stopVUEmission = func() {
Expand Down Expand Up @@ -409,12 +419,16 @@ func (e *Scheduler) Init(
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")

if err := SignalAndWait(e.controller, "scheduler-run-start"); err != nil {
return err
}
defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
runErr = interruptErr
}
runErr = SignalErrorOrWait(e.controller, "scheduler-run-done", runErr)
}()

e.initProgress.Modify(pb.WithConstLeft("Run"))
Expand All @@ -430,6 +444,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}

if err := SignalAndWait(e.controller, "test-ready-to-run-setup"); err != nil {
return err
}

e.initProgress.Modify(pb.WithConstProgress(1, "Starting test..."))
e.state.MarkStarted()
defer e.state.MarkEnded()
Expand All @@ -449,11 +467,27 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
if !e.state.Test.Options.NoSetup.Bool {
e.state.SetExecutionStatus(lib.ExecutionStatusSetup)
e.initProgress.Modify(pb.WithConstProgress(1, "setup()"))
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("setup() aborted by error")
actuallyRanSetup := false
data, err := e.controller.GetOrCreateData("setup", func() ([]byte, error) {
actuallyRanSetup = true
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("setup() aborted by error")
return nil, err
}
return e.state.Test.Runner.GetSetupData(), nil
})
if err != nil {
return err
}
if !actuallyRanSetup {
e.state.Test.Runner.SetSetupData(data)
}
}

if err := SignalAndWait(e.controller, "setup-done"); err != nil {
return err
}

e.initProgress.Modify(pb.WithHijack(e.getRunStats))

// Start all executors at their particular startTime in a separate goroutine...
Expand All @@ -469,6 +503,8 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
// Wait for all executors to finish
var firstErr error
for range e.executors {
// TODO: add logic to abort the test early if there was an error from
// the controller (e.g. some other instance for this test died)
err := <-runResults
if err != nil && firstErr == nil {
logger.WithError(err).Debug("Executor returned with an error, cancelling test run...")
Expand All @@ -477,19 +513,34 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}

if err := SignalAndWait(e.controller, "execution-done"); err != nil {
return err
}

// Run teardown() after all executors are done, if it's not disabled
if !e.state.Test.Options.NoTeardown.Bool {
e.state.SetExecutionStatus(lib.ExecutionStatusTeardown)
e.initProgress.Modify(pb.WithConstProgress(1, "teardown()"))

// We run teardown() with the global context, so it isn't interrupted by
// thresholds or test.abort() or even Ctrl+C (unless used twice).
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("teardown() aborted by error")
// TODO: add a `sync.Once` equivalent?
_, err := e.controller.GetOrCreateData("teardown", func() ([]byte, error) {
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("teardown() aborted by error")
return nil, err
}
return nil, nil
})
if err != nil {
return err
}
}

if err := SignalAndWait(e.controller, "teardown-done"); err != nil {
return err
}

return firstErr
}

Expand Down
Loading

0 comments on commit 033d768

Please sign in to comment.