Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add leadership package #661

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/ledger/internal/api/common"
"github.com/formancehq/ledger/internal/leadership"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -108,6 +109,7 @@ func NewServeCommand() *cobra.Command {
}),
bus.NewFxModule(),
ballast.Module(serveConfiguration.ballastSize),
leadership.NewFXModule(),
api.Module(api.Config{
Version: Version,
Debug: service.IsDebug(cmd),
Expand All @@ -122,15 +124,15 @@ func NewServeCommand() *cobra.Command {
}),
fx.Decorate(func(
params struct {
fx.In
fx.In

Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger
Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger

MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
) chi.Router {
return assembleFinalRouter(
service.IsDebug(cmd),
Expand Down
2 changes: 1 addition & 1 deletion internal/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
//go:generate gomarkdoc -o README.md --repository.default-branch main
//go:generate gomarkdoc -o README.md --repository.default-branch main --repository.url https://github.com/formancehq/ledger
package ledger
85 changes: 85 additions & 0 deletions internal/leadership/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package leadership

import (
"sync"
)

type listener struct {
channel chan Leadership
}

type Broadcaster struct {
mu sync.Mutex
t *Leadership

inner []listener
outer chan Leadership
}

func (h *Broadcaster) Actual() Leadership {
h.mu.Lock()
defer h.mu.Unlock()

if h.t == nil {
return Leadership{}
}
return *h.t
}

func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan Leadership, 1)
index := len(h.inner)
h.inner = append(h.inner, listener{
channel: newChannel,
})
if h.t != nil {
newChannel <- *h.t
paul-nicolas marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 40 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L29-L40

Added lines #L29 - L40 were not covered by tests

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
} else {
h.inner = h.inner[:index]
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 50 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L42-L50

Added lines #L42 - L50 were not covered by tests
}
}

func (h *Broadcaster) Broadcast(t Leadership) {
h.mu.Lock()
defer h.mu.Unlock()

h.t = &t

for _, inner := range h.inner {
inner.channel <- t
}

Check warning on line 62 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

func (h *Broadcaster) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for _, inner := range h.inner {
close(inner.channel)
}

Check warning on line 71 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L65-L71

Added lines #L65 - L71 were not covered by tests
}

func (h *Broadcaster) CountListeners() int {
h.mu.Lock()
defer h.mu.Unlock()

return len(h.inner)

Check warning on line 78 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L74-L78

Added lines #L74 - L78 were not covered by tests
}

func NewSignal() *Broadcaster {
return &Broadcaster{
outer: make(chan Leadership),
}
}
43 changes: 43 additions & 0 deletions internal/leadership/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package leadership

import (
"context"
"sync"
)

type contextKey struct{}

var holderContextKey contextKey = struct{}{}

func ContextWithLeadershipInfo(ctx context.Context) context.Context {
return context.WithValue(ctx, holderContextKey, &holder{})
}

func IsLeader(ctx context.Context) bool {
h := ctx.Value(holderContextKey)
if h == nil {
return false
}

Check warning on line 20 in internal/leadership/context.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/context.go#L19-L20

Added lines #L19 - L20 were not covered by tests
holder := h.(*holder)
holder.Lock()
defer holder.Unlock()

return holder.isLeader
}

func setIsLeader(ctx context.Context, isLeader bool) {
h := ctx.Value(holderContextKey)
if h == nil {
return
}
holder := h.(*holder)
holder.Lock()
defer holder.Unlock()

holder.isLeader = isLeader
}

type holder struct {
sync.Mutex
isLeader bool
}
6 changes: 6 additions & 0 deletions internal/leadership/leadership.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package leadership

type Leadership struct {
Acquired bool
DB DBHandle
}
54 changes: 54 additions & 0 deletions internal/leadership/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package leadership

import (
"context"
"fmt"
"github.com/uptrace/bun"
)

const leadershipAdvisoryLockKey = 123456789

type DBHandle interface {
bun.IDB
Close() error
}

// Locker take a lock at process level
// It returns a bun.IDB which MUST be invalidated when the lock is lost
type Locker interface {
Take(ctx context.Context) (DBHandle, error)
}

type defaultLocker struct {
db *bun.DB
}

func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) {
conn, err := p.db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("error opening new connection: %w", err)
}

Check warning on line 30 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L29-L30

Added lines #L29 - L30 were not covered by tests

ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
if ret.Err() != nil {
_ = conn.Close()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle connection close errors.

The error from conn.Close() is consistently ignored. These errors should be wrapped and returned.

-		_ = conn.Close()
+		if closeErr := conn.Close(); closeErr != nil {
+			return nil, fmt.Errorf("error closing connection after lock acquisition failure: %w (original error: %v)", closeErr, err)
+		}

Also applies to: 40-40, 45-45

return nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
}

Check warning on line 36 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L34-L36

Added lines #L34 - L36 were not covered by tests

var acquired bool
if err := ret.Scan(&acquired); err != nil {
_ = conn.Close()
return nil, err
}

Check warning on line 42 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L40-L42

Added lines #L40 - L42 were not covered by tests

if !acquired {
_ = conn.Close()
return nil, nil
}

return conn, nil
}

func NewDefaultLocker(db *bun.DB) Locker {
return &defaultLocker{db: db}
}
24 changes: 24 additions & 0 deletions internal/leadership/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//go:build it

package leadership

import (
. "github.com/formancehq/go-libs/v2/testing/utils"
"testing"

"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/testing/docker"
"github.com/formancehq/go-libs/v2/testing/platform/pgtesting"
)

var (
srv *pgtesting.PostgresServer
)

func TestMain(m *testing.M) {
WithTestMain(func(t *TestingTForMain) int {
srv = pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()), pgtesting.WithExtension("pgcrypto"))

return m.Run()
})
}
99 changes: 99 additions & 0 deletions internal/leadership/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package leadership

import (
"context"
"github.com/formancehq/go-libs/v2/logging"
"time"
)

type Manager struct {
locker Locker
changes *Broadcaster
logger logging.Logger
retryPeriod time.Duration
stopChannel chan chan struct{}
}

func (m *Manager) Run(ctx context.Context) {
var (
db DBHandle
nextRetry = time.After(time.Duration(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It spams ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sry just at init

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just the first time

err error
)
for {
select {
case ch := <-m.stopChannel:
if db != nil {
m.logger.Info("leadership lost")
_ = db.Close()
setIsLeader(ctx, false)
m.changes.Broadcast(Leadership{})
}
close(ch)
close(m.stopChannel)
return
case <-nextRetry:
db, err = m.locker.Take(ctx)
if err != nil || db == nil {
if err != nil {
m.logger.Error("error acquiring lock", err)
}

Check warning on line 40 in internal/leadership/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/manager.go#L39-L40

Added lines #L39 - L40 were not covered by tests
nextRetry = time.After(m.retryPeriod)
continue
}

m.changes.Broadcast(Leadership{
DB: db,
Acquired: true,
})
m.logger.Info("leadership acquired")

setIsLeader(ctx, true)
}
}
}

func (m *Manager) Stop(ctx context.Context) error {
select {
// if already closed
case <-m.stopChannel:
return nil
default:
ch := make(chan struct{})
m.stopChannel <- ch
select {
case <-ctx.Done():
return ctx.Err()

Check warning on line 66 in internal/leadership/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/manager.go#L65-L66

Added lines #L65 - L66 were not covered by tests
case <-ch:
return nil
}
}
}

func (m *Manager) GetSignal() *Broadcaster {
return m.changes
}

func NewManager(locker Locker, logger logging.Logger, options ...Option) *Manager {
l := &Manager{
locker: locker,
logger: logger,
changes: NewSignal(),
retryPeriod: 2 * time.Second,
stopChannel: make(chan chan struct{}),
}

for _, option := range options {
option(l)
}

return l
}

type Option func(leadership *Manager)

func WithRetryPeriod(duration time.Duration) Option {
return func(leadership *Manager) {
leadership.retryPeriod = duration
}
}
Loading