From c533c6ad2b82d564db5ea8d4bbb2de36418beb12 Mon Sep 17 00:00:00 2001 From: Eric Weber Date: Mon, 31 Jul 2023 11:31:21 -0500 Subject: [PATCH] Account for ReplicaDiskSoftAntiAffinity setting when scheduling Signed-off-by: Eric Weber --- scheduler/replica_scheduler.go | 47 +++++- scheduler/replica_scheduler_test.go | 250 ++++++++++++++++++++++++++-- 2 files changed, 274 insertions(+), 23 deletions(-) diff --git a/scheduler/replica_scheduler.go b/scheduler/replica_scheduler.go index 0d3719ec9f..dc445a9448 100644 --- a/scheduler/replica_scheduler.go +++ b/scheduler/replica_scheduler.go @@ -172,18 +172,20 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod volume.Spec.ReplicaDiskSoftAntiAffinity != "" { diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled } - _ = diskSoftAntiAffinity // TODO: Use this value during scheduling. getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) { + diskCandidates = map[string]*Disk{} multiError = util.NewMultiError() for _, node := range nodes { - diskCandidates, errors := rcs.filterNodeDisksForReplica(node, nodeDisksMap[node.Name], replicas, volume, requireSchedulingCheck) - if len(diskCandidates) > 0 { - return diskCandidates, nil + diskCandidatesFromNode, errors := rcs.filterNodeDisksForReplica(node, nodeDisksMap[node.Name], replicas, + volume, requireSchedulingCheck) + for k, v := range diskCandidatesFromNode { + diskCandidates[k] = v } multiError.Append(errors) } - return map[string]*Disk{}, multiError + diskCandidates = filterDisksWithMatchingReplicas(diskCandidates, replicas, diskSoftAntiAffinity) + return diskCandidates, multiError } usedNodes := map[string]*longhorn.Node{} @@ -358,6 +360,41 @@ func (rcs *ReplicaScheduler) filterNodeDisksForReplica(node *longhorn.Node, disk return preferredDisks, multiError } +// filterDisksWithMatchingReplicas filters the input disks map and returns only the disks that have the fewest matching +// replicas. If allowMatches is false, it only returns disks that have no matching replicas.
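+// For example, if one replica of the volume already sits on disk A and another on disk B, but none on disk C,
+// only disk C is returned. If every candidate disk already holds a matching replica, the disks tied for the
+// fewest matching replicas are returned when allowMatches is true, and no disks are returned when it is false.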
+func filterDisksWithMatchingReplicas(disks map[string]*Disk, replicas map[string]*longhorn.Replica, + allowMatches bool) map[string]*Disk { + replicasCountPerDisk := map[string]int{} + for _, r := range replicas { + replicasCountPerDisk[r.Spec.DiskID]++ + } + + highestReplicaCount := 0 + disksByReplicaCount := map[int]map[string]*Disk{} + for diskUUID, disk := range disks { + count := replicasCountPerDisk[diskUUID] + if disksByReplicaCount[count] == nil { + disksByReplicaCount[count] = map[string]*Disk{} + } + disksByReplicaCount[count][diskUUID] = disk + if count > highestReplicaCount { + highestReplicaCount = count + } + } + + if len(disksByReplicaCount[0]) > 0 || !allowMatches { + return disksByReplicaCount[0] + } + + for i := 1; i <= highestReplicaCount; i++ { + if len(disksByReplicaCount[i]) > 0 { + return disksByReplicaCount[i] + } + } + + return map[string]*Disk{} +} + func (rcs *ReplicaScheduler) getNodeInfo() (map[string]*longhorn.Node, error) { nodeInfo, err := rcs.ds.ListNodes() if err != nil { diff --git a/scheduler/replica_scheduler_test.go b/scheduler/replica_scheduler_test.go index 13a5c0379f..879b15aa99 100644 --- a/scheduler/replica_scheduler_test.go +++ b/scheduler/replica_scheduler_test.go @@ -234,13 +234,19 @@ type ReplicaSchedulerTestCase struct { storageOverProvisioningPercentage string storageMinimalAvailablePercentage string replicaNodeSoftAntiAffinity string + replicaDiskSoftAntiAffinity string // schedule state expectedNodes map[string]*longhorn.Node + expectedDisks map[string]struct{} // scheduler exception err bool - // couldn't schedule replica - isNilReplica bool + // first replica expected to fail scheduling + // -1 = default in constructor, don't fail to schedule + // 0 = fail to schedule first + // 1 = schedule first, fail to schedule second + // etc... 
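+	// Compared against a running count of successfully scheduled replicas (numScheduled) in the validation loop.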
+ firstNilReplica int } func generateSchedulerTestCase() *ReplicaSchedulerTestCase { @@ -253,9 +259,10 @@ func generateSchedulerTestCase() *ReplicaSchedulerTestCase { } engineImage := newEngineImage(TestEngineImage, longhorn.EngineImageStateDeployed) return &ReplicaSchedulerTestCase{ - volume: v, - replicas: replicas, - engineImage: engineImage, + volume: v, + replicas: replicas, + engineImage: engineImage, + firstNilReplica: -1, } } @@ -330,7 +337,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = false + tc.firstNilReplica = -1 // Set replica node soft anti-affinity tc.replicaNodeSoftAntiAffinity = "true" testCases["nodes could not schedule"] = tc @@ -355,7 +362,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 testCases["there's no disk for replica"] = tc // Test engine image is not deployed on any node @@ -408,7 +415,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 testCases["there's no engine image deployed on any node"] = tc // Test anti affinity nodes, replica should schedule to both node1 and node2 @@ -499,7 +506,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = false + tc.firstNilReplica = -1 testCases["anti-affinity nodes"] = tc // Test scheduler error when replica.NodeID is not "" @@ -509,7 +516,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { replica.Spec.NodeID = TestNode1 } tc.err = true - tc.isNilReplica = true + tc.firstNilReplica = 0 testCases["scheduler error when replica has NodeID"] = tc // Test no available disks @@ -576,7 +583,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 tc.storageOverProvisioningPercentage = "0" tc.storageMinimalAvailablePercentage = "100" testCases["there's no available disks for scheduling"] = tc @@ -646,7 +653,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { expectedNodes = map[string]*longhorn.Node{} tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = true + tc.firstNilReplica = 0 tc.storageOverProvisioningPercentage = "200" tc.storageMinimalAvailablePercentage = "20" testCases["there's no available disks for scheduling due to required storage"] = tc @@ -667,7 +674,6 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { getDiskID(TestNode1, "1"): disk, getDiskID(TestNode1, "2"): disk2, } - node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{ getDiskID(TestNode1, "1"): { StorageAvailable: TestDiskAvailableSize, @@ -739,9 +745,112 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { } tc.expectedNodes = expectedNodes tc.err = false - tc.isNilReplica = false + tc.firstNilReplica = -1 testCases["schedule to disk with the most usable storage"] = tc + // Test schedule to a second disk on the same node even if the first has more available storage + tc = generateSchedulerTestCase() + daemon1 = newDaemonPod(v1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1) + tc.daemons = []*v1.Pod{ + daemon1, + } + node1 = newNode(TestNode1, TestNamespace, true, longhorn.ConditionStatusTrue) + tc.engineImage.Status.NodeDeploymentMap[node1.Name] = true 
+ disk = newDisk(TestDefaultDataPath, true, 0) + disk2 = newDisk(TestDefaultDataPath, true, 0) + node1.Spec.Disks = map[string]longhorn.DiskSpec{ + getDiskID(TestNode1, "1"): disk, + getDiskID(TestNode1, "2"): disk2, + } + node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{ + getDiskID(TestNode1, "1"): { + StorageAvailable: TestDiskAvailableSize * 2, + StorageScheduled: 0, + StorageMaximum: TestDiskSize, + Conditions: []longhorn.Condition{ + newCondition(longhorn.DiskConditionTypeSchedulable, longhorn.ConditionStatusTrue), + }, + DiskUUID: getDiskID(TestNode1, "1"), + Type: longhorn.DiskTypeFilesystem, + }, + getDiskID(TestNode1, "2"): { + StorageAvailable: TestDiskAvailableSize / 2, + StorageScheduled: 0, + StorageMaximum: TestDiskSize, + Conditions: []longhorn.Condition{ + newCondition(longhorn.DiskConditionTypeSchedulable, longhorn.ConditionStatusTrue), + }, + DiskUUID: getDiskID(TestNode1, "2"), + Type: longhorn.DiskTypeFilesystem, + }, + } + expectNode1 = newNode(TestNode1, TestNamespace, true, longhorn.ConditionStatusTrue) + expectNode1.Spec.Disks = map[string]longhorn.DiskSpec{ + getDiskID(TestNode1, "1"): disk, + } + nodes = map[string]*longhorn.Node{ + TestNode1: node1, + } + tc.nodes = nodes + expectedNodes = map[string]*longhorn.Node{ + TestNode1: expectNode1, + } + tc.expectedNodes = expectedNodes + tc.expectedDisks = map[string]struct{}{ + getDiskID(TestNode1, "1"): {}, + getDiskID(TestNode1, "2"): {}, + } + tc.err = false + tc.replicaNodeSoftAntiAffinity = "true" // Allow replicas to schedule to the same node. + testCases["schedule to a second disk on the same node even if the first has more available storage"] = tc + + // Test fail scheduling when replicaDiskSoftAntiAffinity is false + tc = generateSchedulerTestCase() + daemon1 = newDaemonPod(v1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1) + tc.daemons = []*v1.Pod{ + daemon1, + } + node1 = newNode(TestNode1, TestNamespace, true, longhorn.ConditionStatusTrue) + tc.engineImage.Status.NodeDeploymentMap[node1.Name] = true + disk = newDisk(TestDefaultDataPath, true, 0) + node1.Spec.Disks = map[string]longhorn.DiskSpec{ + getDiskID(TestNode1, "1"): disk, + } + alreadyScheduledReplica := newReplicaForVolume(tc.volume) + node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{ + getDiskID(TestNode1, "1"): { + StorageAvailable: TestDiskAvailableSize, + StorageScheduled: TestVolumeSize, + StorageMaximum: TestDiskSize, + Conditions: []longhorn.Condition{ + newCondition(longhorn.DiskConditionTypeSchedulable, longhorn.ConditionStatusTrue), + }, + DiskUUID: getDiskID(TestNode1, "1"), + Type: longhorn.DiskTypeFilesystem, + ScheduledReplica: map[string]int64{alreadyScheduledReplica.Name: TestVolumeSize}, + }, + } + expectNode1 = newNode(TestNode1, TestNamespace, true, longhorn.ConditionStatusTrue) + expectNode1.Spec.Disks = map[string]longhorn.DiskSpec{ + getDiskID(TestNode1, "1"): disk, + } + nodes = map[string]*longhorn.Node{ + TestNode1: node1, + } + tc.nodes = nodes + expectedNodes = map[string]*longhorn.Node{ + TestNode1: expectNode1, + } + tc.expectedNodes = expectedNodes + tc.expectedDisks = map[string]struct{}{ + getDiskID(TestNode1, "1"): {}, + } + tc.err = false + tc.firstNilReplica = 1 // There is only one disk, so the second replica must fail to schedule. + tc.replicaNodeSoftAntiAffinity = "true" // Allow replicas to schedule to the same node. + tc.replicaDiskSoftAntiAffinity = "false" // Do not allow replicas to schedule to the same disk. 
+ testCases["fail scheduling when replicaDiskSoftAntiAffinity is true"] = tc + for name, tc := range testCases { fmt.Printf("testing %v\n", name) @@ -813,7 +922,19 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { err = sIndexer.Add(setting) c.Assert(err, IsNil) } + // Set replica disk soft anti-affinity setting + if tc.replicaDiskSoftAntiAffinity != "" { + s := initSettings( + string(types.SettingNameReplicaDiskSoftAntiAffinity), + tc.replicaDiskSoftAntiAffinity) + setting, err := + lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{}) + c.Assert(err, IsNil) + err = sIndexer.Add(setting) + c.Assert(err, IsNil) + } // validate scheduler + numScheduled := 0 for _, replica := range tc.replicas { r, err := lhClient.LonghornV1beta2().Replicas(TestNamespace).Create(context.TODO(), replica, metav1.CreateOptions{}) c.Assert(err, IsNil) @@ -825,7 +946,7 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { if tc.err { c.Assert(err, NotNil) } else { - if tc.isNilReplica { + if numScheduled == tc.firstNilReplica { c.Assert(sr, IsNil) } else { c.Assert(err, IsNil) @@ -836,15 +957,108 @@ func (s *TestSuite) TestReplicaScheduler(c *C) { c.Assert(sr.Spec.DataDirectoryName, Not(Equals), "") tc.replicas[sr.Name] = sr // check expected node - for nname, node := range tc.expectedNodes { - if sr.Spec.NodeID == nname { + for name, node := range tc.expectedNodes { + if sr.Spec.NodeID == name { c.Assert(sr.Spec.DiskPath, Equals, node.Spec.Disks[sr.Spec.DiskID].Path) - delete(tc.expectedNodes, nname) + delete(tc.expectedNodes, name) } } + // check expected disk + for diskUUID := range tc.expectedDisks { + if sr.Spec.DiskID == diskUUID { + delete(tc.expectedDisks, diskUUID) + } + } + numScheduled++ } } } c.Assert(len(tc.expectedNodes), Equals, 0) + c.Assert(len(tc.expectedDisks), Equals, 0) + } +} + +func (s *TestSuite) TestFilterDisksWithMatchingReplicas(c *C) { + type testCase struct { + inputDiskUUIDs []string + inputReplicas map[string]*longhorn.Replica + allowMatches bool + + expectDiskUUIDs []string + } + tests := map[string]testCase{} + + tc := testCase{} + diskUUID1 := getDiskID(TestNode1, "1") + diskUUID2 := getDiskID(TestNode2, "2") + tc.inputDiskUUIDs = []string{diskUUID1, diskUUID2} + v := newVolume(TestVolumeName, 3) + replica1 := newReplicaForVolume(v) + replica1.Spec.DiskID = diskUUID1 + replica2 := newReplicaForVolume(v) + replica2.Spec.DiskID = diskUUID2 + tc.inputReplicas = map[string]*longhorn.Replica{ + replica1.Name: replica1, + replica2.Name: replica2, + } + tc.allowMatches = false + tc.expectDiskUUIDs = []string{} // No disks can be scheduled. + tests["allowMatches = false and no empty disks"] = tc + + tc = testCase{} + diskUUID1 = getDiskID(TestNode1, "1") + diskUUID2 = getDiskID(TestNode2, "2") + tc.inputDiskUUIDs = []string{diskUUID1, diskUUID2} + v = newVolume(TestVolumeName, 3) + replica1 = newReplicaForVolume(v) + replica1.Spec.DiskID = diskUUID1 + replica2 = newReplicaForVolume(v) + replica2.Spec.DiskID = diskUUID2 + tc.inputReplicas = map[string]*longhorn.Replica{ + replica1.Name: replica1, + replica2.Name: replica2, + } + tc.allowMatches = true + tc.expectDiskUUIDs = append(tc.expectDiskUUIDs, tc.inputDiskUUIDs...) // Both disks are equally viable. 
+ tests["allowMatches = true and no empty disks"] = tc + + tc = testCase{} + diskUUID1 = getDiskID(TestNode1, "1") + diskUUID2 = getDiskID(TestNode2, "2") + diskUUID3 := getDiskID(TestNode2, "3") + diskUUID4 := getDiskID(TestNode2, "4") + diskUUID5 := getDiskID(TestNode2, "5") + tc.inputDiskUUIDs = []string{diskUUID1, diskUUID2, diskUUID3, diskUUID4, diskUUID5} + v = newVolume(TestVolumeName, 3) + replica1 = newReplicaForVolume(v) + replica1.Spec.DiskID = diskUUID1 + replica2 = newReplicaForVolume(v) + replica2.Spec.DiskID = diskUUID2 + replica3 := newReplicaForVolume(v) + replica3.Spec.DiskID = diskUUID3 + replica4 := newReplicaForVolume(v) + replica4.Spec.DiskID = diskUUID4 + tc.inputReplicas = map[string]*longhorn.Replica{ + replica1.Name: replica1, + replica2.Name: replica2, + replica3.Name: replica3, + replica4.Name: replica4, + } + tc.allowMatches = true + tc.expectDiskUUIDs = []string{diskUUID5} // Only disk5 has no matching replica. + tests["only schedule to disk without matching replica"] = tc + + for name, tc := range tests { + fmt.Printf("testing %v\n", name) + inputDisks := map[string]*Disk{} + for _, UUID := range tc.inputDiskUUIDs { + inputDisks[UUID] = &Disk{} + } + outputDiskUUIDs := filterDisksWithMatchingReplicas(inputDisks, tc.inputReplicas, tc.allowMatches) + c.Assert(len(outputDiskUUIDs), Equals, len(tc.expectDiskUUIDs)) + for _, UUID := range tc.expectDiskUUIDs { + _, ok := outputDiskUUIDs[UUID] + c.Assert(ok, Equals, true) + } } }