From 07e14f161a695e9d3b20e1a02feed314c5577a19 Mon Sep 17 00:00:00 2001 From: oussama Date: Tue, 16 Jul 2024 10:37:09 +0100 Subject: [PATCH] fix selecting node issue and let k8s scheduler decides --- cmd/storage-check/main.go | 12 --- cmd/storage-check/run_check.go | 171 ++------------------------------- cmd/storage-check/storage.go | 3 +- 3 files changed, 7 insertions(+), 179 deletions(-) diff --git a/cmd/storage-check/main.go b/cmd/storage-check/main.go index fc4d395..e8356b8 100644 --- a/cmd/storage-check/main.go +++ b/cmd/storage-check/main.go @@ -29,9 +29,6 @@ var ( // K8s config file for the client. kubeConfigFile = filepath.Join(os.Getenv("HOME"), ".kube", "config") - allowedCheckNodesEnv = os.Getenv("CHECK_STORAGE_ALLOWED_CHECK_NODES") - ignoredCheckNodesEnv = os.Getenv("CHECK_STORAGE_IGNORED_CHECK_NODES") - // By default, there is no storage class defined for the PVC (used the cluster default) storageClassNameEnv = os.Getenv("CHECK_STORAGE_PVC_STORAGE_CLASS_NAME") @@ -79,10 +76,6 @@ var ( checkNamespaceEnv = os.Getenv("CHECK_NAMESPACE") checkNamespace string - // ServiceAccount that will deploy the test deployment [default = default] - checkServiceAccountEnv = os.Getenv("CHECK_SERVICE_ACCOUNT") - checkServiceAccount string - // Deployment pod resource requests and limits. millicoreRequestEnv = os.Getenv("CHECK_POD_CPU_REQUEST") millicoreRequest int @@ -99,10 +92,6 @@ var ( // Check time limit. checkTimeLimit time.Duration - // Boolean value if a rolling-update is requested. - rollingUpdateEnv = os.Getenv("CHECK_DEPLOYMENT_ROLLING_UPDATE") - rollingUpdate bool - // Additional container environment variables if a custom image is used for the deployment. additionalEnvVarsEnv = os.Getenv("ADDITIONAL_ENV_VARS") additionalEnvVars = make(map[string]string, 0) @@ -282,5 +271,4 @@ func reportToKuberhealthy(ok bool, errs []string) { if err != nil { log.Fatalln("error reporting to kuberhealthy:", err.Error()) } - return } diff --git a/cmd/storage-check/run_check.go b/cmd/storage-check/run_check.go index 20b2f6c..ba4cbd1 100644 --- a/cmd/storage-check/run_check.go +++ b/cmd/storage-check/run_check.go @@ -13,36 +13,18 @@ package main import ( "context" "fmt" - "math/rand" "os" - "reflect" - "strings" "time" log "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// Node holds a Node struct so we can easily get status -type Node struct { - name string - schedulable bool - override bool - status v1.NodeStatus -} - // runStorageCheck sets up a storage PVC, a storage init and storage check and applies it to the cluster. // Attempts to read a known values from the mounted PVC on each node func runStorageCheck() { - log.Infoln("Starting Storage check.") - - checkNodes := make(map[string]*Node) - // Init a timeout for this entire check run. runTimeout := time.After(checkTimeLimit) - // Init a timeout for cleaning up the check. Assume that the check should not take more than 2m. cleanupTimeout := time.After(time.Minute * 2) @@ -125,114 +107,7 @@ func runStorageCheck() { reportErrorsToKuberhealthy([]string{"failed to initialize storage within timeout"}) return } - - // If the user supplied a list of nodes to check then we will do that exclusively - allowedCheckNodesEnv = os.Getenv("CHECK_STORAGE_ALLOWED_CHECK_NODES") - // TODO Can almost certainly combine this into a function for allowed and ignore - log.Infof("CHECK_STORAGE_ALLOWED_CHECK_NODES=%+v", allowedCheckNodesEnv) - if allowedCheckNodesEnv != "" { - log.Infof("Requsted explicit nodes to perform storage check. Will NOT do discovery %+v", allowedCheckNodesEnv) - delimiter := " " - // Support , or space separated strings - if strings.Contains(allowedCheckNodesEnv, ",") { - log.Debugln("Found , so using as a delimiter") - delimiter = "," - } - - stringSlice := strings.Split(strings.TrimSpace(allowedCheckNodesEnv), delimiter) - log.Debugf("stringSlice is %s", stringSlice) - for _, v := range stringSlice { - node := new(Node) - log.Infof("Found a node to explicitly check for: %s", v) - node.name = strings.TrimSpace(v) - node.override = true - node.schedulable = true - checkNodes[node.name] = node - } - } else { - log.Infoln("Getting a list of worker nodes from the cluster") - //TODO Maybe allow different selectors to determine how to get "usable" nodes from the cluster? - nodes, err := client.CoreV1().Nodes().List(metav1.ListOptions{ - //FieldSelector: "metadata.name=" + checkStorageName, - // LabelSelector: defaultLabelKey + "=" + defaultLabelValueBase + strconv.Itoa(int(now.Unix())), - }) - if err != nil { - log.Infoln("Error on getting nodes..not sure what to do", err) - } - - log.Infoln("Nodes are ", nodes) - for _, n := range nodes.Items { - log.Infoln("Node.name=", n.Name) - log.Infoln("Status=", n.Status) - - node := new(Node) - node.name = n.Name - node.status = n.Status - - if len(n.Spec.Taints) > 0 { - // By default, only schedule the storage checks on untainted nodes - node.schedulable = toleratesAllTaints(tolerations, n.Spec.Taints) - - status := "be" - if !node.schedulable { - status = "NOT be" - } - log.Printf("Adding node %s with taints %s to %s scheduled for check", n.Name, formatTaints(n.Spec.Taints), status) - } else { - log.Infoln("Adding untainted node ", n.Name, " to be scheduled for check") - node.schedulable = true - } - checkNodes[node.name] = node - } - } - - // If the user wants to ignore nodes via the environment variable - ignoredCheckNodesEnv = os.Getenv("CHECK_STORAGE_IGNORED_CHECK_NODES") - log.Infof("CHECK_STORAGE_IGNORED_CHECK_NODES=%+v", ignoredCheckNodesEnv) - if ignoredCheckNodesEnv != "" { - log.Infof("Requested nodes to ignore %+v", ignoredCheckNodesEnv) - delimiter := " " - // Support , or space separated strings - if strings.Contains(ignoredCheckNodesEnv, ",") { - log.Debugln("Found , so using as a delimiter") - delimiter = "," - } - - stringSlice := strings.Split(strings.TrimSpace(ignoredCheckNodesEnv), delimiter) - for k, v := range stringSlice { - nodeName := strings.TrimSpace(v) - log.Debugf("k=%d, v=%v", k, nodeName) - if _, ok := checkNodes[nodeName]; ok { - log.Infof("Found a node to ignore: %s", nodeName) - checkNodes[nodeName].override = true - checkNodes[nodeName].schedulable = false - } - } - } - - var checkableNodes []string - for nodeName, node := range checkNodes { - if node.schedulable { - checkableNodes = append(checkableNodes, nodeName) - } - } - if len(checkableNodes) == 0 { - log.Errorf("no node is available to run tests on") - return - } - var idx = rand.Intn(len(checkableNodes)) - nodeName := checkableNodes[idx] - - // Choose one random node to run test on - node, ok := checkNodes[nodeName] - if !ok { - log.Errorf("no node is available to run tests on") - return - } - - log.Infof("Going to run check on this node %v", nodeName) - log.Infof("Creating config for %s %+v", nodeName, node) - config := checkNodeConfig(checkStorageName+"-check-job", checkStorageName, nodeName) + config := checkNodeConfig(checkStorageName+"-check-job", checkStorageName) log.Infoln("Created config.") log.Infof("It looks like: %+v", config) @@ -243,7 +118,7 @@ func runStorageCheck() { // Handle errors when the storage check process completes. if checkStorageResult.Err != nil { log.Errorln("error occurred checking storage in cluster:", checkStorageResult.Err) - reportErrorsToKuberhealthy([]string{checkStorageResult.Err.Error() + " Node:" + nodeName}) + reportErrorsToKuberhealthy([]string{checkStorageResult.Err.Error()}) return } // Continue with the check if there is no error. @@ -251,15 +126,15 @@ func runStorageCheck() { case <-ctx.Done(): // If there is a cancellation interrupt signal. log.Infoln("Cancelling storage check and shutting down due to interrupt.") - reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout Node:" + nodeName}) + reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout"}) return case <-runTimeout: // If creating a storage took too long, exit. - reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout Node:" + nodeName}) + reportErrorsToKuberhealthy([]string{"failed to check already initialized storage within timeout"}) return } // If we made it here then our Job returned ok and the storage check passed - log.Infoln("Check for", nodeName, "was healthy! Removing the job now...") + log.Infoln("Check for node was healthy! Removing the job now...") // Delete the storage check Job. // TODO - add select to catch context timeout expiration @@ -268,7 +143,7 @@ func runStorageCheck() { log.Errorln("error cleaning up storage check job:", err) } - log.Infoln("Removed job check for", nodeName) + log.Infoln("Removed job check") // Clean up! cleanUpError := cleanUp(ctx) @@ -283,7 +158,6 @@ func runStorageCheck() { // the check. // TODO - add in context that expires when check times out func cleanUp(ctx context.Context) error { - log.Infoln("Cleaning up storage, storage init job and storage check job.") var err error var resultErr error @@ -376,36 +250,3 @@ func cleanUpOrphanedResources(ctx context.Context) chan error { return cleanUpChan } - -func toleratesAllTaints(tolerations []v1.Toleration, nodeTaints []v1.Taint) bool { - for _, nodeTaint := range nodeTaints { - tolerated := false - for _, toleration := range tolerations { - if reflect.DeepEqual(toleration, v1.Toleration{ - Key: nodeTaint.Key, - Value: nodeTaint.Value, - Operator: v1.TolerationOpEqual, - Effect: nodeTaint.Effect, - }) { - tolerated = true - break - } - } - if !tolerated { - return false - } - } - return true -} - -func formatTaints(taints []v1.Taint) string { - var taintStrings []string - - for _, taint := range taints { - // Format each taint as "key=value:effect" - taintString := fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect) - taintStrings = append(taintStrings, taintString) - } - - return strings.Join(taintStrings, ",") -} diff --git a/cmd/storage-check/storage.go b/cmd/storage-check/storage.go index e43f9b0..f2b29bb 100644 --- a/cmd/storage-check/storage.go +++ b/cmd/storage-check/storage.go @@ -166,7 +166,7 @@ func initializeStorageConfig(jobName string, pvcName string) *batchv1.Job { } // checkNodeConfig creates and configures a k8s job to initialize storage at PVC and returns the struct (ready to apply with client). -func checkNodeConfig(jobName string, pvcName string, node string) *batchv1.Job { +func checkNodeConfig(jobName string, pvcName string) *batchv1.Job { // Make a Job job := &batchv1.Job{} @@ -203,7 +203,6 @@ func checkNodeConfig(jobName string, pvcName string, node string) *batchv1.Job { Args: args, }, }, - NodeName: node, RestartPolicy: v1.RestartPolicyNever, Volumes: []corev1.Volume{{ Name: "data",