Skip to content

Commit fdba276

Browse files
committed
feat(payments): implement retry backoff in scheduler when retryable
1 parent 3a80032 commit fdba276

File tree

1 file changed

+44
-3
lines changed
  • components/payments/cmd/connectors/internal/task

1 file changed

+44
-3
lines changed

components/payments/cmd/connectors/internal/task/scheduler.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"math"
78
"runtime/debug"
89
"sync"
910
"time"
@@ -24,6 +25,10 @@ var (
2425
ErrValidation = errors.New("validation error")
2526
ErrAlreadyScheduled = errors.New("already scheduled")
2627
ErrUnableToResolve = errors.New("unable to resolve task")
28+
29+
initialInterval = 500 * time.Millisecond
30+
backoffCoefficient = 2.0
31+
maximumInterval = 120 * time.Second
2732
)
2833

2934
type Scheduler interface {
@@ -455,7 +460,6 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
455460
ctx,
456461
logger,
457462
holder,
458-
descriptor,
459463
options,
460464
taskResolver,
461465
container,
@@ -568,6 +572,7 @@ func (s *DefaultTaskScheduler) runTaskOnce(
568572
return err
569573
}
570574

575+
retries := 0
571576
loop:
572577
for {
573578
select {
@@ -581,7 +586,22 @@ loop:
581586
case err == nil:
582587
break loop
583588
case errors.Is(err, ErrRetryable):
584-
logger.Infof("Task terminated with retryable error: %s", err)
589+
retries++
590+
backoffInterval := time.Duration(float64(initialInterval) * math.Pow(backoffCoefficient, float64(retries-1)))
591+
if backoffInterval > maximumInterval {
592+
backoffInterval = maximumInterval
593+
}
594+
595+
timer := time.NewTimer(backoffInterval)
596+
logger.Infof("Task terminated with retryable error: %s, retrying in %s", err, backoffInterval)
597+
select {
598+
case <-ctx.Done():
599+
timer.Stop()
600+
return
601+
case <-timer.C:
602+
timer.Stop()
603+
}
604+
585605
continue
586606
case errors.Is(err, ErrNonRetryable):
587607
logger.Infof("Task terminated with non retryable error: %s", err)
@@ -621,7 +641,6 @@ func (s *DefaultTaskScheduler) runTaskPeriodically(
621641
ctx context.Context,
622642
logger logging.Logger,
623643
holder *taskHolder,
624-
descriptor models.TaskDescriptor,
625644
options models.TaskSchedulerOptions,
626645
taskResolver Task,
627646
container *dig.Container,
@@ -679,12 +698,34 @@ func (s *DefaultTaskScheduler) runTaskPeriodically(
679698

680699
logger.Infof("Starting task...")
681700
ticker := time.NewTicker(options.Duration)
701+
retries := 0
682702
for {
683703
stopped, err := processFunc()
684704
switch {
685705
case err == nil:
686706
// Doing nothing, waiting for the next tick
687707
case errors.Is(err, ErrRetryable):
708+
retries++
709+
backoffInterval := time.Duration(float64(initialInterval) * math.Pow(backoffCoefficient, float64(retries-1)))
710+
if backoffInterval > maximumInterval {
711+
backoffInterval = maximumInterval
712+
}
713+
714+
timer := time.NewTimer(backoffInterval)
715+
logger.Infof("Task terminated with retryable error: %s, retrying in %s", err, backoffInterval)
716+
select {
717+
case <-ctx.Done():
718+
timer.Stop()
719+
return
720+
case ch := <-holder.stopChan:
721+
logger.Infof("Stopping task...")
722+
close(ch)
723+
timer.Stop()
724+
return
725+
case <-timer.C:
726+
timer.Stop()
727+
}
728+
688729
ticker.Reset(options.Duration)
689730
continue
690731
case errors.Is(err, ErrNonRetryable):

0 commit comments

Comments
 (0)