Refine scheduling behavior with failed replicas
Consider a node with a failed replica as used if the failed replica is
potentially reusable and replica-replenishment-wait-interval hasn't expired.

Longhorn 8043

Signed-off-by: Eric Weber <eric.weber@suse.com>
(cherry picked from commit 65ceb37)

# Conflicts:
#	scheduler/replica_scheduler.go
#	scheduler/replica_scheduler_test.go
ejweber authored and mergify[bot] committed Mar 7, 2024
1 parent aa66553 commit 7da2a91
Showing 2 changed files with 187 additions and 45 deletions.
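For context, here is a minimal, self-contained sketch of the rule this commit implements: while the replica-replenishment-wait-interval has not elapsed, a node holding a failed but potentially reusable replica still counts as used for scheduling. The types and function names below are hypothetical simplifications for illustration, not Longhorn's actual API.

package main

import (
	"fmt"
	"time"
)

type replica struct {
	nodeID   string
	failedAt time.Time // zero value means the replica is healthy
	reusable bool      // e.g. the node is temporarily down but the data is intact
}

// nodeIsUsed reports whether a node should be treated as occupied for
// anti-affinity purposes, given the replicas scheduled to it.
func nodeIsUsed(replicas []replica, nodeID string, waitInterval time.Duration, now time.Time) bool {
	for _, r := range replicas {
		if r.nodeID != nodeID {
			continue
		}
		if r.failedAt.IsZero() {
			return true // a healthy replica always occupies the node
		}
		// A failed replica occupies the node only while it may still be
		// reused and Longhorn is still waiting before replenishing.
		if r.reusable && now.Before(r.failedAt.Add(waitInterval)) {
			return true
		}
	}
	return false
}

func main() {
	now := time.Now()
	rs := []replica{{nodeID: "node-1", failedAt: now.Add(-2 * time.Minute), reusable: true}}
	fmt.Println(nodeIsUsed(rs, "node-1", 10*time.Minute, now)) // true: still waiting to reuse the failed replica
	fmt.Println(nodeIsUsed(rs, "node-1", 1*time.Minute, now))  // false: the wait interval has expired
}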
132 changes: 101 additions & 31 deletions scheduler/replica_scheduler.go
@@ -20,6 +20,9 @@ const (

type ReplicaScheduler struct {
ds *datastore.DataStore

// Required for unit testing.
nowHandler func() time.Time
}

type Disk struct {
@@ -40,6 +43,9 @@ type DiskSchedulingInfo struct {
func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler {
rcScheduler := &ReplicaScheduler{
ds: ds,

// Required for unit testing.
nowHandler: time.Now,
}
return rcScheduler
}
@@ -175,6 +181,27 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
zoneSoftAntiAffinity = volume.Spec.ReplicaZoneSoftAntiAffinity == longhorn.ReplicaZoneSoftAntiAffinityEnabled
}

<<<<<<< HEAD
=======
diskSoftAntiAffinity, err := rcs.ds.GetSettingAsBool(types.SettingNameReplicaDiskSoftAntiAffinity)
if err != nil {
err = errors.Wrapf(err, "failed to get %v setting", types.SettingNameReplicaDiskSoftAntiAffinity)
multiError.Append(util.NewMultiError(err.Error()))
return map[string]*Disk{}, multiError
}
if volume.Spec.ReplicaDiskSoftAntiAffinity != longhorn.ReplicaDiskSoftAntiAffinityDefault &&
volume.Spec.ReplicaDiskSoftAntiAffinity != "" {
diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled
}

timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume)
if err != nil {
err = errors.Wrap(err, "failed to get time until replica replacement")
multiError.Append(util.NewMultiError(err.Error()))
return map[string]*Disk{}, multiError
}

>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
multiError = util.NewMultiError()
for _, node := range nodes {
@@ -191,7 +218,7 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
}

usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo,
ignoreFailedReplicas)
ignoreFailedReplicas, timeToReplacementReplica == 0)

allowEmptyNodeSelectorVolume, err := rcs.ds.GetSettingAsBool(types.SettingNameAllowEmptyNodeSelectorVolume)
if err != nil {
@@ -539,7 +566,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep

hasPotentiallyReusableReplica := false
for _, r := range replicas {
if IsPotentiallyReusableReplica(r, hardNodeAffinity) {
if IsPotentiallyReusableReplica(r) {
hasPotentiallyReusableReplica = true
break
}
@@ -548,29 +575,18 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep
return 0
}

// Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later.
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
if err != nil {
logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err)
return 0
}
waitInterval := time.Duration(settingValue) * time.Second
lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)

timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume)
if err != nil {
logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err)
return 0
msg := "Failed to get time until replica replacement, will directly replenish a new replica"
logrus.WithError(err).Errorf(msg)
}
now := time.Now()
if now.After(lastDegradedAt.Add(waitInterval)) {
return 0
if timeUntilNext > 0 {
logrus.Infof("Replica replenishment is delayed until %v", timeOfNext)
}

logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval))
// Adding 1 more second to the check back interval to avoid clock skew
return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second
return timeUntilNext
}

<<<<<<< HEAD
func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) bool {
if r.Spec.FailedAt == "" {
return false
@@ -583,7 +599,14 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
}
if r.Spec.EvictionRequested {
return false
=======
func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) (bool, error) {
// All failedReusableReplicas are also potentiallyFailedReusableReplicas.
if !IsPotentiallyReusableReplica(r) {
return false, nil
>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
}

if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
@@ -648,9 +671,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
return true
}

// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable.
// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool {
// IsPotentiallyReusableReplica checks if a failed replica is potentially reusable. A potentially reusable replica means
// this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica) bool {
if r.Spec.FailedAt == "" {
return false
}
@@ -663,9 +686,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string)
if r.Spec.EvictionRequested {
return false
}
if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
// TODO: Reuse failed replicas for a SPDK volume
if r.Spec.BackendStoreDriver == longhorn.BackendStoreDriverTypeV2 {
return false
@@ -822,10 +842,14 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long
<<<<<<< HEAD
=======

// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to. Some callers do not consider a
// node or zone to be used if it contains a failed replica. ignoreFailedReplicas == true supports this use case.
// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to.
// - Some callers do not consider a node or zone to be used if it contains a failed replica.
// ignoreFailedReplicas == true supports this use case.
// - Otherwise, getCurrentNodesAndZones does not consider a node or zone to be occupied by a failed replica that can
// no longer be used or is likely actively being replaced. This makes nodes and zones with useless replicas
// available for scheduling.
func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node,
ignoreFailedReplicas bool) (map[string]*longhorn.Node,
ignoreFailedReplicas, creatingNewReplicasForReplenishment bool) (map[string]*longhorn.Node,
map[string]bool, map[string]bool, map[string]bool) {
usedNodes := map[string]*longhorn.Node{}
usedZones := map[string]bool{}
@@ -839,8 +863,16 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map
if r.DeletionTimestamp != nil {
continue
}
if r.Spec.FailedAt != "" && ignoreFailedReplicas {
continue
if r.Spec.FailedAt != "" {
if ignoreFailedReplicas {
continue
}
if !IsPotentiallyReusableReplica(r) {
continue // This replica can never be used again, so it does not count in scheduling decisions.
}
if creatingNewReplicasForReplenishment {
continue // Maybe this replica can be used again, but it is being actively replaced anyway.
}
}

if node, ok := nodeInfo[r.Spec.NodeID]; ok {
@@ -869,4 +901,42 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map

return usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones
}
<<<<<<< HEAD
>>>>>>> 29a895c2 (Consider a node with a failed replica as still used)
=======

// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume,
// even if there are potentially reusable failed replicas. It returns:
// - -time.Duration if there is no need for a replacement replica,
// - 0 if a replacement replica is needed right now (replica-replenishment-wait-interval has elapsed),
// - +time.Duration if a replacement replica will be needed (replica-replenishment-wait-interval has not elapsed).
func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) {
if volume.Status.Robustness != longhorn.VolumeRobustnessDegraded {
// No replacement replica is needed.
return -1, time.Time{}, nil
}

settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
if err != nil {
err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval")
return 0, time.Time{}, err
}
waitInterval := time.Duration(settingValue) * time.Second

lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
if err != nil {
err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt)
return 0, time.Time{}, err
}

now := rcs.nowHandler()
if now.After(lastDegradedAt.Add(waitInterval)) {
// A replacement replica is needed now.
return 0, time.Time{}, nil
}

timeOfNext := lastDegradedAt.Add(waitInterval)
// Adding 1 more second to the check back interval to avoid clock skew
return timeOfNext.Sub(now) + time.Second, timeOfNext, nil
}
>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
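To make the return-value convention of timeToReplacementReplica concrete, here is a standalone sketch with the same semantics (hypothetical and simplified; it takes plain arguments instead of reading Longhorn's datastore and settings). A caller such as RequireNewReplica can then return the positive duration so the controller checks back later, as the hunk above does.

package main

import (
	"fmt"
	"time"
)

// timeToReplacement mirrors the convention documented above:
// negative  -> no replacement replica is needed (the volume is not degraded),
// zero      -> the wait interval has elapsed, replace right now,
// positive  -> how long to keep waiting for a reusable replica to come back.
func timeToReplacement(degraded bool, lastDegradedAt time.Time, waitInterval time.Duration, now time.Time) time.Duration {
	if !degraded {
		return -1
	}
	deadline := lastDegradedAt.Add(waitInterval)
	if now.After(deadline) {
		return 0
	}
	// One extra second guards against clock skew, as in the real code.
	return deadline.Sub(now) + time.Second
}

func main() {
	now := time.Now()
	fmt.Println(timeToReplacement(false, time.Time{}, 10*time.Minute, now))             // healthy volume: negative
	fmt.Println(timeToReplacement(true, now.Add(-15*time.Minute), 10*time.Minute, now)) // interval elapsed: 0
	fmt.Println(timeToReplacement(true, now.Add(-3*time.Minute), 10*time.Minute, now))  // still waiting: ~7m1s
}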
100 changes: 86 additions & 14 deletions scheduler/replica_scheduler_test.go (diff not loaded)
