diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index fb6497fdc..a97135658 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -233,11 +233,9 @@ func startIngestion( logger, collector, ) - const retries = 15 - restartableEventEngine := models.NewRestartableEngine(eventEngine, retries, logger) go func() { - err = restartableEventEngine.Run(ctx) + err = eventEngine.Run(ctx) if err != nil { logger.Error().Err(err).Msg("event ingestion engine failed to run") panic(err) @@ -245,7 +243,7 @@ func startIngestion( }() // wait for ingestion engines to be ready - <-restartableEventEngine.Ready() + <-eventEngine.Ready() logger.Info().Msg("ingestion start up successful") return nil diff --git a/models/engine.go b/models/engine.go index 2fcd685c5..4e2ade6cf 100644 --- a/models/engine.go +++ b/models/engine.go @@ -2,13 +2,6 @@ package models import ( "context" - "errors" - "fmt" - "time" - - "github.com/rs/zerolog" - - errs "github.com/onflow/flow-evm-gateway/models/errors" ) // Engine defines a processing unit @@ -23,86 +16,6 @@ type Engine interface { Ready() <-chan struct{} } -var _ Engine = &RestartableEngine{} - -func NewRestartableEngine(engine Engine, retries uint, logger zerolog.Logger) *RestartableEngine { - // build a Fibonacci sequence, we could support more strategies in future - // we use 0 as first run shouldn't be delayed - backoff := []time.Duration{0, time.Second, time.Second} - for i := len(backoff); i < int(retries); i++ { - backoff = append(backoff, backoff[i-2]+backoff[i-1]) - } - if int(retries) < len(backoff) { - backoff = backoff[0:retries] - } - - logger = logger.With().Str("component", "restartable-engine").Logger() - - return &RestartableEngine{ - engine: engine, - backoff: backoff, - logger: logger, - } -} - -// RestartableEngine is an engine wrapper that tries to restart -// the engine in case of starting errors. -// -// The strategy of the restarts contains Fibonacci backoff time and -// limited number of retries that can be configured. -// Here are backoff values for different retries provided: -// 1s 1s 2s 3s 5s 8s 13s 21s 34s 55s 1m29s 2m24s 3m53s 6m17s 10m10s 16m27s 26m37s 43m4s 1h9m41s -type RestartableEngine struct { - logger zerolog.Logger - engine Engine - backoff []time.Duration -} - -func (r *RestartableEngine) Stop() { - r.engine.Stop() -} - -func (r *RestartableEngine) Done() <-chan struct{} { - return r.engine.Done() -} - -func (r *RestartableEngine) Ready() <-chan struct{} { - return r.engine.Ready() -} - -func (r *RestartableEngine) Run(ctx context.Context) error { - var err error - for i, b := range r.backoff { - select { - case <-time.After(b): // wait for the backoff duration - if b > 0 { - r.logger.Warn().Msg("restarting the engine now") - } - case <-ctx.Done(): - // todo should we return the error if context is canceled? - r.logger.Warn().Msg("context cancelled, stopping the engine") - return ctx.Err() - } - - err = r.engine.Run(ctx) - if err == nil { - // don't restart if no error is returned, normal after stop procedure is done - return nil - } - if !errors.Is(err, errs.ErrRecoverable) { - r.logger.Error().Err(err).Msg("received unrecoverable error") - // if error is not recoverable just die - return err - } - - r.logger.Error().Err(err).Msg(fmt.Sprintf("received recoverable error, restarting for the %d time after backoff time", i)) - } - - r.logger.Error().Msg("failed to recover and restart the engine, stop retrying") - // if after retries we still get an error it's time to stop - return err -} - type EngineStatus struct { done chan struct{} ready chan struct{} diff --git a/models/engine_test.go b/models/engine_test.go deleted file mode 100644 index 3bf4eb077..000000000 --- a/models/engine_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package models - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - errs "github.com/onflow/flow-evm-gateway/models/errors" - "github.com/onflow/flow-evm-gateway/models/mocks" -) - -func Test_RestartableEngine(t *testing.T) { - t.Parallel() - - t.Run("shouldn't restart if nil is returned", func(t *testing.T) { - t.Parallel() - mockEngine := mocks.NewEngine(t) - mockEngine. - On("Run", mock.Anything). - Return(func(ctx context.Context) error { - return nil - }). - Once() - - r := NewRestartableEngine(mockEngine, 5, zerolog.New(zerolog.NewTestWriter(t))) - err := r.Run(context.Background()) - require.NoError(t, err) - }) - - t.Run("shouldn't restart if non-recoverable error is returned", func(t *testing.T) { - t.Parallel() - retErr := fmt.Errorf("non-receoverable error") - mockEngine := mocks.NewEngine(t) - mockEngine. - On("Run", mock.Anything). - Return(func(ctx context.Context) error { - return retErr - }). - Once() - - r := NewRestartableEngine(mockEngine, 5, zerolog.New(zerolog.NewTestWriter(t))) - err := r.Run(context.Background()) - require.EqualError(t, retErr, err.Error()) - }) - - t.Run("should restart when recoverable error is returned and then return the error after retries", func(t *testing.T) { - t.Parallel() - retries := uint(5) - prevTime := time.Now() - prevDiff := time.Duration(0) - - mockEngine := mocks.NewEngine(t) - mockEngine. - On("Run", mock.Anything). - Return(func(ctx context.Context) error { - curDiff := time.Since(prevTime) - // make sure time diff increases with each retry - assert.True(t, prevDiff < curDiff) - prevDiff = curDiff - return errs.ErrDisconnected - }). - Times(int(retries)) - - r := NewRestartableEngine(mockEngine, retries, zerolog.New(zerolog.NewTestWriter(t))) - err := r.Run(context.Background()) - require.EqualError(t, errs.ErrDisconnected, err.Error()) - }) - - t.Run("should restart when recoverable error is returned but then return nil after error is no longer returned", func(t *testing.T) { - t.Parallel() - mockEngine := mocks.NewEngine(t) - mockEngine. - On("Run", mock.Anything). - Return(func(ctx context.Context) error { - mockEngine. - On("Run", mock.Anything). - Return(func(ctx context.Context) error { - return nil - }). - Once() - - return errs.ErrDisconnected - }). - Once() - - r := NewRestartableEngine(mockEngine, 5, zerolog.New(zerolog.NewTestWriter(t))) - err := r.Run(context.Background()) - require.NoError(t, err) - }) -}