Skip to content

Commit

Permalink
Update to use RepairStatements
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Nov 17, 2024
1 parent b6e5837 commit 4a18b48
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 94 deletions.
4 changes: 2 additions & 2 deletions kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (c CloudProvider) GetSupportedNodeClasses() []status.Object {
return []status.Object{&v1alpha1.KWOKNodeClass{}}
}

func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatement {
return []cloudprovider.RepairStatement{}
func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{}
}

func (c CloudProvider) getInstanceType(instanceTypeName string) (*cloudprovider.InstanceType, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,11 @@ func (c *CloudProvider) IsDrifted(context.Context, *v1.NodeClaim) (cloudprovider
return c.Drifted, nil
}

func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatement {
return []cloudprovider.RepairStatement{
func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{
{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
ConditionType: "BadNode",
ConditionStatus: corev1.ConditionFalse,
TolerationDuration: 30 * time.Minute,
},
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ var (

type DriftReason string

type RepairStatement struct {
// Type of unhealthy state that is found on the node
Type corev1.NodeConditionType
// Status condition when a node is unhealthy
Status corev1.ConditionStatus
type RepairPolicy struct {
// ConditionType of unhealthy state that is found on the node
ConditionType corev1.NodeConditionType
// ConditionStatus condition when a node is unhealthy
ConditionStatus corev1.ConditionStatus
// TolerationDuration is the duration the controller will wait
// before force terminating nodes that are unhealthy.
TolerationDuration time.Duration
Expand Down Expand Up @@ -76,7 +76,7 @@ type CloudProvider interface {
IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error)
// RepairPolicies is for CloudProviders to define a set of unhealthy conditions for Karpenter
// to monitor on the node.
RepairPolicy() []RepairStatement
RepairPolicies() []RepairPolicy
// Name returns the CloudProvider implementation name.
Name() string
// GetSupportedNodeClasses returns CloudProvider NodeClass that implements status.Object
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func NewControllers(
health.NewController(kubeClient, cloudProvider, clock),
}

// The cloud provider must define status conation for the node repair controller to used for detecting unhealthy nodes
if len(cloudProvider.RepairPolicy()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair {
// The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes
if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair {
controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock))
}

Expand Down
72 changes: 28 additions & 44 deletions pkg/controllers/node/health/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -63,67 +62,52 @@ func (c *Controller) Register(_ context.Context, m manager.Manager) error {
}

func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}

ctx = injection.WithControllerName(ctx, "node.health")
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("nodeclaim", nodeClaim.Name))
nodeHealthCondition := corev1.NodeCondition{}
foundCloudProviderPolicy := cloudprovider.RepairStatement{}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(node.Namespace, node.Name)))

// Validate that the node is owned by us and is not being deleted
if !node.GetDeletionTimestamp().IsZero() || !controllerutil.ContainsFinalizer(node, v1.TerminationFinalizer) {
return reconcile.Result{}, nil
}

for _, policy := range c.cloudProvider.RepairPolicy() {
nodeHealthCondition = nodeutils.GetCondition(node, policy.Type)
if nodeHealthCondition.Status == policy.Status {
// found unhealthy condition on the node
foundCloudProviderPolicy = policy
break
}
}

// From here there are three scenarios to handle:
// 1. If node is healthy, exit node repair loop
if foundCloudProviderPolicy.Type == "" {
return reconcile.Result{}, nil
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}

// 2. If the Node is unhealthy, but has not reached it's full toleration disruption, exit the loop
disruptionTime := nodeHealthCondition.LastTransitionTime.Add(foundCloudProviderPolicy.TolerationDuration)
if c.clock.Now().Before(disruptionTime) {
return reconcile.Result{RequeueAfter: disruptionTime.Sub(c.clock.Now())}, nil
// Find whether the node has an unhealthy condition matching a cloud provider repair policy
healthCondition, _ := lo.Find(c.cloudProvider.RepairPolicies(), func(policy cloudprovider.RepairPolicy) bool {
nodeCondition := nodeutils.GetCondition(node, policy.ConditionType)
return nodeCondition.Status == policy.ConditionStatus
})

// If the Node is unhealthy but has not yet been unhealthy for its full toleration duration,
// requeue at the expected termination time of the unhealthy node
terminationTime := nodeutils.GetCondition(node, healthCondition.ConditionType).LastTransitionTime.Add(healthCondition.TolerationDuration)
if c.clock.Now().Before(terminationTime) {
return reconcile.Result{RequeueAfter: terminationTime.Sub(c.clock.Now())}, nil
}

// For unhealthy past the tolerationDisruption window we can forcefully terminate the node
if err := c.annotateTerminationGracePeriod(ctx, nodeClaim); err != nil {
return reconcile.Result{}, fmt.Errorf("annotated termination grace period on nodeclaim, %w", err)
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// 3. Otherwise, if the Node is unhealthy and past it's tolerationDisruption window we can forcefully terminate the node
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
return reconcile.Result{}, err
}
// 4. The deletion timestamp has successfully been set for the Node, update relevant metrics.

// The deletion timestamp has successfully been set for the Node, update relevant metrics.
log.FromContext(ctx).V(1).Info("deleting unhealthy node")
metrics.NodeClaimsDisruptedTotal.With(prometheus.Labels{
metrics.ReasonLabel: metrics.UnhealthyReason,
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
}).Inc()
metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: string(healthCondition.ConditionType),
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: node.Labels[v1.CapacityTypeLabelKey],
})
return reconcile.Result{}, nil
}

func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeClaim *v1.NodeClaim) error {
stored := nodeClaim.DeepCopy()
terminationTime := c.clock.Now().Format(time.RFC3339)
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime})
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: c.clock.Now().Format(time.RFC3339)})

if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
return client.IgnoreNotFound(err)
return err
}

return nil
Expand Down
70 changes: 35 additions & 35 deletions pkg/controllers/node/health/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package health_test

import (
"context"
"fmt"
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/cache"

Expand All @@ -36,7 +38,8 @@ import (
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
nodeclaimlifecycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
Expand All @@ -47,12 +50,12 @@ import (
var ctx context.Context
var healthController *health.Controller
var terminationController *termination.Controller
var nodeClaimController *nodeclaimlifecycle.Controller
var env *test.Environment
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var queue *terminator.Queue
var nodeClaimController *lifecycle.Controller

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
Expand All @@ -73,15 +76,14 @@ var _ = BeforeSuite(func() {
queue = terminator.NewTestingQueue(env.Client, recorder)
healthController = health.NewController(env.Client, cloudProvider, fakeClock)
terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
nodeClaimController = lifecycle.NewController(fakeClock, env.Client, cloudProvider, recorder)

nodeClaimController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}))
})

var _ = AfterSuite(func() {
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})

var _ = Describe("Health", func() {
var _ = Describe("Node Health", func() {
var node *corev1.Node
var nodeClaim *v1.NodeClaim
var nodePool *v1.NodePool
Expand All @@ -107,7 +109,7 @@ var _ = Describe("Health", func() {
Context("Reconciliation", func() {
It("should delete nodes that are unhealthy by the cloud provider", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
Expand All @@ -121,23 +123,6 @@ var _ = Describe("Health", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should not reconcile when a node has deletion timestamp set", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
fakeClock.Step(60 * time.Minute)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
ExpectDeletionTimestampSet(ctx, env.Client, node)
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
})
It("should not delete node when unhealthy type does not match cloud provider passed in value", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "FakeHealthyNode",
Expand All @@ -156,7 +141,7 @@ var _ = Describe("Health", func() {
})
It("should not delete node when health status does not match cloud provider passed in value", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
Expand All @@ -172,7 +157,7 @@ var _ = Describe("Health", func() {
})
It("should not delete node when health duration is not reached", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionFalse,
// We expect the controller to wait 30 minutes after the BadNode condition's last transition
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
Expand All @@ -187,9 +172,9 @@ var _ = Describe("Health", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
})
It("should set termination grace period to now when not defined ", func() {
It("should set annotation termination grace period when force termination is started", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionFalse,
// We expect the controller to wait 30 minutes after the BadNode condition's last transition
LastTransitionTime: metav1.Time{Time: time.Now()},
Expand All @@ -205,10 +190,10 @@ var _ = Describe("Health", func() {
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Annotations).To(HaveKeyWithValue(v1.NodeClaimTerminationTimestampAnnotationKey, fakeClock.Now().Format(time.RFC3339)))
})
It("should respect termination grace period if set on the nodepool", func() {
It("should not respect termination grace period if set on the nodepool", func() {
nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Minute}
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionFalse,
// We expect the controller to wait 30 minutes after the BadNode condition's last transition
LastTransitionTime: metav1.Time{Time: time.Now()},
Expand All @@ -224,10 +209,25 @@ var _ = Describe("Health", func() {
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Annotations).To(HaveKeyWithValue(v1.NodeClaimTerminationTimestampAnnotationKey, fakeClock.Now().Format(time.RFC3339)))
})
It("should return the requeue interval for the time between now and when the nodeClaim termination time", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
// We expect the controller to wait 30 minutes after the BadNode condition's last transition
LastTransitionTime: metav1.Time{Time: time.Now()},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)

fakeClock.Step(27 * time.Minute)

result := ExpectObjectReconciled(ctx, env.Client, healthController, node)
fmt.Println(result.RequeueAfter.String())
Expect(result.RequeueAfter).To(BeNumerically("~", time.Minute*3, time.Second))
})
})

Context("Forceful termination", func() {
It("should not respect node disruption budgets ", func() {
It("should ignore node disruption budgets", func() {
// Blocking disruption budgets
nodePool.Spec.Disruption = v1.Disruption{
Budgets: []v1.Budget{
Expand All @@ -237,7 +237,7 @@ var _ = Describe("Health", func() {
},
}
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
Expand All @@ -251,10 +251,10 @@ var _ = Describe("Health", func() {
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should not respect do-not-disrupt on node", func() {
It("should ignore do-not-disrupt on a node", func() {
node.Annotations = map[string]string{v1.DoNotDisruptAnnotationKey: "true"}
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
Expand All @@ -272,7 +272,7 @@ var _ = Describe("Health", func() {
Context("Metrics", func() {
It("should fire a karpenter_nodeclaims_disrupted_total metric when unhealthy", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
Expand All @@ -287,7 +287,7 @@ var _ = Describe("Health", func() {
ExpectNotFound(ctx, env.Client, node)

ExpectMetricCounterValue(metrics.NodeClaimsDisruptedTotal, 1, map[string]string{
metrics.ReasonLabel: metrics.UnhealthyReason,
metrics.ReasonLabel: string(cloudProvider.RepairPolicies()[0].ConditionType),
metrics.NodePoolLabel: nodePool.Name,
})
})
Expand Down
1 change: 0 additions & 1 deletion pkg/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const (
// Reasons for CREATE/DELETE shared metrics
ProvisionedReason = "provisioned"
ExpiredReason = "expired"
UnhealthyReason = "unhealthy"
)

// DurationBuckets returns a []float64 of default threshold values for duration histograms.
Expand Down

0 comments on commit 4a18b48

Please sign in to comment.