From a1b7c0eb56bfc2acd6696c8fbb734188dc5957b6 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 23 Jan 2025 11:30:08 +0100 Subject: [PATCH 1/7] feat: add leadership package --- cmd/serve.go | 9 +- internal/doc.go | 2 +- internal/leadership/leadership.go | 80 ++++++++++++++++++ internal/leadership/leadership_test.go | 73 ++++++++++++++++ internal/leadership/locker.go | 50 +++++++++++ internal/leadership/locker_generated_test.go | 55 ++++++++++++ internal/leadership/module.go | 39 +++++++++ internal/leadership/signal.go | 88 ++++++++++++++++++++ 8 files changed, 391 insertions(+), 5 deletions(-) create mode 100644 internal/leadership/leadership.go create mode 100644 internal/leadership/leadership_test.go create mode 100644 internal/leadership/locker.go create mode 100644 internal/leadership/locker_generated_test.go create mode 100644 internal/leadership/module.go create mode 100644 internal/leadership/signal.go diff --git a/cmd/serve.go b/cmd/serve.go index 1d5bec751..e4eee4d93 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -1,13 +1,13 @@ package cmd import ( - "net/http" - "net/http/pprof" - "time" - "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/ledger/internal/api/common" + "github.com/formancehq/ledger/internal/leadership" systemstore "github.com/formancehq/ledger/internal/storage/system" + "net/http" + "net/http/pprof" + "time" apilib "github.com/formancehq/go-libs/v2/api" "github.com/formancehq/go-libs/v2/health" @@ -112,6 +112,7 @@ func NewServeCommand() *cobra.Command { }), bus.NewFxModule(), ballast.Module(serveConfiguration.ballastSize), + leadership.NewFXModule(), api.Module(api.Config{ Version: Version, Debug: service.IsDebug(cmd), diff --git a/internal/doc.go b/internal/doc.go index 631b605c7..84136d93d 100644 --- a/internal/doc.go +++ b/internal/doc.go @@ -1,2 +1,2 @@ -//go:generate gomarkdoc -o README.md --repository.default-branch main +//go:generate gomarkdoc -o README.md --repository.default-branch main --repository.url https://github.com/formancehq/ledger package ledger diff --git a/internal/leadership/leadership.go b/internal/leadership/leadership.go new file mode 100644 index 000000000..2cef3459c --- /dev/null +++ b/internal/leadership/leadership.go @@ -0,0 +1,80 @@ +package leadership + +import ( + "context" + "errors" + "fmt" + "github.com/formancehq/go-libs/v2/logging" + "time" +) + +type Leadership struct { + locker Locker + changes *Signal + logger logging.Logger + retryPeriod time.Duration +} + +func (l *Leadership) acquire(ctx context.Context) error { + + acquired, release, err := l.locker.Take(ctx) + if err != nil { + return fmt.Errorf("error acquiring lock: %w", err) + } + + if acquired { + l.changes.Signal(true) + l.logger.Info("leadership acquired") + <-ctx.Done() + l.logger.Info("leadership lost") + release() + l.changes.Signal(false) + return ctx.Err() + } else { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(l.retryPeriod): + } + } + + return nil +} + +func (l *Leadership) Run(ctx context.Context) { + for { + if err := l.acquire(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return + } + l.logger.Errorf("error acquiring leadership: %s", err) + } + } +} + +func (l *Leadership) GetSignal() *Signal { + return l.changes +} + +func NewLeadership(locker Locker, logger logging.Logger, options ...Option) *Leadership { + l := &Leadership{ + locker: locker, + logger: logger, + changes: NewSignal(), + retryPeriod: 2 * time.Second, + } + + for _, option := range options { + option(l) + } + + return l +} + +type Option func(leadership *Leadership) + +func WithRetryPeriod(duration time.Duration) Option { + return func(leadership *Leadership) { + leadership.retryPeriod = duration + } +} diff --git a/internal/leadership/leadership_test.go b/internal/leadership/leadership_test.go new file mode 100644 index 000000000..66f20dbbb --- /dev/null +++ b/internal/leadership/leadership_test.go @@ -0,0 +1,73 @@ +package leadership + +import ( + "context" + "github.com/formancehq/go-libs/v2/logging" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "sync/atomic" + "testing" + "time" +) + +func TestLeaderShip(t *testing.T) { + + t.Parallel() + + ctx := logging.TestingContext() + ctrl := gomock.NewController(t) + + const count = 10 + selectedInstance := atomic.Int32{} + + type instance struct { + leadership *Leadership + ctx context.Context + cancel func() + } + + instances := make([]instance, count) + for i := range count { + m := NewMockLocker(ctrl) + m.EXPECT(). + Take(gomock.Any()). + AnyTimes(). + DoAndReturn(func(ctx context.Context) (bool, func(), error) { + return i == int(selectedInstance.Load()), func() {}, nil + }) + + l := NewLeadership(m, logging.Testing(), WithRetryPeriod(10*time.Millisecond)) + + ctx, cancel := context.WithCancel(ctx) + + go l.Run(ctx) + + instances[i] = instance{ + leadership: l, + ctx: ctx, + cancel: cancel, + } + } + + for _, nextLeader := range []int{0, 2, 4, 8} { + selectedInstance.Store(int32(nextLeader)) + + leadershipSignal, release := instances[nextLeader].leadership.GetSignal().Listen() + select { + case acquired := <-leadershipSignal: + require.True(t, acquired, "instance %d should be leader", nextLeader) + case <-time.After(100 * time.Millisecond): + t.Fatal("signal should have been received") + } + + instances[nextLeader].cancel() + + select { + case acquired := <-leadershipSignal: + require.False(t, acquired, "instance %d should have lost the leadership", nextLeader) + case <-time.After(100 * time.Millisecond): + t.Fatal("signal should have been received") + } + release() + } +} diff --git a/internal/leadership/locker.go b/internal/leadership/locker.go new file mode 100644 index 000000000..e8fadd2ed --- /dev/null +++ b/internal/leadership/locker.go @@ -0,0 +1,50 @@ +package leadership + +import ( + "context" + "fmt" + "github.com/uptrace/bun" +) + +const leadershipAdvisoryLockKey = 123456789 + +//go:generate mockgen -write_source_comment=false -write_package_comment=false -source locker.go -destination locker_generated_test.go -package leadership . Locker +type Locker interface { + Take(ctx context.Context) (bool, func(), error) +} + +type defaultLocker struct { + db *bun.DB +} + +func (p *defaultLocker) Take(ctx context.Context) (bool, func(), error) { + conn, err := p.db.Conn(ctx) + if err != nil { + return false, nil, fmt.Errorf("error opening new connection: %w", err) + } + + ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey) + if ret.Err() != nil { + _ = conn.Close() + return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err()) + } + + var acquired bool + if err := ret.Scan(&acquired); err != nil { + _ = conn.Close() + panic(err) + } + + if !acquired { + _ = conn.Close() + return false, nil, nil + } + + return true, func() { + _ = conn.Close() + }, nil +} + +func NewDefaultLocker(db *bun.DB) Locker { + return &defaultLocker{db: db} +} diff --git a/internal/leadership/locker_generated_test.go b/internal/leadership/locker_generated_test.go new file mode 100644 index 000000000..51e3a6ae8 --- /dev/null +++ b/internal/leadership/locker_generated_test.go @@ -0,0 +1,55 @@ +// Code generated by MockGen. DO NOT EDIT. +// +// Generated by this command: +// +// mockgen -write_source_comment=false -write_package_comment=false -source locker.go -destination locker_generated_test.go -package leadership . Locker +// + +package leadership + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockLocker is a mock of Locker interface. +type MockLocker struct { + ctrl *gomock.Controller + recorder *MockLockerMockRecorder + isgomock struct{} +} + +// MockLockerMockRecorder is the mock recorder for MockLocker. +type MockLockerMockRecorder struct { + mock *MockLocker +} + +// NewMockLocker creates a new mock instance. +func NewMockLocker(ctrl *gomock.Controller) *MockLocker { + mock := &MockLocker{ctrl: ctrl} + mock.recorder = &MockLockerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLocker) EXPECT() *MockLockerMockRecorder { + return m.recorder +} + +// Take mocks base method. +func (m *MockLocker) Take(ctx context.Context) (bool, func(), error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Take", ctx) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(func()) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// Take indicates an expected call of Take. +func (mr *MockLockerMockRecorder) Take(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Take", reflect.TypeOf((*MockLocker)(nil).Take), ctx) +} diff --git a/internal/leadership/module.go b/internal/leadership/module.go new file mode 100644 index 000000000..104f45ccf --- /dev/null +++ b/internal/leadership/module.go @@ -0,0 +1,39 @@ +package leadership + +import ( + "context" + "go.uber.org/fx" +) + +func NewFXModule() fx.Option { + return fx.Options( + fx.Provide(NewLeadership), + fx.Provide(NewDefaultLocker), + fx.Invoke(func(lc fx.Lifecycle, runner *Leadership) { + var ( + cancel context.CancelFunc + stopped = make(chan struct{}) + ) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + ctx, cancel = context.WithCancel(context.WithoutCancel(ctx)) + go func() { + defer close(stopped) + runner.Run(ctx) + }() + + return nil + }, + OnStop: func(ctx context.Context) error { + cancel() + select { + case <-stopped: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + }) + }), + ) +} diff --git a/internal/leadership/signal.go b/internal/leadership/signal.go new file mode 100644 index 000000000..3fe538436 --- /dev/null +++ b/internal/leadership/signal.go @@ -0,0 +1,88 @@ +package leadership + +import ( + "github.com/formancehq/go-libs/v2/pointer" + "sync" +) + +type listener struct { + channel chan bool +} + +type Signal struct { + mu *sync.Mutex + t *bool + + inner []listener + outer chan bool +} + +func (h *Signal) Actual() *bool { + h.mu.Lock() + defer h.mu.Unlock() + + if h.t == nil { + return nil + } + + return pointer.For(*h.t) +} + +func (h *Signal) Listen() (<-chan bool, func()) { + h.mu.Lock() + defer h.mu.Unlock() + + newChannel := make(chan bool, 1) + index := len(h.inner) + h.inner = append(h.inner, listener{ + channel: newChannel, + }) + if h.t != nil { + newChannel <- *h.t + } + + return newChannel, func() { + h.mu.Lock() + defer h.mu.Unlock() + + if index < len(h.inner)-1 { + h.inner = append(h.inner[:index], h.inner[index+1:]...) + } else { + h.inner = h.inner[:index] + } + } +} + +func (h *Signal) Signal(t bool) { + h.mu.Lock() + defer h.mu.Unlock() + + h.t = &t + + for _, inner := range h.inner { + inner.channel <- t + } +} + +func (h *Signal) Close() { + h.mu.Lock() + defer h.mu.Unlock() + + for _, inner := range h.inner { + close(inner.channel) + } +} + +func (h *Signal) CountListeners() int { + h.mu.Lock() + defer h.mu.Unlock() + + return len(h.inner) +} + +func NewSignal() *Signal { + return &Signal{ + outer: make(chan bool), + mu: &sync.Mutex{}, + } +} From d758b471730bb6163e1e3f507150e98c9f0c0e27 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 29 Jan 2025 14:18:19 +0100 Subject: [PATCH 2/7] feat: refine --- .../bulking/mocks_ledger_controller_test.go | 3 - .../common/mocks_ledger_controller_test.go | 3 - .../common/mocks_system_controller_test.go | 3 - .../api/v1/mocks_ledger_controller_test.go | 3 - .../api/v1/mocks_system_controller_test.go | 3 - .../api/v2/mocks_ledger_controller_test.go | 3 - .../api/v2/mocks_system_controller_test.go | 3 - .../ledger/controller_generated_test.go | 3 - ...too_many_client_handling_generated_test.go | 3 - .../ledger/listener_generated_test.go | 3 - .../ledger/numscript_parser_generated_test.go | 3 - .../numscript_runtime_generated_test.go | 3 - .../controller/ledger/store_generated_test.go | 5 - .../leadership/{signal.go => broadcaster.go} | 32 +++--- internal/leadership/context.go | 33 +++++++ internal/leadership/leadership.go | 78 +-------------- internal/leadership/leadership_test.go | 73 -------------- internal/leadership/locker.go | 22 +++-- internal/leadership/locker_generated_test.go | 55 ----------- internal/leadership/main_test.go | 24 +++++ internal/leadership/manager.go | 99 +++++++++++++++++++ internal/leadership/manager_test.go | 73 ++++++++++++++ internal/leadership/module.go | 13 +-- .../storage/driver/buckets_generated_test.go | 4 - .../storage/driver/ledger_generated_test.go | 3 - .../storage/driver/system_generated_test.go | 3 - pkg/testserver/server.go | 9 +- test/e2e/app_multiple_instance_test.go | 62 +++++++++++- 28 files changed, 329 insertions(+), 295 deletions(-) rename internal/leadership/{signal.go => broadcaster.go} (62%) create mode 100644 internal/leadership/context.go delete mode 100644 internal/leadership/leadership_test.go delete mode 100644 internal/leadership/locker_generated_test.go create mode 100644 internal/leadership/main_test.go create mode 100644 internal/leadership/manager.go create mode 100644 internal/leadership/manager_test.go diff --git a/internal/api/bulking/mocks_ledger_controller_test.go b/internal/api/bulking/mocks_ledger_controller_test.go index cbf90fc33..2cede2100 100644 --- a/internal/api/bulking/mocks_ledger_controller_test.go +++ b/internal/api/bulking/mocks_ledger_controller_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package bulking --mock_names Controller=LedgerController . Controller -// - package bulking import ( @@ -23,7 +21,6 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder - isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/common/mocks_ledger_controller_test.go b/internal/api/common/mocks_ledger_controller_test.go index 01e775d3f..c263cfa1f 100644 --- a/internal/api/common/mocks_ledger_controller_test.go +++ b/internal/api/common/mocks_ledger_controller_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package common --mock_names Controller=LedgerController . Controller -// - package common import ( @@ -23,7 +21,6 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder - isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/common/mocks_system_controller_test.go b/internal/api/common/mocks_system_controller_test.go index 0c85dc246..b0fbeaea8 100644 --- a/internal/api/common/mocks_system_controller_test.go +++ b/internal/api/common/mocks_system_controller_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/system/controller.go -destination mocks_system_controller_test.go -package common --mock_names Controller=SystemController . Controller -// - package common import ( @@ -21,7 +19,6 @@ import ( type SystemController struct { ctrl *gomock.Controller recorder *SystemControllerMockRecorder - isgomock struct{} } // SystemControllerMockRecorder is the mock recorder for SystemController. diff --git a/internal/api/v1/mocks_ledger_controller_test.go b/internal/api/v1/mocks_ledger_controller_test.go index 2f3a686e1..f89439826 100644 --- a/internal/api/v1/mocks_ledger_controller_test.go +++ b/internal/api/v1/mocks_ledger_controller_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package v1 --mock_names Controller=LedgerController . Controller -// - package v1 import ( @@ -23,7 +21,6 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder - isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/v1/mocks_system_controller_test.go b/internal/api/v1/mocks_system_controller_test.go index f3f19b232..1ad57614e 100644 --- a/internal/api/v1/mocks_system_controller_test.go +++ b/internal/api/v1/mocks_system_controller_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/system/controller.go -destination mocks_system_controller_test.go -package v1 --mock_names Controller=SystemController . Controller -// - package v1 import ( @@ -21,7 +19,6 @@ import ( type SystemController struct { ctrl *gomock.Controller recorder *SystemControllerMockRecorder - isgomock struct{} } // SystemControllerMockRecorder is the mock recorder for SystemController. diff --git a/internal/api/v2/mocks_ledger_controller_test.go b/internal/api/v2/mocks_ledger_controller_test.go index a0d043ca4..2cbbfee4a 100644 --- a/internal/api/v2/mocks_ledger_controller_test.go +++ b/internal/api/v2/mocks_ledger_controller_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package v2 --mock_names Controller=LedgerController . Controller -// - package v2 import ( @@ -23,7 +21,6 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder - isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/v2/mocks_system_controller_test.go b/internal/api/v2/mocks_system_controller_test.go index c4d8c215a..45d1eaaa5 100644 --- a/internal/api/v2/mocks_system_controller_test.go +++ b/internal/api/v2/mocks_system_controller_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/system/controller.go -destination mocks_system_controller_test.go -package v2 --mock_names Controller=SystemController . Controller -// - package v2 import ( @@ -21,7 +19,6 @@ import ( type SystemController struct { ctrl *gomock.Controller recorder *SystemControllerMockRecorder - isgomock struct{} } // SystemControllerMockRecorder is the mock recorder for SystemController. diff --git a/internal/controller/ledger/controller_generated_test.go b/internal/controller/ledger/controller_generated_test.go index 1495b8c74..7e6601231 100644 --- a/internal/controller/ledger/controller_generated_test.go +++ b/internal/controller/ledger/controller_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source controller.go -destination controller_generated_test.go -package ledger . Controller -// - package ledger import ( @@ -22,7 +20,6 @@ import ( type MockController struct { ctrl *gomock.Controller recorder *MockControllerMockRecorder - isgomock struct{} } // MockControllerMockRecorder is the mock recorder for MockController. diff --git a/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go b/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go index 9f397752e..2f0c421cb 100644 --- a/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go +++ b/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source controller_with_too_many_client_handling.go -destination controller_with_too_many_client_handling_generated_test.go -package ledger . DelayCalculator -typed -// - package ledger import ( @@ -18,7 +16,6 @@ import ( type MockDelayCalculator struct { ctrl *gomock.Controller recorder *MockDelayCalculatorMockRecorder - isgomock struct{} } // MockDelayCalculatorMockRecorder is the mock recorder for MockDelayCalculator. diff --git a/internal/controller/ledger/listener_generated_test.go b/internal/controller/ledger/listener_generated_test.go index e0e7e584c..44df6a6a7 100644 --- a/internal/controller/ledger/listener_generated_test.go +++ b/internal/controller/ledger/listener_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source listener.go -destination listener_generated_test.go -package ledger . Listener -// - package ledger import ( @@ -20,7 +18,6 @@ import ( type MockListener struct { ctrl *gomock.Controller recorder *MockListenerMockRecorder - isgomock struct{} } // MockListenerMockRecorder is the mock recorder for MockListener. diff --git a/internal/controller/ledger/numscript_parser_generated_test.go b/internal/controller/ledger/numscript_parser_generated_test.go index 5219a92e2..f319d367d 100644 --- a/internal/controller/ledger/numscript_parser_generated_test.go +++ b/internal/controller/ledger/numscript_parser_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source numscript_parser.go -destination numscript_parser_generated_test.go -package ledger . NumscriptParser -// - package ledger import ( @@ -17,7 +15,6 @@ import ( type MockNumscriptParser struct { ctrl *gomock.Controller recorder *MockNumscriptParserMockRecorder - isgomock struct{} } // MockNumscriptParserMockRecorder is the mock recorder for MockNumscriptParser. diff --git a/internal/controller/ledger/numscript_runtime_generated_test.go b/internal/controller/ledger/numscript_runtime_generated_test.go index 8a6343843..254a78556 100644 --- a/internal/controller/ledger/numscript_runtime_generated_test.go +++ b/internal/controller/ledger/numscript_runtime_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source numscript_runtime.go -destination numscript_runtime_generated_test.go -package ledger . NumscriptRuntime -// - package ledger import ( @@ -18,7 +16,6 @@ import ( type MockNumscriptRuntime struct { ctrl *gomock.Controller recorder *MockNumscriptRuntimeMockRecorder - isgomock struct{} } // MockNumscriptRuntimeMockRecorder is the mock recorder for MockNumscriptRuntime. diff --git a/internal/controller/ledger/store_generated_test.go b/internal/controller/ledger/store_generated_test.go index accb45087..7a677d58e 100644 --- a/internal/controller/ledger/store_generated_test.go +++ b/internal/controller/ledger/store_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source store.go -destination store_generated_test.go -package ledger . PaginatedResource -// - package ledger import ( @@ -25,7 +23,6 @@ import ( type MockStore struct { ctrl *gomock.Controller recorder *MockStoreMockRecorder - isgomock struct{} } // MockStoreMockRecorder is the mock recorder for MockStore. @@ -373,7 +370,6 @@ func (mr *MockStoreMockRecorder) Volumes() *gomock.Call { type MockResource[ResourceType any, OptionsType any] struct { ctrl *gomock.Controller recorder *MockResourceMockRecorder[ResourceType, OptionsType] - isgomock struct{} } // MockResourceMockRecorder is the mock recorder for MockResource. @@ -427,7 +423,6 @@ func (mr *MockResourceMockRecorder[ResourceType, OptionsType]) GetOne(ctx, query type MockPaginatedResource[ResourceType any, OptionsType any, PaginationQueryType PaginatedQuery[OptionsType]] struct { ctrl *gomock.Controller recorder *MockPaginatedResourceMockRecorder[ResourceType, OptionsType, PaginationQueryType] - isgomock struct{} } // MockPaginatedResourceMockRecorder is the mock recorder for MockPaginatedResource. diff --git a/internal/leadership/signal.go b/internal/leadership/broadcaster.go similarity index 62% rename from internal/leadership/signal.go rename to internal/leadership/broadcaster.go index 3fe538436..b0f084878 100644 --- a/internal/leadership/signal.go +++ b/internal/leadership/broadcaster.go @@ -1,38 +1,36 @@ package leadership import ( - "github.com/formancehq/go-libs/v2/pointer" "sync" ) type listener struct { - channel chan bool + channel chan Leadership } -type Signal struct { +type Broadcaster struct { mu *sync.Mutex - t *bool + t *Leadership inner []listener - outer chan bool + outer chan Leadership } -func (h *Signal) Actual() *bool { +func (h *Broadcaster) Actual() Leadership { h.mu.Lock() defer h.mu.Unlock() if h.t == nil { - return nil + return Leadership{} } - - return pointer.For(*h.t) + return *h.t } -func (h *Signal) Listen() (<-chan bool, func()) { +func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) { h.mu.Lock() defer h.mu.Unlock() - newChannel := make(chan bool, 1) + newChannel := make(chan Leadership, 1) index := len(h.inner) h.inner = append(h.inner, listener{ channel: newChannel, @@ -53,7 +51,7 @@ func (h *Signal) Listen() (<-chan bool, func()) { } } -func (h *Signal) Signal(t bool) { +func (h *Broadcaster) Broadcast(t Leadership) { h.mu.Lock() defer h.mu.Unlock() @@ -64,7 +62,7 @@ func (h *Signal) Signal(t bool) { } } -func (h *Signal) Close() { +func (h *Broadcaster) Close() { h.mu.Lock() defer h.mu.Unlock() @@ -73,16 +71,16 @@ func (h *Signal) Close() { } } -func (h *Signal) CountListeners() int { +func (h *Broadcaster) CountListeners() int { h.mu.Lock() defer h.mu.Unlock() return len(h.inner) } -func NewSignal() *Signal { - return &Signal{ - outer: make(chan bool), +func NewSignal() *Broadcaster { + return &Broadcaster{ + outer: make(chan Leadership), mu: &sync.Mutex{}, } } diff --git a/internal/leadership/context.go b/internal/leadership/context.go new file mode 100644 index 000000000..4311b2b63 --- /dev/null +++ b/internal/leadership/context.go @@ -0,0 +1,33 @@ +package leadership + +import ( + "context" +) + +type contextKey string + +var holderContextKey contextKey = "holder" + +func ContextWithLeadershipInfo(ctx context.Context) context.Context { + return context.WithValue(ctx, holderContextKey, &holder{}) +} + +func IsLeader(ctx context.Context) bool { + h := ctx.Value(holderContextKey) + if h == nil { + return false + } + return h.(*holder).isLeader +} + +func setIsLeader(ctx context.Context, isLeader bool) { + h := ctx.Value(holderContextKey) + if h == nil { + return + } + h.(*holder).isLeader = isLeader +} + +type holder struct { + isLeader bool +} diff --git a/internal/leadership/leadership.go b/internal/leadership/leadership.go index 2cef3459c..27814df15 100644 --- a/internal/leadership/leadership.go +++ b/internal/leadership/leadership.go @@ -1,80 +1,6 @@ package leadership -import ( - "context" - "errors" - "fmt" - "github.com/formancehq/go-libs/v2/logging" - "time" -) - type Leadership struct { - locker Locker - changes *Signal - logger logging.Logger - retryPeriod time.Duration -} - -func (l *Leadership) acquire(ctx context.Context) error { - - acquired, release, err := l.locker.Take(ctx) - if err != nil { - return fmt.Errorf("error acquiring lock: %w", err) - } - - if acquired { - l.changes.Signal(true) - l.logger.Info("leadership acquired") - <-ctx.Done() - l.logger.Info("leadership lost") - release() - l.changes.Signal(false) - return ctx.Err() - } else { - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(l.retryPeriod): - } - } - - return nil -} - -func (l *Leadership) Run(ctx context.Context) { - for { - if err := l.acquire(ctx); err != nil { - if errors.Is(err, context.Canceled) { - return - } - l.logger.Errorf("error acquiring leadership: %s", err) - } - } -} - -func (l *Leadership) GetSignal() *Signal { - return l.changes -} - -func NewLeadership(locker Locker, logger logging.Logger, options ...Option) *Leadership { - l := &Leadership{ - locker: locker, - logger: logger, - changes: NewSignal(), - retryPeriod: 2 * time.Second, - } - - for _, option := range options { - option(l) - } - - return l -} - -type Option func(leadership *Leadership) - -func WithRetryPeriod(duration time.Duration) Option { - return func(leadership *Leadership) { - leadership.retryPeriod = duration - } + Acquired bool + DB DBHandle } diff --git a/internal/leadership/leadership_test.go b/internal/leadership/leadership_test.go deleted file mode 100644 index 66f20dbbb..000000000 --- a/internal/leadership/leadership_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package leadership - -import ( - "context" - "github.com/formancehq/go-libs/v2/logging" - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "sync/atomic" - "testing" - "time" -) - -func TestLeaderShip(t *testing.T) { - - t.Parallel() - - ctx := logging.TestingContext() - ctrl := gomock.NewController(t) - - const count = 10 - selectedInstance := atomic.Int32{} - - type instance struct { - leadership *Leadership - ctx context.Context - cancel func() - } - - instances := make([]instance, count) - for i := range count { - m := NewMockLocker(ctrl) - m.EXPECT(). - Take(gomock.Any()). - AnyTimes(). - DoAndReturn(func(ctx context.Context) (bool, func(), error) { - return i == int(selectedInstance.Load()), func() {}, nil - }) - - l := NewLeadership(m, logging.Testing(), WithRetryPeriod(10*time.Millisecond)) - - ctx, cancel := context.WithCancel(ctx) - - go l.Run(ctx) - - instances[i] = instance{ - leadership: l, - ctx: ctx, - cancel: cancel, - } - } - - for _, nextLeader := range []int{0, 2, 4, 8} { - selectedInstance.Store(int32(nextLeader)) - - leadershipSignal, release := instances[nextLeader].leadership.GetSignal().Listen() - select { - case acquired := <-leadershipSignal: - require.True(t, acquired, "instance %d should be leader", nextLeader) - case <-time.After(100 * time.Millisecond): - t.Fatal("signal should have been received") - } - - instances[nextLeader].cancel() - - select { - case acquired := <-leadershipSignal: - require.False(t, acquired, "instance %d should have lost the leadership", nextLeader) - case <-time.After(100 * time.Millisecond): - t.Fatal("signal should have been received") - } - release() - } -} diff --git a/internal/leadership/locker.go b/internal/leadership/locker.go index e8fadd2ed..3bebb246d 100644 --- a/internal/leadership/locker.go +++ b/internal/leadership/locker.go @@ -8,25 +8,31 @@ import ( const leadershipAdvisoryLockKey = 123456789 -//go:generate mockgen -write_source_comment=false -write_package_comment=false -source locker.go -destination locker_generated_test.go -package leadership . Locker +type DBHandle interface { + bun.IDB + Close() error +} + +// Locker take a lock at process level +// It returns a bun.IDB which MUST be invalidated when the lock is lost type Locker interface { - Take(ctx context.Context) (bool, func(), error) + Take(ctx context.Context) (DBHandle, error) } type defaultLocker struct { db *bun.DB } -func (p *defaultLocker) Take(ctx context.Context) (bool, func(), error) { +func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) { conn, err := p.db.Conn(ctx) if err != nil { - return false, nil, fmt.Errorf("error opening new connection: %w", err) + return nil, fmt.Errorf("error opening new connection: %w", err) } ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey) if ret.Err() != nil { _ = conn.Close() - return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err()) + return nil, fmt.Errorf("error acquiring lock: %w", ret.Err()) } var acquired bool @@ -37,12 +43,10 @@ func (p *defaultLocker) Take(ctx context.Context) (bool, func(), error) { if !acquired { _ = conn.Close() - return false, nil, nil + return nil, nil } - return true, func() { - _ = conn.Close() - }, nil + return conn, nil } func NewDefaultLocker(db *bun.DB) Locker { diff --git a/internal/leadership/locker_generated_test.go b/internal/leadership/locker_generated_test.go deleted file mode 100644 index 51e3a6ae8..000000000 --- a/internal/leadership/locker_generated_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// -// Generated by this command: -// -// mockgen -write_source_comment=false -write_package_comment=false -source locker.go -destination locker_generated_test.go -package leadership . Locker -// - -package leadership - -import ( - context "context" - reflect "reflect" - - gomock "go.uber.org/mock/gomock" -) - -// MockLocker is a mock of Locker interface. -type MockLocker struct { - ctrl *gomock.Controller - recorder *MockLockerMockRecorder - isgomock struct{} -} - -// MockLockerMockRecorder is the mock recorder for MockLocker. -type MockLockerMockRecorder struct { - mock *MockLocker -} - -// NewMockLocker creates a new mock instance. -func NewMockLocker(ctrl *gomock.Controller) *MockLocker { - mock := &MockLocker{ctrl: ctrl} - mock.recorder = &MockLockerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockLocker) EXPECT() *MockLockerMockRecorder { - return m.recorder -} - -// Take mocks base method. -func (m *MockLocker) Take(ctx context.Context) (bool, func(), error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Take", ctx) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(func()) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// Take indicates an expected call of Take. -func (mr *MockLockerMockRecorder) Take(ctx any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Take", reflect.TypeOf((*MockLocker)(nil).Take), ctx) -} diff --git a/internal/leadership/main_test.go b/internal/leadership/main_test.go new file mode 100644 index 000000000..11f5f9e35 --- /dev/null +++ b/internal/leadership/main_test.go @@ -0,0 +1,24 @@ +//go:build it + +package leadership + +import ( + . "github.com/formancehq/go-libs/v2/testing/utils" + "testing" + + "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/go-libs/v2/testing/docker" + "github.com/formancehq/go-libs/v2/testing/platform/pgtesting" +) + +var ( + srv *pgtesting.PostgresServer +) + +func TestMain(m *testing.M) { + WithTestMain(func(t *TestingTForMain) int { + srv = pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()), pgtesting.WithExtension("pgcrypto")) + + return m.Run() + }) +} \ No newline at end of file diff --git a/internal/leadership/manager.go b/internal/leadership/manager.go new file mode 100644 index 000000000..0ac3c007c --- /dev/null +++ b/internal/leadership/manager.go @@ -0,0 +1,99 @@ +package leadership + +import ( + "context" + "github.com/formancehq/go-libs/v2/logging" + "time" +) + +type Manager struct { + locker Locker + changes *Broadcaster + logger logging.Logger + retryPeriod time.Duration + stopChannel chan chan struct{} +} + +func (m *Manager) Run(ctx context.Context) { + var ( + db DBHandle + nextRetry = time.After(time.Duration(0)) + err error + ) + for { + select { + case ch := <-m.stopChannel: + if db != nil { + m.logger.Info("leadership lost") + _ = db.Close() + setIsLeader(ctx, false) + m.changes.Broadcast(Leadership{}) + } + close(ch) + close(m.stopChannel) + return + case <-nextRetry: + db, err = m.locker.Take(ctx) + if err != nil || db == nil { + if err != nil { + m.logger.Error("error acquiring lock", err) + } + nextRetry = time.After(m.retryPeriod) + continue + } + + m.changes.Broadcast(Leadership{ + DB: db, + Acquired: true, + }) + m.logger.Info("leadership acquired") + + setIsLeader(ctx, true) + } + } +} + +func (m *Manager) Stop(ctx context.Context) error { + select { + // if already closed + case <-m.stopChannel: + return nil + default: + ch := make(chan struct{}) + m.stopChannel <- ch + select { + case <-ctx.Done(): + return ctx.Err() + case <-ch: + return nil + } + } +} + +func (m *Manager) GetSignal() *Broadcaster { + return m.changes +} + +func NewManager(locker Locker, logger logging.Logger, options ...Option) *Manager { + l := &Manager{ + locker: locker, + logger: logger, + changes: NewSignal(), + retryPeriod: 2 * time.Second, + stopChannel: make(chan chan struct{}), + } + + for _, option := range options { + option(l) + } + + return l +} + +type Option func(leadership *Manager) + +func WithRetryPeriod(duration time.Duration) Option { + return func(leadership *Manager) { + leadership.retryPeriod = duration + } +} diff --git a/internal/leadership/manager_test.go b/internal/leadership/manager_test.go new file mode 100644 index 000000000..5151ed64e --- /dev/null +++ b/internal/leadership/manager_test.go @@ -0,0 +1,73 @@ +//go:build it + +package leadership + +import ( + "github.com/formancehq/go-libs/v2/bun/bunconnect" + "github.com/formancehq/go-libs/v2/logging" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestLeaderShip(t *testing.T) { + t.Parallel() + + ctx := logging.TestingContext() + pgDB := srv.NewDatabase(t) + db, err := bunconnect.OpenSQLDB(ctx, pgDB.ConnectionOptions()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + const count = 10 + + instances := make([]*Manager, count) + for i := range count { + m := NewDefaultLocker(db) + manager := NewManager(m, logging.Testing(), WithRetryPeriod(10*time.Millisecond)) + + go manager.Run(ctx) + instances[i] = manager + } + + selectedLeader := -1 + require.Eventually(t, func() bool { + for index, manager := range instances { + actual := manager.GetSignal().Actual() + if actual.Acquired { + selectedLeader = index + return true + } + } + return false + }, 2*time.Second, 10*time.Millisecond) + require.GreaterOrEqual(t, selectedLeader, 0) + + // ensure the provided db connection is still functionnal + require.NoError(t, instances[selectedLeader]. + GetSignal(). + Actual().DB. + NewSelect(). + Model(&map[string]any{}). + ColumnExpr("1 as v"). + Scan(ctx), + ) + + require.NoError(t, instances[selectedLeader].Stop(ctx)) + + require.Eventually(t, func() bool { + for index, manager := range instances { + if manager.GetSignal().Actual().Acquired { + selectedLeader = index + return true + } + } + return false + }, 2*time.Second, 10*time.Millisecond) + + for _, i := range instances { + require.NoError(t, i.Stop(ctx)) + } +} diff --git a/internal/leadership/module.go b/internal/leadership/module.go index 104f45ccf..d7f1fcfec 100644 --- a/internal/leadership/module.go +++ b/internal/leadership/module.go @@ -7,30 +7,27 @@ import ( func NewFXModule() fx.Option { return fx.Options( - fx.Provide(NewLeadership), + fx.Provide(NewManager), fx.Provide(NewDefaultLocker), - fx.Invoke(func(lc fx.Lifecycle, runner *Leadership) { + fx.Invoke(func(lc fx.Lifecycle, runner *Manager) { var ( - cancel context.CancelFunc stopped = make(chan struct{}) ) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - ctx, cancel = context.WithCancel(context.WithoutCancel(ctx)) go func() { defer close(stopped) - runner.Run(ctx) + runner.Run(context.WithoutCancel(ctx)) }() return nil }, OnStop: func(ctx context.Context) error { - cancel() select { case <-stopped: return nil - case <-ctx.Done(): - return ctx.Err() + default: + return runner.Stop(ctx) } }, }) diff --git a/internal/storage/driver/buckets_generated_test.go b/internal/storage/driver/buckets_generated_test.go index 61635f327..b71780813 100644 --- a/internal/storage/driver/buckets_generated_test.go +++ b/internal/storage/driver/buckets_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../bucket/bucket.go -destination buckets_generated_test.go -package driver --mock_names Factory=BucketFactory . Factory -// - package driver import ( @@ -21,7 +19,6 @@ import ( type MockBucket struct { ctrl *gomock.Controller recorder *MockBucketMockRecorder - isgomock struct{} } // MockBucketMockRecorder is the mock recorder for MockBucket. @@ -138,7 +135,6 @@ func (mr *MockBucketMockRecorder) Migrate(ctx any, opts ...any) *gomock.Call { type BucketFactory struct { ctrl *gomock.Controller recorder *BucketFactoryMockRecorder - isgomock struct{} } // BucketFactoryMockRecorder is the mock recorder for BucketFactory. diff --git a/internal/storage/driver/ledger_generated_test.go b/internal/storage/driver/ledger_generated_test.go index b940e41b3..fb2f8a6ab 100644 --- a/internal/storage/driver/ledger_generated_test.go +++ b/internal/storage/driver/ledger_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../ledger/factory.go -destination ledger_generated_test.go -package driver --mock_names Factory=LedgerStoreFactory . Factory -// - package driver import ( @@ -20,7 +18,6 @@ import ( type LedgerStoreFactory struct { ctrl *gomock.Controller recorder *LedgerStoreFactoryMockRecorder - isgomock struct{} } // LedgerStoreFactoryMockRecorder is the mock recorder for LedgerStoreFactory. diff --git a/internal/storage/driver/system_generated_test.go b/internal/storage/driver/system_generated_test.go index 6ce339a3f..d6afce573 100644 --- a/internal/storage/driver/system_generated_test.go +++ b/internal/storage/driver/system_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../system/store.go -destination system_generated_test.go -package driver --mock_names Store=SystemStore . Store -// - package driver import ( @@ -23,7 +21,6 @@ import ( type SystemStore struct { ctrl *gomock.Controller recorder *SystemStoreMockRecorder - isgomock struct{} } // SystemStoreMockRecorder is the mock recorder for SystemStore. diff --git a/pkg/testserver/server.go b/pkg/testserver/server.go index 1ec8fd314..ab9dc1873 100644 --- a/pkg/testserver/server.go +++ b/pkg/testserver/server.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" + "github.com/formancehq/ledger/internal/leadership" + "github.com/nats-io/nats.go" "io" "net/http" "strings" "time" - "github.com/nats-io/nats.go" - "github.com/formancehq/go-libs/v2/otlp" "github.com/formancehq/go-libs/v2/otlp/otlpmetrics" "github.com/formancehq/go-libs/v2/publish" @@ -203,6 +203,7 @@ func (s *Server) Start() error { ctx := logging.TestingContext() ctx = service.ContextWithLifecycle(ctx) ctx = httpserver.ContextWithServerInfo(ctx) + ctx = leadership.ContextWithLeadershipInfo(ctx) ctx, cancel := context.WithCancel(ctx) go func() { @@ -324,6 +325,10 @@ func (s *Server) URL() string { return httpserver.URL(s.ctx) } +func (s *Server) IsLeader() bool { + return leadership.IsLeader(s.ctx) +} + func New(t T, configuration Configuration) *Server { t.Helper() diff --git a/test/e2e/app_multiple_instance_test.go b/test/e2e/app_multiple_instance_test.go index 25f19bf3d..f056728f0 100644 --- a/test/e2e/app_multiple_instance_test.go +++ b/test/e2e/app_multiple_instance_test.go @@ -3,12 +3,14 @@ package test_suite import ( + "context" "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/go-libs/v2/testing/platform/pgtesting" . "github.com/formancehq/ledger/pkg/testserver" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "sync" + "time" ) var _ = Context("Ledger application multiple instance tests", func() { @@ -20,9 +22,9 @@ var _ = Context("Ledger application multiple instance tests", func() { const nbServer = 3 When("starting multiple instances of the service", func() { - var servers chan *Server + var allServers []*Server BeforeEach(func() { - servers = make(chan *Server, nbServer) + servers := make(chan *Server, nbServer) wg := sync.WaitGroup{} wg.Add(nbServer) waitStart := make(chan struct{}) @@ -31,7 +33,7 @@ var _ = Context("Ledger application multiple instance tests", func() { defer GinkgoRecover() defer wg.Done() - // Best effort to start all servers at the same time + // Best effort to start all servers at the same time and detect conflict errors <-waitStart servers <- New(GinkgoT(), Configuration{ @@ -46,14 +48,66 @@ var _ = Context("Ledger application multiple instance tests", func() { close(waitStart) wg.Wait() close(servers) + + for server := range servers { + allServers = append(allServers, server) + } }) It("each service should be up and running", func() { - for server := range servers { + + for _, server := range allServers { info, err := server.Client().Ledger.GetInfo(ctx) Expect(err).NotTo(HaveOccurred()) Expect(info.V2ConfigInfoResponse.Version).To(Equal("develop")) } + + By("Only one should be a leader", func() { + Eventually(func() bool { + leaderFound := false + for _, server := range allServers { + if server.IsLeader() { + if leaderFound { + Fail("Multiple leaders found") + } + leaderFound = true + } + } + return leaderFound + }).WithTimeout(5 * time.Second).Should(BeTrue()) + }) + }) + Context("When a leader is elected", func() { + var ( + selected int + ) + BeforeEach(func() { + Eventually(func() int { + for index, server := range allServers { + if server.IsLeader() { + selected = index + return index + } + } + return -1 + }).Should(Not(Equal(-1))) + }) + Context("and the leader dies", func() { + BeforeEach(func() { + Expect(allServers[selected].Stop(context.TODO())).To(BeNil()) + allServers = append(allServers[:selected], allServers[selected+1:]...) + }) + It("should select another instance", func() { + Eventually(func() bool { + for _, server := range allServers { + if server.IsLeader() { + return true + } + } + return false + }).Should(BeTrue()) + }) + }) }) }) }) From d9778e3be5d039989f0efe3eab0bcdd26941151a Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 29 Jan 2025 16:59:45 +0100 Subject: [PATCH 3/7] fix: race --- .../api/bulking/mocks_ledger_controller_test.go | 3 +++ .../api/common/mocks_ledger_controller_test.go | 3 +++ .../api/common/mocks_system_controller_test.go | 3 +++ internal/api/v1/mocks_ledger_controller_test.go | 3 +++ internal/api/v1/mocks_system_controller_test.go | 3 +++ internal/api/v2/mocks_ledger_controller_test.go | 3 +++ internal/api/v2/mocks_system_controller_test.go | 3 +++ .../controller/ledger/controller_generated_test.go | 3 +++ ...with_too_many_client_handling_generated_test.go | 3 +++ .../controller/ledger/listener_generated_test.go | 3 +++ .../ledger/numscript_parser_generated_test.go | 3 +++ .../ledger/numscript_runtime_generated_test.go | 3 +++ internal/controller/ledger/store_generated_test.go | 5 +++++ internal/leadership/context.go | 14 ++++++++++++-- internal/storage/driver/buckets_generated_test.go | 4 ++++ internal/storage/driver/ledger_generated_test.go | 3 +++ internal/storage/driver/system_generated_test.go | 3 +++ 17 files changed, 63 insertions(+), 2 deletions(-) diff --git a/internal/api/bulking/mocks_ledger_controller_test.go b/internal/api/bulking/mocks_ledger_controller_test.go index 2cede2100..cbf90fc33 100644 --- a/internal/api/bulking/mocks_ledger_controller_test.go +++ b/internal/api/bulking/mocks_ledger_controller_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package bulking --mock_names Controller=LedgerController . Controller +// + package bulking import ( @@ -21,6 +23,7 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder + isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/common/mocks_ledger_controller_test.go b/internal/api/common/mocks_ledger_controller_test.go index c263cfa1f..01e775d3f 100644 --- a/internal/api/common/mocks_ledger_controller_test.go +++ b/internal/api/common/mocks_ledger_controller_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package common --mock_names Controller=LedgerController . Controller +// + package common import ( @@ -21,6 +23,7 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder + isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/common/mocks_system_controller_test.go b/internal/api/common/mocks_system_controller_test.go index b0fbeaea8..0c85dc246 100644 --- a/internal/api/common/mocks_system_controller_test.go +++ b/internal/api/common/mocks_system_controller_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/system/controller.go -destination mocks_system_controller_test.go -package common --mock_names Controller=SystemController . Controller +// + package common import ( @@ -19,6 +21,7 @@ import ( type SystemController struct { ctrl *gomock.Controller recorder *SystemControllerMockRecorder + isgomock struct{} } // SystemControllerMockRecorder is the mock recorder for SystemController. diff --git a/internal/api/v1/mocks_ledger_controller_test.go b/internal/api/v1/mocks_ledger_controller_test.go index f89439826..2f3a686e1 100644 --- a/internal/api/v1/mocks_ledger_controller_test.go +++ b/internal/api/v1/mocks_ledger_controller_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package v1 --mock_names Controller=LedgerController . Controller +// + package v1 import ( @@ -21,6 +23,7 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder + isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/v1/mocks_system_controller_test.go b/internal/api/v1/mocks_system_controller_test.go index 1ad57614e..f3f19b232 100644 --- a/internal/api/v1/mocks_system_controller_test.go +++ b/internal/api/v1/mocks_system_controller_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/system/controller.go -destination mocks_system_controller_test.go -package v1 --mock_names Controller=SystemController . Controller +// + package v1 import ( @@ -19,6 +21,7 @@ import ( type SystemController struct { ctrl *gomock.Controller recorder *SystemControllerMockRecorder + isgomock struct{} } // SystemControllerMockRecorder is the mock recorder for SystemController. diff --git a/internal/api/v2/mocks_ledger_controller_test.go b/internal/api/v2/mocks_ledger_controller_test.go index 2cbbfee4a..a0d043ca4 100644 --- a/internal/api/v2/mocks_ledger_controller_test.go +++ b/internal/api/v2/mocks_ledger_controller_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/ledger/controller.go -destination mocks_ledger_controller_test.go -package v2 --mock_names Controller=LedgerController . Controller +// + package v2 import ( @@ -21,6 +23,7 @@ import ( type LedgerController struct { ctrl *gomock.Controller recorder *LedgerControllerMockRecorder + isgomock struct{} } // LedgerControllerMockRecorder is the mock recorder for LedgerController. diff --git a/internal/api/v2/mocks_system_controller_test.go b/internal/api/v2/mocks_system_controller_test.go index 45d1eaaa5..c4d8c215a 100644 --- a/internal/api/v2/mocks_system_controller_test.go +++ b/internal/api/v2/mocks_system_controller_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../../controller/system/controller.go -destination mocks_system_controller_test.go -package v2 --mock_names Controller=SystemController . Controller +// + package v2 import ( @@ -19,6 +21,7 @@ import ( type SystemController struct { ctrl *gomock.Controller recorder *SystemControllerMockRecorder + isgomock struct{} } // SystemControllerMockRecorder is the mock recorder for SystemController. diff --git a/internal/controller/ledger/controller_generated_test.go b/internal/controller/ledger/controller_generated_test.go index 7e6601231..1495b8c74 100644 --- a/internal/controller/ledger/controller_generated_test.go +++ b/internal/controller/ledger/controller_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source controller.go -destination controller_generated_test.go -package ledger . Controller +// + package ledger import ( @@ -20,6 +22,7 @@ import ( type MockController struct { ctrl *gomock.Controller recorder *MockControllerMockRecorder + isgomock struct{} } // MockControllerMockRecorder is the mock recorder for MockController. diff --git a/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go b/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go index 2f0c421cb..9f397752e 100644 --- a/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go +++ b/internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source controller_with_too_many_client_handling.go -destination controller_with_too_many_client_handling_generated_test.go -package ledger . DelayCalculator -typed +// + package ledger import ( @@ -16,6 +18,7 @@ import ( type MockDelayCalculator struct { ctrl *gomock.Controller recorder *MockDelayCalculatorMockRecorder + isgomock struct{} } // MockDelayCalculatorMockRecorder is the mock recorder for MockDelayCalculator. diff --git a/internal/controller/ledger/listener_generated_test.go b/internal/controller/ledger/listener_generated_test.go index 44df6a6a7..e0e7e584c 100644 --- a/internal/controller/ledger/listener_generated_test.go +++ b/internal/controller/ledger/listener_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source listener.go -destination listener_generated_test.go -package ledger . Listener +// + package ledger import ( @@ -18,6 +20,7 @@ import ( type MockListener struct { ctrl *gomock.Controller recorder *MockListenerMockRecorder + isgomock struct{} } // MockListenerMockRecorder is the mock recorder for MockListener. diff --git a/internal/controller/ledger/numscript_parser_generated_test.go b/internal/controller/ledger/numscript_parser_generated_test.go index f319d367d..5219a92e2 100644 --- a/internal/controller/ledger/numscript_parser_generated_test.go +++ b/internal/controller/ledger/numscript_parser_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source numscript_parser.go -destination numscript_parser_generated_test.go -package ledger . NumscriptParser +// + package ledger import ( @@ -15,6 +17,7 @@ import ( type MockNumscriptParser struct { ctrl *gomock.Controller recorder *MockNumscriptParserMockRecorder + isgomock struct{} } // MockNumscriptParserMockRecorder is the mock recorder for MockNumscriptParser. diff --git a/internal/controller/ledger/numscript_runtime_generated_test.go b/internal/controller/ledger/numscript_runtime_generated_test.go index 254a78556..8a6343843 100644 --- a/internal/controller/ledger/numscript_runtime_generated_test.go +++ b/internal/controller/ledger/numscript_runtime_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source numscript_runtime.go -destination numscript_runtime_generated_test.go -package ledger . NumscriptRuntime +// + package ledger import ( @@ -16,6 +18,7 @@ import ( type MockNumscriptRuntime struct { ctrl *gomock.Controller recorder *MockNumscriptRuntimeMockRecorder + isgomock struct{} } // MockNumscriptRuntimeMockRecorder is the mock recorder for MockNumscriptRuntime. diff --git a/internal/controller/ledger/store_generated_test.go b/internal/controller/ledger/store_generated_test.go index 7a677d58e..accb45087 100644 --- a/internal/controller/ledger/store_generated_test.go +++ b/internal/controller/ledger/store_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source store.go -destination store_generated_test.go -package ledger . PaginatedResource +// + package ledger import ( @@ -23,6 +25,7 @@ import ( type MockStore struct { ctrl *gomock.Controller recorder *MockStoreMockRecorder + isgomock struct{} } // MockStoreMockRecorder is the mock recorder for MockStore. @@ -370,6 +373,7 @@ func (mr *MockStoreMockRecorder) Volumes() *gomock.Call { type MockResource[ResourceType any, OptionsType any] struct { ctrl *gomock.Controller recorder *MockResourceMockRecorder[ResourceType, OptionsType] + isgomock struct{} } // MockResourceMockRecorder is the mock recorder for MockResource. @@ -423,6 +427,7 @@ func (mr *MockResourceMockRecorder[ResourceType, OptionsType]) GetOne(ctx, query type MockPaginatedResource[ResourceType any, OptionsType any, PaginationQueryType PaginatedQuery[OptionsType]] struct { ctrl *gomock.Controller recorder *MockPaginatedResourceMockRecorder[ResourceType, OptionsType, PaginationQueryType] + isgomock struct{} } // MockPaginatedResourceMockRecorder is the mock recorder for MockPaginatedResource. diff --git a/internal/leadership/context.go b/internal/leadership/context.go index 4311b2b63..a79bdb450 100644 --- a/internal/leadership/context.go +++ b/internal/leadership/context.go @@ -2,6 +2,7 @@ package leadership import ( "context" + "sync" ) type contextKey string @@ -17,7 +18,11 @@ func IsLeader(ctx context.Context) bool { if h == nil { return false } - return h.(*holder).isLeader + holder := h.(*holder) + holder.Lock() + defer holder.Unlock() + + return holder.isLeader } func setIsLeader(ctx context.Context, isLeader bool) { @@ -25,9 +30,14 @@ func setIsLeader(ctx context.Context, isLeader bool) { if h == nil { return } - h.(*holder).isLeader = isLeader + holder := h.(*holder) + holder.Lock() + defer holder.Unlock() + + holder.isLeader = isLeader } type holder struct { + sync.Mutex isLeader bool } diff --git a/internal/storage/driver/buckets_generated_test.go b/internal/storage/driver/buckets_generated_test.go index b71780813..61635f327 100644 --- a/internal/storage/driver/buckets_generated_test.go +++ b/internal/storage/driver/buckets_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../bucket/bucket.go -destination buckets_generated_test.go -package driver --mock_names Factory=BucketFactory . Factory +// + package driver import ( @@ -19,6 +21,7 @@ import ( type MockBucket struct { ctrl *gomock.Controller recorder *MockBucketMockRecorder + isgomock struct{} } // MockBucketMockRecorder is the mock recorder for MockBucket. @@ -135,6 +138,7 @@ func (mr *MockBucketMockRecorder) Migrate(ctx any, opts ...any) *gomock.Call { type BucketFactory struct { ctrl *gomock.Controller recorder *BucketFactoryMockRecorder + isgomock struct{} } // BucketFactoryMockRecorder is the mock recorder for BucketFactory. diff --git a/internal/storage/driver/ledger_generated_test.go b/internal/storage/driver/ledger_generated_test.go index fb2f8a6ab..b940e41b3 100644 --- a/internal/storage/driver/ledger_generated_test.go +++ b/internal/storage/driver/ledger_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../ledger/factory.go -destination ledger_generated_test.go -package driver --mock_names Factory=LedgerStoreFactory . Factory +// + package driver import ( @@ -18,6 +20,7 @@ import ( type LedgerStoreFactory struct { ctrl *gomock.Controller recorder *LedgerStoreFactoryMockRecorder + isgomock struct{} } // LedgerStoreFactoryMockRecorder is the mock recorder for LedgerStoreFactory. diff --git a/internal/storage/driver/system_generated_test.go b/internal/storage/driver/system_generated_test.go index d6afce573..6ce339a3f 100644 --- a/internal/storage/driver/system_generated_test.go +++ b/internal/storage/driver/system_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../system/store.go -destination system_generated_test.go -package driver --mock_names Store=SystemStore . Store +// + package driver import ( @@ -21,6 +23,7 @@ import ( type SystemStore struct { ctrl *gomock.Controller recorder *SystemStoreMockRecorder + isgomock struct{} } // SystemStoreMockRecorder is the mock recorder for SystemStore. From 382771123d2c773b403d650619645a06cc5f763b Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 30 Jan 2025 12:18:59 +0100 Subject: [PATCH 4/7] chore: coderabbit --- internal/leadership/broadcaster.go | 3 +-- internal/leadership/context.go | 4 ++-- internal/leadership/locker.go | 2 +- internal/leadership/manager_test.go | 7 +++++++ 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/leadership/broadcaster.go b/internal/leadership/broadcaster.go index b0f084878..1b84c8a4b 100644 --- a/internal/leadership/broadcaster.go +++ b/internal/leadership/broadcaster.go @@ -9,7 +9,7 @@ type listener struct { } type Broadcaster struct { - mu *sync.Mutex + mu sync.Mutex t *Leadership inner []listener @@ -81,6 +81,5 @@ func (h *Broadcaster) CountListeners() int { func NewSignal() *Broadcaster { return &Broadcaster{ outer: make(chan Leadership), - mu: &sync.Mutex{}, } } diff --git a/internal/leadership/context.go b/internal/leadership/context.go index a79bdb450..58766edde 100644 --- a/internal/leadership/context.go +++ b/internal/leadership/context.go @@ -5,9 +5,9 @@ import ( "sync" ) -type contextKey string +type contextKey struct{} -var holderContextKey contextKey = "holder" +var holderContextKey contextKey = struct{}{} func ContextWithLeadershipInfo(ctx context.Context) context.Context { return context.WithValue(ctx, holderContextKey, &holder{}) diff --git a/internal/leadership/locker.go b/internal/leadership/locker.go index 3bebb246d..4fe2fc5a5 100644 --- a/internal/leadership/locker.go +++ b/internal/leadership/locker.go @@ -38,7 +38,7 @@ func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) { var acquired bool if err := ret.Scan(&acquired); err != nil { _ = conn.Close() - panic(err) + return nil, err } if !acquired { diff --git a/internal/leadership/manager_test.go b/internal/leadership/manager_test.go index 5151ed64e..2cde900f4 100644 --- a/internal/leadership/manager_test.go +++ b/internal/leadership/manager_test.go @@ -43,6 +43,13 @@ func TestLeaderShip(t *testing.T) { } return false }, 2*time.Second, 10*time.Millisecond) + leaderCount := 0 + for _, manager := range instances { + if manager.GetSignal().Actual().Acquired { + leaderCount++ + } + } + require.Equal(t, 1, leaderCount) require.GreaterOrEqual(t, selectedLeader, 0) // ensure the provided db connection is still functionnal From ef690f0332a68f964199adbad77d5460ea02b130 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 30 Jan 2025 13:38:41 +0100 Subject: [PATCH 5/7] fix: from review --- internal/leadership/broadcaster.go | 14 +++++++++++--- internal/leadership/mutex.go | 20 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 internal/leadership/mutex.go diff --git a/internal/leadership/broadcaster.go b/internal/leadership/broadcaster.go index 1b84c8a4b..7414d585e 100644 --- a/internal/leadership/broadcaster.go +++ b/internal/leadership/broadcaster.go @@ -31,10 +31,10 @@ func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) { defer h.mu.Unlock() newChannel := make(chan Leadership, 1) - index := len(h.inner) - h.inner = append(h.inner, listener{ + l := listener{ channel: newChannel, - }) + } + h.inner = append(h.inner, l) if h.t != nil { newChannel <- *h.t } @@ -43,6 +43,14 @@ func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) { h.mu.Lock() defer h.mu.Unlock() + index := -1 + for i, listener := range h.inner { + if listener == l { + index = i + break + } + } + if index < len(h.inner)-1 { h.inner = append(h.inner[:index], h.inner[index+1:]...) } else { diff --git a/internal/leadership/mutex.go b/internal/leadership/mutex.go new file mode 100644 index 000000000..e4dcb50d3 --- /dev/null +++ b/internal/leadership/mutex.go @@ -0,0 +1,20 @@ +package leadership + +import "sync" + +type Mutex[T any] struct { + *sync.Mutex + t T +} + +func (m *Mutex[T]) Lock() (T, func()) { + m.Mutex.Lock() + return m.t, m.Unlock +} + +func NewMutex[T any](t T) *Mutex[T] { + return &Mutex[T]{ + Mutex: &sync.Mutex{}, + t: t, + } +} From 279ff6fe455ededf4484b82d9bb7163d49654b42 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 30 Jan 2025 14:10:15 +0100 Subject: [PATCH 6/7] feat: monitor connection --- internal/leadership/leadership.go | 2 +- internal/leadership/manager.go | 42 ++++++++++++++++++++++++----- internal/leadership/manager_test.go | 18 ++++++++----- internal/leadership/mutex.go | 21 +++++++++------ 4 files changed, 61 insertions(+), 22 deletions(-) diff --git a/internal/leadership/leadership.go b/internal/leadership/leadership.go index 27814df15..1f40c97a4 100644 --- a/internal/leadership/leadership.go +++ b/internal/leadership/leadership.go @@ -2,5 +2,5 @@ package leadership type Leadership struct { Acquired bool - DB DBHandle + DB *Mutex } diff --git a/internal/leadership/manager.go b/internal/leadership/manager.go index 0ac3c007c..a8fb75b60 100644 --- a/internal/leadership/manager.go +++ b/internal/leadership/manager.go @@ -3,6 +3,7 @@ package leadership import ( "context" "github.com/formancehq/go-libs/v2/logging" + "github.com/uptrace/bun" "time" ) @@ -16,16 +17,19 @@ type Manager struct { func (m *Manager) Run(ctx context.Context) { var ( - db DBHandle + dbMutex *Mutex nextRetry = time.After(time.Duration(0)) - err error + nextPing <-chan time.Time ) for { select { case ch := <-m.stopChannel: - if db != nil { + if dbMutex != nil { m.logger.Info("leadership lost") - _ = db.Close() + dbMutex.Exec(func(_ bun.IDB) { + _ = dbMutex.db.Close() + }) + setIsLeader(ctx, false) m.changes.Broadcast(Leadership{}) } @@ -33,7 +37,7 @@ func (m *Manager) Run(ctx context.Context) { close(m.stopChannel) return case <-nextRetry: - db, err = m.locker.Take(ctx) + db, err := m.locker.Take(ctx) if err != nil || db == nil { if err != nil { m.logger.Error("error acquiring lock", err) @@ -42,13 +46,39 @@ func (m *Manager) Run(ctx context.Context) { continue } + dbMutex = NewMutex(db) + m.changes.Broadcast(Leadership{ - DB: db, + DB: dbMutex, Acquired: true, }) m.logger.Info("leadership acquired") setIsLeader(ctx, true) + + nextPing = time.After(m.retryPeriod) + + // Ping the database to check the connection status + // If the connection is lost, signal the listeners about the leadership loss + case <-nextPing: + dbMutex.Exec(func(db bun.IDB) { + _, err := db. + NewSelect(). + ColumnExpr("1 as v"). + Count(ctx) + if err != nil { + m.logger.Error("error pinging db", err) + _ = dbMutex.db.Close() + dbMutex = nil + + setIsLeader(ctx, false) + m.changes.Broadcast(Leadership{}) + + nextRetry = time.After(m.retryPeriod) + } else { + nextPing = time.After(m.retryPeriod) + } + }) } } } diff --git a/internal/leadership/manager_test.go b/internal/leadership/manager_test.go index 2cde900f4..427cb507b 100644 --- a/internal/leadership/manager_test.go +++ b/internal/leadership/manager_test.go @@ -6,6 +6,7 @@ import ( "github.com/formancehq/go-libs/v2/bun/bunconnect" "github.com/formancehq/go-libs/v2/logging" "github.com/stretchr/testify/require" + "github.com/uptrace/bun" "testing" "time" ) @@ -52,15 +53,18 @@ func TestLeaderShip(t *testing.T) { require.Equal(t, 1, leaderCount) require.GreaterOrEqual(t, selectedLeader, 0) - // ensure the provided db connection is still functionnal - require.NoError(t, instances[selectedLeader]. + // ensure the provided db connection is still functional + instances[selectedLeader]. GetSignal(). Actual().DB. - NewSelect(). - Model(&map[string]any{}). - ColumnExpr("1 as v"). - Scan(ctx), - ) + Exec(func(db bun.IDB) { + require.NoError(t, db. + NewSelect(). + Model(&map[string]any{}). + ColumnExpr("1 as v"). + Scan(ctx), + ) + }) require.NoError(t, instances[selectedLeader].Stop(ctx)) diff --git a/internal/leadership/mutex.go b/internal/leadership/mutex.go index e4dcb50d3..68c96a866 100644 --- a/internal/leadership/mutex.go +++ b/internal/leadership/mutex.go @@ -1,20 +1,25 @@ package leadership -import "sync" +import ( + "github.com/uptrace/bun" + "sync" +) -type Mutex[T any] struct { +type Mutex struct { *sync.Mutex - t T + db DBHandle } -func (m *Mutex[T]) Lock() (T, func()) { +func (m *Mutex) Exec(fn func(db bun.IDB)) { m.Mutex.Lock() - return m.t, m.Unlock + defer m.Mutex.Unlock() + + fn(m.db) } -func NewMutex[T any](t T) *Mutex[T] { - return &Mutex[T]{ +func NewMutex(db DBHandle) *Mutex { + return &Mutex{ Mutex: &sync.Mutex{}, - t: t, + db: db, } } From 1d6200aae3f4f1459c5e8dc309a06d1a99ce262d Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 30 Jan 2025 14:59:38 +0100 Subject: [PATCH 7/7] test: add tests --- internal/leadership/broadcaster.go | 55 ++++++-------- internal/leadership/broadcaster_test.go | 73 +++++++++++++++++++ .../{mutex.go => database_handle.go} | 8 +- internal/leadership/leadership.go | 2 +- internal/leadership/manager.go | 13 ++-- internal/leadership/manager_test.go | 41 ++++++++++- 6 files changed, 145 insertions(+), 47 deletions(-) create mode 100644 internal/leadership/broadcaster_test.go rename internal/leadership/{mutex.go => database_handle.go} (54%) diff --git a/internal/leadership/broadcaster.go b/internal/leadership/broadcaster.go index 7414d585e..53eaee93f 100644 --- a/internal/leadership/broadcaster.go +++ b/internal/leadership/broadcaster.go @@ -4,34 +4,35 @@ import ( "sync" ) -type listener struct { - channel chan Leadership +type listener[T any] struct { + channel chan T } -type Broadcaster struct { +type Broadcaster[T any] struct { mu sync.Mutex - t *Leadership + t *T - inner []listener - outer chan Leadership + inner []listener[T] + outer chan T } -func (h *Broadcaster) Actual() Leadership { +func (h *Broadcaster[T]) Actual() T { h.mu.Lock() defer h.mu.Unlock() if h.t == nil { - return Leadership{} + var t T + return t } return *h.t } -func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) { +func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) { h.mu.Lock() defer h.mu.Unlock() - newChannel := make(chan Leadership, 1) - l := listener{ + newChannel := make(chan T, 1) + l := listener[T]{ channel: newChannel, } h.inner = append(h.inner, l) @@ -43,23 +44,20 @@ func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) { h.mu.Lock() defer h.mu.Unlock() - index := -1 - for i, listener := range h.inner { + for index, listener := range h.inner { if listener == l { - index = i + if index < len(h.inner)-1 { + h.inner = append(h.inner[:index], h.inner[index+1:]...) + } else { + h.inner = h.inner[:index] + } break } } - - if index < len(h.inner)-1 { - h.inner = append(h.inner[:index], h.inner[index+1:]...) - } else { - h.inner = h.inner[:index] - } } } -func (h *Broadcaster) Broadcast(t Leadership) { +func (h *Broadcaster[T]) Broadcast(t T) { h.mu.Lock() defer h.mu.Unlock() @@ -70,7 +68,7 @@ func (h *Broadcaster) Broadcast(t Leadership) { } } -func (h *Broadcaster) Close() { +func (h *Broadcaster[T]) Close() { h.mu.Lock() defer h.mu.Unlock() @@ -79,15 +77,8 @@ func (h *Broadcaster) Close() { } } -func (h *Broadcaster) CountListeners() int { - h.mu.Lock() - defer h.mu.Unlock() - - return len(h.inner) -} - -func NewSignal() *Broadcaster { - return &Broadcaster{ - outer: make(chan Leadership), +func NewBroadcaster[T any]() *Broadcaster[T] { + return &Broadcaster[T]{ + outer: make(chan T), } } diff --git a/internal/leadership/broadcaster_test.go b/internal/leadership/broadcaster_test.go new file mode 100644 index 000000000..86c54c1d5 --- /dev/null +++ b/internal/leadership/broadcaster_test.go @@ -0,0 +1,73 @@ +package leadership + +import ( + "testing" + "time" +) + +func TestBroadcaster(t *testing.T) { + t.Parallel() + + broadcaster := NewBroadcaster[struct{}]() + t.Cleanup(broadcaster.Close) + + const nbSubscriptions = 5 + + subscriptions := make([]<-chan struct{}, nbSubscriptions) + releases := make([]func(), nbSubscriptions) + + for i := 0; i < nbSubscriptions; i++ { + subscriptions[i], releases[i] = broadcaster.Subscribe() + } + + go broadcaster.Broadcast(struct{}{}) + + for _, subscription := range subscriptions { + select { + case <-subscription: + case <-time.After(time.Second): + t.Fatal("timeout waiting for broadcast") + } + } + + releases[2]() + subscriptions = append(subscriptions[:2], subscriptions[3:]...) + releases = append(releases[:2], releases[3:]...) + + go broadcaster.Broadcast(struct{}{}) + + for _, subscription := range subscriptions { + select { + case <-subscription: + case <-time.After(time.Second): + t.Fatal("timeout waiting for broadcast") + } + } + + releases[0]() + subscriptions = subscriptions[1:] + releases = releases[1:] + + go broadcaster.Broadcast(struct{}{}) + + for _, subscription := range subscriptions { + select { + case <-subscription: + case <-time.After(time.Second): + t.Fatal("timeout waiting for broadcast") + } + } + + releases[2]() + subscriptions = subscriptions[:2] + + go broadcaster.Broadcast(struct{}{}) + + for _, subscription := range subscriptions { + select { + case <-subscription: + case <-time.After(time.Second): + t.Fatal("timeout waiting for broadcast") + } + } +} diff --git a/internal/leadership/mutex.go b/internal/leadership/database_handle.go similarity index 54% rename from internal/leadership/mutex.go rename to internal/leadership/database_handle.go index 68c96a866..61ba3bb29 100644 --- a/internal/leadership/mutex.go +++ b/internal/leadership/database_handle.go @@ -5,20 +5,20 @@ import ( "sync" ) -type Mutex struct { +type DatabaseHandle struct { *sync.Mutex db DBHandle } -func (m *Mutex) Exec(fn func(db bun.IDB)) { +func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) { m.Mutex.Lock() defer m.Mutex.Unlock() fn(m.db) } -func NewMutex(db DBHandle) *Mutex { - return &Mutex{ +func NewDatabaseHandle(db DBHandle) *DatabaseHandle { + return &DatabaseHandle{ Mutex: &sync.Mutex{}, db: db, } diff --git a/internal/leadership/leadership.go b/internal/leadership/leadership.go index 1f40c97a4..e5428e1b7 100644 --- a/internal/leadership/leadership.go +++ b/internal/leadership/leadership.go @@ -2,5 +2,5 @@ package leadership type Leadership struct { Acquired bool - DB *Mutex + DB *DatabaseHandle } diff --git a/internal/leadership/manager.go b/internal/leadership/manager.go index a8fb75b60..45925bbf1 100644 --- a/internal/leadership/manager.go +++ b/internal/leadership/manager.go @@ -9,7 +9,7 @@ import ( type Manager struct { locker Locker - changes *Broadcaster + changes *Broadcaster[Leadership] logger logging.Logger retryPeriod time.Duration stopChannel chan chan struct{} @@ -17,7 +17,7 @@ type Manager struct { func (m *Manager) Run(ctx context.Context) { var ( - dbMutex *Mutex + dbMutex *DatabaseHandle nextRetry = time.After(time.Duration(0)) nextPing <-chan time.Time ) @@ -46,7 +46,7 @@ func (m *Manager) Run(ctx context.Context) { continue } - dbMutex = NewMutex(db) + dbMutex = NewDatabaseHandle(db) m.changes.Broadcast(Leadership{ DB: dbMutex, @@ -67,7 +67,7 @@ func (m *Manager) Run(ctx context.Context) { ColumnExpr("1 as v"). Count(ctx) if err != nil { - m.logger.Error("error pinging db", err) + m.logger.Errorf("error pinging db: %s", err) _ = dbMutex.db.Close() dbMutex = nil @@ -87,6 +87,7 @@ func (m *Manager) Stop(ctx context.Context) error { select { // if already closed case <-m.stopChannel: + m.changes.Close() return nil default: ch := make(chan struct{}) @@ -100,7 +101,7 @@ func (m *Manager) Stop(ctx context.Context) error { } } -func (m *Manager) GetSignal() *Broadcaster { +func (m *Manager) GetBroadcaster() *Broadcaster[Leadership] { return m.changes } @@ -108,7 +109,7 @@ func NewManager(locker Locker, logger logging.Logger, options ...Option) *Manage l := &Manager{ locker: locker, logger: logger, - changes: NewSignal(), + changes: NewBroadcaster[Leadership](), retryPeriod: 2 * time.Second, stopChannel: make(chan chan struct{}), } diff --git a/internal/leadership/manager_test.go b/internal/leadership/manager_test.go index 427cb507b..3820d6784 100644 --- a/internal/leadership/manager_test.go +++ b/internal/leadership/manager_test.go @@ -36,7 +36,7 @@ func TestLeaderShip(t *testing.T) { selectedLeader := -1 require.Eventually(t, func() bool { for index, manager := range instances { - actual := manager.GetSignal().Actual() + actual := manager.GetBroadcaster().Actual() if actual.Acquired { selectedLeader = index return true @@ -46,7 +46,7 @@ func TestLeaderShip(t *testing.T) { }, 2*time.Second, 10*time.Millisecond) leaderCount := 0 for _, manager := range instances { - if manager.GetSignal().Actual().Acquired { + if manager.GetBroadcaster().Actual().Acquired { leaderCount++ } } @@ -55,7 +55,7 @@ func TestLeaderShip(t *testing.T) { // ensure the provided db connection is still functional instances[selectedLeader]. - GetSignal(). + GetBroadcaster(). Actual().DB. Exec(func(db bun.IDB) { require.NoError(t, db. @@ -66,11 +66,44 @@ func TestLeaderShip(t *testing.T) { ) }) + // Stop the instance to trigger a new leader election require.NoError(t, instances[selectedLeader].Stop(ctx)) require.Eventually(t, func() bool { for index, manager := range instances { - if manager.GetSignal().Actual().Acquired { + if manager.GetBroadcaster().Actual().Acquired { + selectedLeader = index + return true + } + } + return false + }, 2*time.Second, 10*time.Millisecond) + + broadcaster := instances[selectedLeader].GetBroadcaster() + subscription, release := broadcaster.Subscribe() + t.Cleanup(release) + + // We will receive the leadership on the subscription + select { + case <-subscription: + case <-time.After(time.Second): + t.Fatal("timeout waiting for leadership acquirement") + } + + // Close the database connection of the actual leader to check the manager is able to detect the connection loss + require.NoError(t, instances[selectedLeader].GetBroadcaster().Actual().DB.db.Close()) + + select { + case leadership := <-subscription: + require.Equal(t, Leadership{}, leadership) + case <-time.After(time.Second): + t.Fatal("timeout waiting for leadership loss") + } + release() + + require.Eventually(t, func() bool { + for index, manager := range instances { + if manager.GetBroadcaster().Actual().Acquired { selectedLeader = index return true }