From ccc53c55cc5b7b8a9152d743b73b426c7ed3b6dd Mon Sep 17 00:00:00 2001 From: James Munson Date: Fri, 3 May 2024 18:18:07 -0600 Subject: [PATCH 01/21] Share manager HA longhorn-6205 Signed-off-by: James Munson Signed-off-by: Phan Le (cherry picked from commit a12c12abc68d2d34c927fa99bce5185b8225f1f6) --- app/daemon.go | 35 ++- controller/engine_controller.go | 85 +++++- controller/instance_handler.go | 16 +- controller/node_controller.go | 8 +- controller/replica_controller.go | 22 ++ controller/share_manager_controller.go | 336 ++++++++++++++++++++- controller/utils.go | 7 + controller/volume_attachment_controller.go | 7 - controller/volume_controller.go | 45 ++- datastore/datastore.go | 7 + datastore/kubernetes.go | 105 +++++++ datastore/longhorn.go | 65 ++++ k8s/pkg/apis/longhorn/v1beta2/node.go | 1 + manager/volume.go | 2 +- types/setting.go | 13 + types/types.go | 15 + 16 files changed, 719 insertions(+), 50 deletions(-) diff --git a/app/daemon.go b/app/daemon.go index c6e98afa8e..79822f4d13 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -1,6 +1,7 @@ package app import ( + "context" "fmt" "net/http" _ "net/http/pprof" // for runtime profiling @@ -11,6 +12,7 @@ import ( "github.com/rancher/wrangler/pkg/signals" "github.com/sirupsen/logrus" "github.com/urfave/cli" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/longhorn/go-iscsi-helper/iscsi" @@ -144,6 +146,16 @@ func startManager(c *cli.Context) error { return fmt.Errorf("failed to detect the node IP") } + podName, err := util.GetRequiredEnv(types.EnvPodName) + if err != nil { + return fmt.Errorf("failed to detect the manager pod name") + } + + podNamespace, err := util.GetRequiredEnv(types.EnvPodNamespace) + if err != nil { + return fmt.Errorf("failed to detect the manager pod namespace") + } + ctx := signals.SetupSignalContext() logger := logrus.StandardLogger().WithField("node", currentNodeID) @@ -151,7 +163,8 @@ func startManager(c *cli.Context) error { // Conversion webhook needs to be started 
first since we use its port 9501 as readiness port. // longhorn-manager pod becomes ready only when conversion webhook is running. // The services in the longhorn-manager can then start to receive the requests. - // Conversion webhook does not longhorn datastore. + // Conversion webhook does not use datastore, since it is a prerequisite for + // datastore operation. clientsWithoutDatastore, err := client.NewClients(kubeconfigPath, false, ctx.Done()) if err != nil { return err @@ -159,6 +172,17 @@ func startManager(c *cli.Context) error { if err := webhook.StartWebhook(ctx, types.WebhookTypeConversion, clientsWithoutDatastore); err != nil { return err } + + // This adds the label for the conversion webhook's selector. We do it the hard way without datastore to avoid chicken-and-egg. + pod, _ := clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Get(context.Background(), podName, v1.GetOptions{}) + labels := types.GetConversionWebhookLabel() + for key, value := range labels { + pod.Labels[key] = value + } + _, err = clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Update(context.Background(), pod, v1.UpdateOptions{}) + if err != nil { + return err + } if err := webhook.CheckWebhookServiceAvailability(types.WebhookTypeConversion); err != nil { return err } @@ -167,9 +191,13 @@ func startManager(c *cli.Context) error { if err != nil { return err } + if err := webhook.StartWebhook(ctx, types.WebhookTypeAdmission, clients); err != nil { return err } + if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetAdmissionWebhookLabel()); err != nil { + return err + } if err := webhook.CheckWebhookServiceAvailability(types.WebhookTypeAdmission); err != nil { return err } @@ -209,7 +237,7 @@ func startManager(c *cli.Context) error { return err } - if err := initDaemonNode(clients.Datastore); err != nil { + if err := initDaemonNode(clients.Datastore, currentNodeID); err != nil { return err } @@ -286,8 +314,7 @@ func 
updateRegistrySecretName(m *manager.VolumeManager) error { return nil } -func initDaemonNode(ds *datastore.DataStore) error { - nodeName := os.Getenv("NODE_NAME") +func initDaemonNode(ds *datastore.DataStore, nodeName string) error { if _, err := ds.GetNode(nodeName); err != nil { // init default disk on node when starting longhorn-manager if datastore.ErrorIsNotFound(err) { diff --git a/controller/engine_controller.go b/controller/engine_controller.go index fd210a5499..1b05e223a9 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -187,6 +187,18 @@ func NewEngineController( } ec.cacheSyncs = append(ec.cacheSyncs, ds.InstanceManagerInformer.HasSynced) + if _, err = ds.PodInformer.AddEventHandlerWithResyncPeriod(cache.FilteringResourceEventHandler{ + FilterFunc: isShareManagerPod, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: ec.enqueueShareManagerPodChange, + UpdateFunc: func(old, cur interface{}) { ec.enqueueShareManagerPodChange(cur) }, + DeleteFunc: ec.enqueueShareManagerPodChange, + }, + }, 0); err != nil { + return nil, err + } + ec.cacheSyncs = append(ec.cacheSyncs, ds.PodInformer.HasSynced) + return ec, nil } @@ -292,6 +304,7 @@ func (ec *EngineController) syncEngine(key string) (err error) { if !isResponsible { return nil } + if engine.Status.OwnerID != ec.controllerID { engine.Status.OwnerID = ec.controllerID engine, err = ec.ds.UpdateEngineStatus(engine) @@ -455,7 +468,43 @@ func (ec *EngineController) enqueueInstanceManagerChange(obj interface{}) { for _, e := range engineMap { ec.enqueueEngine(e) } +} + +func (ec *EngineController) enqueueShareManagerPodChange(obj interface{}) { + pod, isPod := obj.(*corev1.Pod) + if !isPod { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("received unexpected obj: %#v", obj)) + return + } + + // use the last known state, to enqueue, dependent objects + pod, ok = deletedState.Obj.(*corev1.Pod) + if !ok { + 
utilruntime.HandleError(fmt.Errorf("cannot convert DeletedFinalStateUnknown to ShareManager Pod object: %#v", deletedState.Obj)) + return + } + } + + engineMap := map[string]*longhorn.Engine{} + smName := pod.Labels[types.GetLonghornLabelKey(types.LonghornLabelShareManager)] + + es, err := ec.ds.ListEnginesRO() + if err != nil { + ec.logger.WithError(err).Warn("Failed to list engines") + } + for _, e := range es { + // Volume name is the same as share manager name. + if e.Spec.VolumeName == smName { + engineMap[e.Name] = e + } + } + + for _, e := range engineMap { + ec.enqueueEngine(e) + } } func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) { @@ -556,16 +605,9 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { } } - v, err := ec.ds.GetVolumeRO(e.Spec.VolumeName) + isRWXVolume, err := ec.ds.IsRegularRWXVolume(e.Spec.VolumeName) if err != nil { - if !apierrors.IsNotFound(err) { - return err - } - } - - isRWXVolume := false - if v != nil && v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable { - isRWXVolume = true + return err } // For a RWX volume, the node down, for example, caused by kubelet restart, leads to share-manager pod deletion/recreation @@ -580,6 +622,20 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { } } + // If the node is unreachable, don't bother with the successive timeouts we would spend attempting to contact + // its client proxy to delete the engine. 
+ if isRWXVolume { + isDelinquent, _ := ec.ds.IsNodeDelinquent(im.Spec.NodeID, e.Spec.VolumeName) + if isDelinquent { + log.Infof("Skipping deleting RWX engine %v since IM node %v is delinquent", e.Name, im.Spec.NodeID) + if e.Spec.NodeID != "" { + log.Infof("Clearing delinquent nodeID for RWX engine %v", e.Name) + e.Spec.NodeID = "" + } + return nil + } + } + log.Info("Deleting engine instance") defer func() { @@ -2228,6 +2284,17 @@ func (ec *EngineController) isResponsibleFor(e *longhorn.Engine, defaultEngineIm err = errors.Wrap(err, "error while checking isResponsibleFor") }() + // If there is a share manager pod for this and it has an owner, engine should use that too. + if isRWX, _ := ec.ds.IsRegularRWXVolume(e.Spec.VolumeName); isRWX { + if isDelinquent, _ := ec.ds.IsNodeDownOrDeletedOrDelinquent(e.Status.OwnerID, e.Spec.VolumeName); isDelinquent { + pod, err := ec.ds.GetPodRO(e.Namespace, types.GetShareManagerPodNameFromShareManagerName(e.Spec.VolumeName)) + if err == nil && pod != nil { + return ec.controllerID == pod.Spec.NodeName, nil + + } + } + } + isResponsible := isControllerResponsibleFor(ec.controllerID, ec.ds, e.Name, e.Spec.NodeID, e.Status.OwnerID) // The engine is not running, the owner node doesn't need to have e.Status.CurrentImage diff --git a/controller/instance_handler.go b/controller/instance_handler.go index c42566d980..47e2693bef 100644 --- a/controller/instance_handler.go +++ b/controller/instance_handler.go @@ -56,7 +56,19 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan } }() - if im == nil || im.Status.CurrentState == longhorn.InstanceManagerStateUnknown { + isDelinquent := false + if im != nil { + isDelinquent, _ = h.ds.IsNodeDelinquent(im.Spec.NodeID, spec.VolumeName) + } + imName := "nil" + nodeName := "empty" + if im != nil { + imName = im.Name + nodeName = im.Spec.NodeID + } + logrus.Infof("==================> instanceName: %v -------- isDelinquent: %v --------- im: %v ------------ nodeName: 
%v", instanceName, isDelinquent, imName, nodeName) + + if im == nil || im.Status.CurrentState == longhorn.InstanceManagerStateUnknown || isDelinquent { if status.Started { if status.CurrentState != longhorn.InstanceStateUnknown { logrus.Warnf("Marking the instance as state UNKNOWN since the related node %v of instance %v is down or deleted", spec.NodeID, instanceName) @@ -281,7 +293,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn return nil } // The related node maybe cleaned up then there is no available instance manager for this instance (typically it's replica). - isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeleted(spec.NodeID) + isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeletedOrDelinquent(spec.NodeID, spec.VolumeName) if err != nil { return err } diff --git a/controller/node_controller.go b/controller/node_controller.go index 51c45ccc65..3cc4406f17 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -383,7 +383,7 @@ func (nc *NodeController) syncNode(key string) (err error) { existingNode := node.DeepCopy() defer func() { - // we're going to update volume assume things changes + // we're going to update node assume things changes if err == nil && !reflect.DeepEqual(existingNode.Status, node.Status) { _, err = nc.ds.UpdateNodeStatus(node) } @@ -519,6 +519,12 @@ func (nc *NodeController) syncNode(key string) (err error) { return nil } + // Getting here is enough proof of life to Turn on the webhook endpoint, + // if it has been turned off for RWX failover. 
+ if err := nc.ds.AddLabelToManagerPod(node.Name, types.GetAdmissionWebhookLabel()); err != nil { + log.Warnf("Node %v failed to restore its admission webhook", node.Name) + } + // Create a monitor for collecting disk information if _, err := nc.createDiskMonitor(); err != nil { return err } diff --git a/controller/replica_controller.go b/controller/replica_controller.go index 0eb6c7b07b..b9b631cbcb 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -553,6 +553,20 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error { return nil } + isRWXVolume, err := rc.ds.IsRegularRWXVolume(r.Spec.VolumeName) + if err != nil { + return err + } + // If the node is unreachable, don't bother with the successive timeouts we would spend attempting to contact + // its client proxy to delete the engine. + if isRWXVolume { + isDelinquent, _ := rc.ds.IsNodeDelinquent(im.Spec.NodeID, r.Spec.VolumeName) + if isDelinquent { + log.Infof("Skipping deleting RWX replica %v since IM node %v is delinquent", r.Name, im.Spec.NodeID) + return nil + } + } + c, err := engineapi.NewInstanceManagerClient(im) if err != nil { return err } @@ -901,6 +915,14 @@ func (rc *ReplicaController) enqueueAllRebuildingReplicaOnCurrentNode() { } func (rc *ReplicaController) isResponsibleFor(r *longhorn.Replica) bool { + if isRWX, _ := rc.ds.IsRegularRWXVolume(r.Spec.VolumeName); isRWX { + if isDelinquent, _ := rc.ds.IsNodeDownOrDeletedOrDelinquent(r.Status.OwnerID, r.Spec.VolumeName); isDelinquent { + pod, err := rc.ds.GetPodRO(r.Namespace, types.GetShareManagerPodNameFromShareManagerName(r.Spec.VolumeName)) + if err == nil && pod != nil { + return rc.controllerID == pod.Spec.NodeName + } + } + } return isControllerResponsibleFor(rc.controllerID, rc.ds, r.Name, r.Spec.NodeID, r.Status.OwnerID) } diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 966e0a0594..94cd6c3a53 100644 --- a/controller/share_manager_controller.go 
+++ b/controller/share_manager_controller.go @@ -18,6 +18,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -35,6 +36,8 @@ import ( longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" ) +const shareManagerLeaseDurationSeconds = 7 // This should be slightly more than twice the share-manager lease renewal interval. + type ShareManagerController struct { *baseController @@ -48,6 +51,8 @@ type ShareManagerController struct { ds *datastore.DataStore cacheSyncs []cache.InformerSynced + + staleNodeMap map[string]string } func NewShareManagerController( @@ -77,6 +82,8 @@ func NewShareManagerController( eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-share-manager-controller"}), ds: ds, + + staleNodeMap: map[string]string{}, } var err error @@ -154,7 +161,7 @@ func (c *ShareManagerController) enqueueShareManagerForVolume(obj interface{}) { } } - if volume.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !volume.Spec.Migratable { + if isRegularRWXVolume(volume) { // we can queue the key directly since a share manager only manages a single volume from it's own namespace // and there is no need for us to retrieve the whole object, since we already know the volume name key := volume.Namespace + "/" + volume.Name @@ -203,6 +210,7 @@ func isShareManagerPod(obj interface{}) bool { } } + // This only matches once the pod is fully constructed, which may be the point. 
podContainers := pod.Spec.Containers for _, con := range podContainers { if con.Name == types.LonghornLabelShareManager { @@ -212,6 +220,42 @@ func isShareManagerPod(obj interface{}) bool { return false } +func (c *ShareManagerController) checkLeasesAndEnqueueAnyStale() error { + sms, err := c.ds.ListShareManagersRO() + if err != nil { + return err + } + for _, sm := range sms { + isStale, _, err := c.isShareManagerPodStale(sm) + if err != nil { + return err + } + if isStale { + c.enqueueShareManager(sm) + } + } + + return nil +} + +func (c *ShareManagerController) runLeaseCheck(stopCh <-chan struct{}) { + c.logger.Infof("Starting lease check goroutine") + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-stopCh: + c.logger.Info("Share manager lease check is ending") + return + case <-ticker.C: + if err := c.checkLeasesAndEnqueueAnyStale(); err != nil { + c.logger.WithError(err).Warn("Failed to check share-manager leases.") + } + } + } +} + func (c *ShareManagerController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -225,6 +269,7 @@ func (c *ShareManagerController) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } + go c.runLeaseCheck(stopCh) <-stopCh } @@ -300,6 +345,9 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { } return err } + // This node may be an interim owner picked by isResponsibleFor until the pod is created + // and scheduled, at which point ownership will transfer to the pod's spec.nodename. + // But we need some controller to assume responsibility and do the restart. 
log.Infof("Share manager got new owner %v", c.controllerID) } @@ -330,11 +378,11 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { } }() - if err = c.syncShareManagerVolume(sm); err != nil { + if err = c.syncShareManagerPod(sm); err != nil { return err } - if err = c.syncShareManagerPod(sm); err != nil { + if err = c.syncShareManagerVolume(sm); err != nil { return err } @@ -617,8 +665,86 @@ func (c *ShareManagerController) syncShareManagerVolume(sm *longhorn.ShareManage return nil } +func (c *ShareManagerController) cleanupShareManagerService(shareManager *longhorn.ShareManager) error { + log := getLoggerForShareManager(c.logger, shareManager) + + service, err := c.ds.GetService(shareManager.Namespace, shareManager.Name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + log.Infof("Cleaning up share manager service %v", service.Name) + return c.ds.DeleteService(shareManager.Namespace, service.Name) +} + +// markShareManagerDelinquent zeros the acquire time field as a flag that the volume +// should be fast-tracked for failover away from the current lease-holding node. 
+func (c *ShareManagerController) markShareManagerDelinquent(sm *longhorn.ShareManager) error { + log := getLoggerForShareManager(c.logger, sm) + + lease, err := c.ds.GetLease(sm.Name) + if err != nil && !apierrors.IsNotFound(err) { + log.WithError(err).Warnf("Failed to retrieve lease for share manager %v from datastore", sm.Name) + return err + } + + if lease != nil { + holder := *lease.Spec.HolderIdentity + log.Infof("Marking lease %v held by suspect node %v for share manager failover.", sm.Name, holder) + lease.Spec.AcquireTime = &metav1.MicroTime{Time: time.Time{}} + _, err := c.ds.UpdateLease(lease) + if err != nil { + log.WithError(err).Warnf("Failed to update lease for share manager %v", sm.Name) + } + } + return err +} + +// clearShareManagerLease just zeros out the renew time field preparatory to pod +// cleanup, so it won't be flagged as stale in normal-path code. +func (c *ShareManagerController) clearShareManagerLease(sm *longhorn.ShareManager) error { + log := getLoggerForShareManager(c.logger, sm) + + lease, err := c.ds.GetLease(sm.Name) + if err != nil && !apierrors.IsNotFound(err) { + log.WithError(err).Warnf("Failed to retrieve lease for share manager %v from datastore", sm.Name) + return err + } + + if lease != nil { + holder := *lease.Spec.HolderIdentity + log.Infof("Clearing lease %v held by node %v for share manager pod cleanup.", sm.Name, holder) + lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Time{}} + _, err := c.ds.UpdateLease(lease) + if err != nil { + log.WithError(err).Warnf("Failed to clear lease for share manager %v", sm.Name) + } + } + return err +} + func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManager) error { log := getLoggerForShareManager(c.logger, sm) + + // Are we cleaning up after a lease timeout? 
+ leaseExpired, leaseHolder, err := c.isShareManagerPodStale(sm) + if err != nil { + log.WithError(err).Warnf("Failed to check isShareManagerPodStale(%v) when cleanupShareManagerPod", sm.Name) + } + if leaseExpired { + // Remember this node so we can avoid it in the new pod we will create. + c.staleNodeMap[sm.Name] = leaseHolder + } + + // Clear the lease so we won't try to act on apparent staleness. Staleness is now either dealt with or moot. + err = c.clearShareManagerLease(sm) + if err != nil { + log.WithError(err).Warnf("Failed to clear lease holder for share manager (%v) when cleanupShareManagerPod", sm.Name) + } + podName := types.GetShareManagerPodNameFromShareManagerName(sm.Name) pod, err := c.ds.GetPod(podName) if err != nil && !apierrors.IsNotFound(err) { @@ -631,10 +757,14 @@ func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManage log.Infof("Deleting share manager pod") if err := c.ds.DeletePod(podName); err != nil && !apierrors.IsNotFound(err) { + log.WithError(err).Warnf("Failed to delete share manager pod") return err } - if nodeFailed, _ := c.ds.IsNodeDownOrDeleted(pod.Spec.NodeName); nodeFailed { + // Force delete if the pod's node is known dead, or likely so since it let + // the lease time out and another node's controller is cleaning up after it. + nodeFailed, _ := c.ds.IsNodeDownOrDeleted(pod.Spec.NodeName) + if nodeFailed || (leaseExpired && leaseHolder != c.controllerID) { log.Info("Force deleting pod to allow fail over since node of share manager pod is down") gracePeriod := int64(0) err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}) @@ -696,7 +826,28 @@ func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) } // If the node where the pod is running on become defective, we clean up the pod by setting sm.Status.State to STOPPED or ERROR - // A new pod will be recreated by the share manager controller. 
+ // A new pod will be recreated by the share manager controller. We might get an early warning of that by the pod going stale. + isStale, _, err := c.isShareManagerPodStale(sm) + if err != nil { + log.WithError(err).Warnf("Failed to check isShareManagerPodStale(%v) when syncShareManagerPod", sm.Name) + } else if isStale { + log.Infof("ShareManager Pod %v is stale", pod.Name) + } + if isStale { + // if we just transitioned to the starting state, while the prior cleanup is still in progress we will switch to error state + // which will lead to a bad loop of starting (new workload) -> error (remount) -> stopped (cleanup sm) + if sm.Status.State == longhorn.ShareManagerStateStopping { + return nil + } + + if sm.Status.State != longhorn.ShareManagerStateStopped { + log.Info("Updating share manager to error state") + sm.Status.State = longhorn.ShareManagerStateError + } + + return nil + } + isDown, err := c.ds.IsNodeDownOrDeleted(pod.Spec.NodeName) if err != nil { log.WithError(err).Warnf("Failed to check IsNodeDownOrDeleted(%v) when syncShareManagerPod", pod.Spec.NodeName) @@ -782,6 +933,39 @@ func (c *ShareManagerController) getAffinityFromStorageClass(sc *storagev1.Stora } } +func (c *ShareManagerController) addStaleNodeAntiAffinity(affinity *corev1.Affinity, staleNode string) *corev1.Affinity { + var matchFields []corev1.NodeSelectorRequirement + + matchFields = append(matchFields, corev1.NodeSelectorRequirement{ + Key: "metadata.name", + Operator: corev1.NodeSelectorOpNotIn, + Values: []string{staleNode}, + }) + + // Note the difference between MatchFields and MatchExpressions. 
+ //See https://stackoverflow.com/questions/67018171/kubernetes-what-are-valid-node-fields + nodeAntiAffinity := &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + corev1.PreferredSchedulingTerm{ + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchFields: matchFields, + }, + }, + }, + } + + if affinity == nil { + affinity = &corev1.Affinity{ + NodeAffinity: nodeAntiAffinity, + } + } else { + affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = nodeAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution + } + + return affinity +} + func (c *ShareManagerController) getShareManagerNodeSelectorFromStorageClass(sc *storagev1.StorageClass) map[string]string { value, ok := sc.Parameters["shareManagerNodeSelector"] if !ok { @@ -846,8 +1030,11 @@ func (c *ShareManagerController) createServiceAndEndpoint(shareManager *longhorn return nil } -// createShareManagerPod ensures existence of service, it's assumed that the pvc for this share manager already exists +// createShareManagerPod ensures existence of corresponding service and lease. +// it's assumed that the pvc for this share manager already exists. 
func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager) (*corev1.Pod, error) { + log := getLoggerForShareManager(c.logger, sm) + tolerations, err := c.ds.GetSettingTaintToleration() if err != nil { return nil, errors.Wrap(err, "failed to get taint toleration setting before creating share manager pod") @@ -886,6 +1073,17 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager return nil, errors.Wrapf(err, "failed to create service and endpoint for share manager %v", sm.Name) } + // likewise for the lease + if _, err := c.ds.GetLeaseRO(sm.Name); err != nil { + if !apierrors.IsNotFound(err) { + return nil, errors.Wrapf(err, "failed to get lease for share manager %v", sm.Name) + } + + if _, err = c.ds.CreateLease(c.createLeaseManifest(sm)); err != nil { + return nil, errors.Wrapf(err, "failed to create lease for share manager %v", sm.Name) + } + } + volume, err := c.ds.GetVolume(sm.Name) if err != nil { return nil, err @@ -923,6 +1121,13 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager } } + staleNode := c.staleNodeMap[sm.Name] + if staleNode != "" { + log.Infof("Creating anti-affinity for share manager pod against stale node %v", staleNode) + affinity = c.addStaleNodeAntiAffinity(affinity, staleNode) + delete(c.staleNodeMap, sm.Name) + } + fsType := pv.Spec.CSI.FSType mountOptions := pv.Spec.MountOptions @@ -971,8 +1176,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager if err != nil { return nil, errors.Wrapf(err, "failed to create pod for share manager %v", sm.Name) } - - getLoggerForShareManager(c.logger, sm).WithField("pod", pod.Name).Infof("Created pod for share manager on node %v", pod.Spec.NodeName) + log.WithField("pod", pod.Name).Infof("Created pod for share manager on node %v", pod.Spec.NodeName) return pod, nil } @@ -1040,6 +1244,32 @@ func (c *ShareManagerController) createEndpoint(sm *longhorn.ShareManager) (*cor return 
c.ds.CreateKubernetesEndpoint(newObj) } +func (c *ShareManagerController) createLeaseManifest(sm *longhorn.ShareManager) *coordinationv1.Lease { + // No current holder, share-manager pod will fill it with its owning node. + holderIdentity := "" + leaseDurationSeconds := int32(shareManagerLeaseDurationSeconds) + leaseTransitions := int32(0) + zeroTime := time.Now() + + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: sm.Name, + Namespace: c.namespace, + OwnerReferences: datastore.GetOwnerReferencesForShareManager(sm, false), + Labels: types.GetShareManagerInstanceLabel(sm.Name), + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: &holderIdentity, + LeaseDurationSeconds: &leaseDurationSeconds, + AcquireTime: &metav1.MicroTime{Time: zeroTime}, + RenewTime: &metav1.MicroTime{Time: zeroTime}, + LeaseTransitions: &leaseTransitions, + }, + } + + return lease +} + func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, annotations map[string]string, tolerations []corev1.Toleration, affinity *corev1.Affinity, pullPolicy corev1.PullPolicy, resourceReq *corev1.ResourceRequirements, registrySecret, priorityClass string, nodeSelector map[string]string, fsType string, mountOptions []string, cryptoKey string, cryptoParams *crypto.EncryptParams) *corev1.Pod { @@ -1198,12 +1428,56 @@ func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, an return podSpec } -// isResponsibleFor in most controllers we only checks if the node of the current owner is down -// but in the case where the node is unschedulable we want to transfer ownership, -// since we will create sm pod on the sm.Status.OwnerID when the sm starts +// isShareManagerPodStale checks the associated lease CR to see whether the current pod (if any) +// has fallen behind on renewing the lease. If there is any error finding out, we assume not stale. 
+func (c *ShareManagerController) isShareManagerPodStale(sm *longhorn.ShareManager) (stale bool, holder string, err error) { + log := getLoggerForShareManager(c.logger, sm) + + if enabled, _ := c.ds.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover); !enabled { + // stale is false, holder is empty, err is nil + return + } + + leaseName := sm.Name + lease, err := c.ds.GetLeaseRO(leaseName) + if err != nil { + return + } + + // Consider it stale if + // - there is a lease-holding node, ie there is a share-manager version running that knows to acquire the lease. + // - the pod is not being cleaned up, + // - the lease duration is a sane value, and + // - the time of renewal is longer ago than the lease duration. + holder = *lease.Spec.HolderIdentity + if holder == "" { + return + } + if (lease.Spec.RenewTime).IsZero() { + // log.Warnf("Lease for %v held by %v has already been cleared for deletion", leaseName, holder) + return + } + if *lease.Spec.LeaseDurationSeconds < shareManagerLeaseDurationSeconds { + // log.Warnf("Lease for %v has a crazy value for duration: %v seconds. Ignoring.", leaseName, *lease.Spec.LeaseDurationSeconds) + return + } + expireTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second) + if time.Now().After(expireTime) { + log.Warnf("Lease for %v held by %v is stale, expired %v ago", leaseName, holder, time.Since(expireTime)) + stale = true + } + + return +} + +// isResponsibleFor in most controllers only checks if the node of the current owner is known +// by kubernetes to be down. But in the case where the lease is stale or the node is unschedulable +// we want to transfer ownership and mark the node as delinquent for related status checking. func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bool, error) { - // We prefer keeping the owner of the share manager CR to be the same node - // where the share manager pod is running on. 
+ log := getLoggerForShareManager(c.logger, sm) + + // We prefer keeping the owner of the share manager CR to be the node + // where the share manager pod got scheduled and is running. preferredOwnerID := "" podName := types.GetShareManagerPodNameFromShareManagerName(sm.Name) pod, err := c.ds.GetPodRO(c.namespace, podName) @@ -1211,6 +1485,7 @@ func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bo preferredOwnerID = pod.Spec.NodeName } + // Base class method is used to decide based on node schedulable condition. isResponsible := isControllerResponsibleFor(c.controllerID, c.ds, sm.Name, preferredOwnerID, sm.Status.OwnerID) readyAndSchedulableNodes, err := c.ds.ListReadyAndSchedulableNodesRO() @@ -1229,5 +1504,40 @@ func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bo continueToBeOwner := currentNodeSchedulable && !preferredOwnerSchedulable && c.controllerID == sm.Status.OwnerID requiresNewOwner := currentNodeSchedulable && !preferredOwnerSchedulable && !currentOwnerSchedulable + isNodeAvailable := func(node string) bool { + isUnavailable, _ := c.ds.IsNodeDownOrDeletedOrMissingManager(node) + return node != "" && !isUnavailable + } + + // If the lease is stale, we assume the owning node is down but not officially dead. + // Some node has to take over, and it might as well be this one, if another one + // has not already. + isStale, leaseHolder, err := c.isShareManagerPodStale(sm) + if err == nil && isStale { + // Avoid race between nodes by checking for an existing interim owner. + if leaseHolder != sm.Status.OwnerID && + c.controllerID != sm.Status.OwnerID && + isNodeAvailable(sm.Status.OwnerID) && + c.ds.IsNodeSchedulable(sm.Status.OwnerID) { + return false, nil + } + log.Infof("Interim owner %v taking responsibility for stale lease-holder %v", c.controllerID, leaseHolder) + + // TODO: move this to main sync loop + // This code should be moved from here to the sync loop, but it requires care. 
+ // The first naive attempt caused repeated calls to cleanup and delete/create. + if err := c.markShareManagerDelinquent(sm); err != nil { + log.WithError(err).Warnf("Failed to update lease to set delinquent condition for %v", sm.Name) + } + + // Also, turn off the admission webhook on the suspected node. Trying to talk to it + // will delay any effort to modify resources. + if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, types.GetAdmissionWebhookLabel()); err != nil { + log.WithError(err).Warnf("Failed to turn off admission webhook on node %v", leaseHolder) + } + + return currentNodeSchedulable, nil + } + return isPreferredOwner || continueToBeOwner || requiresNewOwner, nil } diff --git a/controller/utils.go b/controller/utils.go index 72b0cadc4e..0535d5ff9f 100644 --- a/controller/utils.go +++ b/controller/utils.go @@ -83,3 +83,10 @@ func setReplicaFailedAt(r *longhorn.Replica, timestamp string) { r.Spec.LastFailedAt = timestamp } } + +func isRegularRWXVolume(v *longhorn.Volume) bool { + if v == nil { + return false + } + return v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable +} diff --git a/controller/volume_attachment_controller.go b/controller/volume_attachment_controller.go index b58f3971a7..04cef6e05f 100644 --- a/controller/volume_attachment_controller.go +++ b/controller/volume_attachment_controller.go @@ -940,13 +940,6 @@ func isCSIAttacherTicketOfRegularRWXVolume(attachmentTicket *longhorn.Attachment return isRegularRWXVolume(v) && isCSIAttacherTicket(attachmentTicket) } -func isRegularRWXVolume(v *longhorn.Volume) bool { - if v == nil { - return false - } - return v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable -} - func isCSIAttacherTicket(ticket *longhorn.AttachmentTicket) bool { if ticket == nil { return false } diff --git a/controller/volume_controller.go b/controller/volume_controller.go index d8569c19b8..c1d5b0cc96 100644 --- a/controller/volume_controller.go +++ 
b/controller/volume_controller.go @@ -152,14 +152,17 @@ func NewVolumeController( } c.cacheSyncs = append(c.cacheSyncs, ds.ReplicaInformer.HasSynced) - if _, err = ds.ShareManagerInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueVolumesForShareManager, - UpdateFunc: func(old, cur interface{}) { c.enqueueVolumesForShareManager(cur) }, - DeleteFunc: c.enqueueVolumesForShareManager, + if _, err = ds.PodInformer.AddEventHandlerWithResyncPeriod(cache.FilteringResourceEventHandler{ + FilterFunc: isShareManagerPod, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueVolumesForShareManager, + UpdateFunc: func(old, cur interface{}) { c.enqueueVolumesForShareManager(cur) }, + DeleteFunc: c.enqueueVolumesForShareManager, + }, }, 0); err != nil { return nil, err } - c.cacheSyncs = append(c.cacheSyncs, ds.ShareManagerInformer.HasSynced) + c.cacheSyncs = append(c.cacheSyncs, ds.PodInformer.HasSynced) if _, err = ds.BackupVolumeInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, cur interface{}) { c.enqueueVolumesForBackupVolume(cur) }, @@ -2038,7 +2041,7 @@ func (c *VolumeController) reconcileVolumeSize(v *longhorn.Volume, e *longhorn.E } func (c *VolumeController) canInstanceManagerLaunchReplica(r *longhorn.Replica) (bool, error) { - nodeDown, err := c.ds.IsNodeDownOrDeleted(r.Spec.NodeID) + nodeDown, err := c.ds.IsNodeDownOrDeletedOrDelinquent(r.Spec.NodeID, r.Spec.VolumeName) if err != nil { return false, errors.Wrapf(err, "fail to check IsNodeDownOrDeleted %v", r.Spec.NodeID) } @@ -4343,6 +4346,12 @@ func (c *VolumeController) IsReplicaUnavailable(r *longhorn.Replica) (bool, erro return true, nil } + if isRWX, _ := c.ds.IsRegularRWXVolume(r.Spec.VolumeName); isRWX { + if isDelinquent, _ := c.ds.IsNodeDelinquent(r.Spec.NodeID, r.Spec.VolumeName); isDelinquent { + return true, nil + } + } + node, err := c.ds.GetNodeRO(r.Spec.NodeID) if err != nil { return true, errors.Wrapf(err, 
"failed to get node %v for failed replica %v", r.Spec.NodeID, r.Name) @@ -4368,6 +4377,16 @@ func (c *VolumeController) isResponsibleFor(v *longhorn.Volume, defaultEngineIma err = errors.Wrap(err, "error while checking isResponsibleFor") }() + // If there is a share manager pod and it has an owner, we should use that too. + if isRWX := isRegularRWXVolume(v); isRWX { + if isDelinquent, _ := c.ds.IsNodeDownOrDeletedOrDelinquent(v.Status.OwnerID, v.Name); isDelinquent { + pod, err := c.ds.GetPodRO(v.Namespace, types.GetShareManagerPodNameFromShareManagerName(v.Name)) + if err == nil && pod != nil { + return c.controllerID == pod.Spec.NodeName, nil + } + } + } + isResponsible := isControllerResponsibleFor(c.controllerID, c.ds, v.Name, v.Spec.NodeID, v.Status.OwnerID) if types.IsDataEngineV1(v.Spec.DataEngine) { @@ -4428,8 +4447,8 @@ func (c *VolumeController) deleteEngine(e *longhorn.Engine, es map[string]*longh // enqueueVolumesForShareManager enqueues all volumes that are currently claimed by this share manager func (c *VolumeController) enqueueVolumesForShareManager(obj interface{}) { - sm, isShareManager := obj.(*longhorn.ShareManager) - if !isShareManager { + pod, isPod := obj.(*corev1.Pod) + if !isPod { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("received unexpected obj: %#v", obj)) @@ -4437,16 +4456,16 @@ func (c *VolumeController) enqueueVolumesForShareManager(obj interface{}) { } // use the last known state, to requeue the claimed volumes - sm, ok = deletedState.Obj.(*longhorn.ShareManager) + pod, ok = deletedState.Obj.(*corev1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("DeletedFinalStateUnknown contained non ShareManager object: %#v", deletedState.Obj)) + utilruntime.HandleError(fmt.Errorf("DeletedFinalStateUnknown contained non ShareManager Pod object: %#v", deletedState.Obj)) return } } - // we can queue the key directly since a share manager only manages a single volume from it's own 
namespace - // and there is no need for us to retrieve the whole object, since we already know the volume name - key := sm.Namespace + "/" + sm.Name + // sharemanager name is the same as volume name. + smName := pod.Labels[types.GetLonghornLabelKey(types.LonghornLabelShareManager)] + key := pod.Namespace + "/" + smName c.queue.Add(key) } diff --git a/datastore/datastore.go b/datastore/datastore.go index 2091279284..e933165592 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -10,6 +10,7 @@ import ( clientset "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1" batchlisters_v1 "k8s.io/client-go/listers/batch/v1" + coordinationlisters "k8s.io/client-go/listers/coordination/v1" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1" schedulinglisters "k8s.io/client-go/listers/scheduling/v1" @@ -123,6 +124,8 @@ type DataStore struct { ServiceInformer cache.SharedInformer endpointLister corelisters.EndpointsLister EndpointInformer cache.SharedInformer + leaseLister coordinationlisters.LeaseLister + LeaseInformer cache.SharedInformer extensionsClient apiextensionsclientset.Interface } @@ -194,6 +197,8 @@ func NewDataStore(namespace string, lhClient lhclientset.Interface, kubeClient c cacheSyncs = append(cacheSyncs, storageclassInformer.Informer().HasSynced) priorityClassInformer := informerFactories.KubeInformerFactory.Scheduling().V1().PriorityClasses() cacheSyncs = append(cacheSyncs, priorityClassInformer.Informer().HasSynced) + leaseInformer := informerFactories.KubeInformerFactory.Coordination().V1().Leases() + cacheSyncs = append(cacheSyncs, leaseInformer.Informer().HasSynced) // Filtered kube Informers by longhorn-system namespace cronJobInformer := informerFactories.KubeNamespaceFilteredInformerFactory.Batch().V1().CronJobs() @@ -281,6 +286,8 @@ func NewDataStore(namespace string, lhClient lhclientset.Interface, kubeClient c StorageClassInformer: storageclassInformer.Informer(), 
priorityClassLister: priorityClassInformer.Lister(), PriorityClassInformer: priorityClassInformer.Informer(), + leaseLister: leaseInformer.Lister(), + LeaseInformer: leaseInformer.Informer(), cronJobLister: cronJobInformer.Lister(), CronJobInformer: cronJobInformer.Informer(), diff --git a/datastore/kubernetes.go b/datastore/kubernetes.go index e72507d79d..122f6cc185 100644 --- a/datastore/kubernetes.go +++ b/datastore/kubernetes.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/version" @@ -18,6 +19,7 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -243,11 +245,69 @@ func (s *DataStore) UpdatePod(obj *corev1.Pod) (*corev1.Pod, error) { return s.kubeClient.CoreV1().Pods(s.namespace).Update(context.TODO(), obj, metav1.UpdateOptions{}) } +// CreateLease creates a Lease resource for the given Lease object +func (s *DataStore) CreateLease(lease *coordinationv1.Lease) (*coordinationv1.Lease, error) { + return s.kubeClient.CoordinationV1().Leases(s.namespace).Create(context.TODO(), lease, metav1.CreateOptions{}) +} + +// GetLeaseRO gets the Lease for the given name +func (s *DataStore) GetLeaseRO(name string) (*coordinationv1.Lease, error) { + return s.leaseLister.Leases(s.namespace).Get(name) + // return s.kubeClient.CoordinationV1().Leases(namespace).Get(context.Background(), name, metav1.GetOptions{}) +} + +// GetLease returns a new Lease object for the given name +func (s *DataStore) GetLease(name string) (*coordinationv1.Lease, error) { + resultRO, err := s.GetLeaseRO(name) + if err != nil { + return nil, err + } + // Cannot use cached object from lister + return resultRO.DeepCopy(), nil +} + +// ListLeasesRO returns a list of all Leases for the given namespace,
+// the list contains direct references to the internal cache objects and should not be mutated. +func (s *DataStore) ListLeasesRO() ([]*coordinationv1.Lease, error) { + return s.leaseLister.Leases(s.namespace).List(labels.Everything()) +} + // DeleteLease deletes Lease with the given name in s.namespace func (s *DataStore) DeleteLease(name string) error { return s.kubeClient.CoordinationV1().Leases(s.namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) } +// UpdateLease updates the Lease resource with the given object and namespace +func (s *DataStore) UpdateLease(lease *coordinationv1.Lease) (*coordinationv1.Lease, error) { + return s.kubeClient.CoordinationV1().Leases(s.namespace).Update(context.TODO(), lease, metav1.UpdateOptions{}) +} + +// IsRWXVolumeInFailover checks whether the volume has a lease by the same name, which an RWX volume should, +// and whether that lease's spec shows that its holder is delinquent (its acquire time has been zeroed.) +// If so, return the delinquent holder. +// Any hiccup yields a return of "false". +func (s *DataStore) IsRWXVolumeInFailover(name string) (failover bool, holder string, err error) { + if enabled, _ := s.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover); !enabled { + // failover is false, holder is empty, err is nil + return + } + + lease, err := s.GetLeaseRO(name) + if err != nil { + logrus.WithError(err).Warnf("Failed to get lease for RWX volume %v", name) + return + } + + holder = *lease.Spec.HolderIdentity + if holder == "" { + return + } + if (lease.Spec.AcquireTime).IsZero() { + failover = true + } + return +} + // GetStorageClassRO gets StorageClass with the given name // This function returns direct reference to the internal cache object and should not be mutated. 
// Consider using this function when you can guarantee read only access and don't want the overhead of deep copies @@ -430,6 +490,51 @@ func (s *DataStore) ListManagerPodsRO() ([]*corev1.Pod, error) { return s.ListPodsBySelectorRO(selector) } +func (s *DataStore) GetManagerPodForNode(nodeName string) (*corev1.Pod, error) { + pods, err := s.ListManagerPods() + if err != nil { + return nil, err + } + for _, pod := range pods { + if pod.Spec.NodeName == nodeName { + return pod, nil + } + } + return nil, errors.NewNotFound(corev1.Resource("pod"), nodeName) +} + +func (s *DataStore) AddLabelToManagerPod(nodeName string, label map[string]string) error { + pod, err := s.GetManagerPodForNode(nodeName) + if err != nil { + return err + } + changed := false + for key, value := range label { + if _, exists := pod.Labels[key]; !exists { + pod.Labels[key] = value + changed = true + } + } + + // Protect against frequent no-change updates. + if changed { + _, err = s.UpdatePod(pod) + } + return err +} + +func (s *DataStore) RemoveLabelFromManagerPod(nodeName string, label map[string]string) error { + pod, err := s.GetManagerPodForNode(nodeName) + if err != nil { + return err + } + for key := range label { + delete(pod.Labels, key) + } + _, err = s.UpdatePod(pod) + return err +} + func getInstanceManagerComponentSelector() (labels.Selector, error) { return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ MatchLabels: types.GetInstanceManagerComponentLabel(), diff --git a/datastore/longhorn.go b/datastore/longhorn.go index 4e2efdf3f6..408c528d73 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -962,6 +962,20 @@ func (s *DataStore) ListVolumes() (map[string]*longhorn.Volume, error) { return itemMap, nil } +func (s *DataStore) IsRegularRWXVolume(volumeName string) (bool, error) { + v, err := s.GetVolumeRO(volumeName) + if err != nil { + if !apierrors.IsNotFound(err) { + return false, err + } + } + + if v != nil && v.Spec.AccessMode == 
longhorn.AccessModeReadWriteMany && !v.Spec.Migratable { + return true, nil + } + return false, nil +} + func MarshalLabelToVolumeRecurringJob(labels map[string]string) map[string]*longhorn.VolumeRecurringJob { groupPrefix := fmt.Sprintf(types.LonghornLabelRecurringJobKeyPrefixFmt, types.LonghornLabelRecurringJobGroup) + "/" jobPrefix := fmt.Sprintf(types.LonghornLabelRecurringJobKeyPrefixFmt, types.LonghornLabelRecurringJob) + "/" @@ -2930,6 +2944,53 @@ func (s *DataStore) IsNodeDownOrDeleted(name string) (bool, error) { return false, nil } +// IsNodeDelinquent checks an early-warning condition of Lease expiration +// that is of interest to share-manager types. +func (s *DataStore) IsNodeDelinquent(nodeName string, volumeName string) (bool, error) { + if nodeName == "" { + return false, errors.New("no node name provided to IsNodeDelinquent") + } + + if volumeName == "" { + return false, errors.New("no volume name provided to IsNodeDelinquent") + } + isRWX, _ := s.IsRegularRWXVolume(volumeName) + if isRWX { + inFailover, delinquentNode, err := s.IsRWXVolumeInFailover(volumeName) + if err != nil { + return false, err + } + if inFailover && delinquentNode == nodeName { + return true, nil + } + } + return false, nil +} + +// IsNodeDownOrDeletedOrDelinquent gets Node for the given name and checks +// if the Node condition is gone or not ready or, if we are asking on behalf +// of an RWX-related resource, delinquent for that resource's volume. 
+func (s *DataStore) IsNodeDownOrDeletedOrDelinquent(nodeName string, volumeName string) (bool, error) { + if nodeName == "" { + return false, errors.New("no node name provided to check node down or deleted or delinquent") + } + node, err := s.GetNodeRO(nodeName) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + return false, err + } + cond := types.GetCondition(node.Status.Conditions, longhorn.NodeConditionTypeReady) + if cond.Status == longhorn.ConditionStatusFalse && + (cond.Reason == string(longhorn.NodeConditionReasonKubernetesNodeGone) || + cond.Reason == string(longhorn.NodeConditionReasonKubernetesNodeNotReady)) { + return true, nil + } + + return s.IsNodeDelinquent(nodeName, volumeName) +} + // IsNodeDeleted checks whether the node does not exist by passing in the node name func (s *DataStore) IsNodeDeleted(name string) (bool, error) { if name == "" { @@ -3835,6 +3896,10 @@ func (s *DataStore) ListShareManagers() (map[string]*longhorn.ShareManager, erro return itemMap, nil } +func (s *DataStore) ListShareManagersRO() ([]*longhorn.ShareManager, error) { + return s.shareManagerLister.ShareManagers(s.namespace).List(labels.Everything()) +} + // CreateBackupTarget creates a Longhorn BackupTargets CR and verifies creation func (s *DataStore) CreateBackupTarget(backupTarget *longhorn.BackupTarget) (*longhorn.BackupTarget, error) { ret, err := s.lhClient.LonghornV1beta2().BackupTargets(s.namespace).Create(context.TODO(), backupTarget, metav1.CreateOptions{}) diff --git a/k8s/pkg/apis/longhorn/v1beta2/node.go b/k8s/pkg/apis/longhorn/v1beta2/node.go index 7943543bbf..6a823b9491 100644 --- a/k8s/pkg/apis/longhorn/v1beta2/node.go +++ b/k8s/pkg/apis/longhorn/v1beta2/node.go @@ -26,6 +26,7 @@ const ( NodeConditionReasonKernelConfigIsNotFound = "KernelConfigIsNotFound" NodeConditionReasonNFSClientIsNotFound = "NFSClientIsNotFound" NodeConditionReasonKubernetesNodeCordoned = "KubernetesNodeCordoned" + NodeConditionReasonMissedLeaseRenewal = 
"MissedLeaseRenewal" ) const ( diff --git a/manager/volume.go b/manager/volume.go index 4bbaef2c7f..1ef08cad71 100644 --- a/manager/volume.go +++ b/manager/volume.go @@ -347,7 +347,7 @@ func (m *VolumeManager) Salvage(volumeName string, replicaNames []string) (v *lo if r.Spec.VolumeName != v.Name { return nil, fmt.Errorf("replica %v doesn't belong to volume %v", r.Name, v.Name) } - isDownOrDeleted, err := m.ds.IsNodeDownOrDeleted(r.Spec.NodeID) + isDownOrDeleted, err := m.ds.IsNodeDownOrDeletedOrDelinquent(r.Spec.NodeID, v.Name) if err != nil { return nil, fmt.Errorf("failed to check if the related node %v is still running for replica %v", r.Spec.NodeID, name) } diff --git a/types/setting.go b/types/setting.go index 7d3c3b1978..9a88ed04bb 100644 --- a/types/setting.go +++ b/types/setting.go @@ -136,6 +136,7 @@ const ( SettingNameFreezeFilesystemForSnapshot = SettingName("freeze-filesystem-for-snapshot") SettingNameAutoCleanupSnapshotWhenDeleteBackup = SettingName("auto-cleanup-when-delete-backup") SettingNameDefaultMinNumberOfBackingImageCopies = SettingName("default-min-number-of-backing-image-copies") + SettingNameEnableShareManagerFastFailover = SettingName("enable-share-manager-fast-failover") ) var ( @@ -228,6 +229,7 @@ var ( SettingNameFreezeFilesystemForSnapshot, SettingNameAutoCleanupSnapshotWhenDeleteBackup, SettingNameDefaultMinNumberOfBackingImageCopies, + SettingNameEnableShareManagerFastFailover, } ) @@ -348,6 +350,7 @@ var ( SettingNameFreezeFilesystemForSnapshot: SettingDefinitionFreezeFilesystemForSnapshot, SettingNameAutoCleanupSnapshotWhenDeleteBackup: SettingDefinitionAutoCleanupSnapshotWhenDeleteBackup, SettingNameDefaultMinNumberOfBackingImageCopies: SettingDefinitionDefaultMinNumberOfBackingImageCopies, + SettingNameEnableShareManagerFastFailover: SettingDefinitionEnableShareManagerFastFailover, } SettingDefinitionBackupTarget = SettingDefinition{ @@ -1482,6 +1485,16 @@ var ( ValueIntRangeMinimum: 1, }, } + + 
SettingDefinitionEnableShareManagerFastFailover = SettingDefinition{ + DisplayName: "Enable Share Manager Fast Failover", + Description: "Turn on logic to detect and move stale RWX volumes quickly (Experimental)", + Category: SettingCategoryDangerZone, + Type: SettingTypeBool, + Required: true, + ReadOnly: false, + Default: "false", + } ) type NodeDownPodDeletionPolicy string diff --git a/types/types.go b/types/types.go index 63272530cc..d37c4b4d21 100644 --- a/types/types.go +++ b/types/types.go @@ -174,6 +174,8 @@ const ( LonghornLabelLastSystemRestoreBackup = "last-system-restored-backup" LonghornLabelDataEngine = "data-engine" LonghornLabelVersion = "version" + LonghornLabelAdmissionWebhook = "admission-webhook" + LonghornLabelConversionWebhook = "conversion-webhook" LonghornLabelValueEnabled = "enabled" LonghornLabelValueIgnored = "ignored" @@ -220,6 +222,7 @@ const ( const ( EnvNodeName = "NODE_NAME" + EnvPodName = "POD_NAME" EnvPodNamespace = "POD_NAMESPACE" EnvPodIP = "POD_IP" EnvServiceAccount = "SERVICE_ACCOUNT" @@ -421,6 +424,18 @@ func GetManagerLabels() map[string]string { } } +func GetAdmissionWebhookLabel() map[string]string { + return map[string]string{ + GetLonghornLabelKey(LonghornLabelAdmissionWebhook): AdmissionWebhookServiceName, + } +} + +func GetConversionWebhookLabel() map[string]string { + return map[string]string{ + GetLonghornLabelKey(LonghornLabelConversionWebhook): ConversionWebhookServiceName, + } +} + func GetEngineImageLabels(engineImageName string) map[string]string { labels := GetBaseLabelsForSystemManagedComponent() labels[GetLonghornLabelComponentKey()] = LonghornLabelEngineImage From 4bdb74a99c9d702a3369b6acab11dd573bcd6a78 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Wed, 17 Jul 2024 13:16:08 -0700 Subject: [PATCH 02/21] Removed unused cleanupShareManagerService function longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit 61d6b3e7bb41a54df68295fa72d5cc2815866ab5) --- controller/share_manager_controller.go | 15 
--------------- 1 file changed, 15 deletions(-) diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 94cd6c3a53..66cfe5099a 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -665,21 +665,6 @@ func (c *ShareManagerController) syncShareManagerVolume(sm *longhorn.ShareManage return nil } -func (c *ShareManagerController) cleanupShareManagerService(shareManager *longhorn.ShareManager) error { - log := getLoggerForShareManager(c.logger, shareManager) - - service, err := c.ds.GetService(shareManager.Namespace, shareManager.Name) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - - log.Infof("Cleaning up share manager service %v", service.Name) - return c.ds.DeleteService(shareManager.Namespace, service.Name) -} - // markShareManagerLeaseDelinquent zeros the acquire time field as a flag that the volume // should be fast-tracked for failover away from the current lease-holding node. 
func (c *ShareManagerController) markShareManagerDelinquent(sm *longhorn.ShareManager) error { From 70b3b763ec4e0feb8cd173c9eac39fd4367e808d Mon Sep 17 00:00:00 2001 From: Phan Le Date: Wed, 17 Jul 2024 17:15:53 -0700 Subject: [PATCH 03/21] Fix recovery backend failure longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit 9834f2e678ce18c873c0384f2bfe0e8db3476290) --- app/daemon.go | 6 ++++++ controller/node_controller.go | 5 +++++ controller/share_manager_controller.go | 21 ++++++++++++++++++++- types/types.go | 7 +++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/app/daemon.go b/app/daemon.go index 79822f4d13..3395ec14ad 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -206,6 +206,12 @@ func startManager(c *cli.Context) error { return err } + // TODO: polish this code + // longhorn.io/component: longhorn-recovery-backend + if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetRecoveryBackendLabel()); err != nil { + return err + } + if err := upgrade.Upgrade(kubeconfigPath, currentNodeID, managerImage, c.Bool(FlagUpgradeVersionCheck)); err != nil { return err } diff --git a/controller/node_controller.go b/controller/node_controller.go index 3cc4406f17..bd218ca34b 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -525,6 +525,11 @@ func (nc *NodeController) syncNode(key string) (err error) { log.Warnf("Node %v faied to restore its admission webhook", node.Name) } + // TODO: polish this code + if err := nc.ds.AddLabelToManagerPod(node.Name, types.GetRecoveryBackendLabel()); err != nil { + log.Warnf("Node %v failed to restore its recovery backend", node.Name) + } + // Create a monitor for collecting disk information if _, err := nc.createDiskMonitor(); err != nil { return err } diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 66cfe5099a..3d5ce8fa0a 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go
@@ -351,6 +351,19 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { log.Infof("Share manager got new owner %v", c.controllerID) } + //// TODO: move this to main sync loop + //// This code should be moved from here to the sync loop, but it requires care. + //// The first naive attempt caused repeated calls to cleanup and delete/create. + //if err := c.markShareManagerDelinquent(sm); err != nil { + // log.WithError(err).Warnf("Failed to update leease to set delinquent condition for %v", sm.Name) + //} + // + //// Also, turn off the admission webhook on the suspected node. Trying to talk to it + //// will delay any effort to modify resources. + //if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, types.GetAdmissionWebhookLabel()); err != nil { + // log.WithError(err).Warnf("Failed to turn off admission webhook on node %v", leaseHolder) + //} + if sm.DeletionTimestamp != nil { if err := c.cleanupShareManagerPod(sm); err != nil { return err @@ -1515,12 +1528,18 @@ func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bo log.WithError(err).Warnf("Failed to update leease to set delinquent condition for %v", sm.Name) } + // TODO: polish this code // Also, turn off the admission webhook on the suspected node. Trying to talk to it // will delay any effort to modify resources. 
if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, types.GetAdmissionWebhookLabel()); err != nil { - log.WithError(err).Warnf("Failed to turn off admission webhook on node %v", leaseHolder) + log.WithError(err).Warnf("Failed to turn off admission webhook/recovery backed on node %v", leaseHolder) } + //// TODO: polish this code + //if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, types.GetRecoveryBackendLabel()); err != nil { + // log.WithError(err).Warnf("Failed to turn off recovery backed on node %v", leaseHolder) + //} + return currentNodeSchedulable, nil } diff --git a/types/types.go b/types/types.go index d37c4b4d21..d3d651b60e 100644 --- a/types/types.go +++ b/types/types.go @@ -427,6 +427,13 @@ func GetManagerLabels() map[string]string { func GetAdmissionWebhookLabel() map[string]string { return map[string]string{ GetLonghornLabelKey(LonghornLabelAdmissionWebhook): AdmissionWebhookServiceName, + "longhorn.io/component": "longhorn-recovery-backend", + } +} + +func GetRecoveryBackendLabel() map[string]string { + return map[string]string{ + "longhorn.io/component": "longhorn-recovery-backend", } } From 594d57b4af09482ec732c7c87583217de4a14492 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Thu, 18 Jul 2024 17:37:33 -0700 Subject: [PATCH 04/21] Code refinement: refactor the isResponsibleFor longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit df5b510e57d00ef50947b9a856af26f1f6647445) --- controller/share_manager_controller.go | 118 +++++++++++++------------ datastore/kubernetes.go | 33 ++++--- datastore/longhorn.go | 4 +- types/types.go | 11 +++ upgrade/util/util.go | 11 --- 5 files changed, 94 insertions(+), 83 deletions(-) diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 3d5ce8fa0a..0ae2dd61b7 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -351,19 +351,6 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { 
log.Infof("Share manager got new owner %v", c.controllerID) } - //// TODO: move this to main sync loop - //// This code should be moved from here to the sync loop, but it requires care. - //// The first naive attempt caused repeated calls to cleanup and delete/create. - //if err := c.markShareManagerDelinquent(sm); err != nil { - // log.WithError(err).Warnf("Failed to update leease to set delinquent condition for %v", sm.Name) - //} - // - //// Also, turn off the admission webhook on the suspected node. Trying to talk to it - //// will delay any effort to modify resources. - //if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, types.GetAdmissionWebhookLabel()); err != nil { - // log.WithError(err).Warnf("Failed to turn off admission webhook on node %v", leaseHolder) - //} - if sm.DeletionTimestamp != nil { if err := c.cleanupShareManagerPod(sm); err != nil { return err @@ -377,6 +364,33 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { return c.ds.RemoveFinalizerForShareManager(sm) } + isStale, leaseHolder, err := c.isShareManagerPodStale(sm) + if err != nil { + return err + } + if isStale { + isDelinquent, _, err := c.ds.IsRWXVolumeInDelinquent(sm.Name) + if err != nil { + return err + } + if !isDelinquent { + // Turn off the traffic to the admission webhook and recovery backend on the suspected node. + // Trying to talk to it will delay any effort to modify resources. + // Only turn off the other nodes to avoid a deadlock in a single node cluster. 
+ if c.controllerID != leaseHolder { + labels := types.MergeStringMaps(types.GetAdmissionWebhookLabel(), types.GetRecoveryBackendLabel()) + if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, labels); err != nil { + return errors.Wrapf(err, "failed to turn off admission webhook/recovery backend on node %v", leaseHolder) + } + } + + log.Infof("Marking share manager %v delinquent", sm.Name) + if err := c.markShareManagerDelinquent(sm); err != nil { + return err + } + } + } + // update at the end, after the whole reconcile loop existingShareManager := sm.DeepCopy() defer func() { @@ -679,26 +693,22 @@ func (c *ShareManagerController) syncShareManagerVolume(sm *longhorn.ShareManage } // markShareManagerLeaseDelinquent zeros the acquire time field as a flag that the volume -// should be fast-tracked for failover away from the current lease-holding node. -func (c *ShareManagerController) markShareManagerDelinquent(sm *longhorn.ShareManager) error { - log := getLoggerForShareManager(c.logger, sm) +// should be fast-tracked for detaching away from the current lease-holding node.
+func (c *ShareManagerController) markShareManagerDelinquent(sm *longhorn.ShareManager) (err error) { + defer func() { + err = errors.Wrapf(err, "failed to markShareManagerDelinquent") + }() lease, err := c.ds.GetLease(sm.Name) - if err != nil && !apierrors.IsNotFound(err) { - log.WithError(err).Warnf("Failed to retrieve lease for share manager %v from datastore", sm.Name) + if err != nil { return err } - if lease != nil { - holder := *lease.Spec.HolderIdentity - log.Infof("Marking lease %v held by suspect node %v for share manager failover.", sm.Name, holder) - lease.Spec.AcquireTime = &metav1.MicroTime{Time: time.Time{}} - _, err := c.ds.UpdateLease(lease) - if err != nil { - log.WithError(err).Warnf("Failed to update lease for share manager %v", sm.Name) - } + lease.Spec.AcquireTime = &metav1.MicroTime{Time: time.Time{}} + if _, err := c.ds.UpdateLease(lease); err != nil { + return err } - return err + return nil } // clearShareManagerLease just zeros out the renew time field preparatory to pod @@ -1429,17 +1439,27 @@ func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, an // isShareManagerPodStale checks the associated lease CR to see whether the current pod (if any) // has fallen behind on renewing the lease. If there is any error finding out, we assume not stale. 
func (c *ShareManagerController) isShareManagerPodStale(sm *longhorn.ShareManager) (stale bool, holder string, err error) { + defer func() { + err = errors.Wrapf(err, "failed to check isShareManagerPodStale") + }() + log := getLoggerForShareManager(c.logger, sm) - if enabled, _ := c.ds.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover); !enabled { - // stale is false, holder is empty, err is nil - return + enabled, err := c.ds.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover) + if err != nil { + return false, "", err + } + if !enabled { + return false, "", nil } leaseName := sm.Name lease, err := c.ds.GetLeaseRO(leaseName) if err != nil { - return + if !apierrors.IsNotFound(err) { + return false, "", err + } + return false, "", nil } // Consider it stale if @@ -1449,15 +1469,14 @@ func (c *ShareManagerController) isShareManagerPodStale(sm *longhorn.ShareManage // - the time of renewal is longer ago than the lease duration. holder = *lease.Spec.HolderIdentity if holder == "" { - return + return false, "", nil } if (lease.Spec.RenewTime).IsZero() { - // log.Warnf("Lease for %v held by %v has already been cleared for deletion", leaseName, holder) - return + return false, holder, nil } if *lease.Spec.LeaseDurationSeconds < shareManagerLeaseDurationSeconds { - // log.Warnf("Lease for %v has a crazy value for duration: %v seconds. Ignoring.", leaseName, *lease.Spec.LeaseDurationSeconds) - return + log.Warnf("Lease for %v has a crazy value for duration: %v seconds. 
Ignoring.", leaseName, *lease.Spec.LeaseDurationSeconds) + return false, holder, nil } expireTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second) if time.Now().After(expireTime) { @@ -1465,7 +1484,7 @@ func (c *ShareManagerController) isShareManagerPodStale(sm *longhorn.ShareManage stale = true } - return + return stale, holder, nil } // isResponsibleFor in most controllers only checks if the node of the current owner is known @@ -1511,7 +1530,10 @@ func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bo // Some node has to take over, and it might as well be this one, if another one // has not already. isStale, leaseHolder, err := c.isShareManagerPodStale(sm) - if err == nil && isStale { + if err != nil { + return false, err + } + if isStale { // Avoid race between nodes by checking for an existing interim owner. if leaseHolder != sm.Status.OwnerID && c.controllerID != sm.Status.OwnerID && @@ -1520,26 +1542,6 @@ func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bo return false, nil } log.Infof("Interim owner %v taking responsibility for stale lease-holder %v", c.controllerID, leaseHolder) - - // TODO: move this to main sync loop - // This code should be moved from here to the sync loop, but it requires care. - // The first naive attempt caused repeated calls to cleanup and delete/create. - if err := c.markShareManagerDelinquent(sm); err != nil { - log.WithError(err).Warnf("Failed to update leease to set delinquent condition for %v", sm.Name) - } - - // TODO: polish this code - // Also, turn off the admission webhook on the suspected node. Trying to talk to it - // will delay any effort to modify resources. 
- if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, types.GetAdmissionWebhookLabel()); err != nil { - log.WithError(err).Warnf("Failed to turn off admission webhook/recovery backed on node %v", leaseHolder) - } - - //// TODO: polish this code - //if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, types.GetRecoveryBackendLabel()); err != nil { - // log.WithError(err).Warnf("Failed to turn off recovery backed on node %v", leaseHolder) - //} - return currentNodeSchedulable, nil } diff --git a/datastore/kubernetes.go b/datastore/kubernetes.go index 122f6cc185..952f47a84d 100644 --- a/datastore/kubernetes.go +++ b/datastore/kubernetes.go @@ -9,9 +9,9 @@ import ( "strings" "time" + "github.com/pkg/errors" "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/version" @@ -282,30 +282,39 @@ func (s *DataStore) UpdateLease(lease *coordinationv1.Lease) (*coordinationv1.Le return s.kubeClient.CoordinationV1().Leases(s.namespace).Update(context.TODO(), lease, metav1.UpdateOptions{}) } -// IsRWXVolumeInFailover checks whether the volume has a lease by the same name, which an RWX volume should, +// IsRWXVolumeInDelinquent checks whether the volume has a lease by the same name, which an RWX volume should, // and whether that lease's spec shows that its holder is delinquent (its acquire time has been zeroed.) // If so, return the delinquent holder. // Any hiccup yields a return of "false". 
-func (s *DataStore) IsRWXVolumeInFailover(name string) (failover bool, holder string, err error) { - if enabled, _ := s.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover); !enabled { - // failover is false, holder is empty, err is nil - return +func (s *DataStore) IsRWXVolumeInDelinquent(name string) (isDelinquent bool, holder string, err error) { + defer func() { + err = errors.Wrapf(err, "failed to check IsRWXVolumeInDelinquent") + }() + + enabled, err := s.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover) + if err != nil { + return false, "", err + } + if !enabled { + return false, "", nil } lease, err := s.GetLeaseRO(name) if err != nil { - logrus.WithError(err).Warnf("Failed to get lease for RWX volume %v", name) - return + if apierrors.IsNotFound(err) { + return false, "", nil + } + return false, "", err } holder = *lease.Spec.HolderIdentity if holder == "" { - return + return false, "", nil } if (lease.Spec.AcquireTime).IsZero() { - failover = true + isDelinquent = true } - return + return isDelinquent, holder, nil } // GetStorageClassRO gets StorageClass with the given name @@ -500,7 +509,7 @@ func (s *DataStore) GetManagerPodForNode(nodeName string) (*corev1.Pod, error) { return pod, nil } } - return nil, errors.NewNotFound(corev1.Resource("pod"), nodeName) + return nil, apierrors.NewNotFound(corev1.Resource("pod"), nodeName) } func (s *DataStore) AddLabelToManagerPod(nodeName string, label map[string]string) error { diff --git a/datastore/longhorn.go b/datastore/longhorn.go index 408c528d73..b0155bf445 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -2956,11 +2956,11 @@ func (s *DataStore) IsNodeDelinquent(nodeName string, volumeName string) (bool, } isRWX, _ := s.IsRegularRWXVolume(volumeName) if isRWX { - inFailover, delinquentNode, err := s.IsRWXVolumeInFailover(volumeName) + isDelinquent, delinquentNode, err := s.IsRWXVolumeInDelinquent(volumeName) if err != nil { return false, err } - if inFailover && 
delinquentNode == nodeName { + if isDelinquent && delinquentNode == nodeName { return true, nil } } diff --git a/types/types.go b/types/types.go index d3d651b60e..f8bf2cb5b3 100644 --- a/types/types.go +++ b/types/types.go @@ -1255,3 +1255,14 @@ func IsStorageNetworkForRWXVolume(storageNetwork *longhorn.Setting, isStorageNet } return storageNetwork.Value != CniNetworkNone && isStorageNetworkForRWXVolumeEnabled } + +func MergeStringMaps(baseMap, overwriteMap map[string]string) map[string]string { + result := map[string]string{} + for k, v := range baseMap { + result[k] = v + } + for k, v := range overwriteMap { + result[k] = v + } + return result +} diff --git a/upgrade/util/util.go b/upgrade/util/util.go index 9311db352d..cad3038e55 100644 --- a/upgrade/util/util.go +++ b/upgrade/util/util.go @@ -128,17 +128,6 @@ func ListManagerPods(namespace string, kubeClient *clientset.Clientset) ([]corev return managerPodsList.Items, nil } -func MergeStringMaps(baseMap, overwriteMap map[string]string) map[string]string { - result := map[string]string{} - for k, v := range baseMap { - result[k] = v - } - for k, v := range overwriteMap { - result[k] = v - } - return result -} - func GetCurrentLonghornVersion(namespace string, lhClient lhclientset.Interface) (string, error) { currentLHVersionSetting, err := lhClient.LonghornV1beta2().Settings(namespace).Get(context.TODO(), string(types.SettingNameCurrentLonghornVersion), metav1.GetOptions{}) if err != nil { From 47ed54076db383e0b1e27619264aff54d9e30b0c Mon Sep 17 00:00:00 2001 From: Phan Le Date: Fri, 19 Jul 2024 12:55:02 -0700 Subject: [PATCH 05/21] Code refinement: remove the logic to remember and add node affinity longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit c80dca332f0481513d64a5d3661ba2e1e9aad93c) --- controller/share_manager_controller.go | 68 ++++++++++++++------------ 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/controller/share_manager_controller.go 
b/controller/share_manager_controller.go index 0ae2dd61b7..9ab33635ef 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -51,8 +51,6 @@ type ShareManagerController struct { ds *datastore.DataStore cacheSyncs []cache.InformerSynced - - staleNodeMap map[string]string } func NewShareManagerController( @@ -82,8 +80,6 @@ func NewShareManagerController( eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-share-manager-controller"}), ds: ds, - - staleNodeMap: map[string]string{}, } var err error @@ -713,44 +709,50 @@ func (c *ShareManagerController) markShareManagerDelinquent(sm *longhorn.ShareMa // clearShareManagerLease just zeros out the renew time field preparatory to pod // cleanup, so it won't be flagged as stale in normal-path code. -func (c *ShareManagerController) clearShareManagerLease(sm *longhorn.ShareManager) error { +func (c *ShareManagerController) clearShareManagerLease(sm *longhorn.ShareManager) (err error) { + defer func() { + err = errors.Wrapf(err, "failed to clearShareManagerLease") + }() + log := getLoggerForShareManager(c.logger, sm) lease, err := c.ds.GetLease(sm.Name) - if err != nil && !apierrors.IsNotFound(err) { - log.WithError(err).Warnf("Failed to retrieve lease for share manager %v from datastore", sm.Name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } return err } - if lease != nil { - holder := *lease.Spec.HolderIdentity + holder := *lease.Spec.HolderIdentity + if !lease.Spec.RenewTime.IsZero() { log.Infof("Clearing lease %v held by node %v for share manager pod cleanup.", sm.Name, holder) lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Time{}} - _, err := c.ds.UpdateLease(lease) - if err != nil { - log.WithError(err).Warnf("Failed to clear lease for share manager %v", sm.Name) + if _, err := c.ds.UpdateLease(lease); err != nil { + return err } } - return err + + return nil } -func (c *ShareManagerController) 
cleanupShareManagerPod(sm *longhorn.ShareManager) error { +func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManager) (err error) { + defer func() { + err = errors.Wrapf(err, "failed to cleanupShareManagerPod") + }() + log := getLoggerForShareManager(c.logger, sm) // Are we cleaning up after a lease timeout? - leaseExpired, leaseHolder, err := c.isShareManagerPodStale(sm) + isStale, leaseHolder, err := c.isShareManagerPodStale(sm) if err != nil { - log.WithError(err).Warnf("Failed to check isShareManagerPodStale(%v) when cleanupShareManagerPod", sm.Name) - } - if leaseExpired { - // Remember this node so we can avoid it in the new pod we will create. - c.staleNodeMap[sm.Name] = leaseHolder + return err } // Clear the lease so we won't try to act on apparent staleness. Staleness is now either dealt with or moot. err = c.clearShareManagerLease(sm) if err != nil { - log.WithError(err).Warnf("Failed to clear lease holder for share manager (%v) when cleanupShareManagerPod", sm.Name) + return err } podName := types.GetShareManagerPodNameFromShareManagerName(sm.Name) @@ -772,7 +774,7 @@ func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManage // Force delete if the pod's node is known dead, or likely so since it let // the lease time out and another node's controller is cleaning up after it. 
nodeFailed, _ := c.ds.IsNodeDownOrDeleted(pod.Spec.NodeName) - if nodeFailed || (leaseExpired && leaseHolder != c.controllerID) { + if nodeFailed || (isStale && leaseHolder != c.controllerID) { log.Info("Force deleting pod to allow fail over since node of share manager pod is down") gracePeriod := int64(0) err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}) @@ -792,6 +794,10 @@ func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManage // starting, running -> running (share ready to use) // controls transitions to running, error func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) (err error) { + defer func() { + err = errors.Wrapf(err, "failed to syncShareManagerPod") + }() + defer func() { if sm.Status.State == longhorn.ShareManagerStateStopping || sm.Status.State == longhorn.ShareManagerStateStopped || @@ -837,11 +843,11 @@ func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) // A new pod will be recreated by the share manager controller. We might get an early warning of that by the pod going stale. 
isStale, _, err := c.isShareManagerPodStale(sm) if err != nil { - log.WithError(err).Warnf("Failed to check isShareManagerPodStale(%v) when syncShareManagerPod", sm.Name) - } else if isStale { - log.Infof("ShareManager Pod %v is stale", pod.Name) + return err } if isStale { + log.Warnf("ShareManager Pod %v is stale", pod.Name) + // if we just transitioned to the starting state, while the prior cleanup is still in progress we will switch to error state // which will lead to a bad loop of starting (new workload) -> error (remount) -> stopped (cleanup sm) if sm.Status.State == longhorn.ShareManagerStateStopping { @@ -1129,11 +1135,13 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager } } - staleNode := c.staleNodeMap[sm.Name] - if staleNode != "" { - log.Infof("Creating anti-affinity for share manager pod against stale node %v", staleNode) - affinity = c.addStaleNodeAntiAffinity(affinity, staleNode) - delete(c.staleNodeMap, sm.Name) + isDelinquent, delinquentNode, err := c.ds.IsRWXVolumeInDelinquent(sm.Name) + if err != nil { + return nil, err + } + if isDelinquent && delinquentNode != "" { + log.Infof("Creating anti-affinity for share manager pod against delinquent node %v", delinquentNode) + affinity = c.addStaleNodeAntiAffinity(affinity, delinquentNode) } fsType := pv.Spec.CSI.FSType From 7590d3feab16e456dd7ed419be7b952cfd0c49e9 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Fri, 19 Jul 2024 17:01:15 -0700 Subject: [PATCH 06/21] Code refinement: refactor ownership transfer for volume/engine/replica longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit 87004594ea4c325b4a7af739d7ef9a6b8698c0da) --- controller/engine_controller.go | 20 ++++++++++------- controller/replica_controller.go | 29 ++++++++++++++++-------- controller/volume_controller.go | 31 +++++++++++++------------- datastore/longhorn.go | 38 ++++++++++++++------------------ 4 files changed, 64 insertions(+), 54 deletions(-) diff --git 
a/controller/engine_controller.go b/controller/engine_controller.go index 1b05e223a9..469cbd275f 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -2284,14 +2284,18 @@ func (ec *EngineController) isResponsibleFor(e *longhorn.Engine, defaultEngineIm err = errors.Wrap(err, "error while checking isResponsibleFor") }() - // If there is a share manager pod for this and it has an owner, engine should use that too. - if isRWX, _ := ec.ds.IsRegularRWXVolume(e.Spec.VolumeName); isRWX { - if isDelinquent, _ := ec.ds.IsNodeDownOrDeletedOrDelinquent(e.Status.OwnerID, e.Spec.VolumeName); isDelinquent { - pod, err := ec.ds.GetPodRO(e.Namespace, types.GetShareManagerPodNameFromShareManagerName(e.Spec.VolumeName)) - if err == nil && pod != nil { - return ec.controllerID == pod.Spec.NodeName, nil - - } + // If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created share-manager pod + isDelinquent, err := ec.ds.IsNodeDelinquent(e.Status.OwnerID, e.Spec.VolumeName) + if err != nil { + return false, err + } + if isDelinquent { + pod, err := ec.ds.GetPodRO(ec.namespace, types.GetShareManagerPodNameFromShareManagerName(e.Spec.VolumeName)) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + if pod != nil && ec.controllerID == pod.Spec.NodeName { + return true, nil } } diff --git a/controller/replica_controller.go b/controller/replica_controller.go index b9b631cbcb..d436d9e3cd 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -231,7 +231,11 @@ func (rc *ReplicaController) syncReplica(key string) (err error) { log := getLoggerForReplica(rc.logger, replica) - if !rc.isResponsibleFor(replica) { + isResponsible, err := rc.isResponsibleFor(replica) + if err != nil { + return err + } + if !isResponsible { return nil } if replica.Status.OwnerID != rc.controllerID { @@ -914,16 +918,23 @@ func (rc *ReplicaController) enqueueAllRebuildingReplicaOnCurrentNode() { } } 
-func (rc *ReplicaController) isResponsibleFor(r *longhorn.Replica) bool { - if isRWX, _ := rc.ds.IsRegularRWXVolume(r.Spec.VolumeName); isRWX { - if isDelinquent, _ := rc.ds.IsNodeDownOrDeletedOrDelinquent(r.Status.OwnerID, r.Spec.VolumeName); isDelinquent { - pod, err := rc.ds.GetPodRO(r.Namespace, types.GetShareManagerPodNameFromShareManagerName(r.Spec.VolumeName)) - if err == nil && pod != nil { - return rc.controllerID == pod.Spec.NodeName - } +func (rc *ReplicaController) isResponsibleFor(r *longhorn.Replica) (bool, error) { + // If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created share-manager pod + isDelinquent, err := rc.ds.IsNodeDelinquent(r.Status.OwnerID, r.Spec.VolumeName) + if err != nil { + return false, err + } + if isDelinquent { + pod, err := rc.ds.GetPodRO(rc.namespace, types.GetShareManagerPodNameFromShareManagerName(r.Spec.VolumeName)) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + if pod != nil && rc.controllerID == pod.Spec.NodeName { + return true, nil } } - return isControllerResponsibleFor(rc.controllerID, rc.ds, r.Name, r.Spec.NodeID, r.Status.OwnerID) + + return isControllerResponsibleFor(rc.controllerID, rc.ds, r.Name, r.Spec.NodeID, r.Status.OwnerID), nil } func hasMatchingReplica(replica *longhorn.Replica, replicas map[string]*longhorn.Replica) bool { diff --git a/controller/volume_controller.go b/controller/volume_controller.go index c1d5b0cc96..21823ae7ed 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -2041,11 +2041,11 @@ func (c *VolumeController) reconcileVolumeSize(v *longhorn.Volume, e *longhorn.E } func (c *VolumeController) canInstanceManagerLaunchReplica(r *longhorn.Replica) (bool, error) { - nodeDown, err := c.ds.IsNodeDownOrDeletedOrDelinquent(r.Spec.NodeID, r.Spec.VolumeName) + isNodeDownOrDeletedOrDelinquent, err := c.ds.IsNodeDownOrDeletedOrDelinquent(r.Spec.NodeID, r.Spec.VolumeName) if err != nil { - return 
false, errors.Wrapf(err, "fail to check IsNodeDownOrDeleted %v", r.Spec.NodeID) + return false, errors.Wrapf(err, "fail to check IsNodeDownOrDeletedOrDelinquent %v", r.Spec.NodeID) } - if nodeDown { + if isNodeDownOrDeletedOrDelinquent { return false, nil } // Replica already had IM @@ -4346,12 +4346,6 @@ func (c *VolumeController) IsReplicaUnavailable(r *longhorn.Replica) (bool, erro return true, nil } - if isRWX, _ := c.ds.IsRegularRWXVolume(r.Spec.VolumeName); isRWX { - if isDelinquent, _ := c.ds.IsNodeDelinquent(r.Spec.NodeID, r.Spec.VolumeName); isDelinquent { - return true, nil - } - } - node, err := c.ds.GetNodeRO(r.Spec.NodeID) if err != nil { return true, errors.Wrapf(err, "failed to get node %v for failed replica %v", r.Spec.NodeID, r.Name) @@ -4377,13 +4371,18 @@ func (c *VolumeController) isResponsibleFor(v *longhorn.Volume, defaultEngineIma err = errors.Wrap(err, "error while checking isResponsibleFor") }() - // If there is a share manager pod and it has an owner, we should use that too. 
- if isRWX := isRegularRWXVolume(v); isRWX { - if isDelinquent, _ := c.ds.IsNodeDownOrDeletedOrDelinquent(v.Status.OwnerID, v.Name); isDelinquent { - pod, err := c.ds.GetPodRO(v.Namespace, types.GetShareManagerPodNameFromShareManagerName(v.Name)) - if err == nil && pod != nil { - return c.controllerID == pod.Spec.NodeName, nil - } + // If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created share-manager pod + isDelinquent, err := c.ds.IsNodeDelinquent(v.Status.OwnerID, v.Name) + if err != nil { + return false, err + } + if isDelinquent { + pod, err := c.ds.GetPodRO(v.Namespace, types.GetShareManagerPodNameFromShareManagerName(v.Name)) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + if pod != nil && c.controllerID == pod.Spec.NodeName { + return true, nil } } diff --git a/datastore/longhorn.go b/datastore/longhorn.go index b0155bf445..32d9311095 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -965,15 +965,12 @@ func (s *DataStore) ListVolumes() (map[string]*longhorn.Volume, error) { func (s *DataStore) IsRegularRWXVolume(volumeName string) (bool, error) { v, err := s.GetVolumeRO(volumeName) if err != nil { - if !apierrors.IsNotFound(err) { - return false, err + if apierrors.IsNotFound(err) { + return false, nil } + return false, err } - - if v != nil && v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable { - return true, nil - } - return false, nil + return v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable, nil } func MarshalLabelToVolumeRecurringJob(labels map[string]string) map[string]*longhorn.VolumeRecurringJob { @@ -2947,24 +2944,23 @@ func (s *DataStore) IsNodeDownOrDeleted(name string) (bool, error) { // IsNodeDelinquent checks an early-warning condition of Lease expiration // that is of interest to share-manager types. 
func (s *DataStore) IsNodeDelinquent(nodeName string, volumeName string) (bool, error) { - if nodeName == "" { - return false, errors.New("no node name provided to IsNodeDelinquent") + if nodeName == "" || volumeName == "" { + return false, nil } - if volumeName == "" { - return false, errors.New("no volume name provided to IsNodeDelinquent") + isRWX, err := s.IsRegularRWXVolume(volumeName) + if err != nil { + return false, err } - isRWX, _ := s.IsRegularRWXVolume(volumeName) - if isRWX { - isDelinquent, delinquentNode, err := s.IsRWXVolumeInDelinquent(volumeName) - if err != nil { - return false, err - } - if isDelinquent && delinquentNode == nodeName { - return true, nil - } + if !isRWX { + return false, nil } - return false, nil + + isDelinquent, delinquentNode, err := s.IsRWXVolumeInDelinquent(volumeName) + if err != nil { + return false, err + } + return isDelinquent && delinquentNode == nodeName, nil } // IsNodeDownOrDeletedOrDelinquent gets Node for the given name and checks From 91eb0e64af684858a154d427f7575226f7848ed8 Mon Sep 17 00:00:00 2001 From: James Munson Date: Wed, 17 Jul 2024 13:49:49 -0600 Subject: [PATCH 07/21] Fix spelling typo. Add a debug output. Signed-off-by: James Munson (cherry picked from commit bf151e7a98936d2aeda14d787f0b9c9f757b2d8e) --- controller/node_controller.go | 2 +- controller/share_manager_controller.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/controller/node_controller.go b/controller/node_controller.go index bd218ca34b..99b661f4c5 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -522,7 +522,7 @@ func (nc *NodeController) syncNode(key string) (err error) { // Getting here is enough proof of life to Turn on the webhook endpoint, // if it has been turned off for RWX failover. 
if err := nc.ds.AddLabelToManagerPod(node.Name, types.GetAdmissionWebhookLabel()); err != nil { - log.Warnf("Node %v faied to restore its admission webhook", node.Name) + log.Warnf("Node %v failed to restore its admission webhook", node.Name) // TODO: polish this code diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 9ab33635ef..9821c14aa8 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -904,6 +904,7 @@ func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) } if !allContainersReady { + log.Infof("Share manager pod %v not all containers ready, requeuing with share manager in state: %v", sm.Name, sm.Status.State) c.enqueueShareManager(sm) } else if sm.Status.State == longhorn.ShareManagerStateStarting { sm.Status.State = longhorn.ShareManagerStateRunning @@ -911,6 +912,7 @@ func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) sm.Status.State = longhorn.ShareManagerStateError } default: + log.Infof("Share manager pod %v in unexpected phase: %v, setting share manager to error state.", sm.Name, pod.Status.Phase) sm.Status.State = longhorn.ShareManagerStateError } From 0ec3bc712ffe59b7903e14d771ba6d6d374feb62 Mon Sep 17 00:00:00 2001 From: James Munson Date: Fri, 19 Jul 2024 18:51:18 -0600 Subject: [PATCH 08/21] Add environment variables to set config params in share-manager pod.
Signed-off-by: James Munson (cherry picked from commit a79c2e897e33289138ff6ade2f253746e7fc4ff8) --- controller/setting_controller.go | 2 + controller/share_manager_controller.go | 60 +++++++++++++++++++++----- types/setting.go | 2 +- 3 files changed, 53 insertions(+), 11 deletions(-) diff --git a/controller/setting_controller.go b/controller/setting_controller.go index 3b31150469..aa8da9d552 100644 --- a/controller/setting_controller.go +++ b/controller/setting_controller.go @@ -943,6 +943,7 @@ func (sc *SettingController) updateKubernetesClusterAutoscalerEnabled() error { return nil } +/* func (sc *SettingController) cleanupShareManagerServiceAndEndpoints() error { var err error defer func() { @@ -974,6 +975,7 @@ func (sc *SettingController) cleanupShareManagerServiceAndEndpoints() error { return nil } +*/ // updateCNI deletes all system-managed data plane components immediately with the updated CNI annotation. func (sc *SettingController) updateCNI(funcPreupdate func() error) error { diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 9821c14aa8..c2ff60f050 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -38,6 +38,12 @@ import ( const shareManagerLeaseDurationSeconds = 7 // This should be slightly more than twice the share-manager lease renewal interval. 
+type nfsServerConfig struct { + enableFastFailover bool + leaseLifetime int + gracePeriod int +} + type ShareManagerController struct { *baseController @@ -1089,15 +1095,33 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager return nil, errors.Wrapf(err, "failed to create service and endpoint for share manager %v", sm.Name) } - // likewise for the lease - if _, err := c.ds.GetLeaseRO(sm.Name); err != nil { - if !apierrors.IsNotFound(err) { - return nil, errors.Wrapf(err, "failed to get lease for share manager %v", sm.Name) + nfsConfig := &nfsServerConfig{ + enableFastFailover: false, + leaseLifetime: 60, + gracePeriod: 90, + } + + enabled, err := c.ds.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover) + if err != nil { + return nil, err + } + if enabled { + nfsConfig = &nfsServerConfig{ + enableFastFailover: true, + leaseLifetime: 20, + gracePeriod: 30, } - if _, err = c.ds.CreateLease(c.createLeaseManifest(sm)); err != nil { - return nil, errors.Wrapf(err, "failed to create lease for share manager %v", sm.Name) + if _, err := c.ds.GetLeaseRO(sm.Name); err != nil { + if !apierrors.IsNotFound(err) { + return nil, errors.Wrapf(err, "failed to get lease for share manager %v", sm.Name) + } + + if _, err = c.ds.CreateLease(c.createLeaseManifest(sm)); err != nil { + return nil, errors.Wrapf(err, "failed to create lease for share manager %v", sm.Name) + } } + } volume, err := c.ds.GetVolume(sm.Name) @@ -1171,7 +1195,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager } manifest := c.createPodManifest(sm, annotations, tolerations, affinity, imagePullPolicy, nil, registrySecret, - priorityClass, nodeSelector, fsType, mountOptions, cryptoKey, cryptoParams) + priorityClass, nodeSelector, fsType, mountOptions, cryptoKey, cryptoParams, nfsConfig) storageNetwork, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork) if err != nil { @@ -1290,7 +1314,8 @@ func (c 
*ShareManagerController) createLeaseManifest(sm *longhorn.ShareManager) func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, annotations map[string]string, tolerations []corev1.Toleration, affinity *corev1.Affinity, pullPolicy corev1.PullPolicy, resourceReq *corev1.ResourceRequirements, registrySecret, priorityClass string, - nodeSelector map[string]string, fsType string, mountOptions []string, cryptoKey string, cryptoParams *crypto.EncryptParams) *corev1.Pod { + nodeSelector map[string]string, fsType string, mountOptions []string, cryptoKey string, cryptoParams *crypto.EncryptParams, + nfsConfig *nfsServerConfig) *corev1.Pod { // command args for the share-manager args := []string{"--debug", "daemon", "--volume", sm.Name} @@ -1345,9 +1370,24 @@ func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, an }, } + podSpec.Spec.Containers[0].Env = []corev1.EnvVar{ + { + Name: "FAST_FAILOVER", + Value: fmt.Sprint(nfsConfig.enableFastFailover), + }, + { + Name: "LEASE_LIFETIME", + Value: fmt.Sprint(nfsConfig.leaseLifetime), + }, + { + Name: "GRACE_PERIOD", + Value: fmt.Sprint(nfsConfig.gracePeriod), + }, + } + // this is an encrypted volume the cryptoKey is base64 encoded if len(cryptoKey) > 0 { - podSpec.Spec.Containers[0].Env = []corev1.EnvVar{ + podSpec.Spec.Containers[0].Env = append(podSpec.Spec.Containers[0].Env, []corev1.EnvVar{ { Name: "ENCRYPTED", Value: "True", @@ -1372,7 +1412,7 @@ func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, an Name: "CRYPTOPBKDF", Value: string(cryptoParams.GetPBKDF()), }, - } + }...) 
} podSpec.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{ diff --git a/types/setting.go b/types/setting.go index 9a88ed04bb..0db5fc6105 100644 --- a/types/setting.go +++ b/types/setting.go @@ -1489,7 +1489,7 @@ var ( SettingDefinitionEnableShareManagerFastFailover = SettingDefinition{ DisplayName: "Enable Share Manager Fast Failover", Description: "Turn on logic to detect and move stale RWX volumes quickly (Experimental)", - Category: SettingCategoryDangerZone, + Category: SettingCategoryGeneral, Type: SettingTypeBool, Required: true, ReadOnly: false, From e1c8c944fb09c6fd73d4f0d59837cb5a6299cfa6 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Fri, 19 Jul 2024 18:21:56 -0700 Subject: [PATCH 09/21] Code refinement: refactor engine/replica controller to try to delete the engine/replica once before skipping the deletion longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit f9e3eada9744c6ac70480f5bb53aca158f21f943) --- controller/engine_controller.go | 27 ++++++++++++--------------- controller/replica_controller.go | 17 +++++++++-------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/controller/engine_controller.go b/controller/engine_controller.go index 469cbd275f..d9e3ff27a4 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -622,18 +622,9 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { } } - // If the node is unreachable, don't bother with the successive timeouts we would spend attempting to contact - // its client proxy to delete the engine. 
- if isRWXVolume { - isDelinquent, _ := ec.ds.IsNodeDelinquent(im.Spec.NodeID, e.Spec.VolumeName) - if isDelinquent { - log.Infof("Skipping deleting RWX engine %v since IM node %v is delinquent", e.Name, im.Spec.NodeID) - if e.Spec.NodeID != "" { - log.Infof("Clearing delinquent nodeID for RWX engine %v", e.Name) - e.Spec.NodeID = "" - } - return nil - } + isDelinquent, err := ec.ds.IsNodeDelinquent(im.Spec.NodeID, e.Spec.VolumeName) + if err != nil { + return err } log.Info("Deleting engine instance") @@ -642,10 +633,10 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { if err != nil { log.WithError(err).Warnf("Failed to delete engine %v", e.Name) } - if isRWXVolume && im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + if isRWXVolume && (im.Status.CurrentState != longhorn.InstanceManagerStateRunning || isDelinquent) { // Try the best to delete engine instance. // To prevent that the volume is stuck at detaching state, ignore the error when volume is - // a RWX volume and the instance manager is not running. + // a RWX volume and the instance manager is not running or the RWX volume is currently delinquent // // If the engine instance of a RWX volume is not deleted successfully: // If a RWX volume is on node A and the network of this node is partitioned, @@ -655,7 +646,13 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { // After shifting to node A, the first reattachment fail due to the IO error resulting from the // orphaned engine instance and block device. Then, the detachment will trigger the teardown of the // problematic engine instance and block device. The next reattachment then will succeed. 
- log.Warnf("Ignored the failure of deleting engine %v", e.Name) + if im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + log.Warnf("Ignored the failure of deleting engine %v because im.Status.CurrentState is %v", e.Name, im.Status.CurrentState) + } + if isDelinquent { + log.Warnf("Ignored the failure of deleting engine %v because the RWX volume is currently delinquent", e.Name) + } + err = nil } }() diff --git a/controller/replica_controller.go b/controller/replica_controller.go index d436d9e3cd..4b6cb42952 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -557,19 +557,20 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error { return nil } - isRWXVolume, err := rc.ds.IsRegularRWXVolume(r.Spec.VolumeName) + isDelinquent, err := rc.ds.IsNodeDelinquent(im.Spec.NodeID, r.Spec.VolumeName) if err != nil { return err } - // If the node is unreachable, don't bother with the successive timeouts we would spend attempting to contact - // its client proxy to delete the engine. 
- if isRWXVolume { - isDelinquent, _ := rc.ds.IsNodeDelinquent(im.Spec.NodeID, r.Spec.VolumeName) + + defer func() { + if err != nil { + log.WithError(err).Warnf("Failed to delete replica process %v", r.Name) + } if isDelinquent { - log.Infof("Skipping deleting RWX replica %v since IM node %v is delinquent", r.Name, im.Spec.NodeID) - return nil + log.Warnf("Ignored the failure of deleting replica process %v because the RWX volume is currently delinquent", r.Name) + err = nil } - } + }() c, err := engineapi.NewInstanceManagerClient(im) if err != nil { From 7f9fdacb145cde034301d94ccdcf4ad123018264 Mon Sep 17 00:00:00 2001 From: James Munson Date: Fri, 19 Jul 2024 19:38:46 -0600 Subject: [PATCH 10/21] Update k8s/pkg/apis/longhorn/v1beta2/node.go Co-authored-by: Eric Weber Signed-off-by: James Munson (cherry picked from commit 820e59733e31bfece59255573079234885477f64) --- k8s/pkg/apis/longhorn/v1beta2/node.go | 1 - 1 file changed, 1 deletion(-) diff --git a/k8s/pkg/apis/longhorn/v1beta2/node.go b/k8s/pkg/apis/longhorn/v1beta2/node.go index 6a823b9491..7943543bbf 100644 --- a/k8s/pkg/apis/longhorn/v1beta2/node.go +++ b/k8s/pkg/apis/longhorn/v1beta2/node.go @@ -26,7 +26,6 @@ const ( NodeConditionReasonKernelConfigIsNotFound = "KernelConfigIsNotFound" NodeConditionReasonNFSClientIsNotFound = "NFSClientIsNotFound" NodeConditionReasonKubernetesNodeCordoned = "KubernetesNodeCordoned" - NodeConditionReasonMissedLeaseRenewal = "MissedLeaseRenewal" ) const ( From 139d9a0d40d668bc1652ce7cc748eee882cd312b Mon Sep 17 00:00:00 2001 From: Phan Le Date: Fri, 19 Jul 2024 18:21:56 -0700 Subject: [PATCH 11/21] Code refinement: refactor engine/replica controller to try to delete the engine/replica once before skipping the deletion longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit ef0ae5d05313b315e9c529c2976fb8ad32bfa7d8) --- controller/replica_controller.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/controller/replica_controller.go 
b/controller/replica_controller.go index 4b6cb42952..b727200cd9 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -514,7 +514,7 @@ func (rc *ReplicaController) CanStartRebuildingReplica(r *longhorn.Replica) (boo return true, nil } -func (rc *ReplicaController) DeleteInstance(obj interface{}) error { +func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) { r, ok := obj.(*longhorn.Replica) if !ok { return fmt.Errorf("invalid object for replica instance deletion: %v", obj) @@ -528,7 +528,6 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error { } var im *longhorn.InstanceManager - var err error // Not assigned or not updated, try best to delete if r.Status.InstanceManagerName == "" { if r.Spec.NodeID == "" { From 5460f0cab447435cf29423a7403c5fb1473ff6f5 Mon Sep 17 00:00:00 2001 From: James Munson Date: Fri, 19 Jul 2024 20:26:27 -0600 Subject: [PATCH 12/21] Get rid of babbling debug log line. Signed-off-by: James Munson (cherry picked from commit 3627cf34c13e9db2d15a2f7b11d39c55247f9569) --- controller/share_manager_controller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index c2ff60f050..00f688d7d5 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -910,7 +910,6 @@ func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) } if !allContainersReady { - log.Infof("Share manager pod %v not all containers ready, requeuing with sharem manager in state : %v", sm.Name, sm.Status.State) c.enqueueShareManager(sm) } else if sm.Status.State == longhorn.ShareManagerStateStarting { sm.Status.State = longhorn.ShareManagerStateRunning From 6df97c79e5a39e31cd83f41acea54428a6c3008a Mon Sep 17 00:00:00 2001 From: James Munson Date: Sun, 21 Jul 2024 15:49:39 -0600 Subject: [PATCH 13/21] Fix up the recovery backend service selector. 
Signed-off-by: James Munson (cherry picked from commit 58fc2e7659ebab505ecd23ed9b45c20cd5b84904) --- controller/node_controller.go | 15 ++++++--------- types/types.go | 5 +++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/controller/node_controller.go b/controller/node_controller.go index 99b661f4c5..54d658da4c 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -519,15 +519,12 @@ func (nc *NodeController) syncNode(key string) (err error) { return nil } - // Getting here is enough proof of life to Turn on the webhook endpoint, - // if it has been turned off for RWX failover. - if err := nc.ds.AddLabelToManagerPod(node.Name, types.GetAdmissionWebhookLabel()); err != nil { - log.Warnf("Node %v failed to restore its admission webhook", node.Name) - } - - // TODO: polish this code - if err := nc.ds.AddLabelToManagerPod(node.Name, types.GetRecoveryBackendLabel()); err != nil { - log.Warnf("Node %v faied to restore its recovery backend", node.Name) + // Getting here is enough proof of life to turn on the services that might + // have been turned off for RWX failover. 
+ labels := types.MergeStringMaps(types.GetAdmissionWebhookLabel(), types.GetRecoveryBackendLabel()) + if err := nc.ds.AddLabelToManagerPod(node.Name, labels); err != nil { + log.Warnf("Node %v failed to restore its admission webhook and recovery backend", node.Name) + return err } // Create a monitor for collecting disk information diff --git a/types/types.go b/types/types.go index f8bf2cb5b3..f4059923ca 100644 --- a/types/types.go +++ b/types/types.go @@ -177,6 +177,8 @@ const ( LonghornLabelAdmissionWebhook = "admission-webhook" LonghornLabelConversionWebhook = "conversion-webhook" + LonghornRecoveryBackendServiceName = "longhorn-recovery-backend" + LonghornLabelValueEnabled = "enabled" LonghornLabelValueIgnored = "ignored" @@ -427,13 +429,12 @@ func GetManagerLabels() map[string]string { func GetAdmissionWebhookLabel() map[string]string { return map[string]string{ GetLonghornLabelKey(LonghornLabelAdmissionWebhook): AdmissionWebhookServiceName, - "longhorn.io/component": "longhorn-recovery-backend", } } func GetRecoveryBackendLabel() map[string]string { return map[string]string{ - "longhorn.io/component": "longhorn-recovery-backend", + GetLonghornLabelKey(LonghornLabelRecoveryBackend): LonghornRecoveryBackendServiceName, } } From 29432bc50501f25c08d54c035d81c7d2cfa23067 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Mon, 22 Jul 2024 11:46:24 -0700 Subject: [PATCH 14/21] Reenable the RWX service deletion logic from ChinYa longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit d0787ab9e35701ebbf3cb8bc05464e3f268c11fb) --- app/daemon.go | 2 -- controller/setting_controller.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/app/daemon.go b/app/daemon.go index 3395ec14ad..afd62d624f 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -206,8 +206,6 @@ func startManager(c *cli.Context) error { return err } - // TODO: polish this code - // longhorn.io/component: longhorn-recovery-backend if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, 
types.GetRecoveryBackendLabel()); err != nil { return err } diff --git a/controller/setting_controller.go b/controller/setting_controller.go index aa8da9d552..3b31150469 100644 --- a/controller/setting_controller.go +++ b/controller/setting_controller.go @@ -943,7 +943,6 @@ func (sc *SettingController) updateKubernetesClusterAutoscalerEnabled() error { return nil } -/* func (sc *SettingController) cleanupShareManagerServiceAndEndpoints() error { var err error defer func() { @@ -975,7 +974,6 @@ func (sc *SettingController) cleanupShareManagerServiceAndEndpoints() error { return nil } -*/ // updateCNI deletes all system-managed data plane components immediately with the updated CNI annotation. func (sc *SettingController) updateCNI(funcPreupdate func() error) error { From 3576af4e90f04516c4eff1b6d5eafbfb17e89363 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Mon, 22 Jul 2024 12:06:09 -0700 Subject: [PATCH 15/21] Code refinement: remove debugging comments longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit c5d879638038ba8a0aadf43721c242dcf396f965) --- controller/instance_handler.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/controller/instance_handler.go b/controller/instance_handler.go index 47e2693bef..efcd8aaf2f 100644 --- a/controller/instance_handler.go +++ b/controller/instance_handler.go @@ -60,13 +60,6 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan if im != nil { isDelinquent, _ = h.ds.IsNodeDelinquent(im.Spec.NodeID, spec.VolumeName) } - imName := "nil" - nodeName := "empty" - if im != nil { - imName = im.Name - nodeName = im.Spec.NodeID - } - logrus.Infof("==================> instanceName: %v -------- isDelinquent: %v --------- im: %v ------------ nodeName: %v", instanceName, isDelinquent, imName, nodeName) if im == nil || im.Status.CurrentState == longhorn.InstanceManagerStateUnknown || isDelinquent { if status.Started { From ccddd1434b720a7a502f135a2c1039d1376a3f3f Mon Sep 17 00:00:00 2001 From: 
James Munson Date: Mon, 22 Jul 2024 13:51:00 -0600 Subject: [PATCH 16/21] Update controller/engine_controller.go Co-authored-by: Eric Weber Signed-off-by: James Munson (cherry picked from commit 1cdb62772b224d2a90acad6c85dc3afef9e142b2) --- controller/engine_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controller/engine_controller.go b/controller/engine_controller.go index d9e3ff27a4..48b11bb4ba 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -636,7 +636,7 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { if isRWXVolume && (im.Status.CurrentState != longhorn.InstanceManagerStateRunning || isDelinquent) { // Try the best to delete engine instance. // To prevent that the volume is stuck at detaching state, ignore the error when volume is - // a RWX volume and the instance manager is not running or the RWX volume is currently delinquent + // a RWX volume and the instance manager is not running or the RWX volume is currently delinquent. 
// // If the engine instance of a RWX volume is not deleted successfully: // If a RWX volume is on node A and the network of this node is partitioned, From 0ed72bc8085a2e0edaa3d74bb63e3b8feb836f88 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Mon, 22 Jul 2024 12:52:41 -0700 Subject: [PATCH 17/21] Remove pod infomer in engine controller and fix some NIT for Eric's comments longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit db97d371472ebf7a4221586b788620bae5d2f3ce) --- controller/engine_controller.go | 12 ------------ controller/share_manager_controller.go | 4 ++-- datastore/kubernetes.go | 6 +++--- datastore/longhorn.go | 2 +- 4 files changed, 6 insertions(+), 18 deletions(-) diff --git a/controller/engine_controller.go b/controller/engine_controller.go index 48b11bb4ba..386e89b6cb 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -187,18 +187,6 @@ func NewEngineController( } ec.cacheSyncs = append(ec.cacheSyncs, ds.InstanceManagerInformer.HasSynced) - if _, err = ds.PodInformer.AddEventHandlerWithResyncPeriod(cache.FilteringResourceEventHandler{ - FilterFunc: isShareManagerPod, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: ec.enqueueShareManagerPodChange, - UpdateFunc: func(old, cur interface{}) { ec.enqueueShareManagerPodChange(cur) }, - DeleteFunc: ec.enqueueShareManagerPodChange, - }, - }, 0); err != nil { - return nil, err - } - ec.cacheSyncs = append(ec.cacheSyncs, ds.PodInformer.HasSynced) - return ec, nil } diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 00f688d7d5..08ff26d703 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -371,7 +371,7 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { return err } if isStale { - isDelinquent, _, err := c.ds.IsRWXVolumeInDelinquent(sm.Name) + isDelinquent, _, err := c.ds.IsRWXVolumeDelinquent(sm.Name) if err != nil { return err } @@ 
-1160,7 +1160,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager } } - isDelinquent, delinquentNode, err := c.ds.IsRWXVolumeInDelinquent(sm.Name) + isDelinquent, delinquentNode, err := c.ds.IsRWXVolumeDelinquent(sm.Name) if err != nil { return nil, err } diff --git a/datastore/kubernetes.go b/datastore/kubernetes.go index 952f47a84d..f9dbd331e5 100644 --- a/datastore/kubernetes.go +++ b/datastore/kubernetes.go @@ -282,13 +282,13 @@ func (s *DataStore) UpdateLease(lease *coordinationv1.Lease) (*coordinationv1.Le return s.kubeClient.CoordinationV1().Leases(s.namespace).Update(context.TODO(), lease, metav1.UpdateOptions{}) } -// IsRWXVolumeInDelinquent checks whether the volume has a lease by the same name, which an RWX volume should, +// IsRWXVolumeDelinquent checks whether the volume has a lease by the same name, which an RWX volume should, // and whether that lease's spec shows that its holder is delinquent (its acquire time has been zeroed.) // If so, return the delinquent holder. // Any hiccup yields a return of "false". 
-func (s *DataStore) IsRWXVolumeInDelinquent(name string) (isDelinquent bool, holder string, err error) { +func (s *DataStore) IsRWXVolumeDelinquent(name string) (isDelinquent bool, holder string, err error) { defer func() { - err = errors.Wrapf(err, "failed to check IsRWXVolumeInDelinquent") + err = errors.Wrapf(err, "failed to check IsRWXVolumeDelinquent") }() enabled, err := s.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover) diff --git a/datastore/longhorn.go b/datastore/longhorn.go index 32d9311095..41951c4ddf 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -2956,7 +2956,7 @@ func (s *DataStore) IsNodeDelinquent(nodeName string, volumeName string) (bool, return false, nil } - isDelinquent, delinquentNode, err := s.IsRWXVolumeInDelinquent(volumeName) + isDelinquent, delinquentNode, err := s.IsRWXVolumeDelinquent(volumeName) if err != nil { return false, err } From 8e38ad228ff61153a25fa5e83c0c7979a9b13c3a Mon Sep 17 00:00:00 2001 From: Phan Le Date: Mon, 22 Jul 2024 13:06:21 -0700 Subject: [PATCH 18/21] Remove enqueueShareManagerPodChange longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit 55677bd55dcb658b5a2842d726c4cde9169c697d) --- controller/engine_controller.go | 37 --------------------------------- 1 file changed, 37 deletions(-) diff --git a/controller/engine_controller.go b/controller/engine_controller.go index 386e89b6cb..9703a77386 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -458,43 +458,6 @@ func (ec *EngineController) enqueueInstanceManagerChange(obj interface{}) { } } -func (ec *EngineController) enqueueShareManagerPodChange(obj interface{}) { - pod, isPod := obj.(*corev1.Pod) - if !isPod { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("received unexpected obj: %#v", obj)) - return - } - - // use the last known state, to enqueue, dependent objects - pod, ok = deletedState.Obj.(*corev1.Pod) - if !ok { - 
utilruntime.HandleError(fmt.Errorf("cannot convert DeletedFinalStateUnknown to ShareManager Pod object: %#v", deletedState.Obj)) - return - } - } - - engineMap := map[string]*longhorn.Engine{} - - smName := pod.Labels[types.GetLonghornLabelKey(types.LonghornLabelShareManager)] - - es, err := ec.ds.ListEnginesRO() - if err != nil { - ec.logger.WithError(err).Warn("Failed to list engines") - } - for _, e := range es { - // Volume name is the same as share manager name. - if e.Spec.VolumeName == smName { - engineMap[e.Name] = e - } - } - - for _, e := range engineMap { - ec.enqueueEngine(e) - } -} - func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) { e, ok := obj.(*longhorn.Engine) if !ok { From 790ff49004d2578ad8e27b289100c422c0831891 Mon Sep 17 00:00:00 2001 From: Phan Le Date: Mon, 22 Jul 2024 13:41:04 -0700 Subject: [PATCH 19/21] Revert share manager pod informer back to share manager CR informer longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit a86805327f8e60325b5857ac1391805e9ee8a024) --- controller/volume_controller.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/controller/volume_controller.go b/controller/volume_controller.go index 21823ae7ed..b2956d02ff 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -152,17 +152,14 @@ func NewVolumeController( } c.cacheSyncs = append(c.cacheSyncs, ds.ReplicaInformer.HasSynced) - if _, err = ds.PodInformer.AddEventHandlerWithResyncPeriod(cache.FilteringResourceEventHandler{ - FilterFunc: isShareManagerPod, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueVolumesForShareManager, - UpdateFunc: func(old, cur interface{}) { c.enqueueVolumesForShareManager(cur) }, - DeleteFunc: c.enqueueVolumesForShareManager, - }, + if _, err = ds.ShareManagerInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueVolumesForShareManager, + 
UpdateFunc: func(old, cur interface{}) { c.enqueueVolumesForShareManager(cur) }, + DeleteFunc: c.enqueueVolumesForShareManager, }, 0); err != nil { return nil, err } - c.cacheSyncs = append(c.cacheSyncs, ds.PodInformer.HasSynced) + c.cacheSyncs = append(c.cacheSyncs, ds.ShareManagerInformer.HasSynced) if _, err = ds.BackupVolumeInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, cur interface{}) { c.enqueueVolumesForBackupVolume(cur) }, @@ -4446,8 +4443,8 @@ func (c *VolumeController) deleteEngine(e *longhorn.Engine, es map[string]*longh // enqueueVolumesForShareManager enqueues all volumes that are currently claimed by this share manager func (c *VolumeController) enqueueVolumesForShareManager(obj interface{}) { - pod, isPod := obj.(*corev1.Pod) - if !isPod { + sm, isShareManager := obj.(*longhorn.ShareManager) + if !isShareManager { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("received unexpected obj: %#v", obj)) @@ -4455,16 +4452,16 @@ func (c *VolumeController) enqueueVolumesForShareManager(obj interface{}) { } // use the last known state, to requeue the claimed volumes - pod, ok = deletedState.Obj.(*corev1.Pod) + sm, ok = deletedState.Obj.(*longhorn.ShareManager) if !ok { - utilruntime.HandleError(fmt.Errorf("DeletedFinalStateUnknown contained non ShareManager Pod object: %#v", deletedState.Obj)) + utilruntime.HandleError(fmt.Errorf("DeletedFinalStateUnknown contained non ShareManager object: %#v", deletedState.Obj)) return } } - // sharemanager name is the same as volume name. 
- smName := pod.Labels[types.GetLonghornLabelKey(types.LonghornLabelShareManager)] - key := pod.Namespace + "/" + smName + // we can queue the key directly since a share manager only manages a single volume from it's own namespace + // and there is no need for us to retrieve the whole object, since we already know the volume name + key := sm.Namespace + "/" + sm.Name c.queue.Add(key) } From a99a20f9beec5c33ffcfd4906d3b7ef6e1e0fece Mon Sep 17 00:00:00 2001 From: James Munson Date: Mon, 22 Jul 2024 15:12:10 -0600 Subject: [PATCH 20/21] Rename RWX volume fast failover setting. Signed-off-by: James Munson (cherry picked from commit fc5dfe1b2beaf8b0a0662c1179e5acf13154b6a2) --- controller/share_manager_controller.go | 4 ++-- datastore/kubernetes.go | 2 +- types/setting.go | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 08ff26d703..2a0c9ee2ed 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -1100,7 +1100,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager gracePeriod: 90, } - enabled, err := c.ds.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover) + enabled, err := c.ds.GetSettingAsBool(types.SettingNameRWXVolumeFastFailover) if err != nil { return nil, err } @@ -1494,7 +1494,7 @@ func (c *ShareManagerController) isShareManagerPodStale(sm *longhorn.ShareManage log := getLoggerForShareManager(c.logger, sm) - enabled, err := c.ds.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover) + enabled, err := c.ds.GetSettingAsBool(types.SettingNameRWXVolumeFastFailover) if err != nil { return false, "", err } diff --git a/datastore/kubernetes.go b/datastore/kubernetes.go index f9dbd331e5..f9120760cf 100644 --- a/datastore/kubernetes.go +++ b/datastore/kubernetes.go @@ -291,7 +291,7 @@ func (s *DataStore) IsRWXVolumeDelinquent(name string) (isDelinquent bool, 
holde err = errors.Wrapf(err, "failed to check IsRWXVolumeDelinquent") }() - enabled, err := s.GetSettingAsBool(types.SettingNameEnableShareManagerFastFailover) + enabled, err := s.GetSettingAsBool(types.SettingNameRWXVolumeFastFailover) if err != nil { return false, "", err } diff --git a/types/setting.go b/types/setting.go index 0db5fc6105..37b825aa0a 100644 --- a/types/setting.go +++ b/types/setting.go @@ -136,7 +136,7 @@ const ( SettingNameFreezeFilesystemForSnapshot = SettingName("freeze-filesystem-for-snapshot") SettingNameAutoCleanupSnapshotWhenDeleteBackup = SettingName("auto-cleanup-when-delete-backup") SettingNameDefaultMinNumberOfBackingImageCopies = SettingName("default-min-number-of-backing-image-copies") - SettingNameEnableShareManagerFastFailover = SettingName("enable-share-manager-fast-failover") + SettingNameRWXVolumeFastFailover = SettingName("rwx-volume-fast-failover") ) var ( @@ -229,7 +229,7 @@ var ( SettingNameFreezeFilesystemForSnapshot, SettingNameAutoCleanupSnapshotWhenDeleteBackup, SettingNameDefaultMinNumberOfBackingImageCopies, - SettingNameEnableShareManagerFastFailover, + SettingNameRWXVolumeFastFailover, } ) @@ -350,7 +350,7 @@ var ( SettingNameFreezeFilesystemForSnapshot: SettingDefinitionFreezeFilesystemForSnapshot, SettingNameAutoCleanupSnapshotWhenDeleteBackup: SettingDefinitionAutoCleanupSnapshotWhenDeleteBackup, SettingNameDefaultMinNumberOfBackingImageCopies: SettingDefinitionDefaultMinNumberOfBackingImageCopies, - SettingNameEnableShareManagerFastFailover: SettingDefinitionEnableShareManagerFastFailover, + SettingNameRWXVolumeFastFailover: SettingDefinitionRWXVolumeFastFailover, } SettingDefinitionBackupTarget = SettingDefinition{ @@ -1486,8 +1486,8 @@ var ( }, } - SettingDefinitionEnableShareManagerFastFailover = SettingDefinition{ - DisplayName: "Enable Share Manager Fast Failover", + SettingDefinitionRWXVolumeFastFailover = SettingDefinition{ + DisplayName: "RWX Volume Fast Failover", Description: "Turn on logic to detect 
and move stale RWX volumes quickly (Experimental)", Category: SettingCategoryGeneral, Type: SettingTypeBool, From cdd2a969bde8eef24e4b7cecd85d9b3a6c493e4b Mon Sep 17 00:00:00 2001 From: Phan Le Date: Mon, 22 Jul 2024 19:52:38 -0700 Subject: [PATCH 21/21] Fix Derek's review comments longhorn-6205 Signed-off-by: Phan Le (cherry picked from commit 1b5cafd686223af62eb1d24d523384d898d2809a) --- app/daemon.go | 5 ++++- controller/node_controller.go | 2 +- controller/replica_controller.go | 8 ++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/app/daemon.go b/app/daemon.go index afd62d624f..b7d7a1be09 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -174,7 +174,10 @@ func startManager(c *cli.Context) error { } // This adds the label for the conversion webhook's selector. We do it the hard way without datastore to avoid chicken-and-egg. - pod, _ := clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Get(context.Background(), podName, v1.GetOptions{}) + pod, err := clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Get(context.Background(), podName, v1.GetOptions{}) + if err != nil { + return err + } labels := types.GetConversionWebhookLabel() for key, value := range labels { pod.Labels[key] = value diff --git a/controller/node_controller.go b/controller/node_controller.go index 54d658da4c..b5e444d080 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -523,7 +523,7 @@ func (nc *NodeController) syncNode(key string) (err error) { // have been turned off for RWX failover. 
labels := types.MergeStringMaps(types.GetAdmissionWebhookLabel(), types.GetRecoveryBackendLabel()) if err := nc.ds.AddLabelToManagerPod(node.Name, labels); err != nil { - log.Warnf("Node %v failed to restore its admission webhook and recovery backend", node.Name) + log.WithError(err).Error("Failed to restore its admission webhook and recovery backend") return err } diff --git a/controller/replica_controller.go b/controller/replica_controller.go index b727200cd9..83691ae62b 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -564,10 +564,10 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) { defer func() { if err != nil { log.WithError(err).Warnf("Failed to delete replica process %v", r.Name) - } - if isDelinquent { - log.Warnf("Ignored the failure of deleting replica process %v because the RWX volume is currently delinquent", r.Name) - err = nil + if isDelinquent { + log.Warnf("Ignored the failure of deleting replica process %v because the RWX volume is currently delinquent", r.Name) + err = nil + } } }()