From bb065062f1323a7af712c00cde33ed0d97ae4980 Mon Sep 17 00:00:00 2001 From: Eric Weber Date: Fri, 1 Mar 2024 15:41:05 -0600 Subject: [PATCH] Refine scheduling behavior with failed replicas Consider a node with a failed replica as used if the failed replica is potentially reusable and replica-replenishment-wait-interval hasn't expired. Longhorn 8043 Signed-off-by: Eric Weber --- scheduler/replica_scheduler.go | 120 ++++++++++++++++++---------- scheduler/replica_scheduler_test.go | 90 ++++++++++++++++----- 2 files changed, 150 insertions(+), 60 deletions(-) diff --git a/scheduler/replica_scheduler.go b/scheduler/replica_scheduler.go index 3f06123a40..d4a1a266bb 100644 --- a/scheduler/replica_scheduler.go +++ b/scheduler/replica_scheduler.go @@ -20,6 +20,9 @@ const ( type ReplicaScheduler struct { ds *datastore.DataStore + + // Required for unit testing. + nowHandler func() time.Time } type Disk struct { @@ -40,6 +43,9 @@ type DiskSchedulingInfo struct { func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler { rcScheduler := &ReplicaScheduler{ ds: ds, + + // Required for unit testing. + nowHandler: time.Now, } return rcScheduler } @@ -201,6 +207,13 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled } + timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume) + if err != nil { + err = errors.Wrap(err, "failed to get time until replica replacement") + multiError.Append(util.NewMultiError(err.Error())) + return map[string]*Disk{}, multiError + } + getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) { diskCandidates = map[string]*Disk{} multiError = util.NewMultiError() @@ -217,7 +230,7 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod } usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo, - ignoreFailedReplicas) + ignoreFailedReplicas, timeToReplacementReplica == 0) allowEmptyNodeSelectorVolume, err := rcs.ds.GetSettingAsBool(types.SettingNameAllowEmptyNodeSelectorVolume) if err != nil { @@ -578,7 +591,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep hasPotentiallyReusableReplica := false for _, r := range replicas { - if IsPotentiallyReusableReplica(r, hardNodeAffinity) { + if IsPotentiallyReusableReplica(r) { hasPotentiallyReusableReplica = true break } @@ -587,42 +600,23 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep return 0 } - // Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later. 
-	settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
-	if err != nil {
-		logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err)
-		return 0
-	}
-	waitInterval := time.Duration(settingValue) * time.Second
-	lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
-
+	timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume)
 	if err != nil {
-		logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err)
-		return 0
+		msg := "Failed to get time until replica replacement, will directly replenish a new replica"
+		logrus.WithError(err).Error(msg)
 	}
 
-	now := time.Now()
-	if now.After(lastDegradedAt.Add(waitInterval)) {
-		return 0
+	if timeUntilNext > 0 {
+		logrus.Infof("Replica replenishment is delayed until %v", timeOfNext)
 	}
-
-	logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval))
-	// Adding 1 more second to the check back interval to avoid clock skew
-	return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second
+
+	return timeUntilNext
 }
 
 func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) (bool, error) {
-	if r.Spec.FailedAt == "" {
-		return false, nil
-	}
-	if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
-		return false, nil
-	}
-	if r.Spec.RebuildRetryCount >= FailedReplicaMaxRetryCount {
-		return false, nil
-	}
-	if r.Spec.EvictionRequested {
+	// Every reusable replica is also potentially reusable, so run the shared checks first.
+	if !IsPotentiallyReusableReplica(r) {
 		return false, nil
 	}
+
 	if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
 		return false, nil
 	}
@@ -693,9 +687,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
 	return true, nil
 }
 
-// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable.
-// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
-func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool {
+// IsPotentiallyReusableReplica checks if a failed replica is potentially reusable. Longhorn may be able to reuse a
+// potentially reusable replica later, but it is not valid right now because its node or disk is down.
+func IsPotentiallyReusableReplica(r *longhorn.Replica) bool {
 	if r.Spec.FailedAt == "" {
 		return false
 	}
@@ -708,9 +702,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string)
 	if r.Spec.EvictionRequested {
 		return false
 	}
-	if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
-		return false
-	}
 	// TODO: Reuse failed replicas for a SPDK volume
 	if datastore.IsDataEngineV2(r.Spec.DataEngine) {
 		return false
 	}
@@ -865,10 +856,14 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long
 	return longhorn.DiskSpec{}, longhorn.DiskStatus{}, false
 }
 
-// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to. Some callers do not consider a
-// node or zone to be used if it contains a failed replica. ignoreFailedReplicas == true supports this use case.
+// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to.
+// - Some callers do not consider a node or zone to be used if it contains a failed replica.
+// ignoreFailedReplicas == true supports this use case. +// - Otherwise, getCurrentNodesAndZones does not consider a node or zone to be occupied by a failed replica that can +// no longer be used or is likely actively being replaced. This makes nodes and zones with useless replicas +// available for scheduling. func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node, - ignoreFailedReplicas bool) (map[string]*longhorn.Node, + ignoreFailedReplicas, creatingNewReplicasForReplenishment bool) (map[string]*longhorn.Node, map[string]bool, map[string]bool, map[string]bool) { usedNodes := map[string]*longhorn.Node{} usedZones := map[string]bool{} @@ -882,8 +877,16 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map if r.DeletionTimestamp != nil { continue } - if r.Spec.FailedAt != "" && ignoreFailedReplicas { - continue + if r.Spec.FailedAt != "" { + if ignoreFailedReplicas { + continue + } + if !IsPotentiallyReusableReplica(r) { + continue // This replica can never be used again, so it does not count in scheduling decisions. + } + if creatingNewReplicasForReplenishment { + continue // Maybe this replica can be used again, but it is being actively replaced anyway. + } } if node, ok := nodeInfo[r.Spec.NodeID]; ok { @@ -912,3 +915,38 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map return usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones } + +// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume, +// even if there are potentially reusable failed replicas. It returns: +// - -time.Duration if there is no need for a replacement replica, +// - 0 if a replacement replica is needed right now (replica-replenishment-wait-interval has elapsed), +// - +time.Duration if a replacement replica will be needed (replica-replenishment-wait-interval has not elapsed). +func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) { + if volume.Status.Robustness != longhorn.VolumeRobustnessDegraded { + // No replacement replica is needed. + return -1, time.Time{}, nil + } + + settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval) + if err != nil { + err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval") + return 0, time.Time{}, err + } + waitInterval := time.Duration(settingValue) * time.Second + + lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt) + if err != nil { + err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt) + return 0, time.Time{}, err + } + + now := rcs.nowHandler() + if now.After(lastDegradedAt.Add(waitInterval)) { + // A replacement replica is needed now. 
+ return 0, time.Time{}, nil + } + + timeOfNext := lastDegradedAt.Add(waitInterval) + // Adding 1 more second to the check back interval to avoid clock skew + return timeOfNext.Sub(now) + time.Second, timeOfNext, nil +} diff --git a/scheduler/replica_scheduler_test.go b/scheduler/replica_scheduler_test.go index be38bb7479..e550ee873c 100644 --- a/scheduler/replica_scheduler_test.go +++ b/scheduler/replica_scheduler_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes/fake" @@ -54,7 +55,8 @@ const ( TestZone1 = "test-zone-1" TestZone2 = "test-zone-2" - TestTimeNow = "2015-01-02T00:00:00Z" + TestTimeNow = "2015-01-02T00:00:00Z" + TestTimeOneMinuteAgo = "2015-01-01T23:59:00Z" ) var longhornFinalizerKey = longhorn.SchemeGroupVersion.Group @@ -65,7 +67,9 @@ func newReplicaScheduler(lhClient *lhfake.Clientset, kubeClient *fake.Clientset, ds := datastore.NewDataStore(TestNamespace, lhClient, kubeClient, extensionsClient, informerFactories) - return NewReplicaScheduler(ds) + rcs := NewReplicaScheduler(ds) + rcs.nowHandler = getTestNow + return rcs } func newDaemonPod(phase corev1.PodPhase, name, namespace, nodeID, podIP string) *corev1.Pod { @@ -242,6 +246,7 @@ type ReplicaSchedulerTestCase struct { replicaNodeSoftAntiAffinity string replicaZoneSoftAntiAffinity string replicaDiskSoftAntiAffinity string + ReplicaReplenishmentWaitInterval string // some test cases only try to schedule a subset of a volume's replicas allReplicas map[string]*longhorn.Replica @@ -1099,24 +1104,36 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { tc.replicaZoneSoftAntiAffinity = "false" // Do not allow replicas to schedule to the same zone. testCases["fail scheduling when doing so would reuse an invalid evicting node"] = tc - // Test fail to schedule to node with failed replica - // If replicaNodeSoftAntiAffinity == false, this shouldn't be possible. - tc = generateFailedReplicaTestCase("false") + // Test potentially reusable replica before interval expires + // We should fail to schedule a new replica to this node until the interval expires. + tc = generateFailedReplicaTestCase(true, false) tc.err = false tc.firstNilReplica = 0 - testCases["fail to schedule to node with failed replica"] = tc + testCases["potentially reusable replica before interval expires"] = tc - // Test succeed to schedule to node with failed replica - // If replicaNodeSoftAntiAffinity == true, this should be possible. - tc = generateFailedReplicaTestCase("true") + // Test potentially reusable replica after interval expires + // We should succeed to schedule a new replica to this node because the interval expired. + tc = generateFailedReplicaTestCase(true, true) tc.err = false tc.firstNilReplica = -1 - testCases["succeed to schedule to node with failed replica"] = tc + testCases["potentially reusable replica after interval expires"] = tc - testCasesActual := map[string]*ReplicaSchedulerTestCase{} - testCasesActual["fail to schedule to node with failed replica"] = testCases["fail to schedule to node with failed replica"] - testCasesActual["succeed to schedule to node with failed replica"] = testCases["succeed to schedule to node with failed replica"] - for name, tc := range testCasesActual { + // Test non-reusable replica before interval expires + // We should succeed to schedule a new replica to this node because the existing replica is not reusable. 
+ tc = generateFailedReplicaTestCase(false, false) + tc.err = false + tc.firstNilReplica = -1 + testCases["non-reusable replica before interval expires"] = tc + + // Test non-reusable replica after interval expires + // We should succeed to schedule a new replica to this node because the existing replica is not reusable and the + // interval expired anyway. + tc = generateFailedReplicaTestCase(false, true) + tc.err = false + tc.firstNilReplica = -1 + testCases["non-reusable replica after interval expires"] = tc + + for name, tc := range testCases { fmt.Printf("testing %v\n", name) kubeClient := fake.NewSimpleClientset() @@ -1207,11 +1224,11 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } } -func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *ReplicaSchedulerTestCase) { +// generateFailedReplicaTestCase helps generate test cases in which a node contains a failed replica and the scheduler +// must decide whether to allow additional replicas to schedule to it. +func generateFailedReplicaTestCase( + replicaReusable, waitIntervalExpired bool) (tc *ReplicaSchedulerTestCase) { tc = generateSchedulerTestCase() - tc.replicaNodeSoftAntiAffinity = replicaNodeSoftAntiAffinity - tc.replicaDiskSoftAntiAffinity = "true" // Do not hinder replica scheduling except for node. - tc.replicaZoneSoftAntiAffinity = "true" // Do not hinder replica scheduling except for node. daemon1 := newDaemonPod(corev1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1) tc.daemons = []*corev1.Pod{ daemon1, @@ -1223,6 +1240,10 @@ func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *Repl getDiskID(TestNode1, "1"): disk, } + // We are specifically interested in situations in which a replica is ONLY schedulable to a node because its + // existing replica is failed. + tc.replicaNodeSoftAntiAffinity = "false" + // A failed replica is already scheduled. 
var alreadyScheduledReplica *longhorn.Replica for _, replica := range tc.allReplicas { @@ -1231,7 +1252,22 @@ func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *Repl } delete(tc.replicasToSchedule, alreadyScheduledReplica.Name) alreadyScheduledReplica.Spec.NodeID = TestNode1 + alreadyScheduledReplica.Spec.DiskID = getDiskID(TestNode1, "1") alreadyScheduledReplica.Spec.FailedAt = TestTimeNow + tc.volume.Status.Robustness = longhorn.VolumeRobustnessDegraded + tc.volume.Status.LastDegradedAt = TestTimeOneMinuteAgo + + if replicaReusable { + alreadyScheduledReplica.Spec.RebuildRetryCount = 0 + } else { + alreadyScheduledReplica.Spec.RebuildRetryCount = 5 + } + + if waitIntervalExpired { + tc.ReplicaReplenishmentWaitInterval = "30" + } else { + tc.ReplicaReplenishmentWaitInterval = "90" + } node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{ getDiskID(TestNode1, "1"): { @@ -1301,6 +1337,17 @@ func setSettings(tc *ReplicaSchedulerTestCase, lhClient *lhfake.Clientset, sInde err = sIndexer.Add(setting) c.Assert(err, IsNil) } + // Set replica replenishment wait interval setting + if tc.ReplicaReplenishmentWaitInterval != "" { + s := initSettings( + string(types.SettingNameReplicaReplenishmentWaitInterval), + tc.ReplicaReplenishmentWaitInterval) + setting, err := + lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + } } func (s *TestSuite) TestFilterDisksWithMatchingReplicas(c *C) { @@ -1522,10 +1569,15 @@ func (s *TestSuite) TestGetCurrentNodesAndZones(c *C) { for name, tc := range testCases { fmt.Printf("testing %v\n", name) usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(tc.replicas, tc.nodeInfo, - false) + false, false) verifyNodeNames(tc.expectUsedNodeNames, usedNodes) verifyZoneNames(tc.expectUsedZoneNames, usedZones) verifyOnlyEvictingNodeNames(tc.expectOnlyEvictingNodeNames, onlyEvictingNodes) verifyOnlyEvictingZoneNames(tc.expectOnlyEvictingZoneNames, onlyEvictingZones) } } + +func getTestNow() time.Time { + now, _ := time.Parse(time.RFC3339, TestTimeNow) + return now +}
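
As an illustration of the new behavior (not part of the patch itself), the following is a minimal, self-contained Go sketch of the wait-interval arithmetic that timeToReplacementReplica implements. The names timeToReplacement, degraded, and the plain parameters are illustrative only; the real helper reads the replica-replenishment-wait-interval setting and volume.Status.LastDegradedAt itself, also returns an error, and its zero result is what getDiskCandidates forwards to getCurrentNodesAndZones as creatingNewReplicasForReplenishment.

package main

import (
	"fmt"
	"time"
)

// timeToReplacement mirrors the decision in timeToReplacementReplica, stripped of the datastore and
// volume plumbing. It returns a negative duration when no replacement replica is needed, zero when
// the wait interval has already elapsed (replenish immediately), and otherwise the remaining wait
// plus one second of clock-skew padding, together with the time at which the replacement is due.
func timeToReplacement(degraded bool, lastDegradedAt time.Time, waitInterval time.Duration, now time.Time) (time.Duration, time.Time) {
	if !degraded {
		return -1, time.Time{} // No replacement replica is needed.
	}
	due := lastDegradedAt.Add(waitInterval)
	if now.After(due) {
		return 0, time.Time{} // The interval has expired; a replacement replica is needed now.
	}
	return due.Sub(now) + time.Second, due // Check back later; pad against clock skew.
}

func main() {
	now := time.Now()

	// Degraded 30 seconds ago with a 600-second wait interval: replenishment is delayed, and the
	// failed replica's node still counts as used during scheduling.
	delay, due := timeToReplacement(true, now.Add(-30*time.Second), 600*time.Second, now)
	fmt.Printf("delay replenishment for %v (until %v)\n", delay, due)

	// Degraded 15 minutes ago with the same interval: the interval expired, so a replacement is
	// created now and the failed replica's node no longer counts as used.
	delay, _ = timeToReplacement(true, now.Add(-15*time.Minute), 600*time.Second, now)
	fmt.Printf("time until replacement: %v\n", delay)
}

With these two cases, a node holding a potentially reusable failed replica blocks an additional replica only while Longhorn still intends to reuse it; once the interval expires, the node is treated as available again, which is what the new "before interval expires" and "after interval expires" test cases exercise.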