Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the registry for (sub-)metric validation and move data crunching out of the Engine #2426

Merged
merged 11 commits into from
Mar 29, 2022
12 changes: 8 additions & 4 deletions .github/workflows/xk6-tests/xk6-js-test/jstest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type (
// JSTest is meant to test xk6 and the JS extension sub-system of k6.
JSTest struct {
vu modules.VU

foos *stats.Metric
}
)

Expand All @@ -35,7 +37,10 @@ func New() *RootModule {
// NewModuleInstance implements the modules.Module interface and returns
// a new instance for each VU.
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return &JSTest{vu: vu}
return &JSTest{
vu: vu,
foos: vu.InitEnv().Registry.MustNewMetric("foos", stats.Counter),
}
}

// Exports implements the modules.Instance interface and returns the exports
Expand All @@ -45,20 +50,19 @@ func (j *JSTest) Exports() modules.Exports {
}

// Foo emits a foo metric
func (j JSTest) Foo(arg float64) (bool, error) {
func (j *JSTest) Foo(arg float64) (bool, error) {
state := j.vu.State()
if state == nil {
return false, fmt.Errorf("the VU State is not avaialble in the init context")
}

ctx := j.vu.Context()

allTheFoos := stats.New("foos", stats.Counter)
tags := state.CloneTags()
tags["foo"] = "bar"
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
Time: time.Now(),
Metric: allTheFoos, Tags: stats.IntoSampleTags(&tags),
Metric: j.foos, Tags: stats.IntoSampleTags(&tags),
Value: arg,
})

Expand Down
8 changes: 4 additions & 4 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func testHTTPHandler(rw http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -80,11 +80,11 @@ func TestLogger(t *testing.T) {
func TestWithEngine(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

rw := httptest.NewRecorder()
Expand Down
8 changes: 4 additions & 4 deletions api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func TestGetGroups(t *testing.T) {
Expand All @@ -49,11 +49,11 @@ func TestGetGroups(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))

execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

t.Run("list", func(t *testing.T) {
Expand Down
14 changes: 9 additions & 5 deletions api/v1/metric_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.Metrics, t)
engine.MetricsLock.Unlock()
engine.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t)
engine.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(metrics)
if err != nil {
Expand All @@ -56,13 +56,17 @@ func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
}

metric, ok := engine.Metrics[id]
engine.MetricsEngine.MetricsLock.Lock()
metric, ok := engine.MetricsEngine.ObservedMetrics[id]
if !ok {
engine.MetricsEngine.MetricsLock.Unlock()
apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound)
return
}
wrappedMetric := newMetricEnvelope(metric, t)
engine.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(newMetricEnvelope(metric, t))
data, err := json.Marshal(wrappedMetric)
if err != nil {
apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError)
return
Expand Down
22 changes: 11 additions & 11 deletions api/v1/metric_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/stats"
)

Expand All @@ -45,17 +45,17 @@ func TestGetMetrics(t *testing.T) {

logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
}
engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics", nil))
Expand Down Expand Up @@ -105,17 +105,17 @@ func TestGetMetric(t *testing.T) {

logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
}
engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

t.Run("nonexistent", func(t *testing.T) {
t.Parallel()
Expand Down
36 changes: 25 additions & 11 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
Expand All @@ -39,10 +40,10 @@ import (
"go.k6.io/k6/core/local"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
)

func TestSetupData(t *testing.T) {
Expand Down Expand Up @@ -133,15 +134,17 @@ func TestSetupData(t *testing.T) {
},
},
}
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
for _, testCase := range testCases {
testCase := testCase

runTestCase := func(t *testing.T, tcid int) {
testCase := testCases[tcid]
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)

runner, err := js.New(
logger,
&loader.SourceData{URL: &url.URL{Path: "/script.js"}, Data: testCase.script},
Expand All @@ -159,19 +162,22 @@ func TestSetupData(t *testing.T) {
SetupTimeout: types.NullDurationFrom(5 * time.Second),
TeardownTimeout: types.NullDurationFrom(5 * time.Second),
})
execScheduler, err := local.NewExecutionScheduler(runner, logger)
execScheduler, err := local.NewExecutionScheduler(runner, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs()

globalCtx, globalCancel := context.WithCancel(context.Background())
runCtx, runCancel := context.WithCancel(globalCtx)
run, wait, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)

defer wait()
defer globalCancel()

require.NoError(t, err)

errC := make(chan error)
go func() { errC <- run() }()

Expand Down Expand Up @@ -211,4 +217,12 @@ func TestSetupData(t *testing.T) {
}
})
}

for id := range testCases {
id := id
t.Run(fmt.Sprintf("testcase_%d", id), func(t *testing.T) {
t.Parallel()
runTestCase(t, id)
})
}
}
54 changes: 32 additions & 22 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func TestGetStatus(t *testing.T) {
t.Parallel()

logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

rw := httptest.NewRecorder()
Expand Down Expand Up @@ -124,34 +124,44 @@ func TestPatchStatus(t *testing.T) {
Payload: []byte(`{"data":{"type":"status","id":"default","attributes":{"status":0,"paused":null,"vus":10,"vus-max":10,"stopped":false,"running":false,"tainted":false}}}`),
},
}
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))

scenarios := lib.ScenarioConfigs{}
err := json.Unmarshal([]byte(`
{"external": {"executor": "externally-controlled",
"vus": 0, "maxVUs": 10, "duration": "1s"}}`), &scenarios)
require.NoError(t, err)
options := lib.Options{Scenarios: scenarios}
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)

for name, testCase := range testData {
t.Run(name, func(t *testing.T) {
t.Parallel()
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))

execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Options: options}, logger)
scenarios := lib.ScenarioConfigs{}
err := json.Unmarshal([]byte(`
{"external": {"executor": "externally-controlled",
"vus": 0, "maxVUs": 10, "duration": "0"}}`), &scenarios)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
options := lib.Options{Scenarios: scenarios}

registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Options: options}, builtinMetrics, logger)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
run, _, err := engine.Init(ctx, ctx)
engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

go func() { _ = run() }()
require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
run, wait, err := engine.Init(ctx, ctx)
require.NoError(t, err)

defer func() {
cancel()
wait()
}()

go func() {
assert.NoError(t, run())
}()
// wait for the executor to initialize to avoid a potential data race below
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)

rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "PATCH", "/v1/status", bytes.NewReader(testCase.Payload)))
Expand Down
4 changes: 2 additions & 2 deletions cmd/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error {
globalCancel()
}()
}
hardStop := func(sig os.Signal) {
onHardStop := func(sig os.Signal) {
logger.WithField("sig", sig).Error("Aborting k6 in response to signal, we won't wait for the test to end.")
}
stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, hardStop)
stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, onHardStop)
defer stopSignalHandling()

et, err := lib.NewExecutionTuple(test.derivedConfig.ExecutionSegment, test.derivedConfig.ExecutionSegmentSequence)
Expand Down
8 changes: 4 additions & 4 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,23 +92,23 @@ func printToStdout(gs *globalState, s string) {
}

// Trap Interrupts, SIGINTs and SIGTERMs and call the given.
func handleTestAbortSignals(gs *globalState, firstHandler, secondHandler func(os.Signal)) (stop func()) {
func handleTestAbortSignals(gs *globalState, gracefulStopHandler, onHardStop func(os.Signal)) (stop func()) {
sigC := make(chan os.Signal, 2)
done := make(chan struct{})
gs.signalNotify(sigC, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

go func() {
select {
case sig := <-sigC:
firstHandler(sig)
gracefulStopHandler(sig)
case <-done:
return
}

select {
case sig := <-sigC:
if secondHandler != nil {
secondHandler(sig)
if onHardStop != nil {
onHardStop(sig)
}
// If we get a second signal, we immediately exit, so something like
// https://github.com/k6io/k6/issues/971 never happens again
Expand Down
Loading