Skip to content

Commit

Permalink
failover: Allow custom state setter
Browse files Browse the repository at this point in the history
  • Loading branch information
monstermunchkin committed Jan 8, 2025
1 parent c943684 commit 4fca574
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 10 deletions.
44 changes: 34 additions & 10 deletions maintenance/failover/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/bsm/redislock"
"github.com/pace/bricks/backend/k8sapi"
"github.com/pace/bricks/maintenance/errors"
"github.com/pace/bricks/maintenance/health"
"github.com/pace/bricks/maintenance/log"
Expand All @@ -28,8 +27,6 @@ const (
ACTIVE status = 1
)

const Label = "github.com.pace.bricks.activepassive"

// ActivePassive implements a failover mechanism that allows
// to deploy a service multiple times but only one will accept
// traffic by using the label selector of kubernetes.
Expand All @@ -55,33 +52,58 @@ type ActivePassive struct {
timeToFailover time.Duration
locker *redislock.Client

// access to the kubernetes api
client *k8sapi.Client
stateSetter StateSetter

// current status of the failover (to show it in the readiness status)
state status
stateMu sync.RWMutex
}

type ActivePassiveOption func(*ActivePassive) error

// WithCustomStateSetter returns an option that replaces the default
// (kubernetes pod label) state setter with the given callback. The
// callback is invoked with the new failover state whenever it changes.
func WithCustomStateSetter(fn func(ctx context.Context, state string) error) ActivePassiveOption {
	return func(ap *ActivePassive) error {
		setter, err := NewCustomStateSetter(fn)
		if err != nil {
			return fmt.Errorf("failed to create state setter: %w", err)
		}

		ap.stateSetter = setter

		return nil
	}
}

// NewActivePassive creates a new active passive cluster
// identified by the name, the time to failover determines
// the frequency of checks performed against the redis to
// keep the active state.
// NOTE: creating multiple ActivePassive in one processes
// is not working correctly as there is only one readiness
// probe.
func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
cl, err := k8sapi.NewClient()
func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client, opts ...ActivePassiveOption) (*ActivePassive, error) {
var (
stateSetter StateSetter
err error
)

// Default state setter uses the k8s api to set the state.
stateSetter, err = NewPodStateSetter()
if err != nil {
return nil, err
log.Ctx(context.Background()).Warn().Err(err).Msg("failed to create state setter")
stateSetter = &NoopStateSetter{}
}

ap := &ActivePassive{
clusterName: clusterName,
timeToFailover: timeToFailover,
locker: redislock.New(client),
client: cl,
stateSetter: stateSetter,
}

for _, opt := range opts {
opt(ap)

Check failure on line 104 in maintenance/failover/failover.go

View workflow job for this annotation

GitHub Actions / test (1.23)

Error return value is not checked (errcheck)
}

health.SetCustomReadinessCheck(ap.Handler)

return ap, nil
Expand Down Expand Up @@ -165,6 +187,8 @@ func (a *ActivePassive) Run(ctx context.Context) error {
// passive in next iteration
logger.Debug().Msg("ttl expired")
a.becomeUndefined(ctx)

continue
}
refreshTime := d / 2

Expand Down Expand Up @@ -222,7 +246,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {

// setState returns true if the state was set successfully
func (a *ActivePassive) setState(ctx context.Context, state status) bool {
err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
err := a.stateSetter.SetState(ctx, a.label(state))
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
a.stateMu.Lock()
Expand Down
53 changes: 53 additions & 0 deletions maintenance/failover/state_setter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package failover

import (
	"context"
	"errors"
	"fmt"

	"github.com/pace/bricks/backend/k8sapi"
)

// Label is the kubernetes pod label under which the failover state
// (active/passive/undefined) is published.
const Label = "github.com.pace.bricks.activepassive"

// StateSetter publishes the current failover state of this instance,
// e.g. as a pod label consumed by the kubernetes label selector that
// routes traffic to the active instance.
type StateSetter interface {
	SetState(ctx context.Context, state string) error
}

// podStateSetter publishes the failover state by labelling the pod the
// process runs in, using the kubernetes API.
type podStateSetter struct {
	// client talks to the kubernetes API of the current cluster.
	client *k8sapi.Client
}

// NewPodStateSetter builds a podStateSetter backed by a fresh kubernetes
// API client. It fails when the client cannot be constructed, e.g. when
// not running inside a cluster.
func NewPodStateSetter() (*podStateSetter, error) {
	c, err := k8sapi.NewClient()
	if err != nil {
		return nil, fmt.Errorf("failed to create k8s client: %w", err)
	}

	return &podStateSetter{client: c}, nil
}

// SetState implements StateSetter by writing the state into the pod's
// failover label.
func (p *podStateSetter) SetState(ctx context.Context, state string) error {
	return p.client.SetCurrentPodLabel(ctx, Label, state)
}

type CustomStateSetter struct {
fn func(ctx context.Context, state string) error
}

func NewCustomStateSetter(fn func(ctx context.Context, state string) error) (*CustomStateSetter, error) {
if fn == nil {
return nil, fmt.Errorf("fn must not be nil")
}

return &CustomStateSetter{fn: fn}, nil
}

func (c *CustomStateSetter) SetState(ctx context.Context, state string) error {
return c.fn(ctx, state)
}

// NoopStateSetter is a StateSetter that ignores every state change.
// It serves as a fallback when no real setter can be constructed.
type NoopStateSetter struct{}

// SetState implements StateSetter and always reports success without
// doing anything.
func (*NoopStateSetter) SetState(_ context.Context, _ string) error {
	return nil
}

0 comments on commit 4fca574

Please sign in to comment.