Share-manager HA - lease renewal #222

Merged · 2 commits · Jul 23, 2024
6 changes: 3 additions & 3 deletions go.mod
@@ -56,6 +56,9 @@ require (
golang.org/x/sys v0.22.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
k8s.io/client-go v0.30.3
k8s.io/kubernetes v1.30.3
k8s.io/mount-utils v0.30.3
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
@@ -97,9 +100,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.30.3 // indirect
k8s.io/apimachinery v0.30.3 // indirect
k8s.io/client-go v0.30.3 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
18 changes: 11 additions & 7 deletions pkg/server/nfs/nfs_server.go
@@ -45,8 +45,8 @@ LOG {

NFSV4
{
Lease_Lifetime = 60;
Grace_Period = 90;
Lease_Lifetime = {{.LeaseLifetime}};
Grace_Period = {{.GracePeriod}};
Minor_Versions = 1, 2;
RecoveryBackend = longhorn;
Only_Numeric_Owners = true;
@@ -84,13 +84,13 @@ type Server struct {
exporter *Exporter
}

func NewServer(logger logrus.FieldLogger, configPath, exportPath, volume string) (*Server, error) {
func NewServer(logger logrus.FieldLogger, configPath, exportPath, volume string, leaseLifetime, gracePeriod int) (*Server, error) {
if err := setRlimitNOFILE(logger); err != nil {
logger.WithError(err).Warn("Error setting RLIMIT_NOFILE, there may be 'Too many open files' errors later")
}

if _, err := os.Stat(configPath); os.IsNotExist(err) {
if err = os.WriteFile(configPath, getUpdatedGaneshConfig(defaultConfig), 0600); err != nil {
if err = os.WriteFile(configPath, getUpdatedGaneshConfig(defaultConfig, leaseLifetime, gracePeriod), 0600); err != nil {
return nil, errors.Wrapf(err, "error writing nfs config %s", configPath)
}
}
@@ -145,7 +145,7 @@ func setRlimitNOFILE(logger logrus.FieldLogger) error {
return nil
}

func getUpdatedGaneshConfig(config []byte) []byte {
func getUpdatedGaneshConfig(config []byte, leaseLifetime, gracePeriod int) []byte {
var (
tmplBuf bytes.Buffer
logPath string
@@ -158,9 +158,9 @@ func getUpdatedGaneshConfig(config []byte) []byte {
}

tmplVals := struct {
LogPath string
LogPath string
LeaseLifetime int
GracePeriod int
}{
LogPath: logPath,
LogPath: logPath,
LeaseLifetime: leaseLifetime,
GracePeriod: gracePeriod,
}

if err := template.Must(template.New("Ganesha_Config").Parse(string(config))).Execute(&tmplBuf, tmplVals); err != nil {
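Note: the Lease_Lifetime/Grace_Period substitution above uses Go's text/template, matching the Execute call in getUpdatedGaneshConfig. A minimal, self-contained sketch of that substitution follows; the values shown are the previously hardcoded defaults (60 and 90), and the trimmed template text is illustrative, not the full config:

    package main

    import (
        "os"
        "text/template"
    )

    // Sketch of the rendering performed by getUpdatedGaneshConfig: the NFSV4
    // block's placeholders are filled from a struct with matching field names.
    const nfsv4Block = `NFSV4
    {
        Lease_Lifetime = {{.LeaseLifetime}};
        Grace_Period = {{.GracePeriod}};
    }
    `

    func main() {
        vals := struct {
            LeaseLifetime int
            GracePeriod   int
        }{LeaseLifetime: 60, GracePeriod: 90}

        // template.Must panics on a parse error, the same pattern nfs_server.go uses.
        tmpl := template.Must(template.New("Ganesha_Config").Parse(nfsv4Block))
        if err := tmpl.Execute(os.Stdout, vals); err != nil {
            panic(err)
        }
    }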
158 changes: 156 additions & 2 deletions pkg/server/share_manager.go
@@ -3,12 +3,20 @@ package server
import (
"context"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/rest"
mount "k8s.io/mount-utils"

"github.com/longhorn/longhorn-share-manager/pkg/crypto"
@@ -20,8 +28,17 @@
)

const waitBetweenChecks = time.Second * 5
const leaseRenewInterval = time.Second * 3
const healthCheckInterval = time.Second * 10
const configPath = "/tmp/vfs.conf"
const namespace = "longhorn-system"
const shareManagerPrefix = "share-manager-"

const EnvKeyFastFailover = "FAST_FAILOVER"
const EnvKeyLeaseLifetime = "LEASE_LIFETIME"
const EnvKeyGracePeriod = "GRACE_PERIOD"
const defaultLeaseLifetime = 60
const defaultGracePeriod = 90

const (
UnhealthyErr = "UNHEALTHY: volume with mount path %v is unhealthy"
@@ -37,6 +54,11 @@ type ShareManager struct {
context context.Context
shutdown context.CancelFunc

enableFastFailover bool
leaseHolder string
leaseClient coordinationv1client.LeasesGetter
lease *coordinationv1.Lease

nfsServer *nfs.Server
}

@@ -47,7 +69,30 @@ func NewShareManager(logger logrus.FieldLogger, volume volume.Volume) (*ShareMan
}
m.context, m.shutdown = context.WithCancel(context.Background())

nfsServer, err := nfs.NewServer(logger, configPath, types.ExportPath, volume.Name)
m.enableFastFailover = m.getEnvAsBool(EnvKeyFastFailover, false)
leaseLifetime := m.getEnvAsInt(EnvKeyLeaseLifetime, defaultLeaseLifetime)
gracePeriod := m.getEnvAsInt(EnvKeyGracePeriod, defaultGracePeriod)

if m.enableFastFailover {
kubeclientset, err := m.NewKubeClient()
if err != nil {
m.logger.WithError(err).Error("Failed to make lease client for fast failover")
return nil, err
}

// Use the clientset to get the node name of the share-manager pod
// and store for use as lease holder.
podName := shareManagerPrefix + m.volume.Name
pod, err := kubeclientset.CoreV1().Pods(namespace).Get(m.context, podName, metav1.GetOptions{})
if err != nil {
m.logger.WithError(err).Warn("Failed to get share-manager pod for fast failover")
return nil, err
}
m.leaseHolder = pod.Spec.NodeName
m.leaseClient = kubeclientset.CoordinationV1()
}

nfsServer, err := nfs.NewServer(logger, configPath, types.ExportPath, volume.Name, leaseLifetime, gracePeriod)
if err != nil {
return nil, err
}
@@ -73,6 +118,7 @@ func (m *ShareManager) Run() error {
m.Shutdown()
}()

// Check every waitBetweenChecks for volume attachment. Then run server process once and wait for completion.
for ; ; time.Sleep(waitBetweenChecks) {
select {
case <-m.context.Done():
@@ -105,6 +151,15 @@
}

m.logger.Info("Starting nfs server, volume is ready for export")

if m.enableFastFailover {
if err = m.takeLease(); err != nil {
m.logger.WithError(err).Error("Failed to take lease for fast failovr")
return err
}
go m.runLeaseRenew()
}

go m.runHealthCheck()

if _, err := m.nfsServer.CreateExport(vol.Name); err != nil {
@@ -114,7 +169,7 @@

m.SetShareExported(true)

// This blocks until server exist
// This blocks until server exits
if err := m.nfsServer.Run(m.context); err != nil {
m.logger.WithError(err).Error("NFS server exited with error")
}
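The hunk below adds the env-parsing helpers and the lease take/renew logic. One point worth noting: takeLease fetches a Lease named after the volume and dereferences Spec.HolderIdentity and Spec.LeaseTransitions without nil checks, so it assumes some other component has already created the Lease with those fields populated. A hypothetical creator-side sketch under that assumption (the function name, duration value, and caller are illustrative, not part of this PR):

    import (
        "context"

        coordinationv1 "k8s.io/api/coordination/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/utils/ptr"
    )

    // createVolumeLease sketches the Lease that takeLease (below) expects to
    // find: HolderIdentity and LeaseTransitions must be non-nil, since
    // takeLease dereferences both when recording the new holder.
    func createVolumeLease(ctx context.Context, client kubernetes.Interface, namespace, volumeName string) (*coordinationv1.Lease, error) {
        lease := &coordinationv1.Lease{
            ObjectMeta: metav1.ObjectMeta{Name: volumeName, Namespace: namespace},
            Spec: coordinationv1.LeaseSpec{
                HolderIdentity:       ptr.To(""),        // overwritten by takeLease
                LeaseDurationSeconds: ptr.To(int32(10)), // illustrative value
                LeaseTransitions:     ptr.To(int32(0)),
            },
        }
        return client.CoordinationV1().Leases(namespace).Create(ctx, lease, metav1.CreateOptions{})
    }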
@@ -210,6 +265,105 @@
return nil
}

func (m *ShareManager) getEnvAsInt(key string, defaultVal int) int {
env := os.Getenv(key)
if env == "" {
m.logger.Warnf("Failed to get expected environment variable, env %v wasn't set, defaulting to %v", key, defaultVal)
return defaultVal
}
value, err := strconv.Atoi(env)
if err != nil {
m.logger.Warnf("Failed to convert environment variable, %v, value %v, to an int, defaulting to %v", key, env, defaultVal)
return defaultVal
}
return value
}

func (m *ShareManager) getEnvAsBool(key string, defaultVal bool) bool {
env := os.Getenv(key)
if env == "" {
m.logger.Warnf("Failed to get expected environment variable, env %v wasn't set, defaulting to %v", key, defaultVal)
return defaultVal
}
value, err := strconv.ParseBool(env)
if err != nil {
m.logger.Warnf("Failed to convert environment variable, %v, value %v, to an int, defaulting to %v", key, env, defaultVal)
return defaultVal
}
return value
}

func (m *ShareManager) NewKubeClient() (*kubernetes.Clientset, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, err
}

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}

return clientset, nil
}

func (m *ShareManager) takeLease() error {
if m.leaseClient == nil {
return fmt.Errorf("kubernetes API client is unset")
}

lease, err := m.leaseClient.Leases(namespace).Get(m.context, m.volume.Name, metav1.GetOptions{})
if err != nil {
return err
}
m.lease = lease

now := time.Now()
currentHolder := *m.lease.Spec.HolderIdentity
m.logger.Infof("Updating lease holderIdentity from %v to %v", currentHolder, m.leaseHolder)

*m.lease.Spec.HolderIdentity = m.leaseHolder
*m.lease.Spec.LeaseTransitions = *m.lease.Spec.LeaseTransitions + 1
m.lease.Spec.AcquireTime = &metav1.MicroTime{Time: now}
m.lease.Spec.RenewTime = &metav1.MicroTime{Time: now}

lease, err = m.leaseClient.Leases(namespace).Update(m.context, m.lease, metav1.UpdateOptions{})
if err != nil {
return err
}
m.lease = lease
m.logger.Infof("Took lease for volume %v as holder %v", m.volume.Name, m.leaseHolder)
return nil
}

func (m *ShareManager) renewLease() error {
m.lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
lease, err := m.leaseClient.Leases(namespace).Update(m.context, m.lease, metav1.UpdateOptions{})
if err != nil {
return err
}
m.lease = lease
return nil
}

func (m *ShareManager) runLeaseRenew() {
m.logger.Infof("Starting lease renewal for volume mounted at: %v", types.GetMountPath(m.volume.Name))
ticker := time.NewTicker(leaseRenewInterval)
defer ticker.Stop()

for {
select {
case <-m.context.Done():
m.logger.Info("NFS lease renewal is ending")
return
case <-ticker.C:
if err := m.renewLease(); err != nil {
m.logger.WithError(err).Warn("Failed to renew share-manager lease; expect to be terminated")
}
}
}
}

func (m *ShareManager) runHealthCheck() {
m.logger.Infof("Starting health check for volume mounted at: %v", types.GetMountPath(m.volume.Name))
ticker := time.NewTicker(healthCheckInterval)
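For context: the renewal loop above only updates RenewTime every 3 seconds (leaseRenewInterval); the decision to fail the pod over when renewals stop is made elsewhere and is not part of this PR. A hypothetical reader-side staleness check, where the function name and threshold are illustrative assumptions:

    import (
        "time"

        coordinationv1 "k8s.io/api/coordination/v1"
    )

    // leaseIsStale sketches the check a controller could apply: with renewals
    // every 3s, a RenewTime older than the threshold suggests the holder node
    // is unreachable and the share-manager pod can be failed over.
    func leaseIsStale(lease *coordinationv1.Lease, staleThreshold time.Duration) bool {
        if lease.Spec.RenewTime == nil {
            return true // no renewal recorded; treat as stale
        }
        return time.Since(lease.Spec.RenewTime.Time) > staleThreshold
    }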