Merge pull request #3 from thg-ice/fix-issues-with-node-selection
fix node selection issue and let the k8s scheduler decide
oussamarouabah authored Jul 16, 2024
2 parents 55009ea + 07e14f1 commit c8eb571
Showing 3 changed files with 7 additions and 179 deletions.
12 changes: 0 additions & 12 deletions cmd/storage-check/main.go
@@ -29,9 +29,6 @@ var (
// K8s config file for the client.
kubeConfigFile = filepath.Join(os.Getenv("HOME"), ".kube", "config")

allowedCheckNodesEnv = os.Getenv("CHECK_STORAGE_ALLOWED_CHECK_NODES")
ignoredCheckNodesEnv = os.Getenv("CHECK_STORAGE_IGNORED_CHECK_NODES")

// By default, there is no storage class defined for the PVC (used the cluster default)
storageClassNameEnv = os.Getenv("CHECK_STORAGE_PVC_STORAGE_CLASS_NAME")

@@ -79,10 +76,6 @@ var (
checkNamespaceEnv = os.Getenv("CHECK_NAMESPACE")
checkNamespace string

// ServiceAccount that will deploy the test deployment [default = default]
checkServiceAccountEnv = os.Getenv("CHECK_SERVICE_ACCOUNT")
checkServiceAccount string

// Deployment pod resource requests and limits.
millicoreRequestEnv = os.Getenv("CHECK_POD_CPU_REQUEST")
millicoreRequest int
@@ -99,10 +92,6 @@ var (
// Check time limit.
checkTimeLimit time.Duration

// Boolean value if a rolling-update is requested.
rollingUpdateEnv = os.Getenv("CHECK_DEPLOYMENT_ROLLING_UPDATE")
rollingUpdate bool

// Additional container environment variables if a custom image is used for the deployment.
additionalEnvVarsEnv = os.Getenv("ADDITIONAL_ENV_VARS")
additionalEnvVars = make(map[string]string, 0)
@@ -282,5 +271,4 @@ func reportToKuberhealthy(ok bool, errs []string) {
if err != nil {
log.Fatalln("error reporting to kuberhealthy:", err.Error())
}
return
}
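For context, ADDITIONAL_ENV_VARS above still has to be turned into the additionalEnvVars map somewhere; that parsing is not part of this diff. A minimal sketch, assuming the value is a comma-separated list of KEY=VALUE pairs (the format is an assumption, not taken from the repository):

// Minimal sketch, not repository code: one common way to turn a comma-separated
// KEY=VALUE list (assumed format for ADDITIONAL_ENV_VARS) into the
// map[string]string that additionalEnvVars holds.
package main

import (
    "fmt"
    "os"
    "strings"
)

func parseAdditionalEnvVars(raw string) map[string]string {
    vars := make(map[string]string)
    if strings.TrimSpace(raw) == "" {
        return vars
    }
    for _, pair := range strings.Split(raw, ",") {
        kv := strings.SplitN(strings.TrimSpace(pair), "=", 2)
        if len(kv) != 2 || kv[0] == "" {
            continue // skip malformed entries rather than failing the check
        }
        vars[kv[0]] = kv[1]
    }
    return vars
}

func main() {
    fmt.Println(parseAdditionalEnvVars(os.Getenv("ADDITIONAL_ENV_VARS")))
}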
171 changes: 6 additions & 165 deletions cmd/storage-check/run_check.go
@@ -13,36 +13,18 @@ package main
import (
"context"
"fmt"
"math/rand"
"os"
"reflect"
"strings"
"time"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Node holds a Node struct so we can easily get status
type Node struct {
name string
schedulable bool
override bool
status v1.NodeStatus
}

// runStorageCheck sets up a storage PVC, a storage init and storage check and applies it to the cluster.
// Attempts to read a known value from the mounted PVC on each node
func runStorageCheck() {

log.Infoln("Starting Storage check.")

checkNodes := make(map[string]*Node)

// Init a timeout for this entire check run.
runTimeout := time.After(checkTimeLimit)

// Init a timeout for cleaning up the check. Assume that the check should not take more than 2m.
cleanupTimeout := time.After(time.Minute * 2)

@@ -125,114 +107,7 @@ func runStorageCheck() {
reportErrorsToKuberhealthy([]string{"failed to initialize storage within timeout"})
return
}

// If the user supplied a list of nodes to check then we will do that exclusively
allowedCheckNodesEnv = os.Getenv("CHECK_STORAGE_ALLOWED_CHECK_NODES")
// TODO Can almost certainly combine this into a function for allowed and ignore
log.Infof("CHECK_STORAGE_ALLOWED_CHECK_NODES=%+v", allowedCheckNodesEnv)
if allowedCheckNodesEnv != "" {
log.Infof("Requsted explicit nodes to perform storage check. Will NOT do discovery %+v", allowedCheckNodesEnv)
delimiter := " "
// Support , or space separated strings
if strings.Contains(allowedCheckNodesEnv, ",") {
log.Debugln("Found , so using as a delimiter")
delimiter = ","
}

stringSlice := strings.Split(strings.TrimSpace(allowedCheckNodesEnv), delimiter)
log.Debugf("stringSlice is %s", stringSlice)
for _, v := range stringSlice {
node := new(Node)
log.Infof("Found a node to explicitly check for: %s", v)
node.name = strings.TrimSpace(v)
node.override = true
node.schedulable = true
checkNodes[node.name] = node
}
} else {
log.Infoln("Getting a list of worker nodes from the cluster")
//TODO Maybe allow different selectors to determine how to get "usable" nodes from the cluster?
nodes, err := client.CoreV1().Nodes().List(metav1.ListOptions{
//FieldSelector: "metadata.name=" + checkStorageName,
// LabelSelector: defaultLabelKey + "=" + defaultLabelValueBase + strconv.Itoa(int(now.Unix())),
})
if err != nil {
log.Infoln("Error on getting nodes..not sure what to do", err)
}

log.Infoln("Nodes are ", nodes)
for _, n := range nodes.Items {
log.Infoln("Node.name=", n.Name)
log.Infoln("Status=", n.Status)

node := new(Node)
node.name = n.Name
node.status = n.Status

if len(n.Spec.Taints) > 0 {
// By default, only schedule the storage checks on untainted nodes
node.schedulable = toleratesAllTaints(tolerations, n.Spec.Taints)

status := "be"
if !node.schedulable {
status = "NOT be"
}
log.Printf("Adding node %s with taints %s to %s scheduled for check", n.Name, formatTaints(n.Spec.Taints), status)
} else {
log.Infoln("Adding untainted node ", n.Name, " to be scheduled for check")
node.schedulable = true
}
checkNodes[node.name] = node
}
}

// If the user wants to ignore nodes via the environment variable
ignoredCheckNodesEnv = os.Getenv("CHECK_STORAGE_IGNORED_CHECK_NODES")
log.Infof("CHECK_STORAGE_IGNORED_CHECK_NODES=%+v", ignoredCheckNodesEnv)
if ignoredCheckNodesEnv != "" {
log.Infof("Requested nodes to ignore %+v", ignoredCheckNodesEnv)
delimiter := " "
// Support , or space separated strings
if strings.Contains(ignoredCheckNodesEnv, ",") {
log.Debugln("Found , so using as a delimiter")
delimiter = ","
}

stringSlice := strings.Split(strings.TrimSpace(ignoredCheckNodesEnv), delimiter)
for k, v := range stringSlice {
nodeName := strings.TrimSpace(v)
log.Debugf("k=%d, v=%v", k, nodeName)
if _, ok := checkNodes[nodeName]; ok {
log.Infof("Found a node to ignore: %s", nodeName)
checkNodes[nodeName].override = true
checkNodes[nodeName].schedulable = false
}
}
}

var checkableNodes []string
for nodeName, node := range checkNodes {
if node.schedulable {
checkableNodes = append(checkableNodes, nodeName)
}
}
if len(checkableNodes) == 0 {
log.Errorf("no node is available to run tests on")
return
}
var idx = rand.Intn(len(checkableNodes))
nodeName := checkableNodes[idx]

// Choose one random node to run test on
node, ok := checkNodes[nodeName]
if !ok {
log.Errorf("no node is available to run tests on")
return
}

log.Infof("Going to run check on this node %v", nodeName)
log.Infof("Creating config for %s %+v", nodeName, node)
config := checkNodeConfig(checkStorageName+"-check-job", checkStorageName, nodeName)
config := checkNodeConfig(checkStorageName+"-check-job", checkStorageName)
log.Infoln("Created config.")
log.Infof("It looks like: %+v", config)

@@ -243,23 +118,23 @@ func runStorageCheck() {
// Handle errors when the storage check process completes.
if checkStorageResult.Err != nil {
log.Errorln("error occurred checking storage in cluster:", checkStorageResult.Err)
reportErrorsToKuberhealthy([]string{checkStorageResult.Err.Error() + " Node:" + nodeName})
reportErrorsToKuberhealthy([]string{checkStorageResult.Err.Error()})
return
}
// Continue with the check if there is no error.
log.Infoln("Initialized storage check in namespace:", checkStorageResult.Pod.Namespace, "pod:", checkStorageResult.Pod.Name)
case <-ctx.Done():
// If there is a cancellation interrupt signal.
log.Infoln("Cancelling storage check and shutting down due to interrupt.")
reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout Node:" + nodeName})
reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout"})
return
case <-runTimeout:
// If creating a storage took too long, exit.
reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout Node:" + nodeName})
reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout"})
return
}
// If we made it here then our Job returned ok and the storage check passed
log.Infoln("Check for", nodeName, "was healthy! Removing the job now...")
log.Infoln("Check for node was healthy! Removing the job now...")

// Delete the storage check Job.
// TODO - add select to catch context timeout expiration
@@ -268,7 +143,7 @@ func runStorageCheck() {
log.Errorln("error cleaning up storage check job:", err)
}

log.Infoln("Removed job check for", nodeName)
log.Infoln("Removed job check")

// Clean up!
cleanUpError := cleanUp(ctx)
@@ -283,7 +158,6 @@ func runStorageCheck() {
// the check.
// TODO - add in context that expires when check times out
func cleanUp(ctx context.Context) error {

log.Infoln("Cleaning up storage, storage init job and storage check job.")
var err error
var resultErr error
@@ -376,36 +250,3 @@ func cleanUpOrphanedResources(ctx context.Context) chan error {

return cleanUpChan
}

func toleratesAllTaints(tolerations []v1.Toleration, nodeTaints []v1.Taint) bool {
for _, nodeTaint := range nodeTaints {
tolerated := false
for _, toleration := range tolerations {
if reflect.DeepEqual(toleration, v1.Toleration{
Key: nodeTaint.Key,
Value: nodeTaint.Value,
Operator: v1.TolerationOpEqual,
Effect: nodeTaint.Effect,
}) {
tolerated = true
break
}
}
if !tolerated {
return false
}
}
return true
}

func formatTaints(taints []v1.Taint) string {
var taintStrings []string

for _, taint := range taints {
// Format each taint as "key=value:effect"
taintString := fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect)
taintStrings = append(taintStrings, taintString)
}

return strings.Join(taintStrings, ",")
}
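The helpers removed above reimplemented taint handling by hand: toleratesAllTaints only recognised a toleration that exactly matched {Key, Value, Operator: Equal, Effect} via reflect.DeepEqual, so Exists-style tolerations never counted, and the chosen node was then pinned explicitly. After this commit the check simply builds the Job and lets kube-scheduler do the placement. A minimal sketch of that model, assuming illustrative names, image, and tolerations (this is not code from the repository):

// Illustrative sketch of scheduler-driven placement; names and image are assumptions.
package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// schedulerPlacedJob builds a Job whose pod placement is left entirely to
// kube-scheduler: NodeName is never set, so any node whose taints are
// tolerated (and which satisfies the pod's requests) is a candidate.
func schedulerPlacedJob(name, image string, tolerations []corev1.Toleration) *batchv1.Job {
    return &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: name},
        Spec: batchv1.JobSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  "storage-check",
                        Image: image,
                    }},
                    // The scheduler matches these against node taints; no manual
                    // toleratesAllTaints-style filtering is needed.
                    Tolerations:   tolerations,
                    RestartPolicy: corev1.RestartPolicyNever,
                    // NodeName intentionally omitted: kube-scheduler picks the node.
                },
            },
        },
    }
}

func main() {
    job := schedulerPlacedJob("storage-check-job", "busybox:latest", nil)
    fmt.Println("job with scheduler-driven placement:", job.Name)
}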
3 changes: 1 addition & 2 deletions cmd/storage-check/storage.go
@@ -166,7 +166,7 @@ func initializeStorageConfig(jobName string, pvcName string) *batchv1.Job {
}

// checkNodeConfig creates and configures a k8s job to initialize storage at PVC and returns the struct (ready to apply with client).
func checkNodeConfig(jobName string, pvcName string, node string) *batchv1.Job {
func checkNodeConfig(jobName string, pvcName string) *batchv1.Job {

// Make a Job
job := &batchv1.Job{}
@@ -203,7 +203,6 @@ func checkNodeConfig(jobName string, pvcName string, node string) *batchv1.Job {
Args: args,
},
},
NodeName: node,
RestartPolicy: v1.RestartPolicyNever,
Volumes: []corev1.Volume{{
Name: "data",
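Dropping NodeName: node is the core of the change: the check pod is no longer pinned to a pre-chosen node. If placement ever needs to be constrained again, the scheduler-friendly way on this same Job spec is a node selector (or affinity) rather than NodeName. A sketch under that assumption; it is illustrative, not repository code, and the label key/value are chosen only for illustration:

// Package placement is an illustrative sketch, not part of this repository.
package placement

import batchv1 "k8s.io/api/batch/v1"

// ConstrainPlacement shows the Kubernetes-native alternative to the removed
// NodeName pinning: a node selector narrows the candidate nodes while
// kube-scheduler still makes the final choice among them.
func ConstrainPlacement(job *batchv1.Job) {
    job.Spec.Template.Spec.NodeSelector = map[string]string{
        "kubernetes.io/os": "linux", // example label; any node label works
    }
    // The removed approach bypassed the scheduler entirely:
    // job.Spec.Template.Spec.NodeName = node
}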
