diff --git a/output/cloud/expv2/collect.go b/output/cloud/expv2/collect.go index bef49c2fec62..c241ce1ce15a 100644 --- a/output/cloud/expv2/collect.go +++ b/output/cloud/expv2/collect.go @@ -2,24 +2,12 @@ package expv2 import ( "errors" - "strconv" "sync" "time" - "go.k6.io/k6/cloudapi/insights" - "go.k6.io/k6/lib/netext/httpext" "go.k6.io/k6/metrics" ) -const ( - metadataTraceIDKey = "trace_id" - scenarioTag = "scenario" - groupTag = "group" - nameTag = "name" - methodTag = "method" - statusTag = "status" -) - type timeBucket struct { Time int64 Sinks map[metrics.TimeSeries]metricValue @@ -175,94 +163,3 @@ func (c *collector) timeFromBucketID(id int64) int64 { func (c *collector) bucketCutoffID() int64 { return c.nowFunc().Add(-c.waitPeriod).UnixNano() / int64(c.aggregationPeriod) } - -type rmCollector struct { - testRunID int64 - buffer insights.RequestMetadatas - bufferMu *sync.Mutex -} - -func newRequestMetadatasCollector(testRunID int64) *rmCollector { - return &rmCollector{ - testRunID: testRunID, - buffer: nil, - bufferMu: &sync.Mutex{}, - } -} - -func (c *rmCollector) CollectRequestMetadatas(sampleContainers []metrics.SampleContainer) { - if len(sampleContainers) < 1 { - return - } - - // TODO(lukasz, other-proto-support): Support grpc/websocket trails. - var newBuffer insights.RequestMetadatas - for _, sampleContainer := range sampleContainers { - trail, ok := sampleContainer.(*httpext.Trail) - if !ok { - continue - } - - traceID, found := trail.Metadata[metadataTraceIDKey] - if !found { - continue - } - - m := insights.RequestMetadata{ - TraceID: traceID, - Start: trail.EndTime.Add(-trail.Duration), - End: trail.EndTime, - TestRunLabels: insights.TestRunLabels{ - ID: c.testRunID, - Scenario: c.getStringTagFromTrail(trail, scenarioTag), - Group: c.getStringTagFromTrail(trail, groupTag), - }, - ProtocolLabels: insights.ProtocolHTTPLabels{ - URL: c.getStringTagFromTrail(trail, nameTag), - Method: c.getStringTagFromTrail(trail, methodTag), - StatusCode: c.getIntTagFromTrail(trail, statusTag), - }, - } - - newBuffer = append(newBuffer, m) - } - - if len(newBuffer) < 1 { - return - } - - c.bufferMu.Lock() - defer c.bufferMu.Unlock() - - c.buffer = append(c.buffer, newBuffer...) -} - -func (c *rmCollector) PopAll() insights.RequestMetadatas { - c.bufferMu.Lock() - defer c.bufferMu.Unlock() - - b := c.buffer - c.buffer = nil - return b -} - -func (c *rmCollector) getStringTagFromTrail(trail *httpext.Trail, key string) string { - if tag, found := trail.Tags.Get(key); found { - return tag - } - - return "" -} - -func (c *rmCollector) getIntTagFromTrail(trail *httpext.Trail, key string) int64 { - if tag, found := trail.Tags.Get(key); found { - tagInt, err := strconv.ParseInt(tag, 10, 64) - if err != nil { - return 0 - } - - return tagInt - } - - return 0 -} diff --git a/output/cloud/expv2/collect_test.go b/output/cloud/expv2/collect_test.go index 5b5bf28ea4ee..0fa013a1f986 100644 --- a/output/cloud/expv2/collect_test.go +++ b/output/cloud/expv2/collect_test.go @@ -1,16 +1,12 @@ package expv2 import ( - "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.k6.io/k6/cloudapi/insights" - "go.k6.io/k6/lib/netext" - "go.k6.io/k6/lib/netext/httpext" "go.k6.io/k6/metrics" ) @@ -312,137 +308,3 @@ func TestBucketQPushPopConcurrency(t *testing.T) { } } } - -func Test_requestMetadatasCollector_CollectRequestMetadatas_DoesNothingWithEmptyData(t *testing.T) { - t.Parallel() - - // Given - testRunID := int64(1337) - col := newRequestMetadatasCollector(testRunID) - var data []metrics.SampleContainer - - // When - col.CollectRequestMetadatas(data) - - // Then - require.Empty(t, col.buffer) -} - -func Test_requestMetadatasCollector_CollectRequestMetadatas_FiltersAndStoresHTTPTrailsAsRequestMetadatas(t *testing.T) { - t.Parallel() - - // Given - testRunID := int64(1337) - col := newRequestMetadatasCollector(testRunID) - data := []metrics.SampleContainer{ - &httpext.Trail{ - EndTime: time.Unix(10, 0), - Duration: time.Second, - Tags: metrics.NewRegistry().RootTagSet(). - With(scenarioTag, "test-scenario-1"). - With(groupTag, "test-group-1"). - With(nameTag, "test-url-1"). - With(methodTag, "test-method-1"). - With(statusTag, "200"), - Metadata: map[string]string{ - metadataTraceIDKey: "test-trace-id-1", - }, - }, - &httpext.Trail{ - // HTTP trail without trace ID should be ignored - }, - &netext.NetTrail{ - // Net trail should be ignored - }, - &httpext.Trail{ - EndTime: time.Unix(20, 0), - Duration: time.Second, - Tags: metrics.NewRegistry().RootTagSet(). - With(scenarioTag, "test-scenario-2"). - With(groupTag, "test-group-2"). - With(nameTag, "test-url-2"). - With(methodTag, "test-method-2"). - With(statusTag, "401"), - Metadata: map[string]string{ - metadataTraceIDKey: "test-trace-id-2", - }, - }, - &httpext.Trail{ - EndTime: time.Unix(20, 0), - Duration: time.Second, - Tags: metrics.NewRegistry().RootTagSet(), - // HTTP trail without `trace_id` metadata key should be ignored - Metadata: map[string]string{}, - }, - &httpext.Trail{ - EndTime: time.Unix(20, 0), - Duration: time.Second, - // If no tags are present, output should be set to `unknown` - Tags: metrics.NewRegistry().RootTagSet(), - Metadata: map[string]string{ - metadataTraceIDKey: "test-trace-id-3", - }, - }, - } - - // When - col.CollectRequestMetadatas(data) - - // Then - require.Len(t, col.buffer, 3) - require.Contains(t, col.buffer, insights.RequestMetadata{ - TraceID: "test-trace-id-1", - Start: time.Unix(9, 0), - End: time.Unix(10, 0), - TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-1", Group: "test-group-1"}, - ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200}, - }) - require.Contains(t, col.buffer, insights.RequestMetadata{ - TraceID: "test-trace-id-2", - Start: time.Unix(19, 0), - End: time.Unix(20, 0), - TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-2", Group: "test-group-2"}, - ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-2", Method: "test-method-2", StatusCode: 401}, - }) - require.Contains(t, col.buffer, insights.RequestMetadata{ - TraceID: "test-trace-id-3", - Start: time.Unix(19, 0), - End: time.Unix(20, 0), - TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "", Group: ""}, - ProtocolLabels: insights.ProtocolHTTPLabels{URL: "", Method: "", StatusCode: 0}, - }) -} - -func Test_requestMetadatasCollector_PopAll_DoesNothingWithEmptyData(t *testing.T) { - t.Parallel() - - // Given - data := insights.RequestMetadatas{ - { - TraceID: "test-trace-id-1", - Start: time.Unix(9, 0), - End: time.Unix(10, 0), - TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-1", Group: "test-group-1"}, - ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200}, - }, - { - TraceID: "test-trace-id-2", - Start: time.Unix(19, 0), - End: time.Unix(20, 0), - TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "unknown", Group: "unknown"}, - ProtocolLabels: insights.ProtocolHTTPLabels{URL: "unknown", Method: "unknown", StatusCode: 0}, - }, - } - col := &rmCollector{ - buffer: data, - bufferMu: &sync.Mutex{}, - } - - // When - got := col.PopAll() - - // Then - require.Nil(t, col.buffer) - require.Empty(t, col.buffer) - require.Equal(t, data, got) -} diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 4140a609bf80..63cb3ab75db1 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -1,11 +1,10 @@ package expv2 import ( - "context" "time" "github.com/sirupsen/logrus" - "go.k6.io/k6/cloudapi/insights" + "go.k6.io/k6/metrics" "go.k6.io/k6/output/cloud/expv2/pbcloud" ) @@ -189,30 +188,3 @@ func (msb *metricSetBuilder) recordDiscardedLabels(labels []string) { msb.discardedLabels[key] = struct{}{} } } - -// insightsClient is an interface for sending request metadatas to the Insights API. -type insightsClient interface { - IngestRequestMetadatasBatch(context.Context, insights.RequestMetadatas) error - Close() error -} - -type requestMetadatasFlusher struct { - client insightsClient - collector requestMetadatasCollector -} - -func newTracesFlusher(client insightsClient, collector requestMetadatasCollector) *requestMetadatasFlusher { - return &requestMetadatasFlusher{ - client: client, - collector: collector, - } -} - -func (f *requestMetadatasFlusher) flush() error { - requestMetadatas := f.collector.PopAll() - if len(requestMetadatas) < 1 { - return nil - } - - return f.client.IngestRequestMetadatasBatch(context.Background(), requestMetadatas) -} diff --git a/output/cloud/expv2/flush_test.go b/output/cloud/expv2/flush_test.go index 1ad432ef66dd..35847ef8dd81 100644 --- a/output/cloud/expv2/flush_test.go +++ b/output/cloud/expv2/flush_test.go @@ -1,90 +1,18 @@ package expv2 import ( - "context" - "errors" "strconv" "testing" - "time" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.k6.io/k6/cloudapi/insights" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/metrics" "go.k6.io/k6/output/cloud/expv2/pbcloud" ) -type mockWorkingInsightsClient struct { - ingestRequestMetadatasBatchInvoked bool - dataSent bool - data insights.RequestMetadatas -} - -func (c *mockWorkingInsightsClient) IngestRequestMetadatasBatch(ctx context.Context, data insights.RequestMetadatas) error { - c.ingestRequestMetadatasBatchInvoked = true - - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - c.dataSent = true - c.data = data - - return nil -} - -func (c *mockWorkingInsightsClient) Close() error { - return nil -} - -type mockFailingInsightsClient struct { - err error -} - -func (c *mockFailingInsightsClient) IngestRequestMetadatasBatch(_ context.Context, _ insights.RequestMetadatas) error { - return c.err -} - -func (c *mockFailingInsightsClient) Close() error { - return nil -} - -type mockRequestMetadatasCollector struct { - data insights.RequestMetadatas -} - -func (m *mockRequestMetadatasCollector) CollectRequestMetadatas(_ []metrics.SampleContainer) { - panic("implement me") -} - -func (m *mockRequestMetadatasCollector) PopAll() insights.RequestMetadatas { - return m.data -} - -func newMockRequestMetadatas() insights.RequestMetadatas { - return insights.RequestMetadatas{ - { - TraceID: "test-trace-id-1", - Start: time.Unix(1337, 0), - End: time.Unix(1338, 0), - TestRunLabels: insights.TestRunLabels{ID: 1, Scenario: "test-scenario-1", Group: "test-group-1"}, - ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200}, - }, - { - TraceID: "test-trace-id-2", - Start: time.Unix(2337, 0), - End: time.Unix(2338, 0), - TestRunLabels: insights.TestRunLabels{ID: 1, Scenario: "test-scenario-2", Group: "test-group-2"}, - ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-2", Method: "test-method-2", StatusCode: 200}, - }, - } -} - // TODO: additional case // case: add when the metric already exist // case: add when the metric and the timeseries already exist @@ -290,58 +218,3 @@ func (pm *pusherMock) push(ms *pbcloud.MetricSet) error { pm.pushCalled++ return nil } - -func Test_tracesFlusher_Flush_ReturnsNoErrorWithWorkingInsightsClientAndNonCancelledContextAndNoData(t *testing.T) { - t.Parallel() - - // Given - data := insights.RequestMetadatas{} - cli := &mockWorkingInsightsClient{} - col := &mockRequestMetadatasCollector{data: data} - flusher := newTracesFlusher(cli, col) - - // When - err := flusher.flush() - - // Then - require.NoError(t, err) - require.False(t, cli.ingestRequestMetadatasBatchInvoked) - require.False(t, cli.dataSent) - require.Empty(t, cli.data) -} - -func Test_tracesFlusher_Flush_ReturnsNoErrorWithWorkingInsightsClientAndNonCancelledContextAndData(t *testing.T) { - t.Parallel() - - // Given - data := newMockRequestMetadatas() - cli := &mockWorkingInsightsClient{} - col := &mockRequestMetadatasCollector{data: data} - flusher := newTracesFlusher(cli, col) - - // When - err := flusher.flush() - - // Then - require.NoError(t, err) - require.True(t, cli.ingestRequestMetadatasBatchInvoked) - require.True(t, cli.dataSent) - require.Equal(t, data, cli.data) -} - -func Test_tracesFlusher_Flush_ReturnsErrorWithFailingInsightsClientAndNonCancelledContext(t *testing.T) { - t.Parallel() - - // Given - data := newMockRequestMetadatas() - testErr := errors.New("test-error") - cli := &mockFailingInsightsClient{err: testErr} - col := &mockRequestMetadatasCollector{data: data} - flusher := newTracesFlusher(cli, col) - - // When - err := flusher.flush() - - // Then - require.ErrorIs(t, err, testErr) -} diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index af6fb73cb372..4f991efc84ab 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -19,17 +19,11 @@ import ( "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" "go.k6.io/k6/output" + insightsOutput "go.k6.io/k6/output/cloud/insights" "github.com/sirupsen/logrus" ) -// requestMetadatasCollector is an interface for collecting request metadatas -// and retrieving them, so they can be flushed using a flusher. -type requestMetadatasCollector interface { - CollectRequestMetadatas([]metrics.SampleContainer) - PopAll() insights.RequestMetadatas -} - // flusher is an interface for flushing data to the cloud. type flusher interface { flush() error @@ -47,9 +41,9 @@ type Output struct { collector *collector flushing flusher - insightsClient insightsClient - requestMetadatasCollector requestMetadatasCollector - requestMetadatasFlusher flusher + insightsClient insightsOutput.Client + requestMetadatasCollector insightsOutput.RequestMetadatasCollector + requestMetadatasFlusher insightsOutput.RequestMetadatasFlusher // wg tracks background goroutines wg sync.WaitGroup @@ -124,12 +118,12 @@ func (o *Output) Start() error { o.runFlushWorkers() o.periodicInvoke(o.config.AggregationPeriod.TimeDuration(), o.collectSamples) - if o.tracingEnabled() { + if insightsOutput.Enabled(o.config) { testRunID, err := strconv.ParseInt(o.testRunID, 10, 64) if err != nil { return err } - o.requestMetadatasCollector = newRequestMetadatasCollector(testRunID) + o.requestMetadatasCollector = insightsOutput.NewCollector(testRunID) insightsClientConfig := insights.ClientConfig{ IngesterHost: o.config.TracesHost.String, @@ -156,14 +150,14 @@ func (o *Output) Start() error { } insightsClient := insights.NewClient(insightsClientConfig) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := insightsClient.Dial(ctx); err != nil { return err } o.insightsClient = insightsClient - o.requestMetadatasFlusher = newTracesFlusher(insightsClient, o.requestMetadatasCollector) + o.requestMetadatasFlusher = insightsOutput.NewFlusher(insightsClient, o.requestMetadatasCollector) o.runFlushRequestMetadatas() } @@ -194,7 +188,7 @@ func (o *Output) StopWithTestError(_ error) error { o.flushMetrics() // Flush all the remaining request metadatas. - if o.tracingEnabled() { + if insightsOutput.Enabled(o.config) { o.flushRequestMetadatas() if err := o.insightsClient.Close(); err != nil { o.logger.WithError(err).Error("Failed to close the insights client") @@ -277,7 +271,7 @@ func (o *Output) collectSamples() { samples := o.GetBufferedSamples() o.collector.CollectSamples(samples) - if o.tracingEnabled() { + if insightsOutput.Enabled(o.config) { o.requestMetadatasCollector.CollectRequestMetadatas(samples) } } @@ -322,7 +316,7 @@ func (o *Output) runFlushRequestMetadatas() { func (o *Output) flushRequestMetadatas() { start := time.Now() - err := o.requestMetadatasFlusher.flush() + err := o.requestMetadatasFlusher.Flush() if err != nil { o.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud") } @@ -378,18 +372,6 @@ func (o *Output) handleFlushError(err error) { }) } -func (o *Output) tracingEnabled() bool { - // TODO(lukasz): Check if k6 x Tempo is enabled - // - // We want to check whether a given organization is - // eligible for k6 x Tempo feature. If it isn't, we may - // consider to skip the traces output. - // - // We currently don't have a backend API to check this - // information. - return o.config.TracesEnabled.ValueOrZero() -} - func printableConfig(c cloudapi.Config) map[string]any { m := map[string]any{ "host": c.Host.String, diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go index 852329baa606..2ad87add7dd1 100644 --- a/output/cloud/expv2/output_test.go +++ b/output/cloud/expv2/output_test.go @@ -11,11 +11,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v3" + "go.k6.io/k6/cloudapi" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" - "gopkg.in/guregu/null.v3" ) func TestNew(t *testing.T) { @@ -496,6 +497,10 @@ func TestOutputFlushRequestMetadatasAbort(t *testing.T) { type flusherFunc func() +func (ff flusherFunc) Flush() error { + return ff.flush() +} + func (ff flusherFunc) flush() error { ff() return nil diff --git a/output/cloud/insights/collect.go b/output/cloud/insights/collect.go new file mode 100644 index 000000000000..c5b58f03eec3 --- /dev/null +++ b/output/cloud/insights/collect.go @@ -0,0 +1,117 @@ +package insights + +import ( + "strconv" + "sync" + + "go.k6.io/k6/cloudapi/insights" + "go.k6.io/k6/lib/netext/httpext" + "go.k6.io/k6/metrics" +) + +const ( + metadataTraceIDKey = "trace_id" + scenarioTag = "scenario" + groupTag = "group" + nameTag = "name" + methodTag = "method" + statusTag = "status" +) + +// RequestMetadatasCollector is an interface for collecting request metadatas +// and retrieving them, so they can be flushed using a flusher. +type RequestMetadatasCollector interface { + CollectRequestMetadatas([]metrics.SampleContainer) + PopAll() insights.RequestMetadatas +} + +type Collector struct { + testRunID int64 + buffer insights.RequestMetadatas + bufferMu *sync.Mutex +} + +func NewCollector(testRunID int64) *Collector { + return &Collector{ + testRunID: testRunID, + buffer: nil, + bufferMu: &sync.Mutex{}, + } +} + +func (c *Collector) CollectRequestMetadatas(sampleContainers []metrics.SampleContainer) { + if len(sampleContainers) < 1 { + return + } + + // TODO(lukasz, other-proto-support): Support grpc/websocket trails. + var newBuffer insights.RequestMetadatas + for _, sampleContainer := range sampleContainers { + trail, ok := sampleContainer.(*httpext.Trail) + if !ok { + continue + } + + traceID, found := trail.Metadata[metadataTraceIDKey] + if !found { + continue + } + + m := insights.RequestMetadata{ + TraceID: traceID, + Start: trail.EndTime.Add(-trail.Duration), + End: trail.EndTime, + TestRunLabels: insights.TestRunLabels{ + ID: c.testRunID, + Scenario: c.getStringTagFromTrail(trail, scenarioTag), + Group: c.getStringTagFromTrail(trail, groupTag), + }, + ProtocolLabels: insights.ProtocolHTTPLabels{ + URL: c.getStringTagFromTrail(trail, nameTag), + Method: c.getStringTagFromTrail(trail, methodTag), + StatusCode: c.getIntTagFromTrail(trail, statusTag), + }, + } + + newBuffer = append(newBuffer, m) + } + + if len(newBuffer) < 1 { + return + } + + c.bufferMu.Lock() + defer c.bufferMu.Unlock() + + c.buffer = append(c.buffer, newBuffer...) +} + +func (c *Collector) PopAll() insights.RequestMetadatas { + c.bufferMu.Lock() + defer c.bufferMu.Unlock() + + b := c.buffer + c.buffer = nil + return b +} + +func (c *Collector) getStringTagFromTrail(trail *httpext.Trail, key string) string { + if tag, found := trail.Tags.Get(key); found { + return tag + } + + return "" +} + +func (c *Collector) getIntTagFromTrail(trail *httpext.Trail, key string) int64 { + if tag, found := trail.Tags.Get(key); found { + tagInt, err := strconv.ParseInt(tag, 10, 64) + if err != nil { + return 0 + } + + return tagInt + } + + return 0 +} diff --git a/output/cloud/insights/collect_test.go b/output/cloud/insights/collect_test.go new file mode 100644 index 000000000000..aeca952b9bca --- /dev/null +++ b/output/cloud/insights/collect_test.go @@ -0,0 +1,148 @@ +package insights + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.k6.io/k6/cloudapi/insights" + "go.k6.io/k6/lib/netext" + "go.k6.io/k6/lib/netext/httpext" + "go.k6.io/k6/metrics" +) + +func Test_Collector_CollectRequestMetadatas_DoesNothingWithEmptyData(t *testing.T) { + t.Parallel() + + // Given + testRunID := int64(1337) + col := NewCollector(testRunID) + var data []metrics.SampleContainer + + // When + col.CollectRequestMetadatas(data) + + // Then + require.Empty(t, col.buffer) +} + +func Test_Collector_CollectRequestMetadatas_FiltersAndStoresHTTPTrailsAsRequestMetadatas(t *testing.T) { + t.Parallel() + + // Given + testRunID := int64(1337) + col := NewCollector(testRunID) + data := []metrics.SampleContainer{ + &httpext.Trail{ + EndTime: time.Unix(10, 0), + Duration: time.Second, + Tags: metrics.NewRegistry().RootTagSet(). + With(scenarioTag, "test-scenario-1"). + With(groupTag, "test-group-1"). + With(nameTag, "test-url-1"). + With(methodTag, "test-method-1"). + With(statusTag, "200"), + Metadata: map[string]string{ + metadataTraceIDKey: "test-trace-id-1", + }, + }, + &httpext.Trail{ + // HTTP trail without trace ID should be ignored + }, + &netext.NetTrail{ + // Net trail should be ignored + }, + &httpext.Trail{ + EndTime: time.Unix(20, 0), + Duration: time.Second, + Tags: metrics.NewRegistry().RootTagSet(). + With(scenarioTag, "test-scenario-2"). + With(groupTag, "test-group-2"). + With(nameTag, "test-url-2"). + With(methodTag, "test-method-2"). + With(statusTag, "401"), + Metadata: map[string]string{ + metadataTraceIDKey: "test-trace-id-2", + }, + }, + &httpext.Trail{ + EndTime: time.Unix(20, 0), + Duration: time.Second, + Tags: metrics.NewRegistry().RootTagSet(), + // HTTP trail without `trace_id` metadata key should be ignored + Metadata: map[string]string{}, + }, + &httpext.Trail{ + EndTime: time.Unix(20, 0), + Duration: time.Second, + // If no tags are present, output should be set to `unknown` + Tags: metrics.NewRegistry().RootTagSet(), + Metadata: map[string]string{ + metadataTraceIDKey: "test-trace-id-3", + }, + }, + } + + // When + col.CollectRequestMetadatas(data) + + // Then + require.Len(t, col.buffer, 3) + require.Contains(t, col.buffer, insights.RequestMetadata{ + TraceID: "test-trace-id-1", + Start: time.Unix(9, 0), + End: time.Unix(10, 0), + TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-1", Group: "test-group-1"}, + ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200}, + }) + require.Contains(t, col.buffer, insights.RequestMetadata{ + TraceID: "test-trace-id-2", + Start: time.Unix(19, 0), + End: time.Unix(20, 0), + TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-2", Group: "test-group-2"}, + ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-2", Method: "test-method-2", StatusCode: 401}, + }) + require.Contains(t, col.buffer, insights.RequestMetadata{ + TraceID: "test-trace-id-3", + Start: time.Unix(19, 0), + End: time.Unix(20, 0), + TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "", Group: ""}, + ProtocolLabels: insights.ProtocolHTTPLabels{URL: "", Method: "", StatusCode: 0}, + }) +} + +func Test_Collector_PopAll_DoesNothingWithEmptyData(t *testing.T) { + t.Parallel() + + // Given + data := insights.RequestMetadatas{ + { + TraceID: "test-trace-id-1", + Start: time.Unix(9, 0), + End: time.Unix(10, 0), + TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-1", Group: "test-group-1"}, + ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200}, + }, + { + TraceID: "test-trace-id-2", + Start: time.Unix(19, 0), + End: time.Unix(20, 0), + TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "unknown", Group: "unknown"}, + ProtocolLabels: insights.ProtocolHTTPLabels{URL: "unknown", Method: "unknown", StatusCode: 0}, + }, + } + col := &Collector{ + buffer: data, + bufferMu: &sync.Mutex{}, + } + + // When + got := col.PopAll() + + // Then + require.Nil(t, col.buffer) + require.Empty(t, col.buffer) + require.Equal(t, data, got) +} diff --git a/output/cloud/insights/enable.go b/output/cloud/insights/enable.go new file mode 100644 index 000000000000..0655cd152203 --- /dev/null +++ b/output/cloud/insights/enable.go @@ -0,0 +1,17 @@ +package insights + +import ( + "go.k6.io/k6/cloudapi" +) + +func Enabled(config cloudapi.Config) bool { + // TODO(lukasz): Check if k6 x Tempo is enabled + // + // We want to check whether a given organization is + // eligible for k6 x Tempo feature. If it isn't, we may + // consider to skip the traces output. + // + // We currently don't have a backend API to check this + // information. + return config.TracesEnabled.ValueOrZero() +} diff --git a/output/cloud/insights/flush.go b/output/cloud/insights/flush.go new file mode 100644 index 000000000000..b2c2ee216a90 --- /dev/null +++ b/output/cloud/insights/flush.go @@ -0,0 +1,39 @@ +package insights + +import ( + "context" + + "go.k6.io/k6/cloudapi/insights" +) + +// Client is an interface for sending request metadatas to the Insights API. +type Client interface { + IngestRequestMetadatasBatch(context.Context, insights.RequestMetadatas) error + Close() error +} + +// RequestMetadatasFlusher is an interface for flushing data to the cloud. +type RequestMetadatasFlusher interface { + Flush() error +} + +type Flusher struct { + client Client + collector RequestMetadatasCollector +} + +func NewFlusher(client Client, collector RequestMetadatasCollector) *Flusher { + return &Flusher{ + client: client, + collector: collector, + } +} + +func (f *Flusher) Flush() error { + requestMetadatas := f.collector.PopAll() + if len(requestMetadatas) < 1 { + return nil + } + + return f.client.IngestRequestMetadatasBatch(context.Background(), requestMetadatas) +} diff --git a/output/cloud/insights/flush_test.go b/output/cloud/insights/flush_test.go new file mode 100644 index 000000000000..3ee049281c3b --- /dev/null +++ b/output/cloud/insights/flush_test.go @@ -0,0 +1,136 @@ +package insights + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.k6.io/k6/cloudapi/insights" + "go.k6.io/k6/metrics" +) + +type mockWorkingInsightsClient struct { + ingestRequestMetadatasBatchInvoked bool + dataSent bool + data insights.RequestMetadatas +} + +func (c *mockWorkingInsightsClient) IngestRequestMetadatasBatch(ctx context.Context, data insights.RequestMetadatas) error { + c.ingestRequestMetadatasBatchInvoked = true + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + c.dataSent = true + c.data = data + + return nil +} + +func (c *mockWorkingInsightsClient) Close() error { + return nil +} + +type mockFailingInsightsClient struct { + err error +} + +func (c *mockFailingInsightsClient) IngestRequestMetadatasBatch(_ context.Context, _ insights.RequestMetadatas) error { + return c.err +} + +func (c *mockFailingInsightsClient) Close() error { + return nil +} + +type mockRequestMetadatasCollector struct { + data insights.RequestMetadatas +} + +func (m *mockRequestMetadatasCollector) CollectRequestMetadatas(_ []metrics.SampleContainer) { + panic("implement me") +} + +func (m *mockRequestMetadatasCollector) PopAll() insights.RequestMetadatas { + return m.data +} + +func newMockRequestMetadatas() insights.RequestMetadatas { + return insights.RequestMetadatas{ + { + TraceID: "test-trace-id-1", + Start: time.Unix(1337, 0), + End: time.Unix(1338, 0), + TestRunLabels: insights.TestRunLabels{ID: 1, Scenario: "test-scenario-1", Group: "test-group-1"}, + ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200}, + }, + { + TraceID: "test-trace-id-2", + Start: time.Unix(2337, 0), + End: time.Unix(2338, 0), + TestRunLabels: insights.TestRunLabels{ID: 1, Scenario: "test-scenario-2", Group: "test-group-2"}, + ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-2", Method: "test-method-2", StatusCode: 200}, + }, + } +} + +func Test_tracesFlusher_Flush_ReturnsNoErrorWithWorkingInsightsClientAndNonCancelledContextAndNoData(t *testing.T) { + t.Parallel() + + // Given + data := insights.RequestMetadatas{} + cli := &mockWorkingInsightsClient{} + col := &mockRequestMetadatasCollector{data: data} + flusher := NewFlusher(cli, col) + + // When + err := flusher.Flush() + + // Then + require.NoError(t, err) + require.False(t, cli.ingestRequestMetadatasBatchInvoked) + require.False(t, cli.dataSent) + require.Empty(t, cli.data) +} + +func Test_tracesFlusher_Flush_ReturnsNoErrorWithWorkingInsightsClientAndNonCancelledContextAndData(t *testing.T) { + t.Parallel() + + // Given + data := newMockRequestMetadatas() + cli := &mockWorkingInsightsClient{} + col := &mockRequestMetadatasCollector{data: data} + flusher := NewFlusher(cli, col) + + // When + err := flusher.Flush() + + // Then + require.NoError(t, err) + require.True(t, cli.ingestRequestMetadatasBatchInvoked) + require.True(t, cli.dataSent) + require.Equal(t, data, cli.data) +} + +func Test_tracesFlusher_Flush_ReturnsErrorWithFailingInsightsClientAndNonCancelledContext(t *testing.T) { + t.Parallel() + + // Given + data := newMockRequestMetadatas() + testErr := errors.New("test-error") + cli := &mockFailingInsightsClient{err: testErr} + col := &mockRequestMetadatasCollector{data: data} + flusher := NewFlusher(cli, col) + + // When + err := flusher.Flush() + + // Then + require.ErrorIs(t, err, testErr) +}