diff --git a/app/daemon.go b/app/daemon.go index c6e98afa8e..b7d7a1be09 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -1,6 +1,7 @@ package app import ( + "context" "fmt" "net/http" _ "net/http/pprof" // for runtime profiling @@ -11,6 +12,7 @@ import ( "github.com/rancher/wrangler/pkg/signals" "github.com/sirupsen/logrus" "github.com/urfave/cli" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/longhorn/go-iscsi-helper/iscsi" @@ -144,6 +146,16 @@ func startManager(c *cli.Context) error { return fmt.Errorf("failed to detect the node IP") } + podName, err := util.GetRequiredEnv(types.EnvPodName) + if err != nil { + return fmt.Errorf("failed to detect the manager pod name") + } + + podNamespace, err := util.GetRequiredEnv(types.EnvPodNamespace) + if err != nil { + return fmt.Errorf("failed to detect the manager pod namespace") + } + ctx := signals.SetupSignalContext() logger := logrus.StandardLogger().WithField("node", currentNodeID) @@ -151,7 +163,8 @@ func startManager(c *cli.Context) error { // Conversion webhook needs to be started first since we use its port 9501 as readiness port. // longhorn-manager pod becomes ready only when conversion webhook is running. // The services in the longhorn-manager can then start to receive the requests. - // Conversion webhook does not longhorn datastore. + // Conversion webhook does not use datastore, since it is a prerequisite for + // datastore operation. clientsWithoutDatastore, err := client.NewClients(kubeconfigPath, false, ctx.Done()) if err != nil { return err @@ -159,6 +172,20 @@ func startManager(c *cli.Context) error { if err := webhook.StartWebhook(ctx, types.WebhookTypeConversion, clientsWithoutDatastore); err != nil { return err } + + // This adds the label for the conversion webhook's selector. We do it the hard way without datastore to avoid chicken-and-egg. 
+ pod, err := clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Get(context.Background(), podName, v1.GetOptions{}) + if err != nil { + return err + } + labels := types.GetConversionWebhookLabel() + for key, value := range labels { + pod.Labels[key] = value + } + _, err = clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Update(context.Background(), pod, v1.UpdateOptions{}) + if err != nil { + return err + } if err := webhook.CheckWebhookServiceAvailability(types.WebhookTypeConversion); err != nil { return err } @@ -167,9 +194,13 @@ func startManager(c *cli.Context) error { if err != nil { return err } + if err := webhook.StartWebhook(ctx, types.WebhookTypeAdmission, clients); err != nil { return err } + if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetAdmissionWebhookLabel()); err != nil { + return err + } if err := webhook.CheckWebhookServiceAvailability(types.WebhookTypeAdmission); err != nil { return err } @@ -178,6 +209,10 @@ func startManager(c *cli.Context) error { return err } + if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetRecoveryBackendLabel()); err != nil { + return err + } + if err := upgrade.Upgrade(kubeconfigPath, currentNodeID, managerImage, c.Bool(FlagUpgradeVersionCheck)); err != nil { return err } @@ -209,7 +244,7 @@ func startManager(c *cli.Context) error { return err } - if err := initDaemonNode(clients.Datastore); err != nil { + if err := initDaemonNode(clients.Datastore, currentNodeID); err != nil { return err } @@ -286,8 +321,7 @@ func updateRegistrySecretName(m *manager.VolumeManager) error { return nil } -func initDaemonNode(ds *datastore.DataStore) error { - nodeName := os.Getenv("NODE_NAME") +func initDaemonNode(ds *datastore.DataStore, nodeName string) error { if _, err := ds.GetNode(nodeName); err != nil { // init default disk on node when starting longhorn-manager if datastore.ErrorIsNotFound(err) { diff --git a/controller/engine_controller.go b/controller/engine_controller.go index fd210a5499..9703a77386 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -292,6 +292,7 @@ func (ec *EngineController) syncEngine(key string) (err error) { if !isResponsible { return nil } + if engine.Status.OwnerID != ec.controllerID { engine.Status.OwnerID = ec.controllerID engine, err = ec.ds.UpdateEngineStatus(engine) @@ -455,7 +456,6 @@ func (ec *EngineController) enqueueInstanceManagerChange(obj interface{}) { for _, e := range engineMap { ec.enqueueEngine(e) } - } func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) { @@ -556,16 +556,9 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { } } - v, err := ec.ds.GetVolumeRO(e.Spec.VolumeName) + isRWXVolume, err := ec.ds.IsRegularRWXVolume(e.Spec.VolumeName) if err != nil { - if !apierrors.IsNotFound(err) { - return err - } - } - - isRWXVolume := false - if v != nil && v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable { - isRWXVolume = true + return err } // For a RWX volume, the node down, for example, caused by kubelet restart, leads to share-manager pod deletion/recreation @@ -580,16 +573,21 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { } } + isDelinquent, err := ec.ds.IsNodeDelinquent(im.Spec.NodeID, e.Spec.VolumeName) + if err != nil { + return err + } + log.Info("Deleting engine instance") defer func() { if err != nil { log.WithError(err).Warnf("Failed to delete engine %v", e.Name) } - if 
isRWXVolume && im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + if isRWXVolume && (im.Status.CurrentState != longhorn.InstanceManagerStateRunning || isDelinquent) { // Try the best to delete engine instance. // To prevent that the volume is stuck at detaching state, ignore the error when volume is - // a RWX volume and the instance manager is not running. + // a RWX volume and the instance manager is not running or the RWX volume is currently delinquent. // // If the engine instance of a RWX volume is not deleted successfully: // If a RWX volume is on node A and the network of this node is partitioned, @@ -599,7 +597,13 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { // After shifting to node A, the first reattachment fail due to the IO error resulting from the // orphaned engine instance and block device. Then, the detachment will trigger the teardown of the // problematic engine instance and block device. The next reattachment then will succeed. - log.Warnf("Ignored the failure of deleting engine %v", e.Name) + if im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + log.Warnf("Ignored the failure of deleting engine %v because im.Status.CurrentState is %v", e.Name, im.Status.CurrentState) + } + if isDelinquent { + log.Warnf("Ignored the failure of deleting engine %v because the RWX volume is currently delinquent", e.Name) + } + err = nil } }() @@ -2228,6 +2232,21 @@ func (ec *EngineController) isResponsibleFor(e *longhorn.Engine, defaultEngineIm err = errors.Wrap(err, "error while checking isResponsibleFor") }() + // If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created share-manager pod + isDelinquent, err := ec.ds.IsNodeDelinquent(e.Status.OwnerID, e.Spec.VolumeName) + if err != nil { + return false, err + } + if isDelinquent { + pod, err := ec.ds.GetPodRO(ec.namespace, types.GetShareManagerPodNameFromShareManagerName(e.Spec.VolumeName)) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + if pod != nil && ec.controllerID == pod.Spec.NodeName { + return true, nil + } + } + isResponsible := isControllerResponsibleFor(ec.controllerID, ec.ds, e.Name, e.Spec.NodeID, e.Status.OwnerID) // The engine is not running, the owner node doesn't need to have e.Status.CurrentImage diff --git a/controller/instance_handler.go b/controller/instance_handler.go index c42566d980..efcd8aaf2f 100644 --- a/controller/instance_handler.go +++ b/controller/instance_handler.go @@ -56,7 +56,12 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan } }() - if im == nil || im.Status.CurrentState == longhorn.InstanceManagerStateUnknown { + isDelinquent := false + if im != nil { + isDelinquent, _ = h.ds.IsNodeDelinquent(im.Spec.NodeID, spec.VolumeName) + } + + if im == nil || im.Status.CurrentState == longhorn.InstanceManagerStateUnknown || isDelinquent { if status.Started { if status.CurrentState != longhorn.InstanceStateUnknown { logrus.Warnf("Marking the instance as state UNKNOWN since the related node %v of instance %v is down or deleted", spec.NodeID, instanceName) @@ -281,7 +286,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn return nil } // The related node maybe cleaned up then there is no available instance manager for this instance (typically it's replica). 
- isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeleted(spec.NodeID) + isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeletedOrDelinquent(spec.NodeID, spec.VolumeName) if err != nil { return err } diff --git a/controller/node_controller.go b/controller/node_controller.go index 51c45ccc65..b5e444d080 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -383,7 +383,7 @@ func (nc *NodeController) syncNode(key string) (err error) { existingNode := node.DeepCopy() defer func() { - // we're going to update volume assume things changes + // we're going to update node assume things changes if err == nil && !reflect.DeepEqual(existingNode.Status, node.Status) { _, err = nc.ds.UpdateNodeStatus(node) } @@ -519,6 +519,14 @@ func (nc *NodeController) syncNode(key string) (err error) { return nil } + // Getting here is enough proof of life to turn on the services that might + // have been turned off for RWX failover. + labels := types.MergeStringMaps(types.GetAdmissionWebhookLabel(), types.GetRecoveryBackendLabel()) + if err := nc.ds.AddLabelToManagerPod(node.Name, labels); err != nil { + log.WithError(err).Error("Failed to restore its admission webhook and recovery backend") + return err + } + // Create a monitor for collecting disk information if _, err := nc.createDiskMonitor(); err != nil { return err diff --git a/controller/replica_controller.go b/controller/replica_controller.go index 0eb6c7b07b..83691ae62b 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -231,7 +231,11 @@ func (rc *ReplicaController) syncReplica(key string) (err error) { log := getLoggerForReplica(rc.logger, replica) - if !rc.isResponsibleFor(replica) { + isResponsible, err := rc.isResponsibleFor(replica) + if err != nil { + return err + } + if !isResponsible { return nil } if replica.Status.OwnerID != rc.controllerID { @@ -510,7 +514,7 @@ func (rc *ReplicaController) CanStartRebuildingReplica(r *longhorn.Replica) (boo return true, nil } -func (rc *ReplicaController) DeleteInstance(obj interface{}) error { +func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) { r, ok := obj.(*longhorn.Replica) if !ok { return fmt.Errorf("invalid object for replica instance deletion: %v", obj) @@ -524,7 +528,6 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error { } var im *longhorn.InstanceManager - var err error // Not assigned or not updated, try best to delete if r.Status.InstanceManagerName == "" { if r.Spec.NodeID == "" { @@ -553,6 +556,21 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error { return nil } + isDelinquent, err := rc.ds.IsNodeDelinquent(im.Spec.NodeID, r.Spec.VolumeName) + if err != nil { + return err + } + + defer func() { + if err != nil { + log.WithError(err).Warnf("Failed to delete replica process %v", r.Name) + if isDelinquent { + log.Warnf("Ignored the failure of deleting replica process %v because the RWX volume is currently delinquent", r.Name) + err = nil + } + } + }() + c, err := engineapi.NewInstanceManagerClient(im) if err != nil { return err @@ -900,8 +918,23 @@ func (rc *ReplicaController) enqueueAllRebuildingReplicaOnCurrentNode() { } } -func (rc *ReplicaController) isResponsibleFor(r *longhorn.Replica) bool { - return isControllerResponsibleFor(rc.controllerID, rc.ds, r.Name, r.Spec.NodeID, r.Status.OwnerID) +func (rc *ReplicaController) isResponsibleFor(r *longhorn.Replica) (bool, error) { + // If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created 
share-manager pod + isDelinquent, err := rc.ds.IsNodeDelinquent(r.Status.OwnerID, r.Spec.VolumeName) + if err != nil { + return false, err + } + if isDelinquent { + pod, err := rc.ds.GetPodRO(rc.namespace, types.GetShareManagerPodNameFromShareManagerName(r.Spec.VolumeName)) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + if pod != nil && rc.controllerID == pod.Spec.NodeName { + return true, nil + } + } + + return isControllerResponsibleFor(rc.controllerID, rc.ds, r.Name, r.Spec.NodeID, r.Status.OwnerID), nil } func hasMatchingReplica(replica *longhorn.Replica, replicas map[string]*longhorn.Replica) bool { diff --git a/controller/share_manager_controller.go b/controller/share_manager_controller.go index 966e0a0594..2a0c9ee2ed 100644 --- a/controller/share_manager_controller.go +++ b/controller/share_manager_controller.go @@ -18,6 +18,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -35,6 +36,14 @@ import ( longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" ) +const shareManagerLeaseDurationSeconds = 7 // This should be slightly more than twice the share-manager lease renewal interval. + +type nfsServerConfig struct { + enableFastFailover bool + leaseLifetime int + gracePeriod int +} + type ShareManagerController struct { *baseController @@ -154,7 +163,7 @@ func (c *ShareManagerController) enqueueShareManagerForVolume(obj interface{}) { } } - if volume.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !volume.Spec.Migratable { + if isRegularRWXVolume(volume) { // we can queue the key directly since a share manager only manages a single volume from it's own namespace // and there is no need for us to retrieve the whole object, since we already know the volume name key := volume.Namespace + "/" + volume.Name @@ -203,6 +212,7 @@ func isShareManagerPod(obj interface{}) bool { } } + // This only matches once the pod is fully constructed, which may be the point. 
podContainers := pod.Spec.Containers for _, con := range podContainers { if con.Name == types.LonghornLabelShareManager { @@ -212,6 +222,42 @@ func isShareManagerPod(obj interface{}) bool { return false } +func (c *ShareManagerController) checkLeasesAndEnqueueAnyStale() error { + sms, err := c.ds.ListShareManagersRO() + if err != nil { + return err + } + for _, sm := range sms { + isStale, _, err := c.isShareManagerPodStale(sm) + if err != nil { + return err + } + if isStale { + c.enqueueShareManager(sm) + } + } + + return nil +} + +func (c *ShareManagerController) runLeaseCheck(stopCh <-chan struct{}) { + c.logger.Infof("Starting lease check goroutine") + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-stopCh: + c.logger.Info("Share manager lease check is ending") + return + case <-ticker.C: + if err := c.checkLeasesAndEnqueueAnyStale(); err != nil { + c.logger.WithError(err).Warn("Failed to check share-manager leases.") + } + } + } +} + func (c *ShareManagerController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -225,6 +271,7 @@ func (c *ShareManagerController) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } + go c.runLeaseCheck(stopCh) <-stopCh } @@ -300,6 +347,9 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { } return err } + // This node may be an interim owner picked by isResponsibleFor until the pod is created + // and scheduled, at which point ownership will transfer to the pod's spec.nodename. + // But we need some controller to assume responsibility and do the restart. log.Infof("Share manager got new owner %v", c.controllerID) } @@ -316,6 +366,33 @@ func (c *ShareManagerController) syncShareManager(key string) (err error) { return c.ds.RemoveFinalizerForShareManager(sm) } + isStale, leaseHolder, err := c.isShareManagerPodStale(sm) + if err != nil { + return err + } + if isStale { + isDelinquent, _, err := c.ds.IsRWXVolumeDelinquent(sm.Name) + if err != nil { + return err + } + if !isDelinquent { + // Turn off the traffic to the admission webhook and recovery backend on the suspected node. + // Trying to talk to it will delay any effort to modify resources. + // Only turn off the other nodes to avoid a deadlock in a single node cluster. 
+			if c.controllerID != leaseHolder {
+				labels := types.MergeStringMaps(types.GetAdmissionWebhookLabel(), types.GetRecoveryBackendLabel())
+				if err := c.ds.RemoveLabelFromManagerPod(leaseHolder, labels); err != nil {
+					return errors.Wrapf(err, "failed to turn off admission webhook/recovery backend on node %v", leaseHolder)
+				}
+			}
+
+			log.Infof("Marking share manager %v delinquent", sm.Name)
+			if err := c.markShareManagerDelinquent(sm); err != nil {
+				return err
+			}
+		}
+	}
+
 	// update at the end, after the whole reconcile loop
 	existingShareManager := sm.DeepCopy()
 	defer func() {
@@ -330,11 +407,11 @@
 		}
 	}()
 
-	if err = c.syncShareManagerVolume(sm); err != nil {
+	if err = c.syncShareManagerPod(sm); err != nil {
 		return err
 	}
 
-	if err = c.syncShareManagerPod(sm); err != nil {
+	if err = c.syncShareManagerVolume(sm); err != nil {
 		return err
 	}
 
@@ -617,8 +694,73 @@ func (c *ShareManagerController) syncShareManagerVolume(sm *longhorn.ShareManage
 	return nil
 }
 
-func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManager) error {
+// markShareManagerDelinquent zeros the acquire time field as a flag that the volume
+// should be fast-tracked for detaching away from the current lease-holding node.
+func (c *ShareManagerController) markShareManagerDelinquent(sm *longhorn.ShareManager) (err error) {
+	defer func() {
+		err = errors.Wrapf(err, "failed to markShareManagerDelinquent")
+	}()
+
+	lease, err := c.ds.GetLease(sm.Name)
+	if err != nil {
+		return err
+	}
+
+	lease.Spec.AcquireTime = &metav1.MicroTime{Time: time.Time{}}
+	if _, err := c.ds.UpdateLease(lease); err != nil {
+		return err
+	}
+	return nil
+}
+
+// clearShareManagerLease just zeros out the renew time field preparatory to pod
+// cleanup, so it won't be flagged as stale in normal-path code.
+func (c *ShareManagerController) clearShareManagerLease(sm *longhorn.ShareManager) (err error) {
+	defer func() {
+		err = errors.Wrapf(err, "failed to clearShareManagerLease")
+	}()
+	log := getLoggerForShareManager(c.logger, sm)
+
+	lease, err := c.ds.GetLease(sm.Name)
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
+
+	holder := *lease.Spec.HolderIdentity
+	if !lease.Spec.RenewTime.IsZero() {
+		log.Infof("Clearing lease %v held by node %v for share manager pod cleanup.", sm.Name, holder)
+		lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Time{}}
+		if _, err := c.ds.UpdateLease(lease); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManager) (err error) {
+	defer func() {
+		err = errors.Wrapf(err, "failed to cleanupShareManagerPod")
+	}()
+
+	log := getLoggerForShareManager(c.logger, sm)
+
+	// Are we cleaning up after a lease timeout?
+	isStale, leaseHolder, err := c.isShareManagerPodStale(sm)
+	if err != nil {
+		return err
+	}
+
+	// Clear the lease so we won't try to act on apparent staleness. Staleness is now either dealt with or moot.
+ err = c.clearShareManagerLease(sm) + if err != nil { + return err + } + podName := types.GetShareManagerPodNameFromShareManagerName(sm.Name) pod, err := c.ds.GetPod(podName) if err != nil && !apierrors.IsNotFound(err) { @@ -631,10 +773,14 @@ func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManage log.Infof("Deleting share manager pod") if err := c.ds.DeletePod(podName); err != nil && !apierrors.IsNotFound(err) { + log.WithError(err).Warnf("Failed to delete share manager pod") return err } - if nodeFailed, _ := c.ds.IsNodeDownOrDeleted(pod.Spec.NodeName); nodeFailed { + // Force delete if the pod's node is known dead, or likely so since it let + // the lease time out and another node's controller is cleaning up after it. + nodeFailed, _ := c.ds.IsNodeDownOrDeleted(pod.Spec.NodeName) + if nodeFailed || (isStale && leaseHolder != c.controllerID) { log.Info("Force deleting pod to allow fail over since node of share manager pod is down") gracePeriod := int64(0) err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}) @@ -654,6 +800,10 @@ func (c *ShareManagerController) cleanupShareManagerPod(sm *longhorn.ShareManage // starting, running -> running (share ready to use) // controls transitions to running, error func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) (err error) { + defer func() { + err = errors.Wrapf(err, "failed to syncShareManagerPod") + }() + defer func() { if sm.Status.State == longhorn.ShareManagerStateStopping || sm.Status.State == longhorn.ShareManagerStateStopped || @@ -696,7 +846,28 @@ func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) } // If the node where the pod is running on become defective, we clean up the pod by setting sm.Status.State to STOPPED or ERROR - // A new pod will be recreated by the share manager controller. + // A new pod will be recreated by the share manager controller. We might get an early warning of that by the pod going stale. 
+ isStale, _, err := c.isShareManagerPodStale(sm) + if err != nil { + return err + } + if isStale { + log.Warnf("ShareManager Pod %v is stale", pod.Name) + + // if we just transitioned to the starting state, while the prior cleanup is still in progress we will switch to error state + // which will lead to a bad loop of starting (new workload) -> error (remount) -> stopped (cleanup sm) + if sm.Status.State == longhorn.ShareManagerStateStopping { + return nil + } + + if sm.Status.State != longhorn.ShareManagerStateStopped { + log.Info("Updating share manager to error state") + sm.Status.State = longhorn.ShareManagerStateError + } + + return nil + } + isDown, err := c.ds.IsNodeDownOrDeleted(pod.Spec.NodeName) if err != nil { log.WithError(err).Warnf("Failed to check IsNodeDownOrDeleted(%v) when syncShareManagerPod", pod.Spec.NodeName) @@ -746,6 +917,7 @@ func (c *ShareManagerController) syncShareManagerPod(sm *longhorn.ShareManager) sm.Status.State = longhorn.ShareManagerStateError } default: + log.Infof("Share manager pod %v in unexpected phase: %v, setting sharemanager to error state.", sm.Name, pod.Status.Phase) sm.Status.State = longhorn.ShareManagerStateError } @@ -782,6 +954,39 @@ func (c *ShareManagerController) getAffinityFromStorageClass(sc *storagev1.Stora } } +func (c *ShareManagerController) addStaleNodeAntiAffinity(affinity *corev1.Affinity, staleNode string) *corev1.Affinity { + var matchFields []corev1.NodeSelectorRequirement + + matchFields = append(matchFields, corev1.NodeSelectorRequirement{ + Key: "metadata.name", + Operator: corev1.NodeSelectorOpNotIn, + Values: []string{staleNode}, + }) + + // Note the difference between MatchFields and MatchExpressions. + //See https://stackoverflow.com/questions/67018171/kubernetes-what-are-valid-node-fields + nodeAntiAffinity := &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + corev1.PreferredSchedulingTerm{ + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchFields: matchFields, + }, + }, + }, + } + + if affinity == nil { + affinity = &corev1.Affinity{ + NodeAffinity: nodeAntiAffinity, + } + } else { + affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = nodeAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution + } + + return affinity +} + func (c *ShareManagerController) getShareManagerNodeSelectorFromStorageClass(sc *storagev1.StorageClass) map[string]string { value, ok := sc.Parameters["shareManagerNodeSelector"] if !ok { @@ -846,8 +1051,11 @@ func (c *ShareManagerController) createServiceAndEndpoint(shareManager *longhorn return nil } -// createShareManagerPod ensures existence of service, it's assumed that the pvc for this share manager already exists +// createShareManagerPod ensures existence of corresponding service and lease. +// it's assumed that the pvc for this share manager already exists. 
func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager) (*corev1.Pod, error) { + log := getLoggerForShareManager(c.logger, sm) + tolerations, err := c.ds.GetSettingTaintToleration() if err != nil { return nil, errors.Wrap(err, "failed to get taint toleration setting before creating share manager pod") @@ -886,6 +1094,35 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager return nil, errors.Wrapf(err, "failed to create service and endpoint for share manager %v", sm.Name) } + nfsConfig := &nfsServerConfig{ + enableFastFailover: false, + leaseLifetime: 60, + gracePeriod: 90, + } + + enabled, err := c.ds.GetSettingAsBool(types.SettingNameRWXVolumeFastFailover) + if err != nil { + return nil, err + } + if enabled { + nfsConfig = &nfsServerConfig{ + enableFastFailover: true, + leaseLifetime: 20, + gracePeriod: 30, + } + + if _, err := c.ds.GetLeaseRO(sm.Name); err != nil { + if !apierrors.IsNotFound(err) { + return nil, errors.Wrapf(err, "failed to get lease for share manager %v", sm.Name) + } + + if _, err = c.ds.CreateLease(c.createLeaseManifest(sm)); err != nil { + return nil, errors.Wrapf(err, "failed to create lease for share manager %v", sm.Name) + } + } + + } + volume, err := c.ds.GetVolume(sm.Name) if err != nil { return nil, err @@ -923,6 +1160,15 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager } } + isDelinquent, delinquentNode, err := c.ds.IsRWXVolumeDelinquent(sm.Name) + if err != nil { + return nil, err + } + if isDelinquent && delinquentNode != "" { + log.Infof("Creating anti-affinity for share manager pod against delinquent node %v", delinquentNode) + affinity = c.addStaleNodeAntiAffinity(affinity, delinquentNode) + } + fsType := pv.Spec.CSI.FSType mountOptions := pv.Spec.MountOptions @@ -948,7 +1194,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager } manifest := c.createPodManifest(sm, annotations, tolerations, affinity, imagePullPolicy, nil, registrySecret, - priorityClass, nodeSelector, fsType, mountOptions, cryptoKey, cryptoParams) + priorityClass, nodeSelector, fsType, mountOptions, cryptoKey, cryptoParams, nfsConfig) storageNetwork, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork) if err != nil { @@ -971,8 +1217,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager if err != nil { return nil, errors.Wrapf(err, "failed to create pod for share manager %v", sm.Name) } - - getLoggerForShareManager(c.logger, sm).WithField("pod", pod.Name).Infof("Created pod for share manager on node %v", pod.Spec.NodeName) + log.WithField("pod", pod.Name).Infof("Created pod for share manager on node %v", pod.Spec.NodeName) return pod, nil } @@ -1040,9 +1285,36 @@ func (c *ShareManagerController) createEndpoint(sm *longhorn.ShareManager) (*cor return c.ds.CreateKubernetesEndpoint(newObj) } +func (c *ShareManagerController) createLeaseManifest(sm *longhorn.ShareManager) *coordinationv1.Lease { + // No current holder, share-manager pod will fill it with its owning node. 
+ holderIdentity := "" + leaseDurationSeconds := int32(shareManagerLeaseDurationSeconds) + leaseTransitions := int32(0) + zeroTime := time.Now() + + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: sm.Name, + Namespace: c.namespace, + OwnerReferences: datastore.GetOwnerReferencesForShareManager(sm, false), + Labels: types.GetShareManagerInstanceLabel(sm.Name), + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: &holderIdentity, + LeaseDurationSeconds: &leaseDurationSeconds, + AcquireTime: &metav1.MicroTime{Time: zeroTime}, + RenewTime: &metav1.MicroTime{Time: zeroTime}, + LeaseTransitions: &leaseTransitions, + }, + } + + return lease +} + func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, annotations map[string]string, tolerations []corev1.Toleration, affinity *corev1.Affinity, pullPolicy corev1.PullPolicy, resourceReq *corev1.ResourceRequirements, registrySecret, priorityClass string, - nodeSelector map[string]string, fsType string, mountOptions []string, cryptoKey string, cryptoParams *crypto.EncryptParams) *corev1.Pod { + nodeSelector map[string]string, fsType string, mountOptions []string, cryptoKey string, cryptoParams *crypto.EncryptParams, + nfsConfig *nfsServerConfig) *corev1.Pod { // command args for the share-manager args := []string{"--debug", "daemon", "--volume", sm.Name} @@ -1097,9 +1369,24 @@ func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, an }, } + podSpec.Spec.Containers[0].Env = []corev1.EnvVar{ + { + Name: "FAST_FAILOVER", + Value: fmt.Sprint(nfsConfig.enableFastFailover), + }, + { + Name: "LEASE_LIFETIME", + Value: fmt.Sprint(nfsConfig.leaseLifetime), + }, + { + Name: "GRACE_PERIOD", + Value: fmt.Sprint(nfsConfig.gracePeriod), + }, + } + // this is an encrypted volume the cryptoKey is base64 encoded if len(cryptoKey) > 0 { - podSpec.Spec.Containers[0].Env = []corev1.EnvVar{ + podSpec.Spec.Containers[0].Env = append(podSpec.Spec.Containers[0].Env, []corev1.EnvVar{ { Name: "ENCRYPTED", Value: "True", @@ -1124,7 +1411,7 @@ func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, an Name: "CRYPTOPBKDF", Value: string(cryptoParams.GetPBKDF()), }, - } + }...) } podSpec.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{ @@ -1198,12 +1485,65 @@ func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, an return podSpec } -// isResponsibleFor in most controllers we only checks if the node of the current owner is down -// but in the case where the node is unschedulable we want to transfer ownership, -// since we will create sm pod on the sm.Status.OwnerID when the sm starts +// isShareManagerPodStale checks the associated lease CR to see whether the current pod (if any) +// has fallen behind on renewing the lease. If there is any error finding out, we assume not stale. 
+func (c *ShareManagerController) isShareManagerPodStale(sm *longhorn.ShareManager) (stale bool, holder string, err error) { + defer func() { + err = errors.Wrapf(err, "failed to check isShareManagerPodStale") + }() + + log := getLoggerForShareManager(c.logger, sm) + + enabled, err := c.ds.GetSettingAsBool(types.SettingNameRWXVolumeFastFailover) + if err != nil { + return false, "", err + } + if !enabled { + return false, "", nil + } + + leaseName := sm.Name + lease, err := c.ds.GetLeaseRO(leaseName) + if err != nil { + if !apierrors.IsNotFound(err) { + return false, "", err + } + return false, "", nil + } + + // Consider it stale if + // - there is a lease-holding node, ie there is a share-manager version running that knows to acquire the lease. + // - the pod is not being cleaned up, + // - the lease duration is a sane value, and + // - the time of renewal is longer ago than the lease duration. + holder = *lease.Spec.HolderIdentity + if holder == "" { + return false, "", nil + } + if (lease.Spec.RenewTime).IsZero() { + return false, holder, nil + } + if *lease.Spec.LeaseDurationSeconds < shareManagerLeaseDurationSeconds { + log.Warnf("Lease for %v has a crazy value for duration: %v seconds. Ignoring.", leaseName, *lease.Spec.LeaseDurationSeconds) + return false, holder, nil + } + expireTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second) + if time.Now().After(expireTime) { + log.Warnf("Lease for %v held by %v is stale, expired %v ago", leaseName, holder, time.Since(expireTime)) + stale = true + } + + return stale, holder, nil +} + +// isResponsibleFor in most controllers only checks if the node of the current owner is known +// by kubernetes to be down. But in the case where the lease is stale or the node is unschedulable +// we want to transfer ownership and mark the node as delinquent for related status checking. func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bool, error) { - // We prefer keeping the owner of the share manager CR to be the same node - // where the share manager pod is running on. + log := getLoggerForShareManager(c.logger, sm) + + // We prefer keeping the owner of the share manager CR to be the node + // where the share manager pod got scheduled and is running. preferredOwnerID := "" podName := types.GetShareManagerPodNameFromShareManagerName(sm.Name) pod, err := c.ds.GetPodRO(c.namespace, podName) @@ -1211,6 +1551,7 @@ func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bo preferredOwnerID = pod.Spec.NodeName } + // Base class method is used to decide based on node schedulable condition. isResponsible := isControllerResponsibleFor(c.controllerID, c.ds, sm.Name, preferredOwnerID, sm.Status.OwnerID) readyAndSchedulableNodes, err := c.ds.ListReadyAndSchedulableNodesRO() @@ -1229,5 +1570,29 @@ func (c *ShareManagerController) isResponsibleFor(sm *longhorn.ShareManager) (bo continueToBeOwner := currentNodeSchedulable && !preferredOwnerSchedulable && c.controllerID == sm.Status.OwnerID requiresNewOwner := currentNodeSchedulable && !preferredOwnerSchedulable && !currentOwnerSchedulable + isNodeAvailable := func(node string) bool { + isUnavailable, _ := c.ds.IsNodeDownOrDeletedOrMissingManager(node) + return node != "" && !isUnavailable + } + + // If the lease is stale, we assume the owning node is down but not officially dead. + // Some node has to take over, and it might as well be this one, if another one + // has not already. 
+ isStale, leaseHolder, err := c.isShareManagerPodStale(sm) + if err != nil { + return false, err + } + if isStale { + // Avoid race between nodes by checking for an existing interim owner. + if leaseHolder != sm.Status.OwnerID && + c.controllerID != sm.Status.OwnerID && + isNodeAvailable(sm.Status.OwnerID) && + c.ds.IsNodeSchedulable(sm.Status.OwnerID) { + return false, nil + } + log.Infof("Interim owner %v taking responsibility for stale lease-holder %v", c.controllerID, leaseHolder) + return currentNodeSchedulable, nil + } + return isPreferredOwner || continueToBeOwner || requiresNewOwner, nil } diff --git a/controller/utils.go b/controller/utils.go index 72b0cadc4e..0535d5ff9f 100644 --- a/controller/utils.go +++ b/controller/utils.go @@ -83,3 +83,10 @@ func setReplicaFailedAt(r *longhorn.Replica, timestamp string) { r.Spec.LastFailedAt = timestamp } } + +func isRegularRWXVolume(v *longhorn.Volume) bool { + if v == nil { + return false + } + return v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable +} diff --git a/controller/volume_attachment_controller.go b/controller/volume_attachment_controller.go index b58f3971a7..04cef6e05f 100644 --- a/controller/volume_attachment_controller.go +++ b/controller/volume_attachment_controller.go @@ -940,13 +940,6 @@ func isCSIAttacherTicketOfRegularRWXVolume(attachmentTicket *longhorn.Attachment return isRegularRWXVolume(v) && isCSIAttacherTicket(attachmentTicket) } -func isRegularRWXVolume(v *longhorn.Volume) bool { - if v == nil { - return false - } - return v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable -} - func isCSIAttacherTicket(ticket *longhorn.AttachmentTicket) bool { if ticket == nil { return false diff --git a/controller/volume_controller.go b/controller/volume_controller.go index d8569c19b8..b2956d02ff 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -2038,11 +2038,11 @@ func (c *VolumeController) reconcileVolumeSize(v *longhorn.Volume, e *longhorn.E } func (c *VolumeController) canInstanceManagerLaunchReplica(r *longhorn.Replica) (bool, error) { - nodeDown, err := c.ds.IsNodeDownOrDeleted(r.Spec.NodeID) + isNodeDownOrDeletedOrDelinquent, err := c.ds.IsNodeDownOrDeletedOrDelinquent(r.Spec.NodeID, r.Spec.VolumeName) if err != nil { - return false, errors.Wrapf(err, "fail to check IsNodeDownOrDeleted %v", r.Spec.NodeID) + return false, errors.Wrapf(err, "fail to check IsNodeDownOrDeletedOrDelinquent %v", r.Spec.NodeID) } - if nodeDown { + if isNodeDownOrDeletedOrDelinquent { return false, nil } // Replica already had IM @@ -4368,6 +4368,21 @@ func (c *VolumeController) isResponsibleFor(v *longhorn.Volume, defaultEngineIma err = errors.Wrap(err, "error while checking isResponsibleFor") }() + // If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created share-manager pod + isDelinquent, err := c.ds.IsNodeDelinquent(v.Status.OwnerID, v.Name) + if err != nil { + return false, err + } + if isDelinquent { + pod, err := c.ds.GetPodRO(v.Namespace, types.GetShareManagerPodNameFromShareManagerName(v.Name)) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + if pod != nil && c.controllerID == pod.Spec.NodeName { + return true, nil + } + } + isResponsible := isControllerResponsibleFor(c.controllerID, c.ds, v.Name, v.Spec.NodeID, v.Status.OwnerID) if types.IsDataEngineV1(v.Spec.DataEngine) { diff --git a/datastore/datastore.go b/datastore/datastore.go index 2091279284..e933165592 100644 --- 
a/datastore/datastore.go
+++ b/datastore/datastore.go
@@ -10,6 +10,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	appslisters "k8s.io/client-go/listers/apps/v1"
 	batchlisters_v1 "k8s.io/client-go/listers/batch/v1"
+	coordinationlisters "k8s.io/client-go/listers/coordination/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	policylisters "k8s.io/client-go/listers/policy/v1"
 	schedulinglisters "k8s.io/client-go/listers/scheduling/v1"
@@ -123,6 +124,8 @@ type DataStore struct {
 	ServiceInformer  cache.SharedInformer
 	endpointLister   corelisters.EndpointsLister
 	EndpointInformer cache.SharedInformer
+	leaseLister      coordinationlisters.LeaseLister
+	LeaseInformer    cache.SharedInformer
 	extensionsClient apiextensionsclientset.Interface
 }
@@ -194,6 +197,8 @@ func NewDataStore(namespace string, lhClient lhclientset.Interface, kubeClient c
 	cacheSyncs = append(cacheSyncs, storageclassInformer.Informer().HasSynced)
 	priorityClassInformer := informerFactories.KubeInformerFactory.Scheduling().V1().PriorityClasses()
 	cacheSyncs = append(cacheSyncs, priorityClassInformer.Informer().HasSynced)
+	leaseInformer := informerFactories.KubeInformerFactory.Coordination().V1().Leases()
+	cacheSyncs = append(cacheSyncs, leaseInformer.Informer().HasSynced)
 
 	// Filtered kube Informers by longhorn-system namespace
 	cronJobInformer := informerFactories.KubeNamespaceFilteredInformerFactory.Batch().V1().CronJobs()
@@ -281,6 +286,8 @@ func NewDataStore(namespace string, lhClient lhclientset.Interface, kubeClient c
 		StorageClassInformer:  storageclassInformer.Informer(),
 		priorityClassLister:   priorityClassInformer.Lister(),
 		PriorityClassInformer: priorityClassInformer.Informer(),
+		leaseLister:           leaseInformer.Lister(),
+		LeaseInformer:         leaseInformer.Informer(),
 		cronJobLister:         cronJobInformer.Lister(),
 		CronJobInformer:       cronJobInformer.Informer(),
diff --git a/datastore/kubernetes.go b/datastore/kubernetes.go
index e72507d79d..f9120760cf 100644
--- a/datastore/kubernetes.go
+++ b/datastore/kubernetes.go
@@ -9,6 +9,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -18,6 +19,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
+	coordinationv1 "k8s.io/api/coordination/v1"
 	corev1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
@@ -243,11 +245,78 @@ func (s *DataStore) UpdatePod(obj *corev1.Pod) (*corev1.Pod, error) {
 	return s.kubeClient.CoreV1().Pods(s.namespace).Update(context.TODO(), obj, metav1.UpdateOptions{})
 }
 
+// CreateLease creates a Lease resource for the given Lease object
+func (s *DataStore) CreateLease(lease *coordinationv1.Lease) (*coordinationv1.Lease, error) {
+	return s.kubeClient.CoordinationV1().Leases(s.namespace).Create(context.TODO(), lease, metav1.CreateOptions{})
+}
+
+// GetLeaseRO gets the Lease for the given name
+// This function returns a direct reference to the internal cache object and should not be mutated.
+func (s *DataStore) GetLeaseRO(name string) (*coordinationv1.Lease, error) {
+	return s.leaseLister.Leases(s.namespace).Get(name)
+}
+
+// GetLease returns a new Lease object for the given name
+func (s *DataStore) GetLease(name string) (*coordinationv1.Lease, error) {
+	resultRO, err := s.GetLeaseRO(name)
+	if err != nil {
+		return nil, err
+	}
+	// Cannot use cached object from lister
+	return resultRO.DeepCopy(), nil
+}
+
+// ListLeasesRO returns a list of all Leases for the given namespace,
+// the list contains direct references to the internal cache
objects and should not be mutated. +func (s *DataStore) ListLeasesRO() ([]*coordinationv1.Lease, error) { + return s.leaseLister.Leases(s.namespace).List(labels.Everything()) +} + // DeleteLease deletes Lease with the given name in s.namespace func (s *DataStore) DeleteLease(name string) error { return s.kubeClient.CoordinationV1().Leases(s.namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) } +// UpdateLease updates the Lease resource with the given object and namespace +func (s *DataStore) UpdateLease(lease *coordinationv1.Lease) (*coordinationv1.Lease, error) { + return s.kubeClient.CoordinationV1().Leases(s.namespace).Update(context.TODO(), lease, metav1.UpdateOptions{}) +} + +// IsRWXVolumeDelinquent checks whether the volume has a lease by the same name, which an RWX volume should, +// and whether that lease's spec shows that its holder is delinquent (its acquire time has been zeroed.) +// If so, return the delinquent holder. +// Any hiccup yields a return of "false". +func (s *DataStore) IsRWXVolumeDelinquent(name string) (isDelinquent bool, holder string, err error) { + defer func() { + err = errors.Wrapf(err, "failed to check IsRWXVolumeDelinquent") + }() + + enabled, err := s.GetSettingAsBool(types.SettingNameRWXVolumeFastFailover) + if err != nil { + return false, "", err + } + if !enabled { + return false, "", nil + } + + lease, err := s.GetLeaseRO(name) + if err != nil { + if apierrors.IsNotFound(err) { + return false, "", nil + } + return false, "", err + } + + holder = *lease.Spec.HolderIdentity + if holder == "" { + return false, "", nil + } + if (lease.Spec.AcquireTime).IsZero() { + isDelinquent = true + } + return isDelinquent, holder, nil +} + // GetStorageClassRO gets StorageClass with the given name // This function returns direct reference to the internal cache object and should not be mutated. // Consider using this function when you can guarantee read only access and don't want the overhead of deep copies @@ -430,6 +499,51 @@ func (s *DataStore) ListManagerPodsRO() ([]*corev1.Pod, error) { return s.ListPodsBySelectorRO(selector) } +func (s *DataStore) GetManagerPodForNode(nodeName string) (*corev1.Pod, error) { + pods, err := s.ListManagerPods() + if err != nil { + return nil, err + } + for _, pod := range pods { + if pod.Spec.NodeName == nodeName { + return pod, nil + } + } + return nil, apierrors.NewNotFound(corev1.Resource("pod"), nodeName) +} + +func (s *DataStore) AddLabelToManagerPod(nodeName string, label map[string]string) error { + pod, err := s.GetManagerPodForNode(nodeName) + if err != nil { + return err + } + changed := false + for key, value := range label { + if _, exists := pod.Labels[key]; !exists { + pod.Labels[key] = value + changed = true + } + } + + // Protect against frequent no-change updates. 
+ if changed { + _, err = s.UpdatePod(pod) + } + return err +} + +func (s *DataStore) RemoveLabelFromManagerPod(nodeName string, label map[string]string) error { + pod, err := s.GetManagerPodForNode(nodeName) + if err != nil { + return err + } + for key := range label { + delete(pod.Labels, key) + } + _, err = s.UpdatePod(pod) + return err +} + func getInstanceManagerComponentSelector() (labels.Selector, error) { return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ MatchLabels: types.GetInstanceManagerComponentLabel(), diff --git a/datastore/longhorn.go b/datastore/longhorn.go index 4e2efdf3f6..41951c4ddf 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -962,6 +962,17 @@ func (s *DataStore) ListVolumes() (map[string]*longhorn.Volume, error) { return itemMap, nil } +func (s *DataStore) IsRegularRWXVolume(volumeName string) (bool, error) { + v, err := s.GetVolumeRO(volumeName) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable, nil +} + func MarshalLabelToVolumeRecurringJob(labels map[string]string) map[string]*longhorn.VolumeRecurringJob { groupPrefix := fmt.Sprintf(types.LonghornLabelRecurringJobKeyPrefixFmt, types.LonghornLabelRecurringJobGroup) + "/" jobPrefix := fmt.Sprintf(types.LonghornLabelRecurringJobKeyPrefixFmt, types.LonghornLabelRecurringJob) + "/" @@ -2930,6 +2941,52 @@ func (s *DataStore) IsNodeDownOrDeleted(name string) (bool, error) { return false, nil } +// IsNodeDelinquent checks an early-warning condition of Lease expiration +// that is of interest to share-manager types. +func (s *DataStore) IsNodeDelinquent(nodeName string, volumeName string) (bool, error) { + if nodeName == "" || volumeName == "" { + return false, nil + } + + isRWX, err := s.IsRegularRWXVolume(volumeName) + if err != nil { + return false, err + } + if !isRWX { + return false, nil + } + + isDelinquent, delinquentNode, err := s.IsRWXVolumeDelinquent(volumeName) + if err != nil { + return false, err + } + return isDelinquent && delinquentNode == nodeName, nil +} + +// IsNodeDownOrDeletedOrDelinquent gets Node for the given name and checks +// if the Node condition is gone or not ready or, if we are asking on behalf +// of an RWX-related resource, delinquent for that resource's volume. 
+func (s *DataStore) IsNodeDownOrDeletedOrDelinquent(nodeName string, volumeName string) (bool, error) { + if nodeName == "" { + return false, errors.New("no node name provided to check node down or deleted or delinquent") + } + node, err := s.GetNodeRO(nodeName) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + return false, err + } + cond := types.GetCondition(node.Status.Conditions, longhorn.NodeConditionTypeReady) + if cond.Status == longhorn.ConditionStatusFalse && + (cond.Reason == string(longhorn.NodeConditionReasonKubernetesNodeGone) || + cond.Reason == string(longhorn.NodeConditionReasonKubernetesNodeNotReady)) { + return true, nil + } + + return s.IsNodeDelinquent(nodeName, volumeName) +} + // IsNodeDeleted checks whether the node does not exist by passing in the node name func (s *DataStore) IsNodeDeleted(name string) (bool, error) { if name == "" { @@ -3835,6 +3892,10 @@ func (s *DataStore) ListShareManagers() (map[string]*longhorn.ShareManager, erro return itemMap, nil } +func (s *DataStore) ListShareManagersRO() ([]*longhorn.ShareManager, error) { + return s.shareManagerLister.ShareManagers(s.namespace).List(labels.Everything()) +} + // CreateBackupTarget creates a Longhorn BackupTargets CR and verifies creation func (s *DataStore) CreateBackupTarget(backupTarget *longhorn.BackupTarget) (*longhorn.BackupTarget, error) { ret, err := s.lhClient.LonghornV1beta2().BackupTargets(s.namespace).Create(context.TODO(), backupTarget, metav1.CreateOptions{}) diff --git a/manager/volume.go b/manager/volume.go index 4bbaef2c7f..1ef08cad71 100644 --- a/manager/volume.go +++ b/manager/volume.go @@ -347,7 +347,7 @@ func (m *VolumeManager) Salvage(volumeName string, replicaNames []string) (v *lo if r.Spec.VolumeName != v.Name { return nil, fmt.Errorf("replica %v doesn't belong to volume %v", r.Name, v.Name) } - isDownOrDeleted, err := m.ds.IsNodeDownOrDeleted(r.Spec.NodeID) + isDownOrDeleted, err := m.ds.IsNodeDownOrDeletedOrDelinquent(r.Spec.NodeID, v.Name) if err != nil { return nil, fmt.Errorf("failed to check if the related node %v is still running for replica %v", r.Spec.NodeID, name) } diff --git a/types/setting.go b/types/setting.go index 7d3c3b1978..37b825aa0a 100644 --- a/types/setting.go +++ b/types/setting.go @@ -136,6 +136,7 @@ const ( SettingNameFreezeFilesystemForSnapshot = SettingName("freeze-filesystem-for-snapshot") SettingNameAutoCleanupSnapshotWhenDeleteBackup = SettingName("auto-cleanup-when-delete-backup") SettingNameDefaultMinNumberOfBackingImageCopies = SettingName("default-min-number-of-backing-image-copies") + SettingNameRWXVolumeFastFailover = SettingName("rwx-volume-fast-failover") ) var ( @@ -228,6 +229,7 @@ var ( SettingNameFreezeFilesystemForSnapshot, SettingNameAutoCleanupSnapshotWhenDeleteBackup, SettingNameDefaultMinNumberOfBackingImageCopies, + SettingNameRWXVolumeFastFailover, } ) @@ -348,6 +350,7 @@ var ( SettingNameFreezeFilesystemForSnapshot: SettingDefinitionFreezeFilesystemForSnapshot, SettingNameAutoCleanupSnapshotWhenDeleteBackup: SettingDefinitionAutoCleanupSnapshotWhenDeleteBackup, SettingNameDefaultMinNumberOfBackingImageCopies: SettingDefinitionDefaultMinNumberOfBackingImageCopies, + SettingNameRWXVolumeFastFailover: SettingDefinitionRWXVolumeFastFailover, } SettingDefinitionBackupTarget = SettingDefinition{ @@ -1482,6 +1485,16 @@ var ( ValueIntRangeMinimum: 1, }, } + + SettingDefinitionRWXVolumeFastFailover = SettingDefinition{ + DisplayName: "RWX Volume Fast Failover", + Description: "Turn on logic to detect and move 
stale RWX volumes quickly (Experimental)", + Category: SettingCategoryGeneral, + Type: SettingTypeBool, + Required: true, + ReadOnly: false, + Default: "false", + } ) type NodeDownPodDeletionPolicy string diff --git a/types/types.go b/types/types.go index 63272530cc..f4059923ca 100644 --- a/types/types.go +++ b/types/types.go @@ -174,6 +174,10 @@ const ( LonghornLabelLastSystemRestoreBackup = "last-system-restored-backup" LonghornLabelDataEngine = "data-engine" LonghornLabelVersion = "version" + LonghornLabelAdmissionWebhook = "admission-webhook" + LonghornLabelConversionWebhook = "conversion-webhook" + + LonghornRecoveryBackendServiceName = "longhorn-recovery-backend" LonghornLabelValueEnabled = "enabled" LonghornLabelValueIgnored = "ignored" @@ -220,6 +224,7 @@ const ( const ( EnvNodeName = "NODE_NAME" + EnvPodName = "POD_NAME" EnvPodNamespace = "POD_NAMESPACE" EnvPodIP = "POD_IP" EnvServiceAccount = "SERVICE_ACCOUNT" @@ -421,6 +426,24 @@ func GetManagerLabels() map[string]string { } } +func GetAdmissionWebhookLabel() map[string]string { + return map[string]string{ + GetLonghornLabelKey(LonghornLabelAdmissionWebhook): AdmissionWebhookServiceName, + } +} + +func GetRecoveryBackendLabel() map[string]string { + return map[string]string{ + GetLonghornLabelKey(LonghornLabelRecoveryBackend): LonghornRecoveryBackendServiceName, + } +} + +func GetConversionWebhookLabel() map[string]string { + return map[string]string{ + GetLonghornLabelKey(LonghornLabelConversionWebhook): ConversionWebhookServiceName, + } +} + func GetEngineImageLabels(engineImageName string) map[string]string { labels := GetBaseLabelsForSystemManagedComponent() labels[GetLonghornLabelComponentKey()] = LonghornLabelEngineImage @@ -1233,3 +1256,14 @@ func IsStorageNetworkForRWXVolume(storageNetwork *longhorn.Setting, isStorageNet } return storageNetwork.Value != CniNetworkNone && isStorageNetworkForRWXVolumeEnabled } + +func MergeStringMaps(baseMap, overwriteMap map[string]string) map[string]string { + result := map[string]string{} + for k, v := range baseMap { + result[k] = v + } + for k, v := range overwriteMap { + result[k] = v + } + return result +} diff --git a/upgrade/util/util.go b/upgrade/util/util.go index 9311db352d..cad3038e55 100644 --- a/upgrade/util/util.go +++ b/upgrade/util/util.go @@ -128,17 +128,6 @@ func ListManagerPods(namespace string, kubeClient *clientset.Clientset) ([]corev return managerPodsList.Items, nil } -func MergeStringMaps(baseMap, overwriteMap map[string]string) map[string]string { - result := map[string]string{} - for k, v := range baseMap { - result[k] = v - } - for k, v := range overwriteMap { - result[k] = v - } - return result -} - func GetCurrentLonghornVersion(namespace string, lhClient lhclientset.Interface) (string, error) { currentLHVersionSetting, err := lhClient.LonghornV1beta2().Settings(namespace).Get(context.TODO(), string(types.SettingNameCurrentLonghornVersion), metav1.GetOptions{}) if err != nil {