Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exports Entity for awsemfexporter plugins on PutLogEvent calls #233

Merged
merged 45 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
644b0fa
adds sdk changes to cw logs components
dchappa Aug 22, 2024
ce92f5e
adds entity on export
dchappa Aug 28, 2024
83df5a3
adds debug statements
dchappa Aug 30, 2024
d184f65
adds more log lines in newEventBatch function
dchappa Sep 3, 2024
9c15961
fixes other newEventBatch calls
dchappa Sep 3, 2024
525100f
adds type assertion logging to ensure type is the same when being sent
dchappa Sep 3, 2024
ec8e74e
verifies current entity instead of previous
dchappa Sep 3, 2024
36dbeac
adds entity when creating new log pusher
dchappa Sep 3, 2024
c4ca2a5
fills entity fields with certain known otel attributes
dchappa Sep 6, 2024
9e883bd
log lines for resourceAttributes
dchappa Sep 6, 2024
882eb11
hardcodes otel attributes to ensure entity population works
dchappa Sep 6, 2024
bbcf3bf
temporarily comments out attribute removal
dchappa Sep 6, 2024
fec89d2
removes asg temporarily
dchappa Sep 6, 2024
96bc334
adds keyAttributes after changing test app_signals config
dchappa Sep 6, 2024
b4f0e3d
reimplement removal of the resource attributes
dchappa Sep 6, 2024
345ea32
refactors fetching of entity fields
dchappa Sep 6, 2024
57a5953
adds resourceMutex for concurrency issue to the resource Attributes
dchappa Sep 6, 2024
7272016
copies atrribute map over to a new map to enable mutations
dchappa Sep 6, 2024
f9e02bb
fix for assigning variables
dchappa Sep 6, 2024
5e2a295
addresses request size error with
dchappa Sep 6, 2024
3beae3c
adds normal attributes, refactors
dchappa Sep 6, 2024
c0b2081
fixes helper function
dchappa Sep 6, 2024
c87246b
fixes attribute map
dchappa Sep 6, 2024
dc69644
removes log lines, fixes unit tests
dchappa Sep 9, 2024
d8030e9
fixes lint issues, stray log lines
dchappa Sep 9, 2024
bc5ceae
fixes linting, testing
dchappa Sep 9, 2024
81300e3
removes commented line
dchappa Sep 9, 2024
33c203e
fixes lint for comment
dchappa Sep 9, 2024
ee93dd9
fixes more linting issues
dchappa Sep 9, 2024
7fd20c4
adds comment explaining processAttributes function
dchappa Sep 10, 2024
5a08c19
adds more to unit tests, fixes variable name
dchappa Sep 12, 2024
8c22b7d
address lint check finding for error check in test
dchappa Sep 12, 2024
3cbf1d6
adds attributes for resource types. Refactors pusher map logic to be …
dchappa Sep 17, 2024
56b7329
attempting to fix the resource attributes issue.
dchappa Sep 17, 2024
820e414
fixes tests, and bug with fetching existing pusher
dchappa Sep 17, 2024
76f1547
adds debug line for entity
dchappa Sep 17, 2024
776ac6a
Revert "adds debug line for entity"
dchappa Sep 17, 2024
0742057
fixes lint check
dchappa Sep 17, 2024
f6d5d88
fixes lint check
dchappa Sep 17, 2024
b6bb072
adds unit tests for bounded pusher map
dchappa Sep 18, 2024
2eb0318
lint issues
dchappa Sep 18, 2024
e81b01d
fixes logic to remove entity attributes
dchappa Sep 18, 2024
84eb70f
reverts entity attribute refactoring
dchappa Sep 18, 2024
a1cd2fc
lint check. Adds breadcrumb to update hash function
dchappa Sep 18, 2024
00e53b0
removes bounded pusher map in favor of lru. Adds some unit tests and …
dchappa Sep 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -33,10 +34,12 @@ const (
// AppSignals EMF config
appSignalsMetricNamespace = "ApplicationSignals"
appSignalsLogGroupNamePrefix = "/aws/application-signals/"

pusherMapLimit = 1000
)

type emfExporter struct {
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
pusherMap *lru.Cache[string, cwlogs.Pusher]
svcStructuredLog *cwlogs.Client
config *Config

Expand Down Expand Up @@ -80,14 +83,20 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
return nil, err
}

boundedPusherMap, err := lru.New[string, cwlogs.Pusher](pusherMapLimit)

if err != nil {
return nil, err
}

emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
metricTranslator: newMetricTranslator(*config),
retryCnt: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
processResourceLabels: func(map[string]string) {},
pusherMap: boundedPusherMap,
}

if config.IsAppSignalsEnabled() {
Expand Down Expand Up @@ -179,21 +188,21 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
if _, ok = emf.pusherMap[key]; !ok {
emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
hash := key.Hash()
var pusher cwlogs.Pusher
if pusher, ok = emf.pusherMap.Get(hash); !ok {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
emf.pusherMap.Add(hash, pusher)
}
return emf.pusherMap[key]

return pusher
}

func (emf *emfExporter) listPushers() []cwlogs.Pusher {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

var pushers []cwlogs.Pusher
for _, pusher := range emf.pusherMap {
pushers = append(pushers, pusher)
}
return pushers
return emf.pusherMap.Values()
}

func (emf *emfExporter) start(_ context.Context, host component.Host) error {
Expand Down
90 changes: 72 additions & 18 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"testing"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -23,6 +25,7 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
)

const defaultRetryCount = 1
Expand Down Expand Up @@ -192,10 +195,16 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := &cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
}]
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{},
Attributes: map[string]*string{},
},
}
expectedStreamKeyHash := streamKey.Hash()
pusherMap, ok := exp.pusherMap.Get(expectedStreamKeyHash)
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -217,16 +226,31 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
attributeEntityCluster: "test-cluster-name",
keyAttributeEntityType: "Service",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := &cwlogs.StreamKey{
LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
LogStreamName: "test-task-id",
}]
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Attributes: map[string]*string{
"Cluster": aws.String("test-cluster-name"),
},
},
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -243,21 +267,34 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())
assert.NoError(t, err)
assert.NotNil(t, exp)
var entity = &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Attributes: map[string]*string{},
}

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
keyAttributeEntityType: service,
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: "test-task-id",
}]
Entity: entity,
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -279,16 +316,28 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityType: service,
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
}]
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Type": aws.String("Service"),
},
Attributes: map[string]*string{},
},
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -312,12 +361,17 @@ func TestPushMetricsDataWithErr(t *testing.T) {
logPusher.On("ForceFlush", nil).Return("some error").Once()
logPusher.On("ForceFlush", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("some error").Once()
exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{}
exp.pusherMap[cwlogs.StreamKey{
exp.pusherMap, err = lru.New[string, cwlogs.Pusher](pusherMapLimit)
assert.Nil(t, err)
streamKey := cwlogs.StreamKey{
LogGroupName: "test-logGroupName",
LogStreamName: "test-logStreamName",
}] = logPusher

Entity: &cloudwatchlogs.Entity{
Attributes: map[string]*string{},
KeyAttributes: map[string]*string{},
},
}
exp.pusherMap.Add(streamKey.Hash(), logPusher)
md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20240419190856-2f880467f335
github.com/aws/aws-sdk-go v1.53.11
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs v0.103.0
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading