Skip to content

Commit

Permalink
Add tracing instrumentation (#3445)
Browse files Browse the repository at this point in the history
This adds lib/trace package which provides APIs in order to interact with Open Telemetry
tracing instrumentation.


And a flag `--traces-output` that can be used to configure the traces output by setting the
destination endpoint, protocol, extra headers, etc.

As well as providing the TracerProvider to the js extensions/modules through the `lib.State`.
  • Loading branch information
ka3de authored Nov 16, 2023
1 parent c545ad2 commit 7a30976
Show file tree
Hide file tree
Showing 248 changed files with 49,119 additions and 17 deletions.
65 changes: 57 additions & 8 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/fsext"
"go.k6.io/k6/lib/trace"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
Expand All @@ -40,11 +41,17 @@ type cmdRun struct {
gs *state.GlobalState
}

// We use an excessively high timeout to wait for event processing to complete,
// since prematurely proceeding before it is done could create bigger problems.
// In practice, this effectively acts as no timeout, and the user will have to
// kill k6 if a hang happens, which is the behavior without events anyway.
const waitEventDoneTimeout = 30 * time.Minute
const (
// We use an excessively high timeout to wait for event processing to complete,
// since prematurely proceeding before it is done could create bigger problems.
// In practice, this effectively acts as no timeout, and the user will have to
// kill k6 if a hang happens, which is the behavior without events anyway.
waitEventDoneTimeout = 30 * time.Minute

// This timeout should be long enough to flush all remaining traces, but still
// provides a safeguard to not block indefinitely.
waitForTracerProviderStopTimeout = 3 * time.Minute
)

// TODO: split apart some more
//
Expand Down Expand Up @@ -105,6 +112,17 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()
}

if err = c.setupTracerProvider(globalCtx, test); err != nil {
return err
}
waitTracesFlushed := func() {
ctx, cancel := context.WithTimeout(globalCtx, waitForTracerProviderStopTimeout)
defer cancel()
if tpErr := test.preInitState.TracerProvider.Shutdown(ctx); tpErr != nil {
logger.Errorf("The tracer provider didn't stop gracefully: %v", tpErr)
}
}

// Write the full consolidated *and derived* options back to the Runner.
conf := test.derivedConfig
testRunState, err := test.buildTestRunState(conf.Options)
Expand Down Expand Up @@ -250,10 +268,25 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}

defer func() {
logger.Debug("Waiting for metric processing to finish...")
logger.Debug("Waiting for metrics and traces processing to finish...")
close(samples)
waitOutputsFlushed()
logger.Debug("Metrics processing finished!")

ww := [...]func(){
waitOutputsFlushed,
waitTracesFlushed,
}
var wg sync.WaitGroup
wg.Add(len(ww))
for _, w := range ww {
w := w
go func() {
w()
wg.Done()
}()
}
wg.Wait()

logger.Debug("Metrics and traces processing finished!")
}()

// Spin up the REST API server, if not disabled.
Expand Down Expand Up @@ -407,6 +440,22 @@ func (c *cmdRun) flagSet() *pflag.FlagSet {
return flags
}

func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error {
ro := test.preInitState.RuntimeOptions
if ro.TracesOutput.String == "none" {
test.preInitState.TracerProvider = trace.NewNoopTracerProvider()
return nil
}

tp, err := trace.TracerProviderFromConfigLine(ctx, ro.TracesOutput.String)
if err != nil {
return err
}
test.preInitState.TracerProvider = tp

return nil
}

func getCmdRun(gs *state.GlobalState) *cobra.Command {
c := &cmdRun{
gs: gs,
Expand Down
9 changes: 9 additions & 0 deletions cmd/runtime_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ extended: base + Babel with parts of ES2015 preset
"",
"output the end-of-test summary report to JSON file",
)
flags.String("traces-output", "none",
"set the output for k6 traces, possible values are none,otel[=host:port]")
return flags
}

Expand Down Expand Up @@ -66,6 +68,7 @@ func getRuntimeOptions(flags *pflag.FlagSet, environment map[string]string) (lib
NoThresholds: getNullBool(flags, "no-thresholds"),
NoSummary: getNullBool(flags, "no-summary"),
SummaryExport: getNullString(flags, "summary-export"),
TracesOutput: getNullString(flags, "traces-output"),
Env: make(map[string]string),
}

Expand Down Expand Up @@ -104,6 +107,12 @@ func getRuntimeOptions(flags *pflag.FlagSet, environment map[string]string) (lib
}
}

if envVar, ok := environment["K6_TRACES_OUTPUT"]; ok {
if !opts.TracesOutput.Valid {
opts.TracesOutput = null.StringFrom(envVar)
}
}

if opts.IncludeSystemEnvVars.Bool { // If enabled, gather the actual system environment variables
opts.Env = environment
}
Expand Down
56 changes: 53 additions & 3 deletions cmd/runtime_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ func testRuntimeOptionsCase(t *testing.T, tc runtimeOptionsTestCase) {
func TestRuntimeOptions(t *testing.T) {
t.Parallel()
var (
defaultCompatMode = null.NewString("extended", false)
baseCompatMode = null.NewString("base", true)
extendedCompatMode = null.NewString("extended", true)
defaultCompatMode = null.NewString("extended", false)
baseCompatMode = null.NewString("base", true)
extendedCompatMode = null.NewString("extended", true)
defaultTracesOutput = null.NewString("none", false)
)

runtimeOptionsTestCases := map[string]runtimeOptionsTestCase{
Expand All @@ -119,6 +120,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, false),
CompatibilityMode: defaultCompatMode,
Env: nil,
TracesOutput: defaultTracesOutput,
},
},
"disabled sys env by default": {
Expand All @@ -128,6 +130,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(false, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{},
TracesOutput: defaultTracesOutput,
},
},
"disabled sys env by default with ext compat mode": {
Expand All @@ -137,6 +140,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(false, false),
CompatibilityMode: extendedCompatMode,
Env: map[string]string{},
TracesOutput: defaultTracesOutput,
},
},
"disabled sys env by cli 1": {
Expand All @@ -147,6 +151,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(false, true),
CompatibilityMode: baseCompatMode,
Env: map[string]string{},
TracesOutput: defaultTracesOutput,
},
},
"disabled sys env by cli 2": {
Expand All @@ -157,6 +162,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(false, true),
CompatibilityMode: baseCompatMode,
Env: map[string]string{},
TracesOutput: defaultTracesOutput,
},
},
"disabled sys env by env": {
Expand All @@ -166,6 +172,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(false, true),
CompatibilityMode: extendedCompatMode,
Env: map[string]string{},
TracesOutput: defaultTracesOutput,
},
},
"enabled sys env by env": {
Expand All @@ -175,6 +182,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, true),
CompatibilityMode: extendedCompatMode,
Env: map[string]string{"K6_INCLUDE_SYSTEM_ENV_VARS": "true", "K6_COMPATIBILITY_MODE": "extended"},
TracesOutput: defaultTracesOutput,
},
},
"enabled sys env by default": {
Expand All @@ -185,6 +193,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "val1"},
TracesOutput: defaultTracesOutput,
},
},
"enabled sys env by cli 1": {
Expand All @@ -195,6 +204,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, true),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "val1"},
TracesOutput: defaultTracesOutput,
},
},
"enabled sys env by cli 2": {
Expand All @@ -205,6 +215,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, true),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "val1"},
TracesOutput: defaultTracesOutput,
},
},
"run only system env": {
Expand All @@ -215,6 +226,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "val1"},
TracesOutput: defaultTracesOutput,
},
},
"mixed system and cli env": {
Expand All @@ -225,6 +237,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "val1", "test2": "", "test3": "val3", "test4": "", "test5": ""},
TracesOutput: defaultTracesOutput,
},
},
"mixed system and cli env 2": {
Expand All @@ -235,6 +248,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, true),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "val1", "test2": "", "test3": "val3", "test4": "", "test5": ""},
TracesOutput: defaultTracesOutput,
},
},
"disabled system env with cli params": {
Expand All @@ -245,6 +259,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(false, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test2": "val2"},
TracesOutput: defaultTracesOutput,
},
},
"overwriting system env with cli param": {
Expand All @@ -255,6 +270,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "val1cli"},
TracesOutput: defaultTracesOutput,
},
},
"error wrong compat mode env var value": {
Expand Down Expand Up @@ -296,6 +312,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "value 1", "test2": "value 2"},
TracesOutput: defaultTracesOutput,
},
},
"valid env vars with special chars": {
Expand All @@ -306,6 +323,7 @@ func TestRuntimeOptions(t *testing.T) {
IncludeSystemEnvVars: null.NewBool(true, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{"test1": "value 1", "test2": "value,2", "test3": ` , ,,, value, ,, 2!'@#,"`},
TracesOutput: defaultTracesOutput,
},
},
"summary and thresholds from env": {
Expand All @@ -318,6 +336,7 @@ func TestRuntimeOptions(t *testing.T) {
NoThresholds: null.NewBool(false, true),
NoSummary: null.NewBool(false, true),
SummaryExport: null.NewString("foo", true),
TracesOutput: defaultTracesOutput,
},
},
"summary and thresholds from env overwritten by CLI": {
Expand All @@ -331,6 +350,7 @@ func TestRuntimeOptions(t *testing.T) {
NoThresholds: null.NewBool(true, true),
NoSummary: null.NewBool(true, true),
SummaryExport: null.NewString("bar", true),
TracesOutput: defaultTracesOutput,
},
},
"env var error detected even when CLI flags overwrite 1": {
Expand All @@ -345,6 +365,36 @@ func TestRuntimeOptions(t *testing.T) {
cliFlags: []string{"--no-summary", "true"},
expErr: true,
},
"traces output default": {
useSysEnv: false,
expRTOpts: lib.RuntimeOptions{
IncludeSystemEnvVars: null.NewBool(false, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{},
TracesOutput: null.NewString("none", false),
},
},
"traces output from env": {
useSysEnv: false,
systemEnv: map[string]string{"K6_TRACES_OUTPUT": "foo"},
expRTOpts: lib.RuntimeOptions{
IncludeSystemEnvVars: null.NewBool(false, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{},
TracesOutput: null.NewString("foo", true),
},
},
"traces output from env overwritten by CLI": {
useSysEnv: false,
systemEnv: map[string]string{"K6_TRACES_OUTPUT": "foo"},
cliFlags: []string{"--traces-output", "bar"},
expRTOpts: lib.RuntimeOptions{
IncludeSystemEnvVars: null.NewBool(false, false),
CompatibilityMode: defaultCompatMode,
Env: map[string]string{},
TracesOutput: null.NewString("bar", true),
},
},
}
for name, tc := range runtimeOptionsTestCases {
tc := tc
Expand Down
12 changes: 6 additions & 6 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func TestSetupTeardownThresholds(t *testing.T) {
assert.True(t, testutils.LogContains(logMsgs, logrus.DebugLevel, "Running thresholds on 4 metrics..."))
assert.True(t, testutils.LogContains(logMsgs, logrus.DebugLevel, "Finalizing thresholds..."))
assert.True(t, testutils.LogContains(logMsgs, logrus.DebugLevel, "Metrics emission of VUs and VUsMax metrics stopped"))
assert.True(t, testutils.LogContains(logMsgs, logrus.DebugLevel, "Metrics processing finished!"))
assert.True(t, testutils.LogContains(logMsgs, logrus.DebugLevel, "Metrics and traces processing finished!"))
}

func TestThresholdsFailed(t *testing.T) {
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestAbortedByThreshold(t *testing.T) {
assert.Contains(t, stdOut, `✗ iterations`)
assert.Contains(t, stdOut, `teardown() called`)
assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`)
assert.Contains(t, stdOut, `level=debug msg="Metrics and traces processing finished!"`)
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=8 tainted=true`)
}

Expand Down Expand Up @@ -766,7 +766,7 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
assert.Contains(t, stdout, `✓ { group:::teardown }`)
assert.Contains(t, stdout, `Stopping k6 in response to signal`)
assert.Contains(t, stdout, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.Contains(t, stdout, `level=debug msg="Metrics processing finished!"`)
assert.Contains(t, stdout, `level=debug msg="Metrics and traces processing finished!"`)
assert.Contains(t, stdout, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
}

Expand Down Expand Up @@ -896,7 +896,7 @@ func TestAbortedByUserWithRestAPI(t *testing.T) {
assert.Contains(t, stdout, `PATCH /v1/status`)
assert.Contains(t, stdout, `level=error msg="test run stopped from REST API`)
assert.Contains(t, stdout, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.Contains(t, stdout, `level=debug msg="Metrics processing finished!"`)
assert.Contains(t, stdout, `level=debug msg="Metrics and traces processing finished!"`)
assert.Contains(t, stdout, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
assert.NotContains(t, stdout, `Running thresholds`)
assert.NotContains(t, stdout, `Finalizing thresholds`)
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func testAbortedByScriptError(t *testing.T, script string, runTest func(*testing
stdout := ts.Stdout.String()
t.Log(stdout)
assert.Contains(t, stdout, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.Contains(t, stdout, `level=debug msg="Metrics processing finished!"`)
assert.Contains(t, stdout, `level=debug msg="Metrics and traces processing finished!"`)
assert.Contains(t, stdout, `level=debug msg="Everything has finished, exiting k6 with an error!"`)
assert.Contains(t, stdout, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=7 tainted=false`)
return ts
Expand Down Expand Up @@ -1191,7 +1191,7 @@ func testAbortedByScriptTestAbort(t *testing.T, script string, runTest func(*tes
assert.Contains(t, stdout, "test aborted: foo")
assert.Contains(t, stdout, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
assert.Contains(t, stdout, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.Contains(t, stdout, `level=debug msg="Metrics processing finished!"`)
assert.Contains(t, stdout, `level=debug msg="Metrics and traces processing finished!"`)
assert.Contains(t, stdout, "bogus summary")
}

Expand Down
Loading

0 comments on commit 7a30976

Please sign in to comment.