retry resolving on errors
Some grpc-go load balancing policies, including round_robin, do not
retry resolving after an error is reported[^1].

As a result, when querying consul fails, the grpc connection can hang
in a state without addresses until the connection idle timeout
expires, even after consul becomes reachable again.

To prevent this, retry querying consul periodically when it fails.
This is done by creating a timer that calls ResolveNow() when it
expires.
The backoff implementation from commit f1f1a15 is restored and
modified to randomly add or subtract the jitter from the delay, to
use smaller pauses, and to return the default implementation from a
constructor instead of defining it as a package variable.

[^1]: grpc/grpc-go#7729
fho committed Oct 21, 2024
1 parent 1a960e9 commit 0091cec
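
ResolveNow itself is not part of this diff. As a hedged sketch of the
pre-existing resolver code (an assumption, based on the resolveNow
channel visible in the diff below): it performs a non-blocking send on
a buffered channel, which wakes the watcher goroutine and triggers the
next query, and is therefore a suitable callback for the retry timer.

// Sketch, not part of this commit: a typical non-blocking ResolveNow
// for a resolver with a resolveNow channel of capacity 1. The retry
// timer created in watcher() calls this to force a new consul query.
func (c *consulResolver) ResolveNow(resolver.ResolveNowOptions) {
	select {
	case c.resolveNow <- struct{}{}:
	default:
		// a resolve request is already pending; dropping this one is
		// fine, the watcher will query consul shortly anyway
	}
}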
Showing 5 changed files with 200 additions and 30 deletions.
47 changes: 47 additions & 0 deletions consul/backoff.go
@@ -0,0 +1,47 @@
package consul

import (
	"math/rand"
	"time"
)

type backoff struct {
	intervals []time.Duration
	jitterPct int
	jitterSrc *rand.Rand
}

func defaultBackoff() *backoff {
	return &backoff{
		intervals: []time.Duration{
			10 * time.Millisecond,
			50 * time.Millisecond,
			250 * time.Millisecond,
			500 * time.Millisecond,
			time.Second,
			2 * time.Second,
			3 * time.Second,
			4 * time.Second,
			5 * time.Second,
		},

		jitterPct: 10,
		jitterSrc: rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

// Backoff returns the wait duration for the given retry attempt.
// Retry numbers outside the interval table are clamped to the last,
// longest interval. Up to jitterPct percent of jitter is randomly
// added to or subtracted from the interval.
func (b *backoff) Backoff(retry int) time.Duration {
	idx := retry

	if idx < 0 || idx > len(b.intervals)-1 {
		idx = len(b.intervals) - 1
	}

	d := b.intervals[idx]

	if b.jitterSrc.Intn(2) == 0 {
		return d + time.Duration(int64(d)/100*int64(b.jitterSrc.Intn(b.jitterPct)))
	}

	return d - time.Duration(int64(d)/100*int64(b.jitterSrc.Intn(b.jitterPct)))
}
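
To make the delay table concrete, a small hypothetical example; it
would have to live in package consul (with "fmt" imported), since
defaultBackoff is unexported, and the printed values are illustrative:

func ExampleBackoffDelays() {
	b := defaultBackoff()

	fmt.Println(b.Backoff(0))  // 1st retry: 10ms, jittered by <10%, e.g. 10.7ms
	fmt.Println(b.Backoff(3))  // 4th retry: 500ms, jittered by <10%, e.g. 466ms
	fmt.Println(b.Backoff(99)) // out of range: clamped to 5s, jittered by <10%
}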
36 changes: 36 additions & 0 deletions consul/backoff_test.go
@@ -0,0 +1,36 @@
package consul

import (
	"fmt"
	"testing"
	"time"
)

func minBackoff(t time.Duration, jitterPct int) time.Duration {
	return t - t*time.Duration(jitterPct)/100
}

func maxBackoff(t time.Duration, jitterPct int) time.Duration {
	return t + t*time.Duration(jitterPct)/100
}

func TestBackoff_IntervalIdxBounds(t *testing.T) {
	b := defaultBackoff()

	for _, retry := range []int{-100000, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 99999999} {
		t.Run(fmt.Sprintf("retry-%d", retry), func(t *testing.T) {
			d := b.Backoff(retry)

			// out-of-range retry numbers are clamped to the last interval
			idx := retry
			if idx < 0 || idx > len(b.intervals)-1 {
				idx = len(b.intervals) - 1
			}

			minB := minBackoff(b.intervals[idx], b.jitterPct)
			maxB := maxBackoff(b.intervals[idx], b.jitterPct)

			if d < minB {
				t.Errorf("backoff is %s, expecting >=%s", d, minB)
			}

			if d > maxB {
				t.Errorf("backoff is %s, expecting <=%s", d, maxB)
			}
		})
	}
}
85 changes: 59 additions & 26 deletions consul/resolver.go
@@ -6,6 +6,7 @@ import (
	"fmt"
	"net"
	"sort"
	"strings"
	"sync"
	"time"

@@ -25,15 +26,16 @@ const (
var logger = grpclog.Component("grpcconsulresolver")

type consulResolver struct {
	cc           resolver.ClientConn
	consulHealth consulHealthEndpoint
	service      string
	tags         []string
	healthFilter healthFilter
	ctx          context.Context
	cancel       context.CancelFunc
	resolveNow   chan struct{}
	wgStop       sync.WaitGroup
	cc             resolver.ClientConn
	consulHealth   consulHealthEndpoint
	service        string
	tags           []string
	healthFilter   healthFilter
	backoffCounter *backoff
	ctx            context.Context
	cancel         context.CancelFunc
	resolveNow     chan struct{}
	wgStop         sync.WaitGroup
}

type consulHealthEndpoint interface {
@@ -73,14 +75,15 @@ func newConsulResolver(
	ctx, cancel := context.WithCancel(context.Background())

	return &consulResolver{
		cc:           cc,
		consulHealth: health,
		service:      consulService,
		tags:         tags,
		healthFilter: healthFilter,
		ctx:          ctx,
		cancel:       cancel,
		resolveNow:   make(chan struct{}, 1),
		cc:             cc,
		consulHealth:   health,
		service:        consulService,
		tags:           tags,
		healthFilter:   healthFilter,
		backoffCounter: defaultBackoff(),
		ctx:            ctx,
		cancel:         cancel,
		resolveNow:     make(chan struct{}, 1),
	}, nil
}

@@ -90,11 +93,20 @@ func (c *consulResolver) start() {
}

func (c *consulResolver) query(opts *consul.QueryOptions) ([]resolver.Address, uint64, error) {
	if logger.V(2) {
		var tagsDescr, healthyDescr string
		if len(c.tags) > 0 {
			tagsDescr = " with tags: " + strings.Join(c.tags, ", ")
		}
		if c.healthFilter == healthFilterOnlyHealthy {
			healthyDescr = "healthy "
		}

		logger.Infof("querying consul for "+healthyDescr+"addresses of service '%s'"+tagsDescr, c.service)
	}

	entries, meta, err := c.consulHealth.ServiceMultipleTags(c.service, c.tags, c.healthFilter == healthFilterOnlyHealthy, opts)
	if err != nil {
		logger.Infof("resolving service name '%s' via consul failed: %v\n",
			c.service, err)

		return nil, 0, err
	}

@@ -170,6 +182,8 @@ func addressesEqual(a, b []resolver.Address) bool {

func (c *consulResolver) watcher() {
	var lastReportedAddrs []resolver.Address
	var retryTimer *time.Timer
	var retryCnt int

	opts := (&consul.QueryOptions{}).WithContext(c.ctx)

@@ -181,17 +195,31 @@
			var err error

			lastWaitIndex := opts.WaitIndex

			queryStartTime := time.Now()

			// stop a retry timer that might still be pending from
			// an earlier failed query
			if retryTimer != nil {
				retryTimer.Stop()
			}

			addrs, opts.WaitIndex, err = c.query(opts)
			if err != nil {
				if errors.Is(err, context.Canceled) {
					return
				}

				retryIn := c.backoffCounter.Backoff(retryCnt)
				logger.Infof("resolving service name '%s' via consul failed, retrying in %s: %s",
					c.service, retryIn, err)

				// schedule a re-resolution after the backoff delay
				retryTimer = time.AfterFunc(retryIn, func() {
					c.ResolveNow(resolver.ResolveNowOptions{})
				})
				retryCnt++

				c.cc.ReportError(err)
				break
			}
			retryCnt = 0

			if opts.WaitIndex < lastWaitIndex {
				logger.Infof("consul responded with a smaller waitIndex (%d) than the previous one (%d), restarting blocking query loop",
@@ -211,13 +239,14 @@
			// If the service does not exist, an empty addrs slice
			// is returned. If we never reported any resolved
			// addresses (addrs is nil), we have to report an empty
			// set of resolved addresses. It informs the grpc-balancer that resolution is not
			// in progress anymore and grpc calls can failFast.
			// set of resolved addresses. It informs the
			// grpc-balancer that resolution is not in progress
			// anymore and grpc calls can failFast.
			if addressesEqual(addrs, lastReportedAddrs) {
				// If the consul server responds with
				// the same data then in the last
				// query in less then 50ms, we sleep a
				// bit to prevent querying in a tight loop
				// the same data than in the last
				// query in less than 50ms, sleep a
				// bit to prevent querying in a tight loop.
				// This should only happen if the consul server
				// is buggy but better be safe. :-)
				if lastWaitIndex == opts.WaitIndex &&
Expand All @@ -243,6 +272,10 @@ func (c *consulResolver) watcher() {

select {
case <-c.ctx.Done():
if retryTimer != nil {
retryTimer.Stop()
}

return

case <-c.resolveNow:
46 changes: 46 additions & 0 deletions consul/resolver_test.go
@@ -564,3 +564,49 @@ func TestQueryResultsAreSorted(t *testing.T) {

	r.Close()
}

func TestRetryOnError(t *testing.T) {
	cc := mocks.NewClientConn()
	health := mocks.NewConsulHealthClient()
	cleanup := replaceCreateHealthClientFn(
		func(*consul.Config) (consulHealthEndpoint, error) {
			return health, nil
		},
	)
	t.Cleanup(cleanup)

	health.SetRespError(errors.New("ERROR"))

	r, err := NewBuilder().Build(resolver.Target{URL: url.URL{Path: "test"}}, cc, resolver.BuildOptions{})
	if err != nil {
		t.Fatal("Build() failed:", err.Error())
	}

	t.Cleanup(r.Close)

	// wait until the resolver retried the failing query at least 3 times
	for health.ResolveCount() < 3 {
		time.Sleep(50 * time.Millisecond)
	}

	t.Logf("%d retries were done", health.ResolveCount())
	health.SetRespError(nil)

	health.SetRespEntries([]*consul.ServiceEntry{
		{
			Service: &consul.AgentService{
				Address: "227.0.0.1",
				Port:    1,
			},
		},
		{
			Service: &consul.AgentService{
				Address: "127.0.0.1",
				Port:    1,
			},
		},
	})

	// after the error is cleared, a retry must succeed and report both addresses
	for len(cc.Addrs()) != 2 {
		time.Sleep(50 * time.Millisecond)
	}
}
16 changes: 12 additions & 4 deletions internal/mocks/consulhealth.go
@@ -7,10 +7,11 @@ import (
)

type ConsulHealthClient struct {
	mutex     sync.Mutex
	entries   []*consul.ServiceEntry
	queryMeta consul.QueryMeta
	err       error
	mutex      sync.Mutex
	entries    []*consul.ServiceEntry
	queryMeta  consul.QueryMeta
	err        error
	resolveCnt int
}

func NewConsulHealthClient() *ConsulHealthClient {
@@ -46,10 +47,17 @@ func (c *ConsulHealthClient) SetRespError(err error) {
func (c *ConsulHealthClient) ServiceMultipleTags(_ string, _ []string, _ bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.resolveCnt++

	if q.Context().Err() != nil {
		return nil, nil, q.Context().Err()
	}

	return c.entries, &c.queryMeta, c.err
}

// ResolveCount returns how often ServiceMultipleTags has been called.
func (c *ConsulHealthClient) ResolveCount() int {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	return c.resolveCnt
}
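
For context, a sketch of how a client wires this resolver together
with round_robin, the policy whose missing re-resolution motivated
this commit. Only NewBuilder is confirmed by the test above; the
import path and the consul:// target format are assumptions inferred
from the package, not part of this diff:

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"

	// import path assumed from the package name in this diff
	"github.com/simplesurance/grpcconsulresolver/consul"
)

func main() {
	// register the builder so grpc recognizes consul:// targets
	resolver.Register(consul.NewBuilder())

	// round_robin does not re-resolve on errors (grpc/grpc-go#7729);
	// with this commit the resolver retries on its own instead.
	conn, err := grpc.Dial(
		"consul://localhost:8500/my-service", // target format assumed
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"round_robin":{}}]}`),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}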
