diff --git a/scheduler/replica_scheduler.go b/scheduler/replica_scheduler.go index f5cc47786c..da7f915cfc 100644 --- a/scheduler/replica_scheduler.go +++ b/scheduler/replica_scheduler.go @@ -20,6 +20,9 @@ const ( type ReplicaScheduler struct { ds *datastore.DataStore + + // Required for unit testing. + nowHandler func() time.Time } type Disk struct { @@ -40,6 +43,9 @@ type DiskSchedulingInfo struct { func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler { rcScheduler := &ReplicaScheduler{ ds: ds, + + // Required for unit testing. + nowHandler: time.Now, } return rcScheduler } @@ -88,7 +94,7 @@ func (rcs *ReplicaScheduler) ScheduleReplica(replica *longhorn.Replica, replicas nodeDisksMap[node.Name] = disks } - diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true) + diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true, false) // there's no disk that fit for current replica if len(diskCandidates) == 0 { @@ -154,7 +160,17 @@ func getNodesWithEvictingReplicas(replicas map[string]*longhorn.Replica, nodeInf return nodesWithEvictingReplicas } -func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node, nodeDisksMap map[string]map[string]struct{}, replicas map[string]*longhorn.Replica, volume *longhorn.Volume, requireSchedulingCheck bool) (map[string]*Disk, util.MultiError) { +// getDiskCandidates returns a map of the most appropriate disks a replica can be scheduled to (assuming it can be +// scheduled at all). For example, consider a case in which there are two disks on nodes without a replica for a volume +// and two disks on nodes with a replica for the same volume. getDiskCandidates only returns the disks without a +// replica, even if the replica can legally be scheduled on all four disks. +// Some callers (e.g. CheckAndReuseFailedReplicas) do not consider a node or zone to be used if it contains a failed +// replica. ignoreFailedReplicas == true supports this use case. 
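+// Otherwise, failed replicas that can never be reused, or that are likely already being actively replaced, do not
+// count toward a node or zone being used; see getCurrentNodesAndZones for details.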
+func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node, + nodeDisksMap map[string]map[string]struct{}, + replicas map[string]*longhorn.Replica, + volume *longhorn.Volume, + requireSchedulingCheck, ignoreFailedReplicas bool) (map[string]*Disk, util.MultiError) { multiError := util.NewMultiError() nodeSoftAntiAffinity, err := rcs.ds.GetSettingAsBool(types.SettingNameReplicaSoftAntiAffinity) @@ -191,6 +207,17 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled } + creatingNewReplicasForReplenishment := false + if volume.Status.Robustness == longhorn.VolumeRobustnessDegraded { + timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume) + if err != nil { + err = errors.Wrap(err, "failed to get time until replica replacement") + multiError.Append(util.NewMultiError(err.Error())) + return map[string]*Disk{}, multiError + } + creatingNewReplicasForReplenishment = timeToReplacementReplica == 0 + } + getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) { diskCandidates = map[string]*Disk{} multiError = util.NewMultiError() @@ -206,7 +233,8 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod return diskCandidates, multiError } - usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo) + usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo, + ignoreFailedReplicas, creatingNewReplicasForReplenishment) allowEmptyNodeSelectorVolume, err := rcs.ds.GetSettingAsBool(types.SettingNameAllowEmptyNodeSelectorVolume) if err != nil { @@ -523,7 +551,9 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon } } - diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false) + // Call getDiskCandidates with ignoreFailedReplicas == true since we want the list of candidates to include disks + // that already contain a failed replica. + diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false, true) var reusedReplica *longhorn.Replica for _, suggestDisk := range diskCandidates { @@ -565,7 +595,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep hasPotentiallyReusableReplica := false for _, r := range replicas { - if IsPotentiallyReusableReplica(r, hardNodeAffinity) { + if IsPotentiallyReusableReplica(r) { hasPotentiallyReusableReplica = true break } @@ -574,42 +604,25 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep return 0 } - // Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later. 
- settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval) + timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume) if err != nil { - logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err) - return 0 + msg := "Failed to get time until replica replacement, will directly replenish a new replica" + logrus.WithError(err).Errorf(msg) } - waitInterval := time.Duration(settingValue) * time.Second - lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt) - - if err != nil { - logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err) - return 0 - } - now := time.Now() - if now.After(lastDegradedAt.Add(waitInterval)) { - return 0 + if timeUntilNext > 0 { + // Adding another second to the checkBackDuration to avoid clock skew. + timeUntilNext = timeUntilNext + time.Second + logrus.Infof("Replica replenishment is delayed until %v", timeOfNext.Add(time.Second)) } - - logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval)) - // Adding 1 more second to the check back interval to avoid clock skew - return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second + return timeUntilNext } func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) (bool, error) { - if r.Spec.FailedAt == "" { - return false, nil - } - if r.Spec.NodeID == "" || r.Spec.DiskID == "" { - return false, nil - } - if r.Spec.RebuildRetryCount >= FailedReplicaMaxRetryCount { - return false, nil - } - if r.Spec.EvictionRequested { + // All failedReusableReplicas are also potentiallyFailedReusableReplicas. + if !IsPotentiallyReusableReplica(r) { return false, nil } + if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity { return false, nil } @@ -680,9 +693,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon return true, nil } -// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable. -// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue. -func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool { +// IsPotentiallyReusableReplica checks if a failed replica is potentially reusable. A potentially reusable replica means +// this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue. +func IsPotentiallyReusableReplica(r *longhorn.Replica) bool { if r.Spec.FailedAt == "" { return false } @@ -695,9 +708,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) if r.Spec.EvictionRequested { return false } - if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity { - return false - } // TODO: Reuse failed replicas for a SPDK volume if datastore.IsDataEngineV2(r.Spec.DataEngine) { return false @@ -852,7 +862,14 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long return longhorn.DiskSpec{}, longhorn.DiskStatus{}, false } -func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node) (map[string]*longhorn.Node, +// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to. 
+// - Some callers do not consider a node or zone to be used if it contains a failed replica. +// ignoreFailedReplicas == true supports this use case. +// - Otherwise, getCurrentNodesAndZones does not consider a node or zone to be occupied by a failed replica that can +// no longer be used or is likely actively being replaced. This makes nodes and zones with useless replicas +// available for scheduling. +func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node, + ignoreFailedReplicas, creatingNewReplicasForReplenishment bool) (map[string]*longhorn.Node, map[string]bool, map[string]bool, map[string]bool) { usedNodes := map[string]*longhorn.Node{} usedZones := map[string]bool{} @@ -860,31 +877,74 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map onlyEvictingZones := map[string]bool{} for _, r := range replicas { - if r.Spec.NodeID != "" && r.DeletionTimestamp == nil && r.Spec.FailedAt == "" { - if node, ok := nodeInfo[r.Spec.NodeID]; ok { - if r.Spec.EvictionRequested { - if _, ok := usedNodes[r.Spec.NodeID]; !ok { - // This is an evicting replica on a thus far unused node. We won't change this again unless we - // find a non-evicting replica on this node. - onlyEvictingNodes[node.Name] = true - } - if used := usedZones[node.Status.Zone]; !used { - // This is an evicting replica in a thus far unused zone. We won't change this again unless we - // find a non-evicting replica in this zone. - onlyEvictingZones[node.Status.Zone] = true - } - } else { - // There is now at least one replica on this node and in this zone that is not evicting. - onlyEvictingNodes[node.Name] = false - onlyEvictingZones[node.Status.Zone] = false - } + if r.Spec.NodeID == "" { + continue + } + if r.DeletionTimestamp != nil { + continue + } + if r.Spec.FailedAt != "" { + if ignoreFailedReplicas { + continue + } + if !IsPotentiallyReusableReplica(r) { + continue // This replica can never be used again, so it does not count in scheduling decisions. + } + if creatingNewReplicasForReplenishment { + continue // Maybe this replica can be used again, but it is being actively replaced anyway. + } + } - usedNodes[node.Name] = node - // For empty zone label, we treat them as one zone. - usedZones[node.Status.Zone] = true + if node, ok := nodeInfo[r.Spec.NodeID]; ok { + if r.Spec.EvictionRequested { + if _, ok := usedNodes[r.Spec.NodeID]; !ok { + // This is an evicting replica on a thus far unused node. We won't change this again unless we + // find a non-evicting replica on this node. + onlyEvictingNodes[node.Name] = true + } + if used := usedZones[node.Status.Zone]; !used { + // This is an evicting replica in a thus far unused zone. We won't change this again unless we + // find a non-evicting replica in this zone. + onlyEvictingZones[node.Status.Zone] = true + } + } else { + // There is now at least one replica on this node and in this zone that is not evicting. + onlyEvictingNodes[node.Name] = false + onlyEvictingZones[node.Status.Zone] = false } + + usedNodes[node.Name] = node + // For empty zone label, we treat them as one zone. + usedZones[node.Status.Zone] = true } } return usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones } + +// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume, +// even if there are potentially reusable failed replicas. It returns 0 if replica-replenishment-wait-interval has +// elapsed and a new replica is needed right now. 
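+// The second return value is the absolute time at which the wait interval elapses (or the zero time if it already
+// has); RequireNewReplica uses it for logging.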
+func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) { + settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval) + if err != nil { + err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval") + return 0, time.Time{}, err + } + waitInterval := time.Duration(settingValue) * time.Second + + lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt) + if err != nil { + err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt) + return 0, time.Time{}, err + } + + now := rcs.nowHandler() + timeOfNext := lastDegradedAt.Add(waitInterval) + if now.After(timeOfNext) { + // A replacement replica is needed now. + return 0, time.Time{}, nil + } + + return timeOfNext.Sub(now), timeOfNext, nil +} diff --git a/scheduler/replica_scheduler_test.go b/scheduler/replica_scheduler_test.go index d92f74e456..e550ee873c 100644 --- a/scheduler/replica_scheduler_test.go +++ b/scheduler/replica_scheduler_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes/fake" @@ -53,6 +54,9 @@ const ( TestZone1 = "test-zone-1" TestZone2 = "test-zone-2" + + TestTimeNow = "2015-01-02T00:00:00Z" + TestTimeOneMinuteAgo = "2015-01-01T23:59:00Z" ) var longhornFinalizerKey = longhorn.SchemeGroupVersion.Group @@ -63,7 +67,9 @@ func newReplicaScheduler(lhClient *lhfake.Clientset, kubeClient *fake.Clientset, ds := datastore.NewDataStore(TestNamespace, lhClient, kubeClient, extensionsClient, informerFactories) - return NewReplicaScheduler(ds) + rcs := NewReplicaScheduler(ds) + rcs.nowHandler = getTestNow + return rcs } func newDaemonPod(phase corev1.PodPhase, name, namespace, nodeID, podIP string) *corev1.Pod { @@ -240,6 +246,7 @@ type ReplicaSchedulerTestCase struct { replicaNodeSoftAntiAffinity string replicaZoneSoftAntiAffinity string replicaDiskSoftAntiAffinity string + ReplicaReplenishmentWaitInterval string // some test cases only try to schedule a subset of a volume's replicas allReplicas map[string]*longhorn.Replica @@ -1097,6 +1104,35 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { tc.replicaZoneSoftAntiAffinity = "false" // Do not allow replicas to schedule to the same zone. testCases["fail scheduling when doing so would reuse an invalid evicting node"] = tc + // Test potentially reusable replica before interval expires + // We should fail to schedule a new replica to this node until the interval expires. + tc = generateFailedReplicaTestCase(true, false) + tc.err = false + tc.firstNilReplica = 0 + testCases["potentially reusable replica before interval expires"] = tc + + // Test potentially reusable replica after interval expires + // We should succeed to schedule a new replica to this node because the interval expired. + tc = generateFailedReplicaTestCase(true, true) + tc.err = false + tc.firstNilReplica = -1 + testCases["potentially reusable replica after interval expires"] = tc + + // Test non-reusable replica before interval expires + // We should succeed to schedule a new replica to this node because the existing replica is not reusable. 
+ tc = generateFailedReplicaTestCase(false, false) + tc.err = false + tc.firstNilReplica = -1 + testCases["non-reusable replica before interval expires"] = tc + + // Test non-reusable replica after interval expires + // We should succeed to schedule a new replica to this node because the existing replica is not reusable and the + // interval expired anyway. + tc = generateFailedReplicaTestCase(false, true) + tc.err = false + tc.firstNilReplica = -1 + testCases["non-reusable replica after interval expires"] = tc + for name, tc := range testCases { fmt.Printf("testing %v\n", name) @@ -1188,6 +1224,71 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } } +// generateFailedReplicaTestCase helps generate test cases in which a node contains a failed replica and the scheduler +// must decide whether to allow additional replicas to schedule to it. +func generateFailedReplicaTestCase( + replicaReusable, waitIntervalExpired bool) (tc *ReplicaSchedulerTestCase) { + tc = generateSchedulerTestCase() + daemon1 := newDaemonPod(corev1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1) + tc.daemons = []*corev1.Pod{ + daemon1, + } + node1 := newNode(TestNode1, TestNamespace, TestZone1, true, longhorn.ConditionStatusTrue) + tc.engineImage.Status.NodeDeploymentMap[node1.Name] = true + disk := newDisk(TestDefaultDataPath, true, 0) + node1.Spec.Disks = map[string]longhorn.DiskSpec{ + getDiskID(TestNode1, "1"): disk, + } + + // We are specifically interested in situations in which a replica is ONLY schedulable to a node because its + // existing replica is failed. + tc.replicaNodeSoftAntiAffinity = "false" + + // A failed replica is already scheduled. + var alreadyScheduledReplica *longhorn.Replica + for _, replica := range tc.allReplicas { + alreadyScheduledReplica = replica + break + } + delete(tc.replicasToSchedule, alreadyScheduledReplica.Name) + alreadyScheduledReplica.Spec.NodeID = TestNode1 + alreadyScheduledReplica.Spec.DiskID = getDiskID(TestNode1, "1") + alreadyScheduledReplica.Spec.FailedAt = TestTimeNow + tc.volume.Status.Robustness = longhorn.VolumeRobustnessDegraded + tc.volume.Status.LastDegradedAt = TestTimeOneMinuteAgo + + if replicaReusable { + alreadyScheduledReplica.Spec.RebuildRetryCount = 0 + } else { + alreadyScheduledReplica.Spec.RebuildRetryCount = 5 + } + + if waitIntervalExpired { + tc.ReplicaReplenishmentWaitInterval = "30" + } else { + tc.ReplicaReplenishmentWaitInterval = "90" + } + + node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{ + getDiskID(TestNode1, "1"): { + StorageAvailable: TestDiskAvailableSize, + StorageScheduled: TestVolumeSize, + StorageMaximum: TestDiskSize, + Conditions: []longhorn.Condition{ + newCondition(longhorn.DiskConditionTypeSchedulable, longhorn.ConditionStatusTrue), + }, + DiskUUID: getDiskID(TestNode1, "1"), + Type: longhorn.DiskTypeFilesystem, + ScheduledReplica: map[string]int64{alreadyScheduledReplica.Name: TestVolumeSize}, + }, + } + nodes := map[string]*longhorn.Node{ + TestNode1: node1, + } + tc.nodes = nodes + return +} + func setSettings(tc *ReplicaSchedulerTestCase, lhClient *lhfake.Clientset, sIndexer cache.Indexer, c *C) { // Set storage over-provisioning percentage settings if tc.storageOverProvisioningPercentage != "" && tc.storageMinimalAvailablePercentage != "" { @@ -1236,6 +1337,17 @@ func setSettings(tc *ReplicaSchedulerTestCase, lhClient *lhfake.Clientset, sInde err = sIndexer.Add(setting) c.Assert(err, IsNil) } + // Set replica replenishment wait interval setting + if tc.ReplicaReplenishmentWaitInterval != "" { + s := 
initSettings( + string(types.SettingNameReplicaReplenishmentWaitInterval), + tc.ReplicaReplenishmentWaitInterval) + setting, err := + lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + } } func (s *TestSuite) TestFilterDisksWithMatchingReplicas(c *C) { @@ -1456,10 +1568,16 @@ func (s *TestSuite) TestGetCurrentNodesAndZones(c *C) { for name, tc := range testCases { fmt.Printf("testing %v\n", name) - usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(tc.replicas, tc.nodeInfo) + usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(tc.replicas, tc.nodeInfo, + false, false) verifyNodeNames(tc.expectUsedNodeNames, usedNodes) verifyZoneNames(tc.expectUsedZoneNames, usedZones) verifyOnlyEvictingNodeNames(tc.expectOnlyEvictingNodeNames, onlyEvictingNodes) verifyOnlyEvictingZoneNames(tc.expectOnlyEvictingZoneNames, onlyEvictingZones) } } + +func getTestNow() time.Time { + now, _ := time.Parse(time.RFC3339, TestTimeNow) + return now +}
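
For reference, a minimal standalone sketch (not part of the patch) of the timing rule that timeToReplacementReplica implements and that the new 30s/90s ReplicaReplenishmentWaitInterval test values exercise. The helper and values below are simplified stand-ins: the hard-coded interval and timestamps replace the real rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval) lookup and volume.Status.LastDegradedAt.

// replenish_sketch.go: simplified illustration only; names and values are stand-ins, not part of the scheduler.
package main

import (
	"fmt"
	"time"
)

// timeToReplacementReplica mirrors the rule in the patch: once waitInterval has elapsed since the volume became
// degraded, a replacement replica is due immediately (zero duration); otherwise it returns the remaining wait and
// the time at which the wait ends.
func timeToReplacementReplica(now, lastDegradedAt time.Time, waitInterval time.Duration) (time.Duration, time.Time) {
	timeOfNext := lastDegradedAt.Add(waitInterval)
	if now.After(timeOfNext) {
		return 0, time.Time{}
	}
	return timeOfNext.Sub(now), timeOfNext
}

func main() {
	now, _ := time.Parse(time.RFC3339, "2015-01-02T00:00:00Z")            // TestTimeNow
	lastDegradedAt, _ := time.Parse(time.RFC3339, "2015-01-01T23:59:00Z") // TestTimeOneMinuteAgo

	// Interval already expired (the 30s test case): replenish immediately.
	d, _ := timeToReplacementReplica(now, lastDegradedAt, 30*time.Second)
	fmt.Println(d) // 0s

	// Interval not yet expired (the 90s test case): wait the remaining 30s. RequireNewReplica adds one more
	// second to this value to tolerate clock skew.
	d, next := timeToReplacementReplica(now, lastDegradedAt, 90*time.Second)
	fmt.Println(d, next) // 30s 2015-01-02 00:00:30 +0000 UTC
}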