cherry-pick: address disruption taint race condition (#1180) (#1206)
Co-authored-by: Nick Tran <10810510+njtran@users.noreply.github.com>
jmdeal and njtran authored Apr 24, 2024
1 parent 545d88a commit b57bfe8
Showing 5 changed files with 117 additions and 93 deletions.
2 changes: 2 additions & 0 deletions pkg/controllers/disruption/consolidation_test.go
@@ -3788,6 +3788,8 @@ var _ = Describe("Consolidation", func() {
Eventually(finished.Load, 10*time.Second).Should(BeTrue())
wg.Wait()

ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})

// nothing should be removed since the node is no longer empty
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
34 changes: 14 additions & 20 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -19,8 +19,8 @@ package disruption
import (
"context"
"errors"
"fmt"

"github.com/samber/lo"
"knative.dev/pkg/logging"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
@@ -88,31 +88,25 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return Command{}, scheduling.Results{}, errors.New("interrupted")
case <-c.clock.After(consolidationTTL):
}
validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, c.ShouldDisrupt, c.queue)
if err != nil {
logging.FromContext(ctx).Errorf("computing validation candidates %s", err)
return Command{}, scheduling.Results{}, err
}
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
candidatesToDelete := mapCandidates(cmd.candidates, validationCandidates)

postValidationMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder, c.queue)
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if err != nil {
return Command{}, scheduling.Results{}, fmt.Errorf("building disruption budgets, %w", err)
}

// The deletion of empty NodeClaims is easy to validate, we just ensure that:
// 1. All the candidatesToDelete are still empty
// 2. The node isn't a target of a recent scheduling simulation
// 3. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
for _, n := range candidatesToDelete {
if len(n.reschedulablePods) != 0 || c.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
postValidationMapping[n.nodePool.Name]--
return Command{}, scheduling.Results{}, err
}

// TODO (jmdeal@): better encapsulate within validation
if lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
return len(c.reschedulablePods) != 0
}) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}

return cmd, scheduling.Results{}, nil
}
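The churn check above uses lo.ContainsBy from github.com/samber/lo, which this repository already imports. The following is a minimal, self-contained sketch (not part of the commit; the candidate type here is a simplified stand-in for the package's Candidate) of the same predicate: if any validated candidate still has reschedulable pods, the empty-node command is abandoned instead of executed.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// candidate is a simplified stand-in for the disruption package's Candidate type.
type candidate struct {
	name              string
	reschedulablePods []string
}

func main() {
	validated := []candidate{
		{name: "node-a"},
		{name: "node-b", reschedulablePods: []string{"web-7d9f"}},
	}
	// True if any candidate is no longer empty; the caller would then drop the command.
	churned := lo.ContainsBy(validated, func(c candidate) bool {
		return len(c.reschedulablePods) != 0
	})
	fmt.Println(churned) // true: node-b picked up a pod during validation
}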

15 changes: 6 additions & 9 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -91,15 +91,12 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return cmd, scheduling.Results{}, nil
}

v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue)
isValid, err := v.IsValid(ctx, cmd)
if err != nil {
return Command{}, scheduling.Results{}, fmt.Errorf("validating, %w", err)
}

if !isValid {
logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
}
return cmd, results, nil
}
13 changes: 6 additions & 7 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -50,7 +50,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
consolidationTypeLabel: s.ConsolidationType(),
}).Set(float64(len(candidates)))

v := NewValidation(consolidationTTL, s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)
v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)

// Set a timeout
timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
@@ -78,14 +78,13 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if cmd.Action() == NoOpAction {
continue
}
isValid, err := v.IsValid(ctx, cmd)
if err != nil {
if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
}
if !isValid {
logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
return cmd, results, nil
}
if !constrainedByBudgets {
146 changes: 89 additions & 57 deletions pkg/controllers/disruption/validation.go
@@ -35,82 +35,114 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
)

type ValidationError struct {
error
}

func NewValidationError(err error) *ValidationError {
return &ValidationError{error: err}
}

func IsValidationError(err error) bool {
if err == nil {
return false
}
var validationError *ValidationError
return errors.As(err, &validationError)
}
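A minimal, standalone sketch (not part of the commit; the types are redeclared here so the snippet compiles on its own) of how this error type behaves: because IsValidationError uses errors.As, a ValidationError stays detectable even after callers wrap it with fmt.Errorf and %w, which is what lets the consolidation methods distinguish pod-churn failures from hard errors.

package main

import (
	"errors"
	"fmt"
)

// Simplified copies of the types above, redeclared for a self-contained example.
type ValidationError struct{ error }

func NewValidationError(err error) *ValidationError { return &ValidationError{error: err} }

func IsValidationError(err error) bool {
	if err == nil {
		return false
	}
	var validationError *ValidationError
	return errors.As(err, &validationError)
}

func main() {
	churn := NewValidationError(fmt.Errorf("a candidate was nominated during validation"))
	wrapped := fmt.Errorf("validating consolidation, %w", churn)

	fmt.Println(IsValidationError(wrapped))                        // true: the chain still contains *ValidationError
	fmt.Println(IsValidationError(errors.New("building budgets"))) // false: an ordinary error
	fmt.Println(IsValidationError(nil))                            // false
}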

// Validation is used to perform validation on a consolidation command. It makes an assumption that when re-used, all
// of the commands passed to IsValid were constructed based off of the same consolidation state. This allows it to
// skip the validation TTL for all but the first command.
type Validation struct {
validationPeriod time.Duration
start time.Time
clock clock.Clock
cluster *state.Cluster
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
provisioner *provisioning.Provisioner
once sync.Once
recorder events.Recorder
queue *orchestration.Queue
start time.Time
clock clock.Clock
cluster *state.Cluster
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
provisioner *provisioning.Provisioner
once sync.Once
recorder events.Recorder
queue *orchestration.Queue
}

func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
cp cloudprovider.CloudProvider, recorder events.Recorder, queue *orchestration.Queue) *Validation {
return &Validation{
validationPeriod: validationPeriod,
clock: clk,
cluster: cluster,
kubeClient: kubeClient,
provisioner: provisioner,
cloudProvider: cp,
recorder: recorder,
queue: queue,
clock: clk,
cluster: cluster,
kubeClient: kubeClient,
provisioner: provisioner,
cloudProvider: cp,
recorder: recorder,
queue: queue,
}
}

//nolint:gocyclo
func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod time.Duration) error {
var err error
v.once.Do(func() {
v.start = v.clock.Now()
})

waitDuration := v.validationPeriod - v.clock.Since(v.start)
waitDuration := validationPeriod - v.clock.Since(v.start)
if waitDuration > 0 {
select {
case <-ctx.Done():
return false, errors.New("context canceled")
return errors.New("context canceled")
case <-v.clock.After(waitDuration):
}
}
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
// We perform filtering here to ensure that none of the proposed candidates have blocking PDBs or do-not-evict/do-not-disrupt pods scheduled to them
validationCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if err != nil {
return false, fmt.Errorf("constructing validation candidates, %w", err)
return err
}
validationCandidates = mapCandidates(cmd.candidates, validationCandidates)
// If we filtered out any candidates, return false as some NodeClaims in the consolidation decision have changed.
if len(validationCandidates) != len(cmd.candidates) {
return false, nil
if err := v.ValidateCommand(ctx, cmd, validatedCandidates); err != nil {
return err
}
// Rebuild the disruption budget mapping to see if any budgets have changed since validation.
postValidationMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
// Revalidate candidates after validating the command. This mitigates the chance of a race condition outlined in
// the following GitHub issue: https://github.com/kubernetes-sigs/karpenter/issues/1167.
if _, err = v.ValidateCandidates(ctx, validatedCandidates...); err != nil {
return err
}
return nil
}

// ValidateCandidates gets the current representation of the provided candidates and ensures that they are all still valid.
// For a candidate to still be valid, the following conditions must be met:
//
// a. It must pass the global candidate filtering logic (no blocking PDBs, no do-not-disrupt annotation, etc)
// b. It must not have any pods nominated for it
// c. It must still be disruptable without violating node disruption budgets
//
// If these conditions are met for all candidates, ValidateCandidates returns a slice with the updated representations.
func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Candidate) ([]*Candidate, error) {
validatedCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
if err != nil {
return false, fmt.Errorf("building disruption budgets, %w", err)
}
// 1. a candidate we are about to delete is a target of a currently pending pod, wait for that to settle
// before continuing consolidation
// 2. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
for _, n := range validationCandidates {
if v.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
return false, nil
}
postValidationMapping[n.nodePool.Name]--
return nil, fmt.Errorf("constructing validation candidates, %w", err)
}
isValid, err := v.ValidateCommand(ctx, cmd, validationCandidates)
validatedCandidates = mapCandidates(candidates, validatedCandidates)
// If we filtered out any candidates, return nil as some NodeClaims in the consolidation decision have changed.
if len(validatedCandidates) != len(candidates) {
return nil, NewValidationError(fmt.Errorf("%d candidates are no longer valid", len(candidates)-len(validatedCandidates)))
}
disruptionBudgetMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
if err != nil {
return false, fmt.Errorf("validating command, %w", err)
return nil, fmt.Errorf("building disruption budgets, %w", err)
}
// Return nil if any candidate meets either of the following conditions:
// a. A pod was nominated to the candidate
// b. Disrupting the candidate would violate node disruption budgets
for _, vc := range validatedCandidates {
if v.cluster.IsNodeNominated(vc.ProviderID()) {
return nil, NewValidationError(fmt.Errorf("a candidate was nominated during validation"))
}
if disruptionBudgetMapping[vc.nodePool.Name] == 0 {
return nil, NewValidationError(fmt.Errorf("a candidate can no longer be disrupted without violating budgets"))
}
disruptionBudgetMapping[vc.nodePool.Name]--
}
return isValid, nil
return validatedCandidates, nil
}
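The budget loop above debits one disruption per candidate from its NodePool's remaining budget and fails validation as soon as a budget is exhausted. A small, self-contained sketch of that bookkeeping (simplified types; the real mapping comes from BuildDisruptionBudgets, which is not shown in this diff):

package main

import "fmt"

// candidate is a simplified stand-in for the disruption package's Candidate type.
type candidate struct {
	name     string
	nodePool string
}

// checkBudgets mirrors the loop above: every candidate consumes one unit of its
// NodePool's remaining disruption budget, and validation fails once a budget hits zero.
func checkBudgets(candidates []candidate, budgets map[string]int) error {
	for _, c := range candidates {
		if budgets[c.nodePool] == 0 {
			return fmt.Errorf("candidate %q can no longer be disrupted without violating budgets", c.name)
		}
		budgets[c.nodePool]--
	}
	return nil
}

func main() {
	// Assume the "default" NodePool currently allows a single disruption.
	budgets := map[string]int{"default": 1}
	candidates := []candidate{
		{name: "node-a", nodePool: "default"},
		{name: "node-b", nodePool: "default"},
	}
	fmt.Println(checkBudgets(candidates, budgets)) // node-b exceeds the remaining budget
}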

// ShouldDisrupt is a predicate used to filter candidates
@@ -123,17 +155,17 @@ func (v *Validation) ShouldDisrupt(_ context.Context, c *Candidate) bool {
}

// ValidateCommand validates a command for a Method
func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) (bool, error) {
func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) error {
// None of the chosen candidates are valid for execution, so retry
if len(candidates) == 0 {
return false, nil
return NewValidationError(fmt.Errorf("no candidates"))
}
results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
if err != nil {
return false, fmt.Errorf("simluating scheduling, %w", err)
return fmt.Errorf("simluating scheduling, %w", err)
}
if !results.AllNonPendingPodsScheduled() {
return false, nil
return NewValidationError(fmt.Errorf("all pending pods could not be scheduled"))
}

// We want to ensure that the re-simulated scheduling using the current cluster state produces the same result.
@@ -145,22 +177,22 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
if len(results.NewNodeClaims) == 0 {
if len(cmd.replacements) == 0 {
// scheduling produced zero new NodeClaims and we weren't expecting any, so this is valid.
return true, nil
return nil
}
// if it produced no new NodeClaims, but we were expecting one we should re-simulate as there is likely a better
// consolidation option now
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// we need more than one replacement node which is never valid currently (all of our node replacement is m->1, never m->n)
if len(results.NewNodeClaims) > 1 {
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// we now know that scheduling simulation wants to create one new node
if len(cmd.replacements) == 0 {
// but we weren't expecting any new NodeClaims, so this is invalid
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// We know that the scheduling simulation wants to create a new node and that the command we are verifying wants
@@ -175,11 +207,11 @@
// now says that we need to launch a 4xlarge. It's still launching the correct number of NodeClaims, but it's just
// as expensive or possibly more so we shouldn't validate.
if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, results.NewNodeClaims[0].InstanceTypeOptions) {
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// Now we know:
// - current scheduling simulation says to create a new node with types T = {T_0, T_1, ..., T_n}
// - our lifecycle command says to create a node with types {U_0, U_1, ..., U_n} where U is a subset of T
return true, nil
return nil
}
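instanceTypesAreSubset itself is unchanged by this commit and not shown here. Purely for illustration, a hypothetical name-only version of the subset check the comments above describe (the real helper compares cloudprovider.InstanceType values, not plain strings): every instance type the command wants to launch must still be offered by the re-simulation.

package main

import "fmt"

// instanceTypeNamesAreSubset is a hypothetical, name-based stand-in for the real helper.
func instanceTypeNamesAreSubset(command, resimulated []string) bool {
	offered := make(map[string]struct{}, len(resimulated))
	for _, name := range resimulated {
		offered[name] = struct{}{}
	}
	for _, name := range command {
		if _, ok := offered[name]; !ok {
			return false
		}
	}
	return true
}

func main() {
	// Valid: the command's options are still among what the re-simulation would launch.
	fmt.Println(instanceTypeNamesAreSubset([]string{"m5.2xlarge"}, []string{"m5.2xlarge", "m5.4xlarge"})) // true
	// Invalid: the re-simulation now only offers a larger, likely pricier node.
	fmt.Println(instanceTypeNamesAreSubset([]string{"m5.2xlarge"}, []string{"m5.4xlarge"})) // false
}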
