Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exports Entity for awsemfexporter plugins on PutLogEvent calls #233

Merged
merged 45 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
644b0fa
adds sdk changes to cw logs components
dchappa Aug 22, 2024
ce92f5e
adds entity on export
dchappa Aug 28, 2024
83df5a3
adds debug statements
dchappa Aug 30, 2024
d184f65
adds more log lines in newEventBatch function
dchappa Sep 3, 2024
9c15961
fixes other newEventBatch calls
dchappa Sep 3, 2024
525100f
adds type assertion logging to ensure type is the same when being sent
dchappa Sep 3, 2024
ec8e74e
verifies current entity instead of previous
dchappa Sep 3, 2024
36dbeac
adds entity when creating new log pusher
dchappa Sep 3, 2024
c4ca2a5
fills entity fields with certain known otel attributes
dchappa Sep 6, 2024
9e883bd
log lines for resourceAttributes
dchappa Sep 6, 2024
882eb11
hardcodes otel attributes to ensure entity population works
dchappa Sep 6, 2024
bbcf3bf
temporarily comments out attribute removal
dchappa Sep 6, 2024
fec89d2
removes asg temporarily
dchappa Sep 6, 2024
96bc334
adds keyAttributes after changing test app_signals config
dchappa Sep 6, 2024
b4f0e3d
reimplement removal of the resource attributes
dchappa Sep 6, 2024
345ea32
refactors fetching of entity fields
dchappa Sep 6, 2024
57a5953
adds resourceMutex for concurrency issue to the resource Attributes
dchappa Sep 6, 2024
7272016
copies atrribute map over to a new map to enable mutations
dchappa Sep 6, 2024
f9e02bb
fix for assigning variables
dchappa Sep 6, 2024
5e2a295
addresses request size error with
dchappa Sep 6, 2024
3beae3c
adds normal attributes, refactors
dchappa Sep 6, 2024
c0b2081
fixes helper function
dchappa Sep 6, 2024
c87246b
fixes attribute map
dchappa Sep 6, 2024
dc69644
removes log lines, fixes unit tests
dchappa Sep 9, 2024
d8030e9
fixes lint issues, stray log lines
dchappa Sep 9, 2024
bc5ceae
fixes linting, testing
dchappa Sep 9, 2024
81300e3
removes commented line
dchappa Sep 9, 2024
33c203e
fixes lint for comment
dchappa Sep 9, 2024
ee93dd9
fixes more linting issues
dchappa Sep 9, 2024
7fd20c4
adds comment explaining processAttributes function
dchappa Sep 10, 2024
5a08c19
adds more to unit tests, fixes variable name
dchappa Sep 12, 2024
8c22b7d
address lint check finding for error check in test
dchappa Sep 12, 2024
3cbf1d6
adds attributes for resource types. Refactors pusher map logic to be …
dchappa Sep 17, 2024
56b7329
attempting to fix the resource attributes issue.
dchappa Sep 17, 2024
820e414
fixes tests, and bug with fetching existing pusher
dchappa Sep 17, 2024
76f1547
adds debug line for entity
dchappa Sep 17, 2024
776ac6a
Revert "adds debug line for entity"
dchappa Sep 17, 2024
0742057
fixes lint check
dchappa Sep 17, 2024
f6d5d88
fixes lint check
dchappa Sep 17, 2024
b6bb072
adds unit tests for bounded pusher map
dchappa Sep 18, 2024
2eb0318
lint issues
dchappa Sep 18, 2024
e81b01d
fixes logic to remove entity attributes
dchappa Sep 18, 2024
84eb70f
reverts entity attribute refactoring
dchappa Sep 18, 2024
a1cd2fc
lint check. Adds breadcrumb to update hash function
dchappa Sep 18, 2024
00e53b0
removes bounded pusher map in favor of lru. Adds some unit tests and …
dchappa Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

type emfExporter struct {
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
pusherMap map[string]cwlogs.Pusher
dchappa marked this conversation as resolved.
Show resolved Hide resolved
svcStructuredLog *cwlogs.Client
config *Config

Expand Down Expand Up @@ -86,7 +86,7 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
metricTranslator: newMetricTranslator(*config),
retryCnt: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
pusherMap: map[string]cwlogs.Pusher{},
processResourceLabels: func(map[string]string) {},
}

Expand Down Expand Up @@ -179,10 +179,11 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
if _, ok = emf.pusherMap[key]; !ok {
emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
hash := key.Hash()
if _, ok = emf.pusherMap[hash]; !ok {
emf.pusherMap[hash] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
}
return emf.pusherMap[key]
return emf.pusherMap[hash]
}

func (emf *emfExporter) listPushers() []cwlogs.Pusher {
Expand Down
81 changes: 64 additions & 17 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -23,6 +24,7 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
)

const defaultRetryCount = 1
Expand Down Expand Up @@ -192,10 +194,15 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := &cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
}]
Entity: &cloudwatchlogs.Entity{KeyAttributes: map[string]*string{
entityType: aws.String(service),
dchappa marked this conversation as resolved.
Show resolved Hide resolved
}},
}
expectedStreamKeyHash := streamKey.Hash()
pusherMap, ok := exp.pusherMap[expectedStreamKeyHash]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -217,16 +224,29 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
attributeEntityCluster: "test-cluster-name",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := &cwlogs.StreamKey{
LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
LogStreamName: "test-task-id",
}]
Entity: &cloudwatchlogs.Entity{
Attributes: map[string]*string{
"Cluster": aws.String("test-cluster-name"),
},
KeyAttributes: map[string]*string{
"Type": aws.String(service),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
}},
}
pusherMap, ok := exp.pusherMap[streamKey.Hash()]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -243,21 +263,32 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())
assert.NoError(t, err)
assert.NotNil(t, exp)
var entity = &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String(service),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
}

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: "test-task-id",
}]
Entity: entity,
}
pusherMap, ok := exp.pusherMap[streamKey.Hash()]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -279,16 +310,26 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
}]
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String(service),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
},
}
pusherMap, ok := exp.pusherMap[streamKey.Hash()]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -312,11 +353,17 @@ func TestPushMetricsDataWithErr(t *testing.T) {
logPusher.On("ForceFlush", nil).Return("some error").Once()
logPusher.On("ForceFlush", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("some error").Once()
exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{}
exp.pusherMap[cwlogs.StreamKey{
exp.pusherMap = map[string]cwlogs.Pusher{}
streamKey := cwlogs.StreamKey{
LogGroupName: "test-logGroupName",
LogStreamName: "test-logStreamName",
}] = logPusher
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String(service),
},
},
}
exp.pusherMap[streamKey.Hash()] = logPusher

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
Expand Down
63 changes: 60 additions & 3 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)

const (
Expand All @@ -32,8 +35,36 @@ const (
containerInsightsReceiver = "awscontainerinsight"
attributeReceiver = "receiver"
fieldPrometheusMetricType = "prom_metric_type"

// Entity fields
keyAttributeEntityServiceName = "aws.entity.service.name"
serviceName = "Name"
keyAttributeEntityDeploymentEnvironment = "aws.entity.deployment.environment"
deploymentEnvironment = "Environment"
entityType = "Type"
service = "Service"
attributeEntityCluster = "aws.entity.k8s.cluster.name"
cluster = "Cluster"
attributeEntityNamespace = "aws.entity.k8s.namespace.name"
namespace = "Namespace"
attributeEntityWorkload = "aws.entity.k8s.workload.name"
workload = "Workload"
attributeEntityNode = "aws.entity.k8s.node.name"
node = "Node"
)

var keyAttributeEntityToShortNameMap = map[string]string{
keyAttributeEntityServiceName: serviceName,
keyAttributeEntityDeploymentEnvironment: deploymentEnvironment,
}

var attributeEntityToShortNameMap = map[string]string{
attributeEntityCluster: cluster,
attributeEntityNamespace: namespace,
attributeEntityWorkload: workload,
attributeEntityNode: node,
}

var errMissingMetricsForEnhancedContainerInsights = errors.New("nil event detected with EnhancedContainerInsights enabled")

var fieldPrometheusTypes = map[pmetric.MetricType]string{
Expand Down Expand Up @@ -79,6 +110,7 @@ type groupedMetricMetadata struct {
timestampMs int64
logGroup string
logStream string
entity *cloudwatchlogs.Entity
metricDataType pmetric.MetricType
retainInitialValueForDelta bool
}
Expand All @@ -103,8 +135,8 @@ func newMetricTranslator(config Config) metricTranslator {
return metricTranslator{
metricDescriptor: mt,
calculators: &emfCalculators{
delta: aws.NewFloat64DeltaCalculator(),
summary: aws.NewMetricCalculator(calculateSummaryDelta),
delta: awsmetrics.NewFloat64DeltaCalculator(),
summary: awsmetrics.NewMetricCalculator(calculateSummaryDelta),
},
}
}
Expand Down Expand Up @@ -139,6 +171,9 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
}
}

entity, resourceAttributes := fetchEntityFields(rm.Resource().Attributes())
resourceAttributes.CopyTo(rm.Resource().Attributes())
dchappa marked this conversation as resolved.
Show resolved Hide resolved

for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
if ilm.Scope().Name() != "" {
Expand All @@ -154,6 +189,7 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
timestampMs: timestamp,
logGroup: logGroup,
logStream: logStream,
entity: &entity,
metricDataType: metric.Type(),
retainInitialValueForDelta: deltaInitialValue,
},
Expand All @@ -169,6 +205,25 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
return nil
}

func fetchEntityFields(resourceAttributes pcommon.Map) (cloudwatchlogs.Entity, pcommon.Map) {
// the original resourceAttributes map is immutable, so we need to create a mutable copy
// to remove the entity fields from the attributes
mutableResourceAttributes := pcommon.NewMap()
resourceAttributes.CopyTo(mutableResourceAttributes)
serviceKeyAttr := map[string]*string{
dchappa marked this conversation as resolved.
Show resolved Hide resolved
entityType: aws.String(service),
nathalapooja marked this conversation as resolved.
Show resolved Hide resolved
}
attributeMap := map[string]*string{}

processAttributes(keyAttributeEntityToShortNameMap, serviceKeyAttr, mutableResourceAttributes)
processAttributes(attributeEntityToShortNameMap, attributeMap, mutableResourceAttributes)

return cloudwatchlogs.Entity{
KeyAttributes: serviceKeyAttr,
Attributes: attributeMap,
}, mutableResourceAttributes
}

// translateGroupedMetricToCWMetric converts Grouped Metric format to CloudWatch Metric format.
func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Config) *cWMetrics {
labels := groupedMetric.labels
Expand Down Expand Up @@ -505,13 +560,15 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d

logGroup := groupedMetric.metadata.logGroup
logStream := groupedMetric.metadata.logStream
entity := groupedMetric.metadata.entity

if logStream == "" {
logStream = defaultLogStream
}

event.LogGroupName = logGroup
event.LogStreamName = logStream
event.Entity = entity

return event, nil
}
32 changes: 32 additions & 0 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -22,6 +23,7 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/occonventions"
)

Expand All @@ -39,6 +41,7 @@ func createTestResourceMetricsHelper(numMetrics int) pmetric.ResourceMetrics {
rm.Resource().Attributes().PutStr("ClusterName", "myCluster")
rm.Resource().Attributes().PutStr("PodName", "myPod")
rm.Resource().Attributes().PutStr(attributeReceiver, prometheusReceiver)
rm.Resource().Attributes().PutStr(keyAttributeEntityServiceName, "myServiceName")
sm := rm.ScopeMetrics().AppendEmpty()

m := sm.Metrics().AppendEmpty()
Expand Down Expand Up @@ -2574,6 +2577,35 @@ func TestTranslateOtToGroupedMetricForInitialDeltaValue(t *testing.T) {
}
}

func TestFetchEntityFields(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
resourceMetrics.Resource().Attributes().PutStr(keyAttributeEntityDeploymentEnvironment, "my-environment")
resourceMetrics.Resource().Attributes().PutStr(keyAttributeEntityServiceName, "my-service")
resourceMetrics.Resource().Attributes().PutStr(attributeEntityNode, "my-node")
resourceMetrics.Resource().Attributes().PutStr(attributeEntityCluster, "my-cluster")
resourceMetrics.Resource().Attributes().PutStr(attributeEntityNamespace, "my-namespace")
resourceMetrics.Resource().Attributes().PutStr(attributeEntityWorkload, "my-workload")

expectedEntity := cloudwatchlogs.Entity{KeyAttributes: map[string]*string{
dchappa marked this conversation as resolved.
Show resolved Hide resolved
entityType: aws.String(service),
serviceName: aws.String("my-service"),
deploymentEnvironment: aws.String("my-environment"),
},
Attributes: map[string]*string{
node: aws.String("my-node"),
cluster: aws.String("my-cluster"),
namespace: aws.String("my-namespace"),
workload: aws.String("my-workload"),
},
}

entity, attrs := fetchEntityFields(resourceMetrics.Resource().Attributes())
assert.Equal(t, expectedEntity, entity)
attrs.CopyTo(resourceMetrics.Resource().Attributes())
assert.Equal(t, 0, resourceMetrics.Resource().Attributes().Len())

}

func generateTestMetrics(tm testMetric) pmetric.Metrics {
md := pmetric.NewMetrics()
now := time.Now()
Expand Down
Loading
Loading