From 0091cece856683597806a97e5329c4702908153d Mon Sep 17 00:00:00 2001
From: Fabian Holler
Date: Fri, 18 Oct 2024 16:55:44 +0200
Subject: [PATCH] retry resolving on errors

Some grpc-go load balancing policies, including round_robin, do not
take care of retrying name resolution after an error is reported[^1].
As a result, when querying consul fails, the grpc connection can hang
in a state without addresses until the connection idle timeout
expires, even though consul has become reachable again.

To prevent this, periodically retry querying consul when it fails.
This is done by creating a timer that calls ResolveNow() when it
expires.

The backoff implementation from commit f1f1a15 is restored and
modified to randomly add or subtract the jitter to or from the delay,
to use smaller pauses, and to return the default implementation from a
constructor instead of defining it as a package variable.

[^1]: https://github.com/grpc/grpc-go/issues/7729
---
 consul/backoff.go              | 47 +++++++++++++++++++
 consul/backoff_test.go         | 36 ++++++++++++++
 consul/resolver.go             | 85 +++++++++++++++++++++++-----------
 consul/resolver_test.go        | 46 ++++++++++++++++++
 internal/mocks/consulhealth.go | 16 +++++--
 5 files changed, 200 insertions(+), 30 deletions(-)
 create mode 100644 consul/backoff.go
 create mode 100644 consul/backoff_test.go

diff --git a/consul/backoff.go b/consul/backoff.go
new file mode 100644
index 0000000..ca6ac99
--- /dev/null
+++ b/consul/backoff.go
@@ -0,0 +1,47 @@
+package consul
+
+import (
+	"math/rand"
+	"time"
+)
+
+type backoff struct {
+	intervals []time.Duration
+	jitterPct int
+	jitterSrc *rand.Rand
+}
+
+func defaultBackoff() *backoff {
+	return &backoff{
+		intervals: []time.Duration{
+			10 * time.Millisecond,
+			50 * time.Millisecond,
+			250 * time.Millisecond,
+			500 * time.Millisecond,
+			time.Second,
+			2 * time.Second,
+			3 * time.Second,
+			4 * time.Second,
+			5 * time.Second,
+		},
+
+		jitterPct: 10,
+		jitterSrc: rand.New(rand.NewSource(time.Now().UnixNano())),
+	}
+}
+
+func (b *backoff) Backoff(retry int) time.Duration {
+	idx := retry
+
+	if idx < 0 || idx > len(b.intervals)-1 {
+		idx = len(b.intervals) - 1
+	}
+
+	d := b.intervals[idx]
+
+	if b.jitterSrc.Intn(2) == 0 {
+		return d + time.Duration(int64(d)/100*int64(b.jitterSrc.Intn(b.jitterPct)))
+	}
+
+	return d - time.Duration(int64(d)/100*int64(b.jitterSrc.Intn(b.jitterPct)))
+}
diff --git a/consul/backoff_test.go b/consul/backoff_test.go
new file mode 100644
index 0000000..6c99e01
--- /dev/null
+++ b/consul/backoff_test.go
@@ -0,0 +1,36 @@
+package consul
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func minBackoff(t time.Duration, jitterPCT int) time.Duration {
+	return t - t*time.Duration(jitterPCT)/100
+}
+
+func maxBackoff(t time.Duration, jitterPCT int) time.Duration {
+	return t + t*time.Duration(jitterPCT)/100
+}
+
+func TestBackoff_IntervalIdxBounds(t *testing.T) {
+	b := defaultBackoff()
+
+	for _, retry := range []int{-100000, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 99999999} {
+		t.Run(fmt.Sprintf("retry-%d", retry), func(t *testing.T) {
+			d := b.Backoff(retry)
+
+			minB := minBackoff(b.intervals[0], b.jitterPct)
+			maxB := maxBackoff(b.intervals[len(b.intervals)-1], b.jitterPct)
+
+			if d < minB {
+				t.Errorf("backoff is %s, expecting >=%s", d, minB)
+			}
+
+			if d > maxB {
+				t.Errorf("backoff is %s, expecting <=%s", d, maxB)
+			}
+		})
+	}
+}
diff --git a/consul/resolver.go b/consul/resolver.go
index 1382fc9..e982337 100644
--- a/consul/resolver.go
+++ b/consul/resolver.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"net"
 	"sort"
+	"strings"
 	"sync"
"time" @@ -25,15 +26,16 @@ const ( var logger = grpclog.Component("grpcconsulresolver") type consulResolver struct { - cc resolver.ClientConn - consulHealth consulHealthEndpoint - service string - tags []string - healthFilter healthFilter - ctx context.Context - cancel context.CancelFunc - resolveNow chan struct{} - wgStop sync.WaitGroup + cc resolver.ClientConn + consulHealth consulHealthEndpoint + service string + tags []string + healthFilter healthFilter + backoffCounter *backoff + ctx context.Context + cancel context.CancelFunc + resolveNow chan struct{} + wgStop sync.WaitGroup } type consulHealthEndpoint interface { @@ -73,14 +75,15 @@ func newConsulResolver( ctx, cancel := context.WithCancel(context.Background()) return &consulResolver{ - cc: cc, - consulHealth: health, - service: consulService, - tags: tags, - healthFilter: healthFilter, - ctx: ctx, - cancel: cancel, - resolveNow: make(chan struct{}, 1), + cc: cc, + consulHealth: health, + service: consulService, + tags: tags, + healthFilter: healthFilter, + backoffCounter: defaultBackoff(), + ctx: ctx, + cancel: cancel, + resolveNow: make(chan struct{}, 1), }, nil } @@ -90,11 +93,20 @@ func (c *consulResolver) start() { } func (c *consulResolver) query(opts *consul.QueryOptions) ([]resolver.Address, uint64, error) { + if logger.V(2) { + var tagsDescr, healthyDescr string + if len(c.tags) > 0 { + tagsDescr = "with tags: " + strings.Join(c.tags, ", ") + } + if c.healthFilter == healthFilterOnlyHealthy { + healthyDescr = "healthy " + } + + logger.Infof("querying consul for "+healthyDescr+"addresses of service '%s'"+tagsDescr, c.service) + } + entries, meta, err := c.consulHealth.ServiceMultipleTags(c.service, c.tags, c.healthFilter == healthFilterOnlyHealthy, opts) if err != nil { - logger.Infof("resolving service name '%s' via consul failed: %v\n", - c.service, err) - return nil, 0, err } @@ -170,6 +182,8 @@ func addressesEqual(a, b []resolver.Address) bool { func (c *consulResolver) watcher() { var lastReportedAddrs []resolver.Address + var retryTimer *time.Timer + var retryCnt int opts := (&consul.QueryOptions{}).WithContext(c.ctx) @@ -181,17 +195,31 @@ func (c *consulResolver) watcher() { var err error lastWaitIndex := opts.WaitIndex - queryStartTime := time.Now() + + if retryTimer != nil { + retryTimer.Stop() + } + addrs, opts.WaitIndex, err = c.query(opts) if err != nil { if errors.Is(err, context.Canceled) { return } + retryIn := c.backoffCounter.Backoff(retryCnt) + logger.Infof("resolving service name '%s' via consul failed, retrying in %s: %s", + c.service, retryIn, err) + + retryTimer = time.AfterFunc(c.backoffCounter.Backoff(retryCnt), func() { + c.ResolveNow(resolver.ResolveNowOptions{}) + }) + retryCnt++ + c.cc.ReportError(err) break } + retryCnt = 0 if opts.WaitIndex < lastWaitIndex { logger.Infof("consul responded with a smaller waitIndex (%d) then the previous one (%d), restarting blocking query loop", @@ -211,13 +239,14 @@ func (c *consulResolver) watcher() { // If the service does not exist, an empty addrs slice // is returned. If we never reported any resolved // addresses (addrs is nil), we have to report an empty - // set of resolved addresses. It informs the grpc-balancer that resolution is not - // in progress anymore and grpc calls can failFast. + // set of resolved addresses. It informs the + // grpc-balancer that resolution is not in progress + // anymore and grpc calls can failFast. 
 			if addressesEqual(addrs, lastReportedAddrs) {
 				// If the consul server responds with
-				// the same data then in the last
-				// query in less then 50ms, we sleep a
-				// bit to prevent querying in a tight loop
+				// the same data as in the last
+				// query in less than 50ms, sleep a
+				// bit to prevent querying in a tight loop.
 				// This should only happen if the consul server
 				// is buggy but better be safe. :-)
 				if lastWaitIndex == opts.WaitIndex &&
@@ -243,6 +272,10 @@ func (c *consulResolver) watcher() {
 
 		select {
 		case <-c.ctx.Done():
+			if retryTimer != nil {
+				retryTimer.Stop()
+			}
+
 			return
 
 		case <-c.resolveNow:
diff --git a/consul/resolver_test.go b/consul/resolver_test.go
index 6b20953..e7e8dcd 100644
--- a/consul/resolver_test.go
+++ b/consul/resolver_test.go
@@ -564,3 +564,49 @@ func TestQueryResultsAreSorted(t *testing.T) {
 
 	r.Close()
 }
+
+func TestRetryOnError(t *testing.T) {
+	cc := mocks.NewClientConn()
+	health := mocks.NewConsulHealthClient()
+	cleanup := replaceCreateHealthClientFn(
+		func(*consul.Config) (consulHealthEndpoint, error) {
+			return health, nil
+		},
+	)
+	t.Cleanup(cleanup)
+
+	health.SetRespError(errors.New("ERROR"))
+
+	r, err := NewBuilder().Build(resolver.Target{URL: url.URL{Path: "test"}}, cc, resolver.BuildOptions{})
+	if err != nil {
+		t.Fatal("Build() failed:", err.Error())
+	}
+
+	t.Cleanup(r.Close)
+
+	for health.ResolveCount() < 3 {
+		time.Sleep(50 * time.Millisecond)
+	}
+
+	t.Logf("%d retries were done", health.ResolveCount())
+	health.SetRespError(nil)
+
+	health.SetRespEntries([]*consul.ServiceEntry{
+		{
+			Service: &consul.AgentService{
+				Address: "227.0.0.1",
+				Port:    1,
+			},
+		},
+		{
+			Service: &consul.AgentService{
+				Address: "127.0.0.1",
+				Port:    1,
+			},
+		},
+	})
+
+	for len(cc.Addrs()) != 2 {
+		time.Sleep(50 * time.Millisecond)
+	}
+}
diff --git a/internal/mocks/consulhealth.go b/internal/mocks/consulhealth.go
index 541643a..384421c 100644
--- a/internal/mocks/consulhealth.go
+++ b/internal/mocks/consulhealth.go
@@ -7,10 +7,11 @@ import (
 )
 
 type ConsulHealthClient struct {
-	mutex     sync.Mutex
-	entries   []*consul.ServiceEntry
-	queryMeta consul.QueryMeta
-	err       error
+	mutex      sync.Mutex
+	entries    []*consul.ServiceEntry
+	queryMeta  consul.QueryMeta
+	err        error
+	resolveCnt int
 }
 
 func NewConsulHealthClient() *ConsulHealthClient {
@@ -46,6 +47,7 @@ func (c *ConsulHealthClient) SetRespError(err error) {
 func (c *ConsulHealthClient) ServiceMultipleTags(_ string, _ []string, _ bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
+	c.resolveCnt++
 
 	if q.Context().Err() != nil {
 		return nil, nil, q.Context().Err()
@@ -53,3 +55,9 @@ func (c *ConsulHealthClient) ServiceMultipleTags(_ string, _ []string, _ bool, q
 
 	return c.entries, &c.queryMeta, c.err
 }
+
+func (c *ConsulHealthClient) ResolveCount() int {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	return c.resolveCnt
+}
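
Editor's note (illustration, not part of the patch): the standalone sketch below mirrors the retry mechanism this change adds to the watcher, under simplified assumptions. A failed resolve schedules exactly one delayed retry via time.AfterFunc, the retry counter selects the pause from a clamped interval table, and a successful resolve resets the counter. The names resolveOnce and backoffFor, as well as the interval values, are invented for the example; only the overall pattern corresponds to the patch, which additionally applies random jitter and triggers the retry through ResolveNow().

package main

import (
	"errors"
	"fmt"
	"time"
)

// intervals is a simplified stand-in for the pause table in
// consul/backoff.go; the real table is longer and adds random jitter.
var intervals = []time.Duration{
	10 * time.Millisecond,
	50 * time.Millisecond,
	250 * time.Millisecond,
	time.Second,
}

// backoffFor clamps the retry counter to the last table entry, similar
// to what backoff.Backoff() does in this patch.
func backoffFor(retry int) time.Duration {
	if retry < 0 || retry >= len(intervals) {
		retry = len(intervals) - 1
	}
	return intervals[retry]
}

// resolveOnce stands in for querying consul; it fails the first three
// times, simulating a temporarily unreachable consul server.
func resolveOnce(attempt int) error {
	if attempt < 3 {
		return errors.New("consul unreachable")
	}
	return nil
}

func main() {
	done := make(chan struct{})
	retryCnt := 0

	var attempt func()
	attempt = func() {
		if err := resolveOnce(retryCnt); err != nil {
			delay := backoffFor(retryCnt)
			fmt.Printf("resolving failed (%v), retrying in %s\n", err, delay)
			retryCnt++
			// Like the watcher: schedule a single delayed retry
			// instead of busy-looping until consul is back.
			time.AfterFunc(delay, attempt)
			return
		}

		fmt.Println("resolved successfully, retry counter reset")
		retryCnt = 0
		close(done)
	}

	attempt()
	<-done
}

In the real resolver the delayed retry calls ResolveNow(), which wakes the watcher goroutine through the resolveNow channel, so the next blocking consul query is issued from the same loop rather than inline in the timer callback.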