Skip to content

Commit

Permalink
Make the REST API handlers independent from the core.Engine
Browse files Browse the repository at this point in the history
After this commit, the only place the Engine is used in the api/ package tests, and only in a few places that will be easy to refactor after we remove it completely.
  • Loading branch information
na-- committed Dec 8, 2022
1 parent 577089f commit 195c2ab
Show file tree
Hide file tree
Showing 22 changed files with 184 additions and 275 deletions.
25 changes: 0 additions & 25 deletions api/common/context.go

This file was deleted.

39 changes: 24 additions & 15 deletions api/server.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,44 @@
package api

import (
"context"
"fmt"
"net/http"
"time"

"github.com/sirupsen/logrus"

"go.k6.io/k6/api/common"
v1 "go.k6.io/k6/api/v1"
"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

func newHandler(logger logrus.FieldLogger) http.Handler {
func newHandler(cs *v1.ControlSurface) http.Handler {
mux := http.NewServeMux()
mux.Handle("/v1/", v1.NewHandler())
mux.Handle("/ping", handlePing(logger))
mux.Handle("/", handlePing(logger))
mux.Handle("/v1/", v1.NewHandler(cs))
mux.Handle("/ping", handlePing(cs.RunState.Logger))
mux.Handle("/", handlePing(cs.RunState.Logger))
return mux
}

// GetServer returns a http.Server instance that can serve k6's REST API.
func GetServer(addr string, engine *core.Engine, logger logrus.FieldLogger) *http.Server {
mux := withEngine(engine, newLogger(logger, newHandler(logger)))
func GetServer(
runCtx context.Context, addr string, runState *lib.TestRunState,
samples chan metrics.SampleContainer, me *engine.MetricsEngine, es *execution.Scheduler,
) *http.Server {
// TODO: reduce the control surface as much as possible? For example, if
// we refactor the Runner API, we won't need to send the Samples channel.
cs := &v1.ControlSurface{
RunCtx: runCtx,
Samples: samples,
MetricsEngine: me,
Scheduler: es,
RunState: runState,
}

mux := newLogger(runState.Logger, newHandler(cs))
return &http.Server{Addr: addr, Handler: mux, ReadHeaderTimeout: 10 * time.Second}
}

Expand All @@ -46,13 +62,6 @@ func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
}
}

func withEngine(engine *core.Engine, next http.Handler) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
r = r.WithContext(common.WithEngine(r.Context(), engine))
next.ServeHTTP(rw, r)
})
}

func handlePing(logger logrus.FieldLogger) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "text/plain; charset=utf-8")
Expand Down
35 changes: 1 addition & 34 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@ import (
"github.com/sirupsen/logrus"
logtest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/api/common"
"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func testHTTPHandler(rw http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -57,36 +50,10 @@ func TestLogger(t *testing.T) {
}
}

func TestWithEngine(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
testState := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: logger,
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
},
Options: lib.Options{},
Runner: &minirunner.MiniRunner{},
}

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)

rw := httptest.NewRecorder()
r := httptest.NewRequest("GET", "http://example.com/", nil)
withEngine(engine, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
assert.Equal(t, engine, common.GetEngine(r.Context()))
}))(rw, r)
}

func TestPing(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
mux := newHandler(logger)
mux := handlePing(logger)

rw := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/ping", nil)
Expand Down
20 changes: 20 additions & 0 deletions api/v1/control_surface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package v1

import (
"context"

"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

// ControlSurface includes the methods the REST API can use to control and
// communicate with the rest of k6.
type ControlSurface struct {
RunCtx context.Context
Samples chan metrics.SampleContainer
MetricsEngine *engine.MetricsEngine
Scheduler *execution.Scheduler
RunState *lib.TestRunState
}
14 changes: 4 additions & 10 deletions api/v1/group_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package v1
import (
"encoding/json"
"net/http"

"go.k6.io/k6/api/common"
)

func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroups(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) {
root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil)
groups := FlattenGroup(root)

data, err := json.Marshal(newGroupsJSONAPI(groups))
Expand All @@ -21,10 +17,8 @@ func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetGroup(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroup(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) {
root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil)
groups := FlattenGroup(root)

var group *Group
Expand Down
33 changes: 25 additions & 8 deletions api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand All @@ -10,12 +11,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

func getTestPreInitState(tb testing.TB) *lib.TestPreInitState {
Expand All @@ -41,6 +42,26 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
}
}

func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(execScheduler.GetState())
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
tb.Cleanup(cancel)
ctx, _ = execution.NewTestRunContext(ctx, testState.Logger)

return &ControlSurface{
RunCtx: ctx,
Samples: make(chan metrics.SampleContainer, 1000),
MetricsEngine: me,
Scheduler: execScheduler,
RunState: testState,
}
}

func TestGetGroups(t *testing.T) {
g0, err := lib.NewGroup("", nil)
assert.NoError(t, err)
Expand All @@ -49,15 +70,11 @@ func TestGetGroups(t *testing.T) {
g2, err := g1.Group("group 2")
assert.NoError(t, err)

testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0})
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)
cs := getControlSurface(t, getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}))

t.Run("list", func(t *testing.T) {
rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups", nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups", nil))
res := rw.Result()
body := rw.Body.Bytes()
assert.Equal(t, http.StatusOK, res.StatusCode)
Expand Down Expand Up @@ -105,7 +122,7 @@ func TestGetGroups(t *testing.T) {
for _, gp := range []*lib.Group{g0, g1, g2} {
t.Run(gp.Name, func(t *testing.T) {
rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups/"+gp.ID, nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups/"+gp.ID, nil))
res := rw.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)
})
Expand Down
32 changes: 13 additions & 19 deletions api/v1/metric_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,17 @@ import (
"encoding/json"
"net/http"
"time"

"go.k6.io/k6/api/common"
)

func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

func handleGetMetrics(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.Scheduler != nil {
t = cs.Scheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t)
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(cs.MetricsEngine.ObservedMetrics, t)
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(metrics)
if err != nil {
Expand All @@ -28,23 +24,21 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

func handleGetMetric(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.Scheduler != nil {
t = cs.Scheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsEngine.MetricsLock.Lock()
metric, ok := engine.MetricsEngine.ObservedMetrics[id]
cs.MetricsEngine.MetricsLock.Lock()
metric, ok := cs.MetricsEngine.ObservedMetrics[id]
if !ok {
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Unlock()
apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound)
return
}
wrappedMetric := newMetricEnvelope(metric, t)
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(wrappedMetric)
if err != nil {
Expand Down
26 changes: 9 additions & 17 deletions api/v1/metric_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
Expand All @@ -23,18 +21,15 @@ func TestGetMetrics(t *testing.T) {
testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{})
testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time)
require.NoError(t, err)
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)
cs := getControlSurface(t, testState)

engine.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{
cs.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{
"my_metric": testMetric,
}
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)
cs.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics", nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics", nil))
res := rw.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)

Expand Down Expand Up @@ -82,21 +77,18 @@ func TestGetMetric(t *testing.T) {
testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{})
testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time)
require.NoError(t, err)
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)
cs := getControlSurface(t, testState)

engine.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{
cs.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{
"my_metric": testMetric,
}
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)
cs.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

t.Run("nonexistent", func(t *testing.T) {
t.Parallel()

rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics/notreal", nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics/notreal", nil))
res := rw.Result()
assert.Equal(t, http.StatusNotFound, res.StatusCode)
})
Expand All @@ -105,7 +97,7 @@ func TestGetMetric(t *testing.T) {
t.Parallel()

rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics/my_metric", nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics/my_metric", nil))
res := rw.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)

Expand Down
Loading

0 comments on commit 195c2ab

Please sign in to comment.