Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move most capabilities out of the core.Engine in preparation for removal #2813

Merged
merged 4 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions api/common/context.go

This file was deleted.

43 changes: 26 additions & 17 deletions api/server.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,44 @@
package api

import (
"context"
"fmt"
"net/http"
"time"

"github.com/sirupsen/logrus"

"go.k6.io/k6/api/common"
v1 "go.k6.io/k6/api/v1"
"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

func newHandler(logger logrus.FieldLogger) http.Handler {
func newHandler(cs *v1.ControlSurface) http.Handler {
mux := http.NewServeMux()
mux.Handle("/v1/", v1.NewHandler())
mux.Handle("/ping", handlePing(logger))
mux.Handle("/", handlePing(logger))
mux.Handle("/v1/", v1.NewHandler(cs))
mux.Handle("/ping", handlePing(cs.RunState.Logger))
mux.Handle("/", handlePing(cs.RunState.Logger))
return mux
}

// GetServer returns a http.Server instance that can serve k6's REST API.
func GetServer(addr string, engine *core.Engine, logger logrus.FieldLogger) *http.Server {
mux := withEngine(engine, newLogger(logger, newHandler(logger)))
func GetServer(
runCtx context.Context, addr string, runState *lib.TestRunState,
samples chan metrics.SampleContainer, me *engine.MetricsEngine, es *execution.Scheduler,
) *http.Server {
// TODO: reduce the control surface as much as possible? For example, if
// we refactor the Runner API, we won't need to send the Samples channel.
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, do we have an issue with it? I didn't find much searching by Runner.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I don't think we have a dedicated issue for that 🤔 We have a few code TODOs like this one, and some tangentially related issues like #1048 and #2869, but nothing dedicated 🤔

The problem is that we (or at least I) don't have a clear vision for how it should refactored yet... For example, I know that we need to change the name, since Runner is super confusing when it is essentially a "factory" for VUs... 😅 But since I don't have a clear idea how we can refactor it, I also don't know into what to rename it 😞

So yeah, this is why we have the TODO here and in the Runner:

k6/lib/runner.go

Lines 41 to 85 in 1d99b0b

// A Runner is a factory for VUs. It should precompute as much as possible upon
// creation (parse ASTs, load files into memory, etc.), so that spawning VUs
// becomes as fast as possible. The Runner doesn't actually *do* anything in
// itself, the ExecutionScheduler is responsible for wrapping and scheduling
// these VUs for execution.
//
// TODO: Rename this to something more obvious? This name made sense a very long
// time ago.
type Runner interface {
// Creates an Archive of the runner. There should be a corresponding NewFromArchive() function
// that will restore the runner from the archive.
MakeArchive() *Archive
// Spawns a new VU. It's fine to make this function rather heavy, if it means a performance
// improvement at runtime. Remember, this is called once per VU and normally only at the start
// of a test - RunOnce() may be called hundreds of thousands of times, and must be fast.
NewVU(ctx context.Context, idLocal, idGlobal uint64, out chan<- metrics.SampleContainer) (InitializedVU, error)
// Runs pre-test setup, if applicable.
Setup(ctx context.Context, out chan<- metrics.SampleContainer) error
// Returns json representation of the setup data if setup() is specified and run, nil otherwise
GetSetupData() []byte
// Saves the externally supplied setup data as json in the runner
SetSetupData([]byte)
// Runs post-test teardown, if applicable.
Teardown(ctx context.Context, out chan<- metrics.SampleContainer) error
// Returns the default (root) Group.
GetDefaultGroup() *Group
// Get and set options. The initial value will be whatever the script specifies (for JS,
// `export let options = {}`); cmd/run.go will mix this in with CLI-, config- and env-provided
// values and write it back to the runner.
GetOptions() Options
SetOptions(opts Options) error
// Returns whether the given name is an exported and executable
// function in the script.
IsExecutable(string) bool
HandleSummary(context.Context, *Summary) (map[string]io.Reader, error)
}

I also have some very vague ideas on how to refactor setup/teardown/handleSummary handling so they don't require their own dedicated methods, and also how to refactor the archive handling to be somewhat separate, but everything is super vague at the moment and probably full of problems 😞 Someone needs to start doing it, so we can hit the real issues.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to create a new issue after all: #2873

Even though we don't have the whole picture, it doesn't hurt to have a place to collect problems that need to be fixed

cs := &v1.ControlSurface{
RunCtx: runCtx,
Samples: samples,
MetricsEngine: me,
Scheduler: es,
RunState: runState,
}

mux := withLoggingHandler(runState.Logger, newHandler(cs))
return &http.Server{Addr: addr, Handler: mux, ReadHeaderTimeout: 10 * time.Second}
}

Expand All @@ -36,8 +52,8 @@ func (w wrappedResponseWriter) WriteHeader(status int) {
w.ResponseWriter.WriteHeader(status)
}

// newLogger returns the middleware which logs response status for request.
func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
// withLoggingHandler returns the middleware which logs response status for request.
func withLoggingHandler(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {
wrapped := wrappedResponseWriter{ResponseWriter: rw, status: 200} // The default status code is 200 if it's not set
next.ServeHTTP(wrapped, r)
Expand All @@ -46,13 +62,6 @@ func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
}
}

func withEngine(engine *core.Engine, next http.Handler) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
r = r.WithContext(common.WithEngine(r.Context(), engine))
next.ServeHTTP(rw, r)
})
}

func handlePing(logger logrus.FieldLogger) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "text/plain; charset=utf-8")
Expand Down
37 changes: 2 additions & 35 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@ import (
"github.com/sirupsen/logrus"
logtest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/api/common"
"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func testHTTPHandler(rw http.ResponseWriter, r *http.Request) {
Expand All @@ -37,7 +30,7 @@ func TestLogger(t *testing.T) {

l, hook := logtest.NewNullLogger()
l.Level = logrus.DebugLevel
newLogger(l, http.HandlerFunc(testHTTPHandler))(rw, r)
withLoggingHandler(l, http.HandlerFunc(testHTTPHandler))(rw, r)

res := rw.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)
Expand All @@ -57,36 +50,10 @@ func TestLogger(t *testing.T) {
}
}

func TestWithEngine(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
testState := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: logger,
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
},
Options: lib.Options{},
Runner: &minirunner.MiniRunner{},
}

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)

rw := httptest.NewRecorder()
r := httptest.NewRequest("GET", "http://example.com/", nil)
withEngine(engine, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
assert.Equal(t, engine, common.GetEngine(r.Context()))
}))(rw, r)
}

func TestPing(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
mux := newHandler(logger)
mux := handlePing(logger)

rw := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/ping", nil)
Expand Down
20 changes: 20 additions & 0 deletions api/v1/control_surface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package v1

import (
"context"

"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

// ControlSurface includes the methods the REST API can use to control and
// communicate with the rest of k6.
type ControlSurface struct {
RunCtx context.Context
Samples chan metrics.SampleContainer
MetricsEngine *engine.MetricsEngine
Scheduler *execution.Scheduler
RunState *lib.TestRunState
}
14 changes: 4 additions & 10 deletions api/v1/group_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package v1
import (
"encoding/json"
"net/http"

"go.k6.io/k6/api/common"
)

func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroups(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) {
root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil)
groups := FlattenGroup(root)

data, err := json.Marshal(newGroupsJSONAPI(groups))
Expand All @@ -21,10 +17,8 @@ func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetGroup(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroup(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) {
root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil)
groups := FlattenGroup(root)

var group *Group
Expand Down
33 changes: 25 additions & 8 deletions api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand All @@ -10,12 +11,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

func getTestPreInitState(tb testing.TB) *lib.TestPreInitState {
Expand All @@ -41,6 +42,26 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
}
}

func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(execScheduler.GetState())
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
tb.Cleanup(cancel)
ctx, _ = execution.NewTestRunContext(ctx, testState.Logger)

return &ControlSurface{
RunCtx: ctx,
Samples: make(chan metrics.SampleContainer, 1000),
MetricsEngine: me,
Scheduler: execScheduler,
RunState: testState,
}
}

func TestGetGroups(t *testing.T) {
g0, err := lib.NewGroup("", nil)
assert.NoError(t, err)
Expand All @@ -49,15 +70,11 @@ func TestGetGroups(t *testing.T) {
g2, err := g1.Group("group 2")
assert.NoError(t, err)

testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0})
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)
cs := getControlSurface(t, getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}))

t.Run("list", func(t *testing.T) {
rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups", nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups", nil))
res := rw.Result()
body := rw.Body.Bytes()
assert.Equal(t, http.StatusOK, res.StatusCode)
Expand Down Expand Up @@ -105,7 +122,7 @@ func TestGetGroups(t *testing.T) {
for _, gp := range []*lib.Group{g0, g1, g2} {
t.Run(gp.Name, func(t *testing.T) {
rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups/"+gp.ID, nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups/"+gp.ID, nil))
res := rw.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)
})
Expand Down
32 changes: 13 additions & 19 deletions api/v1/metric_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,17 @@ import (
"encoding/json"
"net/http"
"time"

"go.k6.io/k6/api/common"
)

func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

func handleGetMetrics(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.Scheduler != nil {
t = cs.Scheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t)
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(cs.MetricsEngine.ObservedMetrics, t)
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(metrics)
if err != nil {
Expand All @@ -28,23 +24,21 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

func handleGetMetric(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.Scheduler != nil {
t = cs.Scheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsEngine.MetricsLock.Lock()
metric, ok := engine.MetricsEngine.ObservedMetrics[id]
cs.MetricsEngine.MetricsLock.Lock()
metric, ok := cs.MetricsEngine.ObservedMetrics[id]
if !ok {
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Unlock()
apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound)
return
}
wrappedMetric := newMetricEnvelope(metric, t)
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(wrappedMetric)
if err != nil {
Expand Down
Loading