Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(controller): refactor upgradeEngineForVolume #3176

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 109 additions & 75 deletions controller/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3007,9 +3007,7 @@ func (c *VolumeController) hasEngineStatusSynced(e *longhorn.Engine, rs map[stri
return true
}

func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) error {
var err error

func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
if len(es) > 1 {
return nil
}
Expand Down Expand Up @@ -3115,44 +3113,95 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
}

if types.IsDataEngineV1(v.Spec.DataEngine) {
oldImage, err := c.getEngineImageRO(v.Status.CurrentImage)
if err != nil {
log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Status.CurrentImage)
if err := c.checkOldAndNewEngineImagesForLiveUpgrade(v, volumeAndReplicaNodes...); err != nil {
log.Warnf("%v", err)
return nil
}

if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, volumeAndReplicaNodes...); !isReady {
log.WithError(err).Warnf("Engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image)
return nil
}
newImage, err := c.getEngineImageRO(v.Spec.Image)
if err != nil {
log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Spec.Image)
return nil
}
if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, volumeAndReplicaNodes...); !isReady {
log.WithError(err).Warnf("Engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image)
return nil
_, dataPathToOldRunningReplica, dataPathToNewReplica := c.groupReplicasByImageAndState(v, e, rs)

// Skip checking and creating new replicas for the 2 cases:
// 1. Volume is degraded.
// 2. The new replicas is activated and all old replicas are already purged.
if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas {
if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica,
func(r *longhorn.Replica, engineImage string) {
r.Spec.Image = engineImage
},
v.Spec.Image); err != nil {
return err
}
}

if oldImage.Status.GitCommit == newImage.Status.GitCommit {
log.Warnf("Engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image)
return nil
if e.Spec.Image != v.Spec.Image {
replicaAddressMap, err := c.constructReplicaAddressMap(v, e, dataPathToNewReplica)
if err != nil {
return nil
}

// Only upgrade e.Spec.Image if there are enough new upgraded replica.
// This prevent the deadlock in the case that an upgrade from engine image
// is followed immediately by an other upgrade.
// More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty.
// Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty.
// Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade,
// the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0.
// On the other hand, the engine controller blocks the engine's status from being refreshed
// and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume.
if len(replicaAddressMap) == v.Spec.NumberOfReplicas {
e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
e.Spec.Image = v.Spec.Image
}
}
c.finishLiveEngineUpgrade(v, e, rs, log)
} else {
// TODO: Implement the logic for data engine v2
return nil
}
return nil
}

if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion ||
oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion {
log.Warnf("Failed to live upgrade from %v to %v: the old controller version %v "+
"is not compatible with the new controller version %v and the new controller minimal version %v",
oldImage.Spec.Image, newImage.Spec.Image,
oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion)
return nil
func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) {
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
"engine": e.Name,
"volumeDesiredEngineImage": v.Spec.Image,
})

replicaAddressMap := map[string]string{}

for _, r := range dataPathToNewReplica {
// wait for all potentially healthy replicas become running
if r.Status.CurrentState != longhorn.InstanceStateRunning {
return replicaAddressMap, fmt.Errorf("replica %v is not running", r.Name)
}
if r.Status.IP == "" {
log.WithField("replica", r.Name).Warn("replica is running but IP is empty")
continue
}
if r.Status.StorageIP == "" {
log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update")
continue
}
if r.Status.Port == 0 {
log.WithField("replica", r.Name).Warn("Replica is running but port is 0")
continue
}
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
}

unknownReplicas := map[string]*longhorn.Replica{}
dataPathToOldRunningReplica := map[string]*longhorn.Replica{}
dataPathToNewReplica := map[string]*longhorn.Replica{}
return replicaAddressMap, nil
}

func (c *VolumeController) groupReplicasByImageAndState(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica map[string]*longhorn.Replica) {
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
"engine": e.Name,
"volumeDesiredEngineImage": v.Spec.Image,
})

unknownReplicas = map[string]*longhorn.Replica{}
dataPathToOldRunningReplica = map[string]*longhorn.Replica{}
dataPathToNewReplica = map[string]*longhorn.Replica{}

for _, r := range rs {
dataPath := types.GetReplicaDataPath(r.Spec.DiskPath, r.Spec.DataDirectoryName)
if r.Spec.Image == v.Status.CurrentImage && r.Status.CurrentState == longhorn.InstanceStateRunning && r.Spec.HealthyAt != "" {
Expand All @@ -3165,53 +3214,38 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
}
}

// Skip checking and creating new replicas for the 2 cases:
// 1. Volume is degraded.
// 2. The new replicas is activated and all old replicas are already purged.
if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas {
if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica, func(r *longhorn.Replica, engineImage string) {
r.Spec.Image = engineImage
}, v.Spec.Image); err != nil {
return err
}
return unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica
}

func (c *VolumeController) checkOldAndNewEngineImagesForLiveUpgrade(v *longhorn.Volume, nodes ...string) error {
oldImage, err := c.getEngineImageRO(v.Status.CurrentImage)
if err != nil {
return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Status.CurrentImage)
}

if e.Spec.Image != v.Spec.Image {
replicaAddressMap := map[string]string{}
for _, r := range dataPathToNewReplica {
// wait for all potentially healthy replicas become running
if r.Status.CurrentState != longhorn.InstanceStateRunning {
return nil
}
if r.Status.IP == "" {
log.WithField("replica", r.Name).Warn("replica is running but IP is empty")
continue
}
if r.Status.StorageIP == "" {
log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update")
continue
}
if r.Status.Port == 0 {
log.WithField("replica", r.Name).Warn("Replica is running but port is 0")
continue
}
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
}
// Only upgrade e.Spec.Image if there are enough new upgraded replica.
// This prevent the deadlock in the case that an upgrade from engine image
// is followed immediately by an other upgrade.
// More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty.
// Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty.
// Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade,
// the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0.
// On the other hand, the engine controller blocks the engine's status from being refreshed
// and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume.
if len(replicaAddressMap) == v.Spec.NumberOfReplicas {
e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
e.Spec.Image = v.Spec.Image
}
if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, nodes...); !isReady {
return errors.Wrapf(err, "engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image)
}
newImage, err := c.getEngineImageRO(v.Spec.Image)
if err != nil {
return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Spec.Image)
}
c.finishLiveEngineUpgrade(v, e, rs, log)
if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, nodes...); !isReady {
return errors.Wrapf(err, "engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image)
}

if oldImage.Status.GitCommit == newImage.Status.GitCommit {
return fmt.Errorf("engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image)
}

if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion ||
oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion {
return fmt.Errorf("failed to live upgrade from %v to %v: the old controller version %v "+
"is not compatible with the new controller version %v and the new controller minimal version %v",
oldImage.Spec.Image, newImage.Spec.Image,
oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion)
}

return nil
}

Expand Down