diff --git a/scheduler/replica_scheduler.go b/scheduler/replica_scheduler.go
index 094d85e52e..75291deb0b 100644
--- a/scheduler/replica_scheduler.go
+++ b/scheduler/replica_scheduler.go
@@ -20,6 +20,9 @@ const (
 
 type ReplicaScheduler struct {
     ds *datastore.DataStore
+
+    // Required for unit testing.
+    nowHandler func() time.Time
 }
 
 type Disk struct {
@@ -40,6 +43,9 @@ type DiskSchedulingInfo struct {
 func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler {
     rcScheduler := &ReplicaScheduler{
         ds: ds,
+
+        // Required for unit testing.
+        nowHandler: time.Now,
     }
     return rcScheduler
 }
@@ -175,6 +181,27 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
         zoneSoftAntiAffinity = volume.Spec.ReplicaZoneSoftAntiAffinity == longhorn.ReplicaZoneSoftAntiAffinityEnabled
     }
 
+<<<<<<< HEAD
+=======
+    diskSoftAntiAffinity, err := rcs.ds.GetSettingAsBool(types.SettingNameReplicaDiskSoftAntiAffinity)
+    if err != nil {
+        err = errors.Wrapf(err, "failed to get %v setting", types.SettingNameReplicaDiskSoftAntiAffinity)
+        multiError.Append(util.NewMultiError(err.Error()))
+        return map[string]*Disk{}, multiError
+    }
+    if volume.Spec.ReplicaDiskSoftAntiAffinity != longhorn.ReplicaDiskSoftAntiAffinityDefault &&
+        volume.Spec.ReplicaDiskSoftAntiAffinity != "" {
+        diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled
+    }
+
+    timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume)
+    if err != nil {
+        err = errors.Wrap(err, "failed to get time until replica replacement")
+        multiError.Append(util.NewMultiError(err.Error()))
+        return map[string]*Disk{}, multiError
+    }
+
+>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
     getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
         multiError = util.NewMultiError()
         for _, node := range nodes {
@@ -191,7 +218,7 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
     }
 
     usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo,
-        ignoreFailedReplicas)
+        ignoreFailedReplicas, timeToReplacementReplica == 0)
 
     allowEmptyNodeSelectorVolume, err := rcs.ds.GetSettingAsBool(types.SettingNameAllowEmptyNodeSelectorVolume)
     if err != nil {
@@ -539,7 +566,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep
 
     hasPotentiallyReusableReplica := false
     for _, r := range replicas {
-        if IsPotentiallyReusableReplica(r, hardNodeAffinity) {
+        if IsPotentiallyReusableReplica(r) {
             hasPotentiallyReusableReplica = true
             break
         }
@@ -548,29 +575,18 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep
         return 0
     }
 
-    // Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later.
-    settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
-    if err != nil {
-        logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err)
-        return 0
-    }
-    waitInterval := time.Duration(settingValue) * time.Second
-    lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
-
+    timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume)
     if err != nil {
-        logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err)
-        return 0
+        msg := "Failed to get time until replica replacement, will directly replenish a new replica"
+        logrus.WithError(err).Error(msg)
     }
-    now := time.Now()
-    if now.After(lastDegradedAt.Add(waitInterval)) {
-        return 0
+    if timeUntilNext > 0 {
+        logrus.Infof("Replica replenishment is delayed until %v", timeOfNext)
     }
-
-    logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval))
-    // Adding 1 more second to the check back interval to avoid clock skew
-    return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second
+    return timeUntilNext
 }
 
+<<<<<<< HEAD
 func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) bool {
     if r.Spec.FailedAt == "" {
         return false
@@ -583,7 +599,14 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
     }
     if r.Spec.EvictionRequested {
         return false
+=======
+func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) (bool, error) {
+    // All failedReusableReplicas are also potentiallyFailedReusableReplicas.
+    if !IsPotentiallyReusableReplica(r) {
+        return false, nil
+>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
     }
+
     if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
         return false
     }
@@ -648,9 +671,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
     return true
 }
 
-// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable.
-// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
-func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool {
+// IsPotentiallyReusableReplica checks if a failed replica is potentially reusable. A potentially reusable replica is
+// one that Longhorn may be able to reuse later, even though it is not valid right now because of a node or disk issue.
+func IsPotentiallyReusableReplica(r *longhorn.Replica) bool {
     if r.Spec.FailedAt == "" {
         return false
     }
@@ -663,9 +686,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string)
     if r.Spec.EvictionRequested {
         return false
     }
-    if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
-        return false
-    }
     // TODO: Reuse failed replicas for a SPDK volume
     if r.Spec.BackendStoreDriver == longhorn.BackendStoreDriverTypeV2 {
         return false
@@ -822,10 +842,14 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long
 
 <<<<<<< HEAD
 =======
-// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to. Some callers do not consider a
-// node or zone to be used if it contains a failed replica. ignoreFailedReplicas == true supports this use case.
+// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to.
+//   - Some callers do not consider a node or zone to be used if it contains a failed replica.
+//     ignoreFailedReplicas == true supports this use case.
+//   - Otherwise, getCurrentNodesAndZones does not consider a node or zone to be occupied by a failed replica that can
+//     no longer be used or is likely actively being replaced. This makes nodes and zones with useless replicas
+//     available for scheduling.
 func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node,
-    ignoreFailedReplicas bool) (map[string]*longhorn.Node,
+    ignoreFailedReplicas, creatingNewReplicasForReplenishment bool) (map[string]*longhorn.Node,
     map[string]bool, map[string]bool, map[string]bool) {
     usedNodes := map[string]*longhorn.Node{}
     usedZones := map[string]bool{}
@@ -839,8 +863,16 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map
         if r.DeletionTimestamp != nil {
             continue
         }
-        if r.Spec.FailedAt != "" && ignoreFailedReplicas {
-            continue
+        if r.Spec.FailedAt != "" {
+            if ignoreFailedReplicas {
+                continue
+            }
+            if !IsPotentiallyReusableReplica(r) {
+                continue // This replica can never be used again, so it does not count in scheduling decisions.
+            }
+            if creatingNewReplicasForReplenishment {
+                continue // Maybe this replica can be used again, but it is being actively replaced anyway.
+            }
         }
 
         if node, ok := nodeInfo[r.Spec.NodeID]; ok {
@@ -869,4 +901,42 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map
 
     return usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones
 }
+<<<<<<< HEAD
 >>>>>>> 29a895c2 (Consider a node with a failed replica as still used)
+=======
+
+// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume,
+// even if there are potentially reusable failed replicas. It returns:
+//   - -time.Duration if there is no need for a replacement replica,
+//   - 0 if a replacement replica is needed right now (replica-replenishment-wait-interval has elapsed),
+//   - +time.Duration if a replacement replica will be needed (replica-replenishment-wait-interval has not elapsed).
+func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) {
+    if volume.Status.Robustness != longhorn.VolumeRobustnessDegraded {
+        // No replacement replica is needed.
+        return -1, time.Time{}, nil
+    }
+
+    settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
+    if err != nil {
+        err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval")
+        return 0, time.Time{}, err
+    }
+    waitInterval := time.Duration(settingValue) * time.Second
+
+    lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
+    if err != nil {
+        err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt)
+        return 0, time.Time{}, err
+    }
+
+    now := rcs.nowHandler()
+    if now.After(lastDegradedAt.Add(waitInterval)) {
+        // A replacement replica is needed now.
+        return 0, time.Time{}, nil
+    }
+
+    timeOfNext := lastDegradedAt.Add(waitInterval)
+    // Adding 1 more second to the check back interval to avoid clock skew
+    return timeOfNext.Sub(now) + time.Second, timeOfNext, nil
+}
+>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
diff --git a/scheduler/replica_scheduler_test.go b/scheduler/replica_scheduler_test.go
index f73e7cf847..cf6dbfd877 100644
--- a/scheduler/replica_scheduler_test.go
+++ b/scheduler/replica_scheduler_test.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "testing"
+    "time"
 
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/client-go/kubernetes/fake"
@@ -55,8 +56,13 @@ const (
     TestZone1 = "test-zone-1"
     TestZone2 = "test-zone-2"
 
+<<<<<<< HEAD
     TestTimeNow = "2015-01-02T00:00:00Z"
 >>>>>>> 29a895c2 (Consider a node with a failed replica as still used)
+=======
+    TestTimeNow          = "2015-01-02T00:00:00Z"
+    TestTimeOneMinuteAgo = "2015-01-01T23:59:00Z"
+>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
 )
 
 var longhornFinalizerKey = longhorn.SchemeGroupVersion.Group
@@ -67,7 +73,9 @@ func newReplicaScheduler(lhClient *lhfake.Clientset, kubeClient *fake.Clientset,
 
     ds := datastore.NewDataStore(TestNamespace, lhClient, kubeClient, extensionsClient, informerFactories)
 
-    return NewReplicaScheduler(ds)
+    rcs := NewReplicaScheduler(ds)
+    rcs.nowHandler = getTestNow
+    return rcs
 }
 
 func newDaemonPod(phase corev1.PodPhase, name, namespace, nodeID, podIP string) *corev1.Pod {
@@ -241,6 +249,16 @@ type ReplicaSchedulerTestCase struct {
     storageOverProvisioningPercentage string
     storageMinimalAvailablePercentage string
    replicaNodeSoftAntiAffinity       string
+<<<<<<< HEAD
+=======
+    replicaZoneSoftAntiAffinity       string
+    replicaDiskSoftAntiAffinity       string
+    ReplicaReplenishmentWaitInterval  string
+
+    // some test cases only try to schedule a subset of a volume's replicas
+    allReplicas        map[string]*longhorn.Replica
+    replicasToSchedule map[string]struct{}
+>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
 
     // schedule state
     expectedNodes map[string]*longhorn.Node
@@ -1087,25 +1105,44 @@ func (s *TestSuite) TestReplicaScheduler(c *C) {
     tc.replicaZoneSoftAntiAffinity = "false" // Do not allow replicas to schedule to the same zone.
     testCases["fail scheduling when doing so would reuse an invalid evicting node"] = tc
 
-    // Test fail to schedule to node with failed replica
-    // If replicaNodeSoftAntiAffinity == false, this shouldn't be possible.
-    tc = generateFailedReplicaTestCase("false")
+    // Test potentially reusable replica before interval expires
+    // We should fail to schedule a new replica to this node until the interval expires.
+    tc = generateFailedReplicaTestCase(true, false)
     tc.err = false
     tc.firstNilReplica = 0
-    testCases["fail to schedule to node with failed replica"] = tc
+    testCases["potentially reusable replica before interval expires"] = tc
 
-    // Test succeed to schedule to node with failed replica
-    // If replicaNodeSoftAntiAffinity == true, this should be possible.
-    tc = generateFailedReplicaTestCase("true")
+    // Test potentially reusable replica after interval expires
+    // We should succeed to schedule a new replica to this node because the interval expired.
+    tc = generateFailedReplicaTestCase(true, true)
     tc.err = false
     tc.firstNilReplica = -1
-    testCases["succeed to schedule to node with failed replica"] = tc
+    testCases["potentially reusable replica after interval expires"] = tc
 
+<<<<<<< HEAD
     testCasesActual := map[string]*ReplicaSchedulerTestCase{}
     testCasesActual["fail to schedule to node with failed replica"] = testCases["fail to schedule to node with failed replica"]
     testCasesActual["succeed to schedule to node with failed replica"] = testCases["succeed to schedule to node with failed replica"]
     for name, tc := range testCasesActual {
 >>>>>>> 29a895c2 (Consider a node with a failed replica as still used)
+=======
+    // Test non-reusable replica before interval expires
+    // We should succeed to schedule a new replica to this node because the existing replica is not reusable.
+    tc = generateFailedReplicaTestCase(false, false)
+    tc.err = false
+    tc.firstNilReplica = -1
+    testCases["non-reusable replica before interval expires"] = tc
+
+    // Test non-reusable replica after interval expires
+    // We should succeed to schedule a new replica to this node because the existing replica is not reusable and the
+    // interval expired anyway.
+    tc = generateFailedReplicaTestCase(false, true)
+    tc.err = false
+    tc.firstNilReplica = -1
+    testCases["non-reusable replica after interval expires"] = tc
+
+    for name, tc := range testCases {
+>>>>>>> 65ceb37c (Refine scheduling behavior with failed replicas)
         fmt.Printf("testing %v\n", name)
 
         kubeClient := fake.NewSimpleClientset()
@@ -1213,11 +1250,11 @@ func (s *TestSuite) TestReplicaScheduler(c *C) {
     }
 }
 
-func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *ReplicaSchedulerTestCase) {
+// generateFailedReplicaTestCase helps generate test cases in which a node contains a failed replica and the scheduler
+// must decide whether to allow additional replicas to schedule to it.
+func generateFailedReplicaTestCase(
+    replicaReusable, waitIntervalExpired bool) (tc *ReplicaSchedulerTestCase) {
     tc = generateSchedulerTestCase()
-    tc.replicaNodeSoftAntiAffinity = replicaNodeSoftAntiAffinity
-    tc.replicaDiskSoftAntiAffinity = "true" // Do not hinder replica scheduling except for node.
-    tc.replicaZoneSoftAntiAffinity = "true" // Do not hinder replica scheduling except for node.
     daemon1 := newDaemonPod(corev1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1)
     tc.daemons = []*corev1.Pod{
         daemon1,
     }
@@ -1229,6 +1266,10 @@ func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *Repl
         getDiskID(TestNode1, "1"): disk,
     }
 
+    // We are specifically interested in situations in which a replica is ONLY schedulable to a node because its
+    // existing replica is failed.
+    tc.replicaNodeSoftAntiAffinity = "false"
+
     // A failed replica is already scheduled.
     var alreadyScheduledReplica *longhorn.Replica
     for _, replica := range tc.allReplicas {
@@ -1237,7 +1278,22 @@ func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *Repl
     }
     delete(tc.replicasToSchedule, alreadyScheduledReplica.Name)
     alreadyScheduledReplica.Spec.NodeID = TestNode1
+    alreadyScheduledReplica.Spec.DiskID = getDiskID(TestNode1, "1")
     alreadyScheduledReplica.Spec.FailedAt = TestTimeNow
+    tc.volume.Status.Robustness = longhorn.VolumeRobustnessDegraded
+    tc.volume.Status.LastDegradedAt = TestTimeOneMinuteAgo
+
+    if replicaReusable {
+        alreadyScheduledReplica.Spec.RebuildRetryCount = 0
+    } else {
+        alreadyScheduledReplica.Spec.RebuildRetryCount = 5
+    }
+
+    if waitIntervalExpired {
+        tc.ReplicaReplenishmentWaitInterval = "30"
+    } else {
+        tc.ReplicaReplenishmentWaitInterval = "90"
+    }
 
     node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{
         getDiskID(TestNode1, "1"): {
@@ -1307,6 +1363,17 @@ func setSettings(tc *ReplicaSchedulerTestCase, lhClient *lhfake.Clientset, sInde
         err = sIndexer.Add(setting)
         c.Assert(err, IsNil)
     }
+    // Set replica replenishment wait interval setting
+    if tc.ReplicaReplenishmentWaitInterval != "" {
+        s := initSettings(
+            string(types.SettingNameReplicaReplenishmentWaitInterval),
+            tc.ReplicaReplenishmentWaitInterval)
+        setting, err :=
+            lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{})
+        c.Assert(err, IsNil)
+        err = sIndexer.Add(setting)
+        c.Assert(err, IsNil)
+    }
 }
 
 func (s *TestSuite) TestFilterDisksWithMatchingReplicas(c *C) {
@@ -1528,7 +1595,7 @@ func (s *TestSuite) TestGetCurrentNodesAndZones(c *C) {
     for name, tc := range testCases {
         fmt.Printf("testing %v\n", name)
         usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(tc.replicas, tc.nodeInfo,
-            false)
+            false, false)
         verifyNodeNames(tc.expectUsedNodeNames, usedNodes)
         verifyZoneNames(tc.expectUsedZoneNames, usedZones)
         verifyOnlyEvictingNodeNames(tc.expectOnlyEvictingNodeNames, onlyEvictingNodes)
@@ -1536,3 +1603,8 @@ func (s *TestSuite) TestGetCurrentNodesAndZones(c *C) {
 >>>>>>> 29a895c2 (Consider a node with a failed replica as still used)
     }
 }
+
+func getTestNow() time.Time {
+    now, _ := time.Parse(time.RFC3339, TestTimeNow)
+    return now
+}
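Editor's note: the short Go program below is not part of either patch above. It is a minimal, standalone sketch of the wait-interval arithmetic that the new timeToReplacementReplica helper implements (last degraded timestamp plus the replica-replenishment-wait-interval setting, with one extra second added to absorb clock skew). The function name replacementDelay and the hard-coded values are hypothetical and chosen only to mirror TestTimeOneMinuteAgo, TestTimeNow, and the 30/90-second intervals used by the test cases.

package main

import (
    "fmt"
    "time"
)

// replacementDelay mirrors the arithmetic in timeToReplacementReplica: given the moment a volume
// became degraded and the configured wait interval, it returns how long to wait before creating a
// replacement replica and the absolute time at which that should happen. A zero duration means a
// replacement replica is needed immediately.
func replacementDelay(lastDegradedAt time.Time, waitInterval time.Duration, now time.Time) (time.Duration, time.Time) {
    timeOfNext := lastDegradedAt.Add(waitInterval)
    if now.After(timeOfNext) {
        return 0, time.Time{}
    }
    // Add one extra second to the check-back interval to avoid clock skew.
    return timeOfNext.Sub(now) + time.Second, timeOfNext
}

func main() {
    lastDegradedAt, _ := time.Parse(time.RFC3339, "2015-01-01T23:59:00Z")
    now, _ := time.Parse(time.RFC3339, "2015-01-02T00:00:00Z")

    // With a 90-second wait interval, the volume degraded 60 seconds ago, so scheduling should
    // wait roughly 31 more seconds (30 remaining plus the 1-second skew pad).
    fmt.Println(replacementDelay(lastDegradedAt, 90*time.Second, now))

    // With a 30-second wait interval, the interval has already elapsed and a replacement replica
    // is needed right now.
    fmt.Println(replacementDelay(lastDegradedAt, 30*time.Second, now))
}

Returning both the remaining duration and the absolute retry time mirrors how the patch uses the two values: RequireNewReplica returns the duration to its caller, while the delayed-replenishment log message prints the absolute time.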