Skip to content

Commit 935699b

Browse files
committed
feat: monitor connection
1 parent f39ef3f commit 935699b

File tree

4 files changed

+61
-22
lines changed

4 files changed

+61
-22
lines changed

internal/leadership/leadership.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ package leadership
22

33
type Leadership struct {
44
Acquired bool
5-
DB DBHandle
5+
DB *Mutex
66
}

internal/leadership/manager.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package leadership
33
import (
44
"context"
55
"github.com/formancehq/go-libs/v2/logging"
6+
"github.com/uptrace/bun"
67
"time"
78
)
89

@@ -16,24 +17,27 @@ type Manager struct {
1617

1718
func (m *Manager) Run(ctx context.Context) {
1819
var (
19-
db DBHandle
20+
dbMutex *Mutex
2021
nextRetry = time.After(time.Duration(0))
21-
err error
22+
nextPing <-chan time.Time
2223
)
2324
for {
2425
select {
2526
case ch := <-m.stopChannel:
26-
if db != nil {
27+
if dbMutex != nil {
2728
m.logger.Info("leadership lost")
28-
_ = db.Close()
29+
dbMutex.Exec(func(_ bun.IDB) {
30+
_ = dbMutex.db.Close()
31+
})
32+
2933
setIsLeader(ctx, false)
3034
m.changes.Broadcast(Leadership{})
3135
}
3236
close(ch)
3337
close(m.stopChannel)
3438
return
3539
case <-nextRetry:
36-
db, err = m.locker.Take(ctx)
40+
db, err := m.locker.Take(ctx)
3741
if err != nil || db == nil {
3842
if err != nil {
3943
m.logger.Error("error acquiring lock", err)
@@ -42,13 +46,39 @@ func (m *Manager) Run(ctx context.Context) {
4246
continue
4347
}
4448

49+
dbMutex = NewMutex(db)
50+
4551
m.changes.Broadcast(Leadership{
46-
DB: db,
52+
DB: dbMutex,
4753
Acquired: true,
4854
})
4955
m.logger.Info("leadership acquired")
5056

5157
setIsLeader(ctx, true)
58+
59+
nextPing = time.After(m.retryPeriod)
60+
61+
// Ping the database to check the connection status
62+
// If the connection is lost, signal the listeners about the leadership loss
63+
case <-nextPing:
64+
dbMutex.Exec(func(db bun.IDB) {
65+
_, err := db.
66+
NewSelect().
67+
ColumnExpr("1 as v").
68+
Count(ctx)
69+
if err != nil {
70+
m.logger.Error("error pinging db", err)
71+
_ = dbMutex.db.Close()
72+
dbMutex = nil
73+
74+
setIsLeader(ctx, false)
75+
m.changes.Broadcast(Leadership{})
76+
77+
nextRetry = time.After(m.retryPeriod)
78+
} else {
79+
nextPing = time.After(m.retryPeriod)
80+
}
81+
})
5282
}
5383
}
5484
}

internal/leadership/manager_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/formancehq/go-libs/v2/bun/bunconnect"
77
"github.com/formancehq/go-libs/v2/logging"
88
"github.com/stretchr/testify/require"
9+
"github.com/uptrace/bun"
910
"testing"
1011
"time"
1112
)
@@ -52,15 +53,18 @@ func TestLeaderShip(t *testing.T) {
5253
require.Equal(t, 1, leaderCount)
5354
require.GreaterOrEqual(t, selectedLeader, 0)
5455

55-
// ensure the provided db connection is still functionnal
56-
require.NoError(t, instances[selectedLeader].
56+
// ensure the provided db connection is still functional
57+
instances[selectedLeader].
5758
GetSignal().
5859
Actual().DB.
59-
NewSelect().
60-
Model(&map[string]any{}).
61-
ColumnExpr("1 as v").
62-
Scan(ctx),
63-
)
60+
Exec(func(db bun.IDB) {
61+
require.NoError(t, db.
62+
NewSelect().
63+
Model(&map[string]any{}).
64+
ColumnExpr("1 as v").
65+
Scan(ctx),
66+
)
67+
})
6468

6569
require.NoError(t, instances[selectedLeader].Stop(ctx))
6670

internal/leadership/mutex.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
package leadership
22

3-
import "sync"
3+
import (
4+
"github.com/uptrace/bun"
5+
"sync"
6+
)
47

5-
type Mutex[T any] struct {
8+
type Mutex struct {
69
*sync.Mutex
7-
t T
10+
db DBHandle
811
}
912

10-
func (m *Mutex[T]) Lock() (T, func()) {
13+
func (m *Mutex) Exec(fn func(db bun.IDB)) {
1114
m.Mutex.Lock()
12-
return m.t, m.Unlock
15+
defer m.Mutex.Unlock()
16+
17+
fn(m.db)
1318
}
1419

15-
func NewMutex[T any](t T) *Mutex[T] {
16-
return &Mutex[T]{
20+
func NewMutex(db DBHandle) *Mutex {
21+
return &Mutex{
1722
Mutex: &sync.Mutex{},
18-
t: t,
23+
db: db,
1924
}
2025
}

0 commit comments

Comments
 (0)