From 195c2aba9482ca7fbe570f428eed02148171b804 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 8 Dec 2022 12:20:36 +0200 Subject: [PATCH] Make the REST API handlers independent from the core.Engine After this commit, the only place the Engine is used in the api/ package tests, and only in a few places that will be easy to refactor after we remove it completely. --- api/common/context.go | 25 ------------------ api/server.go | 39 +++++++++++++++++----------- api/server_test.go | 35 +------------------------ api/v1/control_surface.go | 20 ++++++++++++++ api/v1/group_routes.go | 14 +++------- api/v1/group_routes_test.go | 33 +++++++++++++++++------ api/v1/metric_routes.go | 32 ++++++++++------------- api/v1/metric_routes_test.go | 26 +++++++------------ api/v1/routes.go | 23 ++++++++-------- api/v1/routes_test.go | 22 ---------------- api/v1/setup_teardown_routes.go | 26 +++++++------------ api/v1/setup_teardown_routes_test.go | 23 ++++++++++------ api/v1/status.go | 15 +++++++---- api/v1/status_jsonapi.go | 8 ++---- api/v1/status_routes.go | 24 +++++++++-------- api/v1/status_routes_test.go | 32 +++++++++++++---------- cmd/common.go | 2 ++ cmd/integration_test.go | 2 +- cmd/run.go | 4 +-- core/engine.go | 34 +++--------------------- core/engine_test.go | 15 ----------- execution/scheduler.go | 5 ---- 22 files changed, 184 insertions(+), 275 deletions(-) delete mode 100644 api/common/context.go create mode 100644 api/v1/control_surface.go delete mode 100644 api/v1/routes_test.go diff --git a/api/common/context.go b/api/common/context.go deleted file mode 100644 index 7340f222d7a..00000000000 --- a/api/common/context.go +++ /dev/null @@ -1,25 +0,0 @@ -package common - -import ( - "context" - - "go.k6.io/k6/core" -) - -type ContextKey int - -const ctxKeyEngine = ContextKey(1) - -// WithEngine sets the k6 running Engine in the under the hood context. -// -// Deprecated: Use directly the Engine as dependency. -func WithEngine(ctx context.Context, engine *core.Engine) context.Context { - return context.WithValue(ctx, ctxKeyEngine, engine) -} - -// GetEngine returns the k6 running Engine fetching it from the context. -// -// Deprecated: Use directly the Engine as dependency. -func GetEngine(ctx context.Context) *core.Engine { - return ctx.Value(ctxKeyEngine).(*core.Engine) -} diff --git a/api/server.go b/api/server.go index caed31b692f..5dd71b00d5c 100644 --- a/api/server.go +++ b/api/server.go @@ -1,28 +1,44 @@ package api import ( + "context" "fmt" "net/http" "time" "github.com/sirupsen/logrus" - "go.k6.io/k6/api/common" v1 "go.k6.io/k6/api/v1" - "go.k6.io/k6/core" + "go.k6.io/k6/execution" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" ) -func newHandler(logger logrus.FieldLogger) http.Handler { +func newHandler(cs *v1.ControlSurface) http.Handler { mux := http.NewServeMux() - mux.Handle("/v1/", v1.NewHandler()) - mux.Handle("/ping", handlePing(logger)) - mux.Handle("/", handlePing(logger)) + mux.Handle("/v1/", v1.NewHandler(cs)) + mux.Handle("/ping", handlePing(cs.RunState.Logger)) + mux.Handle("/", handlePing(cs.RunState.Logger)) return mux } // GetServer returns a http.Server instance that can serve k6's REST API. -func GetServer(addr string, engine *core.Engine, logger logrus.FieldLogger) *http.Server { - mux := withEngine(engine, newLogger(logger, newHandler(logger))) +func GetServer( + runCtx context.Context, addr string, runState *lib.TestRunState, + samples chan metrics.SampleContainer, me *engine.MetricsEngine, es *execution.Scheduler, +) *http.Server { + // TODO: reduce the control surface as much as possible? For example, if + // we refactor the Runner API, we won't need to send the Samples channel. + cs := &v1.ControlSurface{ + RunCtx: runCtx, + Samples: samples, + MetricsEngine: me, + Scheduler: es, + RunState: runState, + } + + mux := newLogger(runState.Logger, newHandler(cs)) return &http.Server{Addr: addr, Handler: mux, ReadHeaderTimeout: 10 * time.Second} } @@ -46,13 +62,6 @@ func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc { } } -func withEngine(engine *core.Engine, next http.Handler) http.HandlerFunc { - return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - r = r.WithContext(common.WithEngine(r.Context(), engine)) - next.ServeHTTP(rw, r) - }) -} - func handlePing(logger logrus.FieldLogger) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { rw.Header().Add("Content-Type", "text/plain; charset=utf-8") diff --git a/api/server_test.go b/api/server_test.go index dd7fc06c814..3278727c1d4 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -9,15 +9,8 @@ import ( "github.com/sirupsen/logrus" logtest "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.k6.io/k6/api/common" - "go.k6.io/k6/core" - "go.k6.io/k6/execution" - "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" - "go.k6.io/k6/lib/testutils/minirunner" - "go.k6.io/k6/metrics" ) func testHTTPHandler(rw http.ResponseWriter, r *http.Request) { @@ -57,36 +50,10 @@ func TestLogger(t *testing.T) { } } -func TestWithEngine(t *testing.T) { - logger := logrus.New() - logger.SetOutput(testutils.NewTestOutput(t)) - registry := metrics.NewRegistry() - testState := &lib.TestRunState{ - TestPreInitState: &lib.TestPreInitState{ - Logger: logger, - Registry: registry, - BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), - }, - Options: lib.Options{}, - Runner: &minirunner.MiniRunner{}, - } - - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) - - rw := httptest.NewRecorder() - r := httptest.NewRequest("GET", "http://example.com/", nil) - withEngine(engine, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - assert.Equal(t, engine, common.GetEngine(r.Context())) - }))(rw, r) -} - func TestPing(t *testing.T) { logger := logrus.New() logger.SetOutput(testutils.NewTestOutput(t)) - mux := newHandler(logger) + mux := handlePing(logger) rw := httptest.NewRecorder() r := httptest.NewRequest("GET", "/ping", nil) diff --git a/api/v1/control_surface.go b/api/v1/control_surface.go new file mode 100644 index 00000000000..12689cd1e3c --- /dev/null +++ b/api/v1/control_surface.go @@ -0,0 +1,20 @@ +package v1 + +import ( + "context" + + "go.k6.io/k6/execution" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" +) + +// ControlSurface includes the methods the REST API can use to control and +// communicate with the rest of k6. +type ControlSurface struct { + RunCtx context.Context + Samples chan metrics.SampleContainer + MetricsEngine *engine.MetricsEngine + Scheduler *execution.Scheduler + RunState *lib.TestRunState +} diff --git a/api/v1/group_routes.go b/api/v1/group_routes.go index d84056eeeb7..d4c9f1773e4 100644 --- a/api/v1/group_routes.go +++ b/api/v1/group_routes.go @@ -3,14 +3,10 @@ package v1 import ( "encoding/json" "net/http" - - "go.k6.io/k6/api/common" ) -func handleGetGroups(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - - root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil) +func handleGetGroups(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) { + root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil) groups := FlattenGroup(root) data, err := json.Marshal(newGroupsJSONAPI(groups)) @@ -21,10 +17,8 @@ func handleGetGroups(rw http.ResponseWriter, r *http.Request) { _, _ = rw.Write(data) } -func handleGetGroup(rw http.ResponseWriter, r *http.Request, id string) { - engine := common.GetEngine(r.Context()) - - root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil) +func handleGetGroup(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) { + root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil) groups := FlattenGroup(root) var group *Group diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 460fb0e4635..e9b7868ab41 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -1,6 +1,7 @@ package v1 import ( + "context" "encoding/json" "net/http" "net/http/httptest" @@ -10,12 +11,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.k6.io/k6/core" "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" ) func getTestPreInitState(tb testing.TB) *lib.TestPreInitState { @@ -41,6 +42,26 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib } } +func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface { + execScheduler, err := execution.NewScheduler(testState) + require.NoError(tb, err) + + me, err := engine.NewMetricsEngine(execScheduler.GetState()) + require.NoError(tb, err) + + ctx, cancel := context.WithCancel(context.Background()) + tb.Cleanup(cancel) + ctx, _ = execution.NewTestRunContext(ctx, testState.Logger) + + return &ControlSurface{ + RunCtx: ctx, + Samples: make(chan metrics.SampleContainer, 1000), + MetricsEngine: me, + Scheduler: execScheduler, + RunState: testState, + } +} + func TestGetGroups(t *testing.T) { g0, err := lib.NewGroup("", nil) assert.NoError(t, err) @@ -49,15 +70,11 @@ func TestGetGroups(t *testing.T) { g2, err := g1.Group("group 2") assert.NoError(t, err) - testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0})) t.Run("list", func(t *testing.T) { rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups", nil)) res := rw.Result() body := rw.Body.Bytes() assert.Equal(t, http.StatusOK, res.StatusCode) @@ -105,7 +122,7 @@ func TestGetGroups(t *testing.T) { for _, gp := range []*lib.Group{g0, g1, g2} { t.Run(gp.Name, func(t *testing.T) { rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups/"+gp.ID, nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups/"+gp.ID, nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) }) diff --git a/api/v1/metric_routes.go b/api/v1/metric_routes.go index a8e81bc4159..c98408f75e2 100644 --- a/api/v1/metric_routes.go +++ b/api/v1/metric_routes.go @@ -4,21 +4,17 @@ import ( "encoding/json" "net/http" "time" - - "go.k6.io/k6/api/common" ) -func handleGetMetrics(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - +func handleGetMetrics(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) { var t time.Duration - if engine.ExecutionScheduler != nil { - t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration() + if cs.Scheduler != nil { + t = cs.Scheduler.GetState().GetCurrentTestRunDuration() } - engine.MetricsEngine.MetricsLock.Lock() - metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t) - engine.MetricsEngine.MetricsLock.Unlock() + cs.MetricsEngine.MetricsLock.Lock() + metrics := newMetricsJSONAPI(cs.MetricsEngine.ObservedMetrics, t) + cs.MetricsEngine.MetricsLock.Unlock() data, err := json.Marshal(metrics) if err != nil { @@ -28,23 +24,21 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) { _, _ = rw.Write(data) } -func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) { - engine := common.GetEngine(r.Context()) - +func handleGetMetric(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) { var t time.Duration - if engine.ExecutionScheduler != nil { - t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration() + if cs.Scheduler != nil { + t = cs.Scheduler.GetState().GetCurrentTestRunDuration() } - engine.MetricsEngine.MetricsLock.Lock() - metric, ok := engine.MetricsEngine.ObservedMetrics[id] + cs.MetricsEngine.MetricsLock.Lock() + metric, ok := cs.MetricsEngine.ObservedMetrics[id] if !ok { - engine.MetricsEngine.MetricsLock.Unlock() + cs.MetricsEngine.MetricsLock.Unlock() apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound) return } wrappedMetric := newMetricEnvelope(metric, t) - engine.MetricsEngine.MetricsLock.Unlock() + cs.MetricsEngine.MetricsLock.Unlock() data, err := json.Marshal(wrappedMetric) if err != nil { diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go index 5612d5f3523..f39ed3c1317 100644 --- a/api/v1/metric_routes_test.go +++ b/api/v1/metric_routes_test.go @@ -10,8 +10,6 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core" - "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" @@ -23,18 +21,15 @@ func TestGetMetrics(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, testState) - engine.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{ + cs.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{ "my_metric": testMetric, } - engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) + cs.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics", nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) @@ -82,21 +77,18 @@ func TestGetMetric(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, testState) - engine.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{ + cs.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{ "my_metric": testMetric, } - engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) + cs.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) t.Run("nonexistent", func(t *testing.T) { t.Parallel() rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics/notreal", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics/notreal", nil)) res := rw.Result() assert.Equal(t, http.StatusNotFound, res.StatusCode) }) @@ -105,7 +97,7 @@ func TestGetMetric(t *testing.T) { t.Parallel() rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics/my_metric", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics/my_metric", nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) diff --git a/api/v1/routes.go b/api/v1/routes.go index 27d1e361bef..81fe99305c7 100644 --- a/api/v1/routes.go +++ b/api/v1/routes.go @@ -5,15 +5,16 @@ import ( "net/http" ) -func NewHandler() http.Handler { +// NewHandler returns the top handler for the v1 REST APIs +func NewHandler(cs *ControlSurface) http.Handler { mux := http.NewServeMux() mux.HandleFunc("/v1/status", func(rw http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: - handleGetStatus(rw, r) + handleGetStatus(cs, rw, r) case http.MethodPatch: - handlePatchStatus(rw, r) + handlePatchStatus(cs, rw, r) default: rw.WriteHeader(http.StatusMethodNotAllowed) } @@ -24,7 +25,7 @@ func NewHandler() http.Handler { rw.WriteHeader(http.StatusMethodNotAllowed) return } - handleGetMetrics(rw, r) + handleGetMetrics(cs, rw, r) }) mux.HandleFunc("/v1/metrics/", func(rw http.ResponseWriter, r *http.Request) { @@ -34,7 +35,7 @@ func NewHandler() http.Handler { } id := r.URL.Path[len("/v1/metrics/"):] - handleGetMetric(rw, r, id) + handleGetMetric(cs, rw, r, id) }) mux.HandleFunc("/v1/groups", func(rw http.ResponseWriter, r *http.Request) { @@ -43,7 +44,7 @@ func NewHandler() http.Handler { return } - handleGetGroups(rw, r) + handleGetGroups(cs, rw, r) }) mux.HandleFunc("/v1/groups/", func(rw http.ResponseWriter, r *http.Request) { @@ -53,17 +54,17 @@ func NewHandler() http.Handler { } id := r.URL.Path[len("/v1/groups/"):] - handleGetGroup(rw, r, id) + handleGetGroup(cs, rw, r, id) }) mux.HandleFunc("/v1/setup", func(rw http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: - handleRunSetup(rw, r) + handleRunSetup(cs, rw, r) case http.MethodPut: - handleSetSetupData(rw, r) + handleSetSetupData(cs, rw, r) case http.MethodGet: - handleGetSetupData(rw, r) + handleGetSetupData(cs, rw, r) default: rw.WriteHeader(http.StatusMethodNotAllowed) } @@ -75,7 +76,7 @@ func NewHandler() http.Handler { return } - handleRunTeardown(rw, r) + handleRunTeardown(cs, rw, r) }) return mux diff --git a/api/v1/routes_test.go b/api/v1/routes_test.go deleted file mode 100644 index ffbd9ebcc5f..00000000000 --- a/api/v1/routes_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package v1 - -import ( - "io" - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/assert" - - "go.k6.io/k6/api/common" - "go.k6.io/k6/core" -) - -func newRequestWithEngine(engine *core.Engine, method, target string, body io.Reader) *http.Request { - r := httptest.NewRequest(method, target, body) - return r.WithContext(common.WithEngine(r.Context(), engine)) -} - -func TestNewHandler(t *testing.T) { - assert.NotNil(t, NewHandler()) -} diff --git a/api/v1/setup_teardown_routes.go b/api/v1/setup_teardown_routes.go index 207c809c981..0e7a90a82e0 100644 --- a/api/v1/setup_teardown_routes.go +++ b/api/v1/setup_teardown_routes.go @@ -4,8 +4,6 @@ import ( "encoding/json" "io/ioutil" "net/http" - - "go.k6.io/k6/api/common" ) // NullSetupData is wrapper around null to satisfy jsonapi @@ -38,13 +36,12 @@ func handleSetupDataOutput(rw http.ResponseWriter, setupData json.RawMessage) { } // handleGetSetupData just returns the current JSON-encoded setup data -func handleGetSetupData(rw http.ResponseWriter, r *http.Request) { - runner := common.GetEngine(r.Context()).ExecutionScheduler.GetRunner() - handleSetupDataOutput(rw, runner.GetSetupData()) +func handleGetSetupData(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) { + handleSetupDataOutput(rw, cs.RunState.Runner.GetSetupData()) } // handleSetSetupData just parses the JSON request body and sets the result as setup data for the runner -func handleSetSetupData(rw http.ResponseWriter, r *http.Request) { +func handleSetSetupData(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { apiError(rw, "Error reading request body", err.Error(), http.StatusBadRequest) @@ -59,8 +56,7 @@ func handleSetSetupData(rw http.ResponseWriter, r *http.Request) { } } - runner := common.GetEngine(r.Context()).ExecutionScheduler.GetRunner() - + runner := cs.RunState.Runner if len(body) == 0 { runner.SetSetupData(nil) } else { @@ -71,11 +67,10 @@ func handleSetSetupData(rw http.ResponseWriter, r *http.Request) { } // handleRunSetup executes the runner's Setup() method and returns the result -func handleRunSetup(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - runner := engine.ExecutionScheduler.GetRunner() +func handleRunSetup(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) { + runner := cs.RunState.Runner - if err := runner.Setup(r.Context(), engine.Samples); err != nil { + if err := cs.RunState.Runner.Setup(r.Context(), cs.Samples); err != nil { apiError(rw, "Error executing setup", err.Error(), http.StatusInternalServerError) return } @@ -84,11 +79,8 @@ func handleRunSetup(rw http.ResponseWriter, r *http.Request) { } // handleRunTeardown executes the runner's Teardown() method -func handleRunTeardown(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - runner := common.GetEngine(r.Context()).ExecutionScheduler.GetRunner() - - if err := runner.Teardown(r.Context(), engine.Samples); err != nil { +func handleRunTeardown(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) { + if err := cs.RunState.Runner.Teardown(r.Context(), cs.Samples); err != nil { apiError(rw, "Error executing teardown", err.Error(), http.StatusInternalServerError) } } diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 4b9e06b150e..7d0571fc94d 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -141,25 +141,34 @@ func TestSetupData(t *testing.T) { engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) + globalCtx, globalCancel := context.WithCancel(context.Background()) + t.Cleanup(globalCancel) + runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger) + defer runAbort(fmt.Errorf("unexpected abort")) + require.NoError(t, engine.OutputManager.StartOutputs()) defer engine.OutputManager.StopOutputs(nil) - globalCtx, globalCancel := context.WithCancel(context.Background()) - runCtx, runCancel := context.WithCancel(globalCtx) + cs := &ControlSurface{ + RunCtx: runCtx, + Samples: engine.Samples, + MetricsEngine: engine.MetricsEngine, + Scheduler: execScheduler, + RunState: testState, + } run, wait, err := engine.Init(globalCtx, runCtx) require.NoError(t, err) defer wait() - defer globalCancel() errC := make(chan error) go func() { errC <- run() }() - handler := NewHandler() + handler := NewHandler(cs) checkSetup := func(method, body, expResult string) { rw := httptest.NewRecorder() - handler.ServeHTTP(rw, newRequestWithEngine(engine, method, "/v1/setup", bytes.NewBufferString(body))) + handler.ServeHTTP(rw, httptest.NewRequest(method, "/v1/setup", bytes.NewBufferString(body))) res := rw.Result() if !assert.Equal(t, http.StatusOK, res.StatusCode) { t.Logf("body: %s\n", rw.Body.String()) @@ -179,14 +188,12 @@ func TestSetupData(t *testing.T) { checkSetup(setupRun[0], setupRun[1], setupRun[2]) } - require.NoError(t, engine.ExecutionScheduler.SetPaused(false)) + require.NoError(t, cs.Scheduler.SetPaused(false)) select { case <-time.After(10 * time.Second): - runCancel() t.Fatal("Test timed out") case err := <-errC: - runCancel() require.NoError(t, err) } }) diff --git a/api/v1/status.go b/api/v1/status.go index ad33ce60c86..0915c08a8c4 100644 --- a/api/v1/status.go +++ b/api/v1/status.go @@ -3,7 +3,6 @@ package v1 import ( "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core" "go.k6.io/k6/lib" ) @@ -18,15 +17,21 @@ type Status struct { Tainted bool `json:"tainted" yaml:"tainted"` } -func NewStatus(engine *core.Engine) Status { - executionState := engine.ExecutionScheduler.GetState() +func newStatus(cs *ControlSurface) Status { + executionState := cs.Scheduler.GetState() + isStopped := false + select { + case <-cs.RunCtx.Done(): + isStopped = true + default: + } return Status{ Status: executionState.GetCurrentExecutionStatus(), Running: executionState.HasStarted() && !executionState.HasEnded(), Paused: null.BoolFrom(executionState.IsPaused()), - Stopped: engine.IsStopped(), + Stopped: isStopped, VUs: null.IntFrom(executionState.GetCurrentlyActiveVUsCount()), VUsMax: null.IntFrom(executionState.GetInitializedVUsCount()), - Tainted: engine.IsTainted(), + Tainted: cs.MetricsEngine.GetMetricsWithBreachedThresholdsCount() > 0, } } diff --git a/api/v1/status_jsonapi.go b/api/v1/status_jsonapi.go index 5bd28bd31f8..42977ea1ba2 100644 --- a/api/v1/status_jsonapi.go +++ b/api/v1/status_jsonapi.go @@ -1,9 +1,5 @@ package v1 -import ( - "go.k6.io/k6/core" -) - // StatusJSONAPI is JSON API envelop for metrics type StatusJSONAPI struct { Data statusData `json:"data"` @@ -31,6 +27,6 @@ type statusData struct { Attributes Status `json:"attributes"` } -func newStatusJSONAPIFromEngine(engine *core.Engine) StatusJSONAPI { - return NewStatusJSONAPI(NewStatus(engine)) +func newStatusJSONAPIFromEngine(cs *ControlSurface) StatusJSONAPI { + return NewStatusJSONAPI(newStatus(cs)) } diff --git a/api/v1/status_routes.go b/api/v1/status_routes.go index dc18a822660..1a1eefc3b46 100644 --- a/api/v1/status_routes.go +++ b/api/v1/status_routes.go @@ -3,18 +3,18 @@ package v1 import ( "encoding/json" "errors" + "fmt" "io/ioutil" "net/http" - "go.k6.io/k6/api/common" + "go.k6.io/k6/errext" + "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/execution" "go.k6.io/k6/lib/executor" ) -func handleGetStatus(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - - status := newStatusJSONAPIFromEngine(engine) +func handleGetStatus(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) { + status := newStatusJSONAPIFromEngine(cs) data, err := json.Marshal(status) if err != nil { apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError) @@ -34,9 +34,8 @@ func getFirstExternallyControlledExecutor(execScheduler *execution.Scheduler) (* return nil, errors.New("an externally-controlled executor needs to be configured for live configuration updates") } -func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { +func handlePatchStatus(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) { rw.Header().Set("Content-Type", "application/json; charset=utf-8") - engine := common.GetEngine(r.Context()) body, err := ioutil.ReadAll(r.Body) if err != nil { @@ -53,10 +52,13 @@ func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { status := statusEnvelop.Status() if status.Stopped { //nolint:nestif - engine.Stop() + execution.AbortTestRun(cs.RunCtx, errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(fmt.Errorf("test run stopped from REST API"), exitcodes.ScriptStoppedFromRESTAPI), + errext.AbortedByUser, + )) } else { if status.Paused.Valid { - if err = engine.ExecutionScheduler.SetPaused(status.Paused.Bool); err != nil { + if err = cs.Scheduler.SetPaused(status.Paused.Bool); err != nil { apiError(rw, "Pause error", err.Error(), http.StatusInternalServerError) return } @@ -66,7 +68,7 @@ func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { // TODO: add ability to specify the actual executor id? Though this should // likely be in the v2 REST API, where we could implement it in a way that // may allow us to eventually support other executor types. - executor, updateErr := getFirstExternallyControlledExecutor(engine.ExecutionScheduler) + executor, updateErr := getFirstExternallyControlledExecutor(cs.Scheduler) if updateErr != nil { apiError(rw, "Execution config error", updateErr.Error(), http.StatusInternalServerError) return @@ -85,7 +87,7 @@ func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { } } - data, err := json.Marshal(newStatusJSONAPIFromEngine(engine)) + data, err := json.Marshal(newStatusJSONAPIFromEngine(cs)) if err != nil { apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError) return diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index ea004345668..9e74f768e89 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -25,13 +25,10 @@ func TestGetStatus(t *testing.T) { t.Parallel() testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, testState) rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/status", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/status", nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) @@ -120,18 +117,27 @@ func TestPatchStatus(t *testing.T) { require.NoError(t, engine.OutputManager.StartOutputs()) defer engine.OutputManager.StopOutputs(nil) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - runSubCtx, runSubAbort := execution.NewTestRunContext(ctx, testState.Logger) - engine.AbortFn = runSubAbort + globalCtx, globalCancel := context.WithCancel(context.Background()) + t.Cleanup(globalCancel) + runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger) + defer runAbort(fmt.Errorf("unexpected abort")) + engine.AbortFn = runAbort + + cs := &ControlSurface{ + RunCtx: runCtx, + Samples: engine.Samples, + MetricsEngine: engine.MetricsEngine, + Scheduler: execScheduler, + RunState: testState, + } - run, wait, err := engine.Init(ctx, runSubCtx) + run, wait, err := engine.Init(globalCtx, runCtx) require.NoError(t, err) wg := &sync.WaitGroup{} wg.Add(1) defer func() { - runSubAbort(fmt.Errorf("custom cancel signal")) + runAbort(fmt.Errorf("custom cancel signal")) wait() wg.Wait() }() @@ -144,7 +150,7 @@ func TestPatchStatus(t *testing.T) { time.Sleep(200 * time.Millisecond) rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "PATCH", "/v1/status", bytes.NewReader(testCase.Payload))) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodPatch, "/v1/status", bytes.NewReader(testCase.Payload))) res := rw.Result() require.Equal(t, "application/json; charset=utf-8", rw.Header().Get("Content-Type")) @@ -155,7 +161,7 @@ func TestPatchStatus(t *testing.T) { return } - status := NewStatus(engine) + status := newStatus(cs) if testCase.ExpectedStatus.Paused.Valid { assert.Equal(t, testCase.ExpectedStatus.Paused, status.Paused) } diff --git a/cmd/common.go b/cmd/common.go index da0c1aaf841..078c5baa1a0 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -73,6 +73,7 @@ func printToStdout(gs *globalState, s string) { // Trap Interrupts, SIGINTs and SIGTERMs and call the given. func handleTestAbortSignals(gs *globalState, gracefulStopHandler, onHardStop func(os.Signal)) (stop func()) { + gs.logger.Debug("Trapping interrupt signals so k6 can handle them gracefully...") sigC := make(chan os.Signal, 2) done := make(chan struct{}) gs.signalNotify(sigC, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) @@ -99,6 +100,7 @@ func handleTestAbortSignals(gs *globalState, gracefulStopHandler, onHardStop fun }() return func() { + gs.logger.Debug("Releasing signal trap...") close(done) gs.signalStop(sigC) } diff --git a/cmd/integration_test.go b/cmd/integration_test.go index d6281246a75..4195950bad3 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -788,7 +788,7 @@ func TestAbortedByUserWithRestAPI(t *testing.T) { assert.Contains(t, stdOut, `a simple iteration`) assert.Contains(t, stdOut, `teardown() called`) assert.Contains(t, stdOut, `PATCH /v1/status`) - assert.Contains(t, stdOut, `run: stopped by user via REST API; exiting...`) + assert.Contains(t, stdOut, `level=error msg="test run stopped from REST API"`) assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`) assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) diff --git a/cmd/run.go b/cmd/run.go index 1f2562dd806..2b0759f30b9 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -129,7 +129,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { defer srvCancel() // TODO: send the ExecutionState and MetricsEngine instead of the Engine - srv := api.GetServer(c.gs.flags.address, engine, logger) + srv := api.GetServer(runSubCtx, c.gs.flags.address, testRunState, engine.Samples, engine.MetricsEngine, execScheduler) go func() { defer apiWG.Done() logger.Debugf("Starting the REST API server on %s", c.gs.flags.address) @@ -236,7 +236,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { engine.MetricsEngine.MetricsLock.Lock() // TODO: refactor so this is not needed summaryResult, hsErr := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ Metrics: engine.MetricsEngine.ObservedMetrics, - RootGroup: execScheduler.GetRunner().GetDefaultGroup(), + RootGroup: testRunState.Runner.GetDefaultGroup(), TestRunDuration: executionState.GetCurrentTestRunDuration(), NoColor: c.gs.flags.noColor, UIState: lib.UIState{ diff --git a/core/engine.go b/core/engine.go index 0a45c379353..5e7e1e75c57 100644 --- a/core/engine.go +++ b/core/engine.go @@ -9,8 +9,6 @@ import ( "github.com/sirupsen/logrus" - "go.k6.io/k6/errext" - "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" @@ -40,10 +38,8 @@ type Engine struct { ingester output.Output - logger *logrus.Entry - stopOnce sync.Once - stopChan chan struct{} - AbortFn func(error) // temporary + logger *logrus.Entry + AbortFn func(error) // temporary Samples chan metrics.SampleContainer } @@ -59,7 +55,6 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o runtimeOptions: testState.RuntimeOptions, Samples: make(chan metrics.SampleContainer, testState.Options.MetricSamplesBufferSize.Int64), - stopChan: make(chan struct{}), logger: testState.Logger.WithField("component", "engine"), } @@ -78,7 +73,7 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o if err != nil { testState.Logger.WithError(err).Error("Received error to stop from output") } - e.Stop() + e.AbortFn(err) }) return e, nil @@ -231,26 +226,3 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu func (e *Engine) IsTainted() bool { return e.MetricsEngine.GetMetricsWithBreachedThresholdsCount() > 0 } - -// Stop closes a signal channel, forcing a running Engine to return -func (e *Engine) Stop() { - e.stopOnce.Do(func() { - e.logger.Debug("run: stopped by user via REST API; exiting...") - err := errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI), - errext.AbortedByUser, - ) - e.AbortFn(err) - close(e.stopChan) - }) -} - -// IsStopped returns a bool indicating whether the Engine has been stopped -func (e *Engine) IsStopped() bool { - select { - case <-e.stopChan: - return true - default: - return false - } -} diff --git a/core/engine_test.go b/core/engine_test.go index bc1562f22c6..0c93a381010 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -223,21 +223,6 @@ func TestEngineAtTime(t *testing.T) { assert.NoError(t, test.run()) } -func TestEngineStopped(t *testing.T) { - t.Parallel() - test := newTestEngine(t, nil, nil, nil, lib.Options{ - VUs: null.IntFrom(1), - Duration: types.NullDurationFrom(20 * time.Second), - }) - defer test.wait() - - assert.NoError(t, test.run()) - assert.Equal(t, false, test.engine.IsStopped(), "engine should be running") - test.engine.Stop() - assert.Equal(t, true, test.engine.IsStopped(), "engine should be stopped") - test.engine.Stop() // test that a second stop doesn't panic -} - func TestEngineOutput(t *testing.T) { t.Parallel() diff --git a/execution/scheduler.go b/execution/scheduler.go index a6ce357e76a..2ebb442bc50 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -90,11 +90,6 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) { }, nil } -// GetRunner returns the wrapped lib.Runner instance. -func (e *Scheduler) GetRunner() lib.Runner { // TODO: remove - return e.state.Test.Runner -} - // GetState returns a pointer to the execution state struct for the execution // scheduler. It's guaranteed to be initialized and present, though see the // documentation in lib/execution.go for caveats about its usage. The most