Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(payments): implement retry backoff in scheduler when retryable #1747

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions components/payments/cmd/connectors/internal/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"runtime/debug"
"sync"
"time"
Expand All @@ -24,6 +25,10 @@ var (
ErrValidation = errors.New("validation error")
ErrAlreadyScheduled = errors.New("already scheduled")
ErrUnableToResolve = errors.New("unable to resolve task")

initialInterval = 500 * time.Millisecond
backoffCoefficient = 2.0
maximumInterval = 120 * time.Second
)

type Scheduler interface {
Expand Down Expand Up @@ -455,7 +460,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
ctx,
logger,
holder,
descriptor,
options,
taskResolver,
container,
Expand Down Expand Up @@ -568,6 +572,7 @@ func (s *DefaultTaskScheduler) runTaskOnce(
return err
}

retries := 0
loop:
for {
select {
Expand All @@ -581,7 +586,22 @@ loop:
case err == nil:
break loop
case errors.Is(err, ErrRetryable):
logger.Infof("Task terminated with retryable error: %s", err)
retries++
backoffInterval := time.Duration(float64(initialInterval) * math.Pow(backoffCoefficient, float64(retries-1)))
if backoffInterval > maximumInterval {
backoffInterval = maximumInterval
}

timer := time.NewTimer(backoffInterval)
logger.Infof("Task terminated with retryable error: %s, retrying in %s", err, backoffInterval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
timer.Stop()
}

continue
case errors.Is(err, ErrNonRetryable):
logger.Infof("Task terminated with non retryable error: %s", err)
Expand Down Expand Up @@ -621,7 +641,6 @@ func (s *DefaultTaskScheduler) runTaskPeriodically(
ctx context.Context,
logger logging.Logger,
holder *taskHolder,
descriptor models.TaskDescriptor,
options models.TaskSchedulerOptions,
taskResolver Task,
container *dig.Container,
Expand Down Expand Up @@ -679,12 +698,34 @@ func (s *DefaultTaskScheduler) runTaskPeriodically(

logger.Infof("Starting task...")
ticker := time.NewTicker(options.Duration)
retries := 0
for {
stopped, err := processFunc()
switch {
case err == nil:
// Doing nothing, waiting for the next tick
case errors.Is(err, ErrRetryable):
retries++
backoffInterval := time.Duration(float64(initialInterval) * math.Pow(backoffCoefficient, float64(retries-1)))
if backoffInterval > maximumInterval {
backoffInterval = maximumInterval
}

timer := time.NewTimer(backoffInterval)
logger.Infof("Task terminated with retryable error: %s, retrying in %s", err, backoffInterval)
select {
case <-ctx.Done():
timer.Stop()
return
case ch := <-holder.stopChan:
logger.Infof("Stopping task...")
close(ch)
timer.Stop()
return
case <-timer.C:
timer.Stop()
}

ticker.Reset(options.Duration)
continue
case errors.Is(err, ErrNonRetryable):
Expand Down
Loading