diff --git a/kwok/cloudprovider/cloudprovider.go b/kwok/cloudprovider/cloudprovider.go index 2b755bc62..793d3fd75 100644 --- a/kwok/cloudprovider/cloudprovider.go +++ b/kwok/cloudprovider/cloudprovider.go @@ -130,8 +130,8 @@ func (c CloudProvider) GetSupportedNodeClasses() []status.Object { return []status.Object{&v1alpha1.KWOKNodeClass{}} } -func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatement { - return []cloudprovider.RepairStatement{} +func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy { + return []cloudprovider.RepairPolicy{} } func (c CloudProvider) getInstanceType(instanceTypeName string) (*cloudprovider.InstanceType, error) { diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index b5c6f64bb..6206d109c 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -263,11 +263,11 @@ func (c *CloudProvider) IsDrifted(context.Context, *v1.NodeClaim) (cloudprovider return c.Drifted, nil } -func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatement { - return []cloudprovider.RepairStatement{ +func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy { + return []cloudprovider.RepairPolicy{ { - Type: "HealthyNode", - Status: corev1.ConditionFalse, + ConditionType: "BadNode", + ConditionStatus: corev1.ConditionFalse, TolerationDuration: 30 * time.Minute, }, } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index dc1d76120..6696c6e0a 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -42,11 +42,11 @@ var ( type DriftReason string -type RepairStatement struct { - // Type of unhealthy state that is found on the node - Type corev1.NodeConditionType - // Status condition when a node is unhealthy - Status corev1.ConditionStatus +type RepairPolicy struct { + // ConditionType of unhealthy state that is found on the node + ConditionType corev1.NodeConditionType + // ConditionStatus 
condition when a node is unhealthy + ConditionStatus corev1.ConditionStatus // TolerationDuration is the duration the controller will wait // before force terminating nodes that are unhealthy. TolerationDuration time.Duration @@ -76,7 +76,7 @@ type CloudProvider interface { IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error) // RepairPolicy is for CloudProviders to define a set Unhealthy condition for Karpenter // to monitor on the node. - RepairPolicy() []RepairStatement + RepairPolicies() []RepairPolicy // Name returns the CloudProvider implementation name. Name() string // GetSupportedNodeClasses returns CloudProvider NodeClass that implements status.Object diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 2c07b6ec6..2b8128c39 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -98,8 +98,8 @@ func NewControllers( health.NewController(kubeClient, cloudProvider, clock), } - // The cloud provider must define status conation for the node repair controller to used for detecting unhealthy nodes - if len(cloudProvider.RepairPolicy()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair { + // The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes + if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair { controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock)) } diff --git a/pkg/controllers/node/health/controller.go b/pkg/controllers/node/health/controller.go index 15c5ddc79..bbe82d19e 100644 --- a/pkg/controllers/node/health/controller.go +++ b/pkg/controllers/node/health/controller.go @@ -21,13 +21,12 @@ import ( "fmt" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" 
- "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -63,67 +62,52 @@ func (c *Controller) Register(_ context.Context, m manager.Manager) error { } func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) { - nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node) - if err != nil { - return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err) - } - ctx = injection.WithControllerName(ctx, "node.health") - ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("nodeclaim", nodeClaim.Name)) - nodeHealthCondition := corev1.NodeCondition{} - foundCloudProviderPolicy := cloudprovider.RepairStatement{} + ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(node.Namespace, node.Name))) // Validate that the node is owned by us and is not being deleted - if !node.GetDeletionTimestamp().IsZero() || !controllerutil.ContainsFinalizer(node, v1.TerminationFinalizer) { - return reconcile.Result{}, nil - } - - for _, policy := range c.cloudProvider.RepairPolicy() { - nodeHealthCondition = nodeutils.GetCondition(node, policy.Type) - if nodeHealthCondition.Status == policy.Status { - // found unhealthy condition on the node - foundCloudProviderPolicy = policy - break - } - } - - // From here there are three scenarios to handle: - // 1. If node is healthy, exit node repair loop - if foundCloudProviderPolicy.Type == "" { - return reconcile.Result{}, nil + nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node) + if err != nil { + return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err) } - // 2. 
If the Node is unhealthy, but has not reached it's full toleration disruption, exit the loop - disruptionTime := nodeHealthCondition.LastTransitionTime.Add(foundCloudProviderPolicy.TolerationDuration) - if c.clock.Now().Before(disruptionTime) { - return reconcile.Result{RequeueAfter: disruptionTime.Sub(c.clock.Now())}, nil + // Find if a node is unhealthy + healthCondition, _ := lo.Find(c.cloudProvider.RepairPolicies(), func(policy cloudprovider.RepairPolicy) bool { + nodeCondition := nodeutils.GetCondition(node, policy.ConditionType) + return nodeCondition.Status == policy.ConditionStatus + }) + + // If the Node is unhealthy, but has not reached its full toleration disruption + // requeue at the termination time of the unhealthy node + terminationTime := nodeutils.GetCondition(node, healthCondition.ConditionType).LastTransitionTime.Add(healthCondition.TolerationDuration) + if c.clock.Now().Before(terminationTime) { + return reconcile.Result{RequeueAfter: terminationTime.Sub(c.clock.Now())}, nil } + // For unhealthy past the tolerationDisruption window we can forcefully terminate the node if err := c.annotateTerminationGracePeriod(ctx, nodeClaim); err != nil { - return reconcile.Result{}, fmt.Errorf("annotated termination grace period on nodeclaim, %w", err) + return reconcile.Result{}, client.IgnoreNotFound(err) } - - // 3. Otherwise, if the Node is unhealthy and past it's tolerationDisruption window we can forcefully terminate the node if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil { - return reconcile.Result{}, client.IgnoreNotFound(err) + return reconcile.Result{}, err } - // 4. The deletion timestamp has successfully been set for the Node, update relevant metrics. + + // The deletion timestamp has successfully been set for the Node, update relevant metrics. 
log.FromContext(ctx).V(1).Info("deleting unhealthy node") - metrics.NodeClaimsDisruptedTotal.With(prometheus.Labels{ - metrics.ReasonLabel: metrics.UnhealthyReason, - metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey], - metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey], - }).Inc() + metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{ + metrics.ReasonLabel: string(healthCondition.ConditionType), + metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey], + metrics.CapacityTypeLabel: node.Labels[v1.CapacityTypeLabelKey], + }) return reconcile.Result{}, nil } func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeClaim *v1.NodeClaim) error { stored := nodeClaim.DeepCopy() - terminationTime := c.clock.Now().Format(time.RFC3339) - nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime}) + nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: c.clock.Now().Format(time.RFC3339)}) if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil { - return client.IgnoreNotFound(err) + return err } return nil diff --git a/pkg/controllers/node/health/suite_test.go b/pkg/controllers/node/health/suite_test.go index c047b3567..29599296a 100644 --- a/pkg/controllers/node/health/suite_test.go +++ b/pkg/controllers/node/health/suite_test.go @@ -18,6 +18,7 @@ package health_test import ( "context" + "fmt" "testing" "time" @@ -25,6 +26,7 @@ import ( . 
"github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -36,7 +38,8 @@ import ( "sigs.k8s.io/karpenter/pkg/controllers/node/health" "sigs.k8s.io/karpenter/pkg/controllers/node/termination" "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" - "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle" + nodeclaimlifecycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle" + "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" @@ -47,12 +50,12 @@ import ( var ctx context.Context var healthController *health.Controller var terminationController *termination.Controller +var nodeClaimController *nodeclaimlifecycle.Controller var env *test.Environment var fakeClock *clock.FakeClock var cloudProvider *fake.CloudProvider var recorder *test.EventRecorder var queue *terminator.Queue -var nodeClaimController *lifecycle.Controller func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -73,15 +76,14 @@ var _ = BeforeSuite(func() { queue = terminator.NewTestingQueue(env.Client, recorder) healthController = health.NewController(env.Client, cloudProvider, fakeClock) terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder) - nodeClaimController = lifecycle.NewController(fakeClock, env.Client, cloudProvider, recorder) - + nodeClaimController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{})) }) var _ = AfterSuite(func() { Expect(env.Stop()).To(Succeed(), "Failed to stop environment") }) -var _ = Describe("Health", func() { +var _ = Describe("Node Health", func() { var node *corev1.Node var nodeClaim *v1.NodeClaim var nodePool 
*v1.NodePool @@ -107,7 +109,7 @@ var _ = Describe("Health", func() { Context("Reconciliation", func() { It("should delete nodes that are unhealthy by the cloud provider", func() { node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionFalse, LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, }) @@ -121,23 +123,6 @@ var _ = Describe("Health", func() { ExpectObjectReconciled(ctx, env.Client, terminationController, node) ExpectNotFound(ctx, env.Client, node) }) - It("should not reconcile when a node has deletion timestamp set", func() { - node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", - Status: corev1.ConditionFalse, - LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, - }) - fakeClock.Step(60 * time.Minute) - ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) - ExpectDeletionTimestampSet(ctx, env.Client, node) - // Determine to delete unhealthy nodes - ExpectObjectReconciled(ctx, env.Client, healthController, node) - // Let the termination controller terminate the node - ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) - ExpectObjectReconciled(ctx, env.Client, terminationController, node) - ExpectObjectReconciled(ctx, env.Client, terminationController, node) - ExpectExists(ctx, env.Client, node) - }) It("should not delete node when unhealthy type does not match cloud provider passed in value", func() { node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ Type: "FakeHealthyNode", @@ -156,7 +141,7 @@ var _ = Describe("Health", func() { }) It("should not delete node when health status does not match cloud provider passed in value", func() { node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionTrue, LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, }) @@ -172,7 +157,7 @@ 
var _ = Describe("Health", func() { }) It("should not delete node when health duration is not reached", func() { node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionFalse, // We expect the last transition for HealthyNode condition to wait 30 minutes LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, @@ -187,9 +172,9 @@ var _ = Describe("Health", func() { ExpectObjectReconciled(ctx, env.Client, terminationController, node) ExpectExists(ctx, env.Client, node) }) - It("should set termination grace period to now when not defined ", func() { + It("should set annotation termination grace period when force termination is started", func() { node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionFalse, // We expect the last transition for HealthyNode condition to wait 30 minutes LastTransitionTime: metav1.Time{Time: time.Now()}, @@ -205,10 +190,10 @@ var _ = Describe("Health", func() { nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) Expect(nodeClaim.Annotations).To(HaveKeyWithValue(v1.NodeClaimTerminationTimestampAnnotationKey, fakeClock.Now().Format(time.RFC3339))) }) - It("should respect termination grace period if set on the nodepool", func() { + It("should not respect termination grace period if set on the nodepool", func() { nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Minute} node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionFalse, // We expect the last transition for HealthyNode condition to wait 30 minutes LastTransitionTime: metav1.Time{Time: time.Now()}, @@ -224,10 +209,25 @@ var _ = Describe("Health", func() { nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) Expect(nodeClaim.Annotations).To(HaveKeyWithValue(v1.NodeClaimTerminationTimestampAnnotationKey, 
fakeClock.Now().Format(time.RFC3339))) }) + It("should return the requeue interval for the time between now and when the nodeClaim termination time", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + // We expect the last transition for HealthyNode condition to wait 30 minutes + LastTransitionTime: metav1.Time{Time: time.Now()}, + }) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + + fakeClock.Step(27 * time.Minute) + + result := ExpectObjectReconciled(ctx, env.Client, healthController, node) + fmt.Println(result.RequeueAfter.String()) + Expect(result.RequeueAfter).To(BeNumerically("~", time.Minute*3, time.Second)) + }) }) Context("Forceful termination", func() { - It("should not respect node disruption budgets ", func() { + It("should ignore node disruption budgets", func() { // Blocking disruption budgets nodePool.Spec.Disruption = v1.Disruption{ Budgets: []v1.Budget{ @@ -237,7 +237,7 @@ var _ = Describe("Health", func() { }, } node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionFalse, LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, }) @@ -251,10 +251,10 @@ var _ = Describe("Health", func() { ExpectObjectReconciled(ctx, env.Client, terminationController, node) ExpectNotFound(ctx, env.Client, node) }) - It("should not respect do-not-disrupt on node", func() { + It("should ignore do-not-disrupt on a node", func() { node.Annotations = map[string]string{v1.DoNotDisruptAnnotationKey: "true"} node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionFalse, LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, }) @@ -272,7 +272,7 @@ var _ = Describe("Health", func() { Context("Metrics", func() { It("should fire a karpenter_nodeclaims_disrupted_total metric when unhealthy", func() { 
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ - Type: "HealthyNode", + Type: "BadNode", Status: corev1.ConditionFalse, LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, }) @@ -287,7 +287,7 @@ var _ = Describe("Health", func() { ExpectNotFound(ctx, env.Client, node) ExpectMetricCounterValue(metrics.NodeClaimsDisruptedTotal, 1, map[string]string{ - metrics.ReasonLabel: metrics.UnhealthyReason, + metrics.ReasonLabel: string(cloudProvider.RepairPolicies()[0].ConditionType), metrics.NodePoolLabel: nodePool.Name, }) }) diff --git a/pkg/metrics/constants.go b/pkg/metrics/constants.go index a958aeb4a..32d8bdcac 100644 --- a/pkg/metrics/constants.go +++ b/pkg/metrics/constants.go @@ -33,7 +33,6 @@ const ( // Reasons for CREATE/DELETE shared metrics ProvisionedReason = "provisioned" ExpiredReason = "expired" - UnhealthyReason = "unhealthy" ) // DurationBuckets returns a []float64 of default threshold values for duration histograms.