Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add leadership package #661

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/ledger/internal/api/common"
"github.com/formancehq/ledger/internal/leadership"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -108,6 +109,7 @@ func NewServeCommand() *cobra.Command {
}),
bus.NewFxModule(),
ballast.Module(serveConfiguration.ballastSize),
leadership.NewFXModule(),
api.Module(api.Config{
Version: Version,
Debug: service.IsDebug(cmd),
Expand All @@ -122,15 +124,15 @@ func NewServeCommand() *cobra.Command {
}),
fx.Decorate(func(
params struct {
fx.In
fx.In

Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger
Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger

MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
) chi.Router {
return assembleFinalRouter(
service.IsDebug(cmd),
Expand Down
2 changes: 1 addition & 1 deletion internal/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
//go:generate gomarkdoc -o README.md --repository.default-branch main
//go:generate gomarkdoc -o README.md --repository.default-branch main --repository.url https://github.com/formancehq/ledger
package ledger
93 changes: 93 additions & 0 deletions internal/leadership/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package leadership

import (
"sync"
)

// listener is one subscriber registered on a Broadcaster.
type listener struct {
// channel is the subscriber's delivery channel.
channel chan Leadership
}

// Broadcaster fans a Leadership state out to any number of subscribers and
// remembers the last state that was broadcast.
type Broadcaster struct {
// mu guards t and inner.
mu sync.Mutex
// t is the last broadcast state; it is nil until Broadcast is first called.
t *Leadership

// inner holds the currently registered listeners.
inner []listener
// outer is allocated by NewSignal.
// NOTE(review): nothing visible in this file reads or writes it — confirm it is still needed.
outer chan Leadership
}

// Actual returns the most recently broadcast Leadership.
//
// When nothing has been broadcast yet, the zero Leadership value is returned.
func (h *Broadcaster) Actual() Leadership {
	h.mu.Lock()
	defer h.mu.Unlock()

	var current Leadership
	if last := h.t; last != nil {
		current = *last
	}
	return current
}

func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan Leadership, 1)
l := listener{
channel: newChannel,
}
h.inner = append(h.inner, l)
if h.t != nil {
newChannel <- *h.t
paul-nicolas marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 40 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L29-L40

Added lines #L29 - L40 were not covered by tests

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

index := -1
for i, listener := range h.inner {
if listener == l {
index = i
break

Check warning on line 50 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L42-L50

Added lines #L42 - L50 were not covered by tests
}
}

if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
} else {
h.inner = h.inner[:index]
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 58 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L54-L58

Added lines #L54 - L58 were not covered by tests
}
}

// Broadcast records t as the current state and sends it to every registered
// listener, in registration order.
//
// Each send is a plain blocking channel send performed while the mutex is
// held, so a listener that is not draining its channel stalls Broadcast and,
// through the mutex, every other method of the Broadcaster.
func (h *Broadcaster) Broadcast(t Leadership) {
h.mu.Lock()
defer h.mu.Unlock()

// Remember the state so Actual and later subscribers can observe it.
h.t = &t

for _, inner := range h.inner {
inner.channel <- t
}

Check warning on line 70 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

// Close closes the channel of every registered listener.
//
// The listeners stay registered afterwards.
// NOTE(review): because of that, a second Close or a later Broadcast would act
// on already-closed channels, which panics — confirm callers invoke Close
// exactly once and never Broadcast after it.
func (h *Broadcaster) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for _, inner := range h.inner {
close(inner.channel)
}

Check warning on line 79 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L73-L79

Added lines #L73 - L79 were not covered by tests
}

// CountListeners returns the number of listeners currently registered.
func (h *Broadcaster) CountListeners() int {
h.mu.Lock()
defer h.mu.Unlock()

return len(h.inner)

Check warning on line 86 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L82-L86

Added lines #L82 - L86 were not covered by tests
}

// NewSignal returns a Broadcaster with no listeners and no recorded state.
func NewSignal() *Broadcaster {
	b := new(Broadcaster)
	b.outer = make(chan Leadership)
	return b
}
43 changes: 43 additions & 0 deletions internal/leadership/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package leadership

import (
"context"
"sync"
)

// contextKey is the unexported type of the context key used by this package.
type contextKey struct{}

// holderContextKey is the key under which the leadership holder is stored in a context.
var holderContextKey contextKey = struct{}{}

func ContextWithLeadershipInfo(ctx context.Context) context.Context {
return context.WithValue(ctx, holderContextKey, &holder{})
}

// IsLeader reports the leadership flag carried by ctx.
//
// It returns false when ctx holds no value under holderContextKey.
func IsLeader(ctx context.Context) bool {
h := ctx.Value(holderContextKey)
if h == nil {
return false
}

Check warning on line 20 in internal/leadership/context.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/context.go#L19-L20

Added lines #L19 - L20 were not covered by tests
// The flag is read under the holder's mutex.
holder := h.(*holder)
holder.Lock()
defer holder.Unlock()

return holder.isLeader
}

// setIsLeader stores isLeader in the leadership holder carried by ctx.
// It is a no-op when ctx holds no value under holderContextKey.
func setIsLeader(ctx context.Context, isLeader bool) {
	stored := ctx.Value(holderContextKey)
	if stored == nil {
		return
	}

	state := stored.(*holder)
	state.Lock()
	state.isLeader = isLeader
	state.Unlock()
}

// holder is the mutable leadership flag stored in a context.
type holder struct {
// The embedded mutex guards isLeader.
sync.Mutex
// isLeader is the flag reported by IsLeader and written by setIsLeader.
isLeader bool
}
6 changes: 6 additions & 0 deletions internal/leadership/leadership.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package leadership

// Leadership is the state published to subscribers of the Broadcaster.
type Leadership struct {
// Acquired is true while this process holds the leadership.
Acquired bool
// DB is the database handle tied to the leadership.
// NOTE(review): it is nil in the zero value that Manager.Run broadcasts on leadership loss — callers should check Acquired before using it.
DB *Mutex
}
54 changes: 54 additions & 0 deletions internal/leadership/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package leadership

import (
"context"
"fmt"
"github.com/uptrace/bun"
)

// leadershipAdvisoryLockKey is the key passed to pg_try_advisory_lock.
// NOTE(review): the value looks arbitrary; confirm it cannot collide with other advisory locks taken on the same database.
const leadershipAdvisoryLockKey = 123456789

// DBHandle is a bun.IDB that can also be closed.
type DBHandle interface {
bun.IDB
Close() error
}

// Locker takes a lock at the process level.
// Take returns a DBHandle which MUST be invalidated when the lock is lost.
// The default implementation returns a nil handle and a nil error when the
// lock could not be obtained.
type Locker interface {
Take(ctx context.Context) (DBHandle, error)
}

// defaultLocker is the Locker implementation based on a PostgreSQL advisory lock.
type defaultLocker struct {
// db is the database from which a dedicated connection is taken for each attempt.
db *bun.DB
}

// Take tries to acquire the leadership advisory lock on a dedicated connection.
//
// Results:
//   - (conn, nil): the lock was acquired; conn is the connection it was taken on.
//   - (nil, nil): the lock was not acquired; the connection has been closed.
//   - (nil, err): opening the connection, running the query or scanning its
//     result failed; the connection, if any, has been closed.
func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) {
conn, err := p.db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("error opening new connection: %w", err)
}

Check warning on line 30 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L29-L30

Added lines #L29 - L30 were not covered by tests

// pg_try_advisory_lock does not wait: it reports immediately whether the lock was obtained.
ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
if ret.Err() != nil {
// Best-effort cleanup: the close error is discarded so the query error is the one reported.
_ = conn.Close()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle connection close errors.

The error from conn.Close() is consistently ignored. These errors should be wrapped and returned.

-		_ = conn.Close()
+		if closeErr := conn.Close(); closeErr != nil {
+			return nil, fmt.Errorf("error closing connection after lock acquisition failure: %w (original error: %v)", closeErr, err)
+		}

Also applies to: 40-40, 45-45

return nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
}

Check warning on line 36 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L34-L36

Added lines #L34 - L36 were not covered by tests

var acquired bool
if err := ret.Scan(&acquired); err != nil {
// Best-effort cleanup: the scan error is the one reported.
_ = conn.Close()
return nil, err
}

// Not acquired is not an error: the caller gets a nil handle and may retry later.
if !acquired {

Check warning on line 44 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L40-L44

Added lines #L40 - L44 were not covered by tests
_ = conn.Close()
return nil, nil
}

// The connection is handed to the caller, who becomes responsible for closing it.
return conn, nil
}

// NewDefaultLocker returns the advisory-lock based Locker operating on db.
func NewDefaultLocker(db *bun.DB) Locker {
	locker := new(defaultLocker)
	locker.db = db
	return locker
}
24 changes: 24 additions & 0 deletions internal/leadership/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//go:build it

package leadership

import (
. "github.com/formancehq/go-libs/v2/testing/utils"
"testing"

"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/testing/docker"
"github.com/formancehq/go-libs/v2/testing/platform/pgtesting"
)

var (
// srv is the PostgreSQL test server created in TestMain.
srv *pgtesting.PostgresServer
)

func TestMain(m *testing.M) {
WithTestMain(func(t *TestingTForMain) int {
srv = pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()), pgtesting.WithExtension("pgcrypto"))

return m.Run()
})
}
129 changes: 129 additions & 0 deletions internal/leadership/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package leadership

import (
"context"
"github.com/formancehq/go-libs/v2/logging"
"github.com/uptrace/bun"
"time"
)

// Manager runs the leadership loop: it tries to take the lock, publishes
// leadership changes and watches the connection holding the lock.
type Manager struct {
// locker is used to try to take the leadership lock.
locker Locker
// changes publishes every leadership change to subscribers.
changes *Broadcaster
logger logging.Logger
// retryPeriod is the delay between lock attempts and between pings.
retryPeriod time.Duration
// stopChannel carries, from Stop to Run, the channel Run closes to acknowledge the stop.
// Run closes stopChannel itself once it has stopped.
stopChannel chan chan struct{}
}

// Run is the leadership loop. It returns only after Stop has been called.
//
// It tries to take the lock immediately, then again every retryPeriod until
// it succeeds. Once the lock is held it broadcasts the acquisition, marks ctx
// as leader and pings the database every retryPeriod; when a ping fails it
// closes the handle, broadcasts the loss and goes back to retrying.
//
// NOTE(review): the select does not watch ctx.Done(), so cancelling ctx alone
// does not end the loop — confirm that Stop is always called.
func (m *Manager) Run(ctx context.Context) {
var (
dbMutex *Mutex
// Zero duration: the first attempt fires immediately; later attempts are re-armed with m.retryPeriod.
nextRetry = time.After(time.Duration(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It spams ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sry just at init

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just the first time

// nil until the lock is acquired; a nil channel is never ready in a select.
nextPing <-chan time.Time
)
for {
select {
// ch is the acknowledgement channel supplied by Stop.
case ch := <-m.stopChannel:
if dbMutex != nil {
m.logger.Info("leadership lost")
dbMutex.Exec(func(_ bun.IDB) {
_ = dbMutex.db.Close()
})

setIsLeader(ctx, false)
m.changes.Broadcast(Leadership{})
}
// Acknowledge the stop, then close stopChannel so later Stop calls return at once.
close(ch)
close(m.stopChannel)
return
case <-nextRetry:
db, err := m.locker.Take(ctx)
// A nil handle with a nil error means the lock was not obtained: retry later without logging.
if err != nil || db == nil {
if err != nil {
m.logger.Error("error acquiring lock", err)
}
nextRetry = time.After(m.retryPeriod)
continue
}

dbMutex = NewMutex(db)

m.changes.Broadcast(Leadership{
DB: dbMutex,
Acquired: true,
})
m.logger.Info("leadership acquired")

setIsLeader(ctx, true)

// nextRetry is not re-armed here: from now on only pings are scheduled.
nextPing = time.After(m.retryPeriod)

// Ping the database to check the connection status
// If the connection is lost, signal the listeners about the leadership loss
case <-nextPing:
dbMutex.Exec(func(db bun.IDB) {
_, err := db.
NewSelect().
ColumnExpr("1 as v").
Count(ctx)
if err != nil {
m.logger.Error("error pinging db", err)
_ = dbMutex.db.Close()
dbMutex = nil

setIsLeader(ctx, false)
m.changes.Broadcast(Leadership{})

// nextPing is not re-armed on this path, so pinging stops until the lock is taken again.
nextRetry = time.After(m.retryPeriod)

Check warning on line 77 in internal/leadership/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/manager.go#L70-L77

Added lines #L70 - L77 were not covered by tests
} else {
nextPing = time.After(m.retryPeriod)
}
})
}
}
}

func (m *Manager) Stop(ctx context.Context) error {
select {
// if already closed
case <-m.stopChannel:
return nil
default:
ch := make(chan struct{})
m.stopChannel <- ch
select {
case <-ctx.Done():
return ctx.Err()

Check warning on line 96 in internal/leadership/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/manager.go#L95-L96

Added lines #L95 - L96 were not covered by tests
case <-ch:
return nil
}
}
}

// GetSignal returns the Broadcaster on which the manager publishes leadership changes.
func (m *Manager) GetSignal() *Broadcaster {
return m.changes
}

// NewManager builds a Manager around locker and logger.
//
// The retry period defaults to two seconds; options are applied in the order
// given and may override it.
func NewManager(locker Locker, logger logging.Logger, options ...Option) *Manager {
	manager := &Manager{
		locker:      locker,
		logger:      logger,
		changes:     NewSignal(),
		retryPeriod: 2 * time.Second,
		stopChannel: make(chan chan struct{}),
	}

	for i := range options {
		options[i](manager)
	}

	return manager
}

// Option customizes a Manager at construction time.
type Option func(leadership *Manager)

// WithRetryPeriod returns an Option that sets the manager's retry period to duration.
func WithRetryPeriod(duration time.Duration) Option {
	apply := func(m *Manager) {
		m.retryPeriod = duration
	}
	return apply
}
Loading