feat: add leadership package #661

Open
wants to merge 7 commits into base: main
16 changes: 9 additions & 7 deletions cmd/serve.go
@@ -3,6 +3,7 @@ package cmd
import (
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/ledger/internal/api/common"
"github.com/formancehq/ledger/internal/leadership"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"net/http"
"net/http/pprof"
@@ -108,6 +109,7 @@ func NewServeCommand() *cobra.Command {
}),
bus.NewFxModule(),
ballast.Module(serveConfiguration.ballastSize),
leadership.NewFXModule(),
api.Module(api.Config{
Version: Version,
Debug: service.IsDebug(cmd),
@@ -122,15 +124,15 @@
}),
fx.Decorate(func(
params struct {
fx.In
fx.In

Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger
Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger

MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
) chi.Router {
return assembleFinalRouter(
service.IsDebug(cmd),
2 changes: 1 addition & 1 deletion internal/doc.go
@@ -1,2 +1,2 @@
//go:generate gomarkdoc -o README.md --repository.default-branch main
//go:generate gomarkdoc -o README.md --repository.default-branch main --repository.url https://github.com/formancehq/ledger
package ledger
84 changes: 84 additions & 0 deletions internal/leadership/broadcaster.go
@@ -0,0 +1,84 @@
package leadership

import (
"sync"
)

type listener[T any] struct {
channel chan T
}

type Broadcaster[T any] struct {
mu sync.Mutex
t *T

inner []listener[T]
outer chan T
}

func (h *Broadcaster[T]) Actual() T {
h.mu.Lock()
defer h.mu.Unlock()

if h.t == nil {
var t T
return t
}
return *h.t
}

func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan T, 1)
l := listener[T]{
channel: newChannel,
}
h.inner = append(h.inner, l)
if h.t != nil {
newChannel <- *h.t
}

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

for index, listener := range h.inner {
if listener == l {
if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
} else {
h.inner = h.inner[:index]
}
break
}
}
}
}
Comment on lines +30 to +58

🛠️ Refactor suggestion

Optimize subscription cleanup.

The unsubscribe function has several areas for improvement:

  1. Linear search could be slow with many subscribers
  2. No protection against double unsubscribe
  3. Slice modification could be simplified

Consider this improved implementation:

 func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	newChannel := make(chan T, 1)
 	l := listener[T]{
 		channel: newChannel,
 	}
 	h.inner = append(h.inner, l)
 	if h.t != nil {
 		newChannel <- *h.t
 	}
 
+	var once sync.Once
 	return newChannel, func() {
+		once.Do(func() {
 			h.mu.Lock()
 			defer h.mu.Unlock()
 
 			for index, listener := range h.inner {
 				if listener == l {
-					if index < len(h.inner)-1 {
-						h.inner = append(h.inner[:index], h.inner[index+1:]...)
-					} else {
-						h.inner = h.inner[:index]
-					}
+					// Use copy for better performance
+					copy(h.inner[index:], h.inner[index+1:])
+					h.inner = h.inner[:len(h.inner)-1]
 					break
 				}
 			}
+		})
 	}
 }

func (h *Broadcaster[T]) Broadcast(t T) {
h.mu.Lock()
defer h.mu.Unlock()

h.t = &t

for _, inner := range h.inner {
inner.channel <- t
}
}
Comment on lines +60 to +69

🛠️ Refactor suggestion

Prevent blocking on slow consumers.

The current implementation could block if any subscriber is slow to consume messages. Consider using non-blocking sends to prevent this.

 func (h *Broadcaster[T]) Broadcast(t T) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	h.t = &t
 
 	for _, inner := range h.inner {
-		inner.channel <- t
+		select {
+		case inner.channel <- t:
+		default:
+			// Skip slow consumers
+		}
 	}
 }

func (h *Broadcaster[T]) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for _, inner := range h.inner {
close(inner.channel)
}
}

func NewBroadcaster[T any]() *Broadcaster[T] {
return &Broadcaster[T]{
outer: make(chan T),
}
}
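
For reviewers, a minimal usage sketch of the Broadcaster API added above (illustrative only, not part of the diff; the function name is hypothetical). It shows that Subscribe delivers the last broadcast value to late subscribers, because each new channel is buffered and primed with the stored value:

package leadership

import "fmt"

// broadcasterSketch is a hypothetical illustration, not part of this PR.
func broadcasterSketch() {
	b := NewBroadcaster[Leadership]()
	defer b.Close()

	// Early subscriber: receives every subsequent broadcast.
	ch, cancel := b.Subscribe()
	defer cancel()

	b.Broadcast(Leadership{Acquired: true})
	fmt.Println((<-ch).Acquired) // true

	// Late subscriber: immediately receives the last broadcast value.
	late, cancelLate := b.Subscribe()
	defer cancelLate()
	fmt.Println((<-late).Acquired) // true

	// Actual returns the last broadcast value (zero value before any broadcast).
	fmt.Println(b.Actual().Acquired) // true
}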
73 changes: 73 additions & 0 deletions internal/leadership/broadcaster_test.go
@@ -0,0 +1,73 @@
package leadership

import (
"testing"
"time"
)

func TestBroadcaster(t *testing.T) {
t.Parallel()

broadcaster := NewBroadcaster[struct{}]()
t.Cleanup(broadcaster.Close)

const nbSubscriptions = 5

subscriptions := make([]<-chan struct{}, nbSubscriptions)
releases := make([]func(), nbSubscriptions)

for i := 0; i < nbSubscriptions; i++ {
subscriptions[i], releases[i] = broadcaster.Subscribe()
}

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[2]()
subscriptions = append(subscriptions[:2], subscriptions[3:]...)
releases = append(releases[:2], releases[3:]...)

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[0]()
subscriptions = subscriptions[1:]
releases = releases[1:]

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[2]()
subscriptions = subscriptions[:2]

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}
}
Comment on lines +8 to +73

🛠️ Refactor suggestion

Enhance test coverage.

While the current test covers basic functionality, consider adding tests for:

  1. Concurrent access to ensure thread safety
  2. Slow consumers to verify non-blocking behavior
  3. Close method to ensure proper cleanup
  4. Actual method to verify current value retrieval

Here's an example of additional test cases:

func TestBroadcasterConcurrent(t *testing.T) {
	t.Parallel()
	
	broadcaster := NewBroadcaster[int]()
	t.Cleanup(broadcaster.Close)
	
	const nbGoroutines = 10
	done := make(chan struct{})
	
	// Start multiple goroutines that subscribe and unsubscribe
	for i := 0; i < nbGoroutines; i++ {
		go func() {
			defer func() { done <- struct{}{} }()
			ch, cancel := broadcaster.Subscribe()
			defer cancel()
			
			// Receive some messages
			for j := 0; j < 5; j++ {
				select {
				case <-ch:
				case <-time.After(time.Second):
					t.Error("timeout waiting for broadcast")
				}
			}
		}()
	}
	
	// Broadcast messages while goroutines are subscribing/unsubscribing
	for i := 0; i < 10; i++ {
		broadcaster.Broadcast(i)
	}
	
	// Wait for all goroutines to finish
	for i := 0; i < nbGoroutines; i++ {
		<-done
	}
}

func TestBroadcasterSlowConsumer(t *testing.T) {
	t.Parallel()
	
	broadcaster := NewBroadcaster[int]()
	t.Cleanup(broadcaster.Close)
	
	// Create a slow consumer
	ch, cancel := broadcaster.Subscribe()
	defer cancel()
	
	// Broadcast should not block even if consumer is slow
	for i := 0; i < 100; i++ {
		broadcaster.Broadcast(i)
	}
	
	// Verify we can still receive the latest value
	select {
	case v := <-ch:
		if v != 99 {
			t.Errorf("expected 99, got %d", v)
		}
	case <-time.After(time.Second):
		t.Error("timeout waiting for broadcast")
	}
}

43 changes: 43 additions & 0 deletions internal/leadership/context.go
@@ -0,0 +1,43 @@
package leadership

import (
"context"
"sync"
)

type contextKey struct{}

var holderContextKey contextKey = struct{}{}

func ContextWithLeadershipInfo(ctx context.Context) context.Context {
return context.WithValue(ctx, holderContextKey, &holder{})
}

func IsLeader(ctx context.Context) bool {
h := ctx.Value(holderContextKey)
if h == nil {
return false
}

Codecov / codecov/patch: added lines internal/leadership/context.go#L19-L20 were not covered by tests.
holder := h.(*holder)
holder.Lock()
defer holder.Unlock()

return holder.isLeader
}

func setIsLeader(ctx context.Context, isLeader bool) {
h := ctx.Value(holderContextKey)
if h == nil {
return
}
holder := h.(*holder)
holder.Lock()
defer holder.Unlock()

holder.isLeader = isLeader
}

type holder struct {
sync.Mutex
isLeader bool
}
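
For reviewers, a brief sketch of how the context helpers above compose (illustrative only; the function name is hypothetical and not part of the diff). The leadership state lives in a mutable holder stored in the context, so setIsLeader flips a flag that later IsLeader calls on the same context observe:

package leadership

import (
	"context"
	"fmt"
)

// leadershipContextSketch is a hypothetical illustration, not part of this PR.
func leadershipContextSketch() {
	// Without the holder installed, IsLeader always reports false.
	fmt.Println(IsLeader(context.Background())) // false

	// Install the holder once, near process start-up.
	ctx := ContextWithLeadershipInfo(context.Background())
	fmt.Println(IsLeader(ctx)) // false

	// The leadership runner flips the flag after winning the advisory lock;
	// every reader of the same context sees the update.
	setIsLeader(ctx, true)
	fmt.Println(IsLeader(ctx)) // true
}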
25 changes: 25 additions & 0 deletions internal/leadership/database_handle.go
@@ -0,0 +1,25 @@
package leadership

import (
"github.com/uptrace/bun"
"sync"
)

type DatabaseHandle struct {
*sync.Mutex
db DBHandle
}
Comment on lines +8 to +11

⚠️ Potential issue

Define or import the DBHandle type.

The DBHandle type is used in the struct but is not defined or imported. This could lead to compilation errors.

Please either:

  1. Define the DBHandle type interface in this package, or
  2. Import it from the appropriate package
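
For reference, locker.go later in this diff declares the type in the same leadership package, so the reference should resolve once both files land together; the declaration is reproduced here:

type DBHandle interface {
	bun.IDB
	Close() error
}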

🛠️ Refactor suggestion

Avoid embedding sync.Mutex to prevent exposing Lock/Unlock methods.

Embedding *sync.Mutex exposes the mutex methods directly, which breaks encapsulation and could lead to incorrect usage. Consider making it a regular field instead.

 type DatabaseHandle struct {
-    *sync.Mutex
+    mu *sync.Mutex
     db DBHandle
 }

func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) {
m.Mutex.Lock()
defer m.Mutex.Unlock()

fn(m.db)
}
Comment on lines +13 to +18

🛠️ Refactor suggestion

Add context and error handling support to the Exec method.

The current implementation has several limitations:

  1. No context support for cancellation
  2. Panics in the callback function are not handled
  3. No way to return errors from the callback

Consider this improved implementation:

-func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) {
+func (m *DatabaseHandle) Exec(ctx context.Context, fn func(db bun.IDB) error) error {
     m.Mutex.Lock()
     defer m.Mutex.Unlock()
 
-    fn(m.db)
+    if err := fn(m.db); err != nil {
+        return fmt.Errorf("database operation failed: %w", err)
+    }
+    return nil
 }

func NewDatabaseHandle(db DBHandle) *DatabaseHandle {
return &DatabaseHandle{
Mutex: &sync.Mutex{},
db: db,
}
}
6 changes: 6 additions & 0 deletions internal/leadership/leadership.go
@@ -0,0 +1,6 @@
package leadership

type Leadership struct {
Acquired bool
DB *DatabaseHandle
}
54 changes: 54 additions & 0 deletions internal/leadership/locker.go
@@ -0,0 +1,54 @@
package leadership

import (
"context"
"fmt"
"github.com/uptrace/bun"
)

const leadershipAdvisoryLockKey = 123456789

type DBHandle interface {
bun.IDB
Close() error
}

// Locker takes a lock at the process level.
// It returns a bun.IDB which MUST be invalidated when the lock is lost.
type Locker interface {
Take(ctx context.Context) (DBHandle, error)
}

type defaultLocker struct {
db *bun.DB
}

func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) {
conn, err := p.db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("error opening new connection: %w", err)
}

Codecov / codecov/patch: added lines internal/leadership/locker.go#L29-L30 were not covered by tests.

ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
if ret.Err() != nil {
_ = conn.Close()

⚠️ Potential issue

Handle connection close errors.

The error from conn.Close() is consistently ignored. These errors should be wrapped and returned.

-		_ = conn.Close()
+		if closeErr := conn.Close(); closeErr != nil {
+			return nil, fmt.Errorf("error closing connection after lock acquisition failure: %w (original error: %v)", closeErr, err)
+		}

Also applies to: 40-40, 45-45

return nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
}

Codecov / codecov/patch: added lines internal/leadership/locker.go#L34-L36 were not covered by tests.

var acquired bool
if err := ret.Scan(&acquired); err != nil {
_ = conn.Close()
return nil, err
}

if !acquired {

Codecov / codecov/patch: added lines internal/leadership/locker.go#L40-L44 were not covered by tests.
_ = conn.Close()
return nil, nil
}

return conn, nil
}

func NewDefaultLocker(db *bun.DB) Locker {
return &defaultLocker{db: db}
}
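
For reviewers, a short usage sketch of the Locker contract above (illustrative only; the caller name and query are hypothetical). Take returns (nil, nil) when another instance already holds the advisory lock, so callers must check for a nil handle as well as the error:

package leadership

import (
	"context"
	"fmt"
)

// runWhenLeader is a hypothetical caller, not part of this PR.
func runWhenLeader(ctx context.Context, locker Locker) error {
	handle, err := locker.Take(ctx)
	if err != nil {
		return fmt.Errorf("taking leadership lock: %w", err)
	}
	if handle == nil {
		// Another instance holds pg_try_advisory_lock(123456789); retry later.
		return nil
	}
	// Release the handle when leadership work is done.
	defer func() { _ = handle.Close() }()

	// The handle satisfies bun.IDB and is bound to the connection holding the lock.
	_, err = handle.ExecContext(ctx, "select 1")
	return err
}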
24 changes: 24 additions & 0 deletions internal/leadership/main_test.go
@@ -0,0 +1,24 @@
//go:build it

package leadership

import (
. "github.com/formancehq/go-libs/v2/testing/utils"
"testing"

"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/testing/docker"
"github.com/formancehq/go-libs/v2/testing/platform/pgtesting"
)

var (
srv *pgtesting.PostgresServer
)

func TestMain(m *testing.M) {
WithTestMain(func(t *TestingTForMain) int {
srv = pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()), pgtesting.WithExtension("pgcrypto"))

return m.Run()
})
}