From 52aa894c10c735b614fd3697730bc03b91d39be6 Mon Sep 17 00:00:00 2001 From: Eric Weber Date: Mon, 26 Feb 2024 16:45:59 -0600 Subject: [PATCH] Consider a node with a failed replica as still used The backport to v1.5.x was complex. It required some test refactoring that had not previously been backported. This commit includes the necessary refactor and the reworked Longhorn 8043 commits. Longhorn 8043 Signed-off-by: Eric Weber (cherry picked from commit 29a895c25559e5645652e3a3ddcd17322a359fe9) (cherry picked from commit 65ceb37c3ba5be08323dc06df6581ed7a001516a) (cherry picked from commit e685946e8fd11965355ad54f16f3d69f0d211d25) --- scheduler/replica_scheduler.go | 142 +++++++++++----- scheduler/replica_scheduler_test.go | 248 ++++++++++++++++++++++------ 2 files changed, 296 insertions(+), 94 deletions(-) diff --git a/scheduler/replica_scheduler.go b/scheduler/replica_scheduler.go index 1a7f4b32b6..9af9c55cd7 100644 --- a/scheduler/replica_scheduler.go +++ b/scheduler/replica_scheduler.go @@ -20,6 +20,9 @@ const ( type ReplicaScheduler struct { ds *datastore.DataStore + + // Required for unit testing. + nowHandler func() time.Time } type Disk struct { @@ -40,6 +43,9 @@ type DiskSchedulingInfo struct { func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler { rcScheduler := &ReplicaScheduler{ ds: ds, + + // Required for unit testing. + nowHandler: time.Now, } return rcScheduler } @@ -88,7 +94,7 @@ func (rcs *ReplicaScheduler) ScheduleReplica(replica *longhorn.Replica, replicas nodeDisksMap[node.Name] = disks } - diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true) + diskCandidates, multiError := rcs.getDiskCandidates(nodeCandidates, nodeDisksMap, replicas, volume, true, false) // there's no disk that fit for current replica if len(diskCandidates) == 0 { @@ -143,7 +149,17 @@ func getNodesWithEvictingReplicas(replicas map[string]*longhorn.Replica, nodeInf return nodesWithEvictingReplicas } -func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node, nodeDisksMap map[string]map[string]struct{}, replicas map[string]*longhorn.Replica, volume *longhorn.Volume, requireSchedulingCheck bool) (map[string]*Disk, util.MultiError) { +// getDiskCandidates returns a map of the most appropriate disks a replica can be scheduled to (assuming it can be +// scheduled at all). For example, consider a case in which there are two disks on nodes without a replica for a volume +// and two disks on nodes with a replica for the same volume. getDiskCandidates only returns the disks without a +// replica, even if the replica can legally be scheduled on all four disks. +// Some callers (e.g. CheckAndReuseFailedReplicas) do not consider a node or zone to be used if it contains a failed +// replica. ignoreFailedReplicas == true supports this use case. +func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Node, + nodeDisksMap map[string]map[string]struct{}, + replicas map[string]*longhorn.Replica, + volume *longhorn.Volume, + requireSchedulingCheck, ignoreFailedReplicas bool) (map[string]*Disk, util.MultiError) { multiError := util.NewMultiError() nodeSoftAntiAffinity, err := rcs.ds.GetSettingAsBool(types.SettingNameReplicaSoftAntiAffinity) @@ -165,6 +181,17 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod zoneSoftAntiAffinity = volume.Spec.ReplicaZoneSoftAntiAffinity == longhorn.ReplicaZoneSoftAntiAffinityEnabled } + creatingNewReplicasForReplenishment := false + if volume.Status.Robustness == longhorn.VolumeRobustnessDegraded { + timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume) + if err != nil { + err = errors.Wrap(err, "failed to get time until replica replacement") + multiError.Append(util.NewMultiError(err.Error())) + return map[string]*Disk{}, multiError + } + creatingNewReplicasForReplenishment = timeToReplacementReplica == 0 + } + getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) { multiError = util.NewMultiError() for _, node := range nodes { @@ -182,14 +209,30 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod replicasCountPerNode := map[string]int{} // Get current nodes and zones for _, r := range replicas { - if r.Spec.NodeID != "" && r.DeletionTimestamp == nil && r.Spec.FailedAt == "" { - if node, ok := nodeInfo[r.Spec.NodeID]; ok { - usedNodes[r.Spec.NodeID] = node - // For empty zone label, we treat them as - // one zone. - usedZones[node.Status.Zone] = true - replicasCountPerNode[r.Spec.NodeID] = replicasCountPerNode[r.Spec.NodeID] + 1 + if r.Spec.NodeID == "" { + continue + } + if r.DeletionTimestamp != nil { + continue + } + if r.Spec.FailedAt != "" { + if ignoreFailedReplicas { + continue + } + if !IsPotentiallyReusableReplica(r) { + continue // This replica can never be used again, so it does not count in scheduling decisions. } + if creatingNewReplicasForReplenishment { + continue // Maybe this replica can be used again, but it is being actively replaced anyway. + } + } + + if node, ok := nodeInfo[r.Spec.NodeID]; ok { + usedNodes[r.Spec.NodeID] = node + // For empty zone label, we treat them as + // one zone. + usedZones[node.Status.Zone] = true + replicasCountPerNode[r.Spec.NodeID] = replicasCountPerNode[r.Spec.NodeID] + 1 } } @@ -471,7 +514,9 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon } } - diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false) + // Call getDiskCandidates with ignoreFailedReplicas == true since we want the list of candidates to include disks + // that already contain a failed replica. + diskCandidates, _ := rcs.getDiskCandidates(availableNodesInfo, availableNodeDisksMap, replicas, volume, false, true) var reusedReplica *longhorn.Replica for _, suggestDisk := range diskCandidates { @@ -513,7 +558,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep hasPotentiallyReusableReplica := false for _, r := range replicas { - if IsPotentiallyReusableReplica(r, hardNodeAffinity) { + if IsPotentiallyReusableReplica(r) { hasPotentiallyReusableReplica = true break } @@ -522,42 +567,25 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep return 0 } - // Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later. - settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval) + timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume) if err != nil { - logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err) - return 0 + msg := "Failed to get time until replica replacement, will directly replenish a new replica" + logrus.WithError(err).Errorf(msg) } - waitInterval := time.Duration(settingValue) * time.Second - lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt) - - if err != nil { - logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err) - return 0 + if timeUntilNext > 0 { + // Adding another second to the checkBackDuration to avoid clock skew. + timeUntilNext = timeUntilNext + time.Second + logrus.Infof("Replica replenishment is delayed until %v", timeOfNext.Add(time.Second)) } - now := time.Now() - if now.After(lastDegradedAt.Add(waitInterval)) { - return 0 - } - - logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval)) - // Adding 1 more second to the check back interval to avoid clock skew - return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second + return timeUntilNext } func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) bool { - if r.Spec.FailedAt == "" { - return false - } - if r.Spec.NodeID == "" || r.Spec.DiskID == "" { - return false - } - if r.Spec.RebuildRetryCount >= FailedReplicaMaxRetryCount { - return false - } - if r.Spec.EvictionRequested { + // All failedReusableReplicas are also potentiallyFailedReusableReplicas. + if !IsPotentiallyReusableReplica(r) { return false } + if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity { return false } @@ -622,9 +650,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon return true } -// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable. -// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue. -func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool { +// IsPotentiallyReusableReplica checks if a failed replica is potentially reusable. A potentially reusable replica means +// this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue. +func IsPotentiallyReusableReplica(r *longhorn.Replica) bool { if r.Spec.FailedAt == "" { return false } @@ -637,9 +665,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) if r.Spec.EvictionRequested { return false } - if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity { - return false - } // TODO: Reuse failed replicas for a SPDK volume if r.Spec.BackendStoreDriver == longhorn.BackendStoreDriverTypeV2 { return false @@ -793,3 +818,30 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long } return longhorn.DiskSpec{}, longhorn.DiskStatus{}, false } + +// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume, +// even if there are potentially reusable failed replicas. It returns 0 if replica-replenishment-wait-interval has +// elapsed and a new replica is needed right now. +func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) { + settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval) + if err != nil { + err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval") + return 0, time.Time{}, err + } + waitInterval := time.Duration(settingValue) * time.Second + + lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt) + if err != nil { + err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt) + return 0, time.Time{}, err + } + + now := rcs.nowHandler() + timeOfNext := lastDegradedAt.Add(waitInterval) + if now.After(timeOfNext) { + // A replacement replica is needed now. + return 0, time.Time{}, nil + } + + return timeOfNext.Sub(now), timeOfNext, nil +} diff --git a/scheduler/replica_scheduler_test.go b/scheduler/replica_scheduler_test.go index 96295e0635..17ff8b4715 100644 --- a/scheduler/replica_scheduler_test.go +++ b/scheduler/replica_scheduler_test.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "testing" + "time" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller" corev1 "k8s.io/api/core/v1" @@ -49,6 +51,9 @@ const ( // TestDiskID2 = "diskID2" TestDiskSize = 5000000000 TestDiskAvailableSize = 3000000000 + + TestTimeNow = "2015-01-02T00:00:00Z" + TestTimeOneMinuteAgo = "2015-01-01T23:59:00Z" ) var longhornFinalizerKey = longhorn.SchemeGroupVersion.Group @@ -59,7 +64,9 @@ func newReplicaScheduler(lhClient *lhfake.Clientset, kubeClient *fake.Clientset, ds := datastore.NewDataStore(TestNamespace, lhClient, kubeClient, extensionsClient, informerFactories) - return NewReplicaScheduler(ds) + rcs := NewReplicaScheduler(ds) + rcs.nowHandler = getTestNow + return rcs } func newDaemonPod(phase corev1.PodPhase, name, namespace, nodeID, podIP string) *corev1.Pod { @@ -226,35 +233,50 @@ func (s *TestSuite) SetUpTest(c *C) { type ReplicaSchedulerTestCase struct { volume *longhorn.Volume - replicas map[string]*longhorn.Replica daemons []*corev1.Pod nodes map[string]*longhorn.Node engineImage *longhorn.EngineImage storageOverProvisioningPercentage string storageMinimalAvailablePercentage string replicaNodeSoftAntiAffinity string + replicaZoneSoftAntiAffinity string + replicaReplenishmentWaitInterval string + + // some test cases only try to schedule a subset of a volume's replicas + allReplicas map[string]*longhorn.Replica + replicasToSchedule map[string]struct{} // schedule state expectedNodes map[string]*longhorn.Node // scheduler exception err bool - // couldn't schedule replica - isNilReplica bool + // first replica expected to fail scheduling + // -1 = default in constructor, don't fail to schedule + // 0 = fail to schedule first + // 1 = schedule first, fail to schedule second + // etc... + firstNilReplica int } func generateSchedulerTestCase() *ReplicaSchedulerTestCase { v := newVolume(TestVolumeName, 2) replica1 := newReplicaForVolume(v) replica2 := newReplicaForVolume(v) - replicas := map[string]*longhorn.Replica{ + allReplicas := map[string]*longhorn.Replica{ replica1.Name: replica1, replica2.Name: replica2, } + replicasToSchedule := map[string]struct{}{} + for name := range allReplicas { + replicasToSchedule[name] = struct{}{} + } engineImage := newEngineImage(TestEngineImage, longhorn.EngineImageStateDeployed) return &ReplicaSchedulerTestCase{ - volume: v, - replicas: replicas, - engineImage: engineImage, + volume: v, + engineImage: engineImage, + allReplicas: allReplicas, + replicasToSchedule: replicasToSchedule, + firstNilReplica: -1, } } @@ -329,7 +351,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = false + tc.firstNilReplica = -1 // Set replica node soft anti-affinity tc.replicaNodeSoftAntiAffinity = "true" testCases["nodes could not schedule"] = tc @@ -354,7 +376,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 testCases["there's no disk for replica"] = tc // Test engine image is not deployed on any node @@ -407,7 +429,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 testCases["there's no engine image deployed on any node"] = tc // Test anti affinity nodes, replica should schedule to both node1 and node2 @@ -498,17 +520,17 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = false + tc.firstNilReplica = -1 testCases["anti-affinity nodes"] = tc // Test scheduler error when replica.NodeID is not "" tc = generateSchedulerTestCase() - replicas := tc.replicas + replicas := tc.allReplicas for _, replica := range replicas { replica.Spec.NodeID = TestNode1 } tc.err = true - tc.isNilReplica = true + tc.firstNilReplica = 0 testCases["scheduler error when replica has NodeID"] = tc // Test no available disks @@ -575,7 +597,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 tc.storageOverProvisioningPercentage = "0" tc.storageMinimalAvailablePercentage = "100" testCases["there's no available disks for scheduling"] = tc @@ -645,7 +667,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 tc.storageOverProvisioningPercentage = "200" tc.storageMinimalAvailablePercentage = "20" testCases["there's no available disks for scheduling due to required storage"] = tc @@ -738,9 +760,38 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = false + tc.firstNilReplica = -1 testCases["schedule to disk with the most usable storage"] = tc + // Test potentially reusable replica before interval expires + // We should fail to schedule a new replica to this node until the interval expires. + tc = generateFailedReplicaTestCase(true, false) + tc.err = false + tc.firstNilReplica = 0 + testCases["potentially reusable replica before interval expires"] = tc + + // Test potentially reusable replica after interval expires + // We should succeed to schedule a new replica to this node because the interval expired. + tc = generateFailedReplicaTestCase(true, true) + tc.err = false + tc.firstNilReplica = -1 + testCases["potentially reusable replica after interval expires"] = tc + + // Test non-reusable replica before interval expires + // We should succeed to schedule a new replica to this node because the existing replica is not reusable. + tc = generateFailedReplicaTestCase(false, false) + tc.err = false + tc.firstNilReplica = -1 + testCases["non-reusable replica before interval expires"] = tc + + // Test non-reusable replica after interval expires + // We should succeed to schedule a new replica to this node because the existing replica is not reusable and the + // interval expired anyway. + tc = generateFailedReplicaTestCase(false, true) + tc.err = false + tc.firstNilReplica = -1 + testCases["non-reusable replica after interval expires"] = tc + for name, tc := range testCases { fmt.Printf("testing %v\n", name) @@ -786,43 +837,21 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { err = vIndexer.Add(volume) c.Assert(err, IsNil) // set settings - if tc.storageOverProvisioningPercentage != "" && tc.storageMinimalAvailablePercentage != "" { - s := initSettings(string(types.SettingNameStorageOverProvisioningPercentage), tc.storageOverProvisioningPercentage) - setting, err := lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) - c.Assert(err, IsNil) - err = sIndexer.Add(setting) - c.Assert(err, IsNil) - - s = initSettings(string(types.SettingNameStorageMinimalAvailablePercentage), tc.storageMinimalAvailablePercentage) - setting, err = lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) - c.Assert(err, IsNil) - err = sIndexer.Add(setting) - c.Assert(err, IsNil) - } - // Set replica node soft anti-affinity setting - if tc.replicaNodeSoftAntiAffinity != "" { - s := initSettings( - string(types.SettingNameReplicaSoftAntiAffinity), - tc.replicaNodeSoftAntiAffinity) - setting, err := - lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) - c.Assert(err, IsNil) - err = sIndexer.Add(setting) - c.Assert(err, IsNil) - } + setSettings(tc, lhClient, sIndexer, c) // validate scheduler - for _, replica := range tc.replicas { - r, err := lhClient.LonghornV1beta2().Replicas(TestNamespace).Create(context.TODO(), replica, metav1.CreateOptions{}) + numScheduled := 0 + for replicaName := range tc.replicasToSchedule { + r, err := lhClient.LonghornV1beta2().Replicas(TestNamespace).Create(context.TODO(), tc.allReplicas[replicaName], metav1.CreateOptions{}) c.Assert(err, IsNil) c.Assert(r, NotNil) err = rIndexer.Add(r) c.Assert(err, IsNil) - sr, _, err := s.ScheduleReplica(r, tc.replicas, volume) + sr, _, err := s.ScheduleReplica(r, tc.allReplicas, volume) if tc.err { c.Assert(err, NotNil) } else { - if tc.isNilReplica { + if numScheduled == tc.firstNilReplica { c.Assert(sr, IsNil) } else { c.Assert(err, IsNil) @@ -831,17 +860,138 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { c.Assert(sr.Spec.DiskID, Not(Equals), "") c.Assert(sr.Spec.DiskPath, Not(Equals), "") c.Assert(sr.Spec.DataDirectoryName, Not(Equals), "") - tc.replicas[sr.Name] = sr + tc.allReplicas[sr.Name] = sr // check expected node - for nname, node := range tc.expectedNodes { - if sr.Spec.NodeID == nname { + for name, node := range tc.expectedNodes { + if sr.Spec.NodeID == name { c.Assert(sr.Spec.DiskPath, Equals, node.Spec.Disks[sr.Spec.DiskID].Path) - delete(tc.expectedNodes, nname) + delete(tc.expectedNodes, name) } } + numScheduled++ } } } c.Assert(len(tc.expectedNodes), Equals, 0) } } + +// generateFailedReplicaTestCase helps generate test cases in which a node contains a failed replica and the scheduler +// must decide whether to allow additional replicas to schedule to it. +func generateFailedReplicaTestCase( + replicaReusable, waitIntervalExpired bool) (tc *ReplicaSchedulerTestCase) { + tc = generateSchedulerTestCase() + daemon1 := newDaemonPod(corev1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1) + tc.daemons = []*corev1.Pod{ + daemon1, + } + node1 := newNode(TestNode1, TestNamespace, true, longhorn.ConditionStatusTrue) + tc.engineImage.Status.NodeDeploymentMap[node1.Name] = true + disk := newDisk(TestDefaultDataPath, true, 0) + node1.Spec.Disks = map[string]longhorn.DiskSpec{ + getDiskID(TestNode1, "1"): disk, + } + + // We are specifically interested in situations in which a replica is ONLY schedulable to a node because its + // existing replica is failed. + tc.replicaNodeSoftAntiAffinity = "false" + + // A failed replica is already scheduled. + var alreadyScheduledReplica *longhorn.Replica + for _, replica := range tc.allReplicas { + alreadyScheduledReplica = replica + break + } + delete(tc.replicasToSchedule, alreadyScheduledReplica.Name) + alreadyScheduledReplica.Spec.NodeID = TestNode1 + alreadyScheduledReplica.Spec.DiskID = getDiskID(TestNode1, "1") + alreadyScheduledReplica.Spec.FailedAt = TestTimeNow + tc.volume.Status.Robustness = longhorn.VolumeRobustnessDegraded + tc.volume.Status.LastDegradedAt = TestTimeOneMinuteAgo + + if replicaReusable { + alreadyScheduledReplica.Spec.RebuildRetryCount = 0 + } else { + alreadyScheduledReplica.Spec.RebuildRetryCount = 5 + } + + if waitIntervalExpired { + tc.replicaReplenishmentWaitInterval = "30" + } else { + tc.replicaReplenishmentWaitInterval = "90" + } + + node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{ + getDiskID(TestNode1, "1"): { + StorageAvailable: TestDiskAvailableSize, + StorageScheduled: TestVolumeSize, + StorageMaximum: TestDiskSize, + Conditions: []longhorn.Condition{ + newCondition(longhorn.DiskConditionTypeSchedulable, longhorn.ConditionStatusTrue), + }, + DiskUUID: getDiskID(TestNode1, "1"), + Type: longhorn.DiskTypeFilesystem, + ScheduledReplica: map[string]int64{alreadyScheduledReplica.Name: TestVolumeSize}, + }, + } + nodes := map[string]*longhorn.Node{ + TestNode1: node1, + } + tc.nodes = nodes + return +} + +func setSettings(tc *ReplicaSchedulerTestCase, lhClient *lhfake.Clientset, sIndexer cache.Indexer, c *C) { + // Set storage over-provisioning percentage settings + if tc.storageOverProvisioningPercentage != "" && tc.storageMinimalAvailablePercentage != "" { + s := initSettings(string(types.SettingNameStorageOverProvisioningPercentage), tc.storageOverProvisioningPercentage) + setting, err := lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + + s = initSettings(string(types.SettingNameStorageMinimalAvailablePercentage), tc.storageMinimalAvailablePercentage) + setting, err = lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + } + // Set replica node soft anti-affinity setting + if tc.replicaNodeSoftAntiAffinity != "" { + s := initSettings( + string(types.SettingNameReplicaSoftAntiAffinity), + tc.replicaNodeSoftAntiAffinity) + setting, err := + lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + } + // Set replica zone soft anti-affinity setting + if tc.replicaZoneSoftAntiAffinity != "" { + s := initSettings( + string(types.SettingNameReplicaZoneSoftAntiAffinity), + tc.replicaZoneSoftAntiAffinity) + setting, err := + lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + } + // Set replica replenishment wait interval setting + if tc.replicaReplenishmentWaitInterval != "" { + s := initSettings( + string(types.SettingNameReplicaReplenishmentWaitInterval), + tc.replicaReplenishmentWaitInterval) + setting, err := + lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + } +} + +func getTestNow() time.Time { + now, _ := time.Parse(time.RFC3339, TestTimeNow) + return now +}