Skip to content

Commit

Permalink
Revert entity changes (#234)
Browse files Browse the repository at this point in the history
* Revert "Exports Entity for awsemfexporter plugins on PutLogEvent calls (#233)"

This reverts commit 299ef87.

* Revert "Adds sdk changes necessary for adding Entity field to CW Logs PutLogEvent API (#231)"

This reverts commit 69bd759.
  • Loading branch information
movence authored Oct 1, 2024
1 parent 299ef87 commit b442784
Show file tree
Hide file tree
Showing 20 changed files with 52 additions and 22,687 deletions.
2 changes: 1 addition & 1 deletion exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -24,7 +25,6 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
)

type cwlExporter struct {
Expand Down
2 changes: 1 addition & 1 deletion exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -21,7 +22,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
)

func init() {
Expand Down
29 changes: 10 additions & 19 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -34,12 +33,10 @@ const (
// AppSignals EMF config
appSignalsMetricNamespace = "ApplicationSignals"
appSignalsLogGroupNamePrefix = "/aws/application-signals/"

pusherMapLimit = 1000
)

type emfExporter struct {
pusherMap *lru.Cache[string, cwlogs.Pusher]
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
svcStructuredLog *cwlogs.Client
config *Config

Expand Down Expand Up @@ -83,20 +80,14 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
return nil, err
}

boundedPusherMap, err := lru.New[string, cwlogs.Pusher](pusherMapLimit)

if err != nil {
return nil, err
}

emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
metricTranslator: newMetricTranslator(*config),
retryCnt: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
processResourceLabels: func(map[string]string) {},
pusherMap: boundedPusherMap,
}

if config.IsAppSignalsEnabled() {
Expand Down Expand Up @@ -188,21 +179,21 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
hash := key.Hash()
var pusher cwlogs.Pusher
if pusher, ok = emf.pusherMap.Get(hash); !ok {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
emf.pusherMap.Add(hash, pusher)
if _, ok = emf.pusherMap[key]; !ok {
emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
}

return pusher
return emf.pusherMap[key]
}

func (emf *emfExporter) listPushers() []cwlogs.Pusher {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

return emf.pusherMap.Values()
var pushers []cwlogs.Pusher
for _, pusher := range emf.pusherMap {
pushers = append(pushers, pusher)
}
return pushers
}

func (emf *emfExporter) start(_ context.Context, host component.Host) error {
Expand Down
90 changes: 18 additions & 72 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (
"testing"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -25,7 +23,6 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
)

const defaultRetryCount = 1
Expand Down Expand Up @@ -195,16 +192,10 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
streamKey := &cwlogs.StreamKey{
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{},
Attributes: map[string]*string{},
},
}
expectedStreamKeyHash := streamKey.Hash()
pusherMap, ok := exp.pusherMap.Get(expectedStreamKeyHash)
}]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -226,31 +217,16 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
attributeEntityCluster: "test-cluster-name",
keyAttributeEntityType: "Service",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
streamKey := &cwlogs.StreamKey{
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
LogStreamName: "test-task-id",
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Attributes: map[string]*string{
"Cluster": aws.String("test-cluster-name"),
},
},
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
}]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -267,34 +243,21 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())
assert.NoError(t, err)
assert.NotNil(t, exp)
var entity = &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Attributes: map[string]*string{},
}

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
keyAttributeEntityType: service,
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
streamKey := cwlogs.StreamKey{
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: "test-task-id",
Entity: entity,
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
}]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -316,28 +279,16 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityType: service,
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
streamKey := cwlogs.StreamKey{
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Type": aws.String("Service"),
},
Attributes: map[string]*string{},
},
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
}]
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -361,17 +312,12 @@ func TestPushMetricsDataWithErr(t *testing.T) {
logPusher.On("ForceFlush", nil).Return("some error").Once()
logPusher.On("ForceFlush", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("some error").Once()
exp.pusherMap, err = lru.New[string, cwlogs.Pusher](pusherMapLimit)
assert.Nil(t, err)
streamKey := cwlogs.StreamKey{
exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{}
exp.pusherMap[cwlogs.StreamKey{
LogGroupName: "test-logGroupName",
LogStreamName: "test-logStreamName",
Entity: &cloudwatchlogs.Entity{
Attributes: map[string]*string{},
KeyAttributes: map[string]*string{},
},
}
exp.pusherMap.Add(streamKey.Hash(), logPusher)
}] = logPusher

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
Expand Down
1 change: 0 additions & 1 deletion exporter/awsemfexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20240419190856-2f880467f335
github.com/aws/aws-sdk-go v1.53.11
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs v0.103.0
Expand Down
2 changes: 0 additions & 2 deletions exporter/awsemfexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b442784

Please sign in to comment.