From 45ffc039d625944cacd98d2d2d2ef8b49a79e88b Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Mon, 9 Sep 2024 08:42:15 +0100 Subject: [PATCH] [chore] [receiver/datadog] Add support for Service Checks (#34474) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Description: This PR adds support for Datadog Service Checks. Follow up of https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33631 , https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33957 and https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/34180. The full version of the code can be found in the cedwards/datadog-metrics-receiver-full branch, or in Grafana Alloy: https://github.com/grafana/alloy/tree/main/internal/etc/datadogreceiver Link to tracking Issue: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18278 Testing: Unit tests, as well as an end-to-end test, have been added. --------- Signed-off-by: alexgreenbank Co-authored-by: Carrie Edwards Co-authored-by: Juraci Paixão Kröhling --- .../translator/service_check_translator.go | 50 +++ .../service_check_translator_test.go | 322 ++++++++++++++++++ receiver/datadogreceiver/receiver.go | 32 +- receiver/datadogreceiver/receiver_test.go | 60 ++++ 4 files changed, 461 insertions(+), 3 deletions(-) create mode 100644 receiver/datadogreceiver/internal/translator/service_check_translator.go create mode 100644 receiver/datadogreceiver/internal/translator/service_check_translator_test.go diff --git a/receiver/datadogreceiver/internal/translator/service_check_translator.go b/receiver/datadogreceiver/internal/translator/service_check_translator.go new file mode 100644 index 000000000000..4cbd20a0ba6b --- /dev/null +++ b/receiver/datadogreceiver/internal/translator/service_check_translator.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator" + +import ( + "time" + + "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type ServiceCheck struct { + Check string `json:"check"` + HostName string `json:"host_name"` + Status datadogV1.ServiceCheckStatus `json:"status"` + Timestamp int64 `json:"timestamp,omitempty"` + Tags []string `json:"tags,omitempty"` +} + +// More information on Datadog service checks: https://docs.datadoghq.com/api/latest/service-checks/ +func (mt *MetricsTranslator) TranslateServices(services []ServiceCheck) pmetric.Metrics { + bt := newBatcher() + bt.Metrics = pmetric.NewMetrics() + + for _, service := range services { + metricProperties := parseSeriesProperties("service_check", "service_check", service.Tags, service.HostName, mt.buildInfo.Version, mt.stringPool) + metric, metricID := bt.Lookup(metricProperties) // TODO(alexg): proper name + + dps := metric.Gauge().DataPoints() + dps.EnsureCapacity(1) + + dp := dps.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(service.Timestamp * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds + metricProperties.dpAttrs.CopyTo(dp.Attributes()) + dp.SetIntValue(int64(service.Status)) + + // TODO(alexg): Do this stream thing for service check metrics? + stream := identity.OfStream(metricID, dp) + ts, ok := mt.streamHasTimestamp(stream) + if ok { + dp.SetStartTimestamp(ts) + } + mt.updateLastTsForStream(stream, dp.Timestamp()) + } + return bt.Metrics +} diff --git a/receiver/datadogreceiver/internal/translator/service_check_translator_test.go b/receiver/datadogreceiver/internal/translator/service_check_translator_test.go new file mode 100644 index 000000000000..66abea5d1fbc --- /dev/null +++ b/receiver/datadogreceiver/internal/translator/service_check_translator_test.go @@ -0,0 +1,322 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translator + +import ( + "encoding/json" + "testing" + + "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +var ( + testTimestamp = int64(1700000000) +) + +func TestHandleStructureParsing(t *testing.T) { + tests := []struct { + name string + checkRunPayload []byte + expectedServices []ServiceCheck + }{ + { + name: "happy", + checkRunPayload: []byte(`[ + { + "check": "datadog.agent.check_status", + "host_name": "hosta", + "status": 0, + "message": "", + "tags": [ + "check:container" + ] + }, + { + "check": "app.working", + "host_name": "hosta", + "timestamp": 1700000000, + "status": 0, + "message": "", + "tags": null + }, + { + "check": "env.test", + "host_name": "hosta", + "status": 0, + "message": "", + "tags": [ + "env:argle", "foo:bargle" + ] + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "datadog.agent.check_status", + HostName: "hosta", + Status: 0, + Tags: []string{"check:container"}, + }, + { + Check: "app.working", + HostName: "hosta", + Status: 0, + Timestamp: 1700000000, + }, + { + Check: "env.test", + HostName: "hosta", + Status: 0, + Tags: []string{"env:argle", "foo:bargle"}, + }, + }, + }, + { + name: "happy no tags", + checkRunPayload: []byte(`[ + { + "check": "app.working", + "host_name": "hosta", + "timestamp": 1700000000, + "status": 0, + "message": "", + "tags": null + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "app.working", + HostName: "hosta", + Status: 0, + Timestamp: 1700000000, + }, + }, + }, + { + name: "happy no timestamp", + checkRunPayload: []byte(`[ + { + "check": "env.test", + "host_name": "hosta", + "status": 0, + "message": "", + "tags": [ + "env:argle", "foo:bargle" + ] + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "env.test", + HostName: "hosta", + Status: 0, + Tags: []string{"env:argle", "foo:bargle"}, + }, + }, + }, + { + name: "empty", + checkRunPayload: []byte(`[]`), + expectedServices: []ServiceCheck{}, + }, + { + name: "happy no hostname", + checkRunPayload: []byte(`[ + { + "check": "env.test", + "status": 0, + "message": "", + "tags": [ + "env:argle", "foo:bargle" + ] + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "env.test", + Status: 0, + Tags: []string{"env:argle", "foo:bargle"}, + }, + }, + }, + { + name: "empty", + checkRunPayload: []byte(`[]`), + expectedServices: []ServiceCheck{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var services []ServiceCheck + err := json.Unmarshal(tt.checkRunPayload, &services) + require.NoError(t, err, "Failed to unmarshal service payload JSON") + assert.Equal(t, tt.expectedServices, services, "Parsed series does not match expected series") + }) + } +} + +func TestTranslateCheckRun(t *testing.T) { + tests := []struct { + name string + services []ServiceCheck + expect func(t *testing.T, result pmetric.Metrics) + }{ + { + name: "OK status, with TS, no tags, no hostname", + services: []ServiceCheck{ + { + Check: "app.working", + Timestamp: 1700000000, + Status: datadogV1.SERVICECHECKSTATUS_OK, + Tags: []string{}, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + expectedAttrs := tagsToAttributes([]string{}, "", newStringPool()) + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 1, result.DataPointCount()) + + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireGauge(t, metric, "service_check", 1) + + dp := metric.Gauge().DataPoints().At(0) + requireDp(t, dp, expectedAttrs.dp, 1700000000, 0) + }, + }, + { + name: "OK status, no TS", + services: []ServiceCheck{ + { + Check: "app.working", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_OK, + Tags: []string{"env:tag1", "version:tag2"}, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + expectedAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "foo", newStringPool()) + require.Equal(t, 1, result.ResourceMetrics().Len()) + requireResourceAttributes(t, result.ResourceMetrics().At(0).Resource().Attributes(), expectedAttrs.resource) + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 1, result.DataPointCount()) + + requireScope(t, result, expectedAttrs.scope, component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireGauge(t, metric, "service_check", 1) + + dp := metric.Gauge().DataPoints().At(0) + requireDp(t, dp, expectedAttrs.dp, 0, 0) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + mt.buildInfo = component.BuildInfo{ + Command: "otelcol", + Description: "OpenTelemetry Collector", + Version: "latest", + } + result := mt.TranslateServices(tt.services) + + tt.expect(t, result) + }) + } +} + +func TestTranslateCheckRunStatuses(t *testing.T) { + tests := []struct { + name string + services []ServiceCheck + expectedStatus int64 + }{ + { + name: "OK status, no TS", + services: []ServiceCheck{ + { + Check: "app.working", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_OK, + Tags: []string{"env:tag1", "version:tag2"}, + }, + }, + expectedStatus: 0, + }, + { + name: "Warning status", + services: []ServiceCheck{ + { + Check: "app.warning", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_WARNING, + Tags: []string{"env:tag1", "version:tag2"}, + Timestamp: testTimestamp, + }, + }, + expectedStatus: 1, + }, + { + name: "Critical status", + services: []ServiceCheck{ + { + Check: "app.critical", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_CRITICAL, + Tags: []string{"env:tag1", "version:tag2"}, + Timestamp: testTimestamp, + }, + }, + expectedStatus: 2, + }, + { + name: "Unknown status", + services: []ServiceCheck{ + { + Check: "app.unknown", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_UNKNOWN, + Tags: []string{"env:tag1", "version:tag2"}, + Timestamp: testTimestamp, + }, + }, + expectedStatus: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + mt.buildInfo = component.BuildInfo{ + Command: "otelcol", + Description: "OpenTelemetry Collector", + Version: "latest", + } + result := mt.TranslateServices(tt.services) + + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 1, result.DataPointCount()) + + requireScopeMetrics(t, result, 1, 1) + + requireScope(t, result, pcommon.NewMap(), component.NewDefaultBuildInfo().Version) + + metrics := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + metric := metrics.At(i) + assert.Equal(t, tt.expectedStatus, metric.Gauge().DataPoints().At(0).IntValue()) + } + }) + } +} diff --git a/receiver/datadogreceiver/receiver.go b/receiver/datadogreceiver/receiver.go index 2a3650326ba2..f605d53c6125 100644 --- a/receiver/datadogreceiver/receiver.go +++ b/receiver/datadogreceiver/receiver.go @@ -324,9 +324,35 @@ func (ddr *datadogReceiver) handleCheckRun(w http.ResponseWriter, req *http.Requ ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", *metricsCount, err) }(&metricsCount) - err = fmt.Errorf("service checks endpoint not implemented") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - ddr.params.Logger.Warn("metrics consumer errored out", zap.Error(err)) + buf := translator.GetBuffer() + defer translator.PutBuffer(buf) + if _, err = io.Copy(buf, req.Body); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error(err.Error()) + return + } + + var services []translator.ServiceCheck + + err = json.Unmarshal(buf.Bytes(), &services) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + ddr.params.Logger.Error(err.Error()) + return + } + + metrics := ddr.metricsTranslator.TranslateServices(services) + metricsCount = metrics.DataPointCount() + + err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err)) + return + } + + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("OK")) } // handleSketches handles sketches, the underlying data structure of distributions https://docs.datadoghq.com/metrics/distributions/ diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index e172735c3bcf..e352a82851f1 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -485,3 +485,63 @@ func TestStats_EndToEnd(t *testing.T) { assert.NoError(t, err) } + +func TestDatadogServices_EndToEnd(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" // Using a randomly assigned address + sink := new(consumertest.MetricsSink) + + dd, err := newDataDogReceiver( + cfg, + receivertest.NewNopSettings(), + ) + require.NoError(t, err, "Must not error when creating receiver") + dd.(*datadogReceiver).nextMetricsConsumer = sink + + require.NoError(t, dd.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, dd.Shutdown(context.Background())) + }() + + servicesPayload := []byte(`[ + { + "check": "app.working", + "host_name": "hosta", + "status": 2, + "tags": ["environment:test"] + } + ]`) + + req, err := http.NewRequest( + http.MethodPost, + fmt.Sprintf("http://%s/api/v1/check_run", dd.(*datadogReceiver).address), + io.NopCloser(bytes.NewReader(servicesPayload)), + ) + require.NoError(t, err, "Must not error when creating request") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err, "Must not error performing request") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, multierr.Combine(err, resp.Body.Close()), "Must not error when reading body") + require.Equal(t, "OK", string(body), "Expected response to be 'OK', got %s", string(body)) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + + mds := sink.AllMetrics() + require.Len(t, mds, 1) + got := mds[0] + require.Equal(t, 1, got.ResourceMetrics().Len()) + metrics := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + assert.Equal(t, 1, metrics.Len()) + metric := metrics.At(0) + assert.Equal(t, pmetric.MetricTypeGauge, metric.Type()) + dps := metric.Gauge().DataPoints() + assert.Equal(t, 1, dps.Len()) + dp := dps.At(0) + assert.Equal(t, int64(2), dp.IntValue()) + assert.Equal(t, 1, dp.Attributes().Len()) + environment, _ := dp.Attributes().Get("environment") + assert.Equal(t, "test", environment.AsString()) + hostName, _ := got.ResourceMetrics().At(0).Resource().Attributes().Get("host.name") + assert.Equal(t, "hosta", hostName.AsString()) +}