Skip to content

Commit

Permalink
feat(RWX HA) Add fast-failover code.
Browse files Browse the repository at this point in the history
Signed-off-by: James Munson <james.munson@suse.com>
  • Loading branch information
james-munson committed Jul 19, 2024
1 parent 9555b77 commit fe264b6
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 15 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ require (
golang.org/x/sys v0.22.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
k8s.io/client-go v0.30.3
k8s.io/kubernetes v1.30.2
k8s.io/mount-utils v0.30.2
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
Expand Down Expand Up @@ -97,9 +100,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.30.2 // indirect
k8s.io/apimachinery v0.30.2 // indirect
k8s.io/client-go v0.30.2 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
18 changes: 11 additions & 7 deletions pkg/server/nfs/nfs_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ LOG {
NFSV4
{
Lease_Lifetime = 60;
Grace_Period = 90;
Lease_Lifetime = {{.LeaseLifetime}};
Grace_Period = {{.GracePeriod}};
Minor_Versions = 1, 2;
RecoveryBackend = longhorn;
Only_Numeric_Owners = true;
Expand Down Expand Up @@ -84,13 +84,13 @@ type Server struct {
exporter *Exporter
}

func NewServer(logger logrus.FieldLogger, configPath, exportPath, volume string) (*Server, error) {
func NewServer(logger logrus.FieldLogger, configPath, exportPath, volume string, leaseLifetime int, gracePeriod int) (*Server, error) {
if err := setRlimitNOFILE(logger); err != nil {
logger.WithError(err).Warn("Error setting RLIMIT_NOFILE, there may be 'Too many open files' errors later")
}

if _, err := os.Stat(configPath); os.IsNotExist(err) {
if err = os.WriteFile(configPath, getUpdatedGaneshConfig(defaultConfig), 0600); err != nil {
if err = os.WriteFile(configPath, getUpdatedGaneshConfig(defaultConfig, leaseLifetime, gracePeriod), 0600); err != nil {
return nil, errors.Wrapf(err, "error writing nfs config %s", configPath)
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func setRlimitNOFILE(logger logrus.FieldLogger) error {
return nil
}

func getUpdatedGaneshConfig(config []byte) []byte {
func getUpdatedGaneshConfig(config []byte, leaseLifetime int, gracePeriod int) []byte {
var (
tmplBuf bytes.Buffer
logPath string
Expand All @@ -158,9 +158,13 @@ func getUpdatedGaneshConfig(config []byte) []byte {
}

tmplVals := struct {
LogPath string
LogPath string
LeaseLifetime int
GracePeriod int
}{
LogPath: logPath,
LogPath: logPath,
LeaseLifetime: leaseLifetime,
GracePeriod: gracePeriod,
}

if err := template.Must(template.New("Ganesha_Config").Parse(string(config))).Execute(&tmplBuf, tmplVals); err != nil {
Expand Down
161 changes: 159 additions & 2 deletions pkg/server/share_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@ package server
import (
"context"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/rest"
mount "k8s.io/mount-utils"

"github.com/longhorn/longhorn-share-manager/pkg/crypto"
Expand All @@ -20,8 +28,17 @@ import (
)

const waitBetweenChecks = time.Second * 5
const leaseRenewInterval = time.Second * 3
const healthCheckInterval = time.Second * 10
const configPath = "/tmp/vfs.conf"
const namespace = "longhorn-system"
const shareManagerPrefix = "share-manager-"

const EnvKeyFastFailover = "FAST_FAILOVER"
const EnvKeyLeaseLifetime = "LEASE_LIFETIME"
const EnvKeyGracePeriod = "GRACE_PERIOD"
const defaultLeaseLifetime = 60
const defaultGracePeriod = 90

const (
UnhealthyErr = "UNHEALTHY: volume with mount path %v is unhealthy"
Expand All @@ -37,6 +54,11 @@ type ShareManager struct {
context context.Context
shutdown context.CancelFunc

enableFastFailover bool
leaseHolder string
leaseClient coordinationv1client.LeasesGetter
lease *coordinationv1.Lease

nfsServer *nfs.Server
}

Expand All @@ -47,7 +69,31 @@ func NewShareManager(logger logrus.FieldLogger, volume volume.Volume) (*ShareMan
}
m.context, m.shutdown = context.WithCancel(context.Background())

nfsServer, err := nfs.NewServer(logger, configPath, types.ExportPath, volume.Name)
enableFastFailover := m.getEnvAsBool(EnvKeyFastFailover, false)
leaseLifetime := m.getEnvAsInt(EnvKeyLeaseLifetime, defaultLeaseLifetime)
gracePeriod := m.getEnvAsInt(EnvKeyGracePeriod, defaultGracePeriod)

m.enableFastFailover = enableFastFailover
if m.enableFastFailover {
kubeclientset, err := m.NewKubeClient()
if err != nil {
m.logger.WithError(err).Warn("Failed to make lease client for fast failover")
return nil, err
}

// Use the clientset to get the node name of the share-manager pod
// and store for use as lease holder.
podName := shareManagerPrefix + m.volume.Name
pod, err := kubeclientset.CoreV1().Pods(namespace).Get(m.context, podName, metav1.GetOptions{})
if err != nil {
m.logger.WithError(err).Warn("Failed to get share-manager pod for fast failover")
return nil, err
}
m.leaseHolder = pod.Spec.NodeName
m.leaseClient = kubeclientset.CoordinationV1()
}

nfsServer, err := nfs.NewServer(logger, configPath, types.ExportPath, volume.Name, leaseLifetime, gracePeriod)
if err != nil {
return nil, err
}
Expand All @@ -73,6 +119,7 @@ func (m *ShareManager) Run() error {
m.Shutdown()
}()

// Check every waitBetweenChecks for volume attachment. Then run server process once and wait for completion.
for ; ; time.Sleep(waitBetweenChecks) {
select {
case <-m.context.Done():
Expand Down Expand Up @@ -105,6 +152,15 @@ func (m *ShareManager) Run() error {
}

m.logger.Info("Starting nfs server, volume is ready for export")

if m.enableFastFailover {
if err = m.takeLease(); err != nil {
m.logger.WithError(err).Warn("Failed to take lease for fast failovr")
return err
}
go m.runLeaseRenew()
}

go m.runHealthCheck()

if _, err := m.nfsServer.CreateExport(vol.Name); err != nil {
Expand All @@ -114,7 +170,7 @@ func (m *ShareManager) Run() error {

m.SetShareExported(true)

// This blocks until server exist
// This blocks until server exits
if err := m.nfsServer.Run(m.context); err != nil {
m.logger.WithError(err).Error("NFS server exited with error")
}
Expand Down Expand Up @@ -210,6 +266,107 @@ func (m *ShareManager) resizeVolume(devicePath, mountPath string) error {
return nil
}

func (m *ShareManager) getEnvAsInt(key string, defaultVal int) int {
env := os.Getenv(key)
if env == "" {
m.logger.Warnf("Failed to get expected environment variable, env %v wasn't set, defaulting to %v", key, defaultVal)
return defaultVal
}
value, err := strconv.Atoi(env)
if err != nil {
m.logger.Warnf("Failed to convert environment variable, %v, value %v, to an int, defaulting to %v", key, env, defaultVal)
return defaultVal
}
return value
}

func (m *ShareManager) getEnvAsBool(key string, defaultVal bool) bool {
env := os.Getenv(key)
if env == "" {
m.logger.Warnf("Failed to get expected environment variable, env %v wasn't set, defaulting to %v", key, defaultVal)
return defaultVal
}
value, err := strconv.ParseBool(env)
if err != nil {
m.logger.Warnf("Failed to convert environment variable, %v, value %v, to an int, defaulting to %v", key, env, defaultVal)
return defaultVal
}
return value
}

func (m *ShareManager) NewKubeClient() (*kubernetes.Clientset, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, err
}

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}

return clientset, nil
}

func (m *ShareManager) takeLease() error {
if m.leaseClient == nil {
return fmt.Errorf("kubernetes API client is unset")
}

lease, err := m.leaseClient.Leases(namespace).Get(m.context, m.volume.Name, metav1.GetOptions{})
if err != nil {
return err
}
m.lease = lease

now := time.Now()
currentHolder := *m.lease.Spec.HolderIdentity
if currentHolder != "" {
m.logger.Infof("Updating lease holderIdentity from %v to %v", currentHolder, m.leaseHolder)
}

*m.lease.Spec.HolderIdentity = m.leaseHolder
*m.lease.Spec.LeaseTransitions = *m.lease.Spec.LeaseTransitions + 1
m.lease.Spec.AcquireTime = &metav1.MicroTime{Time: now}
m.lease.Spec.RenewTime = &metav1.MicroTime{Time: now}

lease, err = m.leaseClient.Leases(namespace).Update(m.context, m.lease, metav1.UpdateOptions{})
if err != nil {
return err
}
m.lease = lease
m.logger.Infof("Took lease for volume %v as holder %v", m.volume.Name, m.leaseHolder)
return nil
}

func (m *ShareManager) renewLease() error {
m.lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
lease, err := m.leaseClient.Leases(namespace).Update(m.context, m.lease, metav1.UpdateOptions{})
if err != nil {
return err
}
m.lease = lease
return nil
}

func (m *ShareManager) runLeaseRenew() {
m.logger.Infof("Starting lease renewal for volume mounted at: %v", types.GetMountPath(m.volume.Name))
ticker := time.NewTicker(leaseRenewInterval)
defer ticker.Stop()

for {
select {
case <-m.context.Done():
m.logger.Info("NFS lease renewal is ending")
return
case <-ticker.C:
if err := m.renewLease(); err != nil {
m.logger.Warn("Failed to renew share-manager lease - expect to be terminated.")
}
}
}
}

func (m *ShareManager) runHealthCheck() {
m.logger.Infof("Starting health check for volume mounted at: %v", types.GetMountPath(m.volume.Name))
ticker := time.NewTicker(healthCheckInterval)
Expand Down
6 changes: 3 additions & 3 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ gopkg.in/yaml.v2
# gopkg.in/yaml.v3 v3.0.1
## explicit
gopkg.in/yaml.v3
# k8s.io/api v0.30.2 => k8s.io/api v0.30.2
# k8s.io/api v0.30.3 => k8s.io/api v0.30.2
## explicit; go 1.22.0
k8s.io/api/admissionregistration/v1
k8s.io/api/admissionregistration/v1alpha1
Expand Down Expand Up @@ -324,7 +324,7 @@ k8s.io/api/storage/v1
k8s.io/api/storage/v1alpha1
k8s.io/api/storage/v1beta1
k8s.io/api/storagemigration/v1alpha1
# k8s.io/apimachinery v0.30.2 => k8s.io/apimachinery v0.30.2
# k8s.io/apimachinery v0.30.3 => k8s.io/apimachinery v0.30.2
## explicit; go 1.22.0
k8s.io/apimachinery/pkg/api/equality
k8s.io/apimachinery/pkg/api/errors
Expand Down Expand Up @@ -366,7 +366,7 @@ k8s.io/apimachinery/pkg/util/yaml
k8s.io/apimachinery/pkg/version
k8s.io/apimachinery/pkg/watch
k8s.io/apimachinery/third_party/forked/golang/reflect
# k8s.io/client-go v0.30.2 => k8s.io/client-go v0.30.2
# k8s.io/client-go v0.30.3 => k8s.io/client-go v0.30.2
## explicit; go 1.22.0
k8s.io/client-go/applyconfigurations/admissionregistration/v1
k8s.io/client-go/applyconfigurations/admissionregistration/v1alpha1
Expand Down

0 comments on commit fe264b6

Please sign in to comment.