refactoring without logic changes
stefan.rieckhof committed Jan 16, 2025
1 parent 52e5c76 commit a527776
Showing 1 changed file with 32 additions and 40 deletions.
72 changes: 32 additions & 40 deletions maintenance/failover/failover.go
@@ -18,8 +18,6 @@ import (
 	"github.com/redis/go-redis/v9"
 )
 
-const waitRetry = time.Millisecond * 500
-
 type status int
 
 const (
@@ -34,17 +32,15 @@ const Label = "github.com.pace.bricks.activepassive"
 // to deploy a service multiple times but ony one will accept
 // traffic by using the label selector of kubernetes.
 // In order to determine the active, a lock needs to be hold
-// in redis. Hocks can be passed to handle the case of becoming
+// in redis. Hooks can be passed to handle the case of becoming
 // the active or passive.
 // The readiness probe will report the state (ACTIVE/PASSIVE)
 // of each of the members in the cluster.
 type ActivePassive struct {
-	// OnActive will be called in case the current processes
-	// is elected to be the active one
+	// OnActive will be called in case the current processes is elected to be the active one
 	OnActive func(ctx context.Context)
 
-	// OnPassive will be called in case the current process is
-	// the passive one
+	// OnPassive will be called in case the current process is the passive one
 	OnPassive func(ctx context.Context)
 
 	// OnStop is called after the ActivePassive process stops
@@ -56,41 +52,41 @@ type ActivePassive struct {
 	locker *redislock.Client
 
 	// access to the kubernetes api
-	client *k8sapi.Client
+	k8sClient *k8sapi.Client
 
 	// current status of the failover (to show it in the readiness status)
 	state   status
 	stateMu sync.RWMutex
 }
 
 // NewActivePassive creates a new active passive cluster
-// identified by the name, the time to failover determines
-// the frequency of checks performed against the redis to
+// identified by the name. The time to fail over determines
+// the frequency of checks performed against redis to
 // keep the active state.
-// NOTE: creating multiple ActivePassive in one processes
-// is not working correctly as there is only one readiness
-// probe.
+// NOTE: creating multiple ActivePassive in one process
+// is not working correctly as there is only one readiness probe.
 func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
-	cl, err := k8sapi.NewClient()
+	k8sClient, err := k8sapi.NewClient()
 	if err != nil {
 		return nil, err
 	}
 
-	ap := &ActivePassive{
+	activePassive := &ActivePassive{
 		clusterName:    clusterName,
 		timeToFailover: timeToFailover,
 		locker:         redislock.New(client),
-		client:         cl,
+		k8sClient:      k8sClient,
 	}
-	health.SetCustomReadinessCheck(ap.Handler)
-
-	return ap, nil
+	health.SetCustomReadinessCheck(activePassive.Handler)
+
+	return activePassive, nil
 }
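The constructor and the exported callbacks above are the whole surface needed to use this type; a minimal usage sketch follows (the import path, Redis address, cluster name, failover time, and callback bodies are illustrative assumptions, not part of this commit):

package main

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"

	// assumed import path for the package in this diff (maintenance/failover)
	"github.com/pace/bricks/maintenance/failover"
)

func main() {
	ctx := context.Background()

	// assumption: a plain go-redis v9 client; pace/bricks may provide its own redis helper
	redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	ap, err := failover.NewActivePassive("my-service", 5*time.Second, redisClient)
	if err != nil {
		panic(err)
	}

	ap.OnActive = func(ctx context.Context) {
		// enable traffic / start background work on this instance
	}
	ap.OnPassive = func(ctx context.Context) {
		// pause background work; another instance is active
	}
	ap.OnStop = func() {
		// clean up once the failover loop exits
	}

	// Run blocks until the context is cancelled or the failover is closed.
	_ = ap.Run(ctx)
}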

-// Run registers the readiness probe and calls the OnActive
-// and OnPassive callbacks in case the election toke place.
-// Will handle panic safely and therefore can be directly called
-// with go.
+// Run manages distributed lock-based leadership.
+// This method is designed to continually monitor and maintain the leadership status of the calling pod,
+// ensuring only one active instance holds the lock at a time, while transitioning other instances to passive
+// mode. The handler will try to renew its active status by refreshing the lock periodically.
 func (a *ActivePassive) Run(ctx context.Context) error {
 	defer errors.HandleWithCtx(ctx, "activepassive failover handler")

@@ -101,7 +97,6 @@ func (a *ActivePassive) Run(ctx context.Context) error {
 	a.close = make(chan struct{})
 	defer close(a.close)
 
-	// trigger stop handler
 	defer func() {
 		if a.OnStop != nil {
 			a.OnStop()
@@ -110,53 +105,50 @@ func (a *ActivePassive) Run(ctx context.Context) error {

 	var lock *redislock.Lock
 
-	// t is a ticker that reminds to call refresh if
-	// the token was acquired after half of the remaining ttl time
-	t := time.NewTicker(a.timeToFailover)
+	// Ticker to try to refresh the lock's TTL before it expires
+	tryRefreshLock := time.NewTicker(a.timeToFailover)
 
-	// retry time triggers to check if the look needs to be acquired
-	retry := time.NewTicker(waitRetry)
+	// Ticker to try to acquire the lock if in passive or undefined state
+	tryAcquireInterval := 500 * time.Millisecond
+	tryAcquireLock := time.NewTicker(tryAcquireInterval)
 
 	for {
 		// allow close or cancel
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-a.close:
 			return nil
-		case <-t.C:
+		case <-tryRefreshLock.C:
 			if a.getState() == ACTIVE {
 				err := lock.Refresh(ctx, a.timeToFailover, &redislock.Options{
 					RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
 				})
 				if err != nil {
-					logger.Debug().Err(err).Msg("failed to refresh")
+					logger.Info().Err(err).Msg("failed to refresh the lock; becoming undefined...")
 					a.becomeUndefined(ctx)
 				}
 			}
-		case <-retry.C:
-			// try to acquire the lock, as we are not the active
+		case <-tryAcquireLock.C:
			if a.getState() != ACTIVE {
 				var err error
 
 				lock, err = a.locker.Obtain(ctx, lockName, a.timeToFailover, &redislock.Options{
 					RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
 				})
 				if err != nil {
-					// we became passive, trigger callback
 					if a.getState() != PASSIVE {
-						logger.Debug().Err(err).Msg("becoming passive")
+						logger.Info().Err(err).Msg("failed to obtain the lock; becoming passive...")
 						a.becomePassive(ctx)
 					}
 
 					continue
 				}
 
-				// lock acquired
-				logger.Debug().Msg("becoming active")
+				logger.Debug().Msg("lock acquired; becoming active...")
 				a.becomeActive(ctx)
 
-				// set to trigger refresh after TTL / 2
-				t.Reset(a.timeToFailover / 2)
+				// Reset the refresh ticker to half of the time to failover
+				tryRefreshLock.Reset(a.timeToFailover / 2)
 			}
 		}
 	}
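Between the two tickers, the loop above implements a Redis lease: hold the lock for timeToFailover, refresh it at half the TTL while active, and poll every 500ms to take it over while passive. Below is a condensed, standalone sketch of that pattern; the lock name, TTL, Redis address, and the bsm/redislock import are assumptions, and the real code keeps its state inside ActivePassive instead of local variables:

package main

import (
	"context"
	"time"

	"github.com/bsm/redislock" // assumed to be the redislock package imported by failover.go
	"github.com/redis/go-redis/v9"
)

// holdLeadership is an illustrative reduction of the pattern in Run above:
// hold a Redis lease with TTL ttl, refresh it at ttl/2 while active, and keep
// retrying to obtain it every 500ms while passive.
func holdLeadership(ctx context.Context, locker *redislock.Client, lockName string, ttl time.Duration) {
	var lock *redislock.Lock
	active := false

	refresh := time.NewTicker(ttl) // reset to ttl/2 once the lock is held
	acquire := time.NewTicker(500 * time.Millisecond)
	defer refresh.Stop()
	defer acquire.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-refresh.C:
			if active {
				if err := lock.Refresh(ctx, ttl, nil); err != nil {
					active = false // lease lost; another instance can take over after the TTL
				}
			}
		case <-acquire.C:
			if !active {
				l, err := locker.Obtain(ctx, lockName, ttl, nil)
				if err != nil {
					continue // typically redislock.ErrNotObtained: another instance holds the lock
				}
				lock, active = l, true
				refresh.Reset(ttl / 2) // refresh well before the lease expires
			}
		}
	}
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	holdLeadership(context.Background(), redislock.New(client), "demo-lock", 5*time.Second)
}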
@@ -207,7 +199,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {

 // setState returns true if the state was set successfully
 func (a *ActivePassive) setState(ctx context.Context, state status) bool {
-	err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
+	err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
 	if err != nil {
 		log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
 		a.stateMu.Lock()
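The last hunk shows the state being pushed to the pod's Kubernetes label (Label = "github.com.pace.bricks.activepassive"), which is what lets a label selector route traffic only to the active member, while the readiness Handler registered in NewActivePassive reports the same state. That Handler is not part of this diff; the following is only a hypothetical sketch of what such a readiness endpoint could look like, with guessed status codes and response bodies:

// Hypothetical readiness endpoint in the spirit of ActivePassive.Handler
// (the actual Handler is not shown in this commit). Assumes package failover
// and an additional import of "net/http".
func (a *ActivePassive) exampleReadinessHandler(w http.ResponseWriter, r *http.Request) {
	a.stateMu.RLock()
	state := a.state
	a.stateMu.RUnlock()

	switch state {
	case ACTIVE:
		_, _ = w.Write([]byte("ACTIVE"))
	case PASSIVE:
		_, _ = w.Write([]byte("PASSIVE"))
	default:
		// undefined, e.g. right after a failed lock refresh while re-election is pending
		http.Error(w, "UNDEFINED", http.StatusServiceUnavailable)
	}
}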
