From 3ac6b93c30e9f0839129e6fa3c205e171726340c Mon Sep 17 00:00:00 2001 From: Derek Su Date: Mon, 23 Sep 2024 17:28:33 +0800 Subject: [PATCH] refactor(controller): refactor upgradeEngineForVolume Preliminary task for v2 volume upgrade Longhorn 9104 Signed-off-by: Derek Su --- controller/volume_controller.go | 184 +++++++++++++++++++------------- 1 file changed, 109 insertions(+), 75 deletions(-) diff --git a/controller/volume_controller.go b/controller/volume_controller.go index 4f8a2ecb85..af658367e9 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -3007,9 +3007,7 @@ func (c *VolumeController) hasEngineStatusSynced(e *longhorn.Engine, rs map[stri return true } -func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) error { - var err error - +func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) { if len(es) > 1 { return nil } @@ -3115,44 +3113,95 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str } if types.IsDataEngineV1(v.Spec.DataEngine) { - oldImage, err := c.getEngineImageRO(v.Status.CurrentImage) - if err != nil { - log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Status.CurrentImage) + if err := c.checkOldAndNewEngineImagesForLiveUpgrade(v, volumeAndReplicaNodes...); err != nil { + log.Warnf("%v", err) return nil } - if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, volumeAndReplicaNodes...); !isReady { - log.WithError(err).Warnf("Engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image) - return nil - } - newImage, err := c.getEngineImageRO(v.Spec.Image) - if err != nil { - log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Spec.Image) - return nil - } - if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, volumeAndReplicaNodes...); !isReady { - log.WithError(err).Warnf("Engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image) - return nil + _, dataPathToOldRunningReplica, dataPathToNewReplica := c.groupReplicasByImageAndState(v, e, rs) + + // Skip checking and creating new replicas for the 2 cases: + // 1. Volume is degraded. + // 2. The new replicas is activated and all old replicas are already purged. + if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas { + if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica, + func(r *longhorn.Replica, engineImage string) { + r.Spec.Image = engineImage + }, + v.Spec.Image); err != nil { + return err + } } - if oldImage.Status.GitCommit == newImage.Status.GitCommit { - log.Warnf("Engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image) - return nil + if e.Spec.Image != v.Spec.Image { + replicaAddressMap, err := c.constructReplicaAddressMap(v, e, dataPathToNewReplica) + if err != nil { + return nil + } + + // Only upgrade e.Spec.Image if there are enough new upgraded replica. + // This prevent the deadlock in the case that an upgrade from engine image + // is followed immediately by an other upgrade. + // More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty. + // Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty. + // Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade, + // the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0. + // On the other hand, the engine controller blocks the engine's status from being refreshed + // and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume. + if len(replicaAddressMap) == v.Spec.NumberOfReplicas { + e.Spec.UpgradedReplicaAddressMap = replicaAddressMap + e.Spec.Image = v.Spec.Image + } } + c.finishLiveEngineUpgrade(v, e, rs, log) + } else { + // TODO: Implement the logic for data engine v2 + return nil + } + return nil +} - if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion || - oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion { - log.Warnf("Failed to live upgrade from %v to %v: the old controller version %v "+ - "is not compatible with the new controller version %v and the new controller minimal version %v", - oldImage.Spec.Image, newImage.Spec.Image, - oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion) - return nil +func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) { + log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{ + "engine": e.Name, + "volumeDesiredEngineImage": v.Spec.Image, + }) + + replicaAddressMap := map[string]string{} + + for _, r := range dataPathToNewReplica { + // wait for all potentially healthy replicas become running + if r.Status.CurrentState != longhorn.InstanceStateRunning { + return replicaAddressMap, fmt.Errorf("replica %v is not running", r.Name) } + if r.Status.IP == "" { + log.WithField("replica", r.Name).Warn("replica is running but IP is empty") + continue + } + if r.Status.StorageIP == "" { + log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update") + continue + } + if r.Status.Port == 0 { + log.WithField("replica", r.Name).Warn("Replica is running but port is 0") + continue + } + replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port) } - unknownReplicas := map[string]*longhorn.Replica{} - dataPathToOldRunningReplica := map[string]*longhorn.Replica{} - dataPathToNewReplica := map[string]*longhorn.Replica{} + return replicaAddressMap, nil +} + +func (c *VolumeController) groupReplicasByImageAndState(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica map[string]*longhorn.Replica) { + log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{ + "engine": e.Name, + "volumeDesiredEngineImage": v.Spec.Image, + }) + + unknownReplicas = map[string]*longhorn.Replica{} + dataPathToOldRunningReplica = map[string]*longhorn.Replica{} + dataPathToNewReplica = map[string]*longhorn.Replica{} + for _, r := range rs { dataPath := types.GetReplicaDataPath(r.Spec.DiskPath, r.Spec.DataDirectoryName) if r.Spec.Image == v.Status.CurrentImage && r.Status.CurrentState == longhorn.InstanceStateRunning && r.Spec.HealthyAt != "" { @@ -3165,53 +3214,38 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str } } - // Skip checking and creating new replicas for the 2 cases: - // 1. Volume is degraded. - // 2. The new replicas is activated and all old replicas are already purged. - if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas { - if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica, func(r *longhorn.Replica, engineImage string) { - r.Spec.Image = engineImage - }, v.Spec.Image); err != nil { - return err - } + return unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica +} + +func (c *VolumeController) checkOldAndNewEngineImagesForLiveUpgrade(v *longhorn.Volume, nodes ...string) error { + oldImage, err := c.getEngineImageRO(v.Status.CurrentImage) + if err != nil { + return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Status.CurrentImage) } - if e.Spec.Image != v.Spec.Image { - replicaAddressMap := map[string]string{} - for _, r := range dataPathToNewReplica { - // wait for all potentially healthy replicas become running - if r.Status.CurrentState != longhorn.InstanceStateRunning { - return nil - } - if r.Status.IP == "" { - log.WithField("replica", r.Name).Warn("replica is running but IP is empty") - continue - } - if r.Status.StorageIP == "" { - log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update") - continue - } - if r.Status.Port == 0 { - log.WithField("replica", r.Name).Warn("Replica is running but port is 0") - continue - } - replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port) - } - // Only upgrade e.Spec.Image if there are enough new upgraded replica. - // This prevent the deadlock in the case that an upgrade from engine image - // is followed immediately by an other upgrade. - // More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty. - // Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty. - // Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade, - // the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0. - // On the other hand, the engine controller blocks the engine's status from being refreshed - // and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume. - if len(replicaAddressMap) == v.Spec.NumberOfReplicas { - e.Spec.UpgradedReplicaAddressMap = replicaAddressMap - e.Spec.Image = v.Spec.Image - } + if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, nodes...); !isReady { + return errors.Wrapf(err, "engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image) + } + newImage, err := c.getEngineImageRO(v.Spec.Image) + if err != nil { + return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Spec.Image) } - c.finishLiveEngineUpgrade(v, e, rs, log) + if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, nodes...); !isReady { + return errors.Wrapf(err, "engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image) + } + + if oldImage.Status.GitCommit == newImage.Status.GitCommit { + return fmt.Errorf("engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image) + } + + if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion || + oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion { + return fmt.Errorf("failed to live upgrade from %v to %v: the old controller version %v "+ + "is not compatible with the new controller version %v and the new controller minimal version %v", + oldImage.Spec.Image, newImage.Spec.Image, + oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion) + } + return nil }