Skip to content

Commit

Permalink
Integrate request metadata output to cloud output v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Blinkuu committed Jul 17, 2023
1 parent 47c78a5 commit d7395ae
Showing 1 changed file with 100 additions and 1 deletion.
101 changes: 100 additions & 1 deletion output/cloud/v1/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
package cloud

import (
"context"
"net/http"
"strconv"
"sync"
"time"

easyjson "github.com/mailru/easyjson"
"github.com/mailru/easyjson"
"github.com/sirupsen/logrus"

"go.k6.io/k6/cloudapi"
"go.k6.io/k6/cloudapi/insights"
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib/types"
insightsOutput "go.k6.io/k6/output/cloud/insights"

"go.k6.io/k6/lib/netext"
"go.k6.io/k6/lib/netext/httpext"
Expand All @@ -34,6 +39,10 @@ type Output struct {
bufferHTTPTrails []*httpext.Trail
bufferSamples []*Sample

insightsClient insightsOutput.Client
requestMetadatasCollector insightsOutput.RequestMetadatasCollector
requestMetadatasFlusher insightsOutput.RequestMetadatasFlusher

// TODO: optimize this
//
// Since the real-time metrics refactoring (https://github.com/k6io/k6/pull/678),
Expand Down Expand Up @@ -101,6 +110,49 @@ func (out *Output) Start() error {
}()
}

if insightsOutput.Enabled(out.config) {
testRunID, err := strconv.ParseInt(out.referenceID, 10, 64)
if err != nil {
return err
}
out.requestMetadatasCollector = insightsOutput.NewCollector(testRunID)

insightsClientConfig := insights.ClientConfig{
IngesterHost: out.config.TracesHost.String,
Timeout: types.NewNullDuration(90*time.Second, false),
AuthConfig: insights.ClientAuthConfig{
Enabled: true,
TestRunID: testRunID,
Token: out.config.Token.String,
RequireTransportSecurity: true,
},
TLSConfig: insights.ClientTLSConfig{
Insecure: false,
},
RetryConfig: insights.ClientRetryConfig{
RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`,
MaxAttempts: 3,
PerRetryTimeout: 30 * time.Second,
BackoffConfig: insights.ClientBackoffConfig{
Enabled: true,
JitterFraction: 0.1,
WaitBetween: 1 * time.Second,
},
},
}
insightsClient := insights.NewClient(insightsClientConfig)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := insightsClient.Dial(ctx); err != nil {
return err
}

out.insightsClient = insightsClient
out.requestMetadatasFlusher = insightsOutput.NewFlusher(insightsClient, out.requestMetadatasCollector)
out.runFlushRequestMetadatas()
}

out.outputDone.Add(1)
go func() {
defer out.outputDone.Done()
Expand Down Expand Up @@ -136,6 +188,11 @@ func (out *Output) StopWithTestError(testErr error) error {
close(out.stopOutput)
out.outputDone.Wait()
out.logger.Debug("Metric emission stopped, calling cloud API...")
if insightsOutput.Enabled(out.config) {
if err := out.insightsClient.Close(); err != nil {
out.logger.WithError(err).Error("Failed to close the insights client")
}
}
return nil
}

Expand Down Expand Up @@ -222,6 +279,10 @@ func (out *Output) AddMetricSamples(sampleContainers []metrics.SampleContainer)
out.bufferHTTPTrails = append(out.bufferHTTPTrails, newHTTPTrails...)
out.bufferMutex.Unlock()
}

if insightsOutput.Enabled(out.config) {
out.requestMetadatasCollector.CollectRequestMetadatas(sampleContainers)
}
}

//nolint:funlen,nestif,gocognit
Expand Down Expand Up @@ -472,4 +533,42 @@ func (out *Output) pushMetrics() {
}).Debug("Pushing metrics to cloud finished")
}

func (out *Output) runFlushRequestMetadatas() {
t := time.NewTicker(out.config.TracesPushInterval.TimeDuration())

for i := int64(0); i < out.config.TracesPushConcurrency.Int64; i++ {
out.outputDone.Add(1)
go func() {
defer out.outputDone.Done()
defer t.Stop()

for {
select {
case <-out.stopSendingMetrics:
return
default:
}
select {
case <-out.stopOutput:
out.flushRequestMetadatas()
return
case <-t.C:
out.flushRequestMetadatas()
}
}
}()
}
}

func (out *Output) flushRequestMetadatas() {
start := time.Now()

err := out.requestMetadatasFlusher.Flush()
if err != nil {
out.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud")
}

out.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered trace samples to the cloud")
}

const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate

0 comments on commit d7395ae

Please sign in to comment.