Skip to content

Commit

Permalink
refactor(controller): refactor upgradeEngineForVolume
Browse files Browse the repository at this point in the history
Preliminary task for v2 volume upgrade

Longhorn 9104

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit committed Sep 23, 2024
1 parent 2edf8ec commit b9ab47f
Showing 1 changed file with 109 additions and 75 deletions.
184 changes: 109 additions & 75 deletions controller/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3007,9 +3007,7 @@ func (c *VolumeController) hasEngineStatusSynced(e *longhorn.Engine, rs map[stri
return true
}

func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) error {
var err error

func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
if len(es) > 1 {
return nil
}
Expand Down Expand Up @@ -3115,44 +3113,95 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
}

if types.IsDataEngineV1(v.Spec.DataEngine) {
oldImage, err := c.getEngineImageRO(v.Status.CurrentImage)
if err != nil {
log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Status.CurrentImage)
if err := c.checkOldAndNewEngineImagesForLiveUpgrade(v, volumeAndReplicaNodes...); err != nil {
log.Warnf("%v", err)
return nil
}

if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, volumeAndReplicaNodes...); !isReady {
log.WithError(err).Warnf("Engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image)
return nil
}
newImage, err := c.getEngineImageRO(v.Spec.Image)
if err != nil {
log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Spec.Image)
return nil
}
if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, volumeAndReplicaNodes...); !isReady {
log.WithError(err).Warnf("Engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image)
return nil
_, dataPathToOldRunningReplica, dataPathToNewReplica := c.groupReplicasByImageAndState(v, e, rs)

// Skip checking and creating new replicas for the 2 cases:
// 1. Volume is degraded.
// 2. The new replicas is activated and all old replicas are already purged.
if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas {
if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica,
func(r *longhorn.Replica, engineImage string) {
r.Spec.Image = engineImage
},
v.Spec.Image); err != nil {
return err
}
}

if oldImage.Status.GitCommit == newImage.Status.GitCommit {
log.Warnf("Engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image)
return nil
if e.Spec.Image != v.Spec.Image {
replicaAddressMap, err := c.constructReplicaAddressMap(v, e, dataPathToNewReplica)
if err != nil {
return nil
}

// Only upgrade e.Spec.Image if there are enough new upgraded replica.
// This prevent the deadlock in the case that an upgrade from engine image
// is followed immediately by an other upgrade.
// More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty.
// Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty.
// Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade,
// the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0.
// On the other hand, the engine controller blocks the engine's status from being refreshed
// and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume.
if len(replicaAddressMap) == v.Spec.NumberOfReplicas {
e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
e.Spec.Image = v.Spec.Image
}
}
c.finishLiveEngineUpgrade(v, e, rs, log)
} else {
// TODO: Implement the logic for data engine v2
return nil
}
return nil
}

if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion ||
oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion {
log.Warnf("Failed to live upgrade from %v to %v: the old controller version %v "+
"is not compatible with the new controller version %v and the new controller minimal version %v",
oldImage.Spec.Image, newImage.Spec.Image,
oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion)
return nil
func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) {
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
"engine": e.Name,
"volumeDesiredEngineImage": v.Spec.Image,
})

replicaAddressMap := map[string]string{}

for _, r := range dataPathToNewReplica {
// wait for all potentially healthy replicas become running
if r.Status.CurrentState != longhorn.InstanceStateRunning {
return replicaAddressMap, fmt.Errorf("replica %v is not running", r.Name)
}
if r.Status.IP == "" {
log.WithField("replica", r.Name).Warn("replica is running but IP is empty")
continue
}
if r.Status.StorageIP == "" {
log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update")
continue
}
if r.Status.Port == 0 {
log.WithField("replica", r.Name).Warn("Replica is running but port is 0")
continue
}
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
}

unknownReplicas := map[string]*longhorn.Replica{}
dataPathToOldRunningReplica := map[string]*longhorn.Replica{}
dataPathToNewReplica := map[string]*longhorn.Replica{}
return replicaAddressMap, nil
}

func (c *VolumeController) groupReplicasByImageAndState(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica map[string]*longhorn.Replica) {
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
"engine": e.Name,
"volumeDesiredEngineImage": v.Spec.Image,
})

unknownReplicas = map[string]*longhorn.Replica{}
dataPathToOldRunningReplica = map[string]*longhorn.Replica{}
dataPathToNewReplica = map[string]*longhorn.Replica{}

for _, r := range rs {
dataPath := types.GetReplicaDataPath(r.Spec.DiskPath, r.Spec.DataDirectoryName)
if r.Spec.Image == v.Status.CurrentImage && r.Status.CurrentState == longhorn.InstanceStateRunning && r.Spec.HealthyAt != "" {
Expand All @@ -3165,53 +3214,38 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
}
}

// Skip checking and creating new replicas for the 2 cases:
// 1. Volume is degraded.
// 2. The new replicas is activated and all old replicas are already purged.
if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas {
if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica, func(r *longhorn.Replica, engineImage string) {
r.Spec.Image = engineImage
}, v.Spec.Image); err != nil {
return err
}
return unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica
}

func (c *VolumeController) checkOldAndNewEngineImagesForLiveUpgrade(v *longhorn.Volume, nodes ...string) error {
oldImage, err := c.getEngineImageRO(v.Status.CurrentImage)
if err != nil {
return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Status.CurrentImage)
}

if e.Spec.Image != v.Spec.Image {
replicaAddressMap := map[string]string{}
for _, r := range dataPathToNewReplica {
// wait for all potentially healthy replicas become running
if r.Status.CurrentState != longhorn.InstanceStateRunning {
return nil
}
if r.Status.IP == "" {
log.WithField("replica", r.Name).Warn("replica is running but IP is empty")
continue
}
if r.Status.StorageIP == "" {
log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update")
continue
}
if r.Status.Port == 0 {
log.WithField("replica", r.Name).Warn("Replica is running but port is 0")
continue
}
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
}
// Only upgrade e.Spec.Image if there are enough new upgraded replica.
// This prevent the deadlock in the case that an upgrade from engine image
// is followed immediately by an other upgrade.
// More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty.
// Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty.
// Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade,
// the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0.
// On the other hand, the engine controller blocks the engine's status from being refreshed
// and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume.
if len(replicaAddressMap) == v.Spec.NumberOfReplicas {
e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
e.Spec.Image = v.Spec.Image
}
if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, nodes...); !isReady {
return errors.Wrapf(err, "engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image)
}
newImage, err := c.getEngineImageRO(v.Spec.Image)
if err != nil {
return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Spec.Image)
}
c.finishLiveEngineUpgrade(v, e, rs, log)
if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, nodes...); !isReady {
return errors.Wrapf(err, "engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image)
}

if oldImage.Status.GitCommit == newImage.Status.GitCommit {
return fmt.Errorf("engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image)
}

if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion ||
oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion {
return fmt.Errorf("failed to live upgrade from %v to %v: the old controller version %v "+
"is not compatible with the new controller version %v and the new controller minimal version %v",
oldImage.Spec.Image, newImage.Spec.Image,
oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion)
}

return nil
}

Expand Down

0 comments on commit b9ab47f

Please sign in to comment.