From f4ae6deb8e00f602302979fe983ad744904d9620 Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Wed, 1 Jan 2025 02:25:21 +0000 Subject: [PATCH] Prometheus add nodes gauge for SQS mode --- cmd/node-termination-handler.go | 28 ++++---- pkg/ec2helper/ec2helper.go | 92 ++++++++++++++++++++++++ pkg/ec2helper/ec2helper_test.go | 74 +++++++++++++++++++ pkg/node/node.go | 26 +++++++ pkg/node/node_test.go | 32 ++++++++- pkg/observability/opentelemetry.go | 86 ++++++++++++++++++++-- pkg/observability/opentelemetry_test.go | 96 ++++++++++++++++++++++++- 7 files changed, 415 insertions(+), 19 deletions(-) create mode 100644 pkg/ec2helper/ec2helper.go create mode 100644 pkg/ec2helper/ec2helper_test.go diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 700543ea..238a5b7a 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -120,7 +120,20 @@ func main() { log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,") } - metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort) + cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) + sess := session.Must(session.NewSessionWithOptions(session.Options{ + Config: *cfg, + SharedConfigState: session.SharedConfigEnable, + })) + creds, err := sess.Config.Credentials.Get() + if err != nil { + log.Fatal().Err(err).Msg("Unable to get AWS credentials") + } + log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) + + ec2 := ec2.New(sess) + + metrics, err := observability.InitMetrics(nthConfig, node, ec2) if err != nil { nthConfig.Print() log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,") @@ -204,17 +217,6 @@ func main() { } } if nthConfig.EnableSQSTerminationDraining { - cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) - sess := session.Must(session.NewSessionWithOptions(session.Options{ - Config: *cfg, - SharedConfigState: session.SharedConfigEnable, - })) - creds, err := sess.Config.Credentials.Get() - if err != nil { - log.Fatal().Err(err).Msg("Unable to get AWS credentials") - } - log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) - completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second sqsMonitor := sqsevent.SQSMonitor{ CheckIfManaged: nthConfig.CheckTagBeforeDraining, @@ -224,7 +226,7 @@ func main() { CancelChan: cancelChan, SQS: sqsevent.GetSqsClient(sess), ASG: autoscaling.New(sess), - EC2: ec2.New(sess), + EC2: ec2, BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) }, } monitoringFns[sqsEvents] = sqsMonitor diff --git a/pkg/ec2helper/ec2helper.go b/pkg/ec2helper/ec2helper.go new file mode 100644 index 00000000..2405cce2 --- /dev/null +++ b/pkg/ec2helper/ec2helper.go @@ -0,0 +1,92 @@ +// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package ec2helper + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" +) + +type IEC2Helper interface { + GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error) +} + +type EC2Helper struct { + ec2ServiceClient ec2iface.EC2API +} + +func New(ec2 ec2iface.EC2API) EC2Helper { + return EC2Helper{ + ec2ServiceClient: ec2, + } +} + +func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) { + ids := []string{} + nextToken := "" + + for { + result, err := h.ec2ServiceClient.DescribeInstances(&ec2.DescribeInstancesInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("tag-key"), + Values: []*string{aws.String(tag)}, + }, + }, + NextToken: &nextToken, + }) + + if err != nil { + return ids, err + } + + if result == nil || len(result.Reservations) == 0 || + len(result.Reservations[0].Instances) == 0 { + return ids, fmt.Errorf("failed to describe instances") + } + + for _, reservation := range result.Reservations { + for _, instance := range reservation.Instances { + if instance.InstanceId == nil { + continue + } + ids = append(ids, *instance.InstanceId) + } + } + + if result.NextToken == nil { + break + } + nextToken = *result.NextToken + } + + return ids, nil +} + +func (h EC2Helper) GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error) { + idMap := map[string]bool{} + ids, err := h.GetInstanceIdsByTagKey(tag) + if err != nil { + return idMap, err + } + + for _, id := range ids { + idMap[id] = true + } + + return idMap, nil +} diff --git a/pkg/ec2helper/ec2helper_test.go b/pkg/ec2helper/ec2helper_test.go new file mode 100644 index 00000000..01e097c0 --- /dev/null +++ b/pkg/ec2helper/ec2helper_test.go @@ -0,0 +1,74 @@ +// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package ec2helper_test + +import ( + "testing" + + "github.com/aws/aws-node-termination-handler/pkg/ec2helper" + h "github.com/aws/aws-node-termination-handler/pkg/test" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" +) + +const ( + instanceId1 = "i-1" + instanceId2 = "i-2" +) + +func TestGetInstanceIdsByTagKey(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: getDescribeInstancesResp(), + } + ec2Helper := ec2helper.New(ec2Mock) + instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Ok(t, err) + + h.Equals(t, 2, len(instanceIds)) + h.Equals(t, instanceId1, instanceIds[0]) + h.Equals(t, instanceId2, instanceIds[1]) +} + +func TestGetInstanceIdsMapByTagKey(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: getDescribeInstancesResp(), + } + ec2Helper := ec2helper.New(ec2Mock) + instanceIdsMap, err := ec2Helper.GetInstanceIdsMapByTagKey("myNTHManagedTag") + h.Ok(t, err) + + _, exist := instanceIdsMap[instanceId1] + h.Equals(t, true, exist) + _, exist = instanceIdsMap[instanceId2] + h.Equals(t, true, exist) + _, exist = instanceIdsMap["non-existent instance id"] + h.Equals(t, false, exist) +} + +func getDescribeInstancesResp() ec2.DescribeInstancesOutput { + return ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{ + { + Instances: []*ec2.Instance{ + { + InstanceId: aws.String(instanceId1), + }, + { + InstanceId: aws.String(instanceId2), + }, + }, + }, + }, + } +} diff --git a/pkg/node/node.go b/pkg/node/node.go index 7e323d13..b8b40082 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -635,6 +635,32 @@ func (n Node) fetchKubernetesNode(nodeName string) (*corev1.Node, error) { return &matchingNodes.Items[0], nil } +// fetchKubernetesNode will send an http request to the k8s api server and return list of AWS EC2 instance id +func (n Node) FetchKubernetesNodeInstanceIds() ([]string, error) { + ids := []string{} + + if n.nthConfig.DryRun { + return ids, nil + } + matchingNodes, err := n.drainHelper.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + log.Warn().Msgf("Unable to list Nodes") + return ids, err + } + + if matchingNodes == nil || len(matchingNodes.Items) == 0 { + return ids, nil + } + + for _, node := range matchingNodes.Items { + // sample providerID: aws:///us-west-2a/i-0abcd1234efgh5678 + parts := strings.Split(node.Spec.ProviderID, "/") + ids = append(ids, parts[len(parts)-1]) + } + + return ids, nil +} + func (n Node) fetchAllPods(nodeName string) (*corev1.PodList, error) { if n.nthConfig.DryRun { log.Info().Msgf("Would have retrieved running pod list on node %s, but dry-run flag was set", nodeName) diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 0e4c393b..e94ca88c 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -15,6 +15,7 @@ package node_test import ( "context" + "fmt" "strconv" "strings" "testing" @@ -35,7 +36,11 @@ import ( ) // Size of the fakeRecorder buffer -const recorderBufferSize = 10 +const ( + recorderBufferSize = 10 + instanceId1 = "i-0abcd1234efgh5678" + instanceId2 = "i-0wxyz5678ijkl1234" +) var nodeName = "NAME" @@ -379,6 +384,31 @@ func TestUncordonIfRebootedTimeParseFailure(t *testing.T) { h.Assert(t, err != nil, "Failed to return error on UncordonIfReboted failure to parse time") } +func TestFetchKubernetesNodeInstanceIds(t *testing.T) { + client := fake.NewSimpleClientset( + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)}, + }, + ) + + _, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + h.Ok(t, err) + + node, err := newNode(config.Config{}, client) + h.Ok(t, err) + + instanceIds, err := node.FetchKubernetesNodeInstanceIds() + h.Ok(t, err) + h.Equals(t, 2, len(instanceIds)) + h.Equals(t, instanceId1, instanceIds[0]) + h.Equals(t, instanceId2, instanceIds[1]) +} + func TestFilterOutDaemonSetPods(t *testing.T) { tNode, err := newNode(config.Config{IgnoreDaemonSets: true}, fake.NewSimpleClientset()) h.Ok(t, err) diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go index 706fff07..581792d5 100644 --- a/pkg/observability/opentelemetry.go +++ b/pkg/observability/opentelemetry.go @@ -21,6 +21,10 @@ import ( "strconv" "time" + "github.com/aws/aws-node-termination-handler/pkg/config" + "github.com/aws/aws-node-termination-handler/pkg/ec2helper" + "github.com/aws/aws-node-termination-handler/pkg/node" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog/log" @@ -44,14 +48,19 @@ var ( // Metrics represents the stats for observability type Metrics struct { enabled bool + nthConfig config.Config + ec2Helper ec2helper.EC2Helper + node *node.Node meter api.Meter actionsCounter api.Int64Counter actionsCounterV2 api.Int64Counter errorEventsCounter api.Int64Counter + nodesGauge api.Int64Gauge + instancesGauge api.Int64Gauge } // InitMetrics will initialize, register and expose, via http server, the metrics with Opentelemetry. -func InitMetrics(enabled bool, port int) (Metrics, error) { +func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) (Metrics, error) { exporter, err := prometheus.New() if err != nil { return Metrics{}, fmt.Errorf("failed to create Prometheus exporter: %w", err) @@ -61,7 +70,10 @@ func InitMetrics(enabled bool, port int) (Metrics, error) { if err != nil { return Metrics{}, fmt.Errorf("failed to register metrics with Prometheus provider: %w", err) } - metrics.enabled = enabled + metrics.enabled = nthConfig.EnablePrometheus + metrics.ec2Helper = ec2helper.New(ec2) + metrics.node = node + metrics.nthConfig = nthConfig // Starts an async process to collect golang runtime stats // go.opentelemetry.io/contrib/instrumentation/runtime @@ -70,13 +82,46 @@ func InitMetrics(enabled bool, port int) (Metrics, error) { return Metrics{}, fmt.Errorf("failed to start Go runtime metrics collection: %w", err) } - if enabled { - serveMetrics(port) + if metrics.enabled { + metrics.initCronMetrics() + serveMetrics(nthConfig.PrometheusPort) } return metrics, nil } +func (m Metrics) initCronMetrics() { + // Run a periodic task + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + m.serveNodeMetrics() + } +} + +func (m Metrics) serveNodeMetrics() { + instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(m.nthConfig.ManagedTag) + if err != nil { + log.Err(err).Msg("Failed to get AWS instance ids") + } else { + m.InstancesRecord(int64(len(instanceIdsMap))) + } + + nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds() + if err != nil { + log.Err(err).Msg("Failed to get node instance ids") + } else { + nodeCount := 0 + for _, id := range nodeInstanceIds { + if _, ok := instanceIdsMap[id]; ok { + nodeCount++ + } + } + m.NodesRecord(int64(nodeCount)) + } +} + // ErrorEventsInc will increment one for the event errors counter, partitioned by action, and only if metrics are enabled. func (m Metrics) ErrorEventsInc(where string) { if !m.enabled { @@ -105,6 +150,22 @@ func (m Metrics) NodeActionsInc(action, nodeName string, eventID string, err err m.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(labelsV2...)) } +func (m Metrics) NodesRecord(num int64) { + if !m.enabled { + return + } + + m.nodesGauge.Record(context.Background(), num) +} + +func (m Metrics) InstancesRecord(num int64) { + if !m.enabled { + return + } + + m.instancesGauge.Record(context.Background(), num) +} + func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) { meter := provider.Meter("aws.node.termination.handler") @@ -131,11 +192,28 @@ func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) { return Metrics{}, fmt.Errorf("failed to create Prometheus counter %q: %w", name, err) } errorEventsCounter.Add(context.Background(), 0) + + name = "nth_managed_nodes" + nodesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of nodes processing")) + if err != nil { + return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err) + } + nodesGauge.Record(context.Background(), 0) + + name = "nth_managed_instances" + instancesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of instances processing")) + if err != nil { + return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err) + } + instancesGauge.Record(context.Background(), 0) + return Metrics{ meter: meter, errorEventsCounter: errorEventsCounter, actionsCounter: actionsCounter, actionsCounterV2: actionsCounterV2, + nodesGauge: nodesGauge, + instancesGauge: instancesGauge, }, nil } diff --git a/pkg/observability/opentelemetry_test.go b/pkg/observability/opentelemetry_test.go index 237f456e..687d5b93 100644 --- a/pkg/observability/opentelemetry_test.go +++ b/pkg/observability/opentelemetry_test.go @@ -25,13 +25,25 @@ import ( "testing" "time" + "github.com/rs/zerolog/log" + "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" api "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric" - + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubectl/pkg/drain" + + "github.com/aws/aws-node-termination-handler/pkg/config" + "github.com/aws/aws-node-termination-handler/pkg/ec2helper" + "github.com/aws/aws-node-termination-handler/pkg/node" h "github.com/aws/aws-node-termination-handler/pkg/test" + "github.com/aws/aws-node-termination-handler/pkg/uptime" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" ) var ( @@ -48,6 +60,9 @@ var ( errorStatus = "error" mockDefaultPort = 9092 mockClosedPort = 9093 + instanceId1 = "i-1" + instanceId2 = "i-2" + instanceId3 = "i-3" ) func TestInitMetrics(t *testing.T) { @@ -109,6 +124,8 @@ func TestRegisterMetricsWith(t *testing.T) { const errorEventMetricsTotal = 23 const successActionMetricsTotal = 31 const errorActionMetricsTotal = 97 + const managedInstancesTotal = 3 + const managedNodesTotal = 5 metrics := getMetrics(t) @@ -126,6 +143,9 @@ func TestRegisterMetricsWith(t *testing.T) { metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(errorActionlabels...)) } + metrics.NodesRecord(managedNodesTotal) + metrics.InstancesRecord(managedInstancesTotal) + responseRecorder := mockMetricsRequest() validateStatus(t, responseRecorder) @@ -135,6 +155,57 @@ func TestRegisterMetricsWith(t *testing.T) { validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal) validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus) validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus) + validateGauge(t, metricsMap, managedNodesTotal, "nth_managed_nodes") + validateGauge(t, metricsMap, managedInstancesTotal, "nth_managed_instances") +} + +func TestServeNodeMetrics(t *testing.T) { + metrics := getMetrics(t) + metrics.ec2Helper = ec2helper.New(h.MockedEC2{ + DescribeInstancesResp: ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{ + { + Instances: []*ec2.Instance{ + { + InstanceId: aws.String(instanceId1), + }, + { + InstanceId: aws.String(instanceId2), + }, + { + InstanceId: aws.String(instanceId3), + }, + }, + }, + }, + }, + }) + + helper := getDrainHelper(fake.NewSimpleClientset( + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)}, + }, + )) + + node, err := node.NewWithValues(config.Config{}, helper, uptime.Uptime) + h.Ok(t, err) + + metrics.node = node + metrics.serveNodeMetrics() + + responseRecorder := mockMetricsRequest() + + validateStatus(t, responseRecorder) + + metricsMap := getMetricsMap(responseRecorder.Body.String()) + + validateGauge(t, metricsMap, 2, "nth_managed_nodes") + validateGauge(t, metricsMap, 3, "nth_managed_instances") } func TestServeMetrics(t *testing.T) { @@ -225,6 +296,20 @@ func getMetricsMap(body string) map[string]string { return metricsMap } +func getDrainHelper(client *fake.Clientset) *drain.Helper { + return &drain.Helper{ + Ctx: context.TODO(), + Client: client, + Force: true, + GracePeriodSeconds: -1, + IgnoreAllDaemonSets: true, + DeleteEmptyDirData: true, + Timeout: time.Duration(120) * time.Second, + Out: log.Logger, + ErrOut: log.Logger, + } +} + func validateEventErrorTotal(t *testing.T, metricsMap map[string]string, expectedTotal int) { eventErrorTotalKey := fmt.Sprintf("events_error_total{event_error_where=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockErrorEvent, mockNth) actualValue, exists := metricsMap[eventErrorTotalKey] @@ -242,3 +327,12 @@ func validateActionTotalV2(t *testing.T, metricsMap map[string]string, expectedT } h.Equals(t, strconv.Itoa(expectedTotal), actualValue) } + +func validateGauge(t *testing.T, metricsMap map[string]string, expectedTotal int, name string) { + actionTotalKey := fmt.Sprintf("%v{otel_scope_name=\"%v\",otel_scope_version=\"\"}", name, mockNth) + actualValue, exists := metricsMap[actionTotalKey] + if !exists { + actualValue = "0" + } + h.Equals(t, strconv.Itoa(expectedTotal), actualValue) +}