From 0cdfcbeacca98118f8b29ca9e38db2d3a6afeb12 Mon Sep 17 00:00:00 2001
From: Constanca Manteigas
Date: Thu, 9 Jan 2025 10:39:47 +0100
Subject: [PATCH] Fix cloudwatch error with multiple records

---
 .../internal/unmarshaler/auto/unmarshaller.go | 66 +++++++++++-------
 .../unmarshaler/auto/unmarshaller_test.go     | 17 ++---
 .../internal/unmarshaler/cwlog/unmarshaler.go | 63 +++++++++--------
 .../unmarshaler/cwlog/unmarshaler_test.go     |  8 +--
 .../unmarshaler/cwmetricstream/cwmetric.go    |  8 +--
 .../cwmetricstream/metricsbuilder_test.go     |  6 +-
 .../unmarshaler/cwmetricstream/unmarshaler.go | 68 +++++++++++--------
 .../cwmetricstream/unmarshaler_test.go        |  8 +--
 8 files changed, 129 insertions(+), 115 deletions(-)

diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go
index e64010b1bca4..6ffdc688621e 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go
@@ -20,9 +20,8 @@ const (
 )
 
 var (
-	errInvalidRecords         = errors.New("record format invalid")
-	errUnsupportedContentType = errors.New("content type not supported")
-	errInvalidFormatStart     = errors.New("unable to decode data length from message")
+	errInvalidRecords = errors.New("record format invalid")
+	errUnknownLength  = errors.New("unable to decode data length from message")
 )
 
 // Unmarshaler for the CloudWatch Log JSON record format.
@@ -37,12 +36,19 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
 	return &Unmarshaler{logger}
 }
 
-// isJSON returns true if record starts with { and ends with }
+// isJSON returns true if record starts with { and ends with }. Ignores new lines at the end.
 func isJSON(record []byte) bool {
 	if len(record) < 2 {
 		return false
 	}
-	return record[0] == '{' && record[len(record)-1] == '}'
+
+	// Remove all newlines at the end, if there are any
+	lastIndex := len(record) - 1
+	for lastIndex >= 0 && record[lastIndex] == '\n' {
+		lastIndex--
+	}
+
+	return lastIndex > 0 && record[0] == '{' && record[lastIndex] == '}'
 }
 
 // isCloudWatchLog checks if the data has the entries needed to be considered a cloudwatch log
@@ -132,23 +138,27 @@ func (u *Unmarshaler) UnmarshalLogs(records [][]byte) (plog.Logs, error) {
 	cloudwatchLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder)
 	for i, record := range records {
 		if isJSON(record) {
-			if isCloudWatchLog(record) {
-				if err := u.addCloudwatchLog(record, cloudwatchLogs, ld); err != nil {
+			for j, datum := range bytes.Split(record, []byte(recordDelimiter)) {
+				if isCloudWatchLog(datum) {
+					if err := u.addCloudwatchLog(datum, cloudwatchLogs, ld); err != nil {
+						u.logger.Error(
+							"Unable to unmarshal record to cloudwatch log",
+							zap.Error(err),
+							zap.Int("datum_index", j),
+							zap.Int("record_index", i),
+						)
+					}
+				} else {
 					u.logger.Error(
-						"Unable to unmarshal record to cloudwatch log",
-						zap.Error(err),
+						"Unsupported log type for JSON record",
+						zap.Int("datum_index", j),
 						zap.Int("record_index", i),
 					)
 				}
-			} else {
-				u.logger.Error(
-					"Unsupported log type",
-					zap.Int("record_index", i),
-				)
 			}
 		} else {
 			u.logger.Error(
-				"Unsupported log type",
+				"Unsupported log type for protobuf record",
 				zap.Int("record_index", i),
 			)
 		}
@@ -165,7 +175,7 @@ func (u *Unmarshaler) addOTLPMetric(record []byte, md pmetric.Metrics) error {
 	for pos < dataLen {
 		n, nLen := proto.DecodeVarint(record)
 		if nLen == 0 && n == 0 {
-			return errors.New("unable to decode data length from message")
+			return errUnknownLength
 		}
 		req := pmetricotlp.NewExportRequest()
 		pos += nLen
@@ -183,19 +193,23 @@ func (u *Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error
 	cloudwatchMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder)
 	for i, record := range records {
 		if isJSON(record) {
-			if isCloudwatchMetrics(record) {
-				if err := u.addCloudwatchMetric(record, cloudwatchMetrics, md); err != nil {
+			for j, datum := range bytes.Split(record, []byte(recordDelimiter)) {
+				if isCloudwatchMetrics(datum) {
+					if err := u.addCloudwatchMetric(datum, cloudwatchMetrics, md); err != nil {
+						u.logger.Error(
+							"Unable to unmarshal input",
+							zap.Error(err),
+							zap.Int("datum_index", j),
+							zap.Int("record_index", i),
+						)
+					}
+				} else {
 					u.logger.Error(
-						"Unable to unmarshal input",
-						zap.Error(err),
+						"Unsupported metric type for JSON record",
+						zap.Int("datum_index", j),
 						zap.Int("record_index", i),
 					)
 				}
-			} else {
-				u.logger.Error(
-					"Unsupported metric type",
-					zap.Int("record_index", i),
-				)
 			}
 		} else { // is protobuf
 			// OTLP metric is the only option currently supported
@@ -207,7 +221,7 @@ func (u *Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error
 				)
 			} else {
 				u.logger.Error(
-					"Unsupported metric type",
+					"Unsupported metric type for protobuf record",
 					zap.Int("record_index", i),
 				)
 			}
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go
index 6c5f7f04b9b3..5a47d4c97e1b 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go
@@ -1,7 +1,6 @@
 package auto
 
 import (
-	"bytes"
 	"os"
 	"path/filepath"
 	"testing"
@@ -61,13 +60,10 @@ func TestUnmarshalMetrics_JSON(t *testing.T) {
 
 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
-			data, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
+			record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
 			require.NoError(t, err)
 
-			var records [][]byte
-			for _, record := range bytes.Split(data, []byte("\n")) {
-				records = append(records, record)
-			}
+			records := [][]byte{record}
 
 			metrics, err := unmarshaler.UnmarshalMetrics(records)
 			require.Equal(t, testCase.err, err)
@@ -83,7 +79,7 @@ func TestUnmarshalLogs_JSON(t *testing.T) {
 	t.Parallel()
 
-	unmarshaler := NewUnmarshaler(zap.NewNop())
+	unmarshaler := NewUnmarshaler(zap.NewExample())
 	testCases := map[string]struct {
 		dir      string
 		filename string
@@ -124,13 +120,10 @@ func TestUnmarshalLogs_JSON(t *testing.T) {
 
 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
-			data, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
+			record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
 			require.NoError(t, err)
 
-			var records [][]byte
-			for _, record := range bytes.Split(data, []byte("\n")) {
-				records = append(records, record)
-			}
+			records := [][]byte{record}
 
 			logs, err := unmarshaler.UnmarshalLogs(records)
 			require.Equal(t, testCase.err, err)
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go
index 9e68a4cb7567..60b1e24bac48 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go
@@ -4,6 +4,7 @@
 package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
 
@@ -14,7 +15,8 @@ import (
 )
 
 const (
-	TypeStr = "cwlogs"
+	TypeStr         = "cwlogs"
+	recordDelimiter = "\n"
 )
 
 var errInvalidRecords = errors.New("record format invalid")
@@ -38,35 +40,38 @@ func (u Unmarshaler) UnmarshalLogs(records [][]byte) (plog.Logs, error) {
 	md := plog.NewLogs()
 	builders := make(map[ResourceAttributes]*ResourceLogsBuilder)
 	for recordIndex, record := range records {
-
-		var log CWLog
-		err := json.Unmarshal(record, &log)
-		if err != nil {
-			u.logger.Error(
-				"Unable to unmarshal input",
-				zap.Error(err),
-				zap.Int("record_index", recordIndex),
-			)
-			continue
-		}
-		if !u.isValid(log) {
-			u.logger.Error(
-				"Invalid log",
-				zap.Int("record_index", recordIndex),
-			)
-			continue
-		}
-		attrs := ResourceAttributes{
-			Owner:     log.Owner,
-			LogGroup:  log.LogGroup,
-			LogStream: log.LogStream,
-		}
-		lb, ok := builders[attrs]
-		if !ok {
-			lb = NewResourceLogsBuilder(md, attrs)
-			builders[attrs] = lb
+		for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) {
+			var log CWLog
+			err := json.Unmarshal(datum, &log)
+			if err != nil {
+				u.logger.Error(
+					"Unable to unmarshal input",
+					zap.Error(err),
+					zap.Int("datum_index", datumIndex),
+					zap.Int("record_index", recordIndex),
+				)
+				continue
+			}
+			if !u.isValid(log) {
+				u.logger.Error(
+					"Invalid log",
+					zap.Int("datum_index", datumIndex),
+					zap.Int("record_index", recordIndex),
+				)
+				continue
+			}
+			attrs := ResourceAttributes{
+				Owner:     log.Owner,
+				LogGroup:  log.LogGroup,
+				LogStream: log.LogStream,
+			}
+			lb, ok := builders[attrs]
+			if !ok {
+				lb = NewResourceLogsBuilder(md, attrs)
+				builders[attrs] = lb
+			}
+			lb.AddLog(log)
 		}
-		lb.AddLog(log)
 	}
 
 	if len(builders) == 0 {
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go
index 367c7a9ab257..6b3a90244477 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go
@@ -4,7 +4,6 @@
 package cwlog
 
 import (
-	"bytes"
 	"os"
 	"path/filepath"
 	"testing"
@@ -53,13 +52,10 @@ func TestUnmarshal(t *testing.T) {
 	}
 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
-			data, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename))
+			record, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename))
 			require.NoError(t, err)
 
-			var records [][]byte
-			for _, record := range bytes.Split(data, []byte("\n")) {
-				records = append(records, record)
-			}
+			records := [][]byte{record}
 
 			got, err := unmarshaler.UnmarshalLogs(records)
 			require.Equal(t, testCase.wantErr, err)
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go
index 08da27978801..0426fc81ad8c 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go
@@ -24,9 +24,9 @@ type CWMetric struct {
 	// Timestamp is the milliseconds since epoch for
 	// the metric.
 	Timestamp int64 `json:"timestamp"`
-	// Value is the cWMetricValue, which has the min, max,
+	// Value is the CWMetricValue, which has the min, max,
 	// sum, and count.
-	Value *cWMetricValue `json:"value"`
+	Value *CWMetricValue `json:"value"`
 	// Unit is the unit for the metric.
 	//
 	// More details can be found at:
@@ -34,8 +34,8 @@ type CWMetric struct {
 	Unit string `json:"unit"`
 }
 
-// The cWMetricValue is the actual values of the CloudWatch metric.
-type cWMetricValue struct {
+// The CWMetricValue is the actual values of the CloudWatch metric.
+type CWMetricValue struct {
 	// Max is the highest value observed.
 	Max float64 `json:"max"`
 	// Min is the lowest value observed.
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go
index 33f8deda207d..4a716df79a51 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go
@@ -206,7 +206,7 @@ func TestResourceMetricsBuilder(t *testing.T) {
 	})
 }
 
-// testCWMetricValue is a convenience function for creating a test cWMetricValue
-func testCWMetricValue() *cWMetricValue {
-	return &cWMetricValue{100, 0, float64(rand.Int63n(100)), float64(rand.Int63n(4))}
+// testCWMetricValue is a convenience function for creating a test CWMetricValue
+func testCWMetricValue() *CWMetricValue {
+	return &CWMetricValue{100, 0, float64(rand.Int63n(100)), float64(rand.Int63n(4))}
 }
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go
index 75924b14de5e..d404b97b41c0 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go
@@ -4,6 +4,7 @@
 package cwmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
 	"go.opentelemetry.io/collector/pdata/pmetric"
@@ -13,7 +14,8 @@ import (
 )
 
 const (
-	TypeStr = "cwmetrics"
+	TypeStr         = "cwmetrics"
+	recordDelimiter = "\n"
 )
 
 var errInvalidRecords = errors.New("record format invalid")
@@ -40,35 +42,43 @@ func (u Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error)
 	md := pmetric.NewMetrics()
 	builders := make(map[ResourceAttributes]*ResourceMetricsBuilder)
 	for recordIndex, record := range records {
-		var metric CWMetric
-		err := json.Unmarshal(record, &metric)
-		if err != nil {
-			u.logger.Error(
-				"Unable to unmarshal input",
-				zap.Error(err),
-				zap.Int("record_index", recordIndex),
-			)
-			continue
+		// In a CloudWatch metric stream that uses the JSON format,
+		// each Firehose record contains multiple JSON objects separated
+		// by a newline character (\n). Each object includes a single data
+		// point of a single metric.
+		for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) {
+			var metric CWMetric
+			err := json.Unmarshal(datum, &metric)
+			if err != nil {
+				u.logger.Error(
+					"Unable to unmarshal input",
+					zap.Error(err),
+					zap.Int("datum_index", datumIndex),
+					zap.Int("record_index", recordIndex),
+				)
+				continue
+			}
+			if !u.isValid(metric) {
+				u.logger.Error(
+					"Invalid metric",
+					zap.Int("datum_index", datumIndex),
+					zap.Int("record_index", recordIndex),
+				)
+				continue
+			}
+			attrs := ResourceAttributes{
+				MetricStreamName: metric.MetricStreamName,
+				Namespace:        metric.Namespace,
+				AccountID:        metric.AccountID,
+				Region:           metric.Region,
+			}
+			mb, ok := builders[attrs]
+			if !ok {
+				mb = NewResourceMetricsBuilder(md, attrs)
+				builders[attrs] = mb
+			}
+			mb.AddMetric(metric)
 		}
-		if !u.isValid(metric) {
-			u.logger.Error(
-				"Invalid metric",
-				zap.Int("record_index", recordIndex),
-			)
-			continue
-		}
-		attrs := ResourceAttributes{
-			MetricStreamName: metric.MetricStreamName,
-			Namespace:        metric.Namespace,
-			AccountID:        metric.AccountID,
-			Region:           metric.Region,
-		}
-		mb, ok := builders[attrs]
-		if !ok {
-			mb = NewResourceMetricsBuilder(md, attrs)
-			builders[attrs] = mb
-		}
-		mb.AddMetric(metric)
 	}
 
 	if len(builders) == 0 {
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go
index 7d3cf9ce9be2..963a0ddb746a 100644
--- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go
+++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go
@@ -4,7 +4,6 @@
 package cwmetricstream
 
 import (
-	"bytes"
 	"os"
 	"path/filepath"
 	"testing"
@@ -52,13 +51,10 @@ func TestUnmarshal(t *testing.T) {
 	}
 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
-			data, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename))
+			record, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename))
 			require.NoError(t, err)
 
-			var records [][]byte
-			for _, record := range bytes.Split(data, []byte("\n")) {
-				records = append(records, record)
-			}
+			records := [][]byte{record}
 
 			got, err := unmarshaler.UnmarshalMetrics(records)
 			require.Equal(t, testCase.wantErr, err)
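
Illustrative sketch (not part of the patch): the change above splits each Firehose record on newline characters before unmarshaling, because, per the comment added in the patch, a CloudWatch metric stream in JSON format packs several newline-delimited JSON objects into one record, and unmarshaling the whole record as a single object fails. The standalone Go program below shows the same bytes.Split pattern under assumptions of my own: a hypothetical two-object record payload and a trimmed-down datum struct, not the receiver's CWMetric.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// datum is a trimmed-down, hypothetical stand-in for the receiver's CWMetric
// struct; only two of the CloudWatch metric stream JSON fields are kept here.
type datum struct {
	MetricName string `json:"metric_name"`
	Namespace  string `json:"namespace"`
}

func main() {
	// Hypothetical Firehose record: two newline-delimited JSON objects
	// followed by a trailing newline.
	record := []byte(`{"metric_name":"CPUUtilization","namespace":"AWS/EC2"}
{"metric_name":"NetworkIn","namespace":"AWS/EC2"}
`)

	// Split the record on the delimiter and unmarshal each part on its own,
	// mirroring the per-datum loops introduced by the patch.
	for i, part := range bytes.Split(record, []byte("\n")) {
		if len(part) == 0 {
			// The trailing newline yields an empty element; skip it here.
			continue
		}
		var d datum
		if err := json.Unmarshal(part, &d); err != nil {
			fmt.Printf("datum %d: unmarshal error: %v\n", i, err)
			continue
		}
		fmt.Printf("datum %d: %s in %s\n", i, d.MetricName, d.Namespace)
	}
}

The sketch skips the empty element produced by the trailing newline for brevity; in the patched receiver that element would instead fail json.Unmarshal, be logged, and be skipped via continue, while the trailing newline itself is what the updated isJSON now tolerates.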