Skip to content

Commit

Permalink
feat: use retry-go in scheduler runWorker method
Browse files Browse the repository at this point in the history
  • Loading branch information
ironman0x7b2 committed Dec 30, 2024
1 parent 9c44595 commit 92958a6
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 39 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ replace github.com/apernet/hysteria/core/v2 v2.4.5 => github.com/JimmyHuang454/h
require (
cosmossdk.io/log v1.4.1
cosmossdk.io/math v1.4.0
github.com/avast/retry-go/v4 v4.6.0
github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816
github.com/cometbft/cometbft v0.37.13
github.com/cosmos/cosmos-sdk v0.47.15
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ github.com/apernet/quic-go v0.45.2-0.20240702221538-ed74cfbe8b6e h1:KBs8aBfKl5AK
github.com/apernet/quic-go v0.45.2-0.20240702221538-ed74cfbe8b6e/go.mod h1:MjGWpXA31DZZWESdX3/PjIpSWIT1fOm8FNCqyXXFZFU=
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
33 changes: 16 additions & 17 deletions libs/cron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"sync"
"time"

"github.com/avast/retry-go/v4"
)

// Scheduler manages the scheduling and execution of workers.
Expand Down Expand Up @@ -87,26 +89,23 @@ func (s *Scheduler) runWorker(w Worker) {
s.wg.Done()
}()

for retries := 0; ; {
for {
// Attempt the worker's run function with retries
if err := retry.Do(
w.Run,
retry.Attempts(w.Retries()),
retry.Delay(w.RetryDelay()),
retry.OnRetry(w.OnRetry),
retry.LastErrorOnly(true),
); err != nil && w.OnError(err) {
return
}

// Sleep for the interval—or stop early if we receive a stopSignal
select {
case <-s.stopSignal:
return
default:
if err := w.Run(); err != nil {
if stop := w.OnError(err); stop {
return
}

retries++
if retries == w.MaxRetries() {
return
}
} else {
retries = 0

// Sleep before the next execution.
time.Sleep(w.Interval())
}
case <-time.After(w.Interval()):
}
}
}
77 changes: 55 additions & 22 deletions libs/cron/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (

// Worker defines the interface for a scheduler worker.
type Worker interface {
Interval() time.Duration // Returns the interval at which the worker should be run.
MaxRetries() int // Returns the maximum number of retry attempts for the worker.
Name() string // Returns the name of the worker.
OnError(err error) (stop bool) // Handles errors that occur during worker execution. Returns true to stop the worker, false otherwise.
OnExit() // Called when the worker stops to perform cleanup actions.
Run() error // Executes the worker and returns an error if it fails.
Interval() time.Duration // Returns the interval at which the worker should be run.
Name() string // Returns the name of the worker.
OnError(err error) (stop bool) // Handles errors that occur during worker execution. Returns true to stop the worker, false otherwise.
OnExit() // Called when the worker stops to perform cleanup actions.
OnRetry(attempt uint, err error) // Called after each failed attempt, passing the attempt count and error.
Retries() uint // Returns the number of retry attempts for the worker.
RetryDelay() time.Duration // Returns the delay between retries.
Run() error // Executes the worker and returns an error if it fails.
}

// Ensure BasicWorker implements the Worker interface.
Expand All @@ -21,18 +23,21 @@ var _ Worker = (*BasicWorker)(nil)
type BasicWorker struct {
handler func() error
interval time.Duration
maxRetries int
name string
onError func(error) bool
onExit func()
onRetry func(uint, error)
retries uint
retryDelay time.Duration
}

// NewBasicWorker creates a new BasicWorker with default settings.
func NewBasicWorker() *BasicWorker {
return &BasicWorker{
maxRetries: 5,
onError: func(error) bool { return false },
onExit: func() {},
onError: func(error) bool { return false },
onExit: func() {},
onRetry: func(uint, error) {},
retries: 1,
}
}

Expand All @@ -48,12 +53,6 @@ func (bw *BasicWorker) WithInterval(interval time.Duration) *BasicWorker {
return bw
}

// WithMaxRetries sets the maximum number of retry attempts for the worker.
func (bw *BasicWorker) WithMaxRetries(maxRetries int) *BasicWorker {
bw.maxRetries = maxRetries
return bw
}

// WithName sets the name of the worker.
func (bw *BasicWorker) WithName(name string) *BasicWorker {
bw.name = name
Expand All @@ -72,16 +71,29 @@ func (bw *BasicWorker) WithOnExit(onExit func()) *BasicWorker {
return bw
}

// WithOnRetry sets the function to be called after each failed attempt.
func (bw *BasicWorker) WithOnRetry(onRetry func(uint, error)) *BasicWorker {
bw.onRetry = onRetry
return bw
}

// WithRetries sets the number of retry attempts for the worker.
func (bw *BasicWorker) WithRetries(retries uint) *BasicWorker {
bw.retries = retries
return bw
}

// WithRetryDelay sets the delay between retries.
func (bw *BasicWorker) WithRetryDelay(delay time.Duration) *BasicWorker {
bw.retryDelay = delay
return bw
}

// Interval returns the interval at which the worker should be executed.
func (bw *BasicWorker) Interval() time.Duration {
return bw.interval
}

// MaxRetries returns the maximum number of retry attempts for the worker.
func (bw *BasicWorker) MaxRetries() int {
return bw.maxRetries
}

// Name returns the name of the worker.
func (bw *BasicWorker) Name() string {
return bw.name
Expand All @@ -103,7 +115,28 @@ func (bw *BasicWorker) OnExit() {
}
}

// OnRetry processes retry attempts for the worker.
func (bw *BasicWorker) OnRetry(attempt uint, err error) {
if bw.onRetry != nil {
bw.onRetry(attempt, err)
}
}

// Retries returns the number of retry attempts for the worker.
func (bw *BasicWorker) Retries() uint {
return bw.retries
}

// RetryDelay returns the delay between retries.
func (bw *BasicWorker) RetryDelay() time.Duration {
return bw.retryDelay
}

// Run executes the worker's handler function and returns any error encountered.
func (bw *BasicWorker) Run() error {
return bw.handler()
if bw.handler != nil {
return bw.handler()
}

return nil
}

0 comments on commit 92958a6

Please sign in to comment.