Refine scheduling behavior with failed replicas
Consider a node with a failed replica as used if the failed replica is
potentially reusable and replica-replenishment-wait-interval hasn't expired.

Longhorn 8043

Signed-off-by: Eric Weber <eric.weber@suse.com>
ejweber committed Mar 1, 2024
1 parent da244ec commit bb06506
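As a rough illustration of the rule this commit introduces, the sketch below distills the new node/zone accounting decision into one standalone function. It is not code from this diff; the function name and boolean parameters are invented for clarity. In the diff itself this logic lives in getCurrentNodesAndZones, with the last condition supplied by the new timeToReplacementReplica helper.

package sketch

// countsAsUsed is an illustrative distillation (not actual Longhorn code) of how this commit
// decides whether a replica still occupies its node and zone for scheduling purposes.
func countsAsUsed(failed, ignoreFailedReplicas, potentiallyReusable, replacementDueNow bool) bool {
	if !failed {
		return true // healthy replicas always occupy their node and zone
	}
	if ignoreFailedReplicas {
		return false // some callers skip failed replicas entirely
	}
	if !potentiallyReusable {
		return false // a replica that can never be reused should not block scheduling
	}
	if replacementDueNow {
		return false // replica-replenishment-wait-interval has expired; the replica is being replaced anyway
	}
	return true // potentially reusable and still inside the wait interval, so the node and zone stay "used"
}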
Showing 2 changed files with 150 additions and 60 deletions.
120 changes: 79 additions & 41 deletions scheduler/replica_scheduler.go
@@ -20,6 +20,9 @@ const (

type ReplicaScheduler struct {
ds *datastore.DataStore

// Required for unit testing.
nowHandler func() time.Time
}

type Disk struct {
@@ -40,6 +43,9 @@ type DiskSchedulingInfo struct {
func NewReplicaScheduler(ds *datastore.DataStore) *ReplicaScheduler {
rcScheduler := &ReplicaScheduler{
ds: ds,

// Required for unit testing.
nowHandler: time.Now,
}
return rcScheduler
}
@@ -201,6 +207,13 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
diskSoftAntiAffinity = volume.Spec.ReplicaDiskSoftAntiAffinity == longhorn.ReplicaDiskSoftAntiAffinityEnabled
}

timeToReplacementReplica, _, err := rcs.timeToReplacementReplica(volume)
if err != nil {
err = errors.Wrap(err, "failed to get time until replica replacement")
multiError.Append(util.NewMultiError(err.Error()))
return map[string]*Disk{}, multiError
}

getDiskCandidatesFromNodes := func(nodes map[string]*longhorn.Node) (diskCandidates map[string]*Disk, multiError util.MultiError) {
diskCandidates = map[string]*Disk{}
multiError = util.NewMultiError()
@@ -217,7 +230,7 @@ func (rcs *ReplicaScheduler) getDiskCandidates(nodeInfo map[string]*longhorn.Nod
}

usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(replicas, nodeInfo,
ignoreFailedReplicas)
ignoreFailedReplicas, timeToReplacementReplica == 0)

allowEmptyNodeSelectorVolume, err := rcs.ds.GetSettingAsBool(types.SettingNameAllowEmptyNodeSelectorVolume)
if err != nil {
@@ -578,7 +591,7 @@ func (rcs *ReplicaScheduler) RequireNewReplica(replicas map[string]*longhorn.Rep

hasPotentiallyReusableReplica := false
for _, r := range replicas {
if IsPotentiallyReusableReplica(r, hardNodeAffinity) {
if IsPotentiallyReusableReplica(r) {
hasPotentiallyReusableReplica = true
break
}
@@ -587,42 +600,23 @@ return 0
return 0
}

// Otherwise Longhorn will relay the new replica creation then there is a chance to reuse failed replicas later.
settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
if err != nil {
logrus.Errorf("Failed to get Setting ReplicaReplenishmentWaitInterval, will directly replenish a new replica: %v", err)
return 0
}
waitInterval := time.Duration(settingValue) * time.Second
lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)

timeUntilNext, timeOfNext, err := rcs.timeToReplacementReplica(volume)
if err != nil {
logrus.Errorf("Failed to get parse volume last degraded timestamp %v, will directly replenish a new replica: %v", volume.Status.LastDegradedAt, err)
return 0
msg := "Failed to get time until replica replacement, will directly replenish a new replica"
logrus.WithError(err).Errorf(msg)
}
now := time.Now()
if now.After(lastDegradedAt.Add(waitInterval)) {
return 0
if timeUntilNext > 0 {
logrus.Infof("Replica replenishment is delayed until %v", timeOfNext)
}

logrus.Infof("Replica replenishment is delayed until %v", lastDegradedAt.Add(waitInterval))
// Adding 1 more second to the check back interval to avoid clock skew
return lastDegradedAt.Add(waitInterval).Sub(now) + time.Second
return timeUntilNext
}

func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *longhorn.Volume, nodeInfo map[string]*longhorn.Node, hardNodeAffinity string) (bool, error) {
if r.Spec.FailedAt == "" {
return false, nil
}
if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
return false, nil
}
if r.Spec.RebuildRetryCount >= FailedReplicaMaxRetryCount {
return false, nil
}
if r.Spec.EvictionRequested {
// Every reusable failed replica is also a potentially reusable failed replica, so rule that out first.
if !IsPotentiallyReusableReplica(r) {
return false, nil
}

if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false, nil
}
@@ -693,9 +687,9 @@ func (rcs *ReplicaScheduler) isFailedReplicaReusable(r *longhorn.Replica, v *lon
return true, nil
}

// IsPotentiallyReusableReplica is used to check if a failed replica is potentially reusable.
// A potentially reusable replica means this failed replica may be able to reuse it later but it’s not valid now due to node/disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string) bool {
// IsPotentiallyReusableReplica checks whether a failed replica is potentially reusable. A potentially reusable replica
// is one Longhorn may be able to reuse later, even though it is not valid right now because of a node or disk down issue.
func IsPotentiallyReusableReplica(r *longhorn.Replica) bool {
if r.Spec.FailedAt == "" {
return false
}
@@ -708,9 +702,6 @@ func IsPotentiallyReusableReplica(r *longhorn.Replica, hardNodeAffinity string)
if r.Spec.EvictionRequested {
return false
}
if hardNodeAffinity != "" && r.Spec.NodeID != hardNodeAffinity {
return false
}
// TODO: Reuse failed replicas for a SPDK volume
if datastore.IsDataEngineV2(r.Spec.DataEngine) {
return false
@@ -865,10 +856,14 @@ func findDiskSpecAndDiskStatusInNode(diskUUID string, node *longhorn.Node) (long
return longhorn.DiskSpec{}, longhorn.DiskStatus{}, false
}

// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to. Some callers do not consider a
// node or zone to be used if it contains a failed replica. ignoreFailedReplicas == true supports this use case.
// getCurrentNodesAndZones returns the nodes and zones a replica is already scheduled to.
// - Some callers do not consider a node or zone to be used if it contains a failed replica.
// ignoreFailedReplicas == true supports this use case.
// - Otherwise, getCurrentNodesAndZones does not consider a node or zone to be occupied by a failed replica that can
// no longer be used or is likely actively being replaced. This makes nodes and zones with useless replicas
// available for scheduling.
func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map[string]*longhorn.Node,
ignoreFailedReplicas bool) (map[string]*longhorn.Node,
ignoreFailedReplicas, creatingNewReplicasForReplenishment bool) (map[string]*longhorn.Node,
map[string]bool, map[string]bool, map[string]bool) {
usedNodes := map[string]*longhorn.Node{}
usedZones := map[string]bool{}
@@ -882,8 +877,16 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map
if r.DeletionTimestamp != nil {
continue
}
if r.Spec.FailedAt != "" && ignoreFailedReplicas {
continue
if r.Spec.FailedAt != "" {
if ignoreFailedReplicas {
continue
}
if !IsPotentiallyReusableReplica(r) {
continue // This replica can never be used again, so it does not count in scheduling decisions.
}
if creatingNewReplicasForReplenishment {
continue // Maybe this replica can be used again, but it is being actively replaced anyway.
}
}

if node, ok := nodeInfo[r.Spec.NodeID]; ok {
@@ -912,3 +915,38 @@ func getCurrentNodesAndZones(replicas map[string]*longhorn.Replica, nodeInfo map

return usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones
}

// timeToReplacementReplica returns the amount of time until Longhorn should create a new replica for a degraded volume,
// even if there are potentially reusable failed replicas. It returns:
// - a negative time.Duration if there is no need for a replacement replica,
// - 0 if a replacement replica is needed right now (replica-replenishment-wait-interval has elapsed),
// - a positive time.Duration if a replacement replica will be needed later (replica-replenishment-wait-interval has not elapsed).
func (rcs *ReplicaScheduler) timeToReplacementReplica(volume *longhorn.Volume) (time.Duration, time.Time, error) {
if volume.Status.Robustness != longhorn.VolumeRobustnessDegraded {
// No replacement replica is needed.
return -1, time.Time{}, nil
}

settingValue, err := rcs.ds.GetSettingAsInt(types.SettingNameReplicaReplenishmentWaitInterval)
if err != nil {
err = errors.Wrapf(err, "failed to get setting ReplicaReplenishmentWaitInterval")
return 0, time.Time{}, err
}
waitInterval := time.Duration(settingValue) * time.Second

lastDegradedAt, err := util.ParseTime(volume.Status.LastDegradedAt)
if err != nil {
err = errors.Wrapf(err, "failed to parse last degraded timestamp %v", volume.Status.LastDegradedAt)
return 0, time.Time{}, err
}

now := rcs.nowHandler()
if now.After(lastDegradedAt.Add(waitInterval)) {
// A replacement replica is needed now.
return 0, time.Time{}, nil
}

timeOfNext := lastDegradedAt.Add(waitInterval)
// Add one more second to the check-back interval to avoid clock skew
return timeOfNext.Sub(now) + time.Second, timeOfNext, nil
}
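For readers tracing the new helper above, here is a minimal sketch of how its three-way return value is meant to be read. It is illustrative only; interpretTimeToReplacement and its message strings are invented, and the real callers are RequireNewReplica and getDiskCandidates in the diff above.

package sketch

import (
	"fmt"
	"time"
)

// interpretTimeToReplacement spells out the contract of timeToReplacementReplica's first two
// return values. It is not part of this diff.
func interpretTimeToReplacement(timeUntilNext time.Duration, timeOfNext time.Time) string {
	switch {
	case timeUntilNext < 0:
		return "volume is not degraded; no replacement replica is needed"
	case timeUntilNext == 0:
		return "replica-replenishment-wait-interval has elapsed; replenish a replica now"
	default:
		return fmt.Sprintf("delay replenishment until %v (about %v from now)", timeOfNext, timeUntilNext)
	}
}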
90 changes: 71 additions & 19 deletions scheduler/replica_scheduler_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes/fake"
@@ -54,7 +55,8 @@ const (
TestZone1 = "test-zone-1"
TestZone2 = "test-zone-2"

TestTimeNow = "2015-01-02T00:00:00Z"
TestTimeNow = "2015-01-02T00:00:00Z"
TestTimeOneMinuteAgo = "2015-01-01T23:59:00Z"
)

var longhornFinalizerKey = longhorn.SchemeGroupVersion.Group
@@ -65,7 +67,9 @@ func newReplicaScheduler(lhClient *lhfake.Clientset, kubeClient *fake.Clientset,

ds := datastore.NewDataStore(TestNamespace, lhClient, kubeClient, extensionsClient, informerFactories)

return NewReplicaScheduler(ds)
rcs := NewReplicaScheduler(ds)
rcs.nowHandler = getTestNow
return rcs
}

func newDaemonPod(phase corev1.PodPhase, name, namespace, nodeID, podIP string) *corev1.Pod {
Expand Down Expand Up @@ -242,6 +246,7 @@ type ReplicaSchedulerTestCase struct {
replicaNodeSoftAntiAffinity string
replicaZoneSoftAntiAffinity string
replicaDiskSoftAntiAffinity string
ReplicaReplenishmentWaitInterval string

// some test cases only try to schedule a subset of a volume's replicas
allReplicas map[string]*longhorn.Replica
@@ -1099,24 +1104,36 @@ func (s *TestSuite) TestReplicaScheduler(c *C) {
tc.replicaZoneSoftAntiAffinity = "false" // Do not allow replicas to schedule to the same zone.
testCases["fail scheduling when doing so would reuse an invalid evicting node"] = tc

// Test fail to schedule to node with failed replica
// If replicaNodeSoftAntiAffinity == false, this shouldn't be possible.
tc = generateFailedReplicaTestCase("false")
// Test potentially reusable replica before interval expires
// We should fail to schedule a new replica to this node until the interval expires.
tc = generateFailedReplicaTestCase(true, false)
tc.err = false
tc.firstNilReplica = 0
testCases["fail to schedule to node with failed replica"] = tc
testCases["potentially reusable replica before interval expires"] = tc

// Test succeed to schedule to node with failed replica
// If replicaNodeSoftAntiAffinity == true, this should be possible.
tc = generateFailedReplicaTestCase("true")
// Test potentially reusable replica after interval expires
// We should succeed to schedule a new replica to this node because the interval expired.
tc = generateFailedReplicaTestCase(true, true)
tc.err = false
tc.firstNilReplica = -1
testCases["succeed to schedule to node with failed replica"] = tc
testCases["potentially reusable replica after interval expires"] = tc

testCasesActual := map[string]*ReplicaSchedulerTestCase{}
testCasesActual["fail to schedule to node with failed replica"] = testCases["fail to schedule to node with failed replica"]
testCasesActual["succeed to schedule to node with failed replica"] = testCases["succeed to schedule to node with failed replica"]
for name, tc := range testCasesActual {
// Test non-reusable replica before interval expires
// We should succeed to schedule a new replica to this node because the existing replica is not reusable.
tc = generateFailedReplicaTestCase(false, false)
tc.err = false
tc.firstNilReplica = -1
testCases["non-reusable replica before interval expires"] = tc

// Test non-reusable replica after interval expires
// We should succeed to schedule a new replica to this node because the existing replica is not reusable and the
// interval expired anyway.
tc = generateFailedReplicaTestCase(false, true)
tc.err = false
tc.firstNilReplica = -1
testCases["non-reusable replica after interval expires"] = tc

for name, tc := range testCases {
fmt.Printf("testing %v\n", name)

kubeClient := fake.NewSimpleClientset()
@@ -1207,11 +1224,11 @@ func (s *TestSuite) TestReplicaScheduler(c *C) {
}
}

func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *ReplicaSchedulerTestCase) {
// generateFailedReplicaTestCase helps generate test cases in which a node contains a failed replica and the scheduler
// must decide whether to allow additional replicas to schedule to it.
func generateFailedReplicaTestCase(
replicaReusable, waitIntervalExpired bool) (tc *ReplicaSchedulerTestCase) {
tc = generateSchedulerTestCase()
tc.replicaNodeSoftAntiAffinity = replicaNodeSoftAntiAffinity
tc.replicaDiskSoftAntiAffinity = "true" // Do not hinder replica scheduling except for node.
tc.replicaZoneSoftAntiAffinity = "true" // Do not hinder replica scheduling except for node.
daemon1 := newDaemonPod(corev1.PodRunning, TestDaemon1, TestNamespace, TestNode1, TestIP1)
tc.daemons = []*corev1.Pod{
daemon1,
@@ -1223,6 +1240,10 @@ func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *Repl
getDiskID(TestNode1, "1"): disk,
}

// We are specifically interested in situations in which a replica is ONLY schedulable to a node because its
// existing replica is failed.
tc.replicaNodeSoftAntiAffinity = "false"

// A failed replica is already scheduled.
var alreadyScheduledReplica *longhorn.Replica
for _, replica := range tc.allReplicas {
@@ -1231,7 +1252,22 @@ func generateFailedReplicaTestCase(replicaNodeSoftAntiAffinity string) (tc *Repl
}
delete(tc.replicasToSchedule, alreadyScheduledReplica.Name)
alreadyScheduledReplica.Spec.NodeID = TestNode1
alreadyScheduledReplica.Spec.DiskID = getDiskID(TestNode1, "1")
alreadyScheduledReplica.Spec.FailedAt = TestTimeNow
tc.volume.Status.Robustness = longhorn.VolumeRobustnessDegraded
tc.volume.Status.LastDegradedAt = TestTimeOneMinuteAgo

if replicaReusable {
alreadyScheduledReplica.Spec.RebuildRetryCount = 0
} else {
alreadyScheduledReplica.Spec.RebuildRetryCount = 5
}

if waitIntervalExpired {
tc.ReplicaReplenishmentWaitInterval = "30"
} else {
tc.ReplicaReplenishmentWaitInterval = "90"
}

node1.Status.DiskStatus = map[string]*longhorn.DiskStatus{
getDiskID(TestNode1, "1"): {
Expand Down Expand Up @@ -1301,6 +1337,17 @@ func setSettings(tc *ReplicaSchedulerTestCase, lhClient *lhfake.Clientset, sInde
err = sIndexer.Add(setting)
c.Assert(err, IsNil)
}
// Set replica replenishment wait interval setting
if tc.ReplicaReplenishmentWaitInterval != "" {
s := initSettings(
string(types.SettingNameReplicaReplenishmentWaitInterval),
tc.ReplicaReplenishmentWaitInterval)
setting, err :=
lhClient.LonghornV1beta2().Settings(TestNamespace).Create(context.TODO(), s, metav1.CreateOptions{})
c.Assert(err, IsNil)
err = sIndexer.Add(setting)
c.Assert(err, IsNil)
}
}

func (s *TestSuite) TestFilterDisksWithMatchingReplicas(c *C) {
Expand Down Expand Up @@ -1522,10 +1569,15 @@ func (s *TestSuite) TestGetCurrentNodesAndZones(c *C) {
for name, tc := range testCases {
fmt.Printf("testing %v\n", name)
usedNodes, usedZones, onlyEvictingNodes, onlyEvictingZones := getCurrentNodesAndZones(tc.replicas, tc.nodeInfo,
false)
false, false)
verifyNodeNames(tc.expectUsedNodeNames, usedNodes)
verifyZoneNames(tc.expectUsedZoneNames, usedZones)
verifyOnlyEvictingNodeNames(tc.expectOnlyEvictingNodeNames, onlyEvictingNodes)
verifyOnlyEvictingZoneNames(tc.expectOnlyEvictingZoneNames, onlyEvictingZones)
}
}

func getTestNow() time.Time {
now, _ := time.Parse(time.RFC3339, TestTimeNow)
return now
}

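For reference, the timing used by generateFailedReplicaTestCase works out as follows with the constants above: the injected "now" is TestTimeNow (2015-01-02T00:00:00Z), and the volume's LastDegradedAt is TestTimeOneMinuteAgo (2015-01-01T23:59:00Z), sixty seconds earlier. With ReplicaReplenishmentWaitInterval set to "30", the interval ended thirty seconds before "now", so the wait has expired and the failed replica no longer blocks scheduling. With "90", the interval does not end until thirty seconds after "now", so a potentially reusable failed replica still counts as occupying its node.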