diff --git a/README.md b/README.md
index 698b164e..e2743657 100644
--- a/README.md
+++ b/README.md
@@ -286,6 +286,20 @@ argFoo = flag.String("foo", "", "foo via cli arg")
 setter.SetOverride(&config.Foo, *argFoo, os.Env("FOO"))
 ```
 
+## Check for Nil interface
+```go
+func NewImplementation() MyInterface {
+	// The returned interface has a non-nil type but a nil value
+	var p *MyImplementation = nil
+	return p
+}
+
+thing := NewImplementation()
+assert.False(t, thing == nil)
+assert.True(t, setter.IsNil(thing))
+assert.False(t, setter.IsNil(&MyImplementation{}))
+```
+
 ## GetEnv
 import "github.com/mailgun/holster/v3/config"
 Get a value from an environment variable or return the provided default
@@ -490,4 +504,63 @@ func TestUntilConnect(t *testing.T) {
 	// Wait until we can connect, then continue with the test
 	testutil.UntilConnect(t, 10, time.Millisecond*100, ln.Addr().String())
 }
-```
\ No newline at end of file
+```
+
+### Retry Until
+Retries a function until it returns a nil error or the context deadline is exceeded
+```go
+ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
+defer cancel()
+err := retry.Until(ctx, retry.Interval(time.Millisecond*10), func(ctx context.Context, att int) error {
+	res, err := http.Get("http://example.com/get")
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return errors.New("expected status 200")
+	}
+	// Do something with the body
+	return nil
+})
+if err != nil {
+	panic(err)
+}
+```
+
+Backoff functions provided (an example combining one with `retry.Until` follows this list):
+
+* `retry.Attempts(10, time.Millisecond*10)` retries up to `10` attempts
+* `retry.Interval(time.Millisecond*10)` retries at a fixed interval indefinitely or until the context is cancelled
+* `retry.ExponentialBackOff{ Min: time.Millisecond, Max: time.Millisecond * 100, Factor: 2}` retries
+  at an exponentially increasing interval. Accepts an optional `Attempts`, which limits the number of attempts
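+
+For example, a backoff can be passed directly to `retry.Until`. This is a
+minimal sketch; `fetchThing` is a placeholder for whatever operation you are
+retrying.
+```go
+ctx := context.Background()
+backOff := &retry.ExponentialBackOff{
+	Min:      time.Millisecond,
+	Max:      time.Millisecond * 100,
+	Factor:   2,
+	Attempts: 10,
+}
+err := retry.Until(ctx, backOff, func(ctx context.Context, att int) error {
+	// fetchThing is a hypothetical operation we want retried
+	return fetchThing(ctx)
+})
+```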
+
+### Retry Async
+Runs a function asynchronously and retries it until it succeeds, the context is
+cancelled, or `Stop()` is called. This is useful in distributed programming where
+you know a remote operation will eventually succeed, but you need to keep trying
+until it does or you are told to shut down.
+
+```go
+ctx := context.Background()
+async := retry.NewRetryAsync()
+
+backOff := &retry.ExponentialBackOff{
+	Min:      time.Millisecond,
+	Max:      time.Millisecond * 100,
+	Factor:   2,
+	Attempts: 10,
+}
+
+id := createNewEC2("my-new-server")
+
+async.Async(id, ctx, backOff, func(ctx context.Context, i int) error {
+	// Waits for a new EC2 instance to be created then updates the config and exits
+	if err := updateInstance(id, mySettings); err != nil {
+		return err
+	}
+	return nil
+})
+// Wait for all the asyncs to complete
+async.Wait()
+```
diff --git a/collections/expire_cache_test.go b/collections/expire_cache_test.go
new file mode 100644
index 00000000..2c18a0f2
--- /dev/null
+++ b/collections/expire_cache_test.go
@@ -0,0 +1,33 @@
+package collections_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/mailgun/holster/v3/collections"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewExpireCache(t *testing.T) {
+	ec := collections.NewExpireCache(time.Millisecond * 100)
+
+	ec.Add("one", "one")
+	time.Sleep(time.Millisecond * 100)
+
+	var runs int
+	ec.Each(1, func(key interface{}, value interface{}) error {
+		assert.Equal(t, key, "one")
+		assert.Equal(t, value, "one")
+		runs++
+		return nil
+	})
+	assert.Equal(t, runs, 1)
+
+	// Should NOT be in the cache
+	time.Sleep(time.Millisecond * 100)
+	ec.Each(1, func(key interface{}, value interface{}) error {
+		runs++
+		return nil
+	})
+	assert.Equal(t, runs, 1)
+}
diff --git a/errors/go113.go b/errors/go113.go
new file mode 100644
index 00000000..be0d10d0
--- /dev/null
+++ b/errors/go113.go
@@ -0,0 +1,38 @@
+// +build go1.13
+
+package errors
+
+import (
+	stderrors "errors"
+)
+
+// Is reports whether any error in err's chain matches target.
+//
+// The chain consists of err itself followed by the sequence of errors obtained by
+// repeatedly calling Unwrap.
+//
+// An error is considered to match a target if it is equal to that target or if
+// it implements a method Is(error) bool such that Is(target) returns true.
+func Is(err, target error) bool { return stderrors.Is(err, target) }
+
+// As finds the first error in err's chain that matches target, and if so, sets
+// target to that error value and returns true.
+//
+// The chain consists of err itself followed by the sequence of errors obtained by
+// repeatedly calling Unwrap.
+//
+// An error matches target if the error's concrete value is assignable to the value
+// pointed to by target, or if the error has a method As(interface{}) bool such that
+// As(target) returns true. In the latter case, the As method is responsible for
+// setting target.
+//
+// As will panic if target is not a non-nil pointer to either a type that implements
+// error, or to any interface type. As returns false if err is nil.
+func As(err error, target interface{}) bool { return stderrors.As(err, target) }
+
+// Unwrap returns the result of calling the Unwrap method on err, if err's
+// type contains an Unwrap method returning error.
+// Otherwise, Unwrap returns nil.
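+//
+// For example, unwrapping an error wrapped with fmt.Errorf and the %w verb:
+//
+//	inner := Unwrap(fmt.Errorf("read failed: %w", io.EOF))
+//	// inner == io.EOF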
+func Unwrap(err error) error {
+	return stderrors.Unwrap(err)
+}
diff --git a/go.mod b/go.mod
index 5165af78..9307a231 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
 	github.com/grpc-ecosystem/grpc-gateway v1.10.0 // indirect
 	github.com/jonboulle/clockwork v0.1.0 // indirect
-	github.com/pkg/errors v0.8.1
+	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.1.0 // indirect
 	github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
 	github.com/sirupsen/logrus v1.4.2
diff --git a/go.sum b/go.sum
index 5e7677b3..4d78b1e8 100644
--- a/go.sum
+++ b/go.sum
@@ -89,6 +89,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
diff --git a/retry/backoff.go b/retry/backoff.go
new file mode 100644
index 00000000..feef5e1c
--- /dev/null
+++ b/retry/backoff.go
@@ -0,0 +1,103 @@
+package retry
+
+import (
+	"math"
+	"sync/atomic"
+	"time"
+)
+
+type BackOff interface {
+	New() BackOff
+	Next() (time.Duration, bool)
+	NumRetries() int
+	Reset()
+}
+
+func Interval(t time.Duration) *ConstBackOff {
+	return &ConstBackOff{Interval: t}
+}
+
+// Retry indefinitely sleeping for `interval` between each retry
+type ConstBackOff struct {
+	Interval time.Duration
+	retries  int64
+}
+
+func (b *ConstBackOff) NumRetries() int { return int(atomic.LoadInt64(&b.retries)) }
+func (b *ConstBackOff) Reset()          {}
+func (b *ConstBackOff) Next() (time.Duration, bool) {
+	atomic.AddInt64(&b.retries, 1)
+	return b.Interval, true
+}
+func (b *ConstBackOff) New() BackOff {
+	return &ConstBackOff{
+		retries:  atomic.LoadInt64(&b.retries),
+		Interval: b.Interval,
+	}
+}
+
+func Attempts(a int, t time.Duration) *AttemptsBackOff {
+	return &AttemptsBackOff{Interval: t, Attempts: int64(a)}
+}
+
+// Retry for `attempts` number of retries sleeping for `interval` between each retry
+type AttemptsBackOff struct {
+	Interval time.Duration
+	Attempts int64
+	retries  int64
+}
+
+func (b *AttemptsBackOff) NumRetries() int { return int(atomic.LoadInt64(&b.retries)) }
+func (b *AttemptsBackOff) Reset()          { atomic.StoreInt64(&b.retries, 0) }
+func (b *AttemptsBackOff) Next() (time.Duration, bool) {
+	retries := atomic.AddInt64(&b.retries, 1)
+	if retries < b.Attempts {
+		return b.Interval, true
+	}
+	return b.Interval, false
+}
+func (b *AttemptsBackOff) New() BackOff {
+	return &AttemptsBackOff{
+		retries:  atomic.LoadInt64(&b.retries),
+		Interval: b.Interval,
+		Attempts: b.Attempts,
+	}
+}
+
+type ExponentialBackOff struct {
+	Min, Max time.Duration
+	Factor   float64
+	Attempts int64
+	retries  int64
+}
+
+func (b *ExponentialBackOff) NumRetries() int { return int(atomic.LoadInt64(&b.retries)) }
+func (b *ExponentialBackOff) Reset()          { atomic.StoreInt64(&b.retries, 0) }
+func (b *ExponentialBackOff) Next() (time.Duration, bool) {
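+	// Count this attempt; when Attempts is 0 the backoff never gives up,
+	// otherwise stop retrying once the attempt count exceeds Attempts.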
+	retries := atomic.AddInt64(&b.retries, 1)
+	interval := b.nextInterval(retries)
+	if b.Attempts != 0 && retries > b.Attempts {
+		return interval, false
+	}
+	return interval, true
+}
+func (b *ExponentialBackOff) New() BackOff {
+	return &ExponentialBackOff{
+		retries:  atomic.LoadInt64(&b.retries),
+		Attempts: b.Attempts,
+		Factor:   b.Factor,
+		Min:      b.Min,
+		Max:      b.Max,
+	}
+}
+
+func (b *ExponentialBackOff) nextInterval(retries int64) time.Duration {
+	d := time.Duration(float64(b.Min) * math.Pow(b.Factor, float64(retries)))
+	if d > b.Max {
+		return b.Max
+	}
+	if d < b.Min {
+		return b.Min
+	}
+	return d
+}
diff --git a/retry/retry.go b/retry/retry.go
new file mode 100644
index 00000000..f398e31d
--- /dev/null
+++ b/retry/retry.go
@@ -0,0 +1,243 @@
+package retry
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/mailgun/holster/v3/syncutil"
+	"github.com/pkg/errors"
+)
+
+const (
+	Cancelled         = cancelReason("context cancelled")
+	Stopped           = cancelReason("retry stopped")
+	AttemptsExhausted = cancelReason("attempts exhausted")
+)
+
+type Func func(context.Context, int) error
+
+type cancelReason string
+
+type stopErr struct {
+	err error
+}
+
+func (e *stopErr) Error() string {
+	return fmt.Sprintf("stop err: %s", e.err.Error())
+}
+
+type Err struct {
+	Err      error
+	Reason   cancelReason
+	Attempts int
+}
+
+func (e *Err) Cause() error { return e.Err }
+func (e *Err) Error() string {
+	return fmt.Sprintf("on attempt '%d'; %s: %s", e.Attempts, e.Reason, e.Err.Error())
+}
+
+func (e *Err) Is(target error) bool {
+	_, ok := target.(*Err)
+	if !ok {
+		return false
+	}
+	return true
+}
+
+// Stop forces the retry to cancel with the provided error
+// and retry.Err.Reason == retry.Stopped
+func Stop(err error) error {
+	return &stopErr{err: err}
+}
+
+// Until will retry the provided `retry.Func` until it returns nil or
+// the context is cancelled. Optionally users may use `retry.Stop()` to force
+// the retry to terminate with an error. Returns a `retry.Err` with
+// the included Reason and Attempts
+func Until(ctx context.Context, backOff BackOff, f Func) error {
+	var attempt int
+	for {
+		attempt++
+		if err := f(ctx, attempt); err != nil {
+			var stop *stopErr
+			if errors.As(err, &stop) {
+				return &Err{Attempts: attempt, Reason: Stopped, Err: stop.err}
+			}
+			interval, retry := backOff.Next()
+			if !retry {
+				return &Err{Attempts: attempt, Reason: AttemptsExhausted, Err: err}
+			}
+			timer := time.NewTimer(interval)
+			select {
+			case <-timer.C:
+				timer.Stop()
+				continue
+			case <-ctx.Done():
+				if !timer.Stop() {
+					<-timer.C
+				}
+				return &Err{Attempts: attempt, Reason: Cancelled, Err: err}
+			}
+		}
+		return nil
+	}
+}
+
+type AsyncItem struct {
+	Retrying bool
+	Attempts int
+	Err      error
+}
+
+func (s *AsyncItem) Error() string {
+	return s.Err.Error()
+}
+
+type Async struct {
+	asyncs map[interface{}]AsyncItem
+	mutex  *sync.Mutex
+	ctx    context.Context
+	wg     syncutil.WaitGroup
+}
+
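+// NewRetryAsync creates an empty Async retry manager. Retries are registered
+// with Async() and run in goroutines until they succeed, their backoff is
+// exhausted, the context is cancelled, or Stop() is called; Wait() blocks
+// until all registered retries have finished.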
+func NewRetryAsync() *Async {
+	return &Async{
+		mutex:  &sync.Mutex{},
+		asyncs: make(map[interface{}]AsyncItem),
+	}
+}
+
+// Return the number of active async retries
+func (s *Async) Len() int {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return len(s.asyncs)
+}
+
+// Stop forces stop of all running async retries
+func (s *Async) Stop() {
+	s.wg.Stop()
+}
+
+// Wait waits for all running async retries to complete
+func (s *Async) Wait() {
+	s.wg.Wait()
+}
+
+// Async runs the provided function; if it fails, the function is retried
+// asynchronously and an AsyncItem is returned. Subsequent calls with the same
+// `key` return the current AsyncItem while the function is still retrying;
+// this continues until the retries are exhausted or the context expires or is
+// cancelled. The final error returned by f() is then returned to the caller on
+// the final call with the same `key`.
+//
+// The code assumes the caller will continue to call `Async()` until either the
+// retries have been exhausted or an AsyncItem with Retrying == false is returned.
+func (s *Async) Async(key interface{}, ctx context.Context, bo BackOff,
+	f func(context.Context, int) error) *AsyncItem {
+
+	// does this key have an existing retry running?
+	s.mutex.Lock()
+	if async, ok := s.asyncs[key]; ok {
+		// Remove entries that are no longer re-trying
+		if !async.Retrying {
+			delete(s.asyncs, key)
+		}
+		s.mutex.Unlock()
+		return &async
+	}
+	s.mutex.Unlock()
+
+	// Attempt to run the function, if successful return nil
+	err := f(s.ctx, 0)
+	if err == nil {
+		return nil
+	}
+
+	async := AsyncItem{
+		Retrying: true,
+		Err:      err,
+	}
+
+	s.mutex.Lock()
+	s.asyncs[key] = async
+	s.mutex.Unlock()
+
+	// Create a goroutine to run the retry
+	s.wg.Until(func(done chan struct{}) bool {
+		async := AsyncItem{Retrying: true}
+
+		for {
+			// Retry the function
+			async.Attempts++
+			async.Err = f(ctx, async.Attempts)
+
+			// If success, then indicate we are no longer retrying
+			if async.Err == nil {
+				async.Retrying = false
+
+				s.mutex.Lock()
+				s.asyncs[key] = async
+				s.mutex.Unlock()
+				return false
+			}
+
+			// Record the error and attempts
+			s.mutex.Lock()
+			s.asyncs[key] = async
+			s.mutex.Unlock()
+
+			interval, retry := bo.Next()
+			if !retry {
+				async.Retrying = false
+				s.mutex.Lock()
+				s.asyncs[key] = async
+				s.mutex.Unlock()
+				return false
+			}
+
+			timer := time.NewTimer(interval)
+			select {
+			case <-timer.C:
+				timer.Stop()
+			case <-ctx.Done():
+				async.Retrying = false
+
+				s.mutex.Lock()
+				s.asyncs[key] = async
+				s.mutex.Unlock()
+				timer.Stop()
+				return false
+			case <-done:
+				// immediate abort, abandon all work
+				if !timer.Stop() {
+					<-timer.C
+				}
+				return false
+			}
+		}
+	})
+	return &async
+}
+
+// Return errors from failed asyncs and clean up the internal async map
+func (s *Async) Errs() map[interface{}]AsyncItem {
+	results := make(map[interface{}]AsyncItem)
+	s.mutex.Lock()
+
+	for key, async := range s.asyncs {
+		// Remove entries that are no longer re-trying
+		if !async.Retrying {
+			delete(s.asyncs, key)
+
+			// Only include async's that had an error
+			if async.Err != nil {
+				results[key] = async
+			}
+		}
+	}
+	s.mutex.Unlock()
+	return results
+}
diff --git a/retry/retry_test.go b/retry/retry_test.go
new file mode 100644
index 00000000..1c44f73f
--- /dev/null
+++ b/retry/retry_test.go
@@ -0,0 +1,177 @@
+package retry_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/mailgun/holster/v3/retry"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+var errCause = errors.New("cause of error")
+
+func TestUntilInterval(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
+	defer cancel()
+	err := retry.Until(ctx, retry.Interval(time.Millisecond*10), func(ctx context.Context, att int) error {
+		return errCause
+	})
+
+	require.Error(t, err)
+	assert.True(t, errors.Is(err, &retry.Err{}))
+
+	// Inspect the error
+	var retryErr *retry.Err
+	assert.True(t, errors.As(err, &retryErr))
+	assert.Equal(t, 19, retryErr.Attempts)
+	assert.Equal(t, retry.Cancelled, retryErr.Reason)
+
+	// Cause() works as expected
+	cause := errors.Cause(err)
+	assert.Equal(t, errCause, cause)
+	assert.Equal(t, "on attempt '19'; context cancelled: cause of error", err.Error())
+}
+
+func TestUntilNoError(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
+	defer cancel()
+	err := retry.Until(ctx, retry.Interval(time.Millisecond*10), func(ctx context.Context, att int) error {
+		return nil
+	})
+
+	require.NoError(t, err)
+	assert.False(t, errors.Is(err, &retry.Err{}))
+}
+
+func TestUntilAttempts(t *testing.T) {
+	ctx := context.Background()
+	err := retry.Until(ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, att int) error {
+		return fmt.Errorf("failed attempt '%d'", att)
+	})
+
+	require.Error(t, err)
+	assert.True(t, errors.Is(err, &retry.Err{}))
+	assert.Equal(t, "on attempt '10'; attempts exhausted: failed attempt '10'", err.Error())
+}
+
+func TestUntilStopped(t *testing.T) {
+	ctx := context.Background()
+	err := retry.Until(ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, att int) error {
+		return retry.Stop(fmt.Errorf("failed attempt '%d'", att))
+	})
+	require.Error(t, err)
+	// Inspect the error
+	var retryErr *retry.Err
+	assert.True(t, errors.As(err, &retryErr))
+	assert.Equal(t, 1, retryErr.Attempts)
+	assert.Equal(t, retry.Stopped, retryErr.Reason)
+	assert.Equal(t, "on attempt '1'; retry stopped: failed attempt '1'", err.Error())
+}
+
+func TestUntilExponential(t *testing.T) {
+	ctx := context.Background()
+	backOff := &retry.ExponentialBackOff{
+		Min:      time.Millisecond,
+		Max:      time.Millisecond * 100,
+		Factor:   2,
+		Attempts: 10,
+	}
+
+	err := retry.Until(ctx, backOff, func(ctx context.Context, att int) error {
+		return fmt.Errorf("failed attempt '%d'", att)
+	})
+
+	require.Error(t, err)
+	assert.True(t, errors.Is(err, &retry.Err{}))
+	assert.Equal(t, "on attempt '11'; attempts exhausted: failed attempt '11'", err.Error())
+}
+
+func TestUntilExponentialCancelled(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
+	defer cancel()
+	backOff := &retry.ExponentialBackOff{
+		Min:    time.Millisecond,
+		Max:    time.Millisecond * 100,
+		Factor: 2,
+	}
+
+	err := retry.Until(ctx, backOff, func(ctx context.Context, att int) error {
+		return fmt.Errorf("failed attempt '%d'", att)
+	})
+
+	require.Error(t, err)
+	assert.True(t, errors.Is(err, &retry.Err{}))
+	assert.Equal(t, "on attempt '6'; context cancelled: failed attempt '6'", err.Error())
+}
+
+func TestAsync(t *testing.T) {
+	ctx := context.Background()
+	async := retry.NewRetryAsync()
+	async.Async("one", ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, i int) error { return errCause })
+	async.Async("two", ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, i int) error { return errCause })
+	async.Async("thr", ctx, retry.Attempts(10, time.Millisecond*10), func(ctx context.Context, i int) error { return errCause })
+
+	// Creates the async retry
+	f1 := async.Async("for", ctx, retry.Attempts(10, time.Millisecond*100), func(ctx context.Context, i int) error { return errCause })
+	// Returns a handle to the currently running async retry
+	f2 := async.Async("for", ctx, retry.Attempts(10, time.Millisecond*100), func(ctx context.Context, i int) error { return errCause })
+
+	// They are the same
+	assert.Equal(t, f1, f2)
+	// Should contain the error for our inspection
+	assert.Equal(t, errCause, f2.Err)
+	// Should report that the retry is still running
+	assert.Equal(t, true, f2.Retrying)
+
+	// Retries are all still running
+	time.Sleep(time.Millisecond * 10)
+	assert.Equal(t, 4, async.Len())
+
+	// We can inspect the errors for all running async retries
+	errs := async.Errs()
+	require.NotNil(t, errs)
+	for _, e := range errs {
+		assert.Equal(t, e, errCause)
+	}
+
+	// Wait for all the async retries to exhaust their timeouts
+	async.Wait()
+}
+
+func TestBackoffRace(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
+	defer cancel()
+	backOff := &retry.ExponentialBackOff{
+		Min:    time.Millisecond,
+		Max:    time.Millisecond * 100,
+		Factor: 2,
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			retry.Until(ctx, backOff, func(ctx context.Context, att int) error {
+				t.Logf("Attempts: %d", backOff.NumRetries())
+				return fmt.Errorf("failed attempt '%d'", att)
+			})
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func TestBackOffNew(t *testing.T) {
+	backOff := &retry.ExponentialBackOff{
+		Min:    time.Millisecond,
+		Max:    time.Millisecond * 100,
+		Factor: 2,
+	}
+	bo := backOff.New()
+	assert.Equal(t, bo, backOff)
+}
diff --git a/setter/setter.go b/setter/setter.go
index 80ca5aef..712c5b30 100644
--- a/setter/setter.go
+++ b/setter/setter.go
@@ -113,3 +113,16 @@ func IsZeroValue(value reflect.Value) bool {
 	}
 	return false
 }
+
+// IsNil returns true if the interface value is nil, or if the value it holds
+// is a nil pointer, map, channel or slice
+func IsNil(i interface{}) bool {
+	if i == nil {
+		return true
+	}
+	switch reflect.TypeOf(i).Kind() {
+	case reflect.Ptr, reflect.Map, reflect.Chan, reflect.Slice:
+		return reflect.ValueOf(i).IsNil()
+	}
+	return false
+}
diff --git a/setter/setter_test.go b/setter/setter_test.go
index 563f16d7..181a0553 100644
--- a/setter/setter_test.go
+++ b/setter/setter_test.go
@@ -106,3 +106,42 @@ func TestIfEmptyNonPtrPanic(t *testing.T) {
 	setter.SetDefault(thing, "thrawn")
 	assert.Fail(t, "Should have caught panic")
 }
+
+type MyInterface interface {
+	Thing() string
+}
+
+type MyImplementation struct{}
+
+func (s *MyImplementation) Thing() string {
+	return "thing"
+}
+
+func NewImplementation() MyInterface {
+	// The returned interface has a non-nil type but a nil value
+	var p *MyImplementation = nil
+	return p
+}
+
+type MyStruct struct {
+	T MyInterface
+}
+
+func NewMyStruct(t MyInterface) *MyStruct {
+	return &MyStruct{T: t}
+}
+
+func TestIsNil(t *testing.T) {
+	m := MyStruct{T: &MyImplementation{}}
+	assert.True(t, m.T != nil)
+	m.T = nil
+	assert.True(t, m.T == nil)
+
+	o := NewMyStruct(nil)
+	assert.True(t, o.T == nil)
+
+	thing := NewImplementation()
+	assert.False(t, thing == nil)
+	assert.True(t, setter.IsNil(thing))
+	assert.False(t, setter.IsNil(&MyImplementation{}))
+}