Skip to content

Commit

Permalink
Handle the interrupt case when it get an error
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed May 12, 2023
1 parent 401dbc9 commit 4c09c5e
Showing 1 changed file with 50 additions and 5 deletions.
55 changes: 50 additions & 5 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ package expv2

import (
"context"
"net/http"
"time"

"go.k6.io/k6/cloudapi"
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
Expand Down Expand Up @@ -40,6 +43,9 @@ type Output struct {
// a sequential ID for metrics
// then we could reuse the same strategy here
activeSeries map[*metrics.Metric]aggregatedSamples

testStopFunc func(error)
stopSendingMetrics chan struct{}
}

// New creates a new cloud output.
Expand All @@ -57,10 +63,11 @@ func New(logger logrus.FieldLogger, conf cloudapi.Config) (*Output, error) {
// }

return &Output{
config: conf,
metricsFlusher: mc,
logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}),
activeSeries: make(map[*metrics.Metric]aggregatedSamples),
config: conf,
metricsFlusher: mc,
logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}),
activeSeries: make(map[*metrics.Metric]aggregatedSamples),
stopSendingMetrics: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -93,6 +100,11 @@ func (o *Output) SetReferenceID(refID string) {
o.referenceID = refID
}

// SetTestRunStopCallback receives the function that stops the engine on error
func (o *Output) SetTestRunStopCallback(stopFunc func(error)) {
o.testStopFunc = stopFunc
}

// AddMetricSamples receives a set of metric samples.
func (o *Output) collectMetrics() {
if o.referenceID == "" {
Expand Down Expand Up @@ -130,13 +142,46 @@ func (o *Output) collectMetrics() {

err := o.metricsFlusher.Push(ctx, o.referenceID, &pbcloud.MetricSet{Metrics: metricSet})
if err != nil {
o.logger.WithError(err).Error("failed to push metrics to the cloud")
o.logger.WithError(err).Error("Failed to push metrics to the cloud")

if o.shouldStopSendingMetrics(err) {
o.logger.WithError(err).Warn("Interrupt sending metrics to cloud due to an error")
serr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort),
errext.AbortedByOutput,
)
if o.config.StopOnError.Bool {
o.testStopFunc(serr)
}
close(o.stopSendingMetrics)
}
return
}

o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered samples to the cloud")
}

// shouldStopSendingMetrics returns true if the output should interrupt the metric flush.
//
// note: The actual test execution should continues,
// since for local k6 run tests the end-of-test summary (or any other outputs) will still work,
// but the cloud output doesn't send any more metrics.
// Instead, if cloudapi.Config.StopOnError is enabled
// the cloud output should stop the whole test run too.
// This logic should be handled by the caller.
func (o *Output) shouldStopSendingMetrics(err error) bool {
if err == nil {
return false
}
if errResp, ok := err.(cloudapi.ErrorResponse); ok && errResp.Response != nil { //nolint:errorlint
// The Cloud service returns the error code 4 when it doesn't accept any more metrics.
// So, when k6 sees that, the cloud output just stops prematurely.
return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4
}

return false
}

// collectSamples drain the buffer and collect all the samples
func (o *Output) collectSamples(containers []metrics.SampleContainer) {
var (
Expand Down

0 comments on commit 4c09c5e

Please sign in to comment.