Different drain timeout for Unknown nodes (#51)
* drain unknown

* Update nodes.go

* Update server.go

* add unit test
eytan-avisror authored Mar 10, 2020
1 parent eabed53 commit 48c3bc7
Showing 5 changed files with 96 additions and 30 deletions.
45 changes: 24 additions & 21 deletions cmd/serve.go
```diff
@@ -37,17 +37,18 @@ const (
 )
 
 var (
-	localMode                 string
-	region                    string
-	queueName                 string
-	kubectlLocalPath          string
-	nodeName                  string
-	logLevel                  string
-	deregisterTargetGroups    bool
-	drainRetryIntervalSeconds int
-	maxDrainConcurrency       int64
-	drainTimeoutSeconds       int
-	pollingIntervalSeconds    int
+	localMode                  string
+	region                     string
+	queueName                  string
+	kubectlLocalPath           string
+	nodeName                   string
+	logLevel                   string
+	deregisterTargetGroups     bool
+	drainRetryIntervalSeconds  int
+	maxDrainConcurrency        int64
+	drainTimeoutSeconds        int
+	drainTimeoutUnknownSeconds int
+	pollingIntervalSeconds     int
 
 	// DefaultRetryer is the default retry configuration for some AWS API calls
 	DefaultRetryer = client.DefaultRetryer{
@@ -81,15 +82,16 @@ var serveCmd = &cobra.Command{
 
 		// prepare runtime context
 		context := service.ManagerContext{
-			CacheConfig:               cacheCfg,
-			KubectlLocalPath:          kubectlLocalPath,
-			QueueName:                 queueName,
-			DrainTimeoutSeconds:       int64(drainTimeoutSeconds),
-			PollingIntervalSeconds:    int64(pollingIntervalSeconds),
-			DrainRetryIntervalSeconds: int64(drainRetryIntervalSeconds),
-			MaxDrainConcurrency:       semaphore.NewWeighted(maxDrainConcurrency),
-			Region:                    region,
-			WithDeregister:            deregisterTargetGroups,
+			CacheConfig:                cacheCfg,
+			KubectlLocalPath:           kubectlLocalPath,
+			QueueName:                  queueName,
+			DrainTimeoutSeconds:        int64(drainTimeoutSeconds),
+			DrainTimeoutUnknownSeconds: int64(drainTimeoutUnknownSeconds),
+			PollingIntervalSeconds:     int64(pollingIntervalSeconds),
+			DrainRetryIntervalSeconds:  int64(drainRetryIntervalSeconds),
+			MaxDrainConcurrency:        semaphore.NewWeighted(maxDrainConcurrency),
+			Region:                     region,
+			WithDeregister:             deregisterTargetGroups,
 		}
 
 		s := service.New(auth, context)
@@ -105,7 +107,8 @@ func init() {
 	serveCmd.Flags().StringVar(&kubectlLocalPath, "kubectl-path", "/usr/local/bin/kubectl", "the path to kubectl binary")
 	serveCmd.Flags().StringVar(&logLevel, "log-level", "info", "the logging level (info, warning, debug)")
 	serveCmd.Flags().Int64Var(&maxDrainConcurrency, "max-drain-concurrency", 32, "maximum number of node drains to process in parallel")
-	serveCmd.Flags().IntVar(&drainTimeoutSeconds, "drain-timeout", 300, "hard time limit for drain")
+	serveCmd.Flags().IntVar(&drainTimeoutSeconds, "drain-timeout", 300, "hard time limit for draining healthy nodes")
+	serveCmd.Flags().IntVar(&drainTimeoutUnknownSeconds, "drain-timeout-unknown", 30, "hard time limit for draining nodes that are in unknown state")
 	serveCmd.Flags().IntVar(&drainRetryIntervalSeconds, "drain-interval", 30, "interval in seconds for which to retry draining")
 	serveCmd.Flags().IntVar(&pollingIntervalSeconds, "polling-interval", 10, "interval in seconds for which to poll SQS")
 	serveCmd.Flags().BoolVar(&deregisterTargetGroups, "with-deregister", true, "try to deregister deleting instance from target groups")
```
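The new `--drain-timeout-unknown` flag defaults to 30 seconds, far shorter than the 300-second default for healthy nodes. A minimal invocation sketch (binary name, queue, and region values are assumed for illustration; the two timeout flags come from `init()` above):

```sh
# healthy nodes get up to 300s to drain; nodes whose Ready condition
# is Unknown give up after 30s (both values shown are the defaults)
lifecycle-manager serve \
  --queue-name lifecycle-queue \
  --region us-west-2 \
  --drain-timeout 300 \
  --drain-timeout-unknown 30
```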
19 changes: 10 additions & 9 deletions pkg/service/manager.go
```diff
@@ -38,15 +38,16 @@ type Manager struct {
 
 // ManagerContext contain the user input parameters on the current context
 type ManagerContext struct {
-	CacheConfig               *cache.Config
-	KubectlLocalPath          string
-	QueueName                 string
-	Region                    string
-	DrainTimeoutSeconds       int64
-	DrainRetryIntervalSeconds int64
-	PollingIntervalSeconds    int64
-	WithDeregister            bool
-	MaxDrainConcurrency       *semaphore.Weighted
+	CacheConfig                *cache.Config
+	KubectlLocalPath           string
+	QueueName                  string
+	Region                     string
+	DrainTimeoutUnknownSeconds int64
+	DrainTimeoutSeconds        int64
+	DrainRetryIntervalSeconds  int64
+	PollingIntervalSeconds     int64
+	WithDeregister             bool
+	MaxDrainConcurrency        *semaphore.Weighted
 }
 
 // Authenticator holds clients for all required APIs
```
19 changes: 19 additions & 0 deletions pkg/service/nodes.go
```diff
@@ -35,10 +35,29 @@ func getNodeByInstance(k kubernetes.Interface, instanceID string) (v1.Node, bool
 	return foundNode, false
 }
 
+func isNodeStatusInCondition(node v1.Node, condition v1.ConditionStatus) bool {
+	var (
+		conditions = node.Status.Conditions
+	)
+	for _, c := range conditions {
+		if c.Type == v1.NodeReady {
+			if c.Status == condition {
+				return true
+			}
+		}
+	}
+	return false
+}
+
 func drainNode(kubectlPath, nodeName string, timeout, retryInterval int64) error {
 	drainArgs := []string{"drain", nodeName, "--ignore-daemonsets=true", "--delete-local-data=true", "--force", "--grace-period=-1"}
 	drainCommand := kubectlPath
 
+	if timeout == 0 {
+		log.Warn("skipping drain since timeout was set to 0")
+		return nil
+	}
+
 	err := runCommandWithContext(drainCommand, drainArgs, timeout, retryInterval)
 	if err != nil {
 		if err.Error() == "command execution timed out" {
```
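Together these two additions give the manager a fast path for unreachable nodes: `isNodeStatusInCondition` reports whether the node's `NodeReady` condition carries a given status (`True`, `False`, or `Unknown`), and `drainNode` now treats a zero timeout as "skip the drain". A sketch of the selection logic this enables, mirroring the `drainNodeTarget` change in server.go below (variable names are illustrative):

```go
// pick the drain deadline from the node's Ready condition
timeout := ctx.DrainTimeoutSeconds
if isNodeStatusInCondition(node, v1.ConditionUnknown) {
	// kubelet is unreachable, so pods will never terminate gracefully;
	// fail fast with the shorter deadline instead of waiting out the full one
	timeout = ctx.DrainTimeoutUnknownSeconds
}
err := drainNode(kubectlPath, node.Name, timeout, ctx.DrainRetryIntervalSeconds)
```

A side effect worth noting: running with `--drain-timeout-unknown 0` skips draining unknown nodes entirely, since `drainNode` logs a warning and returns nil when the timeout is 0.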
35 changes: 35 additions & 0 deletions pkg/service/nodes_test.go
```diff
@@ -14,6 +14,41 @@ var (
 	stubKubectlPathFail = "/bin/some-bad-file"
 )
 
+func Test_NodeStatusPredicate(t *testing.T) {
+	t.Log("Test_NodeStatusPredicate: should return true if node readiness is in given condition")
+
+	readyNode := v1.Node{
+		Status: v1.NodeStatus{
+			Conditions: []v1.NodeCondition{
+				{
+					Type:   v1.NodeReady,
+					Status: v1.ConditionTrue,
+				},
+			},
+		},
+	}
+
+	unknownNode := v1.Node{
+		Status: v1.NodeStatus{
+			Conditions: []v1.NodeCondition{
+				{
+					Type:   v1.NodeReady,
+					Status: v1.ConditionUnknown,
+				},
+			},
+		},
+	}
+
+	if isNodeStatusInCondition(readyNode, v1.ConditionTrue) != true {
+		t.Fatalf("expected isNodeStatusInCondition exists to be: %t, got: %t", true, false)
+	}
+
+	if isNodeStatusInCondition(unknownNode, v1.ConditionUnknown) != true {
+		t.Fatalf("expected isNodeStatusInCondition exists to be: %t, got: %t", true, false)
+	}
+
+}
+
 func Test_GetNodeByInstancePositive(t *testing.T) {
 	t.Log("Test_GetNodeByInstancePositive: If a node exists, should be able to get it's instance ID")
 	kubeClient := fake.NewSimpleClientset()
```
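Assuming the repository's standard layout, the new predicate test can be run in isolation with `go test -run Test_NodeStatusPredicate ./pkg/service/`.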
8 changes: 8 additions & 0 deletions pkg/service/server.go
```diff
@@ -8,6 +8,8 @@ import (
 	"strings"
 	"time"
 
+	v1 "k8s.io/api/core/v1"
+
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/service/elb"
@@ -58,6 +60,7 @@ func (mgr *Manager) Start() {
 	log.Infof("queue = %v", ctx.QueueName)
 	log.Infof("polling interval seconds = %v", ctx.PollingIntervalSeconds)
 	log.Infof("node drain timeout seconds = %v", ctx.DrainTimeoutSeconds)
+	log.Infof("unknown node drain timeout seconds = %v", ctx.DrainTimeoutUnknownSeconds)
 	log.Infof("node drain retry interval seconds = %v", ctx.DrainRetryIntervalSeconds)
 	log.Infof("with alb deregister = %v", ctx.WithDeregister)
 
@@ -231,6 +234,11 @@ func (mgr *Manager) drainNodeTarget(event *LifecycleEvent) error {
 	metrics.IncGauge(DrainingInstancesCountMetric)
 	defer metrics.DecGauge(DrainingInstancesCountMetric)
 
+	if isNodeStatusInCondition(event.referencedNode, v1.ConditionUnknown) {
+		log.Infof("%v> node is in unknown state, setting drain deadline to %vs", event.EC2InstanceID, ctx.DrainTimeoutUnknownSeconds)
+		drainTimeout = ctx.DrainTimeoutUnknownSeconds
+	}
+
 	log.Infof("%v> draining node/%v", event.EC2InstanceID, event.referencedNode.Name)
 	err := drainNode(kubectlPath, event.referencedNode.Name, drainTimeout, retryInterval)
 	if err != nil {
```
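With the shorter deadline selected, the drain itself is unchanged: `drainNode` still shells out to `kubectl drain <node-name> --ignore-daemonsets=true --delete-local-data=true --force --grace-period=-1`; only the time budget handed to `runCommandWithContext` differs between healthy and unknown nodes.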
