Skip to content

Commit

Permalink
fix update notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandreLamarre committed Oct 24, 2023
1 parent f2096a8 commit 67948bb
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
30 changes: 21 additions & 9 deletions pkg/util/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type updateNotifier[T Clonable[T]] struct {
channelsMu *sync.Mutex
startCond *sync.Cond

gcQueue chan chan []T

latest []T
latestMu sync.Mutex
}
Expand All @@ -46,6 +48,7 @@ func NewUpdateNotifier[T Clonable[T]](finder Finder[T]) UpdateNotifier[T] {
channelsMu: mu,
startCond: sync.NewCond(mu),
latest: []T{},
gcQueue: make(chan chan []T, 128),
}
}

Expand All @@ -61,19 +64,26 @@ func (u *updateNotifier[T]) NotifyC(ctx context.Context) <-chan []T {
}
go func() {
<-ctx.Done()
u.channelsMu.Lock()
defer u.channelsMu.Unlock()
// Remove the channel from the list
for i, c := range u.updateChannels {
if c == updateC {
u.updateChannels = slices.Delete(u.updateChannels, i, i+1)
break
}
}
u.gcQueue <- updateC
}()
return updateC
}

func (u *updateNotifier[T]) gc() {
u.channelsMu.Lock()
defer u.channelsMu.Unlock()
for {
select {
case toDelete := <-u.gcQueue:
u.updateChannels = slices.DeleteFunc(u.updateChannels, func(uc chan []T) bool {
return toDelete == uc
})
default:
return
}
}
}

func (u *updateNotifier[T]) Refresh(ctx context.Context) {
u.channelsMu.Lock()
for len(u.updateChannels) == 0 {
Expand Down Expand Up @@ -104,6 +114,8 @@ func (u *updateNotifier[T]) Refresh(ctx context.Context) {
return
}
u.latest = groups

u.gc()
u.channelsMu.Lock()
cloned := CloneList(u.latest)
for _, c := range u.updateChannels {
Expand Down
11 changes: 5 additions & 6 deletions pkg/util/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ var _ = Describe("Update Notifier", Label("unit"), func() {

groups := atomic.Value[[]rules.RuleGroup]{}
groups.Store(testGroups1)
un := notifier.NewUpdateNotifier(mock_rules.NewTestFinder(ctrl, func() []rules.RuleGroup {
updateNotifier := notifier.NewUpdateNotifier(mock_rules.NewTestFinder(ctrl, func() []rules.RuleGroup {
return notifier.CloneList(groups.Load())
}))

Expand All @@ -120,22 +120,21 @@ var _ = Describe("Update Notifier", Label("unit"), func() {

channels := make([]<-chan []rules.RuleGroup, count)
for i := 0; i < count; i++ {
channels[i] = un.NotifyC(contexts[i].ctx)
channels[i] = updateNotifier.NotifyC(contexts[i].ctx)
}

go un.Refresh(context.Background())
go updateNotifier.Refresh(context.Background())

for i := 0; i < count; i++ {
Eventually(channels[i]).Should(Receive(Equal(testGroups1)))
}

groups.Store(testGroups2)

groups.Store(testGroups2) // cancel the channels
for i := 0; i < count; i++ {
contexts[i].ca()
}

go un.Refresh(context.Background())
go updateNotifier.Refresh(context.Background())

for i := 0; i < count; i++ {
Expect(channels[i]).NotTo(Receive())
Expand Down

0 comments on commit 67948bb

Please sign in to comment.