Skip to content

Commit

Permalink
Consider a node with a failed replica as still used
Browse files Browse the repository at this point in the history
The backport to v1.5.x was complex. It required some test refactoring that
had not previously been backported. This commit includes the necessary refactor
and the reworked Longhorn 8043 commits.

Longhorn 8043

Signed-off-by: Eric Weber <eric.weber@suse.com>
(cherry picked from commit 29a895c)
(cherry picked from commit 65ceb37)
(cherry picked from commit e685946)
  • Loading branch information
ejweber committed Mar 11, 2024
1 parent 0b29d37 commit cf8f2c9
Show file tree
Hide file tree
Showing 2 changed files with 296 additions and 94 deletions.
142 changes: 97 additions & 45 deletions scheduler/replica_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const (

type ReplicaScheduler struct {
ds *datastore.DataStore

// Required for unit testing.
nowHandler func() time.Time
}

type Disk struct {
Expand All @@ -40,6 +43,9 @@ type DiskSchedulingInfo struct {
func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler {
rcScheduler := &ReplicaScheduler{
ds: ds,

// Required for unit testing.
nowHandler: time.Now,
}
return rcScheduler
}
Expand Down Expand Up @@ -88,7 +94,7 @@ func (rcs *ReplicaScheduler) ScheduleReplica(replica *longhorn.Replica, replicas
nodeDisksMap[node.Name] = disks
}

diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true)
diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true, false)

// there's no disk that fit for current replica
if len(diskCandidates) == 0 {
Expand Down Expand Up @@ -143,7 +149,17 @@ func getNodesWithEvictingReplicas(replicas map[string]*longhorn.Replica, nodeInf
return nodesWithEvictingReplicas
}

func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node, nodeDisksMap map[string]map[string]struct{}, replicas map[string]*longhorn.Replica, volume *longhorn.Volume, requireSchedulingCheck bool) (map[string]*Disk, util.MultiError) {
// getDiskCandidates returns a map of the most appropriate disks a replica can be scheduled to (assuming it can be
// scheduled at all). For example, consider a case in which there are two disks on nodes without a replica for a volume
// and two disks on nodes with a replica for the same volume. getDiskCandidates only returns the disks without a
// replica, even if the replica can legally be scheduled on all four disks.
// Some callers (e.g. CheckAndReuseFailedReplicas) do not consider a node or zone to be used if it contains a failed
// replica. ignoreFailedReplicas == true supports this use case.
func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node,
nodeDisksMap map[string]map[string]struct{},
replicas map[string]*longhorn.Replica,
volume *longhorn.Volume,
requireSchedulingCheck, ignoreFailedReplicas bool) (map[string]*Disk, util.MultiError) {
multiError := util.NewMultiError()

nodeSoftAntiAffinity, err := rcs.ds.GetSettingAsBool(types.SettingNameReplicaSoftAntiAffinity)
Expand All @@ -165,6 +181,17 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
zoneSoftAntiAffinity = volume.Spec.ReplicaZoneSoftAntiAffinity == longhorn.ReplicaZoneSoftAntiAffinityEnabled
}

creatingNewReplicasForReplenishment := false
if volume.Status.Robustness == longhorn.VolumeRobustnessDegraded {
timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume)
if err != nil {
err = errors.Wrap(err, "failed to get time until replica replacement")
multiError.Append(util.NewMultiError(err.Error()))
return map[string]*Disk{}, multiError
}
creatingNewReplicasForReplenishment = timeToReplacementReplica == 0
}

getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
multiError = util.NewMultiError()
for _, node := range nodes {
Expand All @@ -182,14 +209,30 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
replicasCountPerNode := map[string]int{}
// Get current nodes and zones
for _, r := range replicas {
if r.Spec.NodeID != "" && r.DeletionTimestamp == nil && r.Spec.FailedAt == "" {
if node, ok := nodeInfo[r.Spec.NodeID]; ok {
usedNodes[r.Spec.NodeID] = node
// For empty zone label, we treat them as
// one zone.
usedZones[node.Status.Zone] = true
replicasCountPerNode[r.Spec.NodeID] = replicasCountPerNode[r.Spec.NodeID] + 1
if r.Spec.NodeID == "" {
continue
}
if r.DeletionTimestamp != nil {
continue
}
if r.Spec.FailedAt != "" {
if ignoreFailedReplicas {
continue
}
if !IsPotentiallyReusableReplica(r) {
continue // This replica can never be used again, so it does not count in scheduling decisions.
}
if creatingNewReplicasForReplenishment {
continue // Maybe this replica can be used again, but it is being actively replaced anyway.
}
}

if node, ok := nodeInfo[r.Spec.NodeID]; ok {
usedNodes[r.Spec.NodeID] = node
// For empty zone label, we treat them as
// one zone.
usedZones[node.Status.Zone] = true
replicasCountPerNode[r.Spec.NodeID] = replicasCountPerNode[r.Spec.NodeID] + 1
}
}

Expand Down Expand Up @@ -471,7 +514,9 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon
}
}

diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false)
// Call getDiskCandidates with ignoreFailedReplicas == true since we want the list of candidates to include disks
// that already contain a failed replica.
diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false, true)

var reusedReplica *longhorn.Replica
for _, suggestDisk := range diskCandidates {
Expand Down Expand Up @@ -513,7 +558,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep

hasPotentiallyReusableReplica := false
for _, r := range replicas {
if IsPotentiallyReusableReplica(r, hardNodeAffinity) {
if IsPotentiallyReusableReplica(r) {
hasPotentiallyReusableReplica = true
break
}
Expand All @@ -522,42 +567,25 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep
return 0
}

// Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later.
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume)
if err != nil {
logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err)
return 0
msg := "Failed to get time until replica replacement, will directly replenish a new replica"
logrus.WithError(err).Errorf(msg)
}
waitInterval := time.Duration(settingValue) * time.Second
lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)

if err != nil {
logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err)
return 0
if timeUntilNext > 0 {
// Adding another second to the checkBackDuration to avoid clock skew.
timeUntilNext = timeUntilNext + time.Second
logrus.Infof("Replica replenishment is delayed until %v", timeOfNext.Add(time.Second))
}
now := time.Now()
if now.After(lastDegradedAt.Add(waitInterval)) {
return 0
}

logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval))
// Adding 1 more second to the check back interval to avoid clock skew
return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second
return timeUntilNext
}

func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) bool {
if r.Spec.FailedAt == "" {
return false
}
if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
return false
}
if r.Spec.RebuildRetryCount >= FailedReplicaMaxRetryCount {
return false
}
if r.Spec.EvictionRequested {
// All failedReusableReplicas are also potentiallyFailedReusableReplicas.
if !IsPotentiallyReusableReplica(r) {
return false
}

if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
Expand Down Expand Up @@ -622,9 +650,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
return true
}

// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable.
// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool {
// IsPotentiallyReusableReplica checks if a failed replica is potentially reusable. A potentially reusable replica means
// this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica) bool {
if r.Spec.FailedAt == "" {
return false
}
Expand All @@ -637,9 +665,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string)
if r.Spec.EvictionRequested {
return false
}
if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
// TODO: Reuse failed replicas for a SPDK volume
if r.Spec.BackendStoreDriver == longhorn.BackendStoreDriverTypeV2 {
return false
Expand Down Expand Up @@ -793,3 +818,30 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long
}
return longhorn.DiskSpec{}, longhorn.DiskStatus{}, false
}

// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume,
// even if there are potentially reusable failed replicas. It returns 0 if replica-replenishment-wait-interval has
// elapsed and a new replica is needed right now.
func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) {
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
if err != nil {
err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval")
return 0, time.Time{}, err
}
waitInterval := time.Duration(settingValue) * time.Second

lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
if err != nil {
err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt)
return 0, time.Time{}, err
}

now := rcs.nowHandler()
timeOfNext := lastDegradedAt.Add(waitInterval)
if now.After(timeOfNext) {
// A replacement replica is needed now.
return 0, time.Time{}, nil
}

return timeOfNext.Sub(now), timeOfNext, nil
}
Loading

0 comments on commit cf8f2c9

Please sign in to comment.