diff --git a/output/cloud/v1/output.go b/output/cloud/v1/output.go index a8554c0f3eb2..8de6c7a76985 100644 --- a/output/cloud/v1/output.go +++ b/output/cloud/v1/output.go @@ -3,16 +3,21 @@ package cloud import ( + "context" "net/http" + "strconv" "sync" "time" - easyjson "github.com/mailru/easyjson" + "github.com/mailru/easyjson" "github.com/sirupsen/logrus" "go.k6.io/k6/cloudapi" + "go.k6.io/k6/cloudapi/insights" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/lib/types" + insightsOutput "go.k6.io/k6/output/cloud/insights" "go.k6.io/k6/lib/netext" "go.k6.io/k6/lib/netext/httpext" @@ -34,6 +39,10 @@ type Output struct { bufferHTTPTrails []*httpext.Trail bufferSamples []*Sample + insightsClient insightsOutput.Client + requestMetadatasCollector insightsOutput.RequestMetadatasCollector + requestMetadatasFlusher insightsOutput.RequestMetadatasFlusher + // TODO: optimize this // // Since the real-time metrics refactoring (https://github.com/k6io/k6/pull/678), @@ -101,6 +110,49 @@ func (out *Output) Start() error { }() } + if insightsOutput.Enabled(out.config) { + testRunID, err := strconv.ParseInt(out.referenceID, 10, 64) + if err != nil { + return err + } + out.requestMetadatasCollector = insightsOutput.NewCollector(testRunID) + + insightsClientConfig := insights.ClientConfig{ + IngesterHost: out.config.TracesHost.String, + Timeout: types.NewNullDuration(90*time.Second, false), + AuthConfig: insights.ClientAuthConfig{ + Enabled: true, + TestRunID: testRunID, + Token: out.config.Token.String, + RequireTransportSecurity: true, + }, + TLSConfig: insights.ClientTLSConfig{ + Insecure: false, + }, + RetryConfig: insights.ClientRetryConfig{ + RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`, + MaxAttempts: 3, + PerRetryTimeout: 30 * time.Second, + BackoffConfig: insights.ClientBackoffConfig{ + Enabled: true, + JitterFraction: 0.1, + WaitBetween: 1 * time.Second, + }, + }, + } + insightsClient := insights.NewClient(insightsClientConfig) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := insightsClient.Dial(ctx); err != nil { + return err + } + + out.insightsClient = insightsClient + out.requestMetadatasFlusher = insightsOutput.NewFlusher(insightsClient, out.requestMetadatasCollector) + out.runFlushRequestMetadatas() + } + out.outputDone.Add(1) go func() { defer out.outputDone.Done() @@ -136,6 +188,11 @@ func (out *Output) StopWithTestError(testErr error) error { close(out.stopOutput) out.outputDone.Wait() out.logger.Debug("Metric emission stopped, calling cloud API...") + if insightsOutput.Enabled(out.config) { + if err := out.insightsClient.Close(); err != nil { + out.logger.WithError(err).Error("Failed to close the insights client") + } + } return nil } @@ -222,6 +279,10 @@ func (out *Output) AddMetricSamples(sampleContainers []metrics.SampleContainer) out.bufferHTTPTrails = append(out.bufferHTTPTrails, newHTTPTrails...) out.bufferMutex.Unlock() } + + if insightsOutput.Enabled(out.config) { + out.requestMetadatasCollector.CollectRequestMetadatas(sampleContainers) + } } //nolint:funlen,nestif,gocognit @@ -472,4 +533,42 @@ func (out *Output) pushMetrics() { }).Debug("Pushing metrics to cloud finished") } +func (out *Output) runFlushRequestMetadatas() { + t := time.NewTicker(out.config.TracesPushInterval.TimeDuration()) + + for i := int64(0); i < out.config.TracesPushConcurrency.Int64; i++ { + out.outputDone.Add(1) + go func() { + defer out.outputDone.Done() + defer t.Stop() + + for { + select { + case <-out.stopSendingMetrics: + return + default: + } + select { + case <-out.stopOutput: + out.flushRequestMetadatas() + return + case <-t.C: + out.flushRequestMetadatas() + } + } + }() + } +} + +func (out *Output) flushRequestMetadatas() { + start := time.Now() + + err := out.requestMetadatasFlusher.Flush() + if err != nil { + out.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud") + } + + out.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered trace samples to the cloud") +} + const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate