diff --git a/receiver/datadogreceiver/go.mod b/receiver/datadogreceiver/go.mod index e528445fa719..b84bbdab4f63 100644 --- a/receiver/datadogreceiver/go.mod +++ b/receiver/datadogreceiver/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datado go 1.21.0 require ( + github.com/DataDog/agent-payload/v5 v5.0.124 github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9 github.com/DataDog/datadog-api-client-go/v2 v2.28.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.107.0 diff --git a/receiver/datadogreceiver/go.sum b/receiver/datadogreceiver/go.sum index 3b20112adc39..10579682242d 100644 --- a/receiver/datadogreceiver/go.sum +++ b/receiver/datadogreceiver/go.sum @@ -1,3 +1,5 @@ +github.com/DataDog/agent-payload/v5 v5.0.124 h1:m7MLSy8oyLQDT59oesWhZiSFDzV/Q9CWzhue7s2SFaw= +github.com/DataDog/agent-payload/v5 v5.0.124/go.mod h1:FgVQKmVdqdmZTbxIptqJC/l+xEzdiXsaAOs/vGAvWzs= github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9 h1:/j0jQpk3EkJ3F3x88LDhWKk6AxE+JXf1H3dDvZFtm80= github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9/go.mod h1:gHkSUTn6H6UEZQHY3XWBIGNjfI3Tdi0IxlrxIFBWDwU= github.com/DataDog/datadog-api-client-go/v2 v2.28.0 h1:seA/bTwKnB/7ELtZrXIw5csksdR78eGtMrpOUK5XJKA= diff --git a/receiver/datadogreceiver/internal/translator/batcher.go b/receiver/datadogreceiver/internal/translator/batcher.go index 5c77d0add735..72ddb5dd4265 100644 --- a/receiver/datadogreceiver/internal/translator/batcher.go +++ b/receiver/datadogreceiver/internal/translator/batcher.go @@ -48,14 +48,14 @@ var metricTypeMap = map[string]pmetric.MetricType{ } func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) dimensions { - resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(tags, host, stringPool) + attrs := tagsToAttributes(tags, host, stringPool) return dimensions{ name: name, metricType: metricTypeMap[metricType], buildInfo: version, - resourceAttrs: resourceAttrs, - scopeAttrs: scopeAttrs, - dpAttrs: dpAttrs, + resourceAttrs: attrs.resource, + scopeAttrs: attrs.scope, + dpAttrs: attrs.dp, } } diff --git a/receiver/datadogreceiver/internal/translator/batcher_test.go b/receiver/datadogreceiver/internal/translator/batcher_test.go index 5a6b27dd4d41..a2938d44303c 100644 --- a/receiver/datadogreceiver/internal/translator/batcher_test.go +++ b/receiver/datadogreceiver/internal/translator/batcher_test.go @@ -47,15 +47,16 @@ func TestMetricBatcher(t *testing.T) { }, expect: func(t *testing.T, result pmetric.Metrics) { // Different hosts should result in different ResourceMetrics + requireMetricAndDataPointCounts(t, result, 2, 2) require.Equal(t, 2, result.ResourceMetrics().Len()) resource1 := result.ResourceMetrics().At(0) resource2 := result.ResourceMetrics().At(1) - v, exists := resource1.Resource().Attributes().Get("host.name") - require.True(t, exists) - require.Equal(t, "Host1", v.AsString()) - v, exists = resource2.Resource().Attributes().Get("host.name") - require.True(t, exists) - require.Equal(t, "Host2", v.AsString()) + + res1ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool()) + requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource) + + res2ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host2", newStringPool()) + requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource) require.Equal(t, 1, resource1.ScopeMetrics().Len()) require.Equal(t, 1, resource2.ScopeMetrics().Len()) @@ -63,8 +64,8 @@ func TestMetricBatcher(t *testing.T) { require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len()) require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len()) - require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name()) - require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name()) + requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1) + requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1) }, }, { @@ -98,18 +99,19 @@ func TestMetricBatcher(t *testing.T) { expect: func(t *testing.T, result pmetric.Metrics) { // The different metrics will fall under the same ResourceMetric and ScopeMetric // and there will be separate metrics under the ScopeMetric.Metrics() + requireMetricAndDataPointCounts(t, result, 2, 2) require.Equal(t, 1, result.ResourceMetrics().Len()) resource := result.ResourceMetrics().At(0) - v, exists := resource.Resource().Attributes().Get("host.name") - require.True(t, exists) - require.Equal(t, "Host1", v.AsString()) + expectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool()) + requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource) require.Equal(t, 1, resource.ScopeMetrics().Len()) - require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len()) require.Equal(t, "TestCount1", resource.ScopeMetrics().At(0).Metrics().At(0).Name()) require.Equal(t, "TestCount2", resource.ScopeMetrics().At(0).Metrics().At(1).Name()) + requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1) + requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestCount2", 1) }, }, { @@ -142,21 +144,16 @@ func TestMetricBatcher(t *testing.T) { }, expect: func(t *testing.T, result pmetric.Metrics) { // Differences in attribute values should result in different resourceMetrics + requireMetricAndDataPointCounts(t, result, 2, 2) require.Equal(t, 2, result.ResourceMetrics().Len()) resource1 := result.ResourceMetrics().At(0) resource2 := result.ResourceMetrics().At(1) - v, exists := resource1.Resource().Attributes().Get("host.name") - require.True(t, exists) - require.Equal(t, "Host1", v.AsString()) - v, exists = resource2.Resource().Attributes().Get("host.name") - require.True(t, exists) - require.Equal(t, "Host1", v.AsString()) - v, exists = resource1.Resource().Attributes().Get("deployment.environment") - require.True(t, exists) - require.Equal(t, "dev", v.AsString()) - v, exists = resource2.Resource().Attributes().Get("deployment.environment") - require.True(t, exists) - require.Equal(t, "prod", v.AsString()) + + res1ExpectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool()) + requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource) + + res2ExpectedAttrs := tagsToAttributes([]string{"env:prod", "version:tag1"}, "Host1", newStringPool()) + requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource) require.Equal(t, 1, resource1.ScopeMetrics().Len()) require.Equal(t, 1, resource1.ScopeMetrics().Len()) @@ -167,8 +164,8 @@ func TestMetricBatcher(t *testing.T) { require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len()) require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len()) - require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name()) - require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name()) + requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1) + requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1) }, }, { @@ -203,22 +200,20 @@ func TestMetricBatcher(t *testing.T) { // The different metrics will fall under the same ResourceMetric and ScopeMetric // and there will be separate metrics under the ScopeMetric.Metrics() due to the different // data types + requireMetricAndDataPointCounts(t, result, 2, 2) require.Equal(t, 1, result.ResourceMetrics().Len()) resource := result.ResourceMetrics().At(0) - v, exists := resource.Resource().Attributes().Get("host.name") - require.True(t, exists) - require.Equal(t, "Host1", v.AsString()) + expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool()) + requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource) require.Equal(t, 1, resource.ScopeMetrics().Len()) - require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len()) - require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name()) require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(1).Name()) - require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type()) - require.Equal(t, pmetric.MetricTypeGauge, resource.ScopeMetrics().At(0).Metrics().At(1).Type()) + requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 1) + requireGauge(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestMetric", 1) }, }, { @@ -253,21 +248,16 @@ func TestMetricBatcher(t *testing.T) { // Same host, tags, and metric name but two different datapoints // should result in a single resourceMetric, scopeMetric, and metric // but two different datapoints under that metric + requireMetricAndDataPointCounts(t, result, 1, 2) require.Equal(t, 1, result.ResourceMetrics().Len()) resource := result.ResourceMetrics().At(0) - v, exists := resource.Resource().Attributes().Get("host.name") - require.True(t, exists) - require.Equal(t, "Host1", v.AsString()) + expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool()) + requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource) require.Equal(t, 1, resource.ScopeMetrics().Len()) - require.Equal(t, 1, resource.ScopeMetrics().At(0).Metrics().Len()) - - require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name()) - - require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type()) - require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len()) + requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 2) }, }, } diff --git a/receiver/datadogreceiver/internal/translator/series.go b/receiver/datadogreceiver/internal/translator/series.go index ca173c5ad2d7..569ad02919ce 100644 --- a/receiver/datadogreceiver/internal/translator/series.go +++ b/receiver/datadogreceiver/internal/translator/series.go @@ -4,8 +4,12 @@ package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator" import ( + "io" + "net/http" + "strings" "time" + "github.com/DataDog/agent-payload/v5/gogen" datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -23,6 +27,22 @@ type SeriesList struct { Series []datadogV1.Series `json:"series"` } +// TODO: add handling for JSON format in additional to protobuf? +func (mt *MetricsTranslator) HandleSeriesV2Payload(req *http.Request) (mp []*gogen.MetricPayload_MetricSeries, err error) { + buf := GetBuffer() + defer PutBuffer(buf) + if _, err := io.Copy(buf, req.Body); err != nil { + return mp, err + } + + pl := new(gogen.MetricPayload) + if err := pl.Unmarshal(buf.Bytes()); err != nil { + return mp, err + } + + return pl.GetSeries(), nil +} + func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metrics { bt := newBatcher() @@ -87,3 +107,68 @@ func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metric } return bt.Metrics } + +func (mt *MetricsTranslator) TranslateSeriesV2(series []*gogen.MetricPayload_MetricSeries) pmetric.Metrics { + bt := newBatcher() + + for _, serie := range series { + var dps pmetric.NumberDataPointSlice + + // The V2 payload stores the host name under in the Resources field + resourceMap := getV2Resources(serie.Resources) + // TODO(jesus.vazquez) (Do this with string interning) + dimensions := parseSeriesProperties(serie.Metric, strings.ToLower(serie.Type.String()), serie.Tags, resourceMap["host"], mt.buildInfo.Version, mt.stringPool) + for k, v := range resourceMap { + if k == "host" { + continue // Host has already been added as a resource attribute in parseSeriesProperties(), so avoid duplicating that attribute + } + dimensions.resourceAttrs.PutStr(k, v) + } + dimensions.resourceAttrs.PutStr("source", serie.SourceTypeName) //TODO: check if this is correct handling of SourceTypeName field + metric, metricID := bt.Lookup(dimensions) + + switch serie.Type { + case gogen.MetricPayload_COUNT: + metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + metric.Sum().SetIsMonotonic(false) // See https://docs.datadoghq.com/metrics/types/?tab=count#definition + dps = metric.Sum().DataPoints() + case gogen.MetricPayload_GAUGE: + dps = metric.Gauge().DataPoints() + case gogen.MetricPayload_RATE: + metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) //TODO: verify that this is always the case + dps = metric.Sum().DataPoints() + case gogen.MetricPayload_UNSPECIFIED: + // Type is unset/unspecified + continue + } + + dps.EnsureCapacity(len(serie.Points)) + + for _, point := range serie.Points { + dp := dps.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(point.Timestamp * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds + dimensions.dpAttrs.CopyTo(dp.Attributes()) // TODO(jesus.vazquez) Review this copy + val := point.Value + if serie.Type == gogen.MetricPayload_RATE && serie.Interval != 0 { + val *= float64(serie.Interval) + } + dp.SetDoubleValue(val) + + stream := identity.OfStream(metricID, dp) + ts, ok := mt.streamHasTimestamp(stream) + if ok { + dp.SetStartTimestamp(ts) + } + mt.updateLastTsForStream(stream, dp.Timestamp()) + } + } + return bt.Metrics +} + +func getV2Resources(resources []*gogen.MetricPayload_Resource) map[string]string { + resourceMap := make(map[string]string) + for i := range resources { + resourceMap[resources[i].Type] = resources[i].Name + } + return resourceMap +} diff --git a/receiver/datadogreceiver/internal/translator/series_test.go b/receiver/datadogreceiver/internal/translator/series_test.go index 5c8483a97478..0f633a4729ec 100644 --- a/receiver/datadogreceiver/internal/translator/series_test.go +++ b/receiver/datadogreceiver/internal/translator/series_test.go @@ -4,9 +4,14 @@ package translator import ( + "bytes" + "io" + "net/http" "testing" + "github.com/DataDog/agent-payload/v5/gogen" "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -28,6 +33,64 @@ func testPointsToDatadogPoints(points []testPoint) [][]*float64 { } +func TestHandleMetricsPayloadV2(t *testing.T) { + tests := []struct { + name string + metricsPayload gogen.MetricPayload + expectedSeriesCount int + expectedPointsCounts []int + }{ + { + name: "v2", + metricsPayload: gogen.MetricPayload{ + Series: []*gogen.MetricPayload_MetricSeries{ + { + Resources: []*gogen.MetricPayload_Resource{ + { + Type: "host", + Name: "Host1", + }, + }, + Metric: "system.load.1", + Tags: []string{"env:test"}, + Points: []*gogen.MetricPayload_MetricPoint{ + { + Timestamp: 1636629071, + Value: 1.5, + }, + { + Timestamp: 1636629081, + Value: 2.0, + }, + }, + Type: gogen.MetricPayload_COUNT, + }, + }, + }, + expectedSeriesCount: 1, + expectedPointsCounts: []int{2}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pb, err := tt.metricsPayload.Marshal() + require.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, "/api/v2/series", io.NopCloser(bytes.NewReader(pb))) + require.NoError(t, err) + mt := createMetricsTranslator() + series, err := mt.HandleSeriesV2Payload(req) + require.NoError(t, err) + require.NoError(t, err, "Failed to parse metrics payload") + require.Equal(t, tt.expectedSeriesCount, len(series)) + for i, s := range series { + require.Equal(t, tt.expectedPointsCounts[i], len(s.Points)) + } + }) + } +} + func TestTranslateSeriesV1(t *testing.T) { tests := []struct { name string @@ -56,19 +119,22 @@ func TestTranslateSeriesV1(t *testing.T) { }, }, expect: func(t *testing.T, result pmetric.Metrics) { - expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) - requireResourceMetrics(t, result, expectedResourceAttrs, 1) + requireMetricAndDataPointCounts(t, result, 1, 2) + + expectedAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) requireScopeMetrics(t, result, 1, 1) - requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) - requireSum(t, metric, "TestCount", pmetric.AggregationTemporalityDelta, 2) + requireSum(t, metric, "TestCount", 2) dp := metric.Sum().DataPoints().At(0) - requireDp(t, dp, expectedDpAttrs, 1636629071, 0.5) + requireDp(t, dp, expectedAttrs.dp, 1636629071, 0.5) dp = metric.Sum().DataPoints().At(1) - requireDp(t, dp, expectedDpAttrs, 1636629081, 1.0) + requireDp(t, dp, expectedAttrs.dp, 1636629081, 1.0) }, }, { @@ -94,19 +160,22 @@ func TestTranslateSeriesV1(t *testing.T) { }, }, expect: func(t *testing.T, result pmetric.Metrics) { - expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) - requireResourceMetrics(t, result, expectedResourceAttrs, 1) + requireMetricAndDataPointCounts(t, result, 1, 2) + + expectedAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) requireScopeMetrics(t, result, 1, 1) - requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) requireGauge(t, metric, "TestGauge", 2) dp := metric.Gauge().DataPoints().At(0) - requireDp(t, dp, expectedDpAttrs, 1636629071, 2) + requireDp(t, dp, expectedAttrs.dp, 1636629071, 2) dp = metric.Gauge().DataPoints().At(1) - requireDp(t, dp, expectedDpAttrs, 1636629081, 3) + requireDp(t, dp, expectedAttrs.dp, 1636629081, 3) }, }, { @@ -132,19 +201,22 @@ func TestTranslateSeriesV1(t *testing.T) { }, }, expect: func(t *testing.T, result pmetric.Metrics) { - expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) - requireResourceMetrics(t, result, expectedResourceAttrs, 1) + requireMetricAndDataPointCounts(t, result, 1, 2) + + expectedAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) requireScopeMetrics(t, result, 1, 1) - requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) - requireSum(t, metric, "TestRate", pmetric.AggregationTemporalityDelta, 2) + requireSum(t, metric, "TestRate", 2) dp := metric.Sum().DataPoints().At(0) - requireDp(t, dp, expectedDpAttrs, 1636629071, 2) + requireDp(t, dp, expectedAttrs.dp, 1636629071, 2) dp = metric.Sum().DataPoints().At(1) - requireDp(t, dp, expectedDpAttrs, 1636629081, 3) + requireDp(t, dp, expectedAttrs.dp, 1636629081, 3) }, }, } @@ -158,3 +230,206 @@ func TestTranslateSeriesV1(t *testing.T) { }) } } + +func TestTranslateSeriesV2(t *testing.T) { + tests := []struct { + name string + series []*gogen.MetricPayload_MetricSeries + expect func(t *testing.T, result pmetric.Metrics) + }{ + { + name: "Count metric", + series: []*gogen.MetricPayload_MetricSeries{ + { + Resources: []*gogen.MetricPayload_Resource{ + { + Type: "host", + Name: "Host1", + }, + }, + Metric: "TestCount", + Tags: []string{"env:tag1", "version:tag2"}, + Points: []*gogen.MetricPayload_MetricPoint{ + { + Timestamp: 1636629071, + Value: 0.5, + }, + { + Timestamp: 1636629081, + Value: 1.0, + }, + }, + Type: gogen.MetricPayload_COUNT, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + requireMetricAndDataPointCounts(t, result, 1, 2) + + expectedAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + expectedAttrs.resource.PutStr("source", "") + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) + requireScopeMetrics(t, result, 1, 1) + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireSum(t, metric, "TestCount", 2) + + dp := metric.Sum().DataPoints().At(0) + requireDp(t, dp, expectedAttrs.dp, 1636629071, 0.5) + + dp = metric.Sum().DataPoints().At(1) + requireDp(t, dp, expectedAttrs.dp, 1636629081, 1.0) + }, + }, + { + name: "Gauge metric", + series: []*gogen.MetricPayload_MetricSeries{ + { + Resources: []*gogen.MetricPayload_Resource{ + { + Type: "host", + Name: "Host1", + }, + }, + Metric: "TestGauge", + Tags: []string{"env:tag1", "version:tag2"}, + Points: []*gogen.MetricPayload_MetricPoint{ + { + Timestamp: 1636629071, + Value: 2, + }, + { + Timestamp: 1636629081, + Value: 3, + }, + }, + Type: gogen.MetricPayload_GAUGE, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + requireMetricAndDataPointCounts(t, result, 1, 2) + + expectedAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + expectedAttrs.resource.PutStr("source", "") + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) + requireScopeMetrics(t, result, 1, 1) + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireGauge(t, metric, "TestGauge", 2) + + dp := metric.Gauge().DataPoints().At(0) + requireDp(t, dp, expectedAttrs.dp, 1636629071, 2) + + dp = metric.Gauge().DataPoints().At(1) + requireDp(t, dp, expectedAttrs.dp, 1636629081, 3) + }, + }, + { + name: "Rate metric", + series: []*gogen.MetricPayload_MetricSeries{ + { + Resources: []*gogen.MetricPayload_Resource{ + { + Type: "host", + Name: "Host1", + }, + }, + Metric: "TestRate", + Tags: []string{"env:tag1", "version:tag2"}, + Points: []*gogen.MetricPayload_MetricPoint{ + { + Timestamp: 1636629071, + Value: 2, + }, + { + Timestamp: 1636629081, + Value: 3, + }, + }, + Type: gogen.MetricPayload_RATE, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + requireMetricAndDataPointCounts(t, result, 1, 2) + + expectedAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + expectedAttrs.resource.PutStr("source", "") + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) + requireScopeMetrics(t, result, 1, 1) + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireSum(t, metric, "TestRate", 2) + + dp := metric.Sum().DataPoints().At(0) + requireDp(t, dp, expectedAttrs.dp, 1636629071, 2) + + dp = metric.Sum().DataPoints().At(1) + requireDp(t, dp, expectedAttrs.dp, 1636629081, 3) + }, + }, + { + name: "Unspecified metric type", + series: []*gogen.MetricPayload_MetricSeries{ + { + Resources: []*gogen.MetricPayload_Resource{ + { + Type: "host", + Name: "Host1", + }, + }, + Metric: "TestUnspecified", + Tags: []string{"env:tag1", "version:tag2"}, + Points: []*gogen.MetricPayload_MetricPoint{ + { + Timestamp: 1636629071, + Value: 2, + }, + { + Timestamp: 1636629081, + Value: 3, + }, + }, + Type: gogen.MetricPayload_UNSPECIFIED, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + requireMetricAndDataPointCounts(t, result, 1, 0) + + require.Equal(t, 1, result.ResourceMetrics().Len()) + v, exists := result.ResourceMetrics().At(0).Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host1", v.AsString()) + v, exists = result.ResourceMetrics().At(0).Resource().Attributes().Get("deployment.environment") + require.True(t, exists) + require.Equal(t, "tag1", v.AsString()) + v, exists = result.ResourceMetrics().At(0).Resource().Attributes().Get("service.version") + require.True(t, exists) + require.Equal(t, "tag2", v.AsString()) + + require.Equal(t, 1, result.ResourceMetrics().At(0).ScopeMetrics().Len()) + require.Equal(t, 1, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + + require.Equal(t, "otelcol/datadogreceiver", result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, component.NewDefaultBuildInfo().Version, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Version()) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + require.Equal(t, "TestUnspecified", metric.Name()) + require.Equal(t, pmetric.MetricTypeEmpty, metric.Type()) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + result := mt.TranslateSeriesV2(tt.series) + + tt.expect(t, result) + }) + } +} diff --git a/receiver/datadogreceiver/internal/translator/tags.go b/receiver/datadogreceiver/internal/translator/tags.go index eec8b55ba8bb..e2cfc3da4f80 100644 --- a/receiver/datadogreceiver/internal/translator/tags.go +++ b/receiver/datadogreceiver/internal/translator/tags.go @@ -116,13 +116,21 @@ func (s *StringPool) Intern(str string) string { return interned } -func tagsToAttributes(tags []string, host string, stringPool *StringPool) (pcommon.Map, pcommon.Map, pcommon.Map) { - resourceAttrs := pcommon.NewMap() - scopeAttrs := pcommon.NewMap() - dpAttrs := pcommon.NewMap() +type attributes struct { + resource pcommon.Map + scope pcommon.Map + dp pcommon.Map +} + +func tagsToAttributes(tags []string, host string, stringPool *StringPool) attributes { + attrs := attributes{ + resource: pcommon.NewMap(), + scope: pcommon.NewMap(), + dp: pcommon.NewMap(), + } if host != "" { - resourceAttrs.PutStr(semconv.AttributeHostName, host) + attrs.resource.PutStr(semconv.AttributeHostName, host) } var key, val string @@ -130,13 +138,13 @@ func tagsToAttributes(tags []string, host string, stringPool *StringPool) (pcomm key, val = translateDatadogTagToKeyValuePair(tag) if attr, ok := datadogKnownResourceAttributes[key]; ok { val = stringPool.Intern(val) // No need to intern the key if we already have it - resourceAttrs.PutStr(attr, val) + attrs.resource.PutStr(attr, val) } else { key = stringPool.Intern(translateDatadogKeyToOTel(key)) val = stringPool.Intern(val) - dpAttrs.PutStr(key, val) + attrs.dp.PutStr(key, val) } } - return resourceAttrs, scopeAttrs, dpAttrs + return attrs } diff --git a/receiver/datadogreceiver/internal/translator/tags_test.go b/receiver/datadogreceiver/internal/translator/tags_test.go index b1ec4729ac33..f7fbff04d296 100644 --- a/receiver/datadogreceiver/internal/translator/tags_test.go +++ b/receiver/datadogreceiver/internal/translator/tags_test.go @@ -68,30 +68,30 @@ func TestGetMetricAttributes(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { pool := newStringPool() - resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(c.tags, c.host, pool) + attrs := tagsToAttributes(c.tags, c.host, pool) - assert.Equal(t, c.expectedResourceAttrs.Len(), resourceAttrs.Len()) + assert.Equal(t, c.expectedResourceAttrs.Len(), attrs.resource.Len()) c.expectedResourceAttrs.Range(func(k string, _ pcommon.Value) bool { ev, _ := c.expectedResourceAttrs.Get(k) - av, ok := resourceAttrs.Get(k) + av, ok := attrs.resource.Get(k) assert.True(t, ok) assert.Equal(t, ev, av) return true }) - assert.Equal(t, c.expectedScopeAttrs.Len(), scopeAttrs.Len()) + assert.Equal(t, c.expectedScopeAttrs.Len(), attrs.scope.Len()) c.expectedScopeAttrs.Range(func(k string, _ pcommon.Value) bool { ev, _ := c.expectedScopeAttrs.Get(k) - av, ok := scopeAttrs.Get(k) + av, ok := attrs.scope.Get(k) assert.True(t, ok) assert.Equal(t, ev, av) return true }) - assert.Equal(t, c.expectedDpAttrs.Len(), dpAttrs.Len()) + assert.Equal(t, c.expectedDpAttrs.Len(), attrs.dp.Len()) c.expectedDpAttrs.Range(func(k string, _ pcommon.Value) bool { ev, _ := c.expectedDpAttrs.Get(k) - av, ok := dpAttrs.Get(k) + av, ok := attrs.dp.Get(k) assert.True(t, ok) assert.Equal(t, ev, av) return true @@ -99,7 +99,6 @@ func TestGetMetricAttributes(t *testing.T) { }) } - } func newMapFromKV(t *testing.T, kv map[string]any) pcommon.Map { diff --git a/receiver/datadogreceiver/internal/translator/testutil.go b/receiver/datadogreceiver/internal/translator/testutil.go index c4ee342ad241..986725c53596 100644 --- a/receiver/datadogreceiver/internal/translator/testutil.go +++ b/receiver/datadogreceiver/internal/translator/testutil.go @@ -12,6 +12,11 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) +const ( + scopeName string = "otelcol/datadogreceiver" + aggregationTemporality = pmetric.AggregationTemporalityDelta +) + func createMetricsTranslator() *MetricsTranslator { mt := NewMetricsTranslator(component.BuildInfo{ Command: "otelcol", @@ -21,26 +26,37 @@ func createMetricsTranslator() *MetricsTranslator { return mt } -func requireResourceMetrics(t *testing.T, result pmetric.Metrics, expectedAttrs pcommon.Map, expectedLen int) { - require.Equal(t, expectedLen, result.ResourceMetrics().Len()) - require.Equal(t, expectedAttrs, result.ResourceMetrics().At(0).Resource().Attributes()) +func requireResourceAttributes(t *testing.T, attrs, expectedAttrs pcommon.Map) { + expectedAttrs.Range(func(k string, _ pcommon.Value) bool { + ev, _ := expectedAttrs.Get(k) + av, ok := attrs.Get(k) + require.True(t, ok) + require.Equal(t, ev, av) + return true + }) } +// nolint:unparam func requireScopeMetrics(t *testing.T, result pmetric.Metrics, expectedScopeMetricsLen, expectedMetricsLen int) { require.Equal(t, expectedScopeMetricsLen, result.ResourceMetrics().At(0).ScopeMetrics().Len()) require.Equal(t, expectedMetricsLen, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) } -func requireScope(t *testing.T, result pmetric.Metrics, expectedAttrs pcommon.Map, expectedName, expectedVersion string) { - require.Equal(t, expectedName, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) +func requireScope(t *testing.T, result pmetric.Metrics, expectedAttrs pcommon.Map, expectedVersion string) { + require.Equal(t, scopeName, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) require.Equal(t, expectedVersion, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Version()) require.Equal(t, expectedAttrs, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes()) } -func requireSum(t *testing.T, metric pmetric.Metric, expectedName string, expectedAggregationTemporality pmetric.AggregationTemporality, expectedDpsLen int) { +func requireMetricAndDataPointCounts(t *testing.T, result pmetric.Metrics, expectedMetricCount, expectedDpCount int) { + require.Equal(t, expectedMetricCount, result.MetricCount()) + require.Equal(t, expectedDpCount, result.DataPointCount()) +} + +func requireSum(t *testing.T, metric pmetric.Metric, expectedName string, expectedDpsLen int) { require.Equal(t, expectedName, metric.Name()) require.Equal(t, pmetric.MetricTypeSum, metric.Type()) - require.Equal(t, expectedAggregationTemporality, metric.Sum().AggregationTemporality()) + require.Equal(t, aggregationTemporality, metric.Sum().AggregationTemporality()) require.Equal(t, expectedDpsLen, metric.Sum().DataPoints().Len()) } diff --git a/receiver/datadogreceiver/receiver.go b/receiver/datadogreceiver/receiver.go index a00b7edd54eb..8920a912b40e 100644 --- a/receiver/datadogreceiver/receiver.go +++ b/receiver/datadogreceiver/receiver.go @@ -185,9 +185,25 @@ func (ddr *datadogReceiver) handleV2Series(w http.ResponseWriter, req *http.Requ ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", *metricsCount, err) }(&metricsCount) - err = fmt.Errorf("series v2 endpoint not implemented") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - ddr.params.Logger.Warn("metrics consumer errored out", zap.Error(err)) + series, err := ddr.metricsTranslator.HandleSeriesV2Payload(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + ddr.params.Logger.Error(err.Error()) + return + } + + metrics := ddr.metricsTranslator.TranslateSeriesV2(series) + metricsCount = metrics.DataPointCount() + + err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err)) + return + } + + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("OK")) } // handleCheckRun handles the service checks endpoint https://docs.datadoghq.com/api/latest/service-checks/ diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index f39377d1f07d..60b4ada499f2 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/DataDog/agent-payload/v5/gogen" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -168,3 +169,83 @@ func TestDatadogMetricsV1_EndToEnd(t *testing.T) { expectedEnvironment, _ := metric.Sum().DataPoints().At(0).Attributes().Get("environment") assert.Equal(t, "test", expectedEnvironment.AsString()) } + +func TestDatadogMetricsV2_EndToEnd(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" // Using a randomly assigned address + sink := new(consumertest.MetricsSink) + + dd, err := newDataDogReceiver( + cfg, + receivertest.NewNopSettings(), + ) + require.NoError(t, err, "Must not error when creating receiver") + dd.(*datadogReceiver).nextMetricsConsumer = sink + + require.NoError(t, dd.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, dd.Shutdown(context.Background())) + }() + + metricsPayloadV2 := gogen.MetricPayload{ + Series: []*gogen.MetricPayload_MetricSeries{ + { + Resources: []*gogen.MetricPayload_Resource{ + { + Type: "host", + Name: "Host1", + }, + }, + Metric: "system.load.1", + Tags: []string{"env:test"}, + Points: []*gogen.MetricPayload_MetricPoint{ + { + Timestamp: 1636629071, + Value: 1.5, + }, + { + Timestamp: 1636629081, + Value: 2.0, + }, + }, + Type: gogen.MetricPayload_COUNT, + }, + }, + } + + pb, err := metricsPayloadV2.Marshal() + assert.NoError(t, err) + + req, err := http.NewRequest( + http.MethodPost, + fmt.Sprintf("http://%s/api/v2/series", dd.(*datadogReceiver).address), + io.NopCloser(bytes.NewReader(pb)), + ) + require.NoError(t, err, "Must not error when creating request") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err, "Must not error performing request") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, multierr.Combine(err, resp.Body.Close()), "Must not error when reading body") + require.Equal(t, string(body), "OK", "Expected response to be 'OK', got %s", string(body)) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + + mds := sink.AllMetrics() + require.Len(t, mds, 1) + got := mds[0] + require.Equal(t, 1, got.ResourceMetrics().Len()) + metrics := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + assert.Equal(t, 1, metrics.Len()) + metric := metrics.At(0) + assert.Equal(t, pmetric.MetricTypeSum, metric.Type()) + assert.Equal(t, "system.load.1", metric.Name()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, metric.Sum().AggregationTemporality()) + assert.Equal(t, false, metric.Sum().IsMonotonic()) + assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(0).Timestamp()) + assert.Equal(t, 1.5, metric.Sum().DataPoints().At(0).DoubleValue()) + assert.Equal(t, pcommon.Timestamp(0), metric.Sum().DataPoints().At(0).StartTimestamp()) + assert.Equal(t, pcommon.Timestamp(1636629081*1_000_000_000), metric.Sum().DataPoints().At(1).Timestamp()) + assert.Equal(t, 2.0, metric.Sum().DataPoints().At(1).DoubleValue()) + assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(1).StartTimestamp()) +} diff --git a/testbed/go.mod b/testbed/go.mod index c98565d448e8..0ddf05ed03c4 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -79,6 +79,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4 v4.3.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect github.com/Code-Hex/go-generics-cache v1.5.1 // indirect + github.com/DataDog/agent-payload/v5 v5.0.124 // indirect github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9 // indirect github.com/DataDog/datadog-agent/pkg/trace/exportable v0.0.0-20201016145401-4646cf596b02 // indirect github.com/DataDog/datadog-api-client-go/v2 v2.28.0 // indirect diff --git a/testbed/go.sum b/testbed/go.sum index c5d33407acd8..6de6ad8219be 100644 --- a/testbed/go.sum +++ b/testbed/go.sum @@ -59,6 +59,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU= github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= +github.com/DataDog/agent-payload/v5 v5.0.124 h1:m7MLSy8oyLQDT59oesWhZiSFDzV/Q9CWzhue7s2SFaw= +github.com/DataDog/agent-payload/v5 v5.0.124/go.mod h1:FgVQKmVdqdmZTbxIptqJC/l+xEzdiXsaAOs/vGAvWzs= github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9 h1:/j0jQpk3EkJ3F3x88LDhWKk6AxE+JXf1H3dDvZFtm80= github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9/go.mod h1:gHkSUTn6H6UEZQHY3XWBIGNjfI3Tdi0IxlrxIFBWDwU= github.com/DataDog/datadog-agent/pkg/trace/exportable v0.0.0-20201016145401-4646cf596b02 h1:N2BRKjJ/c+ipDwt5b+ijqEc2EsmK3zXq2lNeIPnSwMI=