diff --git a/output/cloud/output.go b/output/cloud/output.go index 600a2842cdc..dc4419893a0 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -4,33 +4,45 @@ package cloud import ( "errors" "fmt" - "net/http" "path/filepath" "strings" - "sync" "time" - easyjson "github.com/mailru/easyjson" "github.com/sirupsen/logrus" - "gopkg.in/guregu/null.v3" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/errext" - "go.k6.io/k6/errext/exitcodes" - "go.k6.io/k6/output" - "go.k6.io/k6/lib" "go.k6.io/k6/lib/consts" - "go.k6.io/k6/lib/netext" - "go.k6.io/k6/lib/netext/httpext" "go.k6.io/k6/metrics" + "go.k6.io/k6/output" + cloudv1 "go.k6.io/k6/output/cloud/v1" + "gopkg.in/guregu/null.v3" ) // TestName is the default k6 Cloud test name const TestName = "k6 test" +// versionedOutput represents an output implementing +// metrics samples aggregation and flushing to the +// Cloud remote service. +// +// It mainly differs from output.Output +// because it does not define Stop (that is deprecated) +// and Description. +type versionedOutput interface { + Start() error + StopWithTestError(testRunErr error) error + + SetTestRunStopCallback(func(error)) + SetReferenceID(id string) + + AddMetricSamples(samples []metrics.SampleContainer) +} + // Output sends result data to the k6 Cloud service. type Output struct { + versionedOutput + logger logrus.FieldLogger config cloudapi.Config referenceID string @@ -38,28 +50,9 @@ type Output struct { executionPlan []lib.ExecutionStep duration int64 // in seconds thresholds map[string][]*metrics.Threshold - client *MetricsClient - - bufferMutex sync.Mutex - bufferHTTPTrails []*httpext.Trail - bufferSamples []*Sample - - // TODO: optimize this - // - // Since the real-time metrics refactoring (https://github.com/k6io/k6/pull/678), - // we should no longer have to handle metrics that have times long in the past. So instead of a - // map, we can probably use a simple slice (or even an array!) as a ring buffer to store the - // aggregation buckets. This should save us a some time, since it would make the lookups and WaitPeriod - // checks basically O(1). 
And even if for some reason there are occasional metrics with past times that - // don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated - aggrBuckets map[int64]aggregationBucket - - stopSendingMetrics chan struct{} - stopAggregation chan struct{} - aggregationDone *sync.WaitGroup - stopOutput chan struct{} - outputDone *sync.WaitGroup - testStopFunc func(error) + + client *cloudapi.Client + testStopFunc func(error) } // Verify that Output implements the wanted interfaces @@ -121,17 +114,10 @@ func newOutput(params output.Params) (*Output, error) { return &Output{ config: conf, - client: NewMetricsClient(apiClient, logger, conf.Host.String, conf.NoCompress.Bool), + client: apiClient, executionPlan: params.ExecutionPlan, duration: int64(duration / time.Second), - aggrBuckets: map[int64]aggregationBucket{}, logger: logger, - - stopSendingMetrics: make(chan struct{}), - stopAggregation: make(chan struct{}), - aggregationDone: &sync.WaitGroup{}, - stopOutput: make(chan struct{}), - outputDone: &sync.WaitGroup{}, }, nil } @@ -164,12 +150,10 @@ func (out *Output) Start() error { if out.config.PushRefID.Valid { out.referenceID = out.config.PushRefID.String out.logger.WithField("referenceId", out.referenceID).Debug("directly pushing metrics without init") - out.startBackgroundProcesses() - return nil + return out.startVersionedOutput() } thresholds := make(map[string][]string) - for name, t := range out.thresholds { for _, threshold := range t { thresholds[name] = append(thresholds[name], threshold.Source) @@ -198,7 +182,10 @@ func (out *Output) Start() error { out.config = out.config.Apply(*response.ConfigOverride) } - out.startBackgroundProcesses() + err = out.startVersionedOutput() + if err != nil { + return fmt.Errorf("the Gateway Output failed to start a versioned output: %w", err) + } out.logger.WithFields(logrus.Fields{ "name": out.config.Name, @@ -209,52 +196,23 @@ func (out *Output) Start() error { return nil } -func (out *Output) startBackgroundProcesses() { - aggregationPeriod := out.config.AggregationPeriod.TimeDuration() - // If enabled, start periodically aggregating the collected HTTP trails - if aggregationPeriod > 0 { - out.aggregationDone.Add(1) - go func() { - defer out.aggregationDone.Done() - aggregationWaitPeriod := out.config.AggregationWaitPeriod.TimeDuration() - aggregationTicker := time.NewTicker(aggregationPeriod) - defer aggregationTicker.Stop() - - for { - select { - case <-out.stopSendingMetrics: - return - case <-aggregationTicker.C: - out.aggregateHTTPTrails(aggregationWaitPeriod) - case <-out.stopAggregation: - out.aggregateHTTPTrails(0) - out.flushHTTPTrails() - return - } - } - }() +// Description returns the URL with the test run results. +func (out *Output) Description() string { + return fmt.Sprintf("cloud (%s)", cloudapi.URLForResults(out.referenceID, out.config)) +} + +// SetThresholds receives the thresholds before the output is Start()-ed. +func (out *Output) SetThresholds(scriptThresholds map[string]metrics.Thresholds) { + thresholds := make(map[string][]*metrics.Threshold) + for name, t := range scriptThresholds { + thresholds[name] = append(thresholds[name], t.Thresholds...) 
} + out.thresholds = thresholds +} - out.outputDone.Add(1) - go func() { - defer out.outputDone.Done() - pushTicker := time.NewTicker(out.config.MetricPushInterval.TimeDuration()) - defer pushTicker.Stop() - for { - select { - case <-out.stopSendingMetrics: - return - default: - } - select { - case <-out.stopOutput: - out.pushMetrics() - return - case <-pushTicker.C: - out.pushMetrics() - } - } - }() +// SetTestRunStopCallback receives the function that stops the engine on error +func (out *Output) SetTestRunStopCallback(stopFunc func(error)) { + out.testStopFunc = stopFunc } // Stop gracefully stops all metric emission from the output: when all metric @@ -269,25 +227,47 @@ func (out *Output) Stop() error { // all metric samples are emitted, it makes a cloud API call to finish the test // run. If testErr was specified, it extracts the RunStatus from it. func (out *Output) StopWithTestError(testErr error) error { - out.logger.Debug("Stopping the cloud output...") - close(out.stopAggregation) - out.aggregationDone.Wait() // could be a no-op, if we have never started the aggregation - out.logger.Debug("Aggregation stopped, stopping metric emission...") - close(out.stopOutput) - out.outputDone.Wait() + err := out.versionedOutput.StopWithTestError(testErr) + if err != nil { + out.logger.WithError(err).Error("An error occurred stopping the output") + // to notify the cloud backend we have no return here + } + out.logger.Debug("Metric emission stopped, calling cloud API...") - err := out.testFinished(testErr) + err = out.testFinished(testErr) if err != nil { out.logger.WithFields(logrus.Fields{"error": err}).Warn("Failed to send test finished to the cloud") - } else { - out.logger.Debug("Cloud output successfully stopped!") + return err } - return err + out.logger.Debug("Cloud output successfully stopped!") + return nil } -// Description returns the URL with the test run results. -func (out *Output) Description() string { - return fmt.Sprintf("cloud (%s)", cloudapi.URLForResults(out.referenceID, out.config)) +func (out *Output) testFinished(testErr error) error { + if out.referenceID == "" || out.config.PushRefID.Valid { + return nil + } + + testTainted := false + thresholdResults := make(cloudapi.ThresholdResult) + for name, thresholds := range out.thresholds { + thresholdResults[name] = make(map[string]bool) + for _, t := range thresholds { + thresholdResults[name][t.Source] = t.LastFailed + if t.LastFailed { + testTainted = true + } + } + } + + runStatus := out.getRunStatus(testErr) + out.logger.WithFields(logrus.Fields{ + "ref": out.referenceID, + "tainted": testTainted, + "run_status": runStatus, + }).Debug("Sending test finished") + + return out.client.TestFinished(out.referenceID, thresholdResults, testTainted, runStatus) } // getRunStatus determines the run status of the test based on the error. @@ -339,373 +319,18 @@ func (out *Output) getRunStatus(testErr error) cloudapi.RunStatus { return cloudapi.RunStatusAbortedSystem } -// SetThresholds receives the thresholds before the output is Start()-ed. -func (out *Output) SetThresholds(scriptThresholds map[string]metrics.Thresholds) { - thresholds := make(map[string][]*metrics.Threshold) - for name, t := range scriptThresholds { - thresholds[name] = append(thresholds[name], t.Thresholds...) 
- } - out.thresholds = thresholds -} - -// SetTestRunStopCallback receives the function that stops the engine on error -func (out *Output) SetTestRunStopCallback(stopFunc func(error)) { - out.testStopFunc = stopFunc -} - -// AddMetricSamples receives a set of metric samples. This method is never -// called concurrently, so it defers as much of the work as possible to the -// asynchronous goroutines initialized in Start(). -func (out *Output) AddMetricSamples(sampleContainers []metrics.SampleContainer) { - select { - case <-out.stopSendingMetrics: - return - default: - } - +func (out *Output) startVersionedOutput() error { if out.referenceID == "" { - return + return errors.New("ReferenceID is required") } - newSamples := []*Sample{} - newHTTPTrails := []*httpext.Trail{} - - for _, sampleContainer := range sampleContainers { - switch sc := sampleContainer.(type) { - case *httpext.Trail: - // Check if aggregation is enabled, - if out.config.AggregationPeriod.Duration > 0 { - newHTTPTrails = append(newHTTPTrails, sc) - } else { - newSamples = append(newSamples, NewSampleFromTrail(sc)) - } - case *netext.NetTrail: - // TODO: aggregate? - values := map[string]float64{ - metrics.DataSentName: float64(sc.BytesWritten), - metrics.DataReceivedName: float64(sc.BytesRead), - } - - if sc.FullIteration { - values[metrics.IterationDurationName] = metrics.D(sc.EndTime.Sub(sc.StartTime)) - values[metrics.IterationsName] = 1 - } - - encodedTags, err := easyjson.Marshal(sc.GetTags()) - if err != nil { - out.logger.WithError(err).Error("Encoding tags failed") - } - newSamples = append(newSamples, &Sample{ - Type: DataTypeMap, - Metric: "iter_li_all", - Data: &SampleDataMap{ - Time: toMicroSecond(sc.GetTime()), - Tags: encodedTags, - Values: values, - }, - }) - default: - for _, sample := range sampleContainer.GetSamples() { - encodedTags, err := easyjson.Marshal(sample.Tags) - if err != nil { - out.logger.WithError(err).Error("Encoding tags failed") - } - - newSamples = append(newSamples, &Sample{ - Type: DataTypeSingle, - Metric: sample.Metric.Name, - Data: &SampleDataSingle{ - Type: sample.Metric.Type, - Time: toMicroSecond(sample.Time), - Tags: encodedTags, - Value: sample.Value, - }, - }) - } - } - } - - if len(newSamples) > 0 || len(newHTTPTrails) > 0 { - out.bufferMutex.Lock() - out.bufferSamples = append(out.bufferSamples, newSamples...) - out.bufferHTTPTrails = append(out.bufferHTTPTrails, newHTTPTrails...) 
- out.bufferMutex.Unlock() - } -} - -//nolint:funlen,nestif,gocognit -func (out *Output) aggregateHTTPTrails(waitPeriod time.Duration) { - out.bufferMutex.Lock() - newHTTPTrails := out.bufferHTTPTrails - out.bufferHTTPTrails = nil - out.bufferMutex.Unlock() - - aggrPeriod := int64(out.config.AggregationPeriod.Duration) - - // Distribute all newly buffered HTTP trails into buckets and sub-buckets - for _, trail := range newHTTPTrails { - bucketID := trail.GetTime().UnixNano() / aggrPeriod - - // Get or create a time bucket for that trail period - bucket, ok := out.aggrBuckets[bucketID] - if !ok { - bucket = aggregationBucket{} - out.aggrBuckets[bucketID] = bucket - } - - subBucket, ok := bucket[trail.Tags] - if !ok { - subBucket = make([]*httpext.Trail, 0, 100) - } - bucket[trail.Tags] = append(subBucket, trail) - } - - // Which buckets are still new and we'll wait for trails to accumulate before aggregating - bucketCutoffID := time.Now().Add(-waitPeriod).UnixNano() / aggrPeriod - iqrRadius := out.config.AggregationOutlierIqrRadius.Float64 - iqrLowerCoef := out.config.AggregationOutlierIqrCoefLower.Float64 - iqrUpperCoef := out.config.AggregationOutlierIqrCoefUpper.Float64 - newSamples := []*Sample{} - - // Handle all aggregation buckets older than bucketCutoffID - for bucketID, subBucket := range out.aggrBuckets { - if bucketID > bucketCutoffID { - continue - } - - for tags, httpTrails := range subBucket { - // start := time.Now() // this is in a combination with the log at the end - trailCount := int64(len(httpTrails)) - if trailCount < out.config.AggregationMinSamples.Int64 { - for _, trail := range httpTrails { - newSamples = append(newSamples, NewSampleFromTrail(trail)) - } - continue - } - encodedTags, err := easyjson.Marshal(tags) - if err != nil { - out.logger.WithError(err).Error("Encoding tags failed") - } - - aggrData := &SampleDataAggregatedHTTPReqs{ - Time: toMicroSecond(time.Unix(0, bucketID*aggrPeriod+aggrPeriod/2)), - Type: "aggregated_trend", - Tags: encodedTags, - } - - if out.config.AggregationSkipOutlierDetection.Bool { - // Simply add up all HTTP trails, no outlier detection - for _, trail := range httpTrails { - aggrData.Add(trail) - } - } else { - connDurations := make(durations, trailCount) - reqDurations := make(durations, trailCount) - for i, trail := range httpTrails { - connDurations[i] = trail.ConnDuration - reqDurations[i] = trail.Duration - } - - var minConnDur, maxConnDur, minReqDur, maxReqDur time.Duration - if trailCount < out.config.AggregationOutlierAlgoThreshold.Int64 { - // Since there are fewer samples, we'll use the interpolation-enabled and - // more precise sorting-based algorithm - minConnDur, maxConnDur = connDurations.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) - minReqDur, maxReqDur = reqDurations.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) - } else { - minConnDur, maxConnDur = connDurations.SelectGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef) - minReqDur, maxReqDur = reqDurations.SelectGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef) - } - - for _, trail := range httpTrails { - if trail.ConnDuration < minConnDur || - trail.ConnDuration > maxConnDur || - trail.Duration < minReqDur || - trail.Duration > maxReqDur { - // Seems like an outlier, add it as a standalone metric - newSamples = append(newSamples, NewSampleFromTrail(trail)) - } else { - // Aggregate the trail - aggrData.Add(trail) - } - } - } - - aggrData.CalcAverages() - - if aggrData.Count > 0 { - newSamples = append(newSamples, 
&Sample{ - Type: DataTypeAggregatedHTTPReqs, - Metric: "http_req_li_all", - Data: aggrData, - }) - } - } - delete(out.aggrBuckets, bucketID) - } - - if len(newSamples) > 0 { - out.bufferMutex.Lock() - out.bufferSamples = append(out.bufferSamples, newSamples...) - out.bufferMutex.Unlock() - } -} - -func (out *Output) flushHTTPTrails() { - out.bufferMutex.Lock() - defer out.bufferMutex.Unlock() - - newSamples := []*Sample{} - for _, trail := range out.bufferHTTPTrails { - newSamples = append(newSamples, NewSampleFromTrail(trail)) - } - for _, bucket := range out.aggrBuckets { - for _, subBucket := range bucket { - for _, trail := range subBucket { - newSamples = append(newSamples, NewSampleFromTrail(trail)) - } - } - } - - out.bufferHTTPTrails = nil - out.aggrBuckets = map[int64]aggregationBucket{} - out.bufferSamples = append(out.bufferSamples, newSamples...) -} - -// shouldStopSendingMetrics returns true if the output should interrupt the metric flush. -// -// note: The actual test execution should continues, -// since for local k6 run tests the end-of-test summary (or any other outputs) will still work, -// but the cloud output doesn't send any more metrics. -// Instead, if cloudapi.Config.StopOnError is enabled -// the cloud output should stop the whole test run too. -// This logic should be handled by the caller. -func (out *Output) shouldStopSendingMetrics(err error) bool { - if err == nil { - return false - } - if errResp, ok := err.(cloudapi.ErrorResponse); ok && errResp.Response != nil { //nolint:errorlint - // The Cloud service returns the error code 4 when it doesn't accept any more metrics. - // So, when k6 sees that, the cloud output just stops prematurely. - return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4 - } - - return false -} - -type pushJob struct { - done chan error - samples []*Sample -} - -// ceil(a/b) -func ceilDiv(a, b int) int { - r := a / b - if a%b != 0 { - r++ - } - return r -} - -func (out *Output) pushMetrics() { - out.bufferMutex.Lock() - if len(out.bufferSamples) == 0 { - out.bufferMutex.Unlock() - return - } - buffer := out.bufferSamples - out.bufferSamples = nil - out.bufferMutex.Unlock() - - count := len(buffer) - out.logger.WithFields(logrus.Fields{ - "samples": count, - }).Debug("Pushing metrics to cloud") - start := time.Now() - - numberOfPackages := ceilDiv(len(buffer), int(out.config.MaxMetricSamplesPerPackage.Int64)) - numberOfWorkers := int(out.config.MetricPushConcurrency.Int64) - if numberOfWorkers > numberOfPackages { - numberOfWorkers = numberOfPackages - } - - ch := make(chan pushJob, numberOfPackages) - for i := 0; i < numberOfWorkers; i++ { - go func() { - for job := range ch { - err := out.client.PushMetric(out.referenceID, job.samples) - job.done <- err - if out.shouldStopSendingMetrics(err) { - return - } - } - }() - } - - jobs := make([]pushJob, 0, numberOfPackages) - - for len(buffer) > 0 { - size := len(buffer) - if size > int(out.config.MaxMetricSamplesPerPackage.Int64) { - size = int(out.config.MaxMetricSamplesPerPackage.Int64) - } - job := pushJob{done: make(chan error, 1), samples: buffer[:size]} - ch <- job - jobs = append(jobs, job) - buffer = buffer[size:] - } - - close(ch) - - for _, job := range jobs { - err := <-job.done - if err != nil { - if out.shouldStopSendingMetrics(err) { - out.logger.WithError(err).Warn("Stopped sending metrics to cloud due to an error") - serr := errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), - errext.AbortedByOutput, - ) - if 
out.config.StopOnError.Bool { - out.testStopFunc(serr) - } - close(out.stopSendingMetrics) - break - } - out.logger.WithError(err).Warn("Failed to send metrics to cloud") - } - } - out.logger.WithFields(logrus.Fields{ - "samples": count, - "t": time.Since(start), - }).Debug("Pushing metrics to cloud finished") -} - -func (out *Output) testFinished(testErr error) error { - if out.referenceID == "" || out.config.PushRefID.Valid { - return nil - } - - testTainted := false - thresholdResults := make(cloudapi.ThresholdResult) - for name, thresholds := range out.thresholds { - thresholdResults[name] = make(map[string]bool) - for _, t := range thresholds { - thresholdResults[name][t.Source] = t.LastFailed - if t.LastFailed { - testTainted = true - } - } + var err error + out.versionedOutput, err = cloudv1.New(out.logger, out.config, out.client) + if err != nil { + return err } - runStatus := out.getRunStatus(testErr) - out.logger.WithFields(logrus.Fields{ - "ref": out.referenceID, - "tainted": testTainted, - "run_status": runStatus, - }).Debug("Sending test finished") - - return out.client.TestFinished(out.referenceID, thresholdResults, testTainted, runStatus) + out.versionedOutput.SetReferenceID(out.referenceID) + out.versionedOutput.SetTestRunStopCallback(out.testStopFunc) + return out.versionedOutput.Start() } - -const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate diff --git a/output/cloud/output_test.go b/output/cloud/output_test.go index 7c237d61044..62bdec23a91 100644 --- a/output/cloud/output_test.go +++ b/output/cloud/output_test.go @@ -1,661 +1,24 @@ package cloud import ( - "bytes" - "compress/gzip" - "encoding/json" "fmt" - "io" - "math/rand" "net/http" "net/http/httptest" "net/url" - "os" - "sort" - "strconv" - "sync" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v3" - - "go.k6.io/k6/cloudapi" "go.k6.io/k6/lib" - "go.k6.io/k6/lib/netext" - "go.k6.io/k6/lib/netext/httpext" "go.k6.io/k6/lib/testutils" - "go.k6.io/k6/lib/testutils/httpmultibin" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" "go.k6.io/k6/output" ) -func tagEqual(expected, got json.RawMessage) bool { - var expectedMap, gotMap map[string]string - err := json.Unmarshal(expected, &expectedMap) - if err != nil { - panic("tagEqual: " + err.Error()) - } - - err = json.Unmarshal(got, &gotMap) - if err != nil { - panic("tagEqual: " + err.Error()) - } - - if len(expectedMap) != len(gotMap) { - return false - } - - for k, v := range gotMap { - if k == "url" { - if expectedMap["name"] != v { - return false - } - } else if expectedMap[k] != v { - return false - } - } - return true -} - -func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - - expSamples := <-expSamples - require.Len(t, receivedSamples, len(expSamples)) - - for i, expSample := range expSamples { - receivedSample := receivedSamples[i] - assert.Equal(t, expSample.Metric, receivedSample.Metric) - assert.Equal(t, expSample.Type, receivedSample.Type) - - if callbackCheck, ok := expSample.Data.(func(interface{})); ok { - callbackCheck(receivedSample.Data) - continue - } - - require.IsType(t, expSample.Data, receivedSample.Data) - - switch expData := expSample.Data.(type) { - case *SampleDataSingle: - 
receivedData, ok := receivedSample.Data.(*SampleDataSingle) - assert.True(t, ok) - assert.JSONEq(t, string(expData.Tags), string(receivedData.Tags)) - assert.Equal(t, expData.Time, receivedData.Time) - assert.Equal(t, expData.Type, receivedData.Type) - assert.Equal(t, expData.Value, receivedData.Value) - case *SampleDataMap: - receivedData, ok := receivedSample.Data.(*SampleDataMap) - assert.True(t, ok) - assert.True(t, tagEqual(expData.Tags, receivedData.Tags)) - assert.Equal(t, expData.Time, receivedData.Time) - assert.Equal(t, expData.Type, receivedData.Type) - assert.Equal(t, expData.Values, receivedData.Values) - case *SampleDataAggregatedHTTPReqs: - receivedData, ok := receivedSample.Data.(*SampleDataAggregatedHTTPReqs) - assert.True(t, ok) - assert.JSONEq(t, string(expData.Tags), string(receivedData.Tags)) - assert.Equal(t, expData.Time, receivedData.Time) - assert.Equal(t, expData.Type, receivedData.Type) - assert.Equal(t, expData.Values, receivedData.Values) - default: - t.Errorf("Unknown data type %#v", expData) - } - } - } -} - -func skewTrail(r *rand.Rand, t httpext.Trail, minCoef, maxCoef float64) httpext.Trail { - coef := minCoef + r.Float64()*(maxCoef-minCoef) - addJitter := func(d *time.Duration) { - *d = time.Duration(float64(*d) * coef) - } - addJitter(&t.Blocked) - addJitter(&t.Connecting) - addJitter(&t.TLSHandshaking) - addJitter(&t.Sending) - addJitter(&t.Waiting) - addJitter(&t.Receiving) - t.ConnDuration = t.Connecting + t.TLSHandshaking - t.Duration = t.Sending + t.Waiting + t.Receiving - return t -} - -func TestCloudOutput(t *testing.T) { - t.Parallel() - - getTestRunner := func(minSamples int) func(t *testing.T) { - return func(t *testing.T) { - t.Parallel() - runCloudOutputTestCase(t, minSamples) - } - } - - for tcNum, minSamples := range []int{60, 75, 100} { - tcNum, minSamples := tcNum, minSamples - t.Run(fmt.Sprintf("tc%d_minSamples%d", tcNum, minSamples), func(t *testing.T) { - t.Parallel() - getTestRunner(minSamples) - }) - } -} - -func runCloudOutputTestCase(t *testing.T, minSamples int) { - seed := time.Now().UnixNano() - r := rand.New(rand.NewSource(seed)) //nolint:gosec - t.Logf("Random source seeded with %d\n", seed) - - tb := httpmultibin.NewHTTPMultiBin(t) - registry := metrics.NewRegistry() - - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) - out, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - Environment: map[string]string{ - "K6_CLOUD_PUSH_REF_ID": "123", - "K6_CLOUD_METRIC_PUSH_INTERVAL": "10ms", - "K6_CLOUD_AGGREGATION_PERIOD": "30ms", - "K6_CLOUD_AGGREGATION_CALC_INTERVAL": "40ms", - "K6_CLOUD_AGGREGATION_WAIT_PERIOD": "5ms", - "K6_CLOUD_AGGREGATION_MIN_SAMPLES": strconv.Itoa(minSamples), - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - require.NoError(t, err) - - require.NoError(t, out.Start()) - require.Equal(t, "123", out.referenceID) - - now := time.Now() - tagMap := map[string]string{"test": "mest", "a": "b", "name": "name", "url": "name"} - tags := registry.RootTagSet().WithTagsFromMap(tagMap) - - expSamples := make(chan []Sample) - defer close(expSamples) - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), getSampleChecker(t, expSamples)) - - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: 
builtinMetrics.VUs, - Tags: tags, - }, - Time: now, - Value: 1.0, - }}) - - enctags, err := json.Marshal(tags) - require.NoError(t, err) - expSamples <- []Sample{{ - Type: DataTypeSingle, - Metric: metrics.VUsName, - Data: &SampleDataSingle{ - Type: builtinMetrics.VUs.Type, - Time: toMicroSecond(now), - Tags: enctags, - Value: 1.0, - }, - }} - - simpleTrail := httpext.Trail{ - Blocked: 100 * time.Millisecond, - Connecting: 200 * time.Millisecond, - TLSHandshaking: 300 * time.Millisecond, - Sending: 400 * time.Millisecond, - Waiting: 500 * time.Millisecond, - Receiving: 600 * time.Millisecond, - - EndTime: now, - ConnDuration: 500 * time.Millisecond, - Duration: 1500 * time.Millisecond, - Tags: tags, - } - out.AddMetricSamples([]metrics.SampleContainer{&simpleTrail}) - expSamples <- []Sample{*NewSampleFromTrail(&simpleTrail)} - - smallSkew := 0.02 - - trails := []metrics.SampleContainer{} - durations := make([]time.Duration, 0, len(trails)) - for i := int64(0); i < out.config.AggregationMinSamples.Int64; i++ { - similarTrail := skewTrail(r, simpleTrail, 1.0, 1.0+smallSkew) - trails = append(trails, &similarTrail) - durations = append(durations, similarTrail.Duration) - } - sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) - t.Logf("Sorted durations: %#v", durations) // Useful to debug any failures, doesn't get in the way otherwise - - checkAggrMetric := func(normal time.Duration, aggr AggregatedMetric) { - assert.True(t, aggr.Min <= aggr.Avg) - assert.True(t, aggr.Avg <= aggr.Max) - assert.InEpsilon(t, normal, metrics.ToD(aggr.Min), smallSkew) - assert.InEpsilon(t, normal, metrics.ToD(aggr.Avg), smallSkew) - assert.InEpsilon(t, normal, metrics.ToD(aggr.Max), smallSkew) - } - - outlierTrail := skewTrail(r, simpleTrail, 2.0+smallSkew, 3.0+smallSkew) - trails = append(trails, &outlierTrail) - out.AddMetricSamples(trails) - expSamples <- []Sample{ - *NewSampleFromTrail(&outlierTrail), - { - Type: DataTypeAggregatedHTTPReqs, - Metric: "http_req_li_all", - Data: func(data interface{}) { - aggrData, ok := data.(*SampleDataAggregatedHTTPReqs) - assert.True(t, ok) - assert.JSONEq(t, `{"test": "mest", "a": "b", "name": "name", "url": "name"}`, string(aggrData.Tags)) - assert.Equal(t, out.config.AggregationMinSamples.Int64, int64(aggrData.Count)) - assert.Equal(t, "aggregated_trend", aggrData.Type) - assert.InDelta(t, now.UnixNano(), aggrData.Time*1000, float64(out.config.AggregationPeriod.Duration)) - - checkAggrMetric(simpleTrail.Duration, aggrData.Values.Duration) - checkAggrMetric(simpleTrail.Blocked, aggrData.Values.Blocked) - checkAggrMetric(simpleTrail.Connecting, aggrData.Values.Connecting) - checkAggrMetric(simpleTrail.TLSHandshaking, aggrData.Values.TLSHandshaking) - checkAggrMetric(simpleTrail.Sending, aggrData.Values.Sending) - checkAggrMetric(simpleTrail.Waiting, aggrData.Values.Waiting) - checkAggrMetric(simpleTrail.Receiving, aggrData.Values.Receiving) - }, - }, - } - - require.NoError(t, out.Stop()) -} - -func TestCloudOutputMaxPerPacket(t *testing.T) { - t.Parallel() - - tb := httpmultibin.NewHTTPMultiBin(t) - out, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - require.NoError(t, err) - out.config.PushRefID = null.StringFrom("12") - - 
maxMetricSamplesPerPackage := 20 - out.config.MaxMetricSamplesPerPackage = null.IntFrom(int64(maxMetricSamplesPerPackage)) - - now := time.Now() - registry := metrics.NewRegistry() - tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) - gotTheLimit := false - var m sync.Mutex - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), - func(_ http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - assert.True(t, len(receivedSamples) <= maxMetricSamplesPerPackage) - if len(receivedSamples) == maxMetricSamplesPerPackage { - m.Lock() - gotTheLimit = true - m.Unlock() - } - }) - - require.NoError(t, out.Start()) - - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.VUs, - Tags: tags, - }, - Time: now, - Value: 1.0, - }}) - for j := time.Duration(1); j <= 200; j++ { - container := make([]metrics.SampleContainer, 0, 500) - for i := time.Duration(1); i <= 50; i++ { - //nolint:durationcheck - container = append(container, &httpext.Trail{ - Blocked: i % 200 * 100 * time.Millisecond, - Connecting: i % 200 * 200 * time.Millisecond, - TLSHandshaking: i % 200 * 300 * time.Millisecond, - Sending: i * i * 400 * time.Millisecond, - Waiting: 500 * time.Millisecond, - Receiving: 600 * time.Millisecond, - - EndTime: now.Add(i * 100), - ConnDuration: 500 * time.Millisecond, - Duration: j * i * 1500 * time.Millisecond, - Tags: tags, - }) - } - out.AddMetricSamples(container) - } - - require.NoError(t, out.Stop()) - assert.True(t, gotTheLimit) -} - -func TestCloudOutputStopSendingMetric(t *testing.T) { +func TestNewOutputNameResolution(t *testing.T) { t.Parallel() - t.Run("stop engine on error", func(t *testing.T) { - t.Parallel() - testCloudOutputStopSendingMetric(t, true) - }) - - t.Run("don't stop engine on error", func(t *testing.T) { - t.Parallel() - testCloudOutputStopSendingMetric(t, false) - }) -} - -func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) { - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) - - tb := httpmultibin.NewHTTPMultiBin(t) - out, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{ - "host": "%s", "noCompress": true, - "maxMetricSamplesPerPackage": 50, - "name": "something-that-should-be-overwritten", - "stopOnError": %t - }`, tb.ServerHTTP.URL, stopOnError)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - External: map[string]json.RawMessage{ - "loadimpact": json.RawMessage(`{"name": "my-custom-name"}`), - }, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - var expectedTestStopFuncCalled int64 - if stopOnError { - expectedTestStopFuncCalled = 1 - } - var TestStopFuncCalled int64 - out.testStopFunc = func(error) { - atomic.AddInt64(&TestStopFuncCalled, 1) - } - require.NoError(t, err) - now := time.Now() - tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) - - count := 1 - max := 5 - tb.Mux.HandleFunc("/v1/metrics/12", func(w http.ResponseWriter, r *http.Request) { - count++ - if count == max { - type payload struct { - Error cloudapi.ErrorResponse `json:"error"` - } - res := &payload{} - res.Error = 
cloudapi.ErrorResponse{Code: 4} - w.Header().Set("Content-Type", "application/json") - data, err := json.Marshal(res) - if err != nil { - t.Fatal(err) - } - w.WriteHeader(http.StatusForbidden) - _, _ = w.Write(data) - return - } - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - }) - - out.config.PushRefID = null.StringFrom("12") - require.NoError(t, out.Start()) - - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.VUs, - Tags: tags, - }, - Time: now, - Value: 1.0, - }}) - for j := time.Duration(1); j <= 200; j++ { - container := make([]metrics.SampleContainer, 0, 500) - for i := time.Duration(1); i <= 50; i++ { - //nolint:durationcheck - container = append(container, &httpext.Trail{ - Blocked: i % 200 * 100 * time.Millisecond, - Connecting: i % 200 * 200 * time.Millisecond, - TLSHandshaking: i % 200 * 300 * time.Millisecond, - Sending: i * i * 400 * time.Millisecond, - Waiting: 500 * time.Millisecond, - Receiving: 600 * time.Millisecond, - - EndTime: now.Add(i * 100), - ConnDuration: 500 * time.Millisecond, - Duration: j * i * 1500 * time.Millisecond, - Tags: tags, - }) - } - out.AddMetricSamples(container) - } - - require.NoError(t, out.Stop()) - - select { - case <-out.stopSendingMetrics: - // all is fine - default: - t.Fatal("sending metrics wasn't stopped") - } - require.Equal(t, max, count) - require.Equal(t, expectedTestStopFuncCalled, TestStopFuncCalled) - - nBufferSamples := len(out.bufferSamples) - nBufferHTTPTrails := len(out.bufferHTTPTrails) - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.VUs, - Tags: tags, - }, - Time: now, - Value: 1.0, - }}) - if nBufferSamples != len(out.bufferSamples) || nBufferHTTPTrails != len(out.bufferHTTPTrails) { - t.Errorf("Output still collects data after stop sending metrics") - } -} - -func TestCloudOutputRequireScriptName(t *testing.T) { - t.Parallel() - _, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: ""}, - }) - require.Error(t, err) - assert.Contains(t, err.Error(), "script name not set") -} - -func TestCloudOutputPushRefID(t *testing.T) { - t.Parallel() - - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) - - expSamples := make(chan []Sample) - defer close(expSamples) - - tb := httpmultibin.NewHTTPMultiBin(t) - failHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - t.Errorf("%s should not have been called at all", r.RequestURI) - }) - tb.Mux.HandleFunc("/v1/tests", failHandler) - tb.Mux.HandleFunc("/v1/tests/333", failHandler) - tb.Mux.HandleFunc("/v1/metrics/333", getSampleChecker(t, expSamples)) - - out, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{ - "host": "%s", "noCompress": true, - "metricPushInterval": "10ms", - "aggregationPeriod": "0ms", - "pushRefID": "333" - }`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - require.NoError(t, err) - - assert.Equal(t, "333", out.config.PushRefID.String) - 
require.NoError(t, out.Start()) - assert.Equal(t, "333", out.referenceID) - - now := time.Now() - tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) - encodedTags, err := json.Marshal(tags) - require.NoError(t, err) - - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.HTTPReqDuration, - Tags: tags, - }, - Time: now, - Value: 123.45, - }}) - exp := []Sample{{ - Type: DataTypeSingle, - Metric: metrics.HTTPReqDurationName, - Data: &SampleDataSingle{ - Type: builtinMetrics.HTTPReqDuration.Type, - Time: toMicroSecond(now), - Tags: encodedTags, - Value: 123.45, - }, - }} - - select { - case expSamples <- exp: - case <-time.After(5 * time.Second): - t.Error("test timeout") - } - - require.NoError(t, out.Stop()) -} - -func TestCloudOutputRecvIterLIAllIterations(t *testing.T) { - t.Parallel() - - tb := httpmultibin.NewHTTPMultiBin(t) - out, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{ - "host": "%s", "noCompress": true, - "maxMetricSamplesPerPackage": 50 - }`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "path/to/script.js"}, - }) - require.NoError(t, err) - - gotIterations := false - var m sync.Mutex - expValues := map[string]float64{ - "data_received": 100, - "data_sent": 200, - "iteration_duration": 60000, - "iterations": 1, - } - - tb.Mux.HandleFunc("/v1/metrics/123", func(_ http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - - assert.Len(t, receivedSamples, 1) - assert.Equal(t, "iter_li_all", receivedSamples[0].Metric) - assert.Equal(t, DataTypeMap, receivedSamples[0].Type) - data, ok := receivedSamples[0].Data.(*SampleDataMap) - assert.True(t, ok) - assert.Equal(t, expValues, data.Values) - - m.Lock() - gotIterations = true - m.Unlock() - }) - - out.config.PushRefID = null.StringFrom("123") - require.NoError(t, out.Start()) - - now := time.Now() - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) - simpleNetTrail := netext.NetTrail{ - BytesRead: 100, - BytesWritten: 200, - FullIteration: true, - StartTime: now.Add(-time.Minute), - EndTime: now, - Samples: []metrics.Sample{ - { - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.DataSent, - Tags: registry.RootTagSet(), - }, - Time: now, - Value: float64(200), - }, - { - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.DataReceived, - Tags: registry.RootTagSet(), - }, - Time: now, - Value: float64(100), - }, - { - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.Iterations, - Tags: registry.RootTagSet(), - }, - Time: now, - Value: 1, - }, - }, - } - - out.AddMetricSamples([]metrics.SampleContainer{&simpleNetTrail}) - require.NoError(t, out.Stop()) - require.True(t, gotIterations) -} - -func TestNewName(t *testing.T) { - t.Parallel() - mustParse := func(u string) *url.URL { result, err := url.Parse(u) require.NoError(t, err) @@ -705,78 +68,18 @@ func TestNewName(t *testing.T) { } } -func TestPublishMetric(t *testing.T) { +func TestCloudOutputRequireScriptName(t *testing.T) { t.Parallel() - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - g, err := gzip.NewReader(r.Body) - - 
require.NoError(t, err) - var buf bytes.Buffer - _, err = io.Copy(&buf, g) //nolint:gosec - require.NoError(t, err) - byteCount, err := strconv.Atoi(r.Header.Get("x-payload-byte-count")) - require.NoError(t, err) - require.Equal(t, buf.Len(), byteCount) - - samplesCount, err := strconv.Atoi(r.Header.Get("x-payload-sample-count")) - require.NoError(t, err) - var samples []*Sample - err = json.Unmarshal(buf.Bytes(), &samples) - require.NoError(t, err) - require.Equal(t, len(samples), samplesCount) - - _, err = fmt.Fprintf(w, "") - require.NoError(t, err) - })) - defer server.Close() - - out, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": false}`, server.URL)), + _, err := New(output.Params{ + Logger: testutils.NewLogger(t), ScriptOptions: lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), SystemTags: &metrics.DefaultSystemTagSet, }, - ScriptPath: &url.URL{Path: "script.js"}, - }) - require.NoError(t, err) - - samples := []*Sample{ - { - Type: "Point", - Metric: "metric", - Data: &SampleDataSingle{ - Type: 1, - Time: toMicroSecond(time.Now()), - Value: 1.2, - }, - }, - } - err = out.client.PushMetric("1", samples) - assert.NoError(t, err) -} - -func TestNewOutputClientTimeout(t *testing.T) { - t.Parallel() - ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - time.Sleep(100 * time.Millisecond) - })) - defer ts.Close() - - out, err := newOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "timeout": "2ms"}`, ts.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(50 * time.Millisecond), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "script.js"}, + ScriptPath: &url.URL{Path: ""}, }) - require.NoError(t, err) - - err = out.client.PushMetric("testmetric", nil) require.Error(t, err) - assert.True(t, os.IsTimeout(err)) //nolint:forbidigo + assert.Contains(t, err.Error(), "script name not set") } func TestOutputCreateTestWithConfigOverwrite(t *testing.T) { diff --git a/output/cloud/bench_test.go b/output/cloud/v1/bench_test.go similarity index 99% rename from output/cloud/bench_test.go rename to output/cloud/v1/bench_test.go index 80a2d0d7694..483ff71627f 100644 --- a/output/cloud/bench_test.go +++ b/output/cloud/v1/bench_test.go @@ -26,7 +26,7 @@ import ( ) func BenchmarkAggregateHTTP(b *testing.B) { - out, err := newOutput(output.Params{ + out, err := newTestOutput(output.Params{ Logger: testutils.NewLogger(b), JSONConfig: json.RawMessage(`{"noCompress": true, "aggregationCalcInterval": "200ms","aggregationPeriod": "200ms"}`), ScriptOptions: lib.Options{ @@ -292,7 +292,7 @@ func BenchmarkHTTPPush(b *testing.B) { }, ) - out, err := newOutput(output.Params{ + out, err := newTestOutput(output.Params{ Logger: testutils.NewLogger(b), JSONConfig: json.RawMessage(fmt.Sprintf(`{ "host": "%s", diff --git a/output/cloud/cloud_easyjson.go b/output/cloud/v1/cloud_easyjson.go similarity index 100% rename from output/cloud/cloud_easyjson.go rename to output/cloud/v1/cloud_easyjson.go diff --git a/output/cloud/data.go b/output/cloud/v1/data.go similarity index 100% rename from output/cloud/data.go rename to output/cloud/v1/data.go diff --git a/output/cloud/data_test.go b/output/cloud/v1/data_test.go similarity index 100% rename from output/cloud/data_test.go rename to output/cloud/v1/data_test.go diff --git a/output/cloud/metrics_client.go 
b/output/cloud/v1/metrics_client.go
similarity index 100%
rename from output/cloud/metrics_client.go
rename to output/cloud/v1/metrics_client.go
diff --git a/output/cloud/v1/output.go b/output/cloud/v1/output.go
new file mode 100644
index 00000000000..6f295a656b6
--- /dev/null
+++ b/output/cloud/v1/output.go
@@ -0,0 +1,472 @@
+// Package cloud implements an Output that flushes to the k6 Cloud platform
+// using version 1 of the protocol, flushing a JSON-based payload.
+package cloud
+
+import (
+	"net/http"
+	"sync"
+	"time"
+
+	easyjson "github.com/mailru/easyjson"
+	"github.com/sirupsen/logrus"
+
+	"go.k6.io/k6/cloudapi"
+	"go.k6.io/k6/errext"
+	"go.k6.io/k6/errext/exitcodes"
+
+	"go.k6.io/k6/lib/netext"
+	"go.k6.io/k6/lib/netext/httpext"
+	"go.k6.io/k6/metrics"
+)
+
+// Output sends result data to the k6 Cloud service.
+type Output struct {
+	logger logrus.FieldLogger
+	config cloudapi.Config
+
+	referenceID string
+	client      *MetricsClient
+
+	bufferMutex      sync.Mutex
+	bufferHTTPTrails []*httpext.Trail
+	bufferSamples    []*Sample
+
+	// TODO: optimize this
+	//
+	// Since the real-time metrics refactoring (https://github.com/k6io/k6/pull/678),
+	// we should no longer have to handle metrics that have times long in the past. So instead of a
+	// map, we can probably use a simple slice (or even an array!) as a ring buffer to store the
+	// aggregation buckets. This should save us some time, since it would make the lookups and WaitPeriod
+	// checks basically O(1). And even if for some reason there are occasional metrics with past times that
+	// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated
+	aggrBuckets map[int64]aggregationBucket
+
+	stopSendingMetrics chan struct{}
+	stopAggregation    chan struct{}
+	aggregationDone    *sync.WaitGroup
+	stopOutput         chan struct{}
+	outputDone         *sync.WaitGroup
+	testStopFunc       func(error)
+}
+
+// New creates a new Cloud output version 1.
+func New(logger logrus.FieldLogger, conf cloudapi.Config, testAPIClient *cloudapi.Client) (*Output, error) {
+	return &Output{
+		config:      conf,
+		client:      NewMetricsClient(testAPIClient, logger, conf.Host.String, conf.NoCompress.Bool),
+		aggrBuckets: map[int64]aggregationBucket{},
+		logger:      logger,
+
+		stopSendingMetrics: make(chan struct{}),
+		stopAggregation:    make(chan struct{}),
+		aggregationDone:    &sync.WaitGroup{},
+		stopOutput:         make(chan struct{}),
+		outputDone:         &sync.WaitGroup{},
+	}, nil
+}
+
+// SetReferenceID sets the passed Reference ID.
+func (out *Output) SetReferenceID(id string) {
+	out.referenceID = id
+}
+
+// Start starts the Output; it spawns the background goroutines
+// that aggregate and flush the collected metric samples.
+func (out *Output) Start() error { + aggregationPeriod := out.config.AggregationPeriod.TimeDuration() + // If enabled, start periodically aggregating the collected HTTP trails + if aggregationPeriod > 0 { + out.aggregationDone.Add(1) + go func() { + defer out.aggregationDone.Done() + aggregationWaitPeriod := out.config.AggregationWaitPeriod.TimeDuration() + aggregationTicker := time.NewTicker(aggregationPeriod) + defer aggregationTicker.Stop() + + for { + select { + case <-out.stopSendingMetrics: + return + case <-aggregationTicker.C: + out.aggregateHTTPTrails(aggregationWaitPeriod) + case <-out.stopAggregation: + out.aggregateHTTPTrails(0) + out.flushHTTPTrails() + return + } + } + }() + } + + out.outputDone.Add(1) + go func() { + defer out.outputDone.Done() + pushTicker := time.NewTicker(out.config.MetricPushInterval.TimeDuration()) + defer pushTicker.Stop() + for { + select { + case <-out.stopSendingMetrics: + return + default: + } + select { + case <-out.stopOutput: + out.pushMetrics() + return + case <-pushTicker.C: + out.pushMetrics() + } + } + }() + + return nil +} + +// StopWithTestError gracefully stops all metric emission from the output: when +// all metric samples are emitted, it makes a cloud API call to finish the test +// run. If testErr was specified, it extracts the RunStatus from it. +func (out *Output) StopWithTestError(testErr error) error { + out.logger.Debug("Stopping the cloud output...") + close(out.stopAggregation) + out.aggregationDone.Wait() // could be a no-op, if we have never started the aggregation + out.logger.Debug("Aggregation stopped, stopping metric emission...") + close(out.stopOutput) + out.outputDone.Wait() + out.logger.Debug("Metric emission stopped, calling cloud API...") + return nil +} + +// SetTestRunStopCallback receives the function that stops the engine on error +func (out *Output) SetTestRunStopCallback(stopFunc func(error)) { + out.testStopFunc = stopFunc +} + +// AddMetricSamples receives a set of metric samples. This method is never +// called concurrently, so it defers as much of the work as possible to the +// asynchronous goroutines initialized in Start(). +func (out *Output) AddMetricSamples(sampleContainers []metrics.SampleContainer) { + select { + case <-out.stopSendingMetrics: + return + default: + } + + if out.referenceID == "" { + return + } + + newSamples := []*Sample{} + newHTTPTrails := []*httpext.Trail{} + + for _, sampleContainer := range sampleContainers { + switch sc := sampleContainer.(type) { + case *httpext.Trail: + // Check if aggregation is enabled, + if out.config.AggregationPeriod.Duration > 0 { + newHTTPTrails = append(newHTTPTrails, sc) + } else { + newSamples = append(newSamples, NewSampleFromTrail(sc)) + } + case *netext.NetTrail: + // TODO: aggregate? 
+ values := map[string]float64{ + metrics.DataSentName: float64(sc.BytesWritten), + metrics.DataReceivedName: float64(sc.BytesRead), + } + + if sc.FullIteration { + values[metrics.IterationDurationName] = metrics.D(sc.EndTime.Sub(sc.StartTime)) + values[metrics.IterationsName] = 1 + } + + encodedTags, err := easyjson.Marshal(sc.GetTags()) + if err != nil { + out.logger.WithError(err).Error("Encoding tags failed") + } + newSamples = append(newSamples, &Sample{ + Type: DataTypeMap, + Metric: "iter_li_all", + Data: &SampleDataMap{ + Time: toMicroSecond(sc.GetTime()), + Tags: encodedTags, + Values: values, + }, + }) + default: + for _, sample := range sampleContainer.GetSamples() { + encodedTags, err := easyjson.Marshal(sample.Tags) + if err != nil { + out.logger.WithError(err).Error("Encoding tags failed") + } + + newSamples = append(newSamples, &Sample{ + Type: DataTypeSingle, + Metric: sample.Metric.Name, + Data: &SampleDataSingle{ + Type: sample.Metric.Type, + Time: toMicroSecond(sample.Time), + Tags: encodedTags, + Value: sample.Value, + }, + }) + } + } + } + + if len(newSamples) > 0 || len(newHTTPTrails) > 0 { + out.bufferMutex.Lock() + out.bufferSamples = append(out.bufferSamples, newSamples...) + out.bufferHTTPTrails = append(out.bufferHTTPTrails, newHTTPTrails...) + out.bufferMutex.Unlock() + } +} + +//nolint:funlen,nestif,gocognit +func (out *Output) aggregateHTTPTrails(waitPeriod time.Duration) { + out.bufferMutex.Lock() + newHTTPTrails := out.bufferHTTPTrails + out.bufferHTTPTrails = nil + out.bufferMutex.Unlock() + + aggrPeriod := int64(out.config.AggregationPeriod.Duration) + + // Distribute all newly buffered HTTP trails into buckets and sub-buckets + for _, trail := range newHTTPTrails { + bucketID := trail.GetTime().UnixNano() / aggrPeriod + + // Get or create a time bucket for that trail period + bucket, ok := out.aggrBuckets[bucketID] + if !ok { + bucket = aggregationBucket{} + out.aggrBuckets[bucketID] = bucket + } + + subBucket, ok := bucket[trail.Tags] + if !ok { + subBucket = make([]*httpext.Trail, 0, 100) + } + bucket[trail.Tags] = append(subBucket, trail) + } + + // Which buckets are still new and we'll wait for trails to accumulate before aggregating + bucketCutoffID := time.Now().Add(-waitPeriod).UnixNano() / aggrPeriod + iqrRadius := out.config.AggregationOutlierIqrRadius.Float64 + iqrLowerCoef := out.config.AggregationOutlierIqrCoefLower.Float64 + iqrUpperCoef := out.config.AggregationOutlierIqrCoefUpper.Float64 + newSamples := []*Sample{} + + // Handle all aggregation buckets older than bucketCutoffID + for bucketID, subBucket := range out.aggrBuckets { + if bucketID > bucketCutoffID { + continue + } + + for tags, httpTrails := range subBucket { + // start := time.Now() // this is in a combination with the log at the end + trailCount := int64(len(httpTrails)) + if trailCount < out.config.AggregationMinSamples.Int64 { + for _, trail := range httpTrails { + newSamples = append(newSamples, NewSampleFromTrail(trail)) + } + continue + } + encodedTags, err := easyjson.Marshal(tags) + if err != nil { + out.logger.WithError(err).Error("Encoding tags failed") + } + + aggrData := &SampleDataAggregatedHTTPReqs{ + Time: toMicroSecond(time.Unix(0, bucketID*aggrPeriod+aggrPeriod/2)), + Type: "aggregated_trend", + Tags: encodedTags, + } + + if out.config.AggregationSkipOutlierDetection.Bool { + // Simply add up all HTTP trails, no outlier detection + for _, trail := range httpTrails { + aggrData.Add(trail) + } + } else { + connDurations := make(durations, trailCount) + 
reqDurations := make(durations, trailCount) + for i, trail := range httpTrails { + connDurations[i] = trail.ConnDuration + reqDurations[i] = trail.Duration + } + + var minConnDur, maxConnDur, minReqDur, maxReqDur time.Duration + if trailCount < out.config.AggregationOutlierAlgoThreshold.Int64 { + // Since there are fewer samples, we'll use the interpolation-enabled and + // more precise sorting-based algorithm + minConnDur, maxConnDur = connDurations.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) + minReqDur, maxReqDur = reqDurations.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) + } else { + minConnDur, maxConnDur = connDurations.SelectGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef) + minReqDur, maxReqDur = reqDurations.SelectGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef) + } + + for _, trail := range httpTrails { + if trail.ConnDuration < minConnDur || + trail.ConnDuration > maxConnDur || + trail.Duration < minReqDur || + trail.Duration > maxReqDur { + // Seems like an outlier, add it as a standalone metric + newSamples = append(newSamples, NewSampleFromTrail(trail)) + } else { + // Aggregate the trail + aggrData.Add(trail) + } + } + } + + aggrData.CalcAverages() + + if aggrData.Count > 0 { + newSamples = append(newSamples, &Sample{ + Type: DataTypeAggregatedHTTPReqs, + Metric: "http_req_li_all", + Data: aggrData, + }) + } + } + delete(out.aggrBuckets, bucketID) + } + + if len(newSamples) > 0 { + out.bufferMutex.Lock() + out.bufferSamples = append(out.bufferSamples, newSamples...) + out.bufferMutex.Unlock() + } +} + +func (out *Output) flushHTTPTrails() { + out.bufferMutex.Lock() + defer out.bufferMutex.Unlock() + + newSamples := []*Sample{} + for _, trail := range out.bufferHTTPTrails { + newSamples = append(newSamples, NewSampleFromTrail(trail)) + } + for _, bucket := range out.aggrBuckets { + for _, subBucket := range bucket { + for _, trail := range subBucket { + newSamples = append(newSamples, NewSampleFromTrail(trail)) + } + } + } + + out.bufferHTTPTrails = nil + out.aggrBuckets = map[int64]aggregationBucket{} + out.bufferSamples = append(out.bufferSamples, newSamples...) +} + +// shouldStopSendingMetrics returns true if the output should interrupt the metric flush. +// +// note: The actual test execution should continues, +// since for local k6 run tests the end-of-test summary (or any other outputs) will still work, +// but the cloud output doesn't send any more metrics. +// Instead, if cloudapi.Config.StopOnError is enabled +// the cloud output should stop the whole test run too. +// This logic should be handled by the caller. +func (out *Output) shouldStopSendingMetrics(err error) bool { + if err == nil { + return false + } + if errResp, ok := err.(cloudapi.ErrorResponse); ok && errResp.Response != nil { //nolint:errorlint + // The Cloud service returns the error code 4 when it doesn't accept any more metrics. + // So, when k6 sees that, the cloud output just stops prematurely. 
+ return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4 + } + + return false +} + +type pushJob struct { + done chan error + samples []*Sample +} + +// ceil(a/b) +func ceilDiv(a, b int) int { + r := a / b + if a%b != 0 { + r++ + } + return r +} + +func (out *Output) pushMetrics() { + out.bufferMutex.Lock() + if len(out.bufferSamples) == 0 { + out.bufferMutex.Unlock() + return + } + buffer := out.bufferSamples + out.bufferSamples = nil + out.bufferMutex.Unlock() + + count := len(buffer) + out.logger.WithFields(logrus.Fields{ + "samples": count, + }).Debug("Pushing metrics to cloud") + start := time.Now() + + numberOfPackages := ceilDiv(len(buffer), int(out.config.MaxMetricSamplesPerPackage.Int64)) + numberOfWorkers := int(out.config.MetricPushConcurrency.Int64) + if numberOfWorkers > numberOfPackages { + numberOfWorkers = numberOfPackages + } + + ch := make(chan pushJob, numberOfPackages) + for i := 0; i < numberOfWorkers; i++ { + go func() { + for job := range ch { + err := out.client.PushMetric(out.referenceID, job.samples) + job.done <- err + if out.shouldStopSendingMetrics(err) { + return + } + } + }() + } + + jobs := make([]pushJob, 0, numberOfPackages) + + for len(buffer) > 0 { + size := len(buffer) + if size > int(out.config.MaxMetricSamplesPerPackage.Int64) { + size = int(out.config.MaxMetricSamplesPerPackage.Int64) + } + job := pushJob{done: make(chan error, 1), samples: buffer[:size]} + ch <- job + jobs = append(jobs, job) + buffer = buffer[size:] + } + + close(ch) + + for _, job := range jobs { + err := <-job.done + if err != nil { + if out.shouldStopSendingMetrics(err) { + out.logger.WithError(err).Warn("Stopped sending metrics to cloud due to an error") + serr := errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), + errext.AbortedByOutput, + ) + if out.config.StopOnError.Bool { + out.testStopFunc(serr) + } + close(out.stopSendingMetrics) + break + } + out.logger.WithError(err).Warn("Failed to send metrics to cloud") + } + } + out.logger.WithFields(logrus.Fields{ + "samples": count, + "t": time.Since(start), + }).Debug("Pushing metrics to cloud finished") +} + +const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate diff --git a/output/cloud/v1/output_test.go b/output/cloud/v1/output_test.go new file mode 100644 index 00000000000..7d312c1ca5a --- /dev/null +++ b/output/cloud/v1/output_test.go @@ -0,0 +1,727 @@ +package cloud + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "net/http/httptest" + "net/url" + "os" + "sort" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v3" + + "go.k6.io/k6/cloudapi" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/consts" + "go.k6.io/k6/lib/netext" + "go.k6.io/k6/lib/netext/httpext" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/lib/testutils/httpmultibin" + "go.k6.io/k6/lib/types" + "go.k6.io/k6/metrics" + "go.k6.io/k6/output" +) + +func tagEqual(expected, got json.RawMessage) bool { + var expectedMap, gotMap map[string]string + err := json.Unmarshal(expected, &expectedMap) + if err != nil { + panic("tagEqual: " + err.Error()) + } + + err = json.Unmarshal(got, &gotMap) + if err != nil { + panic("tagEqual: " + err.Error()) + } + + if len(expectedMap) != len(gotMap) { + return false + } + + for k, v := range gotMap { + if k == "url" { + if expectedMap["name"] != v { + return false + } + } 
else if expectedMap[k] != v {
+			return false
+		}
+	}
+	return true
+}
+
+func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		body, err := io.ReadAll(r.Body)
+		assert.NoError(t, err)
+		receivedSamples := []Sample{}
+		assert.NoError(t, json.Unmarshal(body, &receivedSamples))
+
+		expSamples := <-expSamples
+		require.Len(t, receivedSamples, len(expSamples))
+
+		for i, expSample := range expSamples {
+			receivedSample := receivedSamples[i]
+			assert.Equal(t, expSample.Metric, receivedSample.Metric)
+			assert.Equal(t, expSample.Type, receivedSample.Type)
+
+			if callbackCheck, ok := expSample.Data.(func(interface{})); ok {
+				callbackCheck(receivedSample.Data)
+				continue
+			}
+
+			require.IsType(t, expSample.Data, receivedSample.Data)
+
+			switch expData := expSample.Data.(type) {
+			case *SampleDataSingle:
+				receivedData, ok := receivedSample.Data.(*SampleDataSingle)
+				assert.True(t, ok)
+				assert.JSONEq(t, string(expData.Tags), string(receivedData.Tags))
+				assert.Equal(t, expData.Time, receivedData.Time)
+				assert.Equal(t, expData.Type, receivedData.Type)
+				assert.Equal(t, expData.Value, receivedData.Value)
+			case *SampleDataMap:
+				receivedData, ok := receivedSample.Data.(*SampleDataMap)
+				assert.True(t, ok)
+				assert.True(t, tagEqual(expData.Tags, receivedData.Tags))
+				assert.Equal(t, expData.Time, receivedData.Time)
+				assert.Equal(t, expData.Type, receivedData.Type)
+				assert.Equal(t, expData.Values, receivedData.Values)
+			case *SampleDataAggregatedHTTPReqs:
+				receivedData, ok := receivedSample.Data.(*SampleDataAggregatedHTTPReqs)
+				assert.True(t, ok)
+				assert.JSONEq(t, string(expData.Tags), string(receivedData.Tags))
+				assert.Equal(t, expData.Time, receivedData.Time)
+				assert.Equal(t, expData.Type, receivedData.Type)
+				assert.Equal(t, expData.Values, receivedData.Values)
+			default:
+				t.Errorf("Unknown data type %#v", expData)
+			}
+		}
+	}
+}
+
+func skewTrail(r *rand.Rand, t httpext.Trail, minCoef, maxCoef float64) httpext.Trail {
+	coef := minCoef + r.Float64()*(maxCoef-minCoef)
+	addJitter := func(d *time.Duration) {
+		*d = time.Duration(float64(*d) * coef)
+	}
+	addJitter(&t.Blocked)
+	addJitter(&t.Connecting)
+	addJitter(&t.TLSHandshaking)
+	addJitter(&t.Sending)
+	addJitter(&t.Waiting)
+	addJitter(&t.Receiving)
+	t.ConnDuration = t.Connecting + t.TLSHandshaking
+	t.Duration = t.Sending + t.Waiting + t.Receiving
+	return t
+}
+
+func TestCloudOutput(t *testing.T) {
+	t.Parallel()
+
+	getTestRunner := func(minSamples int) func(t *testing.T) {
+		return func(t *testing.T) {
+			t.Parallel()
+			runCloudOutputTestCase(t, minSamples)
+		}
+	}
+
+	for tcNum, minSamples := range []int{60, 75, 100} {
+		tcNum, minSamples := tcNum, minSamples
+		// Pass the runner directly so the subtest body actually executes;
+		// the returned func already calls t.Parallel().
+		t.Run(fmt.Sprintf("tc%d_minSamples%d", tcNum, minSamples), getTestRunner(minSamples))
+	}
+}
+
+func runCloudOutputTestCase(t *testing.T, minSamples int) {
+	seed := time.Now().UnixNano()
+	r := rand.New(rand.NewSource(seed)) //nolint:gosec
+	t.Logf("Random source seeded with %d\n", seed)
+
+	tb := httpmultibin.NewHTTPMultiBin(t)
+	registry := metrics.NewRegistry()
+
+	builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
+	out, err := newTestOutput(output.Params{
+		Logger:     testutils.NewLogger(t),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &metrics.DefaultSystemTagSet,
+		},
+		Environment:
map[string]string{ + "K6_CLOUD_PUSH_REF_ID": "123", + "K6_CLOUD_METRIC_PUSH_INTERVAL": "10ms", + "K6_CLOUD_AGGREGATION_PERIOD": "30ms", + "K6_CLOUD_AGGREGATION_CALC_INTERVAL": "40ms", + "K6_CLOUD_AGGREGATION_WAIT_PERIOD": "5ms", + "K6_CLOUD_AGGREGATION_MIN_SAMPLES": strconv.Itoa(minSamples), + }, + ScriptPath: &url.URL{Path: "/script.js"}, + }) + require.NoError(t, err) + + out.SetReferenceID("123") + require.NoError(t, out.Start()) + + now := time.Now() + tagMap := map[string]string{"test": "mest", "a": "b", "name": "name", "url": "name"} + tags := registry.RootTagSet().WithTagsFromMap(tagMap) + + expSamples := make(chan []Sample) + defer close(expSamples) + tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), getSampleChecker(t, expSamples)) + + out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.VUs, + Tags: tags, + }, + Time: now, + Value: 1.0, + }}) + + enctags, err := json.Marshal(tags) + require.NoError(t, err) + expSamples <- []Sample{{ + Type: DataTypeSingle, + Metric: metrics.VUsName, + Data: &SampleDataSingle{ + Type: builtinMetrics.VUs.Type, + Time: toMicroSecond(now), + Tags: enctags, + Value: 1.0, + }, + }} + + simpleTrail := httpext.Trail{ + Blocked: 100 * time.Millisecond, + Connecting: 200 * time.Millisecond, + TLSHandshaking: 300 * time.Millisecond, + Sending: 400 * time.Millisecond, + Waiting: 500 * time.Millisecond, + Receiving: 600 * time.Millisecond, + + EndTime: now, + ConnDuration: 500 * time.Millisecond, + Duration: 1500 * time.Millisecond, + Tags: tags, + } + out.AddMetricSamples([]metrics.SampleContainer{&simpleTrail}) + expSamples <- []Sample{*NewSampleFromTrail(&simpleTrail)} + + smallSkew := 0.02 + + trails := []metrics.SampleContainer{} + durations := make([]time.Duration, 0, len(trails)) + for i := int64(0); i < out.config.AggregationMinSamples.Int64; i++ { + similarTrail := skewTrail(r, simpleTrail, 1.0, 1.0+smallSkew) + trails = append(trails, &similarTrail) + durations = append(durations, similarTrail.Duration) + } + sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) + t.Logf("Sorted durations: %#v", durations) // Useful to debug any failures, doesn't get in the way otherwise + + checkAggrMetric := func(normal time.Duration, aggr AggregatedMetric) { + assert.True(t, aggr.Min <= aggr.Avg) + assert.True(t, aggr.Avg <= aggr.Max) + assert.InEpsilon(t, normal, metrics.ToD(aggr.Min), smallSkew) + assert.InEpsilon(t, normal, metrics.ToD(aggr.Avg), smallSkew) + assert.InEpsilon(t, normal, metrics.ToD(aggr.Max), smallSkew) + } + + outlierTrail := skewTrail(r, simpleTrail, 2.0+smallSkew, 3.0+smallSkew) + trails = append(trails, &outlierTrail) + out.AddMetricSamples(trails) + expSamples <- []Sample{ + *NewSampleFromTrail(&outlierTrail), + { + Type: DataTypeAggregatedHTTPReqs, + Metric: "http_req_li_all", + Data: func(data interface{}) { + aggrData, ok := data.(*SampleDataAggregatedHTTPReqs) + assert.True(t, ok) + assert.JSONEq(t, `{"test": "mest", "a": "b", "name": "name", "url": "name"}`, string(aggrData.Tags)) + assert.Equal(t, out.config.AggregationMinSamples.Int64, int64(aggrData.Count)) + assert.Equal(t, "aggregated_trend", aggrData.Type) + assert.InDelta(t, now.UnixNano(), aggrData.Time*1000, float64(out.config.AggregationPeriod.Duration)) + + checkAggrMetric(simpleTrail.Duration, aggrData.Values.Duration) + checkAggrMetric(simpleTrail.Blocked, aggrData.Values.Blocked) + checkAggrMetric(simpleTrail.Connecting, aggrData.Values.Connecting) + 
checkAggrMetric(simpleTrail.TLSHandshaking, aggrData.Values.TLSHandshaking) + checkAggrMetric(simpleTrail.Sending, aggrData.Values.Sending) + checkAggrMetric(simpleTrail.Waiting, aggrData.Values.Waiting) + checkAggrMetric(simpleTrail.Receiving, aggrData.Values.Receiving) + }, + }, + } + + require.NoError(t, out.StopWithTestError(nil)) +} + +func TestCloudOutputMaxPerPacket(t *testing.T) { + t.Parallel() + + tb := httpmultibin.NewHTTPMultiBin(t) + out, err := newTestOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &metrics.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, + }) + require.NoError(t, err) + out.SetReferenceID("12") + + maxMetricSamplesPerPackage := 20 + out.config.MaxMetricSamplesPerPackage = null.IntFrom(int64(maxMetricSamplesPerPackage)) + + now := time.Now() + registry := metrics.NewRegistry() + tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) + gotTheLimit := false + var m sync.Mutex + tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), + func(_ http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + assert.NoError(t, err) + receivedSamples := []Sample{} + assert.NoError(t, json.Unmarshal(body, &receivedSamples)) + assert.True(t, len(receivedSamples) <= maxMetricSamplesPerPackage) + if len(receivedSamples) == maxMetricSamplesPerPackage { + m.Lock() + gotTheLimit = true + m.Unlock() + } + }) + + require.NoError(t, out.Start()) + + builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.VUs, + Tags: tags, + }, + Time: now, + Value: 1.0, + }}) + for j := time.Duration(1); j <= 200; j++ { + container := make([]metrics.SampleContainer, 0, 500) + for i := time.Duration(1); i <= 50; i++ { + //nolint:durationcheck + container = append(container, &httpext.Trail{ + Blocked: i % 200 * 100 * time.Millisecond, + Connecting: i % 200 * 200 * time.Millisecond, + TLSHandshaking: i % 200 * 300 * time.Millisecond, + Sending: i * i * 400 * time.Millisecond, + Waiting: 500 * time.Millisecond, + Receiving: 600 * time.Millisecond, + + EndTime: now.Add(i * 100), + ConnDuration: 500 * time.Millisecond, + Duration: j * i * 1500 * time.Millisecond, + Tags: tags, + }) + } + out.AddMetricSamples(container) + } + + require.NoError(t, out.StopWithTestError(nil)) + assert.True(t, gotTheLimit) +} + +func TestCloudOutputStopSendingMetric(t *testing.T) { + t.Parallel() + t.Run("stop engine on error", func(t *testing.T) { + t.Parallel() + testCloudOutputStopSendingMetric(t, true) + }) + + t.Run("don't stop engine on error", func(t *testing.T) { + t.Parallel() + testCloudOutputStopSendingMetric(t, false) + }) +} + +func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) { + registry := metrics.NewRegistry() + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + + tb := httpmultibin.NewHTTPMultiBin(t) + out, err := newTestOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{ + "host": "%s", "noCompress": true, + "maxMetricSamplesPerPackage": 50, + "name": "something-that-should-be-overwritten", + "stopOnError": %t + }`, tb.ServerHTTP.URL, stopOnError)), + ScriptOptions: lib.Options{ + Duration: 
types.NullDurationFrom(1 * time.Second), + SystemTags: &metrics.DefaultSystemTagSet, + External: map[string]json.RawMessage{ + "loadimpact": json.RawMessage(`{"name": "my-custom-name"}`), + }, + }, + ScriptPath: &url.URL{Path: "/script.js"}, + }) + var expectedTestStopFuncCalled int64 + if stopOnError { + expectedTestStopFuncCalled = 1 + } + var TestStopFuncCalled int64 + out.testStopFunc = func(error) { + atomic.AddInt64(&TestStopFuncCalled, 1) + } + require.NoError(t, err) + now := time.Now() + tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) + + count := 1 + max := 5 + tb.Mux.HandleFunc("/v1/metrics/12", func(w http.ResponseWriter, r *http.Request) { + count++ + if count == max { + type payload struct { + Error cloudapi.ErrorResponse `json:"error"` + } + res := &payload{} + res.Error = cloudapi.ErrorResponse{Code: 4} + w.Header().Set("Content-Type", "application/json") + data, err := json.Marshal(res) + if err != nil { + t.Fatal(err) + } + w.WriteHeader(http.StatusForbidden) + _, _ = w.Write(data) + return + } + body, err := io.ReadAll(r.Body) + assert.NoError(t, err) + receivedSamples := []Sample{} + assert.NoError(t, json.Unmarshal(body, &receivedSamples)) + }) + + out.SetReferenceID("12") + require.NoError(t, out.Start()) + + out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.VUs, + Tags: tags, + }, + Time: now, + Value: 1.0, + }}) + for j := time.Duration(1); j <= 200; j++ { + container := make([]metrics.SampleContainer, 0, 500) + for i := time.Duration(1); i <= 50; i++ { + //nolint:durationcheck + container = append(container, &httpext.Trail{ + Blocked: i % 200 * 100 * time.Millisecond, + Connecting: i % 200 * 200 * time.Millisecond, + TLSHandshaking: i % 200 * 300 * time.Millisecond, + Sending: i * i * 400 * time.Millisecond, + Waiting: 500 * time.Millisecond, + Receiving: 600 * time.Millisecond, + + EndTime: now.Add(i * 100), + ConnDuration: 500 * time.Millisecond, + Duration: j * i * 1500 * time.Millisecond, + Tags: tags, + }) + } + out.AddMetricSamples(container) + } + + require.NoError(t, out.StopWithTestError(nil)) + + select { + case <-out.stopSendingMetrics: + // all is fine + default: + t.Fatal("sending metrics wasn't stopped") + } + require.Equal(t, max, count) + require.Equal(t, expectedTestStopFuncCalled, TestStopFuncCalled) + + nBufferSamples := len(out.bufferSamples) + nBufferHTTPTrails := len(out.bufferHTTPTrails) + out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.VUs, + Tags: tags, + }, + Time: now, + Value: 1.0, + }}) + if nBufferSamples != len(out.bufferSamples) || nBufferHTTPTrails != len(out.bufferHTTPTrails) { + t.Errorf("Output still collects data after stop sending metrics") + } +} + +func TestCloudOutputPushRefID(t *testing.T) { + t.Parallel() + + registry := metrics.NewRegistry() + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + + expSamples := make(chan []Sample) + defer close(expSamples) + + tb := httpmultibin.NewHTTPMultiBin(t) + failHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Errorf("%s should not have been called at all", r.RequestURI) + }) + tb.Mux.HandleFunc("/v1/tests", failHandler) + tb.Mux.HandleFunc("/v1/tests/333", failHandler) + tb.Mux.HandleFunc("/v1/metrics/333", getSampleChecker(t, expSamples)) + + out, err := newTestOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: 
json.RawMessage(fmt.Sprintf(`{ + "host": "%s", "noCompress": true, + "metricPushInterval": "10ms", + "aggregationPeriod": "0ms", + "pushRefID": "333" + }`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &metrics.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, + }) + require.NoError(t, err) + + out.SetReferenceID("333") + require.NoError(t, out.Start()) + + now := time.Now() + tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) + encodedTags, err := json.Marshal(tags) + require.NoError(t, err) + + out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.HTTPReqDuration, + Tags: tags, + }, + Time: now, + Value: 123.45, + }}) + exp := []Sample{{ + Type: DataTypeSingle, + Metric: metrics.HTTPReqDurationName, + Data: &SampleDataSingle{ + Type: builtinMetrics.HTTPReqDuration.Type, + Time: toMicroSecond(now), + Tags: encodedTags, + Value: 123.45, + }, + }} + + select { + case expSamples <- exp: + case <-time.After(5 * time.Second): + t.Error("test timeout") + } + + require.NoError(t, out.StopWithTestError(nil)) +} + +func TestCloudOutputRecvIterLIAllIterations(t *testing.T) { + t.Parallel() + + tb := httpmultibin.NewHTTPMultiBin(t) + out, err := newTestOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{ + "host": "%s", "noCompress": true, + "maxMetricSamplesPerPackage": 50 + }`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &metrics.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "path/to/script.js"}, + }) + require.NoError(t, err) + + gotIterations := false + var m sync.Mutex + expValues := map[string]float64{ + "data_received": 100, + "data_sent": 200, + "iteration_duration": 60000, + "iterations": 1, + } + + tb.Mux.HandleFunc("/v1/metrics/123", func(_ http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + assert.NoError(t, err) + + receivedSamples := []Sample{} + assert.NoError(t, json.Unmarshal(body, &receivedSamples)) + + assert.Len(t, receivedSamples, 1) + assert.Equal(t, "iter_li_all", receivedSamples[0].Metric) + assert.Equal(t, DataTypeMap, receivedSamples[0].Type) + data, ok := receivedSamples[0].Data.(*SampleDataMap) + assert.True(t, ok) + assert.Equal(t, expValues, data.Values) + + m.Lock() + gotIterations = true + m.Unlock() + }) + + out.SetReferenceID("123") + require.NoError(t, out.Start()) + + now := time.Now() + registry := metrics.NewRegistry() + builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + simpleNetTrail := netext.NetTrail{ + BytesRead: 100, + BytesWritten: 200, + FullIteration: true, + StartTime: now.Add(-time.Minute), + EndTime: now, + Samples: []metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.DataSent, + Tags: registry.RootTagSet(), + }, + Time: now, + Value: float64(200), + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.DataReceived, + Tags: registry.RootTagSet(), + }, + Time: now, + Value: float64(100), + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.Iterations, + Tags: registry.RootTagSet(), + }, + Time: now, + Value: 1, + }, + }, + } + + out.AddMetricSamples([]metrics.SampleContainer{&simpleNetTrail}) + require.NoError(t, out.StopWithTestError(nil)) + require.True(t, gotIterations) +} + +func TestPublishMetric(t *testing.T) { + t.Parallel() + 
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + g, err := gzip.NewReader(r.Body) + + require.NoError(t, err) + var buf bytes.Buffer + _, err = io.Copy(&buf, g) //nolint:gosec + require.NoError(t, err) + byteCount, err := strconv.Atoi(r.Header.Get("x-payload-byte-count")) + require.NoError(t, err) + require.Equal(t, buf.Len(), byteCount) + + samplesCount, err := strconv.Atoi(r.Header.Get("x-payload-sample-count")) + require.NoError(t, err) + var samples []*Sample + err = json.Unmarshal(buf.Bytes(), &samples) + require.NoError(t, err) + require.Equal(t, len(samples), samplesCount) + + _, err = fmt.Fprintf(w, "") + require.NoError(t, err) + })) + defer server.Close() + + out, err := newTestOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": false}`, server.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &metrics.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "script.js"}, + }) + require.NoError(t, err) + + samples := []*Sample{ + { + Type: "Point", + Metric: "metric", + Data: &SampleDataSingle{ + Type: 1, + Time: toMicroSecond(time.Now()), + Value: 1.2, + }, + }, + } + err = out.client.PushMetric("1", samples) + assert.NoError(t, err) +} + +func TestNewOutputClientTimeout(t *testing.T) { + t.Parallel() + ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + })) + defer ts.Close() + + out, err := newTestOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "timeout": "2ms"}`, ts.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(50 * time.Millisecond), + SystemTags: &metrics.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "script.js"}, + }) + require.NoError(t, err) + + err = out.client.PushMetric("testmetric", nil) + assert.True(t, os.IsTimeout(err)) //nolint:forbidigo +} + +func newTestOutput(params output.Params) (*Output, error) { + conf, err := cloudapi.GetConsolidatedConfig( + params.JSONConfig, params.Environment, params.ConfigArgument, params.ScriptOptions.External) + if err != nil { + return nil, err + } + + apiClient := cloudapi.NewClient( + params.Logger, conf.Token.String, conf.Host.String, + consts.Version, conf.Timeout.TimeDuration()) + + return New(params.Logger, conf, apiClient) +}
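
Reviewer note: the batching in pushMetrics is easiest to reason about in isolation. Below is a minimal, self-contained sketch (not part of the patch) of the same pattern: the buffer is cut into packages of at most maxPerPackage samples, ceilDiv gives the package count, and at most `concurrency` workers drain a job channel, with each job reporting back on its own buffered error channel. The sample/job types and the fake push function are placeholders, and the abort-on-error path (HTTP 403 with error code 4) is deliberately left out.

package main

import (
	"fmt"
	"time"
)

type sample struct{ value float64 } // stand-in for *Sample

type job struct {
	done    chan error
	samples []sample
}

// ceilDiv mirrors the helper in the output: ceil(a/b) for positive ints.
func ceilDiv(a, b int) int {
	r := a / b
	if a%b != 0 {
		r++
	}
	return r
}

// pushAll splits buffer into packages and pushes them with a bounded worker pool.
func pushAll(buffer []sample, maxPerPackage, concurrency int, push func([]sample) error) {
	numberOfPackages := ceilDiv(len(buffer), maxPerPackage)
	workers := concurrency
	if workers > numberOfPackages {
		workers = numberOfPackages
	}

	ch := make(chan job, numberOfPackages)
	for i := 0; i < workers; i++ {
		go func() {
			for j := range ch {
				j.done <- push(j.samples)
			}
		}()
	}

	jobs := make([]job, 0, numberOfPackages)
	for len(buffer) > 0 {
		size := len(buffer)
		if size > maxPerPackage {
			size = maxPerPackage
		}
		j := job{done: make(chan error, 1), samples: buffer[:size]}
		ch <- j
		jobs = append(jobs, j)
		buffer = buffer[size:]
	}
	close(ch)

	// Collect results in submission order, like the output does.
	for _, j := range jobs {
		if err := <-j.done; err != nil {
			fmt.Println("package failed:", err)
		}
	}
}

func main() {
	buf := make([]sample, 105)
	pushAll(buf, 20, 4, func(s []sample) error {
		time.Sleep(time.Millisecond) // pretend to talk to the ingest API
		fmt.Println("pushed package of", len(s), "samples")
		return nil
	})
}

Running the sketch pushes five packages of 20 samples and one of 5, which is the same "never more than the configured maximum per package" property that TestCloudOutputMaxPerPacket asserts.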
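
Similarly, TestPublishMetric only exercises the server side of the compressed upload. As a rough illustration of the request shape that handler expects (a gzipped JSON array plus x-payload-byte-count and x-payload-sample-count headers describing the uncompressed payload), here is a hedged sketch; the Sample shape, the Content-Type/Content-Encoding headers, and the URL are illustrative assumptions, not the client's actual implementation.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
)

type sample struct {
	Type   string  `json:"type"`
	Metric string  `json:"metric"`
	Value  float64 `json:"value"`
}

// buildPushRequest builds a request of the shape TestPublishMetric's handler
// verifies: gzipped JSON body, with the two bookkeeping headers computed from
// the uncompressed payload.
func buildPushRequest(url string, samples []sample) (*http.Request, error) {
	payload, err := json.Marshal(samples)
	if err != nil {
		return nil, err
	}

	var compressed bytes.Buffer
	gz := gzip.NewWriter(&compressed)
	if _, err := gz.Write(payload); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, url, &compressed)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")   // assumption, not checked by the test
	req.Header.Set("Content-Encoding", "gzip")           // assumption, not checked by the test
	req.Header.Set("X-Payload-Byte-Count", strconv.Itoa(len(payload))) // uncompressed size, as the test requires
	req.Header.Set("X-Payload-Sample-Count", strconv.Itoa(len(samples)))
	return req, nil
}

func main() {
	req, err := buildPushRequest("http://localhost:8080/v1/metrics/1", []sample{
		{Type: "Point", Metric: "metric", Value: 1.2},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL, req.Header.Get("X-Payload-Sample-Count"))
}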