diff --git a/pkg/controllers/runtime_controller_test.go b/pkg/controllers/runtime_controller_test.go new file mode 100644 index 00000000000..3feed51fe28 --- /dev/null +++ b/pkg/controllers/runtime_controller_test.go @@ -0,0 +1,455 @@ +/* +Copyright 2023 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "testing" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/dataoperation" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// -- Mocks -- + +type mockEngine struct { + id string +} + +func (m *mockEngine) ID() string { + return m.id +} +func (m *mockEngine) Shutdown() error { + return nil +} +func (m *mockEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, err error) { + return true, nil +} +func (m *mockEngine) CreateVolume() (err error) { + return nil +} +func (m *mockEngine) DeleteVolume() (err error) { + 
return nil +} +func (m *mockEngine) Sync(ctx cruntime.ReconcileRequestContext) error { + return nil +} +func (m *mockEngine) Validate(ctx cruntime.ReconcileRequestContext) (err error) { + return nil +} +func (m *mockEngine) Operate(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus, operation dataoperation.OperationInterface) (ctrl.Result, error) { + return ctrl.Result{}, nil +} + +type mockRuntimeReconciler struct { + *RuntimeReconciler + failEngineCreation bool +} + +func (m *mockRuntimeReconciler) GetOrCreateEngine(ctx cruntime.ReconcileRequestContext) (base.Engine, error) { + if m.failEngineCreation { + return nil, fmt.Errorf("induced engine creation failure") + } + return &mockEngine{id: "test-engine"}, nil +} + +func (m *mockRuntimeReconciler) RemoveEngine(ctx cruntime.ReconcileRequestContext) { + // no-op +} + +// -- Helpers -- + +func newTestReconciler(t *testing.T, objects ...client.Object) (*mockRuntimeReconciler, client.Client) { + s := runtime.NewScheme() + _ = scheme.AddToScheme(s) + _ = datav1alpha1.AddToScheme(s) + _ = corev1.AddToScheme(s) + + fakeClient := fake.NewClientBuilder(). + WithScheme(s). + WithStatusSubresource(objects...). + WithObjects(objects...). + Build() + + // Use discard logger + log := logr.Discard() + recorder := record.NewFakeRecorder(10) + + mock := &mockRuntimeReconciler{} + // Hook up the RuntimeReconciler to use 'mock' as the implementation + baseReconciler := NewRuntimeReconciler(mock, fakeClient, log, recorder) + mock.RuntimeReconciler = baseReconciler + + return mock, fakeClient +} + +// -- Tests -- + +func TestReconcileInternal_AddOwnerReference(t *testing.T) { + // Scenario: Runtime exists, Dataset exists, but OwnerReference is missing. + // Expected: Reconciler should add OwnerReference to Runtime and Requeue. 
+ + dataset := &datav1alpha1.Dataset{ + TypeMeta: metav1.TypeMeta{ + Kind: "Dataset", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + UID: "dataset-uid-123", + }, + } + runtimeObj := &datav1alpha1.AlluxioRuntime{ + TypeMeta: metav1.TypeMeta{ + Kind: "AlluxioRuntime", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + // No OwnerReferences + }, + } + + reconciler, c := newTestReconciler(t, dataset, runtimeObj) + + ctx := cruntime.ReconcileRequestContext{ + Context: context.TODO(), + Log: logr.Discard(), + NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"}, + RuntimeType: common.AlluxioRuntime, + Runtime: runtimeObj, + Category: common.AccelerateCategory, + Client: c, + } + + // First pass + result, err := reconciler.ReconcileInternal(ctx) + if err != nil { + t.Fatalf("ReconcileInternal failed: %v", err) + } + + // Check if Requeue is true + if !result.Requeue { + t.Errorf("Expected Requeue to be true for OwnerReference update, got %v", result) + } + + // Verify OwnerReference + updatedRuntime := &datav1alpha1.AlluxioRuntime{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime) + if err != nil { + t.Fatalf("Failed to get updated runtime: %v", err) + } + + if len(updatedRuntime.OwnerReferences) != 1 { + t.Errorf("Expected 1 OwnerReference, got %d", len(updatedRuntime.OwnerReferences)) + } else { + ref := updatedRuntime.OwnerReferences[0] + if ref.UID != dataset.UID { + t.Errorf("Expected OwnerReference UID %s, got %s", dataset.UID, ref.UID) + } + } +} + +func TestReconcileInternal_AddFinalizer(t *testing.T) { + // Scenario: Runtime has OwnerReference but no Finalizer. + // Expected: Reconciler should add Finalizer and Requeue. 
+ + dataset := &datav1alpha1.Dataset{ + TypeMeta: metav1.TypeMeta{ + Kind: "Dataset", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + UID: "dataset-uid-123", + }, + } + runtimeObj := &datav1alpha1.AlluxioRuntime{ + TypeMeta: metav1.TypeMeta{ + Kind: "AlluxioRuntime", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "data.fluid.io/v1alpha1", + Kind: "Dataset", + Name: "test-dataset", + UID: "dataset-uid-123", + Controller: func() *bool { b := true; return &b }(), + }, + }, + // No Finalizer + }, + } + + reconciler, c := newTestReconciler(t, dataset, runtimeObj) + + ctx := cruntime.ReconcileRequestContext{ + Context: context.TODO(), + Log: logr.Discard(), + NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"}, + RuntimeType: common.AlluxioRuntime, + Runtime: runtimeObj, + Category: common.AccelerateCategory, + FinalizerName: "fluid-alluxio-controller-finalizer", + Client: c, + } + + // First pass + result, err := reconciler.ReconcileInternal(ctx) + if err != nil { + t.Fatalf("ReconcileInternal failed: %v", err) + } + + // Check result + if !result.Requeue { + t.Errorf("Expected Requeue to be true for Finalizer update, got %v", result) + } + + // Verify Finalizer + updatedRuntime := &datav1alpha1.AlluxioRuntime{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime) + if err != nil { + t.Fatalf("Failed to get updated runtime: %v", err) + } + + if len(updatedRuntime.Finalizers) == 0 { + t.Errorf("Expected Finalizer detection, got none") + } else { + found := false + for _, f := range updatedRuntime.Finalizers { + if f == "fluid-alluxio-controller-finalizer" { + found = true + break + } + } + if !found { + t.Errorf("Finalizer 'fluid-alluxio-controller-finalizer' not 
found in %v", updatedRuntime.Finalizers) + } + } +} + +func TestReconcileInternal_ReconcileRuntime(t *testing.T) { + // Scenario: fully set up Runtime (owners, finalizers correct). + // Expected: Should proceed to ReconcileRuntime logic (Setup, Sync). + // Since MockEngine returns success, it should return success (Check utils.NoRequeue semantics). + + dataset := &datav1alpha1.Dataset{ + TypeMeta: metav1.TypeMeta{ + Kind: "Dataset", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + UID: "dataset-uid-123", + }, + Status: datav1alpha1.DatasetStatus{ + Phase: datav1alpha1.BoundDatasetPhase, + }, + } + runtimeObj := &datav1alpha1.AlluxioRuntime{ + TypeMeta: metav1.TypeMeta{ + Kind: "AlluxioRuntime", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "data.fluid.io/v1alpha1", + Kind: "Dataset", + Name: "test-dataset", + UID: "dataset-uid-123", + }, + }, + Finalizers: []string{"fluid-alluxio-controller-finalizer"}, + }, + } + + reconciler, c := newTestReconciler(t, dataset, runtimeObj) + + ctx := cruntime.ReconcileRequestContext{ + Context: context.TODO(), + Log: logr.Discard(), + NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"}, + RuntimeType: common.AlluxioRuntime, + Runtime: runtimeObj, + Category: common.AccelerateCategory, + FinalizerName: "fluid-alluxio-controller-finalizer", + Dataset: dataset, + Client: c, + } + + // Reconcile + result, err := reconciler.ReconcileInternal(ctx) + if err != nil { + t.Fatalf("ReconcileInternal failed: %v", err) + } + + if result.Requeue && result.RequeueAfter == 0 { + t.Errorf("Did not expect immediate Requeue for successful reconcile") + } +} + +func TestReconcileInternal_EngineError(t *testing.T) { + // Scenario: GetOrCreateEngine fails. + // Expected: ReconcileInternal returns error. 
+ + dataset := &datav1alpha1.Dataset{ + TypeMeta: metav1.TypeMeta{ + Kind: "Dataset", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + UID: "dataset-uid-123", + }, + } + runtimeObj := &datav1alpha1.AlluxioRuntime{ + TypeMeta: metav1.TypeMeta{ + Kind: "AlluxioRuntime", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + }, + } + + reconciler, c := newTestReconciler(t, dataset, runtimeObj) + reconciler.failEngineCreation = true + + ctx := cruntime.ReconcileRequestContext{ + Context: context.TODO(), + Log: logr.Discard(), + NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"}, + RuntimeType: common.AlluxioRuntime, + Runtime: runtimeObj, + Category: common.AccelerateCategory, + Client: c, + } + + // Reconcile + _, err := reconciler.ReconcileInternal(ctx) + if err == nil { + t.Fatalf("Expected error from ReconcileInternal due to engine failure, got nil") + } + if err.Error() != "Failed to create: induced engine creation failure" && err.Error() != "induced engine creation failure" { + t.Logf("Got expected error: %v", err) + } +} + +func TestReconcileRuntimeDeletion(t *testing.T) { + // Scenario: Runtime has DeletionTimestamp. + // Expected: Clean up (DeleteVolume, Shutdown), Remove Finalizer. 
+ + now := metav1.Now() + dataset := &datav1alpha1.Dataset{ + TypeMeta: metav1.TypeMeta{ + Kind: "Dataset", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + UID: "dataset-uid-123", + }, + } + runtimeObj := &datav1alpha1.AlluxioRuntime{ + TypeMeta: metav1.TypeMeta{ + Kind: "AlluxioRuntime", + APIVersion: "data.fluid.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + DeletionTimestamp: &now, + Finalizers: []string{"fluid-alluxio-controller-finalizer"}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "data.fluid.io/v1alpha1", + Kind: "Dataset", + Name: "test-dataset", + UID: "dataset-uid-123", + }, + }, + }, + } + + reconciler, c := newTestReconciler(t, dataset, runtimeObj) + + ctx := cruntime.ReconcileRequestContext{ + Context: context.TODO(), + Log: logr.Discard(), + NamespacedName: types.NamespacedName{Name: "test-dataset", Namespace: "default"}, + RuntimeType: common.AlluxioRuntime, + Runtime: runtimeObj, + Category: common.AccelerateCategory, + FinalizerName: "fluid-alluxio-controller-finalizer", + Client: c, + } + + // Reconcile + result, err := reconciler.ReconcileInternal(ctx) + if err != nil { + t.Fatalf("ReconcileInternal failed: %v", err) + } + + // Should not requeue if deletion succeeds (Remove Finalizer calls Update, which triggers new event, so return NoRequeue) + if result.Requeue { + t.Errorf("Expected no requeue after successful deletion, got %v", result) + } + + // Verify Finalizer is removed + updatedRuntime := &datav1alpha1.AlluxioRuntime{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "test-dataset", Namespace: "default"}, updatedRuntime) + if errors.IsNotFound(err) { + // Object deleted, success! 
+ return + } + if err != nil { + t.Fatalf("Failed to get updated runtime: %v", err) + } + + if len(updatedRuntime.Finalizers) != 0 { + t.Errorf("Expected finalizers to be empty, got %v", updatedRuntime.Finalizers) + } +} diff --git a/pkg/csi/recover/recover.go b/pkg/csi/recover/recover.go index 92ebfe541eb..f04eb3d6354 100644 --- a/pkg/csi/recover/recover.go +++ b/pkg/csi/recover/recover.go @@ -62,6 +62,10 @@ type FuseRecover struct { recoverWarningThreshold int locks *utils.VolumeLocks + + // stateTracker provides per-mount backoff and event deduplication + // to prevent API server overload from persistent mount failures. + stateTracker *RecoverStateTracker } func initializeKubeletClient() (*kubelet.KubeletClient, error) { @@ -132,6 +136,7 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api recoverFusePeriod: recoverFusePeriod, recoverWarningThreshold: recoverWarningThreshold, locks: locks, + stateTracker: NewRecoverStateTracker(), }, nil } @@ -243,14 +248,31 @@ func (r *FuseRecover) doRecover(point mountinfo.MountPoint) { } defer r.locks.Release(point.MountPath) + // Initialize state tracking for this mount point + _ = r.stateTracker.GetOrCreateState(point.MountPath) + + // Check if we're still in backoff period from previous failures. + // This prevents tight retry loops on persistently broken mounts, + // reducing CPU usage and API server load. 
+ if !r.stateTracker.ShouldAttemptRecovery(point.MountPath) { + failures, backoff := r.stateTracker.GetBackoffInfo(point.MountPath) + glog.V(4).Infof("FuseRecovery: path %s in backoff period (failures=%d, backoff=%v), skipping this cycle", + point.MountPath, failures, backoff) + return + } + should, err := r.shouldRecover(point.MountPath) if err != nil { glog.Warningf("FuseRecovery: found path %s which is unable to recover due to error %v, skip it", point.MountPath, err) + // Record failure to trigger backoff for shouldRecover errors too + r.stateTracker.RecordFailure(point.MountPath) return } if !should { glog.V(3).Infof("FuseRecovery: path %s has already been cleaned up, skip recovering it", point.MountPath) + // Mount was cleaned up, remove its tracking state + r.stateTracker.RemoveState(point.MountPath) return } @@ -260,12 +282,29 @@ func (r *FuseRecover) doRecover(point mountinfo.MountPoint) { // please refer to https://github.com/fluid-cloudnative/fluid/issues/3399 for more information if point.Count > r.recoverWarningThreshold { glog.Warningf("FuseRecovery: Mountpoint %s has been mounted %v times, exceeding the recoveryWarningThreshold %v, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potentially make data access connection broken", point.MountPath, point.Count, r.recoverWarningThreshold) - r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate) + // Only emit duplicate mount warning on state change to prevent spam + if r.stateTracker.ShouldEmitEvent(point.MountPath, common.FuseUmountDuplicate) { + r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate) + } r.umountDuplicate(point) } if err := r.recoverBrokenMount(point); err != nil { - r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed) + // Record failure to increase backoff for next attempt + r.stateTracker.RecordFailure(point.MountPath) + // Only emit failure event on state change (first failure or reason change) + 
// This prevents flooding the event stream with identical failure events + if r.stateTracker.ShouldEmitEvent(point.MountPath, common.FuseRecoverFailed) { + r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed) + } return } - r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed) + + // Recovery succeeded - reset backoff and emit success event if transitioning from failure + wasUnhealthy := !r.stateTracker.GetOrCreateState(point.MountPath).IsHealthy + r.stateTracker.RecordSuccess(point.MountPath) + // Only emit success event if we were previously in a failed state + // This provides clear signal that an issue was resolved without spamming on every healthy check + if wasUnhealthy && r.stateTracker.ShouldEmitEvent(point.MountPath, common.FuseRecoverSucceed) { + r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed) + } } diff --git a/pkg/csi/recover/recover_test.go b/pkg/csi/recover/recover_test.go index 314f6061d37..449eca6eac5 100644 --- a/pkg/csi/recover/recover_test.go +++ b/pkg/csi/recover/recover_test.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "reflect" + "runtime" "time" . 
"github.com/agiledragon/gomonkey/v2" @@ -191,6 +192,7 @@ var _ = Describe("FuseRecover", func() { recoverFusePeriod: 100 * time.Millisecond, recoverWarningThreshold: 50, locks: utils.NewVolumeLocks(), + stateTracker: NewRecoverStateTracker(), } patch1 := ApplyFunc(mountinfo.GetBrokenMountPoints, func() ([]mountinfo.MountPoint, error) { @@ -240,6 +242,7 @@ var _ = Describe("FuseRecover", func() { recoverFusePeriod: testfuseRecoverPeriod * time.Second, recoverWarningThreshold: 50, locks: utils.NewVolumeLocks(), + stateTracker: NewRecoverStateTracker(), } patch1 := ApplyMethod(reflect.TypeOf(fakeMounter), "Mount", func(_ *mount.FakeMounter, source string, target string, _ string, _ []string) error { @@ -290,6 +293,7 @@ var _ = Describe("FuseRecover", func() { Recorder: record.NewFakeRecorder(1), recoverWarningThreshold: 50, locks: utils.NewVolumeLocks(), + stateTracker: NewRecoverStateTracker(), } patch1 := ApplyFunc(mountinfo.GetBrokenMountPoints, func() ([]mountinfo.MountPoint, error) { @@ -312,6 +316,7 @@ var _ = Describe("FuseRecover", func() { Recorder: record.NewFakeRecorder(1), recoverWarningThreshold: 50, locks: utils.NewVolumeLocks(), + stateTracker: NewRecoverStateTracker(), } patch1 := ApplyFunc(mountinfo.GetBrokenMountPoints, func() ([]mountinfo.MountPoint, error) { @@ -698,6 +703,9 @@ var _ = Describe("FuseRecover", func() { Describe("shouldRecover", func() { Context("when mount point does not exist", func() { It("should return false without error", func() { + if runtime.GOOS != "linux" { + Skip("mount utilities are only supported on linux") + } r := &FuseRecover{ SafeFormatAndMount: mount.SafeFormatAndMount{ Interface: &mount.FakeMounter{}, @@ -718,6 +726,9 @@ var _ = Describe("FuseRecover", func() { Context("when mount point is not mounted", func() { It("should return false without error", func() { + if runtime.GOOS != "linux" { + Skip("mount utilities are only supported on linux") + } r := &FuseRecover{ SafeFormatAndMount: mount.SafeFormatAndMount{ 
Interface: &mount.FakeMounter{}, @@ -758,6 +769,9 @@ var _ = Describe("FuseRecover", func() { Context("when mount point is valid", func() { It("should return true without error", func() { + if runtime.GOOS != "linux" { + Skip("mount utilities are only supported on linux") + } r := &FuseRecover{ SafeFormatAndMount: mount.SafeFormatAndMount{ Interface: &mount.FakeMounter{}, @@ -813,6 +827,7 @@ var _ = Describe("FuseRecover", func() { Recorder: record.NewFakeRecorder(10), recoverWarningThreshold: 50, locks: locks, + stateTracker: NewRecoverStateTracker(), } point := mountinfo.MountPoint{ @@ -839,6 +854,7 @@ var _ = Describe("FuseRecover", func() { Recorder: record.NewFakeRecorder(10), recoverWarningThreshold: 50, locks: utils.NewVolumeLocks(), + stateTracker: NewRecoverStateTracker(), } point := mountinfo.MountPoint{ @@ -870,6 +886,7 @@ var _ = Describe("FuseRecover", func() { Recorder: record.NewFakeRecorder(10), recoverWarningThreshold: 50, locks: utils.NewVolumeLocks(), + stateTracker: NewRecoverStateTracker(), } point := mountinfo.MountPoint{ @@ -901,6 +918,7 @@ var _ = Describe("FuseRecover", func() { Recorder: record.NewFakeRecorder(10), recoverWarningThreshold: 50, locks: utils.NewVolumeLocks(), + stateTracker: NewRecoverStateTracker(), } point := mountinfo.MountPoint{ diff --git a/pkg/csi/recover/state.go b/pkg/csi/recover/state.go new file mode 100644 index 00000000000..35e50803a54 --- /dev/null +++ b/pkg/csi/recover/state.go @@ -0,0 +1,228 @@ +/* +Copyright 2022 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
const (
	// initialBackoff is the backoff assigned to a freshly tracked (or freshly
	// recovered) mount. RecordFailure multiplies the backoff before it is
	// first consulted, so the first wait actually enforced after a failure is
	// initialBackoff * backoffMultiplier.
	initialBackoff = 5 * time.Second

	// maxBackoff is the maximum backoff duration to prevent excessively long waits.
	// After reaching this cap, retries occur at most every 5 minutes.
	maxBackoff = 5 * time.Minute

	// backoffMultiplier controls exponential backoff growth rate.
	// With a multiplier of 2 the enforced waits are:
	// 10s -> 20s -> 40s -> 80s -> 160s -> 300s (capped).
	backoffMultiplier = 2.0
)

// MountState tracks the recovery state for a single mount point.
// It carries the data needed for per-mount backoff and event deduplication.
type MountState struct {
	// LastFailureTime records when the last failure occurred.
	// Used to calculate if backoff period has elapsed.
	LastFailureTime time.Time

	// ConsecutiveFailures counts sequential failures for backoff calculation.
	// Reset to 0 on successful recovery.
	ConsecutiveFailures int

	// LastEventReason stores the reason of the most recently emitted event.
	// It is cleared when the mount transitions from unhealthy to healthy.
	LastEventReason string

	// CurrentBackoff is the current backoff duration for this mount.
	// Increases exponentially with consecutive failures, capped at maxBackoff.
	CurrentBackoff time.Duration

	// IsHealthy tracks the health state for state-change event emission.
	// Used to emit recovery events only when transitioning from unhealthy to healthy.
	IsHealthy bool

	// emittedReasons holds every event reason already emitted since the last
	// unhealthy->healthy transition. Tracking the whole set (rather than only
	// the last reason) keeps two alternating reasons for the same mount from
	// being re-emitted on every retry. Allocated lazily; nil means empty.
	emittedReasons map[string]struct{}
}

// RecoverStateTracker manages per-mount recovery state with thread-safe access.
// It enables bounded retry behavior and event deduplication across the recover loop.
//
// Why this is needed:
//   - Without backoff, a persistently broken mount causes retries every ~5s indefinitely
//   - Without event deduplication, each retry emits a new Kubernetes event
//   - This can cause 720+ events/hour per broken mount, overloading the API server
//
// All methods serialize on a single tracker-wide mutex; the critical sections
// are short map operations.
type RecoverStateTracker struct {
	mu     sync.RWMutex
	states map[string]*MountState // keyed by mount path
}

// NewRecoverStateTracker creates a new state tracker for mount recovery.
func NewRecoverStateTracker() *RecoverStateTracker {
	return &RecoverStateTracker{
		states: make(map[string]*MountState),
	}
}

// GetOrCreateState retrieves or initializes state for a mount point.
// A new state starts healthy with the initial backoff.
//
// The returned pointer is the tracker's own record, not a copy: reading or
// writing its fields is not protected by the tracker's mutex, so callers must
// provide their own serialization for a given mount path.
func (t *RecoverStateTracker) GetOrCreateState(mountPath string) *MountState {
	t.mu.Lock()
	defer t.mu.Unlock()

	if state, exists := t.states[mountPath]; exists {
		return state
	}

	state := &MountState{
		CurrentBackoff: initialBackoff,
		IsHealthy:      true, // Assume healthy until proven otherwise
	}
	t.states[mountPath] = state
	return state
}

// RemoveState cleans up state for a mount that no longer exists.
// Should be called when a mount point is successfully unmounted or cleaned up.
// Removing an untracked path is a no-op.
func (t *RecoverStateTracker) RemoveState(mountPath string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.states, mountPath)
}

// ShouldAttemptRecovery checks if enough time has passed since the last
// failure to attempt recovery again. This implements exponential backoff
// to prevent tight retry loops on persistent failures.
//
// It returns true for untracked mounts and for mounts with no consecutive
// failures; otherwise it returns true only once LastFailureTime plus
// CurrentBackoff lies in the past.
//
// Why backoff is needed:
//   - Persistent failures (e.g., unreachable storage, broken FUSE) won't self-heal quickly
//   - Retrying every 5s wastes CPU/memory resources and floods logs/events
//   - Exponential backoff reduces load while still enabling eventual recovery
//   - After 6 consecutive failures, backoff reaches 5 minutes (capped)
func (t *RecoverStateTracker) ShouldAttemptRecovery(mountPath string) bool {
	t.mu.RLock()
	defer t.mu.RUnlock()

	state, exists := t.states[mountPath]
	if !exists {
		return true // No state means first attempt
	}

	if state.ConsecutiveFailures == 0 {
		return true // No recent failures, attempt immediately
	}

	// Check if backoff period has elapsed
	nextAttemptTime := state.LastFailureTime.Add(state.CurrentBackoff)
	return time.Now().After(nextAttemptTime)
}

// RecordFailure updates state after a failed recovery attempt, creating the
// state if the mount is not tracked yet. It stamps the failure time, marks
// the mount unhealthy and increases backoff exponentially up to maxBackoff.
//
// Backoff progression (with multiplier=2):
//   - After failure 1: wait 10s
//   - After failure 2: wait 20s
//   - After failure 3: wait 40s
//   - After failure 4: wait 80s
//   - After failure 5: wait 160s
//   - After failure 6+: wait 300s (5 min cap)
func (t *RecoverStateTracker) RecordFailure(mountPath string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	state := t.states[mountPath]
	if state == nil {
		state = &MountState{CurrentBackoff: initialBackoff}
		t.states[mountPath] = state
	}

	state.LastFailureTime = time.Now()
	state.ConsecutiveFailures++
	state.IsHealthy = false

	// Exponential backoff with cap
	// This bounds retry frequency: after ~6 failures, backoff reaches max
	newBackoff := time.Duration(float64(state.CurrentBackoff) * backoffMultiplier)
	if newBackoff > maxBackoff {
		newBackoff = maxBackoff
	}
	state.CurrentBackoff = newBackoff
}

// RecordSuccess resets state after successful recovery: the failure count
// returns to zero, the backoff to initialBackoff and the mount is marked
// healthy. It is a no-op for untracked mounts.
//
// When the mount was unhealthy, the event history is cleared as well so that
// the recovery event, and the first event of any later failure episode, are
// emitted again.
func (t *RecoverStateTracker) RecordSuccess(mountPath string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	state := t.states[mountPath]
	if state == nil {
		return
	}

	wasUnhealthy := !state.IsHealthy
	state.ConsecutiveFailures = 0
	state.CurrentBackoff = initialBackoff
	state.IsHealthy = true
	// Only clear the event history if we were unhealthy.
	// This allows detecting the healthy->unhealthy transition next time.
	if wasUnhealthy {
		state.LastEventReason = ""
		state.emittedReasons = nil
	}
}

// ShouldEmitEvent determines if an event should be emitted for the given reason.
// For a tracked mount, each distinct reason is allowed once per health
// episode, i.e. until RecordSuccess observes an unhealthy->healthy transition.
// When it returns true the reason is recorded as emitted. Untracked mounts are
// always allowed and nothing is recorded for them.
//
// Why events are deduplicated:
//   - Emitting the same "FuseRecoverFailed" event every 5s provides no new information
//   - Event spam makes it hard to find meaningful signals in kubectl describe output
//   - State-change events (healthy→broken, broken→recovered) are actionable
//   - Operators can still see the issue from the single event + backoff behavior
//
// Remembering only the last reason is not enough: a mount that reports two
// different reasons in the same retry cycle would alternate between them and
// re-emit both every cycle.
func (t *RecoverStateTracker) ShouldEmitEvent(mountPath, eventReason string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	state := t.states[mountPath]
	if state == nil {
		// No state tracked yet, allow event emission
		return true
	}

	// LastEventReason is still honoured on its own so that a state whose
	// field was populated directly keeps suppressing that reason.
	if state.LastEventReason == eventReason {
		return false
	}
	if _, seen := state.emittedReasons[eventReason]; seen {
		return false // Already reported during this episode
	}

	if state.emittedReasons == nil {
		state.emittedReasons = make(map[string]struct{})
	}
	state.emittedReasons[eventReason] = struct{}{}
	state.LastEventReason = eventReason
	return true
}

// GetBackoffInfo returns the current backoff state for observability/debugging.
// Returns consecutive failure count and current backoff duration; an untracked
// mount reports zero failures and initialBackoff.
func (t *RecoverStateTracker) GetBackoffInfo(mountPath string) (failures int, backoff time.Duration) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	state := t.states[mountPath]
	if state == nil {
		return 0, initialBackoff
	}
	return state.ConsecutiveFailures, state.CurrentBackoff
}
"github.com/onsi/gomega" +) + +var _ = Describe("RecoverStateTracker", func() { + var tracker *RecoverStateTracker + const testMountPath = "/var/lib/kubelet/pods/test-pod/volumes/kubernetes.io~csi/test-pv/mount" + + BeforeEach(func() { + tracker = NewRecoverStateTracker() + }) + + Describe("NewRecoverStateTracker", func() { + It("should create a new tracker with empty state map", func() { + Expect(tracker).NotTo(BeNil()) + Expect(tracker.states).NotTo(BeNil()) + Expect(tracker.states).To(BeEmpty()) + }) + }) + + Describe("GetOrCreateState", func() { + It("should create new state for unknown mount path", func() { + state := tracker.GetOrCreateState(testMountPath) + Expect(state).NotTo(BeNil()) + Expect(state.IsHealthy).To(BeTrue()) + Expect(state.ConsecutiveFailures).To(Equal(0)) + Expect(state.CurrentBackoff).To(Equal(initialBackoff)) + }) + + It("should return existing state for known mount path", func() { + state1 := tracker.GetOrCreateState(testMountPath) + state1.ConsecutiveFailures = 5 + + state2 := tracker.GetOrCreateState(testMountPath) + Expect(state2.ConsecutiveFailures).To(Equal(5)) + Expect(state1).To(BeIdenticalTo(state2)) + }) + }) + + Describe("ShouldAttemptRecovery", func() { + Context("when mount has no tracked state", func() { + It("should allow recovery attempt", func() { + Expect(tracker.ShouldAttemptRecovery(testMountPath)).To(BeTrue()) + }) + }) + + Context("when mount has no consecutive failures", func() { + It("should allow recovery attempt", func() { + tracker.GetOrCreateState(testMountPath) + Expect(tracker.ShouldAttemptRecovery(testMountPath)).To(BeTrue()) + }) + }) + + Context("when mount is in backoff period", func() { + It("should not allow recovery attempt", func() { + tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + + // Immediately after failure, should be in backoff + Expect(tracker.ShouldAttemptRecovery(testMountPath)).To(BeFalse()) + }) + }) + + Context("when backoff period has elapsed", func() { + 
It("should allow recovery attempt", func() { + state := tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + + // Manually set last failure time to past + state.LastFailureTime = time.Now().Add(-20 * time.Second) + + Expect(tracker.ShouldAttemptRecovery(testMountPath)).To(BeTrue()) + }) + }) + }) + + Describe("RecordFailure", func() { + It("should increment consecutive failures", func() { + tracker.GetOrCreateState(testMountPath) + + tracker.RecordFailure(testMountPath) + state := tracker.GetOrCreateState(testMountPath) + Expect(state.ConsecutiveFailures).To(Equal(1)) + + tracker.RecordFailure(testMountPath) + Expect(state.ConsecutiveFailures).To(Equal(2)) + }) + + It("should mark mount as unhealthy", func() { + tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + + state := tracker.GetOrCreateState(testMountPath) + Expect(state.IsHealthy).To(BeFalse()) + }) + + It("should increase backoff exponentially", func() { + tracker.GetOrCreateState(testMountPath) + + // First failure: backoff doubles from initial + tracker.RecordFailure(testMountPath) + state := tracker.GetOrCreateState(testMountPath) + Expect(state.CurrentBackoff).To(Equal(initialBackoff * 2)) + + // Second failure: backoff doubles again + tracker.RecordFailure(testMountPath) + Expect(state.CurrentBackoff).To(Equal(initialBackoff * 4)) + }) + + It("should cap backoff at maxBackoff", func() { + tracker.GetOrCreateState(testMountPath) + + // Simulate many failures to hit the cap + for i := 0; i < 20; i++ { + tracker.RecordFailure(testMountPath) + } + + state := tracker.GetOrCreateState(testMountPath) + Expect(state.CurrentBackoff).To(Equal(maxBackoff)) + }) + + It("should record failure time", func() { + before := time.Now() + tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + after := time.Now() + + state := tracker.GetOrCreateState(testMountPath) + Expect(state.LastFailureTime).To(BeTemporally(">=", before)) + 
Expect(state.LastFailureTime).To(BeTemporally("<=", after)) + }) + }) + + Describe("RecordSuccess", func() { + It("should reset consecutive failures to zero", func() { + tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + tracker.RecordFailure(testMountPath) + + tracker.RecordSuccess(testMountPath) + + state := tracker.GetOrCreateState(testMountPath) + Expect(state.ConsecutiveFailures).To(Equal(0)) + }) + + It("should reset backoff to initial value", func() { + tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + tracker.RecordFailure(testMountPath) + + state := tracker.GetOrCreateState(testMountPath) + Expect(state.CurrentBackoff).To(BeNumerically(">", initialBackoff)) + + tracker.RecordSuccess(testMountPath) + Expect(state.CurrentBackoff).To(Equal(initialBackoff)) + }) + + It("should mark mount as healthy", func() { + tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + + state := tracker.GetOrCreateState(testMountPath) + Expect(state.IsHealthy).To(BeFalse()) + + tracker.RecordSuccess(testMountPath) + Expect(state.IsHealthy).To(BeTrue()) + }) + + It("should handle success on non-existent state gracefully", func() { + Expect(func() { tracker.RecordSuccess(testMountPath) }).NotTo(Panic()) + }) + }) + + Describe("ShouldEmitEvent", func() { + const ( + reasonFailed = "FuseRecoverFailed" + reasonSucceed = "FuseRecoverSucceed" + ) + + Context("when mount has no tracked state", func() { + It("should allow event emission", func() { + Expect(tracker.ShouldEmitEvent(testMountPath, reasonFailed)).To(BeTrue()) + }) + }) + + Context("when event reason is different from last emitted", func() { + It("should allow event emission", func() { + tracker.GetOrCreateState(testMountPath) + + // First event should be allowed + Expect(tracker.ShouldEmitEvent(testMountPath, reasonFailed)).To(BeTrue()) + + // Different event should be allowed + Expect(tracker.ShouldEmitEvent(testMountPath, 
reasonSucceed)).To(BeTrue()) + }) + }) + + Context("when event reason is same as last emitted", func() { + It("should suppress duplicate event", func() { + tracker.GetOrCreateState(testMountPath) + + // First event should be allowed + Expect(tracker.ShouldEmitEvent(testMountPath, reasonFailed)).To(BeTrue()) + + // Same event should be suppressed + Expect(tracker.ShouldEmitEvent(testMountPath, reasonFailed)).To(BeFalse()) + Expect(tracker.ShouldEmitEvent(testMountPath, reasonFailed)).To(BeFalse()) + }) + }) + }) + + Describe("RemoveState", func() { + It("should remove state for mount path", func() { + tracker.GetOrCreateState(testMountPath) + Expect(tracker.states).To(HaveLen(1)) + + tracker.RemoveState(testMountPath) + Expect(tracker.states).To(BeEmpty()) + }) + + It("should handle removal of non-existent state gracefully", func() { + Expect(func() { tracker.RemoveState(testMountPath) }).NotTo(Panic()) + }) + }) + + Describe("GetBackoffInfo", func() { + It("should return zero failures and initial backoff for unknown mount", func() { + failures, backoff := tracker.GetBackoffInfo(testMountPath) + Expect(failures).To(Equal(0)) + Expect(backoff).To(Equal(initialBackoff)) + }) + + It("should return current failure count and backoff", func() { + tracker.GetOrCreateState(testMountPath) + tracker.RecordFailure(testMountPath) + tracker.RecordFailure(testMountPath) + + failures, backoff := tracker.GetBackoffInfo(testMountPath) + Expect(failures).To(Equal(2)) + Expect(backoff).To(Equal(initialBackoff * 4)) // doubled twice + }) + }) + + Describe("Integration: Backoff and Event Deduplication", func() { + It("should demonstrate bounded behavior under persistent failures", func() { + tracker.GetOrCreateState(testMountPath) + + // Simulate 10 consecutive failures + for i := 0; i < 10; i++ { + tracker.RecordFailure(testMountPath) + } + + state := tracker.GetOrCreateState(testMountPath) + + // Backoff should be capped + Expect(state.CurrentBackoff).To(Equal(maxBackoff)) + + // 
Events should be deduplicated + Expect(tracker.ShouldEmitEvent(testMountPath, "FuseRecoverFailed")).To(BeTrue()) // First + Expect(tracker.ShouldEmitEvent(testMountPath, "FuseRecoverFailed")).To(BeFalse()) // Duplicate + Expect(tracker.ShouldEmitEvent(testMountPath, "FuseRecoverFailed")).To(BeFalse()) // Duplicate + + // Recovery should reset state + tracker.RecordSuccess(testMountPath) + Expect(state.ConsecutiveFailures).To(Equal(0)) + Expect(state.CurrentBackoff).To(Equal(initialBackoff)) + Expect(state.IsHealthy).To(BeTrue()) + + // Next failure should allow event again (state transition) + tracker.RecordFailure(testMountPath) + Expect(tracker.ShouldEmitEvent(testMountPath, "FuseRecoverFailed")).To(BeTrue()) + }) + }) +})