Skip to content

Commit

Permalink
Move the Engine data crunching logic in a new component under metrics/
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Mar 18, 2022
1 parent 45aea93 commit 2cfd7ef
Show file tree
Hide file tree
Showing 11 changed files with 452 additions and 262 deletions.
14 changes: 9 additions & 5 deletions api/v1/metric_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.Metrics, t)
engine.MetricsLock.Unlock()
engine.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t)
engine.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(metrics)
if err != nil {
Expand All @@ -56,13 +56,17 @@ func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
}

metric, ok := engine.Metrics[id]
engine.MetricsEngine.MetricsLock.Lock()
metric, ok := engine.MetricsEngine.ObservedMetrics[id]
if !ok {
engine.MetricsEngine.MetricsLock.Unlock()
apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound)
return
}
wrappedMetric := newMetricEnvelope(metric, t)
engine.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(newMetricEnvelope(metric, t))
data, err := json.Marshal(wrappedMetric)
if err != nil {
apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError)
return
Expand Down
8 changes: 4 additions & 4 deletions api/v1/metric_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ func TestGetMetrics(t *testing.T) {
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
}
engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics", nil))
Expand Down Expand Up @@ -112,10 +112,10 @@ func TestGetMetric(t *testing.T) {
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
}
engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

t.Run("nonexistent", func(t *testing.T) {
t.Parallel()
Expand Down
30 changes: 22 additions & 8 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -133,12 +134,14 @@ func TestSetupData(t *testing.T) {
},
},
}
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
for _, testCase := range testCases {
testCase := testCase

runTestCase := func(t *testing.T, tcid int) {
testCase := testCases[tcid]
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)

t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

Expand All @@ -164,14 +167,17 @@ func TestSetupData(t *testing.T) {
engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs()

globalCtx, globalCancel := context.WithCancel(context.Background())
runCtx, runCancel := context.WithCancel(globalCtx)
run, wait, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)

defer wait()
defer globalCancel()

require.NoError(t, err)

errC := make(chan error)
go func() { errC <- run() }()

Expand Down Expand Up @@ -211,4 +217,12 @@ func TestSetupData(t *testing.T) {
}
})
}

for id := range testCases {
id := id
t.Run(fmt.Sprintf("testcase_%d", id), func(t *testing.T) {
t.Parallel()
runTestCase(t, id)
})
}
}
29 changes: 18 additions & 11 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"go.k6.io/k6/js/common"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/output"
"go.k6.io/k6/ui/pb"
)

Expand Down Expand Up @@ -120,7 +119,10 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
return err
}

// TODO: remove
// TODO: create a MetricsEngine here and add its ingester to the list of
// outputs (unless both NoThresholds and NoSummary were enabled)

// TODO: remove this completely
// Create the engine.
initBar.Modify(pb.WithConstProgress(0, "Init engine"))
engine, err := core.NewEngine(
Expand Down Expand Up @@ -151,17 +153,20 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {

// We do this here so we can get any output URLs below.
initBar.Modify(pb.WithConstProgress(0, "Starting outputs"))
outputManager := output.NewManager(outputs, logger, func(err error) {
if err != nil {
logger.WithError(err).Error("Received error to stop from output")
}
runCancel()
})
err = outputManager.StartOutputs()
// TODO: re-enable the code below
/*
outputManager := output.NewManager(outputs, logger, func(err error) {
if err != nil {
logger.WithError(err).Error("Received error to stop from output")
}
runCancel()
})
*/
err = engine.OutputManager.StartOutputs()
if err != nil {
return err
}
defer outputManager.StopOutputs()
defer engine.OutputManager.StopOutputs()

printExecutionDescription(
c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs,
Expand Down Expand Up @@ -234,8 +239,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {

// Handle the end-of-test summary.
if !test.runtimeOptions.NoSummary.Bool {
engine.MetricsEngine.MetricsLock.Lock() // TODO: refactor so this is not needed
summaryResult, err := test.initRunner.HandleSummary(globalCtx, &lib.Summary{
Metrics: engine.Metrics,
Metrics: engine.MetricsEngine.ObservedMetrics,
RootGroup: execScheduler.GetRunner().GetDefaultGroup(),
TestRunDuration: executionState.GetCurrentTestRunDuration(),
NoColor: c.gs.flags.noColor,
Expand All @@ -244,6 +250,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
IsStdErrTTY: c.gs.stdErr.isTTY,
},
})
engine.MetricsEngine.MetricsLock.Unlock()
if err == nil {
err = handleSummaryResult(c.gs.fs, c.gs.stdOut, c.gs.stdErr, summaryResult)
}
Expand Down
Loading

0 comments on commit 2cfd7ef

Please sign in to comment.