Skip to content

Commit

Permalink
Addressed change requests
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Apr 21, 2023
1 parent ad173b7 commit d7df637
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 39 deletions.
24 changes: 14 additions & 10 deletions output/cloud/expv2/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package expv2
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"sync"
Expand All @@ -25,26 +26,26 @@ type MetricsClient struct {
httpClient httpDoer
logger logrus.FieldLogger
token string
host string
userAgent string

pushBufferPool sync.Pool
baseURL string
}

// NewMetricsClient creates and initializes a new MetricsClient.
func NewMetricsClient(logger logrus.FieldLogger, host string, token string) (*MetricsClient, error) {
if host == "" {
return nil, fmt.Errorf("host is required")
return nil, errors.New("host is required")
}
if token == "" {
return nil, fmt.Errorf("token is required")
return nil, errors.New("token is required")
}
return &MetricsClient{
httpClient: &http.Client{Timeout: 5 * time.Second},
logger: logger,
host: host,
baseURL: host + "/v2/metrics/",
token: token,
userAgent: fmt.Sprintf("k6cloud/v%s", consts.Version),
userAgent: "k6cloud/v" + consts.Version,
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
Expand All @@ -56,10 +57,9 @@ func NewMetricsClient(logger logrus.FieldLogger, host string, token string) (*Me
// Push pushes the provided metric samples for the given referenceID
func (mc *MetricsClient) Push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error {
if referenceID == "" {
return fmt.Errorf("a Reference ID of the test run is required")
return errors.New("a Reference ID of the test run is required")
}
start := time.Now()
url := fmt.Sprintf("%s/v2/metrics/%s", mc.host, referenceID)

b, err := newRequestBody(samples)
if err != nil {
Expand All @@ -74,6 +74,10 @@ func (mc *MetricsClient) Push(ctx context.Context, referenceID string, samples *
if err != nil {
return err
}
// TODO: it is always the same
// we don't expect to share this client across different refID
// with a bit of effort we can find a way to just allocate once
url := mc.baseURL + referenceID
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, buf)
if err != nil {
return err
Expand All @@ -83,17 +87,17 @@ func (mc *MetricsClient) Push(ctx context.Context, referenceID string, samples *
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("K6-Metrics-Protocol-Version", "2.0")
req.Header.Set("Authorization", fmt.Sprintf("Token %s", mc.token))
req.Header.Set("Authorization", "Token "+mc.token)

resp, err := mc.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to push metrics: %w", err)
return err
}
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to push metrics: push metrics response got an unexpected status code: %s", resp.Status)
return fmt.Errorf("response got an unexpected status code: %s", resp.Status)
}
mc.logger.WithField("t", time.Since(start)).WithField("size", len(b)).
Debug("Pushed part to cloud")
Expand Down
41 changes: 19 additions & 22 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,14 @@ func (o *Output) flushMetrics() {
return
}

start := time.Now()

if hasOne := o.collectSamples(); !hasOne {
samplesContainers := o.GetBufferedSamples()
if len(samplesContainers) < 1 {
return
}

start := time.Now()
o.collectSamples(samplesContainers)

// TODO: in case an aggregation period will be added then
// it continue only if the aggregation time frame passed

Expand All @@ -110,24 +112,20 @@ func (o *Output) flushMetrics() {

err := o.client.Push(ctx, o.referenceID, &pbcloud.MetricSet{Metrics: metricSet})
if err != nil {
o.logger.Error(err)
o.logger.WithError(err).Error("failed to push metrics to the cloud")
return
}

o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered samples to the cloud")
}

func (o *Output) collectSamples() (updates bool) {
samplesContainers := o.GetBufferedSamples()
if len(samplesContainers) < 1 {
return false
}

// collectSamples drain the buffer and collect all the samples
func (o *Output) collectSamples(containers []metrics.SampleContainer) {
var (
aggr aggregatedSamples
ok bool
)
for _, sampleContainer := range samplesContainers {
for _, sampleContainer := range containers {
samples := sampleContainer.GetSamples()
for i := 0; i < len(samples); i++ {
aggr, ok = o.activeSeries[samples[i].Metric]
Expand All @@ -140,21 +138,19 @@ func (o *Output) collectSamples() (updates bool) {
aggr.AddSample(&samples[i])
}
}

return true
}

func (o *Output) mapMetricProto(m *metrics.Metric, as aggregatedSamples) *pbcloud.Metric {
var mtype pbcloud.MetricType
switch m.Type {
case metrics.Counter:
mtype = pbcloud.MetricType_COUNTER
mtype = pbcloud.MetricType_METRIC_TYPE_COUNTER
case metrics.Gauge:
mtype = pbcloud.MetricType_GAUGE
mtype = pbcloud.MetricType_METRIC_TYPE_GAUGE
case metrics.Rate:
mtype = pbcloud.MetricType_RATE
mtype = pbcloud.MetricType_METRIC_TYPE_RATE
case metrics.Trend:
mtype = pbcloud.MetricType_TREND
mtype = pbcloud.MetricType_METRIC_TYPE_TREND
}

// TODO: based on the fact that this mapping is a pointer
Expand Down Expand Up @@ -230,11 +226,12 @@ func (as *aggregatedSamples) MapAsProto(refID string) []*pbcloud.TimeSeries {
gaugeSamples := &pbcloud.GaugeSamples{}
for _, gaugeSample := range samples {
gaugeSamples.Values = append(gaugeSamples.Values, &pbcloud.GaugeValue{
Time: timestamppb.New(gaugeSample.Time),
Last: gaugeSample.Value,
Min: gaugeSample.Value,
Max: gaugeSample.Value,
Avg: gaugeSample.Value,
Time: timestamppb.New(gaugeSample.Time),
Last: gaugeSample.Value,
Min: gaugeSample.Value,
Max: gaugeSample.Value,
Avg: gaugeSample.Value,
Count: 1,
})
}
pb.Samples = &pbcloud.TimeSeries_GaugeSamples{
Expand Down
11 changes: 4 additions & 7 deletions output/cloud/expv2/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ func TestOutputCollectSamples(t *testing.T) {
},
}

o.AddMetricSamples([]metrics.SampleContainer{
o.collectSamples([]metrics.SampleContainer{
metrics.Samples{s1},
metrics.Samples{s2},
metrics.Samples{subs1},
metrics.Samples{s1},
})

hasOne := o.collectSamples()
require.True(t, hasOne)
require.Len(t, o.activeSeries, 2)

assert.Equal(t, []*metrics.Sample{&s1, &s1}, o.activeSeries[m1].Samples[s1.TimeSeries])
Expand Down Expand Up @@ -79,7 +76,7 @@ func TestOutputMapMetricProto(t *testing.T) {

protodata := o.mapMetricProto(m1, aggSamples)
assert.Equal(t, "metric1", protodata.Name)
assert.Equal(t, "COUNTER", pbcloud.MetricType_name[int32(protodata.Type)])
assert.Equal(t, "METRIC_TYPE_COUNTER", pbcloud.MetricType_name[int32(protodata.Type)])
assert.Len(t, protodata.TimeSeries, 1)
}

Expand Down Expand Up @@ -116,8 +113,8 @@ func TestAggregatedSamplesMapAsProto(t *testing.T) {
Samples: &pbcloud.TimeSeries_GaugeSamples{
GaugeSamples: &pbcloud.GaugeSamples{
Values: []*pbcloud.GaugeValue{
{Time: timestamppb.New(now), Last: 42, Min: 42, Max: 42, Avg: 42},
{Time: timestamppb.New(now), Last: 42, Min: 42, Max: 42, Avg: 42},
{Time: timestamppb.New(now), Last: 42, Min: 42, Max: 42, Avg: 42, Count: 1},
{Time: timestamppb.New(now), Last: 42, Min: 42, Max: 42, Avg: 42, Count: 1},
},
},
},
Expand Down

0 comments on commit d7df637

Please sign in to comment.