Skip to content

Commit

Permalink
test: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Jan 30, 2025
1 parent 935699b commit 294efc4
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 47 deletions.
55 changes: 23 additions & 32 deletions internal/leadership/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,35 @@ import (
"sync"
)

type listener struct {
channel chan Leadership
type listener[T any] struct {
channel chan T
}

type Broadcaster struct {
type Broadcaster[T any] struct {
mu sync.Mutex
t *Leadership
t *T

inner []listener
outer chan Leadership
inner []listener[T]
outer chan T
}

func (h *Broadcaster) Actual() Leadership {
func (h *Broadcaster[T]) Actual() T {
h.mu.Lock()
defer h.mu.Unlock()

if h.t == nil {
return Leadership{}
var t T
return t
}
return *h.t
}

func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan Leadership, 1)
l := listener{
newChannel := make(chan T, 1)
l := listener[T]{
channel: newChannel,
}
h.inner = append(h.inner, l)
Expand All @@ -43,23 +44,20 @@ func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
h.mu.Lock()
defer h.mu.Unlock()

index := -1
for i, listener := range h.inner {
for index, listener := range h.inner {
if listener == l {
index = i
if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
} else {
h.inner = h.inner[:index]
}
break
}
}

if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
} else {
h.inner = h.inner[:index]
}
}
}

func (h *Broadcaster) Broadcast(t Leadership) {
func (h *Broadcaster[T]) Broadcast(t T) {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -70,7 +68,7 @@ func (h *Broadcaster) Broadcast(t Leadership) {
}
}

func (h *Broadcaster) Close() {
func (h *Broadcaster[T]) Close() {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -79,15 +77,8 @@ func (h *Broadcaster) Close() {
}
}

func (h *Broadcaster) CountListeners() int {
h.mu.Lock()
defer h.mu.Unlock()

return len(h.inner)
}

func NewSignal() *Broadcaster {
return &Broadcaster{
outer: make(chan Leadership),
func NewBroadcaster[T any]() *Broadcaster[T] {
return &Broadcaster[T]{
outer: make(chan T),
}
}
73 changes: 73 additions & 0 deletions internal/leadership/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package leadership

import (
"testing"
"time"
)

func TestBroadcaster(t *testing.T) {
t.Parallel()

broadcaster := NewBroadcaster[struct{}]()
t.Cleanup(broadcaster.Close)

const nbSubscriptions = 5

subscriptions := make([]<-chan struct{}, nbSubscriptions)
releases := make([]func(), nbSubscriptions)

for i := 0; i < nbSubscriptions; i++ {
subscriptions[i], releases[i] = broadcaster.Subscribe()
}

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[2]()
subscriptions = append(subscriptions[:2], subscriptions[3:]...)
releases = append(releases[:2], releases[3:]...)

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[0]()
subscriptions = subscriptions[1:]
releases = releases[1:]

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[2]()
subscriptions = subscriptions[:2]

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ import (
"sync"
)

type Mutex struct {
type DatabaseHandle struct {
*sync.Mutex
db DBHandle
}

func (m *Mutex) Exec(fn func(db bun.IDB)) {
func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) {
m.Mutex.Lock()
defer m.Mutex.Unlock()

fn(m.db)
}

func NewMutex(db DBHandle) *Mutex {
return &Mutex{
func NewDatabaseHandle(db DBHandle) *DatabaseHandle {
return &DatabaseHandle{
Mutex: &sync.Mutex{},
db: db,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/leadership/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package leadership

type Leadership struct {
Acquired bool
DB *Mutex
DB *DatabaseHandle
}
13 changes: 7 additions & 6 deletions internal/leadership/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (

type Manager struct {
locker Locker
changes *Broadcaster
changes *Broadcaster[Leadership]
logger logging.Logger
retryPeriod time.Duration
stopChannel chan chan struct{}
}

func (m *Manager) Run(ctx context.Context) {
var (
dbMutex *Mutex
dbMutex *DatabaseHandle
nextRetry = time.After(time.Duration(0))
nextPing <-chan time.Time
)
Expand Down Expand Up @@ -46,7 +46,7 @@ func (m *Manager) Run(ctx context.Context) {
continue
}

dbMutex = NewMutex(db)
dbMutex = NewDatabaseHandle(db)

m.changes.Broadcast(Leadership{
DB: dbMutex,
Expand All @@ -67,7 +67,7 @@ func (m *Manager) Run(ctx context.Context) {
ColumnExpr("1 as v").
Count(ctx)
if err != nil {
m.logger.Error("error pinging db", err)
m.logger.Errorf("error pinging db: %s", err)
_ = dbMutex.db.Close()
dbMutex = nil

Expand All @@ -87,6 +87,7 @@ func (m *Manager) Stop(ctx context.Context) error {
select {
// if already closed
case <-m.stopChannel:
m.changes.Close()
return nil
default:
ch := make(chan struct{})
Expand All @@ -100,15 +101,15 @@ func (m *Manager) Stop(ctx context.Context) error {
}
}

func (m *Manager) GetSignal() *Broadcaster {
func (m *Manager) GetBroadcaster() *Broadcaster[Leadership] {
return m.changes
}

func NewManager(locker Locker, logger logging.Logger, options ...Option) *Manager {
l := &Manager{
locker: locker,
logger: logger,
changes: NewSignal(),
changes: NewBroadcaster[Leadership](),
retryPeriod: 2 * time.Second,
stopChannel: make(chan chan struct{}),
}
Expand Down
41 changes: 37 additions & 4 deletions internal/leadership/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestLeaderShip(t *testing.T) {
selectedLeader := -1
require.Eventually(t, func() bool {
for index, manager := range instances {
actual := manager.GetSignal().Actual()
actual := manager.GetBroadcaster().Actual()
if actual.Acquired {
selectedLeader = index
return true
Expand All @@ -46,7 +46,7 @@ func TestLeaderShip(t *testing.T) {
}, 2*time.Second, 10*time.Millisecond)
leaderCount := 0
for _, manager := range instances {
if manager.GetSignal().Actual().Acquired {
if manager.GetBroadcaster().Actual().Acquired {
leaderCount++
}
}
Expand All @@ -55,7 +55,7 @@ func TestLeaderShip(t *testing.T) {

// ensure the provided db connection is still functional
instances[selectedLeader].
GetSignal().
GetBroadcaster().
Actual().DB.
Exec(func(db bun.IDB) {
require.NoError(t, db.
Expand All @@ -66,11 +66,44 @@ func TestLeaderShip(t *testing.T) {
)
})

// Stop the instance to trigger a new leader election
require.NoError(t, instances[selectedLeader].Stop(ctx))

require.Eventually(t, func() bool {
for index, manager := range instances {
if manager.GetSignal().Actual().Acquired {
if manager.GetBroadcaster().Actual().Acquired {
selectedLeader = index
return true
}
}
return false
}, 2*time.Second, 10*time.Millisecond)

broadcaster := instances[selectedLeader].GetBroadcaster()
subscription, release := broadcaster.Subscribe()
t.Cleanup(release)

// We will receive the leadership on the subscription
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for leadership acquirement")
}

// Close the database connection of the actual leader to check the manager is able to detect the connection loss
require.NoError(t, instances[selectedLeader].GetBroadcaster().Actual().DB.db.Close())

select {
case leadership := <-subscription:
require.Equal(t, Leadership{}, leadership)
case <-time.After(time.Second):
t.Fatal("timeout waiting for leadership loss")
}
release()

require.Eventually(t, func() bool {
for index, manager := range instances {
if manager.GetBroadcaster().Actual().Acquired {
selectedLeader = index
return true
}
Expand Down

0 comments on commit 294efc4

Please sign in to comment.