diff --git a/components/payments/cmd/connectors/internal/task/scheduler.go b/components/payments/cmd/connectors/internal/task/scheduler.go index a4397fd1e6..0db94c7ad1 100644 --- a/components/payments/cmd/connectors/internal/task/scheduler.go +++ b/components/payments/cmd/connectors/internal/task/scheduler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math" "runtime/debug" "sync" "time" @@ -24,6 +25,10 @@ var ( ErrValidation = errors.New("validation error") ErrAlreadyScheduled = errors.New("already scheduled") ErrUnableToResolve = errors.New("unable to resolve task") + + initialInterval = 500 * time.Millisecond + backoffCoefficient = 2.0 + maximumInterval = 120 * time.Second ) type Scheduler interface { @@ -455,7 +460,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. ctx, logger, holder, - descriptor, options, taskResolver, container, @@ -568,6 +572,7 @@ func (s *DefaultTaskScheduler) runTaskOnce( return err } + retries := 0 loop: for { select { @@ -581,7 +586,22 @@ loop: case err == nil: break loop case errors.Is(err, ErrRetryable): - logger.Infof("Task terminated with retryable error: %s", err) + retries++ + backoffInterval := time.Duration(float64(initialInterval) * math.Pow(backoffCoefficient, float64(retries-1))) + if backoffInterval > maximumInterval { + backoffInterval = maximumInterval + } + + timer := time.NewTimer(backoffInterval) + logger.Infof("Task terminated with retryable error: %s, retrying in %s", err, backoffInterval) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + timer.Stop() + } + continue case errors.Is(err, ErrNonRetryable): logger.Infof("Task terminated with non retryable error: %s", err) @@ -621,7 +641,6 @@ func (s *DefaultTaskScheduler) runTaskPeriodically( ctx context.Context, logger logging.Logger, holder *taskHolder, - descriptor models.TaskDescriptor, options models.TaskSchedulerOptions, taskResolver Task, container *dig.Container, @@ -679,12 +698,34 @@ func (s *DefaultTaskScheduler) runTaskPeriodically( logger.Infof("Starting task...") ticker := time.NewTicker(options.Duration) + retries := 0 for { stopped, err := processFunc() switch { case err == nil: // Doing nothing, waiting for the next tick case errors.Is(err, ErrRetryable): + retries++ + backoffInterval := time.Duration(float64(initialInterval) * math.Pow(backoffCoefficient, float64(retries-1))) + if backoffInterval > maximumInterval { + backoffInterval = maximumInterval + } + + timer := time.NewTimer(backoffInterval) + logger.Infof("Task terminated with retryable error: %s, retrying in %s", err, backoffInterval) + select { + case <-ctx.Done(): + timer.Stop() + return + case ch := <-holder.stopChan: + logger.Infof("Stopping task...") + close(ch) + timer.Stop() + return + case <-timer.C: + timer.Stop() + } + ticker.Reset(options.Duration) continue case errors.Is(err, ErrNonRetryable):