Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add leadership package #661

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/ledger/internal/api/common"
"github.com/formancehq/ledger/internal/leadership"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -108,6 +109,7 @@ func NewServeCommand() *cobra.Command {
}),
bus.NewFxModule(),
ballast.Module(serveConfiguration.ballastSize),
leadership.NewFXModule(),
api.Module(api.Config{
Version: Version,
Debug: service.IsDebug(cmd),
Expand All @@ -122,15 +124,15 @@ func NewServeCommand() *cobra.Command {
}),
fx.Decorate(func(
params struct {
fx.In
fx.In

Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger
Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger

MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
) chi.Router {
return assembleFinalRouter(
service.IsDebug(cmd),
Expand Down
2 changes: 1 addition & 1 deletion internal/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
//go:generate gomarkdoc -o README.md --repository.default-branch main
//go:generate gomarkdoc -o README.md --repository.default-branch main --repository.url https://github.com/formancehq/ledger
package ledger
86 changes: 86 additions & 0 deletions internal/leadership/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package leadership

import (
"sync"
)

type listener struct {
channel chan Leadership
}

type Broadcaster struct {
mu *sync.Mutex
t *Leadership

inner []listener
outer chan Leadership
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use mutex by value and remove unused channel.

The Broadcaster struct has two issues:

  1. Using a pointer to mutex is unnecessary and can lead to nil pointer dereference if not properly initialized.
  2. The outer channel is declared but never used in the implementation.

Apply this diff to fix these issues:

 type Broadcaster struct {
-	mu *sync.Mutex
+	mu sync.Mutex
 	t  *Leadership
 
 	inner []listener
-	outer chan Leadership
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type Broadcaster struct {
mu *sync.Mutex
t *Leadership
inner []listener
outer chan Leadership
}
type Broadcaster struct {
mu sync.Mutex
t *Leadership
inner []listener
}


func (h *Broadcaster) Actual() Leadership {
h.mu.Lock()
defer h.mu.Unlock()

if h.t == nil {
return Leadership{}
}
return *h.t
}

func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan Leadership, 1)
index := len(h.inner)
h.inner = append(h.inner, listener{
channel: newChannel,
})
if h.t != nil {
newChannel <- *h.t
paul-nicolas marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 40 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L29-L40

Added lines #L29 - L40 were not covered by tests

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
} else {
h.inner = h.inner[:index]
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 50 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L42-L50

Added lines #L42 - L50 were not covered by tests
}
}

func (h *Broadcaster) Broadcast(t Leadership) {
h.mu.Lock()
defer h.mu.Unlock()

h.t = &t

for _, inner := range h.inner {
inner.channel <- t
}

Check warning on line 62 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

func (h *Broadcaster) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for _, inner := range h.inner {
close(inner.channel)
}

Check warning on line 71 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L65-L71

Added lines #L65 - L71 were not covered by tests
}

func (h *Broadcaster) CountListeners() int {
h.mu.Lock()
defer h.mu.Unlock()

return len(h.inner)

Check warning on line 78 in internal/leadership/broadcaster.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/broadcaster.go#L74-L78

Added lines #L74 - L78 were not covered by tests
}

func NewSignal() *Broadcaster {
return &Broadcaster{
outer: make(chan Leadership),
mu: &sync.Mutex{},
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve constructor naming and initialization.

The NewSignal constructor has several issues:

  1. The name doesn't clearly convey its purpose.
  2. It initializes an unused channel.
  3. It creates a pointer to mutex unnecessarily.

Apply this diff to improve the constructor:

-func NewSignal() *Broadcaster {
+func NewBroadcaster() *Broadcaster {
 	return &Broadcaster{
-		outer: make(chan Leadership),
-		mu:    &sync.Mutex{},
+		mu:    sync.Mutex{},
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func NewSignal() *Broadcaster {
return &Broadcaster{
outer: make(chan Leadership),
mu: &sync.Mutex{},
}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
mu: sync.Mutex{},
}
}

43 changes: 43 additions & 0 deletions internal/leadership/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package leadership

import (
"context"
"sync"
)

type contextKey string

var holderContextKey contextKey = "holder"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use a private type for context key to prevent collisions.

Using a string type for context keys is not recommended as it can lead to key collisions. Instead, use a private unexported type.

Apply this diff to make the context key more robust:

-type contextKey string
+type contextKey struct{}
 
-var holderContextKey contextKey = "holder"
+var holderContextKey = &contextKey{}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type contextKey string
var holderContextKey contextKey = "holder"
type contextKey struct{}
var holderContextKey = &contextKey{}


func ContextWithLeadershipInfo(ctx context.Context) context.Context {
return context.WithValue(ctx, holderContextKey, &holder{})
}

func IsLeader(ctx context.Context) bool {
h := ctx.Value(holderContextKey)
if h == nil {
return false
}

Check warning on line 20 in internal/leadership/context.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/context.go#L19-L20

Added lines #L19 - L20 were not covered by tests
holder := h.(*holder)
holder.Lock()
defer holder.Unlock()

return holder.isLeader
}

func setIsLeader(ctx context.Context, isLeader bool) {
h := ctx.Value(holderContextKey)
if h == nil {
return
}
holder := h.(*holder)
holder.Lock()
defer holder.Unlock()

holder.isLeader = isLeader
}

type holder struct {
sync.Mutex
isLeader bool
}
6 changes: 6 additions & 0 deletions internal/leadership/leadership.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package leadership

// Leadership describes the current leader-election state as broadcast to
// subscribers.
type Leadership struct {
// Acquired is true while this process holds the leadership lock.
Acquired bool
// DB is the connection holding the lock; per the Locker contract it MUST
// be considered invalid once leadership is lost.
DB DBHandle
}
54 changes: 54 additions & 0 deletions internal/leadership/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package leadership

import (
"context"
"fmt"
"github.com/uptrace/bun"
)

const leadershipAdvisoryLockKey = 123456789

type DBHandle interface {
bun.IDB
Close() error
}

// Locker take a lock at process level
// It returns a bun.IDB which MUST be invalidated when the lock is lost
type Locker interface {
Take(ctx context.Context) (DBHandle, error)
}

type defaultLocker struct {
db *bun.DB
}

func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) {
conn, err := p.db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("error opening new connection: %w", err)
}

Check warning on line 30 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L29-L30

Added lines #L29 - L30 were not covered by tests

ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
if ret.Err() != nil {
_ = conn.Close()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle connection close errors.

The error from conn.Close() is consistently ignored. These errors should be wrapped and returned.

-		_ = conn.Close()
+		if closeErr := conn.Close(); closeErr != nil {
+			return nil, fmt.Errorf("error closing connection after lock acquisition failure: %w (original error: %v)", closeErr, err)
+		}

Also applies to: 40-40, 45-45

return nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
}

Check warning on line 36 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L34-L36

Added lines #L34 - L36 were not covered by tests

var acquired bool
if err := ret.Scan(&acquired); err != nil {
_ = conn.Close()
panic(err)

Check warning on line 41 in internal/leadership/locker.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/locker.go#L40-L41

Added lines #L40 - L41 were not covered by tests
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid using panic in library code

Using panic in a library function can cause the entire application to crash unexpectedly. It's better to return an error to the caller for proper handling.

Replace the panic with an error return:

	if err := ret.Scan(&acquired); err != nil {
		_ = conn.Close()
-		panic(err)
+		return false, nil, fmt.Errorf("error scanning result: %w", err)
	}

Committable suggestion skipped: line range outside the PR's diff.

}

if !acquired {
_ = conn.Close()
return nil, nil
}

return conn, nil
}

func NewDefaultLocker(db *bun.DB) Locker {
return &defaultLocker{db: db}
}
24 changes: 24 additions & 0 deletions internal/leadership/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//go:build it

package leadership

import (
. "github.com/formancehq/go-libs/v2/testing/utils"
"testing"

"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/testing/docker"
"github.com/formancehq/go-libs/v2/testing/platform/pgtesting"
)

var (
// srv is the dockerized Postgres server shared by every integration test
// in this package; it is provisioned once in TestMain.
srv *pgtesting.PostgresServer
)

// TestMain boots a throwaway Postgres server before running the package's
// integration tests (the `it` build tag keeps them out of plain `go test`).
// The pgcrypto extension is enabled — presumably required by the schema
// under test; confirm against the migrations.
func TestMain(m *testing.M) {
WithTestMain(func(t *TestingTForMain) int {
srv = pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()), pgtesting.WithExtension("pgcrypto"))

return m.Run()
})
}
99 changes: 99 additions & 0 deletions internal/leadership/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package leadership

import (
"context"
"github.com/formancehq/go-libs/v2/logging"
"time"
)

type Manager struct {
locker Locker
changes *Broadcaster
logger logging.Logger
retryPeriod time.Duration
stopChannel chan chan struct{}
}

func (m *Manager) Run(ctx context.Context) {
var (
db DBHandle
nextRetry = time.After(time.Duration(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It spams ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sry just at init

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just the first time

err error
)
for {
select {
case ch := <-m.stopChannel:
if db != nil {
m.logger.Info("leadership lost")
_ = db.Close()
setIsLeader(ctx, false)
m.changes.Broadcast(Leadership{})
}
close(ch)
close(m.stopChannel)
return
case <-nextRetry:
db, err = m.locker.Take(ctx)
if err != nil || db == nil {
if err != nil {
m.logger.Error("error acquiring lock", err)
}

Check warning on line 40 in internal/leadership/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/manager.go#L39-L40

Added lines #L39 - L40 were not covered by tests
nextRetry = time.After(m.retryPeriod)
continue
}

m.changes.Broadcast(Leadership{
DB: db,
Acquired: true,
})
m.logger.Info("leadership acquired")

setIsLeader(ctx, true)
}
}
}

func (m *Manager) Stop(ctx context.Context) error {
select {
// if already closed
case <-m.stopChannel:
return nil
default:
ch := make(chan struct{})
m.stopChannel <- ch
select {
case <-ctx.Done():
return ctx.Err()

Check warning on line 66 in internal/leadership/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/leadership/manager.go#L65-L66

Added lines #L65 - L66 were not covered by tests
case <-ch:
return nil
}
}
}

func (m *Manager) GetSignal() *Broadcaster {
return m.changes
}

func NewManager(locker Locker, logger logging.Logger, options ...Option) *Manager {
l := &Manager{
locker: locker,
logger: logger,
changes: NewSignal(),
retryPeriod: 2 * time.Second,
stopChannel: make(chan chan struct{}),
}

for _, option := range options {
option(l)
}

return l
}

type Option func(leadership *Manager)

func WithRetryPeriod(duration time.Duration) Option {
return func(leadership *Manager) {
leadership.retryPeriod = duration
}
}
Loading