Skip to content

Commit

Permalink
cloud: New binary-based output
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Mar 8, 2023
1 parent cd71f92 commit a735438
Show file tree
Hide file tree
Showing 8 changed files with 2,063 additions and 8 deletions.
59 changes: 59 additions & 0 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,25 @@ func getCloudTestEndChecker(
return srv
}

func getCloudMetricsServer(t *testing.T, testRunID int) *httptest.Server {
metricsFlushed := false
testStart := cloudTestStartSimple(t, testRunID)

srv := getTestServer(t, map[string]http.Handler{
"POST ^/v1/tests$": testStart,
fmt.Sprintf("POST ^/v2/metrics/%d$", testRunID): http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
metricsFlushed = true
}),
})

t.Cleanup(func() {
assert.Truef(t, metricsFlushed, "expected test to have called the cloud API endpoint to flush the metrics")
srv.Close()
})

return srv
}

func getSimpleCloudOutputTestState(
t *testing.T, script string, cliFlags []string,
expRunStatus cloudapi.RunStatus, expResultStatus cloudapi.ResultStatus, expExitCode exitcodes.ExitCode,
Expand Down Expand Up @@ -1881,3 +1900,43 @@ func TestRunStaticArchives(t *testing.T) {
})
}
}

func TestCloudOutputV2(t *testing.T) {
t.Parallel()
script := `
import { sleep } from 'k6';
export let options = {
scenarios: {
sc1: {
executor: 'per-vu-iterations',
vus: 1, iterations: 5,
}
},
thresholds: {
'iterations': ['count == 5'],
},
};
export default function () { };
`
cliFlags := []string{"-v", "--log-output=stdout", "--out", "cloud"}

srv := getCloudMetricsServer(t, 123)
ts := getSingleFileTestState(t, script, cliFlags, 0)
ts.Env["K6_CLOUD_HOST"] = srv.URL

cmd.ExecuteWithGlobalState(ts.GlobalState)

stdout := ts.Stdout.String()
t.Log(stdout)

assert.Contains(t, stdout, `execution: local`)
assert.Contains(t, stdout, `output: cloud (https://app.k6.io/runs/123)`)
assert.Contains(t, stdout, `Started!" output=cloudv2`)
assert.Contains(t, stdout, `✓ iterations...........: 5`)
assert.Contains(t, stdout, `Successfully flushed buffered samples to the cloud`)
assert.Contains(t, stdout, `Cloud output successfully stopped!`)
assert.Contains(t, stdout, `Stopped!" output=cloudv2`)

assert.NotContains(t, stdout, `failed to flush metrics`)
}
106 changes: 106 additions & 0 deletions output/cloud/expv2/metrics_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package expv2

import (
"bytes"
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/golang/snappy"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"

"go.k6.io/k6/lib/consts"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

type httpDoer interface {
Do(*http.Request) (*http.Response, error)
}

// MetricsClient is a wrapper around the cloudapi.Client that is also capable of pushing
type MetricsClient struct {
httpClient httpDoer
logger logrus.FieldLogger
host string
userAgent string

pushBufferPool sync.Pool
}

// NewMetricsClient creates and initializes a new MetricsClient.
func NewMetricsClient(logger logrus.FieldLogger, host string) *MetricsClient {
return &MetricsClient{
httpClient: &http.Client{Timeout: 5 * time.Second},
logger: logger,
host: host,
userAgent: fmt.Sprintf("k6cloud/v%s", consts.Version),
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
}

// Push pushes the provided metric samples for the given referenceID
func (mc *MetricsClient) Push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error {
if referenceID == "" {
return fmt.Errorf("a Reference ID of the test run is required")
}
start := time.Now()
url := fmt.Sprintf("%s/v2/metrics/%s", mc.host, referenceID)

b, err := newRequestBody(samples)
if err != nil {
return err
}

buf, _ := mc.pushBufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer mc.pushBufferPool.Put(buf)

_, err = buf.Write(b)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, buf)
if err != nil {
return err
}
req.Header.Set("User-Agent", mc.userAgent)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("K6-Metrics-Protocol-Version", "2.0")

// TODO: Add authentication
// req.Header.Set("Authorization", fmt.Sprintf("Token %s", c.token))

resp, err := mc.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to push metrics: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to push metrics: push metrics response got an unexpected status code: %s", resp.Status)
}
mc.logger.WithField("t", time.Since(start)).WithField("size", len(b)).
Debug("Pushed part to cloud")
return nil
}

func newRequestBody(data *pbcloud.MetricSet) ([]byte, error) {
b, err := proto.Marshal(data)
if err != nil {
return nil, fmt.Errorf("encoding series as protobuf write request failed: %w", err)
}
if snappy.MaxEncodedLen(len(b)) < 0 {
return nil, fmt.Errorf("the protobuf message is too large to be handled by Snappy encoder; "+
"size: %d, limit: %d", len(b), 0xffffffff)
}
return snappy.Encode(nil, b), nil
}
85 changes: 85 additions & 0 deletions output/cloud/expv2/metrics_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package expv2

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

type httpDoerFunc func(*http.Request) (*http.Response, error)

func (fn httpDoerFunc) Do(r *http.Request) (*http.Response, error) {
return fn(r)
}

func TestMetricsClientPush(t *testing.T) {
t.Parallel()

done := make(chan struct{}, 1)
reqs := 0
h := func(rw http.ResponseWriter, r *http.Request) {
defer close(done)
reqs++

assert.Equal(t, "/v2/metrics/test-ref-id", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
assert.Contains(t, r.Header.Get("User-Agent"), "k6cloud/v0.4")
assert.Equal(t, "application/x-protobuf", r.Header.Get("Content-Type"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
assert.Equal(t, "2.0", r.Header.Get("K6-Metrics-Protocol-Version"))

b, err := io.ReadAll(r.Body)
require.NoError(t, err)
assert.NotEmpty(t, b)
}

ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

mc := NewMetricsClient(testutils.NewLogger(t), ts.URL)
mc.httpClient = ts.Client()

mset := pbcloud.MetricSet{}
err := mc.Push(context.TODO(), "test-ref-id", &mset)
<-done
require.NoError(t, err)
assert.Equal(t, 1, reqs)
}

func TestMetricsClientPushUnexpectedStatus(t *testing.T) {
t.Parallel()

h := func(rw http.ResponseWriter, _ *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

mc := NewMetricsClient(testutils.NewLogger(t), ts.URL)
mc.httpClient = ts.Client()

err := mc.Push(context.TODO(), "test-ref-id", nil)
assert.ErrorContains(t, err, "500 Internal Server Error")
}

func TestMetricsClientPushError(t *testing.T) {
t.Parallel()

httpClientMock := func(_ *http.Request) (*http.Response, error) {
return nil, fmt.Errorf("fake generated error")
}

mc := NewMetricsClient(testutils.NewLogger(t), "")
mc.httpClient = httpDoerFunc(httpClientMock)

err := mc.Push(context.TODO(), "test-ref-id", nil)
assert.ErrorContains(t, err, "fake generated error")
}
Loading

0 comments on commit a735438

Please sign in to comment.