Skip to content

Commit

Permalink
prevent calling ReportError with the same error in a row
Browse files Browse the repository at this point in the history
The resolver already prevents UpdateState from being called for the same
addresses multiple times in a row; do the same when reporting errors.
Only call ReportError() if the same error hasn't been reported in the
last call and UpdateState hasn't been called last.

This prevents us from unnecessarily reporting the same state multiple times
and triggering the same events for the connection.

The code is refactored to store the last reported state in the
consulResolver struct and reporting addresses and errors is moved to
separate methods.
  • Loading branch information
fho committed Oct 21, 2024
1 parent 0091cec commit bb65ae2
Showing 1 changed file with 59 additions and 36 deletions.
95 changes: 59 additions & 36 deletions consul/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@ import (

// healthFilter is an integer enumeration of health filtering modes.
// NOTE(review): presumably it selects which service instances are used
// depending on their consul health status; the code consuming the values is
// outside of this view, confirm there.
type healthFilter int

// The values are assigned via iota, healthFilterUndefined is therefore 0 and
// the zero value of healthFilter.
const (
healthFilterUndefined healthFilter = iota
healthFilterOnlyHealthy
healthFilterFallbackToUnhealthy
)

// logger is the package-level logger, it is the grpclog component with the
// name "grpcconsulresolver".
var logger = grpclog.Component("grpcconsulresolver")

type consulResolver struct {
cc resolver.ClientConn
consulHealth consulHealthEndpoint
Expand All @@ -36,12 +28,27 @@ type consulResolver struct {
cancel context.CancelFunc
resolveNow chan struct{}
wgStop sync.WaitGroup

lastReporterState state
}

// state is the result that the resolver reported last to the gRPC
// ClientConn. It is used to skip reporting the same addresses or the same
// error again.
type state struct {
// addresses are the addresses that were passed to the last UpdateState
// call. It is set to nil when an error is reported.
addresses []resolver.Address
// err is the error that was passed to the last ReportError call. It is
// set to nil when addresses are reported.
err error
}

// consulHealthEndpoint is the consul operation that the resolver depends on.
// The consulHealth field of consulResolver has this type, tests can provide
// their own implementation via consulCreateHealthClientFn.
type consulHealthEndpoint interface {
// ServiceMultipleTags returns service entries, query metadata and an error.
// NOTE(review): presumably it mirrors the method with the same name of the
// consul API health client (entries of service filtered by tags and
// passingOnly) -- confirm against the consul API documentation.
ServiceMultipleTags(service string, tags []string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
}

// Values of the healthFilter type. They are assigned via iota,
// healthFilterUndefined is therefore 0 and the zero value of healthFilter.
// NOTE(review): the behavior selected by each value is implemented outside of
// this view, confirm it there.
const (
healthFilterUndefined healthFilter = iota
healthFilterOnlyHealthy
healthFilterFallbackToUnhealthy
)

// logger is the package-level logger, it is the grpclog component with the
// name "grpcconsulresolver".
var logger = grpclog.Component("grpcconsulresolver")

// consulCreateHealthClientFn can be overwritten in tests to make
// newConsulResolver() return a different consulHealthEndpoint implementation
var consulCreateHealthClientFn = func(cfg *consul.Config) (consulHealthEndpoint, error) {
Expand Down Expand Up @@ -181,7 +188,6 @@ func addressesEqual(a, b []resolver.Address) bool {
}

func (c *consulResolver) watcher() {
var lastReportedAddrs []resolver.Address
var retryTimer *time.Timer
var retryCnt int

Expand All @@ -201,6 +207,8 @@ func (c *consulResolver) watcher() {
retryTimer.Stop()
}

// query() blocks until a consul internal timeout expired or
// data newer than the passed opts.WaitIndex is available.
addrs, opts.WaitIndex, err = c.query(opts)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -216,7 +224,7 @@ func (c *consulResolver) watcher() {
})
retryCnt++

c.cc.ReportError(err)
c.reportError(err)
break
}
retryCnt = 0
Expand All @@ -228,21 +236,7 @@ func (c *consulResolver) watcher() {
continue
}

sort.Slice(addrs, func(i, j int) bool {
return addrs[i].Addr < addrs[j].Addr
})

// query() blocks until a consul internal timeout expired or
// data newer then the passed opts.WaitIndex is available.
// We check if the returned addrs changed to not call
// cc.UpdateState() unnecessary for unchanged addresses.
// If the service does not exist, an empty addrs slice
// is returned. If we never reported any resolved
// addresses (addrs is nil), we have to report an empty
// set of resolved addresses. It informs the
// grpc-balancer that resolution is not in progress
// anymore and grpc calls can failFast.
if addressesEqual(addrs, lastReportedAddrs) {
if !c.reportAddress(addrs) {
// If the consul server responds with
// the same data than in the last
// query in less than 50ms, sleep a
Expand All @@ -251,23 +245,13 @@ func (c *consulResolver) watcher() {
// is buggy but better be safe. :-)
if lastWaitIndex == opts.WaitIndex &&
time.Since(queryStartTime) < 50*time.Millisecond {
logger.Warningf("consul responded too fast with same data and waitIndex (%d) then in previous query, delaying next query",
logger.Warningf("consul responded too fast with same data and waitIndex (%d) than in previous query, delaying next query",
opts.WaitIndex)
time.Sleep(50 * time.Millisecond)
}

continue
}

err = c.cc.UpdateState(resolver.State{Addresses: addrs})
if err != nil && grpclog.V(2) {
// UpdateState errors can be ignored in
// watch-based resolvers, see
// https://github.com/grpc/grpc-go/issues/5048
// for a detailed explanation.
logger.Infof("ignoring error returned by UpdateState, no other addresses available, error: %s", err)
}
lastReportedAddrs = addrs
}

select {
Expand All @@ -283,6 +267,45 @@ func (c *consulResolver) watcher() {
}
}

// reportAddress sorts addrs in place by their Addr field and forwards them to
// [c.cc.UpdateState]. The call is skipped if the last reported state is not
// an error and addressesEqual considers addrs equal to the last reported
// addresses. After an error has been reported, addrs is always forwarded.
// The returned bool is true if [c.cc.UpdateState] has been called.
func (c *consulResolver) reportAddress(addrs []resolver.Address) bool {
	sort.Slice(addrs, func(a, b int) bool {
		return addrs[a].Addr < addrs[b].Addr
	})

	last := &c.lastReporterState
	if last.err == nil && addressesEqual(addrs, last.addresses) {
		// Same addresses as in the previous report, nothing to do.
		return false
	}

	// Remember what is reported before passing it to the ClientConn.
	last.addresses, last.err = addrs, nil

	if err := c.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil && grpclog.V(2) {
		// Watch-based resolvers can ignore errors returned by
		// UpdateState, the reasoning is described in
		// https://github.com/grpc/grpc-go/issues/5048.
		logger.Infof("ignoring error returned by UpdateState: %s", err)
	}

	return true
}

// reportError forwards err to [c.cc.ReportError], unless err is equal (==)
// to the error of the last reported state. When the error is forwarded, it is
// recorded as last reported state and the recorded addresses are cleared.
// The returned bool is true if [c.cc.ReportError] has been called.
//
// NOTE(review): the comparison is done with == on the error values and not
// by message or via errors.Is. Errors that are newly created for every
// failure do not compare equal and are reported each time -- confirm that
// this is intended.
func (c *consulResolver) reportError(err error) bool {
	last := &c.lastReporterState
	if err == last.err { //nolint: errorlint
		return false
	}

	last.err, last.addresses = err, nil
	c.cc.ReportError(err)

	return true
}

func (c *consulResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case c.resolveNow <- struct{}{}:
Expand Down

0 comments on commit bb65ae2

Please sign in to comment.