From fe264b69d4f103d09f55a08abe7abbfbeba6b98f Mon Sep 17 00:00:00 2001
From: James Munson
Date: Tue, 7 May 2024 14:50:33 -0600
Subject: [PATCH] feat(RWX HA) Add fast-failover code.

Signed-off-by: James Munson
---
 go.mod                       |   6 +-
 pkg/server/nfs/nfs_server.go |  18 ++--
 pkg/server/share_manager.go  | 189 ++++++++++++++++++++++++++++++++++-
 vendor/modules.txt           |   6 +-
 4 files changed, 204 insertions(+), 15 deletions(-)

diff --git a/go.mod b/go.mod
index 6471c6a2..525a4ab0 100644
--- a/go.mod
+++ b/go.mod
@@ -56,6 +56,9 @@ require (
 	golang.org/x/sys v0.22.0
 	google.golang.org/grpc v1.65.0
 	google.golang.org/protobuf v1.34.2
+	k8s.io/api v0.30.3
+	k8s.io/apimachinery v0.30.3
+	k8s.io/client-go v0.30.3
 	k8s.io/kubernetes v1.30.2
 	k8s.io/mount-utils v0.30.2
 	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
@@ -97,9 +100,6 @@ require (
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
-	k8s.io/api v0.30.2 // indirect
-	k8s.io/apimachinery v0.30.2 // indirect
-	k8s.io/client-go v0.30.2 // indirect
 	k8s.io/klog/v2 v2.120.1 // indirect
 	k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
 	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
diff --git a/pkg/server/nfs/nfs_server.go b/pkg/server/nfs/nfs_server.go
index acb982cd..233168dc 100644
--- a/pkg/server/nfs/nfs_server.go
+++ b/pkg/server/nfs/nfs_server.go
@@ -45,8 +45,8 @@ LOG {
 
 NFSV4
 {
-	Lease_Lifetime = 60;
-	Grace_Period = 90;
+	Lease_Lifetime = {{.LeaseLifetime}};
+	Grace_Period = {{.GracePeriod}};
 	Minor_Versions = 1, 2;
 	RecoveryBackend = longhorn;
 	Only_Numeric_Owners = true;
@@ -84,13 +84,13 @@ type Server struct {
 	exporter *Exporter
 }
 
-func NewServer(logger logrus.FieldLogger, configPath, exportPath, volume string) (*Server, error) {
+func NewServer(logger logrus.FieldLogger, configPath, exportPath, volume string, leaseLifetime int, gracePeriod int) (*Server, error) {
 	if err := setRlimitNOFILE(logger); err != nil {
 		logger.WithError(err).Warn("Error setting RLIMIT_NOFILE, there may be 'Too many open files' errors later")
 	}
 
 	if _, err := os.Stat(configPath); os.IsNotExist(err) {
-		if err = os.WriteFile(configPath, getUpdatedGaneshConfig(defaultConfig), 0600); err != nil {
+		if err = os.WriteFile(configPath, getUpdatedGaneshConfig(defaultConfig, leaseLifetime, gracePeriod), 0600); err != nil {
 			return nil, errors.Wrapf(err, "error writing nfs config %s", configPath)
 		}
 	}
@@ -145,7 +145,7 @@ func setRlimitNOFILE(logger logrus.FieldLogger) error {
 	return nil
 }
 
-func getUpdatedGaneshConfig(config []byte) []byte {
+func getUpdatedGaneshConfig(config []byte, leaseLifetime int, gracePeriod int) []byte {
 	var (
 		tmplBuf bytes.Buffer
 		logPath string
@@ -158,9 +158,13 @@ func getUpdatedGaneshConfig(config []byte) []byte {
 	}
 
 	tmplVals := struct {
-		LogPath string
+		LogPath       string
+		LeaseLifetime int
+		GracePeriod   int
 	}{
-		LogPath: logPath,
+		LogPath:       logPath,
+		LeaseLifetime: leaseLifetime,
+		GracePeriod:   gracePeriod,
 	}
 
 	if err := template.Must(template.New("Ganesha_Config").Parse(string(config))).Execute(&tmplBuf, tmplVals); err != nil {
diff --git a/pkg/server/share_manager.go b/pkg/server/share_manager.go
index d2c9cfa3..8a15cf41 100644
--- a/pkg/server/share_manager.go
+++ b/pkg/server/share_manager.go
@@ -3,12 +3,20 @@ package server
 import (
 	"context"
 	"fmt"
+	"os"
 	"os/exec"
+	"strconv"
 	"strings"
 	"time"
 
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+
+	coordinationv1 "k8s.io/api/coordination/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
+	"k8s.io/client-go/rest"
 	mount "k8s.io/mount-utils"
 
 	"github.com/longhorn/longhorn-share-manager/pkg/crypto"
@@ -20,8 +28,17 @@ import (
 )
 
 const waitBetweenChecks = time.Second * 5
+const leaseRenewInterval = time.Second * 3
 const healthCheckInterval = time.Second * 10
 const configPath = "/tmp/vfs.conf"
+const namespace = "longhorn-system"
+const shareManagerPrefix = "share-manager-"
+
+const EnvKeyFastFailover = "FAST_FAILOVER"
+const EnvKeyLeaseLifetime = "LEASE_LIFETIME"
+const EnvKeyGracePeriod = "GRACE_PERIOD"
+const defaultLeaseLifetime = 60
+const defaultGracePeriod = 90
 
 const (
 	UnhealthyErr = "UNHEALTHY: volume with mount path %v is unhealthy"
@@ -37,6 +54,11 @@ type ShareManager struct {
 	context  context.Context
 	shutdown context.CancelFunc
 
+	enableFastFailover bool
+	leaseHolder        string
+	leaseClient        coordinationv1client.LeasesGetter
+	lease              *coordinationv1.Lease
+
 	nfsServer *nfs.Server
 }
 
@@ -47,7 +69,31 @@ func NewShareManager(logger logrus.FieldLogger, volume volume.Volume) (*ShareMan
 	}
 	m.context, m.shutdown = context.WithCancel(context.Background())
 
-	nfsServer, err := nfs.NewServer(logger, configPath, types.ExportPath, volume.Name)
+	enableFastFailover := m.getEnvAsBool(EnvKeyFastFailover, false)
+	leaseLifetime := m.getEnvAsInt(EnvKeyLeaseLifetime, defaultLeaseLifetime)
+	gracePeriod := m.getEnvAsInt(EnvKeyGracePeriod, defaultGracePeriod)
+
+	m.enableFastFailover = enableFastFailover
+	if m.enableFastFailover {
+		kubeclientset, err := m.NewKubeClient()
+		if err != nil {
+			m.logger.WithError(err).Warn("Failed to make lease client for fast failover")
+			return nil, err
+		}
+
+		// Use the clientset to get the node name of the share-manager pod
+		// and store for use as lease holder.
+		podName := shareManagerPrefix + m.volume.Name
+		pod, err := kubeclientset.CoreV1().Pods(namespace).Get(m.context, podName, metav1.GetOptions{})
+		if err != nil {
+			m.logger.WithError(err).Warn("Failed to get share-manager pod for fast failover")
+			return nil, err
+		}
+		m.leaseHolder = pod.Spec.NodeName
+		m.leaseClient = kubeclientset.CoordinationV1()
+	}
+
+	nfsServer, err := nfs.NewServer(logger, configPath, types.ExportPath, volume.Name, leaseLifetime, gracePeriod)
 	if err != nil {
 		return nil, err
 	}
@@ -73,6 +119,7 @@ func (m *ShareManager) Run() error {
 		m.Shutdown()
 	}()
 
+	// Check every waitBetweenChecks for volume attachment. Then run server process once and wait for completion.
 	for ; ; time.Sleep(waitBetweenChecks) {
 		select {
 		case <-m.context.Done():
@@ -105,6 +152,15 @@ func (m *ShareManager) Run() error {
 			}
 
 			m.logger.Info("Starting nfs server, volume is ready for export")
+
+			if m.enableFastFailover {
+				if err = m.takeLease(); err != nil {
+					m.logger.WithError(err).Warn("Failed to take lease for fast failover")
+					return err
+				}
+				go m.runLeaseRenew()
+			}
+
 			go m.runHealthCheck()
 
 			if _, err := m.nfsServer.CreateExport(vol.Name); err != nil {
@@ -114,7 +170,7 @@ func (m *ShareManager) Run() error {
 
 			m.SetShareExported(true)
 
-			// This blocks until server exist
+			// This blocks until server exits
 			if err := m.nfsServer.Run(m.context); err != nil {
 				m.logger.WithError(err).Error("NFS server exited with error")
 			}
@@ -210,6 +266,135 @@ func (m *ShareManager) resizeVolume(devicePath, mountPath string) error {
 	return nil
 }
 
+// getEnvAsInt returns the integer value of environment variable key, or
+// defaultVal (after logging a warning) when it is unset or not an integer.
+func (m *ShareManager) getEnvAsInt(key string, defaultVal int) int {
+	env := os.Getenv(key)
+	if env == "" {
+		m.logger.Warnf("Failed to get expected environment variable, env %v wasn't set, defaulting to %v", key, defaultVal)
+		return defaultVal
+	}
+	value, err := strconv.Atoi(env)
+	if err != nil {
+		m.logger.Warnf("Failed to convert environment variable, %v, value %v, to an int, defaulting to %v", key, env, defaultVal)
+		return defaultVal
+	}
+	return value
+}
+
+// getEnvAsBool returns the boolean value of environment variable key, or
+// defaultVal (after logging a warning) when it is unset or not a boolean.
+func (m *ShareManager) getEnvAsBool(key string, defaultVal bool) bool {
+	env := os.Getenv(key)
+	if env == "" {
+		m.logger.Warnf("Failed to get expected environment variable, env %v wasn't set, defaulting to %v", key, defaultVal)
+		return defaultVal
+	}
+	value, err := strconv.ParseBool(env)
+	if err != nil {
+		m.logger.Warnf("Failed to convert environment variable, %v, value %v, to a bool, defaulting to %v", key, env, defaultVal)
+		return defaultVal
+	}
+	return value
+}
+
+// NewKubeClient builds a Kubernetes clientset from the in-cluster
+// configuration. It returns an error if either step fails.
+func (m *ShareManager) NewKubeClient() (*kubernetes.Clientset, error) {
+	cfg, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	clientset, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	return clientset, nil
+}
+
+// takeLease fetches the Lease named after the volume, records m.leaseHolder
+// as its holder, increments the transition count, stamps acquire/renew time
+// and writes it back. The returned Lease is cached in m.lease for renewals.
+func (m *ShareManager) takeLease() error {
+	if m.leaseClient == nil {
+		return fmt.Errorf("kubernetes API client is unset")
+	}
+
+	lease, err := m.leaseClient.Leases(namespace).Get(m.context, m.volume.Name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	m.lease = lease
+
+	// HolderIdentity and LeaseTransitions are pointer fields and may be nil,
+	// so read them defensively rather than dereferencing unconditionally.
+	currentHolder := ""
+	if m.lease.Spec.HolderIdentity != nil {
+		currentHolder = *m.lease.Spec.HolderIdentity
+	}
+	if currentHolder != "" {
+		m.logger.Infof("Updating lease holderIdentity from %v to %v", currentHolder, m.leaseHolder)
+	}
+	var transitions int32
+	if m.lease.Spec.LeaseTransitions != nil {
+		transitions = *m.lease.Spec.LeaseTransitions
+	}
+	transitions++
+
+	now := time.Now()
+	holder := m.leaseHolder
+	m.lease.Spec.HolderIdentity = &holder
+	m.lease.Spec.LeaseTransitions = &transitions
+	m.lease.Spec.AcquireTime = &metav1.MicroTime{Time: now}
+	m.lease.Spec.RenewTime = &metav1.MicroTime{Time: now}
+
+	lease, err = m.leaseClient.Leases(namespace).Update(m.context, m.lease, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	m.lease = lease
+	m.logger.Infof("Took lease for volume %v as holder %v", m.volume.Name, m.leaseHolder)
+	return nil
+}
+
+// renewLease sets RenewTime on the cached Lease to the current time, writes
+// it back and caches the returned object for the next renewal.
+func (m *ShareManager) renewLease() error {
+	if m.leaseClient == nil || m.lease == nil {
+		return fmt.Errorf("lease has not been taken")
+	}
+
+	m.lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
+	lease, err := m.leaseClient.Leases(namespace).Update(m.context, m.lease, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	m.lease = lease
+	return nil
+}
+
+// runLeaseRenew renews the lease every leaseRenewInterval until m.context is
+// cancelled. Renewal failures are logged with their cause and retried.
+func (m *ShareManager) runLeaseRenew() {
+	m.logger.Infof("Starting lease renewal for volume mounted at: %v", types.GetMountPath(m.volume.Name))
+	ticker := time.NewTicker(leaseRenewInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-m.context.Done():
+			m.logger.Info("NFS lease renewal is ending")
+			return
+		case <-ticker.C:
+			if err := m.renewLease(); err != nil {
+				m.logger.WithError(err).Warn("Failed to renew share-manager lease - expect to be terminated.")
+			}
+		}
+	}
+}
+
 func (m *ShareManager) runHealthCheck() {
 	m.logger.Infof("Starting health check for volume mounted at: %v", types.GetMountPath(m.volume.Name))
 	ticker := time.NewTicker(healthCheckInterval)
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 04cbfb91..02730ef7 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -268,7 +268,7 @@ gopkg.in/yaml.v2
 # gopkg.in/yaml.v3 v3.0.1
 ## explicit
 gopkg.in/yaml.v3
-# k8s.io/api v0.30.2 => k8s.io/api v0.30.2
+# k8s.io/api v0.30.3 => k8s.io/api v0.30.2
 ## explicit; go 1.22.0
 k8s.io/api/admissionregistration/v1
 k8s.io/api/admissionregistration/v1alpha1
@@ -324,7 +324,7 @@ k8s.io/api/storage/v1
 k8s.io/api/storage/v1alpha1
 k8s.io/api/storage/v1beta1
 k8s.io/api/storagemigration/v1alpha1
-# k8s.io/apimachinery v0.30.2 => k8s.io/apimachinery v0.30.2
+# k8s.io/apimachinery v0.30.3 => k8s.io/apimachinery v0.30.2
 ## explicit; go 1.22.0
 k8s.io/apimachinery/pkg/api/equality
 k8s.io/apimachinery/pkg/api/errors
@@ -366,7 +366,7 @@ k8s.io/apimachinery/pkg/util/yaml
 k8s.io/apimachinery/pkg/version
 k8s.io/apimachinery/pkg/watch
 k8s.io/apimachinery/third_party/forked/golang/reflect
-# k8s.io/client-go v0.30.2 => k8s.io/client-go v0.30.2
+# k8s.io/client-go v0.30.3 => k8s.io/client-go v0.30.2
 ## explicit; go 1.22.0
 k8s.io/client-go/applyconfigurations/admissionregistration/v1
 k8s.io/client-go/applyconfigurations/admissionregistration/v1alpha1