Skip to content

Commit

Permalink
Fix cloudwatch error with multiple records
Browse files Browse the repository at this point in the history
  • Loading branch information
constanca-m committed Jan 9, 2025
1 parent f135ded commit 0cdfcbe
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ const (
)

// Sentinel errors returned by the unmarshaler. Compared by callers with
// errors.Is, so the instances (not just the messages) are part of the API.
var (
	errUnsupportedContentType = errors.New("content type not supported")
	errInvalidRecords         = errors.New("record format invalid")
	errUnknownLength          = errors.New("unable to decode data length from message")
)

// Unmarshaler for the CloudWatch Log JSON record format.
Expand All @@ -37,12 +36,19 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
return &Unmarshaler{logger}
}

// isJSON reports whether record starts with '{' and ends with '}',
// ignoring any newlines at the end of the record.
func isJSON(record []byte) bool {
	if len(record) < 2 {
		return false
	}

	// Walk back over trailing newlines so newline-terminated records
	// (e.g. Firehose records joined with '\n') are still detected as JSON.
	lastIndex := len(record) - 1
	for lastIndex >= 0 && record[lastIndex] == '\n' {
		lastIndex--
	}

	// lastIndex > 0 ensures the closing '}' is a different byte than the
	// opening '{' (a record of only "{\n" must not qualify).
	return lastIndex > 0 && record[0] == '{' && record[lastIndex] == '}'
}

// isCloudWatchLog checks if the data has the entries needed to be considered a cloudwatch log
Expand Down Expand Up @@ -132,23 +138,27 @@ func (u *Unmarshaler) UnmarshalLogs(records [][]byte) (plog.Logs, error) {
cloudwatchLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder)
for i, record := range records {
if isJSON(record) {
if isCloudWatchLog(record) {
if err := u.addCloudwatchLog(record, cloudwatchLogs, ld); err != nil {
for j, datum := range bytes.Split(record, []byte(recordDelimiter)) {
if isCloudWatchLog(datum) {
if err := u.addCloudwatchLog(datum, cloudwatchLogs, ld); err != nil {
u.logger.Error(
"Unable to unmarshal record to cloudwatch log",
zap.Error(err),
zap.Int("datum_index", j),
zap.Int("record_index", i),
)
}
} else {
u.logger.Error(
"Unable to unmarshal record to cloudwatch log",
zap.Error(err),
"Unsupported log type for JSON record",
zap.Int("datum_index", j),
zap.Int("record_index", i),
)
}
} else {
u.logger.Error(
"Unsupported log type",
zap.Int("record_index", i),
)
}
} else {
u.logger.Error(
"Unsupported log type",
"Unsupported log type for protobuf record",
zap.Int("record_index", i),
)
}
Expand All @@ -165,7 +175,7 @@ func (u *Unmarshaler) addOTLPMetric(record []byte, md pmetric.Metrics) error {
for pos < dataLen {
n, nLen := proto.DecodeVarint(record)
if nLen == 0 && n == 0 {
return errors.New("unable to decode data length from message")
return errUnknownLength
}
req := pmetricotlp.NewExportRequest()
pos += nLen
Expand All @@ -183,19 +193,23 @@ func (u *Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error
cloudwatchMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder)
for i, record := range records {
if isJSON(record) {
if isCloudwatchMetrics(record) {
if err := u.addCloudwatchMetric(record, cloudwatchMetrics, md); err != nil {
for j, datum := range bytes.Split(record, []byte(recordDelimiter)) {
if isCloudwatchMetrics(datum) {
if err := u.addCloudwatchMetric(datum, cloudwatchMetrics, md); err != nil {
u.logger.Error(
"Unable to unmarshal input",
zap.Error(err),
zap.Int("datum_index", j),
zap.Int("record_index", i),
)
}
} else {
u.logger.Error(
"Unable to unmarshal input",
zap.Error(err),
"Unsupported metric type for JSON record",
zap.Int("datum_index", j),
zap.Int("record_index", i),
)
}
} else {
u.logger.Error(
"Unsupported metric type",
zap.Int("record_index", i),
)
}
} else { // is protobuf
// OTLP metric is the only option currently supported
Expand All @@ -207,7 +221,7 @@ func (u *Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error
)
} else {
u.logger.Error(
"Unsupported metric type",
"Unsupported metric type for protobuf record",
zap.Int("record_index", i),
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package auto

import (
"bytes"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -61,13 +60,10 @@ func TestUnmarshalMetrics_JSON(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
data, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
require.NoError(t, err)

var records [][]byte
for _, record := range bytes.Split(data, []byte("\n")) {
records = append(records, record)
}
records := [][]byte{record}

metrics, err := unmarshaler.UnmarshalMetrics(records)
require.Equal(t, testCase.err, err)
Expand All @@ -83,7 +79,7 @@ func TestUnmarshalMetrics_JSON(t *testing.T) {
func TestUnmarshalLogs_JSON(t *testing.T) {
t.Parallel()

unmarshaler := NewUnmarshaler(zap.NewNop())
unmarshaler := NewUnmarshaler(zap.NewExample())
testCases := map[string]struct {
dir string
filename string
Expand Down Expand Up @@ -124,13 +120,10 @@ func TestUnmarshalLogs_JSON(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
data, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
require.NoError(t, err)

var records [][]byte
for _, record := range bytes.Split(data, []byte("\n")) {
records = append(records, record)
}
records := [][]byte{record}

logs, err := unmarshaler.UnmarshalLogs(records)
require.Equal(t, testCase.err, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"

import (
"bytes"
"encoding/json"
"errors"

Expand All @@ -14,7 +15,8 @@ import (
)

const (
TypeStr = "cwlogs"
TypeStr = "cwlogs"
recordDelimiter = "\n"
)

var errInvalidRecords = errors.New("record format invalid")
Expand All @@ -38,35 +40,38 @@ func (u Unmarshaler) UnmarshalLogs(records [][]byte) (plog.Logs, error) {
md := plog.NewLogs()
builders := make(map[ResourceAttributes]*ResourceLogsBuilder)
for recordIndex, record := range records {

var log CWLog
err := json.Unmarshal(record, &log)
if err != nil {
u.logger.Error(
"Unable to unmarshal input",
zap.Error(err),
zap.Int("record_index", recordIndex),
)
continue
}
if !u.isValid(log) {
u.logger.Error(
"Invalid log",
zap.Int("record_index", recordIndex),
)
continue
}
attrs := ResourceAttributes{
Owner: log.Owner,
LogGroup: log.LogGroup,
LogStream: log.LogStream,
}
lb, ok := builders[attrs]
if !ok {
lb = NewResourceLogsBuilder(md, attrs)
builders[attrs] = lb
for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) {
var log CWLog
err := json.Unmarshal(datum, &log)
if err != nil {
u.logger.Error(
"Unable to unmarshal input",
zap.Error(err),
zap.Int("datum_index", datumIndex),
zap.Int("record_index", recordIndex),
)
continue
}
if !u.isValid(log) {
u.logger.Error(
"Invalid log",
zap.Int("datum_index", datumIndex),
zap.Int("record_index", recordIndex),
)
continue
}
attrs := ResourceAttributes{
Owner: log.Owner,
LogGroup: log.LogGroup,
LogStream: log.LogStream,
}
lb, ok := builders[attrs]
if !ok {
lb = NewResourceLogsBuilder(md, attrs)
builders[attrs] = lb
}
lb.AddLog(log)
}
lb.AddLog(log)
}

if len(builders) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package cwlog

import (
"bytes"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -53,13 +52,10 @@ func TestUnmarshal(t *testing.T) {
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
data, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename))
record, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename))
require.NoError(t, err)

var records [][]byte
for _, record := range bytes.Split(data, []byte("\n")) {
records = append(records, record)
}
records := [][]byte{record}

got, err := unmarshaler.UnmarshalLogs(records)
require.Equal(t, testCase.wantErr, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ type CWMetric struct {
// Timestamp is the milliseconds since epoch for
// the metric.
Timestamp int64 `json:"timestamp"`
// Value is the cWMetricValue, which has the min, max,
// Value is the CWMetricValue, which has the min, max,
// sum, and count.
Value *cWMetricValue `json:"value"`
Value *CWMetricValue `json:"value"`
// Unit is the unit for the metric.
//
// More details can be found at:
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
Unit string `json:"unit"`
}

// The cWMetricValue is the actual values of the CloudWatch metric.
type cWMetricValue struct {
// The CWMetricValue is the actual values of the CloudWatch metric.
type CWMetricValue struct {
// Max is the highest value observed.
Max float64 `json:"max"`
// Min is the lowest value observed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestResourceMetricsBuilder(t *testing.T) {
})
}

// testCWMetricValue is a convenience function for creating a test cWMetricValue
func testCWMetricValue() *cWMetricValue {
return &cWMetricValue{100, 0, float64(rand.Int63n(100)), float64(rand.Int63n(4))}
// testCWMetricValue is a convenience function for creating a test CWMetricValue
func testCWMetricValue() *CWMetricValue {
return &CWMetricValue{100, 0, float64(rand.Int63n(100)), float64(rand.Int63n(4))}
}
Loading

0 comments on commit 0cdfcbe

Please sign in to comment.