Skip to content

Commit

Permalink
WIP scheduler and scheduler test changes
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Weber <eric.weber@suse.com>
  • Loading branch information
ejweber committed Jul 28, 2023
1 parent f48ec93 commit efef51e
Show file tree
Hide file tree
Showing 2 changed files with 493 additions and 344 deletions.
46 changes: 41 additions & 5 deletions scheduler/replica_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,20 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
volume.Spec.ReplicaDiskSoftAntiAffinity != "" {
diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled
}
_ = diskSoftAntiAffinity // TODO: Use this value during scheduling.

getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
diskCandidates = map[string]*Disk{}
multiError = util.NewMultiError()
for _, node := range nodes {
diskCandidates, errors := rcs.filterNodeDisksForReplica(node, nodeDisksMap[node.Name], replicas, volume, requireSchedulingCheck)
if len(diskCandidates) > 0 {
return diskCandidates, nil
diskCandidatesFromNode, errors := rcs.filterNodeDisksForReplica(node, nodeDisksMap[node.Name], replicas, volume, requireSchedulingCheck)
diskCandidatesFromNode = filterDisksWithMatchingReplicas(diskCandidatesFromNode, replicas, diskSoftAntiAffinity)
// TODO: Verify this change is necessary.
for k, v := range diskCandidatesFromNode {
diskCandidates[k] = v
}
multiError.Append(errors)
}
return map[string]*Disk{}, multiError
return diskCandidates, multiError
}

usedNodes := map[string]*longhorn.Node{}
Expand Down Expand Up @@ -358,6 +360,40 @@ func (rcs *ReplicaScheduler) filterNodeDisksForReplica(node *longhorn.Node, disk
return preferredDisks, multiError
}

// filterDisksWithMatchingReplicas filters the input disks map and returns only the disks that have the fewest matching
// replicas. If allowDuplicates is false, it only returns disks that have no matching replicas.
func filterDisksWithMatchingReplicas(disks map[string]*Disk, replicas map[string]*longhorn.Replica, allowMatches bool) map[string]*Disk {
replicasCountPerDisk := map[string]int{}
for _, r := range replicas {
replicasCountPerDisk[r.Spec.DiskID]++
}

highestReplicaCount := 0
disksByReplicaCount := map[int]map[string]*Disk{}
for diskUUID, disk := range disks {
count := replicasCountPerDisk[diskUUID]
if disksByReplicaCount[count] == nil {
disksByReplicaCount[count] = map[string]*Disk{}
}
disksByReplicaCount[count][diskUUID] = disk
if count > highestReplicaCount {
highestReplicaCount = count
}
}

if len(disksByReplicaCount[0]) > 0 || !allowMatches {
return disksByReplicaCount[0]
}

for i := 1; i <= highestReplicaCount; i++ {
if len(disksByReplicaCount[i]) > 0 {
return disksByReplicaCount[i]
}
}

return map[string]*Disk{}
}

func (rcs *ReplicaScheduler) getNodeInfo() (map[string]*longhorn.Node, error) {
nodeInfo, err := rcs.ds.ListNodes()
if err != nil {
Expand Down
Loading

0 comments on commit efef51e

Please sign in to comment.