Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider a node with a failed reusable replica as still used (backport #2650) #2679

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 97 additions & 45 deletions scheduler/replica_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const (

type ReplicaScheduler struct {
ds *datastore.DataStore

// Required for unit testing.
nowHandler func() time.Time
}

type Disk struct {
Expand All @@ -40,6 +43,9 @@ type DiskSchedulingInfo struct {
func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler {
rcScheduler := &ReplicaScheduler{
ds: ds,

// Required for unit testing.
nowHandler: time.Now,
}
return rcScheduler
}
Expand Down Expand Up @@ -88,7 +94,7 @@ func (rcs *ReplicaScheduler) ScheduleReplica(replica *longhorn.Replica, replicas
nodeDisksMap[node.Name] = disks
}

diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true)
diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true, false)

// there's no disk that fit for current replica
if len(diskCandidates) == 0 {
Expand Down Expand Up @@ -143,7 +149,17 @@ func getNodesWithEvictingReplicas(replicas map[string]*longhorn.Replica, nodeInf
return nodesWithEvictingReplicas
}

func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node, nodeDisksMap map[string]map[string]struct{}, replicas map[string]*longhorn.Replica, volume *longhorn.Volume, requireSchedulingCheck bool) (map[string]*Disk, util.MultiError) {
// getDiskCandidates returns a map of the most appropriate disks a replica can be scheduled to (assuming it can be
// scheduled at all). For example, consider a case in which there are two disks on nodes without a replica for a volume
// and two disks on nodes with a replica for the same volume. getDiskCandidates only returns the disks without a
// replica, even if the replica can legally be scheduled on all four disks.
// Some callers (e.g. CheckAndReuseFailedReplicas) do not consider a node or zone to be used if it contains a failed
// replica. ignoreFailedReplicas == true supports this use case.
func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node,
nodeDisksMap map[string]map[string]struct{},
replicas map[string]*longhorn.Replica,
volume *longhorn.Volume,
requireSchedulingCheck, ignoreFailedReplicas bool) (map[string]*Disk, util.MultiError) {
multiError := util.NewMultiError()

nodeSoftAntiAffinity, err := rcs.ds.GetSettingAsBool(types.SettingNameReplicaSoftAntiAffinity)
Expand All @@ -165,6 +181,17 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
zoneSoftAntiAffinity = volume.Spec.ReplicaZoneSoftAntiAffinity == longhorn.ReplicaZoneSoftAntiAffinityEnabled
}

creatingNewReplicasForReplenishment := false
if volume.Status.Robustness == longhorn.VolumeRobustnessDegraded {
timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume)
if err != nil {
err = errors.Wrap(err, "failed to get time until replica replacement")
multiError.Append(util.NewMultiError(err.Error()))
return map[string]*Disk{}, multiError
}
creatingNewReplicasForReplenishment = timeToReplacementReplica == 0
}

getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
multiError = util.NewMultiError()
for _, node := range nodes {
Expand All @@ -182,14 +209,30 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
replicasCountPerNode := map[string]int{}
// Get current nodes and zones
for _, r := range replicas {
if r.Spec.NodeID != "" && r.DeletionTimestamp == nil && r.Spec.FailedAt == "" {
if node, ok := nodeInfo[r.Spec.NodeID]; ok {
usedNodes[r.Spec.NodeID] = node
// For empty zone label, we treat them as
// one zone.
usedZones[node.Status.Zone] = true
replicasCountPerNode[r.Spec.NodeID] = replicasCountPerNode[r.Spec.NodeID] + 1
if r.Spec.NodeID == "" {
continue
}
if r.DeletionTimestamp != nil {
continue
}
if r.Spec.FailedAt != "" {
if ignoreFailedReplicas {
continue
}
if !IsPotentiallyReusableReplica(r) {
continue // This replica can never be used again, so it does not count in scheduling decisions.
}
if creatingNewReplicasForReplenishment {
continue // Maybe this replica can be used again, but it is being actively replaced anyway.
}
}

if node, ok := nodeInfo[r.Spec.NodeID]; ok {
usedNodes[r.Spec.NodeID] = node
// For empty zone label, we treat them as
// one zone.
usedZones[node.Status.Zone] = true
replicasCountPerNode[r.Spec.NodeID] = replicasCountPerNode[r.Spec.NodeID] + 1
}
}

Expand Down Expand Up @@ -471,7 +514,9 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon
}
}

diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false)
// Call getDiskCandidates with ignoreFailedReplicas == true since we want the list of candidates to include disks
// that already contain a failed replica.
diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false, true)

var reusedReplica *longhorn.Replica
for _, suggestDisk := range diskCandidates {
Expand Down Expand Up @@ -513,7 +558,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep

hasPotentiallyReusableReplica := false
for _, r := range replicas {
if IsPotentiallyReusableReplica(r, hardNodeAffinity) {
if IsPotentiallyReusableReplica(r) {
hasPotentiallyReusableReplica = true
break
}
Expand All @@ -522,42 +567,25 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep
return 0
}

// Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later.
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume)
if err != nil {
logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err)
return 0
msg := "Failed to get time until replica replacement, will directly replenish a new replica"
logrus.WithError(err).Errorf(msg)
}
waitInterval := time.Duration(settingValue) * time.Second
lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)

if err != nil {
logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err)
return 0
if timeUntilNext > 0 {
// Adding another second to the checkBackDuration to avoid clock skew.
timeUntilNext = timeUntilNext + time.Second
logrus.Infof("Replica replenishment is delayed until %v", timeOfNext.Add(time.Second))
}
now := time.Now()
if now.After(lastDegradedAt.Add(waitInterval)) {
return 0
}

logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval))
// Adding 1 more second to the check back interval to avoid clock skew
return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second
return timeUntilNext
}

func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) bool {
if r.Spec.FailedAt == "" {
return false
}
if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
return false
}
if r.Spec.RebuildRetryCount >= FailedReplicaMaxRetryCount {
return false
}
if r.Spec.EvictionRequested {
// All failedReusableReplicas are also potentiallyFailedReusableReplicas.
if !IsPotentiallyReusableReplica(r) {
return false
}

if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
Expand Down Expand Up @@ -622,9 +650,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
return true
}

// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable.
// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool {
// IsPotentiallyReusableReplica checks if a failed replica is potentially reusable. A potentially reusable replica means
// this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica) bool {
if r.Spec.FailedAt == "" {
return false
}
Expand All @@ -637,9 +665,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string)
if r.Spec.EvictionRequested {
return false
}
if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
// TODO: Reuse failed replicas for a SPDK volume
if r.Spec.BackendStoreDriver == longhorn.BackendStoreDriverTypeV2 {
return false
Expand Down Expand Up @@ -793,3 +818,30 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long
}
return longhorn.DiskSpec{}, longhorn.DiskStatus{}, false
}

// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume,
// even if there are potentially reusable failed replicas. It returns 0 if replica-replenishment-wait-interval has
// elapsed and a new replica is needed right now.
func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) {
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
if err != nil {
err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval")
return 0, time.Time{}, err
}
waitInterval := time.Duration(settingValue) * time.Second

lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
if err != nil {
err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt)
return 0, time.Time{}, err
}

now := rcs.nowHandler()
timeOfNext := lastDegradedAt.Add(waitInterval)
if now.After(timeOfNext) {
// A replacement replica is needed now.
return 0, time.Time{}, nil
}

return timeOfNext.Sub(now), timeOfNext, nil
}
Loading
Loading