Consider a node with a failed reusable replica as still used #2650

Merged
182 changes: 121 additions & 61 deletions scheduler/replica_scheduler.go
@@ -20,6 +20,9 @@ const (

type ReplicaScheduler struct {
ds *datastore.DataStore

// Required for unit testing.
nowHandler func() time.Time
}

type Disk struct {
@@ -40,6 +43,9 @@ type DiskSchedulingInfo struct {
func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler {
rcScheduler := &ReplicaScheduler{
ds: ds,

// Required for unit testing.
nowHandler: time.Now,
}
return rcScheduler
}
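
The nowHandler field introduced above exists so unit tests can control the scheduler's clock. As an illustrative sketch only (not part of this diff; the datastore fixture name is hypothetical), a test in the scheduler package could pin "now" like this:

s := NewReplicaScheduler(testDataStore) // testDataStore: hypothetical test fixture
s.nowHandler = func() time.Time {
	// Freeze the clock so time-based decisions (e.g. timeToReplacementReplica) are deterministic.
	return time.Date(2024, 1, 1, 10, 5, 0, 0, time.UTC)
}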
@@ -88,7 +94,7 @@ func (rcs *ReplicaScheduler) ScheduleReplica(replica *longhorn.Replica, replicas
nodeDisksMap[node.Name] = disks
}

diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true)
diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true, false)

// there's no disk that fit for current replica
if len(diskCandidates) == 0 {
@@ -154,7 +160,17 @@ func getNodesWithEvictingReplicas(replicas map[string]*longhorn.Replica, nodeInf
return nodesWithEvictingReplicas
}

func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node, nodeDisksMap map[string]map[string]struct{}, replicas map[string]*longhorn.Replica, volume *longhorn.Volume, requireSchedulingCheck bool) (map[string]*Disk, util.MultiError) {
// getDiskCandidates returns a map of the most appropriate disks a replica can be scheduled to (assuming it can be
// scheduled at all). For example, consider a case in which two disks are on nodes without a replica of a volume and
// two disks are on nodes with a replica of the same volume. getDiskCandidates only returns the two disks on the
// replica-free nodes, even though the replica could legally be scheduled on all four disks.
// Some callers (e.g. CheckAndReuseFailedReplica) do not consider a node or zone to be used if it contains a failed
// replica. Passing ignoreFailedReplicas == true supports this use case.
func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node,
nodeDisksMap map[string]map[string]struct{},
replicas map[string]*longhorn.Replica,
volume *longhorn.Volume,
requireSchedulingCheck, ignoreFailedReplicas bool) (map[string]*Disk, util.MultiError) {
multiError := util.NewMultiError()

nodeSoftAntiAffinity, err := rcs.ds.GetSettingAsBool(types.SettingNameReplicaSoftAntiAffinity)
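
For orientation, the two flag combinations now used in this file (copied from the call sites elsewhere in this diff, with explanatory comments added) are:

// ScheduleReplica: enforce the scheduling check; nodes/zones holding a still-reusable failed
// replica count as used, per this PR.
diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true, false)

// CheckAndReuseFailedReplica: skip the scheduling check and ignore failed replicas, so disks
// that already hold a failed replica remain candidates for reuse.
diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false, true)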
@@ -191,6 +207,17 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled
}

creatingNewReplicasForReplenishment := false
if volume.Status.Robustness == longhorn.VolumeRobustnessDegraded {
timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume)
if err != nil {
err = errors.Wrap(err, "failed to get time until replica replacement")
multiError.Append(util.NewMultiError(err.Error()))
return map[string]*Disk{}, multiError
}
creatingNewReplicasForReplenishment = timeToReplacementReplica == 0
}

getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
diskCandidates = map[string]*Disk{}
multiError = util.NewMultiError()
@@ -206,7 +233,8 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
return diskCandidates, multiError
}

usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo)
usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo,
ignoreFailedReplicas, creatingNewReplicasForReplenishment)

allowEmptyNodeSelectorVolume, err := rcs.ds.GetSettingAsBool(types.SettingNameAllowEmptyNodeSelectorVolume)
if err != nil {
@@ -523,7 +551,9 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon
}
}

diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false)
// Call getDiskCandidates with ignoreFailedReplicas == true since we want the list of candidates to include disks
// that already contain a failed replica.
diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false, true)

var reusedReplica *longhorn.Replica
for _, suggestDisk := range diskCandidates {
@@ -565,7 +595,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep

hasPotentiallyReusableReplica := false
for _, r := range replicas {
if IsPotentiallyReusableReplica(r, hardNodeAffinity) {
if IsPotentiallyReusableReplica(r) {
hasPotentiallyReusableReplica = true
break
}
@@ -574,42 +604,25 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep
return 0
}

// Otherwise, Longhorn will delay the creation of a new replica so there is a chance to reuse failed replicas later.
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume)
if err != nil {
logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err)
return 0
msg := "Failed to get time until replica replacement, will directly replenish a new replica"
logrus.WithError(err).Error(msg)
}
waitInterval := time.Duration(settingValue) * time.Second
lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)

if err != nil {
logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err)
return 0
}
now := time.Now()
if now.After(lastDegradedAt.Add(waitInterval)) {
return 0
if timeUntilNext > 0 {
// Add another second to the returned duration to guard against clock skew.
timeUntilNext = timeUntilNext + time.Second
logrus.Infof("Replica replenishment is delayed until %v", timeOfNext.Add(time.Second))
}

logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval))
// Adding 1 more second to the check back interval to avoid clock skew
return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second
return timeUntilNext
}

func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) (bool, error) {
if r.Spec.FailedAt == "" {
return false, nil
}
if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
return false, nil
}
if r.Spec.RebuildRetryCount >= FailedReplicaMaxRetryCount {
return false, nil
}
if r.Spec.EvictionRequested {
// Every reusable failed replica must also be potentially reusable, so check that first.
if !IsPotentiallyReusableReplica(r) {
return false, nil
}

if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false, nil
}
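
RequireNewReplica only computes a delay; the caller decides what to do with it. A hedged sketch of how a queue-based caller could consume the returned duration (this is not Longhorn's actual controller code and the names are illustrative, though workqueue.DelayingInterface.AddAfter is a real client-go API):

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

// requeueForReplenishment re-enqueues the volume once the replenishment wait interval
// (plus the one-second clock-skew pad) has elapsed, or immediately if it already has.
func requeueForReplenishment(queue workqueue.DelayingInterface, volumeKey string, wait time.Duration) {
	if wait == 0 {
		queue.Add(volumeKey) // a replacement replica is needed now
		return
	}
	queue.AddAfter(volumeKey, wait)
}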
@@ -680,9 +693,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
return true, nil
}

// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable.
// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool {
// IsPotentiallyReusableReplica checks whether a failed replica is potentially reusable. A potentially reusable
// replica is one Longhorn may be able to reuse later but cannot use right now, e.g. because its node or disk is down.
func IsPotentiallyReusableReplica(r *longhorn.Replica) bool {
if r.Spec.FailedAt == "" {
return false
}
@@ -695,9 +708,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string)
if r.Spec.EvictionRequested {
return false
}
if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
// TODO: Reuse failed replicas for a SPDK volume
if datastore.IsDataEngineV2(r.Spec.DataEngine) {
return false
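
For example, a v1 data engine replica with FailedAt set, NodeID and DiskID still populated, RebuildRetryCount below FailedReplicaMaxRetryCount, and no eviction requested is potentially reusable; failing any one of those checks makes it ineligible.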
@@ -852,39 +862,89 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long
return longhorn.DiskSpec{}, longhorn.DiskStatus{}, false
}

func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node) (map[string]*longhorn.Node,
// getCurrentNodesAndZones returns the nodes and zones the volume's replicas are already scheduled to.
// - Some callers do not consider a node or zone to be used if it contains a failed replica. Passing
//   ignoreFailedReplicas == true supports this use case.
// - Otherwise, getCurrentNodesAndZones does not consider a node or zone to be occupied by a failed replica that can
//   no longer be used or that is likely being actively replaced. This makes nodes and zones with unusable replicas
//   available for scheduling.
func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node,
ignoreFailedReplicas, creatingNewReplicasForReplenishment bool) (map[string]*longhorn.Node,
map[string]bool, map[string]bool, map[string]bool) {
usedNodes := map[string]*longhorn.Node{}
usedZones := map[string]bool{}
onlyEvictingNodes := map[string]bool{}
onlyEvictingZones := map[string]bool{}

for _, r := range replicas {
if r.Spec.NodeID != "" && r.DeletionTimestamp == nil && r.Spec.FailedAt == "" {
if node, ok := nodeInfo[r.Spec.NodeID]; ok {
if r.Spec.EvictionRequested {
if _, ok := usedNodes[r.Spec.NodeID]; !ok {
// This is an evicting replica on a thus far unused node. We won't change this again unless we
// find a non-evicting replica on this node.
onlyEvictingNodes[node.Name] = true
}
if used := usedZones[node.Status.Zone]; !used {
// This is an evicting replica in a thus far unused zone. We won't change this again unless we
// find a non-evicting replica in this zone.
onlyEvictingZones[node.Status.Zone] = true
}
} else {
// There is now at least one replica on this node and in this zone that is not evicting.
onlyEvictingNodes[node.Name] = false
onlyEvictingZones[node.Status.Zone] = false
}
if r.Spec.NodeID == "" {
continue
}
if r.DeletionTimestamp != nil {
continue
}
if r.Spec.FailedAt != "" {
if ignoreFailedReplicas {
continue
}
if !IsPotentiallyReusableReplica(r) {
continue // This replica can never be used again, so it does not count in scheduling decisions.
}
if creatingNewReplicasForReplenishment {
continue // Maybe this replica can be used again, but it is being actively replaced anyway.
}
}

usedNodes[node.Name] = node
// For empty zone label, we treat them as one zone.
usedZones[node.Status.Zone] = true
if node, ok := nodeInfo[r.Spec.NodeID]; ok {
if r.Spec.EvictionRequested {
if _, ok := usedNodes[r.Spec.NodeID]; !ok {
// This is an evicting replica on a thus far unused node. We won't change this again unless we
// find a non-evicting replica on this node.
onlyEvictingNodes[node.Name] = true
}
if used := usedZones[node.Status.Zone]; !used {
// This is an evicting replica in a thus far unused zone. We won't change this again unless we
// find a non-evicting replica in this zone.
onlyEvictingZones[node.Status.Zone] = true
}
} else {
// There is now at least one replica on this node and in this zone that is not evicting.
onlyEvictingNodes[node.Name] = false
onlyEvictingZones[node.Status.Zone] = false
}

usedNodes[node.Name] = node
// For empty zone label, we treat them as one zone.
usedZones[node.Status.Zone] = true
}
}

return usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones
}

// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume,
// even if there are potentially reusable failed replicas. It returns 0 if replica-replenishment-wait-interval has
// elapsed and a new replica is needed right now.
func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) {
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
if err != nil {
err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval")
return 0, time.Time{}, err
}
waitInterval := time.Duration(settingValue) * time.Second

lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
if err != nil {
err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt)
return 0, time.Time{}, err
}

now := rcs.nowHandler()
timeOfNext := lastDegradedAt.Add(waitInterval)
if now.After(timeOfNext) {
// A replacement replica is needed now.
return 0, time.Time{}, nil
}

return timeOfNext.Sub(now), timeOfNext, nil
}
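
Worked example of the arithmetic above: with replica-replenishment-wait-interval set to 600 seconds and volume.Status.LastDegradedAt at 10:00:00, a call to timeToReplacementReplica at 10:04:00 returns (6m0s, 10:10:00); RequireNewReplica then adds one second for clock skew, logs that replenishment is delayed until 10:10:01, and returns 6m1s. A call at 10:11:00 instead returns (0, time.Time{}), meaning a replacement replica should be created immediately.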