Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/resourcedetection] Add k8s cluster name detection in EKS environment #28649

Merged
merged 11 commits into from
Nov 21, 2023
27 changes: 27 additions & 0 deletions .chloggen/eks_cluster_name.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: resourcedetectionprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add k8s cluster name detection when running in EKS

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26794]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
6 changes: 5 additions & 1 deletion processor/resourcedetectionprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,18 @@ processors:

* cloud.provider ("aws")
* cloud.platform ("aws_eks")
* k8s.cluster.name

Note: The Kubernetes cluster name is only available when running on EC2 instances, and requires permission to run the `ec2:DescribeInstances` [action](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html).
If you see an error with the message `context deadline exceeded`, increase the `timeout` setting in your config.

Example:

```yaml
processors:
resourcedetection/eks:
detectors: [env, eks]
timeout: 2s
timeout: 15s
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved
override: false
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import (
"context"
"fmt"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand All @@ -28,10 +33,16 @@ const (
kubernetesServiceHostEnvVar = "KUBERNETES_SERVICE_HOST"
authConfigmapNS = "kube-system"
authConfigmapName = "aws-auth"

clusterNameAwsEksTag = "aws:eks:cluster-name"
clusterNameEksTag = "eks:cluster-name"
kubernetesClusterNameTag = "kubernetes.io/cluster/"
)

// detectorUtils abstracts the lookups the EKS detector performs so that
// they can be replaced by a mock in tests.
type detectorUtils interface {
// getConfigMap returns the data of the ConfigMap identified by namespace and name.
getConfigMap(ctx context.Context, namespace string, name string) (map[string]string, error)
// getClusterName returns the EKS cluster name, or an error if it cannot be determined.
getClusterName(ctx context.Context) (string, error)
// getClusterNameTagFromReservations extracts the cluster name from the tags of
// the instances in the given reservations; it returns "" when no tag matches.
getClusterNameTagFromReservations([]*ec2.Reservation) string
}

type eksDetectorUtils struct {
Expand All @@ -43,6 +54,7 @@ type detector struct {
utils detectorUtils
logger *zap.Logger
err error
ra metadata.ResourceAttributesConfig
rb *metadata.ResourceBuilder
}

Expand All @@ -54,10 +66,12 @@ var _ detectorUtils = (*eksDetectorUtils)(nil)
// NewDetector builds the EKS resource detector from the processor settings
// and the detector configuration.
//
// dcfg must hold a Config value; the type assertion panics otherwise. An
// error from newK8sDetectorUtils is not returned here: it is stored on the
// detector in its err field, and NewDetector itself always returns a nil error.
func NewDetector(set processor.CreateSettings, dcfg internal.DetectorConfig) (internal.Detector, error) {
	attrs := dcfg.(Config).ResourceAttributes
	k8sUtils, utilsErr := newK8sDetectorUtils()

	d := &detector{
		utils:  k8sUtils,
		logger: set.Logger,
		err:    utilsErr,
		ra:     attrs,
		rb:     metadata.NewResourceBuilder(attrs),
	}
	return d, nil
}
Expand All @@ -74,7 +88,16 @@ func (d *detector) Detect(ctx context.Context) (resource pcommon.Resource, schem
d.rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
d.rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSEKS)

return d.rb.Emit(), conventions.SchemaURL, nil
// The error is unhandled because we want to return successfully detected resources
// regardless of an error. The caller will properly handle any error hit while getting
// the cluster name.
if d.ra.K8sClusterName.Enabled {
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved
var clusterName string
clusterName, err = d.utils.getClusterName(ctx)
d.rb.SetK8sClusterName(clusterName)
}

return d.rb.Emit(), conventions.SchemaURL, err
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved
}

func isEKS(ctx context.Context, utils detectorUtils) (bool, error) {
Expand Down Expand Up @@ -114,3 +137,57 @@ func (e eksDetectorUtils) getConfigMap(ctx context.Context, namespace string, na
}
return cm.Data, nil
}

// getClusterName resolves the EKS cluster name from the tags of the EC2
// instance this process is running on.
//
// It queries the EC2 instance metadata service for the region and the
// instance identity document, calls EC2 DescribeInstances for that instance
// ID, and extracts the cluster name from the returned tags. Every remote call
// is bound to ctx so that cancellation and deadlines set by the caller are
// honored for the whole lookup, not just the identity-document request.
//
// It returns an empty name and a non-nil error when the AWS session cannot be
// created, when any remote call fails, or when no cluster-name tag is found.
func (e eksDetectorUtils) getClusterName(ctx context.Context) (string, error) {
	sess, err := session.NewSession()
	if err != nil {
		return "", fmt.Errorf("creating AWS session: %w", err)
	}

	ec2Svc := ec2metadata.New(sess)
	region, err := ec2Svc.RegionWithContext(ctx)
	if err != nil {
		return "", fmt.Errorf("reading region from EC2 metadata: %w", err)
	}

	svc := ec2.New(sess, aws.NewConfig().WithRegion(region))
	instanceIdentityDocument, err := ec2Svc.GetInstanceIdentityDocumentWithContext(ctx)
	if err != nil {
		return "", fmt.Errorf("reading EC2 instance identity document: %w", err)
	}

	// Use the context-aware variant so the request is aborted once ctx is done.
	instances, err := svc.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{
		InstanceIds: []*string{
			aws.String(instanceIdentityDocument.InstanceID),
		},
	})
	if err != nil {
		return "", fmt.Errorf("describing EC2 instance %q: %w", instanceIdentityDocument.InstanceID, err)
	}

	clusterName := e.getClusterNameTagFromReservations(instances.Reservations)
	if len(clusterName) == 0 {
		return "", fmt.Errorf("Failed to detect EKS cluster name. No tag for cluster name found on EC2 instance")
	}
	return clusterName, nil
}

func (e eksDetectorUtils) getClusterNameTagFromReservations(reservations []*ec2.Reservation) string {
for _, reservation := range reservations {
for _, instance := range reservation.Instances {
for _, tag := range instance.Tags {
if tag.Key == nil {
continue
}

if *tag.Key == clusterNameAwsEksTag || *tag.Key == clusterNameEksTag {
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved
return *tag.Value
} else if strings.HasPrefix(*tag.Key, kubernetesClusterNameTag) {
return strings.TrimPrefix(*tag.Key, kubernetesClusterNameTag)
}
}
}
}

return ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@ import (
"context"
"testing"

"github.com/aws/aws-sdk-go/service/ec2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/processor/processortest"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/eks/internal/metadata"
)

// clusterName is the fixed cluster name used by the tests in this file; it is
// what MockDetectorUtils reports as the detected cluster name.
const (
clusterName = "my-cluster"
)

// MockDetectorUtils is a test double for the detector's utils dependency,
// built on an embedded testify mock.Mock.
type MockDetectorUtils struct {
mock.Mock
}
Expand All @@ -25,6 +31,15 @@ func (detectorUtils *MockDetectorUtils) getConfigMap(_ context.Context, namespac
return args.Get(0).(map[string]string), args.Error(1)
}

// getClusterName is the mock implementation of the cluster-name lookup. It
// performs no remote calls: it delegates to the mock's
// getClusterNameTagFromReservations with a nil reservation slice and never
// returns an error.
func (detectorUtils *MockDetectorUtils) getClusterName(_ context.Context) (string, error) {
	name := detectorUtils.getClusterNameTagFromReservations(nil)
	return name, nil
}

// getClusterNameTagFromReservations is the mock implementation of the tag
// lookup; it ignores its argument and always returns the clusterName constant.
func (detectorUtils *MockDetectorUtils) getClusterNameTagFromReservations(_ []*ec2.Reservation) string {
return clusterName
}

func TestNewDetector(t *testing.T) {
dcfg := CreateDefaultConfig()
detector, err := NewDetector(processortest.NewNopCreateSettings(), dcfg)
Expand All @@ -38,9 +53,9 @@ func TestEKS(t *testing.T) {
ctx := context.Background()

t.Setenv("KUBERNETES_SERVICE_HOST", "localhost")
detectorUtils.On("getConfigMap", authConfigmapNS, authConfigmapName).Return(map[string]string{"cluster.name": "my-cluster"}, nil)
detectorUtils.On("getConfigMap", authConfigmapNS, authConfigmapName).Return(map[string]string{conventions.AttributeK8SClusterName: clusterName}, nil)
// Call EKS Resource detector to detect resources
eksResourceDetector := &detector{utils: detectorUtils, err: nil, rb: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig())}
eksResourceDetector := &detector{utils: detectorUtils, err: nil, ra: metadata.DefaultResourceAttributesConfig(), rb: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig())}
res, _, err := eksResourceDetector.Detect(ctx)
require.NoError(t, err)

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ all_set:
enabled: true
cloud.provider:
enabled: true
k8s.cluster.name:
enabled: true
none_set:
resource_attributes:
cloud.platform:
enabled: false
cloud.provider:
enabled: false
k8s.cluster.name:
enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ resource_attributes:
cloud.platform:
description: The cloud.platform
type: string
enabled: true
enabled: true
k8s.cluster.name:
description: The EKS cluster name. This attribute is currently only available when running on EC2 instances, and requires permission to run the EC2:DescribeInstances action.
type: string
enabled: false