diff --git a/client/discovery.go b/client/discovery.go index 7231b73f..800d4ac0 100644 --- a/client/discovery.go +++ b/client/discovery.go @@ -3,6 +3,7 @@ package client import ( "encoding/json" "os" + "sync" ) // ServiceDiscoveryFilter can be used to filter services with customized logics. @@ -25,6 +26,9 @@ type cachedServiceDiscovery struct { cachedFile string cached []*KVPair + chansLock sync.RWMutex + chans map[chan []*KVPair]chan []*KVPair + ServiceDiscovery } @@ -39,6 +43,7 @@ func CacheDiscovery(threshold int, cachedFile string, discovery ServiceDiscovery threshold: threshold, cachedFile: cachedFile, ServiceDiscovery: discovery, + chans: make(map[chan []*KVPair]chan []*KVPair), } } @@ -62,6 +67,49 @@ func (cd *cachedServiceDiscovery) GetServices() []*KVPair { return cd.cached } +func (cd *cachedServiceDiscovery) WatchService() chan []*KVPair { + ch := cd.ServiceDiscovery.WatchService() + + cachedCh := make(chan []*KVPair, 10) + cd.chansLock.Lock() + cd.chans[cachedCh] = ch + cd.chansLock.Unlock() + + go func() { + defer recover() + + for { + kvPairs, ok := <-ch + if !ok { + close(cachedCh) + return + } + + n := len(kvPairs) + if n > len(cd.cached) { + cd.cached = kvPairs + cd.storeCached(kvPairs) + } + + cachedCh <- kvPairs + } + }() + + return cachedCh +} + +func (cd *cachedServiceDiscovery) RemoveWatcher(ch chan []*KVPair) { + cd.chansLock.Lock() + origin := cd.chans[ch] + delete(cd.chans, ch) + cd.chansLock.Unlock() + + if origin != nil { + cd.ServiceDiscovery.RemoveWatcher(origin) + } + +} + func (cd *cachedServiceDiscovery) storeCached(kvPairs []*KVPair) { data, _ := json.Marshal(kvPairs) os.WriteFile(cd.cachedFile, data, 0644)