diff --git a/csi/deployment_util.go b/csi/deployment_util.go index 4edd1a9ff1..250b4d64ca 100644 --- a/csi/deployment_util.go +++ b/csi/deployment_util.go @@ -23,6 +23,7 @@ import ( longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" longhornmeta "github.com/longhorn/longhorn-manager/meta" "github.com/longhorn/longhorn-manager/types" + "github.com/longhorn/longhorn-manager/util" ) const ( @@ -162,21 +163,9 @@ func getCommonDeployment(commonName, namespace, serviceAccount, image, rootDir s type resourceCreateFunc func(kubeClient *clientset.Clientset, obj runtime.Object) error type resourceDeleteFunc func(kubeClient *clientset.Clientset, name, namespace string) error -type resourceGetFunc func(kubeClient *clientset.Clientset, name, namespace string) (runtime.Object, error) - -func waitForDeletion(kubeClient *clientset.Clientset, name, namespace, resource string, getFunc resourceGetFunc) error { - for i := 0; i < maxRetryForDeletion; i++ { - _, err := getFunc(kubeClient, name, namespace) - if err != nil && apierrors.IsNotFound(err) { - return nil - } - time.Sleep(time.Duration(1) * time.Second) - } - return fmt.Errorf("foreground deletion of %s %s timed out", resource, name) -} func deploy(kubeClient *clientset.Clientset, obj runtime.Object, resource string, - createFunc resourceCreateFunc, deleteFunc resourceDeleteFunc, getFunc resourceGetFunc) (err error) { + createFunc resourceCreateFunc, deleteFunc resourceDeleteFunc, getFunc util.ResourceGetFunc) (err error) { kubeVersion, err := kubeClient.Discovery().ServerVersion() if err != nil { @@ -293,7 +282,7 @@ func needToUpdateDaemonSetImage(existingObj, newObj runtime.Object) bool { } func cleanup(kubeClient *clientset.Clientset, obj runtime.Object, resource string, - deleteFunc resourceDeleteFunc, getFunc resourceGetFunc) (err error) { + deleteFunc resourceDeleteFunc, getFunc util.ResourceGetFunc) (err error) { objMeta, err := meta.Accessor(obj) if err != nil { @@ -318,13 +307,13 @@ func cleanup(kubeClient *clientset.Clientset, obj runtime.Object, resource strin return err } if existingMeta.GetDeletionTimestamp() != nil { - return waitForDeletion(kubeClient, name, namespace, resource, getFunc) + return util.WaitForResourceDeletion(kubeClient, name, namespace, resource, maxRetryForDeletion, getFunc) } logrus.Infof("Deleting existing %s %s", resource, name) if err := deleteFunc(kubeClient, name, namespace); err != nil { return err } - return waitForDeletion(kubeClient, name, namespace, resource, getFunc) + return util.WaitForResourceDeletion(kubeClient, name, namespace, resource, maxRetryForDeletion, getFunc) } func serviceCreateFunc(kubeClient *clientset.Clientset, obj runtime.Object) error { diff --git a/upgrade/v14xto150/upgrade.go b/upgrade/v14xto150/upgrade.go index f054000807..4e822f6db8 100644 --- a/upgrade/v14xto150/upgrade.go +++ b/upgrade/v14xto150/upgrade.go @@ -5,20 +5,25 @@ import ( "strconv" "github.com/pkg/errors" + "github.com/sirupsen/logrus" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" clientset "k8s.io/client-go/kubernetes" longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned" "github.com/longhorn/longhorn-manager/types" upgradeutil "github.com/longhorn/longhorn-manager/upgrade/util" + "github.com/longhorn/longhorn-manager/util" ) const ( upgradeLogPrefix = "upgrade from v1.4.x to v1.5.0: " + + maxRetryForDeploymentDeletion = 300 ) func UpgradeResources(namespace string, lhClient *lhclientset.Clientset, kubeClient *clientset.Clientset, resourceMaps map[string]interface{}) error { @@ -26,15 +31,15 @@ func UpgradeResources(namespace string, lhClient *lhclientset.Clientset, kubeCli return err } - if err := upgradeVolumes(namespace, lhClient, resourceMaps); err != nil { + if err := upgradeWebhookAndRecoveryService(namespace, kubeClient); err != nil { return err } - if err := upgradeWebhookAndRecoveryService(namespace, kubeClient); err != nil { + if err := upgradeWebhookPDB(namespace, kubeClient); err != nil { return err } - if err := upgradeWebhookPDB(namespace, kubeClient); err != nil { + if err := upgradeVolumes(namespace, lhClient, resourceMaps); err != nil { return err } @@ -230,16 +235,27 @@ func upgradeWebhookAndRecoveryService(namespace string, kubeClient *clientset.Cl } return errors.Wrapf(err, upgradeLogPrefix+"failed to get deployment with label %v during the upgrade", selector) } + for _, deployment := range deployments.Items { err := kubeClient.AppsV1().Deployments(deployment.Namespace).Delete(context.TODO(), deployment.Name, metav1.DeleteOptions{}) if err != nil { return errors.Wrapf(err, upgradeLogPrefix+"failed to delete the deployment with label %v during the upgrade", selector) } + + err = util.WaitForResourceDeletion(kubeClient, deployment.Name, deployment.Namespace, selector, maxRetryForDeploymentDeletion, deploymentGetFunc) + if err != nil { + return errors.Wrapf(err, upgradeLogPrefix+"failed to wait for the deployment with label %v to be deleted during the upgrade", selector) + } + logrus.Infof("Deleted deployment %v with label %v during the upgrade", deployment.Name, selector) } } return nil } +func deploymentGetFunc(kubeClient *clientset.Clientset, name, namespace string) (runtime.Object, error) { + return kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}) +} + func upgradeReplicas(namespace string, lhClient *lhclientset.Clientset, resourceMaps map[string]interface{}) (err error) { defer func() { err = errors.Wrapf(err, upgradeLogPrefix+"upgrade replica failed") diff --git a/util/util.go b/util/util.go index a8a0bb397a..5fb71e93d7 100644 --- a/util/util.go +++ b/util/util.go @@ -1006,3 +1006,16 @@ func VerifySnapshotLabels(labels map[string]string) error { func RemoveNewlines(input string) string { return strings.Replace(input, "\n", "", -1) } + +type ResourceGetFunc func(kubeClient *clientset.Clientset, name, namespace string) (runtime.Object, error) + +func WaitForResourceDeletion(kubeClient *clientset.Clientset, name, namespace, resource string, maxRetryForDeletion int, getFunc ResourceGetFunc) error { + for i := 0; i < maxRetryForDeletion; i++ { + _, err := getFunc(kubeClient, name, namespace) + if err != nil && apierrors.IsNotFound(err) { + return nil + } + time.Sleep(time.Duration(1) * time.Second) + } + return fmt.Errorf("foreground deletion of %s %s timed out", resource, name) +} diff --git a/webhook/resources/volume/mutator.go b/webhook/resources/volume/mutator.go index 62f671f507..9f980c284b 100644 --- a/webhook/resources/volume/mutator.go +++ b/webhook/resources/volume/mutator.go @@ -303,6 +303,12 @@ func (v *volumeMutator) Update(request *admission.Request, oldObj runtime.Object if string(volume.Spec.BackendStoreDriver) == "" { patchOps = append(patchOps, fmt.Sprintf(`{"op": "replace", "path": "/spec/backendStoreDriver", "value": "%s"}`, longhorn.BackendStoreDriverTypeV1)) } + if string(volume.Spec.BackupCompressionMethod) == "" { + patchOps = append(patchOps, fmt.Sprintf(`{"op": "replace", "path": "/spec/backupCompressionMethod", "value": "%s"}`, longhorn.BackupCompressionMethodGzip)) + } + if string(volume.Spec.OfflineReplicaRebuilding) == "" { + patchOps = append(patchOps, fmt.Sprintf(`{"op": "replace", "path": "/spec/offlineReplicaRebuilding", "value": "%s"}`, longhorn.OfflineReplicaRebuildingDisabled)) + } size := util.RoundUpSize(volume.Spec.Size) if size != volume.Spec.Size {