diff --git a/consul/backoff.go b/consul/backoff.go new file mode 100644 index 0000000..ca6ac99 --- /dev/null +++ b/consul/backoff.go @@ -0,0 +1,47 @@ +package consul + +import ( + "math/rand" + "time" +) + +type backoff struct { + intervals []time.Duration + jitterPct int + jitterSrc *rand.Rand +} + +func defaultBackoff() *backoff { + return &backoff{ + intervals: []time.Duration{ + 10 * time.Millisecond, + 50 * time.Millisecond, + 250 * time.Millisecond, + 500 * time.Millisecond, + time.Second, + 2 * time.Second, + 3 * time.Second, + 4 * time.Second, + 5 * time.Second, + }, + + jitterPct: 10, + jitterSrc: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +func (b *backoff) Backoff(retry int) time.Duration { + idx := retry + + if idx < 0 || idx > len(b.intervals)-1 { + idx = len(b.intervals) - 1 + } + + d := b.intervals[idx] + + if b.jitterSrc.Intn(2) == 0 { + return d + time.Duration(((int64(d) / 100) * int64((b.jitterSrc.Intn(b.jitterPct))))) + } + + return d - time.Duration(((int64(d) / 100) * int64((b.jitterSrc.Intn(b.jitterPct))))) +} diff --git a/consul/backoff_test.go b/consul/backoff_test.go new file mode 100644 index 0000000..6c99e01 --- /dev/null +++ b/consul/backoff_test.go @@ -0,0 +1,36 @@ +package consul + +import ( + "fmt" + "testing" + "time" +) + +func minBackoff(t time.Duration, jitterPCT int) time.Duration { + return t - t*time.Duration(jitterPCT)/100 +} + +func maxBackoff(t time.Duration, jitterPCT int) time.Duration { + return t + t*time.Duration(jitterPCT)/100 +} + +func TestBackoff_IntervalIdxBounds(t *testing.T) { + b := defaultBackoff() + + for i := range []int{-100000, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 99999999} { + t.Run(fmt.Sprintf("retry-%d", i), func(t *testing.T) { + d := b.Backoff(-10) + + minB := minBackoff(b.intervals[len(b.intervals)-1], b.jitterPct) + maxB := maxBackoff(b.intervals[len(b.intervals)-1], b.jitterPct) + + if d < minB { + t.Errorf("backoff is %s, expecting >%s", d, minB) + } + + if d > maxB { + t.Errorf("backoff is %s, expecting <%s", d, minB) + } + }) + } +} diff --git a/consul/resolver.go b/consul/resolver.go index 1382fc9..e982337 100644 --- a/consul/resolver.go +++ b/consul/resolver.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "sort" + "strings" "sync" "time" @@ -25,15 +26,16 @@ const ( var logger = grpclog.Component("grpcconsulresolver") type consulResolver struct { - cc resolver.ClientConn - consulHealth consulHealthEndpoint - service string - tags []string - healthFilter healthFilter - ctx context.Context - cancel context.CancelFunc - resolveNow chan struct{} - wgStop sync.WaitGroup + cc resolver.ClientConn + consulHealth consulHealthEndpoint + service string + tags []string + healthFilter healthFilter + backoffCounter *backoff + ctx context.Context + cancel context.CancelFunc + resolveNow chan struct{} + wgStop sync.WaitGroup } type consulHealthEndpoint interface { @@ -73,14 +75,15 @@ func newConsulResolver( ctx, cancel := context.WithCancel(context.Background()) return &consulResolver{ - cc: cc, - consulHealth: health, - service: consulService, - tags: tags, - healthFilter: healthFilter, - ctx: ctx, - cancel: cancel, - resolveNow: make(chan struct{}, 1), + cc: cc, + consulHealth: health, + service: consulService, + tags: tags, + healthFilter: healthFilter, + backoffCounter: defaultBackoff(), + ctx: ctx, + cancel: cancel, + resolveNow: make(chan struct{}, 1), }, nil } @@ -90,11 +93,20 @@ func (c *consulResolver) start() { } func (c *consulResolver) query(opts *consul.QueryOptions) ([]resolver.Address, uint64, error) { + if logger.V(2) { + var tagsDescr, healthyDescr string + if len(c.tags) > 0 { + tagsDescr = "with tags: " + strings.Join(c.tags, ", ") + } + if c.healthFilter == healthFilterOnlyHealthy { + healthyDescr = "healthy " + } + + logger.Infof("querying consul for "+healthyDescr+"addresses of service '%s'"+tagsDescr, c.service) + } + entries, meta, err := c.consulHealth.ServiceMultipleTags(c.service, c.tags, c.healthFilter == healthFilterOnlyHealthy, opts) if err != nil { - logger.Infof("resolving service name '%s' via consul failed: %v\n", - c.service, err) - return nil, 0, err } @@ -170,6 +182,8 @@ func addressesEqual(a, b []resolver.Address) bool { func (c *consulResolver) watcher() { var lastReportedAddrs []resolver.Address + var retryTimer *time.Timer + var retryCnt int opts := (&consul.QueryOptions{}).WithContext(c.ctx) @@ -181,17 +195,31 @@ func (c *consulResolver) watcher() { var err error lastWaitIndex := opts.WaitIndex - queryStartTime := time.Now() + + if retryTimer != nil { + retryTimer.Stop() + } + addrs, opts.WaitIndex, err = c.query(opts) if err != nil { if errors.Is(err, context.Canceled) { return } + retryIn := c.backoffCounter.Backoff(retryCnt) + logger.Infof("resolving service name '%s' via consul failed, retrying in %s: %s", + c.service, retryIn, err) + + retryTimer = time.AfterFunc(c.backoffCounter.Backoff(retryCnt), func() { + c.ResolveNow(resolver.ResolveNowOptions{}) + }) + retryCnt++ + c.cc.ReportError(err) break } + retryCnt = 0 if opts.WaitIndex < lastWaitIndex { logger.Infof("consul responded with a smaller waitIndex (%d) then the previous one (%d), restarting blocking query loop", @@ -211,13 +239,14 @@ func (c *consulResolver) watcher() { // If the service does not exist, an empty addrs slice // is returned. If we never reported any resolved // addresses (addrs is nil), we have to report an empty - // set of resolved addresses. It informs the grpc-balancer that resolution is not - // in progress anymore and grpc calls can failFast. + // set of resolved addresses. It informs the + // grpc-balancer that resolution is not in progress + // anymore and grpc calls can failFast. if addressesEqual(addrs, lastReportedAddrs) { // If the consul server responds with - // the same data then in the last - // query in less then 50ms, we sleep a - // bit to prevent querying in a tight loop + // the same data than in the last + // query in less than 50ms, sleep a + // bit to prevent querying in a tight loop. // This should only happen if the consul server // is buggy but better be safe. :-) if lastWaitIndex == opts.WaitIndex && @@ -243,6 +272,10 @@ func (c *consulResolver) watcher() { select { case <-c.ctx.Done(): + if retryTimer != nil { + retryTimer.Stop() + } + return case <-c.resolveNow: diff --git a/consul/resolver_test.go b/consul/resolver_test.go index 6b20953..e7e8dcd 100644 --- a/consul/resolver_test.go +++ b/consul/resolver_test.go @@ -564,3 +564,49 @@ func TestQueryResultsAreSorted(t *testing.T) { r.Close() } + +func TestRetryOnError(t *testing.T) { + cc := mocks.NewClientConn() + health := mocks.NewConsulHealthClient() + cleanup := replaceCreateHealthClientFn( + func(*consul.Config) (consulHealthEndpoint, error) { + return health, nil + }, + ) + t.Cleanup(cleanup) + + health.SetRespError(errors.New("ERROR")) + + r, err := NewBuilder().Build(resolver.Target{URL: url.URL{Path: "test"}}, cc, resolver.BuildOptions{}) + if err != nil { + t.Fatal("Build() failed:", err.Error()) + } + + t.Cleanup(r.Close) + + for health.ResolveCount() < 3 { + time.Sleep(50 * time.Millisecond) + } + + t.Logf("%d retries were done", health.ResolveCount()) + health.SetRespError(nil) + + health.SetRespEntries([]*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "227.0.0.1", + Port: 1, + }, + }, + { + Service: &consul.AgentService{ + Address: "127.0.0.1", + Port: 1, + }, + }, + }) + + for len(cc.Addrs()) != 2 { + time.Sleep(50 * time.Millisecond) + } +} diff --git a/internal/mocks/consulhealth.go b/internal/mocks/consulhealth.go index 541643a..384421c 100644 --- a/internal/mocks/consulhealth.go +++ b/internal/mocks/consulhealth.go @@ -7,10 +7,11 @@ import ( ) type ConsulHealthClient struct { - mutex sync.Mutex - entries []*consul.ServiceEntry - queryMeta consul.QueryMeta - err error + mutex sync.Mutex + entries []*consul.ServiceEntry + queryMeta consul.QueryMeta + err error + resolveCnt int } func NewConsulHealthClient() *ConsulHealthClient { @@ -46,6 +47,7 @@ func (c *ConsulHealthClient) SetRespError(err error) { func (c *ConsulHealthClient) ServiceMultipleTags(_ string, _ []string, _ bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) { c.mutex.Lock() defer c.mutex.Unlock() + c.resolveCnt++ if q.Context().Err() != nil { return nil, nil, q.Context().Err() @@ -53,3 +55,9 @@ func (c *ConsulHealthClient) ServiceMultipleTags(_ string, _ []string, _ bool, q return c.entries, &c.queryMeta, c.err } + +func (c *ConsulHealthClient) ResolveCount() int { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.resolveCnt +}