Skip to content

Commit

Permalink
Mutate the resources that need update during upgrade
Browse files Browse the repository at this point in the history
The patch is only for improving the resilience.

Longhorn 6259

Signed-off-by: Derek Su <derek.su@suse.com>
(cherry picked from commit 75d2e77)
  • Loading branch information
derekbit authored and David Ko committed Jul 11, 2023
1 parent a9f2fef commit bef6044
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 19 deletions.
21 changes: 5 additions & 16 deletions csi/deployment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
longhornmeta "github.com/longhorn/longhorn-manager/meta"
"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/util"
)

const (
Expand Down Expand Up @@ -162,21 +163,9 @@ func getCommonDeployment(commonName, namespace, serviceAccount, image, rootDir s

type resourceCreateFunc func(kubeClient *clientset.Clientset, obj runtime.Object) error
type resourceDeleteFunc func(kubeClient *clientset.Clientset, name, namespace string) error
type resourceGetFunc func(kubeClient *clientset.Clientset, name, namespace string) (runtime.Object, error)

func waitForDeletion(kubeClient *clientset.Clientset, name, namespace, resource string, getFunc resourceGetFunc) error {
for i := 0; i < maxRetryForDeletion; i++ {
_, err := getFunc(kubeClient, name, namespace)
if err != nil && apierrors.IsNotFound(err) {
return nil
}
time.Sleep(time.Duration(1) * time.Second)
}
return fmt.Errorf("foreground deletion of %s %s timed out", resource, name)
}

func deploy(kubeClient *clientset.Clientset, obj runtime.Object, resource string,
createFunc resourceCreateFunc, deleteFunc resourceDeleteFunc, getFunc resourceGetFunc) (err error) {
createFunc resourceCreateFunc, deleteFunc resourceDeleteFunc, getFunc util.ResourceGetFunc) (err error) {

kubeVersion, err := kubeClient.Discovery().ServerVersion()
if err != nil {
Expand Down Expand Up @@ -293,7 +282,7 @@ func needToUpdateDaemonSetImage(existingObj, newObj runtime.Object) bool {
}

func cleanup(kubeClient *clientset.Clientset, obj runtime.Object, resource string,
deleteFunc resourceDeleteFunc, getFunc resourceGetFunc) (err error) {
deleteFunc resourceDeleteFunc, getFunc util.ResourceGetFunc) (err error) {

objMeta, err := meta.Accessor(obj)
if err != nil {
Expand All @@ -318,13 +307,13 @@ func cleanup(kubeClient *clientset.Clientset, obj runtime.Object, resource strin
return err
}
if existingMeta.GetDeletionTimestamp() != nil {
return waitForDeletion(kubeClient, name, namespace, resource, getFunc)
return util.WaitForResourceDeletion(kubeClient, name, namespace, resource, maxRetryForDeletion, getFunc)
}
logrus.Infof("Deleting existing %s %s", resource, name)
if err := deleteFunc(kubeClient, name, namespace); err != nil {
return err
}
return waitForDeletion(kubeClient, name, namespace, resource, getFunc)
return util.WaitForResourceDeletion(kubeClient, name, namespace, resource, maxRetryForDeletion, getFunc)
}

func serviceCreateFunc(kubeClient *clientset.Clientset, obj runtime.Object) error {
Expand Down
22 changes: 19 additions & 3 deletions upgrade/v14xto150/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@ import (
"strconv"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
"github.com/longhorn/longhorn-manager/types"
upgradeutil "github.com/longhorn/longhorn-manager/upgrade/util"
"github.com/longhorn/longhorn-manager/util"
)

const (
upgradeLogPrefix = "upgrade from v1.4.x to v1.5.0: "

maxRetryForDeploymentDeletion = 300
)

func UpgradeResources(namespace string, lhClient *lhclientset.Clientset, kubeClient *clientset.Clientset, resourceMaps map[string]interface{}) error {
if err := upgradeCSIPlugin(namespace, kubeClient); err != nil {
return err
}

if err := upgradeVolumes(namespace, lhClient, resourceMaps); err != nil {
if err := upgradeWebhookAndRecoveryService(namespace, kubeClient); err != nil {
return err
}

if err := upgradeWebhookAndRecoveryService(namespace, kubeClient); err != nil {
if err := upgradeWebhookPDB(namespace, kubeClient); err != nil {
return err
}

if err := upgradeWebhookPDB(namespace, kubeClient); err != nil {
if err := upgradeVolumes(namespace, lhClient, resourceMaps); err != nil {
return err
}

Expand Down Expand Up @@ -230,16 +235,27 @@ func upgradeWebhookAndRecoveryService(namespace string, kubeClient *clientset.Cl
}
return errors.Wrapf(err, upgradeLogPrefix+"failed to get deployment with label %v during the upgrade", selector)
}

for _, deployment := range deployments.Items {
err := kubeClient.AppsV1().Deployments(deployment.Namespace).Delete(context.TODO(), deployment.Name, metav1.DeleteOptions{})
if err != nil {
return errors.Wrapf(err, upgradeLogPrefix+"failed to delete the deployment with label %v during the upgrade", selector)
}

err = util.WaitForResourceDeletion(kubeClient, deployment.Name, deployment.Namespace, selector, maxRetryForDeploymentDeletion, deploymentGetFunc)
if err != nil {
return errors.Wrapf(err, upgradeLogPrefix+"failed to wait for the deployment with label %v to be deleted during the upgrade", selector)
}
logrus.Infof("Deleted deployment %v with label %v during the upgrade", deployment.Name, selector)
}
}
return nil
}

func deploymentGetFunc(kubeClient *clientset.Clientset, name, namespace string) (runtime.Object, error) {
return kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

func upgradeReplicas(namespace string, lhClient *lhclientset.Clientset, resourceMaps map[string]interface{}) (err error) {
defer func() {
err = errors.Wrapf(err, upgradeLogPrefix+"upgrade replica failed")
Expand Down
13 changes: 13 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,3 +1006,16 @@ func VerifySnapshotLabels(labels map[string]string) error {
func RemoveNewlines(input string) string {
return strings.Replace(input, "\n", "", -1)
}

type ResourceGetFunc func(kubeClient *clientset.Clientset, name, namespace string) (runtime.Object, error)

func WaitForResourceDeletion(kubeClient *clientset.Clientset, name, namespace, resource string, maxRetryForDeletion int, getFunc ResourceGetFunc) error {
for i := 0; i < maxRetryForDeletion; i++ {
_, err := getFunc(kubeClient, name, namespace)
if err != nil && apierrors.IsNotFound(err) {
return nil
}
time.Sleep(time.Duration(1) * time.Second)
}
return fmt.Errorf("foreground deletion of %s %s timed out", resource, name)
}
6 changes: 6 additions & 0 deletions webhook/resources/volume/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ func (v *volumeMutator) Update(request *admission.Request, oldObj runtime.Object
if string(volume.Spec.BackendStoreDriver) == "" {
patchOps = append(patchOps, fmt.Sprintf(`{"op": "replace", "path": "/spec/backendStoreDriver", "value": "%s"}`, longhorn.BackendStoreDriverTypeV1))
}
if string(volume.Spec.BackupCompressionMethod) == "" {
patchOps = append(patchOps, fmt.Sprintf(`{"op": "replace", "path": "/spec/backupCompressionMethod", "value": "%s"}`, longhorn.BackupCompressionMethodGzip))
}
if string(volume.Spec.OfflineReplicaRebuilding) == "" {
patchOps = append(patchOps, fmt.Sprintf(`{"op": "replace", "path": "/spec/offlineReplicaRebuilding", "value": "%s"}`, longhorn.OfflineReplicaRebuildingDisabled))
}

size := util.RoundUpSize(volume.Spec.Size)
if size != volume.Spec.Size {
Expand Down

0 comments on commit bef6044

Please sign in to comment.