Skip to content

Commit

Permalink
add CacheDiscovery
Browse files Browse the repository at this point in the history
  • Loading branch information
smallnest committed Oct 31, 2023
1 parent 609a836 commit 39c51c0
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions client/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"encoding/json"
"os"
"sync"
)

// ServiceDiscoveryFilter can be used to filter services with customized logics.
Expand All @@ -25,6 +26,9 @@ type cachedServiceDiscovery struct {
cachedFile string
cached []*KVPair

chansLock sync.RWMutex
chans map[chan []*KVPair]chan []*KVPair

ServiceDiscovery
}

Expand All @@ -39,6 +43,7 @@ func CacheDiscovery(threshold int, cachedFile string, discovery ServiceDiscovery
threshold: threshold,
cachedFile: cachedFile,
ServiceDiscovery: discovery,
chans: make(map[chan []*KVPair]chan []*KVPair),
}
}

Expand All @@ -62,6 +67,49 @@ func (cd *cachedServiceDiscovery) GetServices() []*KVPair {
return cd.cached
}

func (cd *cachedServiceDiscovery) WatchService() chan []*KVPair {
ch := cd.ServiceDiscovery.WatchService()

cachedCh := make(chan []*KVPair, 10)
cd.chansLock.Lock()
cd.chans[cachedCh] = ch
cd.chansLock.Unlock()

go func() {
defer recover()

for {
kvPairs, ok := <-ch
if !ok {
close(cachedCh)
return
}

n := len(kvPairs)
if n > len(cd.cached) {
cd.cached = kvPairs
cd.storeCached(kvPairs)
}

cachedCh <- kvPairs
}
}()

return cachedCh
}

func (cd *cachedServiceDiscovery) RemoveWatcher(ch chan []*KVPair) {
cd.chansLock.Lock()
origin := cd.chans[ch]
delete(cd.chans, ch)
cd.chansLock.Unlock()

if origin != nil {
cd.ServiceDiscovery.RemoveWatcher(origin)
}

}

func (cd *cachedServiceDiscovery) storeCached(kvPairs []*KVPair) {
data, _ := json.Marshal(kvPairs)
os.WriteFile(cd.cachedFile, data, 0644)
Expand Down

0 comments on commit 39c51c0

Please sign in to comment.