Skip to content

Commit

Permalink
Account for ReplicaDiskSoftAntiAffinity setting when scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
ejweber committed Jul 31, 2023
1 parent f48ec93 commit 48ddbfe
Show file tree
Hide file tree
Showing 2 changed files with 274 additions and 23 deletions.
47 changes: 42 additions & 5 deletions scheduler/replica_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,20 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
volume.Spec.ReplicaDiskSoftAntiAffinity != "" {
diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled
}
_ = diskSoftAntiAffinity // TODO: Use this value during scheduling.

getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
diskCandidates = map[string]*Disk{}
multiError = util.NewMultiError()
for _, node := range nodes {
diskCandidates, errors := rcs.filterNodeDisksForReplica(node, nodeDisksMap[node.Name], replicas, volume, requireSchedulingCheck)
if len(diskCandidates) > 0 {
return diskCandidates, nil
diskCandidatesFromNode, errors := rcs.filterNodeDisksForReplica(node, nodeDisksMap[node.Name], replicas,
volume, requireSchedulingCheck)
diskCandidatesFromNode = filterDisksWithMatchingReplicas(diskCandidatesFromNode, replicas, diskSoftAntiAffinity)
for k, v := range diskCandidatesFromNode {
diskCandidates[k] = v
}
multiError.Append(errors)
}
return map[string]*Disk{}, multiError
return diskCandidates, multiError
}

usedNodes := map[string]*longhorn.Node{}
Expand Down Expand Up @@ -358,6 +360,41 @@ func (rcs *ReplicaScheduler) filterNodeDisksForReplica(node *longhorn.Node, disk
return preferredDisks, multiError
}

// filterDisksWithMatchingReplicas filters the input disks map and returns only the disks that have the fewest matching
// replicas. If allowDuplicates is false, it only returns disks that have no matching replicas.
func filterDisksWithMatchingReplicas(disks map[string]*Disk, replicas map[string]*longhorn.Replica,
allowMatches bool) map[string]*Disk {
replicasCountPerDisk := map[string]int{}
for _, r := range replicas {
replicasCountPerDisk[r.Spec.DiskID]++
}

highestReplicaCount := 0
disksByReplicaCount := map[int]map[string]*Disk{}
for diskUUID, disk := range disks {
count := replicasCountPerDisk[diskUUID]
if disksByReplicaCount[count] == nil {
disksByReplicaCount[count] = map[string]*Disk{}
}
disksByReplicaCount[count][diskUUID] = disk
if count > highestReplicaCount {
highestReplicaCount = count
}
}

if len(disksByReplicaCount[0]) > 0 || !allowMatches {
return disksByReplicaCount[0]
}

for i := 1; i <= highestReplicaCount; i++ {
if len(disksByReplicaCount[i]) > 0 {
return disksByReplicaCount[i]
}
}

return map[string]*Disk{}
}

func (rcs *ReplicaScheduler) getNodeInfo() (map[string]*longhorn.Node, error) {
nodeInfo, err := rcs.ds.ListNodes()
if err != nil {
Expand Down
Loading

0 comments on commit 48ddbfe

Please sign in to comment.