From f964473c1ec0636b3c42b2d264031549e33e7af7 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 24 Aug 2020 20:34:34 -0500 Subject: [PATCH 1/3] Added setter.IsNil and retry package * Added Is(), As(), Unwrap() for go1.13 or greater --- README.md | 75 ++++++++++++- errors/go113.go | 38 +++++++ go.mod | 2 +- go.sum | 2 + retry/backoff.go | 75 +++++++++++++ retry/retry.go | 243 ++++++++++++++++++++++++++++++++++++++++++ retry/retry_test.go | 143 +++++++++++++++++++++++++ setter/setter.go | 13 +++ setter/setter_test.go | 39 +++++++ 9 files changed, 628 insertions(+), 2 deletions(-) create mode 100644 errors/go113.go create mode 100644 retry/backoff.go create mode 100644 retry/retry.go create mode 100644 retry/retry_test.go diff --git a/README.md b/README.md index 698b164e..e2743657 100644 --- a/README.md +++ b/README.md @@ -286,6 +286,20 @@ argFoo = flag.String("foo", "", "foo via cli arg") setter.SetOverride(&config.Foo, *argFoo, os.Env("FOO")) ``` +## Check for Nil interface +```go +func NewImplementation() MyInterface { + // Type and Value are not nil + var p *MyImplementation = nil + return p +} + +thing := NewImplementation() +assert.False(t, thing == nil) +assert.True(t, setter.IsNil(thing)) +assert.False(t, setter.IsNil(&MyImplementation{})) +``` + ## GetEnv import "github.com/mailgun/holster/v3/config" Get a value from an environment variable or return the provided default @@ -490,4 +504,63 @@ func TestUntilConnect(t *testing.T) { // Wait until we can connect, then continue with the test testutil.UntilConnect(t, 10, time.Millisecond*100, ln.Addr().String()) } -``` \ No newline at end of file +``` + +### Retry Until +Retries a function until the function returns error = nil or until the context is deadline is exceeded +```go +ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) +defer cancel() +err := retry.Until(ctx, retry.Interval(time.Millisecond*10), func(ctx context.Context, att int) error { + res, err := http.Get("http://example.com/get") + if err != nil { + return err + } + if res.StatusCode != http.StatusOK { + return errors.New("expected status 200") + } + // Do something with the body + return nil +}) +if err != nil { + panic(err) +} +``` + +Backoff functions provided + +* `retry.Attempts(10, time.Millisecond*10)` retries up to `10` attempts +* `retry.Interval(time.Millisecond*10)` retries at an interval indefinitely or until context is cancelled +* `retry.ExponentialBackoff{ Min: time.Millisecond, Max: time.Millisecond * 100, Factor: 2}` retries + at an exponential backoff interval. Can accept an optional `Attempts` which will limit the number of attempts + + +### Retry Async +Runs a function asynchronously and retries it until it succeeds, or the context is +cancelled or `Stop()` is called. This is useful in distributed programming where +you know a remote thing will eventually succeed, but you need to keep trying until +the remote thing succeeds, or we are told to shutdown. + +```go +ctx := context.Background() +async := retry.NewRetryAsync() + +backOff := &retry.ExponentialBackoff{ + Min: time.Millisecond, + Max: time.Millisecond * 100, + Factor: 2, + Attempts: 10, +} + +id := createNewEC2("my-new-server") + +async.Async(id, ctx, backOff, func(ctx context.Context, i int) error { + // Waits for a new EC2 instance to be created then updates the config and exits + if err := updateInstance(id, mySettings); err != nil { + return err + } + return nil +}) +// Wait for all the asyncs to complete +async.Wait() +``` diff --git a/errors/go113.go b/errors/go113.go new file mode 100644 index 00000000..be0d10d0 --- /dev/null +++ b/errors/go113.go @@ -0,0 +1,38 @@ +// +build go1.13 + +package errors + +import ( + stderrors "errors" +) + +// Is reports whether any error in err's chain matches target. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error is considered to match a target if it is equal to that target or if +// it implements a method Is(error) bool such that Is(target) returns true. +func Is(err, target error) bool { return stderrors.Is(err, target) } + +// As finds the first error in err's chain that matches target, and if so, sets +// target to that error value and returns true. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error matches target if the error's concrete value is assignable to the value +// pointed to by target, or if the error has a method As(interface{}) bool such that +// As(target) returns true. In the latter case, the As method is responsible for +// setting target. +// +// As will panic if target is not a non-nil pointer to either a type that implements +// error, or to any interface type. As returns false if err is nil. +func As(err error, target interface{}) bool { return stderrors.As(err, target) } + +// Unwrap returns the result of calling the Unwrap method on err, if err's +// type contains an Unwrap method returning error. +// Otherwise, Unwrap returns nil. +func Unwrap(err error) error { + return stderrors.Unwrap(err) +} diff --git a/go.mod b/go.mod index 5165af78..9307a231 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.10.0 // indirect github.com/jonboulle/clockwork v0.1.0 // indirect - github.com/pkg/errors v0.8.1 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.1.0 // indirect github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect github.com/sirupsen/logrus v1.4.2 diff --git a/go.sum b/go.sum index 5e7677b3..4d78b1e8 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= diff --git a/retry/backoff.go b/retry/backoff.go new file mode 100644 index 00000000..04fc0185 --- /dev/null +++ b/retry/backoff.go @@ -0,0 +1,75 @@ +package retry + +import ( + "math" + "time" +) + +type Backoff interface { + Next() (time.Duration, bool) + Reset() +} + +func Interval(t time.Duration) *ConstBackoff { + return &ConstBackoff{Interval: t} +} + +// Retry indefinitely sleeping for `interval` between each retry +type ConstBackoff struct { + Interval time.Duration + retries int +} + +func (b *ConstBackoff) Reset() {} +func (b *ConstBackoff) Next() (time.Duration, bool) { + b.retries++ + return b.Interval, true +} + +func Attempts(a int, t time.Duration) *AttemptsBackoff { + return &AttemptsBackoff{Interval: t, Attempts: a} +} + +// Retry for `attempts` number of retries sleeping for `interval` between each retry +type AttemptsBackoff struct { + Interval time.Duration + Attempts int + retries int +} + +func (b *AttemptsBackoff) Reset() { b.retries = 0 } +func (b *AttemptsBackoff) Next() (time.Duration, bool) { + b.retries++ + if b.retries < b.Attempts { + return b.Interval, true + } + return b.Interval, false +} + +type ExponentialBackoff struct { + Min, Max time.Duration + Factor float64 + Attempts int + retries int +} + +func (b *ExponentialBackoff) Reset() { b.retries = 0 } +func (b *ExponentialBackoff) Next() (time.Duration, bool) { + interval := b.nextInterval() + b.retries++ + if b.Attempts != 0 && b.retries > b.Attempts { + return interval, false + } + return interval, true +} + +func (b *ExponentialBackoff) nextInterval() time.Duration { + d := time.Duration(float64(b.Min) * math.Pow(b.Factor, float64(b.retries))) + if d > b.Max { + return b.Max + } + if d < b.Min { + return b.Min + } + return d +} diff --git a/retry/retry.go b/retry/retry.go new file mode 100644 index 00000000..c0fd0742 --- /dev/null +++ b/retry/retry.go @@ -0,0 +1,243 @@ +package retry + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/mailgun/holster/v3/syncutil" + "github.com/pkg/errors" +) + +const ( + Cancelled = cancelReason("context cancelled") + Stopped = cancelReason("retry stopped") + AttemptsExhausted = cancelReason("attempts exhausted") +) + +type Func func(context.Context, int) error + +type cancelReason string + +type stopErr struct { + err error +} + +func (e *stopErr) Error() string { + return fmt.Sprintf("stop err: %s", e.err.Error()) +} + +type Err struct { + Err error + Reason cancelReason + Attempts int +} + +func (e *Err) Cause() error { return e.Err } +func (e *Err) Error() string { + return fmt.Sprintf("on attempt '%d'; %s: %s", e.Attempts, e.Reason, e.Err.Error()) +} + +func (e *Err) Is(target error) bool { + _, ok := target.(*Err) + if !ok { + return false + } + return true +} + +// Stop forces the retry to cancel with the provided error +// and retry.Err.Reason == retry.Stopped +func Stop(err error) error { + return &stopErr{err: err} +} + +// Until will retry the provided `retry.Func` until it returns nil or +// the context is cancelled. Optionally users may use `retry.Stop()` to force +// the retry to terminate with an error. Returns a `retry.Err` with +// the included Reason and Attempts +func Until(ctx context.Context, backOff Backoff, f Func) error { + var attempt int + for { + attempt++ + if err := f(ctx, attempt); err != nil { + var stop *stopErr + if errors.As(err, &stop) { + return &Err{Attempts: attempt, Reason: Stopped, Err: stop.err} + } + interval, retry := backOff.Next() + if !retry { + return &Err{Attempts: attempt, Reason: AttemptsExhausted, Err: err} + } + timer := time.NewTimer(interval) + select { + case <-timer.C: + timer.Stop() + continue + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return &Err{Attempts: attempt, Reason: Cancelled, Err: err} + } + } + return nil + } +} + +type AsyncItem struct { + Retrying bool + Attempts int + Err error +} + +func (s *AsyncItem) Error() string { + return s.Err.Error() +} + +type Async struct { + asyncs map[interface{}]AsyncItem + mutex *sync.Mutex + ctx context.Context + wg syncutil.WaitGroup +} + +// Given a function that takes a context, run the provided function; if it fails, retry the function asynchronously +// and return Async{}. Subsequent calls to with the same 'key' will return Async{} if the function is still +// retrying, this continues until the retry period has exhausted or the context expires or is cancelled. +// Then the final error returned by f() is returned to the caller on the final call with the same 'key' +// +// The code assumes the caller will continue to call `Async()` until either the retries have exhausted or +// an Async{Retrying: false} is returned. +func NewRetryAsync() *Async { + return &Async{ + mutex: &sync.Mutex{}, + asyncs: make(map[interface{}]AsyncItem), + } +} + +// Return the number of active async retries +func (s *Async) Len() int { + s.mutex.Lock() + defer s.mutex.Unlock() + return len(s.asyncs) +} + +// Stop forces stop of all running async retries +func (s *Async) Stop() { + s.wg.Stop() +} + +// Wait waits for all running async retries to complete +func (s *Async) Wait() { + s.wg.Wait() +} + +func (s *Async) Async(key interface{}, ctx context.Context, backOff Backoff, + f func(context.Context, int) error) *AsyncItem { + + // does this key have an existing retry running? + s.mutex.Lock() + if async, ok := s.asyncs[key]; ok { + // Remove entries that are no longer re-trying + if !async.Retrying { + delete(s.asyncs, key) + } + s.mutex.Unlock() + return &async + } + s.mutex.Unlock() + + // Attempt to run the function, if successful return nil + err := f(s.ctx, 0) + if err == nil { + return nil + } + + async := AsyncItem{ + Retrying: true, + Err: err, + } + + s.mutex.Lock() + s.asyncs[key] = async + s.mutex.Unlock() + + // Create an go routine to run the retry + s.wg.Until(func(done chan struct{}) bool { + //var start = time.Now() + async := AsyncItem{Retrying: true} + + for { + // Retry the function + async.Attempts++ + async.Err = f(ctx, async.Attempts) + + // If success, then indicate we are no longer retrying + if async.Err == nil { + async.Retrying = false + + s.mutex.Lock() + s.asyncs[key] = async + s.mutex.Unlock() + return false + } + + // Record the error and attempts + s.mutex.Lock() + s.asyncs[key] = async + s.mutex.Unlock() + + interval, retry := backOff.Next() + if !retry { + async.Retrying = false + s.mutex.Lock() + s.asyncs[key] = async + s.mutex.Unlock() + return false + } + + timer := time.NewTimer(interval) + select { + case <-timer.C: + timer.Stop() + case <-ctx.Done(): + async.Retrying = false + + s.mutex.Lock() + s.asyncs[key] = async + s.mutex.Unlock() + timer.Stop() + return false + case <-done: + // immediate abort, abandon all work + if !timer.Stop() { + <-timer.C + } + return false + } + } + }) + return &async +} + +// Return errors from failed asyncs and clean up the internal async map +func (s *Async) Errs() map[interface{}]AsyncItem { + results := make(map[interface{}]AsyncItem) + s.mutex.Lock() + + for key, async := range s.asyncs { + // Remove entries that are no longer re-trying + if !async.Retrying { + delete(s.asyncs, key) + + // Only include async's that had an error + if async.Err != nil { + results[key] = async + } + } + } + s.mutex.Unlock() + return results +} diff --git a/retry/retry_test.go b/retry/retry_test.go new file mode 100644 index 00000000..362e08b1 --- /dev/null +++ b/retry/retry_test.go @@ -0,0 +1,143 @@ +package retry_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/mailgun/holster/v3/retry" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var errCause = errors.New("cause of error") + +func TestUntilInterval(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) + defer cancel() + err := retry.Until(ctx, retry.Interval(time.Millisecond*10), func(ctx context.Context, att int) error { + return errCause + }) + + require.Error(t, err) + assert.True(t, errors.Is(err, &retry.Err{})) + + // Inspect the error + var retryErr *retry.Err + assert.True(t, errors.As(err, &retryErr)) + assert.Equal(t, 19, retryErr.Attempts) + assert.Equal(t, retry.Cancelled, retryErr.Reason) + + // Cause() works as expected + cause := errors.Cause(err) + assert.Equal(t, errCause, cause) + assert.Equal(t, "on attempt '19'; context cancelled: cause of error", err.Error()) +} + +func TestUntilNoError(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) + defer cancel() + err := retry.Until(ctx, retry.Interval(time.Millisecond*10), func(ctx context.Context, att int) error { + return nil + }) + + require.NoError(t, err) + assert.False(t, errors.Is(err, &retry.Err{})) +} + +func TestUntilAttempts(t *testing.T) { + ctx := context.Background() + err := retry.Until(ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, att int) error { + return fmt.Errorf("failed attempt '%d'", att) + }) + + require.Error(t, err) + assert.True(t, errors.Is(err, &retry.Err{})) + assert.Equal(t, "on attempt '10'; attempts exhausted: failed attempt '10'", err.Error()) +} + +func TestUntilStopped(t *testing.T) { + ctx := context.Background() + err := retry.Until(ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, att int) error { + return retry.Stop(fmt.Errorf("failed attempt '%d'", att)) + }) + require.Error(t, err) + // Inspect the error + var retryErr *retry.Err + assert.True(t, errors.As(err, &retryErr)) + assert.Equal(t, 1, retryErr.Attempts) + assert.Equal(t, retry.Stopped, retryErr.Reason) + assert.Equal(t, "on attempt '1'; retry stopped: failed attempt '1'", err.Error()) +} + +func TestUntilExponential(t *testing.T) { + ctx := context.Background() + backOff := &retry.ExponentialBackoff{ + Min: time.Millisecond, + Max: time.Millisecond * 100, + Factor: 2, + Attempts: 10, + } + + err := retry.Until(ctx, backOff, func(ctx context.Context, att int) error { + return fmt.Errorf("failed attempt '%d'", att) + }) + + require.Error(t, err) + assert.True(t, errors.Is(err, &retry.Err{})) + assert.Equal(t, "on attempt '11'; attempts exhausted: failed attempt '11'", err.Error()) +} + +func TestUntilExponentialCancelled(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + backOff := &retry.ExponentialBackoff{ + Min: time.Millisecond, + Max: time.Millisecond * 100, + Factor: 2, + } + + err := retry.Until(ctx, backOff, func(ctx context.Context, att int) error { + return fmt.Errorf("failed attempt '%d'", att) + }) + + require.Error(t, err) + assert.True(t, errors.Is(err, &retry.Err{})) + assert.Equal(t, "on attempt '7'; context cancelled: failed attempt '7'", err.Error()) +} + +func TestAsync(t *testing.T) { + ctx := context.Background() + async := retry.NewRetryAsync() + async.Async("one", ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, i int) error { return errCause }) + async.Async("two", ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, i int) error { return errCause }) + async.Async("thr", ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, i int) error { return errCause }) + + // Creates the async retry + f1 := async.Async("for", ctx, retry.Attempts(10, time.Millisecond*100), func(ctx context.Context, i int) error { return errCause }) + // Returns a handler to the currently running async retry + f2 := async.Async("for", ctx, retry.Attempts(10, time.Millisecond*100), func(ctx context.Context, i int) error { return errCause }) + + // The are the same + assert.Equal(t, f1, f2) + // Should contain the error for our inspection + assert.Equal(t, errCause, f2.Err) + // Should report that the retry is still running + assert.Equal(t, true, f2.Retrying) + + // Retries are all still running + time.Sleep(time.Millisecond * 10) + assert.Equal(t, 4, async.Len()) + + // We can inspect the errors for all running async retries + errs := async.Errs() + require.NotNil(t, errs) + for _, e := range errs { + assert.Equal(t, e, errCause) + } + + // Wait for all the async retries to exhaust their timeouts + async.Wait() +} diff --git a/setter/setter.go b/setter/setter.go index 80ca5aef..712c5b30 100644 --- a/setter/setter.go +++ b/setter/setter.go @@ -113,3 +113,16 @@ func IsZeroValue(value reflect.Value) bool { } return false } + +// Returns true if the interface value is nil or the pointed to value is nil, +// for instance a map, array, channel or slice +func IsNil(i interface{}) bool { + if i == nil { + return true + } + switch reflect.TypeOf(i).Kind() { + case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice: + return reflect.ValueOf(i).IsNil() + } + return false +} diff --git a/setter/setter_test.go b/setter/setter_test.go index 563f16d7..181a0553 100644 --- a/setter/setter_test.go +++ b/setter/setter_test.go @@ -106,3 +106,42 @@ func TestIfEmptyNonPtrPanic(t *testing.T) { setter.SetDefault(thing, "thrawn") assert.Fail(t, "Should have caught panic") } + +type MyInterface interface { + Thing() string +} + +type MyImplementation struct{} + +func (s *MyImplementation) Thing() string { + return "thing" +} + +func NewImplementation() MyInterface { + // Type and Value are not nil + var p *MyImplementation = nil + return p +} + +type MyStruct struct { + T MyInterface +} + +func NewMyStruct(t MyInterface) *MyStruct { + return &MyStruct{T: t} +} + +func TestIsNil(t *testing.T) { + m := MyStruct{T: &MyImplementation{}} + assert.True(t, m.T != nil) + m.T = nil + assert.True(t, m.T == nil) + + o := NewMyStruct(nil) + assert.True(t, o.T == nil) + + thing := NewImplementation() + assert.False(t, thing == nil) + assert.True(t, setter.IsNil(thing)) + assert.False(t, setter.IsNil(&MyImplementation{})) +} From 3ddc402e5b8110224ada71f68d7785e29068ef26 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 14 Sep 2020 16:50:24 -0500 Subject: [PATCH 2/3] Fixed race condition when re-using backoff in go routines --- retry/backoff.go | 33 +++++++++++++++++---------------- retry/retry_test.go | 23 +++++++++++++++++++++++ 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/retry/backoff.go b/retry/backoff.go index 04fc0185..e929a083 100644 --- a/retry/backoff.go +++ b/retry/backoff.go @@ -2,6 +2,7 @@ package retry import ( "math" + "sync/atomic" "time" ) @@ -17,30 +18,30 @@ func Interval(t time.Duration) *ConstBackoff { // Retry indefinitely sleeping for `interval` between each retry type ConstBackoff struct { Interval time.Duration - retries int + retries int64 } func (b *ConstBackoff) Reset() {} func (b *ConstBackoff) Next() (time.Duration, bool) { - b.retries++ + atomic.AddInt64(&b.retries, 1) return b.Interval, true } func Attempts(a int, t time.Duration) *AttemptsBackoff { - return &AttemptsBackoff{Interval: t, Attempts: a} + return &AttemptsBackoff{Interval: t, Attempts: int64(a)} } // Retry for `attempts` number of retries sleeping for `interval` between each retry type AttemptsBackoff struct { Interval time.Duration - Attempts int - retries int + Attempts int64 + retries int64 } -func (b *AttemptsBackoff) Reset() { b.retries = 0 } +func (b *AttemptsBackoff) Reset() { atomic.StoreInt64(&b.retries, 0) } func (b *AttemptsBackoff) Next() (time.Duration, bool) { - b.retries++ - if b.retries < b.Attempts { + retries := atomic.AddInt64(&b.retries, 1) + if retries < b.Attempts { return b.Interval, true } return b.Interval, false @@ -49,22 +50,22 @@ func (b *AttemptsBackoff) Next() (time.Duration, bool) { type ExponentialBackoff struct { Min, Max time.Duration Factor float64 - Attempts int - retries int + Attempts int64 + retries int64 } -func (b *ExponentialBackoff) Reset() { b.retries = 0 } +func (b *ExponentialBackoff) Reset() { atomic.StoreInt64(&b.retries, 0) } func (b *ExponentialBackoff) Next() (time.Duration, bool) { - interval := b.nextInterval() - b.retries++ - if b.Attempts != 0 && b.retries > b.Attempts { + retries := atomic.AddInt64(&b.retries, 1) + interval := b.nextInterval(retries) + if b.Attempts != 0 && retries > b.Attempts { return interval, false } return interval, true } -func (b *ExponentialBackoff) nextInterval() time.Duration { - d := time.Duration(float64(b.Min) * math.Pow(b.Factor, float64(b.retries))) +func (b *ExponentialBackoff) nextInterval(retries int64) time.Duration { + d := time.Duration(float64(b.Min) * math.Pow(b.Factor, float64(retries))) if d > b.Max { return b.Max } diff --git a/retry/retry_test.go b/retry/retry_test.go index 362e08b1..69ee52d1 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -3,6 +3,7 @@ package retry_test import ( "context" "fmt" + "sync" "testing" "time" @@ -141,3 +142,25 @@ func TestAsync(t *testing.T) { // Wait for all the async retries to exhaust their timeouts async.Wait() } + +func TestBackoffRace(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + backOff := &retry.ExponentialBackoff{ + Min: time.Millisecond, + Max: time.Millisecond * 100, + Factor: 2, + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + retry.Until(ctx, backOff, func(ctx context.Context, att int) error { + return fmt.Errorf("failed attempt '%d'", att) + }) + wg.Done() + }() + } + wg.Wait() +} From c89e6be9a5426e477b0026b22f74cab11fd8b4e4 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 17 Sep 2020 11:39:30 -0500 Subject: [PATCH 3/3] Renamed Backoff to Backoff, etc... --- collections/expire_cache_test.go | 33 ++++++++++++++++++ retry/backoff.go | 57 +++++++++++++++++++++++--------- retry/retry.go | 6 ++-- retry/retry_test.go | 19 ++++++++--- 4 files changed, 93 insertions(+), 22 deletions(-) create mode 100644 collections/expire_cache_test.go diff --git a/collections/expire_cache_test.go b/collections/expire_cache_test.go new file mode 100644 index 00000000..2c18a0f2 --- /dev/null +++ b/collections/expire_cache_test.go @@ -0,0 +1,33 @@ +package collections_test + +import ( + "testing" + "time" + + "github.com/mailgun/holster/v3/collections" + "github.com/stretchr/testify/assert" +) + +func TestNewExpireCache(t *testing.T) { + ec := collections.NewExpireCache(time.Millisecond * 100) + + ec.Add("one", "one") + time.Sleep(time.Millisecond * 100) + + var runs int + ec.Each(1, func(key interface{}, value interface{}) error { + assert.Equal(t, key, "one") + assert.Equal(t, value, "one") + runs++ + return nil + }) + assert.Equal(t, runs, 1) + + // Should NOT be in the cache + time.Sleep(time.Millisecond * 100) + ec.Each(1, func(key interface{}, value interface{}) error { + runs++ + return nil + }) + assert.Equal(t, runs, 1) +} diff --git a/retry/backoff.go b/retry/backoff.go index e929a083..feef5e1c 100644 --- a/retry/backoff.go +++ b/retry/backoff.go @@ -6,56 +6,74 @@ import ( "time" ) -type Backoff interface { +type BackOff interface { + New() BackOff Next() (time.Duration, bool) + NumRetries() int Reset() } -func Interval(t time.Duration) *ConstBackoff { - return &ConstBackoff{Interval: t} +func Interval(t time.Duration) *ConstBackOff { + return &ConstBackOff{Interval: t} } // Retry indefinitely sleeping for `interval` between each retry -type ConstBackoff struct { +type ConstBackOff struct { Interval time.Duration retries int64 } -func (b *ConstBackoff) Reset() {} -func (b *ConstBackoff) Next() (time.Duration, bool) { +func (b *ConstBackOff) NumRetries() int { return int(atomic.LoadInt64(&b.retries)) } +func (b *ConstBackOff) Reset() {} +func (b *ConstBackOff) Next() (time.Duration, bool) { atomic.AddInt64(&b.retries, 1) return b.Interval, true } +func (b *ConstBackOff) New() BackOff { + return &ConstBackOff{ + retries: atomic.LoadInt64(&b.retries), + Interval: b.Interval, + } +} -func Attempts(a int, t time.Duration) *AttemptsBackoff { - return &AttemptsBackoff{Interval: t, Attempts: int64(a)} +func Attempts(a int, t time.Duration) *AttemptsBackOff { + return &AttemptsBackOff{Interval: t, Attempts: int64(a)} } // Retry for `attempts` number of retries sleeping for `interval` between each retry -type AttemptsBackoff struct { +type AttemptsBackOff struct { Interval time.Duration Attempts int64 retries int64 } -func (b *AttemptsBackoff) Reset() { atomic.StoreInt64(&b.retries, 0) } -func (b *AttemptsBackoff) Next() (time.Duration, bool) { +func (b *AttemptsBackOff) NumRetries() int { return int(atomic.LoadInt64(&b.retries)) } +func (b *AttemptsBackOff) Reset() { atomic.StoreInt64(&b.retries, 0) } +func (b *AttemptsBackOff) Next() (time.Duration, bool) { retries := atomic.AddInt64(&b.retries, 1) if retries < b.Attempts { return b.Interval, true } return b.Interval, false } +func (b *AttemptsBackOff) New() BackOff { + return &AttemptsBackOff{ + retries: atomic.LoadInt64(&b.retries), + Interval: b.Interval, + Attempts: b.Attempts, + } +} -type ExponentialBackoff struct { +type ExponentialBackOff struct { Min, Max time.Duration Factor float64 Attempts int64 retries int64 } -func (b *ExponentialBackoff) Reset() { atomic.StoreInt64(&b.retries, 0) } -func (b *ExponentialBackoff) Next() (time.Duration, bool) { +func (b *ExponentialBackOff) NumRetries() int { return int(atomic.LoadInt64(&b.retries)) } +func (b *ExponentialBackOff) Reset() { atomic.StoreInt64(&b.retries, 0) } +func (b *ExponentialBackOff) Next() (time.Duration, bool) { retries := atomic.AddInt64(&b.retries, 1) interval := b.nextInterval(retries) if b.Attempts != 0 && retries > b.Attempts { @@ -63,8 +81,17 @@ func (b *ExponentialBackoff) Next() (time.Duration, bool) { } return interval, true } +func (b *ExponentialBackOff) New() BackOff { + return &ExponentialBackOff{ + retries: atomic.LoadInt64(&b.retries), + Attempts: b.Attempts, + Factor: b.Factor, + Min: b.Min, + Max: b.Max, + } +} -func (b *ExponentialBackoff) nextInterval(retries int64) time.Duration { +func (b *ExponentialBackOff) nextInterval(retries int64) time.Duration { d := time.Duration(float64(b.Min) * math.Pow(b.Factor, float64(retries))) if d > b.Max { return b.Max diff --git a/retry/retry.go b/retry/retry.go index c0fd0742..f398e31d 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -57,7 +57,7 @@ func Stop(err error) error { // the context is cancelled. Optionally users may use `retry.Stop()` to force // the retry to terminate with an error. Returns a `retry.Err` with // the included Reason and Attempts -func Until(ctx context.Context, backOff Backoff, f Func) error { +func Until(ctx context.Context, backOff BackOff, f Func) error { var attempt int for { attempt++ @@ -134,7 +134,7 @@ func (s *Async) Wait() { s.wg.Wait() } -func (s *Async) Async(key interface{}, ctx context.Context, backOff Backoff, +func (s *Async) Async(key interface{}, ctx context.Context, bo BackOff, f func(context.Context, int) error) *AsyncItem { // does this key have an existing retry running? @@ -189,7 +189,7 @@ func (s *Async) Async(key interface{}, ctx context.Context, backOff Backoff, s.asyncs[key] = async s.mutex.Unlock() - interval, retry := backOff.Next() + interval, retry := bo.Next() if !retry { async.Retrying = false s.mutex.Lock() diff --git a/retry/retry_test.go b/retry/retry_test.go index 69ee52d1..1c44f73f 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -75,7 +75,7 @@ func TestUntilStopped(t *testing.T) { func TestUntilExponential(t *testing.T) { ctx := context.Background() - backOff := &retry.ExponentialBackoff{ + backOff := &retry.ExponentialBackOff{ Min: time.Millisecond, Max: time.Millisecond * 100, Factor: 2, @@ -94,7 +94,7 @@ func TestUntilExponential(t *testing.T) { func TestUntilExponentialCancelled(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel() - backOff := &retry.ExponentialBackoff{ + backOff := &retry.ExponentialBackOff{ Min: time.Millisecond, Max: time.Millisecond * 100, Factor: 2, @@ -106,7 +106,7 @@ func TestUntilExponentialCancelled(t *testing.T) { require.Error(t, err) assert.True(t, errors.Is(err, &retry.Err{})) - assert.Equal(t, "on attempt '7'; context cancelled: failed attempt '7'", err.Error()) + assert.Equal(t, "on attempt '6'; context cancelled: failed attempt '6'", err.Error()) } func TestAsync(t *testing.T) { @@ -146,7 +146,7 @@ func TestAsync(t *testing.T) { func TestBackoffRace(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel() - backOff := &retry.ExponentialBackoff{ + backOff := &retry.ExponentialBackOff{ Min: time.Millisecond, Max: time.Millisecond * 100, Factor: 2, @@ -157,6 +157,7 @@ func TestBackoffRace(t *testing.T) { wg.Add(1) go func() { retry.Until(ctx, backOff, func(ctx context.Context, att int) error { + t.Logf("Attempts: %d", backOff.NumRetries()) return fmt.Errorf("failed attempt '%d'", att) }) wg.Done() @@ -164,3 +165,13 @@ func TestBackoffRace(t *testing.T) { } wg.Wait() } + +func TestBackOffNew(t *testing.T) { + backOff := &retry.ExponentialBackOff{ + Min: time.Millisecond, + Max: time.Millisecond * 100, + Factor: 2, + } + bo := backOff.New() + assert.Equal(t, bo, backOff) +}