diff --git a/consul/resolver.go b/consul/resolver.go index e982337..4241c3a 100644 --- a/consul/resolver.go +++ b/consul/resolver.go @@ -17,14 +17,6 @@ import ( type healthFilter int -const ( - healthFilterUndefined healthFilter = iota - healthFilterOnlyHealthy - healthFilterFallbackToUnhealthy -) - -var logger = grpclog.Component("grpcconsulresolver") - type consulResolver struct { cc resolver.ClientConn consulHealth consulHealthEndpoint @@ -36,12 +28,27 @@ type consulResolver struct { cancel context.CancelFunc resolveNow chan struct{} wgStop sync.WaitGroup + + lastReporterState state +} + +type state struct { + addresses []resolver.Address + err error } type consulHealthEndpoint interface { ServiceMultipleTags(service string, tags []string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) } +const ( + healthFilterUndefined healthFilter = iota + healthFilterOnlyHealthy + healthFilterFallbackToUnhealthy +) + +var logger = grpclog.Component("grpcconsulresolver") + // consulCreateHealthClientFn can be overwritten in tests to make // newConsulResolver() return a different consulHealthEndpoint implementation var consulCreateHealthClientFn = func(cfg *consul.Config) (consulHealthEndpoint, error) { @@ -181,7 +188,6 @@ func addressesEqual(a, b []resolver.Address) bool { } func (c *consulResolver) watcher() { - var lastReportedAddrs []resolver.Address var retryTimer *time.Timer var retryCnt int @@ -201,6 +207,8 @@ func (c *consulResolver) watcher() { retryTimer.Stop() } + // query() blocks until a consul internal timeout expired or + // data newer then the passed opts.WaitIndex is available. addrs, opts.WaitIndex, err = c.query(opts) if err != nil { if errors.Is(err, context.Canceled) { @@ -216,7 +224,7 @@ func (c *consulResolver) watcher() { }) retryCnt++ - c.cc.ReportError(err) + c.reportError(err) break } retryCnt = 0 @@ -228,21 +236,7 @@ func (c *consulResolver) watcher() { continue } - sort.Slice(addrs, func(i, j int) bool { - return addrs[i].Addr < addrs[j].Addr - }) - - // query() blocks until a consul internal timeout expired or - // data newer then the passed opts.WaitIndex is available. - // We check if the returned addrs changed to not call - // cc.UpdateState() unnecessary for unchanged addresses. - // If the service does not exist, an empty addrs slice - // is returned. If we never reported any resolved - // addresses (addrs is nil), we have to report an empty - // set of resolved addresses. It informs the - // grpc-balancer that resolution is not in progress - // anymore and grpc calls can failFast. - if addressesEqual(addrs, lastReportedAddrs) { + if !c.reportAddress(addrs) { // If the consul server responds with // the same data than in the last // query in less than 50ms, sleep a @@ -251,23 +245,13 @@ func (c *consulResolver) watcher() { // is buggy but better be safe. :-) if lastWaitIndex == opts.WaitIndex && time.Since(queryStartTime) < 50*time.Millisecond { - logger.Warningf("consul responded too fast with same data and waitIndex (%d) then in previous query, delaying next query", + logger.Warningf("consul responded too fast with same data and waitIndex (%d) than in previous query, delaying next query", opts.WaitIndex) time.Sleep(50 * time.Millisecond) } continue } - - err = c.cc.UpdateState(resolver.State{Addresses: addrs}) - if err != nil && grpclog.V(2) { - // UpdateState errors can be ignored in - // watch-based resolvers, see - // https://github.com/grpc/grpc-go/issues/5048 - // for a detailed explanation. - logger.Infof("ignoring error returned by UpdateState, no other addresses available, error: %s", err) - } - lastReportedAddrs = addrs } select { @@ -283,6 +267,45 @@ func (c *consulResolver) watcher() { } } +// reportAddress reports addrs to [c.cc.UpdateState] if it differs from the +// previous reported addresses or an error has been reported before. +// It returns true if [c.cc.UpdateState] has been called. +func (c *consulResolver) reportAddress(addrs []resolver.Address) bool { + sort.Slice(addrs, func(i, j int) bool { + return addrs[i].Addr < addrs[j].Addr + }) + + if c.lastReporterState.err == nil && addressesEqual(addrs, c.lastReporterState.addresses) { + return false + } + + c.lastReporterState.addresses = addrs + c.lastReporterState.err = nil + + err := c.cc.UpdateState(resolver.State{Addresses: addrs}) + if err != nil && grpclog.V(2) { + // UpdateState errors can be ignored in + // watch-based resolvers, see + // https://github.com/grpc/grpc-go/issues/5048 + // for a detailed explanation. + logger.Infof("ignoring error returned by UpdateState: %s", err) + } + + return true +} + +func (c *consulResolver) reportError(err error) bool { + if c.lastReporterState.err == err { //nolint: errorlint + return false + } + + c.lastReporterState.addresses = nil + c.lastReporterState.err = err + + c.cc.ReportError(err) + return true +} + func (c *consulResolver) ResolveNow(resolver.ResolveNowOptions) { select { case c.resolveNow <- struct{}{}: