-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbreaker.go
380 lines (336 loc) · 9.02 KB
/
breaker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
// SPDX-License-Identifier: MIT
//
// Copyright 2023 Andrew Bursavich. All rights reserved.
// Use of this source code is governed by The MIT License
// which can be found in the LICENSE file.
// Package circuit provides a circuit breaker implementation.
package circuit
import (
"fmt"
"sync"
"sync/atomic"
"time"
"bursavich.dev/fastrand"
"github.com/go-logr/logr"
)
// Default values applied by NewBreaker to every setting that is not
// overridden by an Option.
const (
	// DefaultWindow is the default for WithWindow.
	DefaultWindow = 10 * time.Second
	// DefaultCooldown is the default for WithCooldown.
	DefaultCooldown = 5 * time.Second
	// DefaultSpacing is the default for WithSpacing.
	DefaultSpacing = 2 * time.Second
	// DefaultJitterFactor is the default for WithJitterFactor.
	DefaultJitterFactor = 0.25
	// DefaultMinSuccess is the default for WithMinSuccess.
	DefaultMinSuccess = 3
	// DefaultMinResults is the default for WithMinResults.
	DefaultMinResults = 5
	// DefaultFailureRatio is the default for WithFailureRatio.
	DefaultFailureRatio = 0.5
)
// State is the state of a circuit breaker. The zero value is Closed.
type State int

//go:generate stringer -type=State

// The possible circuit breaker states.
const (
	// Closed allows all operations.
	Closed State = iota
	// HalfOpen allows a probing operation.
	HalfOpen
	// Open disallows all operations.
	Open
)
// An Observer observes changes to circuit breakers.
type Observer interface {
	// ObserveStateChange is called in a critical section, while the
	// circuit breaker's lock is held, when the state of the circuit
	// breaker changes. Implementations must not call back into methods
	// of the same breaker that take its lock (such as AddNotifyFunc),
	// because the lock is not reentrant and the call would deadlock.
	ObserveStateChange(state State)
}
// A NotifyFunc is a function that is called in a critical section
// when the state of a circuit breaker changes.
type NotifyFunc func(state State)

// notifyFunc adapts a NotifyFunc to the Observer interface. It shares
// NotifyFunc's underlying function type, so the two convert freely.
type notifyFunc NotifyFunc

// ObserveStateChange implements Observer by invoking the function itself.
func (f notifyFunc) ObserveStateChange(state State) {
	f(state)
}
// An Option overrides one of a Breaker's defaults during NewBreaker.
// A non-nil error rejects the configuration.
type Option func(b *Breaker) error
// WithLogger returns an Option that sets the logger used to report
// state changes. Without it, log output is discarded.
func WithLogger(log logr.Logger) Option {
	setLogger := func(b *Breaker) error {
		b.log = log
		return nil
	}
	return setLogger
}
// WithObserver returns an Option that adds an Observer.
// A nil observer is rejected with an error.
func WithObserver(observer Observer) Option {
	return func(b *Breaker) error {
		// A nil observer would otherwise be called on the first state
		// change and panic inside the breaker's critical section.
		if observer == nil {
			return fmt.Errorf("invalid observer: nil")
		}
		b.observers = append(b.observers, observer)
		return nil
	}
}
// WithNotifyFunc returns an Option that adds a function to be called
// in a critical section when the State of the circuit breaker changes.
// A nil function is rejected with an error.
func WithNotifyFunc(notify NotifyFunc) Option {
	return func(b *Breaker) error {
		// Wrapping nil would produce a non-nil Observer whose call panics
		// inside the breaker's critical section on the first state change.
		if notify == nil {
			return fmt.Errorf("invalid notify func: nil")
		}
		b.observers = append(b.observers, notifyFunc(notify))
		return nil
	}
}
// WithWindow returns an Option that sets the window of time after
// which results are reset when the circuit breaker is Closed.
// The window must be positive. The default is 10s.
func WithWindow(window time.Duration) Option {
	return func(b *Breaker) error {
		if window > 0 {
			b.window = window
			return nil
		}
		return fmt.Errorf("invalid window: %v", window)
	}
}
// WithMinResults returns an Option that sets the minimum number of results
// in a window required to switch the circuit breaker from Closed to Open.
// The minimum must be positive. The default is 5.
func WithMinResults(min int) Option {
	return func(b *Breaker) error {
		if min > 0 {
			b.minResults = min
			return nil
		}
		return fmt.Errorf("invalid minimum results: %v", min)
	}
}
// WithFailureRatio returns an Option that sets the minimum failure ratio
// in a window required to switch the circuit breaker from Closed to Open.
// The ratio must lie in (0, 1]; NaN is rejected.
// The default is 50%.
func WithFailureRatio(ratio float64) Option {
	return func(b *Breaker) error {
		// Written as a negated range check so that NaN, which fails every
		// comparison, is rejected instead of slipping through. A NaN ratio
		// would make "ratio < failRatio" always false, opening the circuit
		// as soon as the minimum number of results is reached.
		if !(ratio > 0 && ratio <= 1) {
			return fmt.Errorf("invalid failure ratio: %v", ratio)
		}
		b.failRatio = ratio
		return nil
	}
}
// WithCooldown returns an Option that sets the cooldown time before
// probes will be allowed when the circuit breaker is Open.
// The cooldown must be positive. The default is 5s.
func WithCooldown(cooldown time.Duration) Option {
	return func(b *Breaker) error {
		if cooldown > 0 {
			b.cooldown = cooldown
			return nil
		}
		return fmt.Errorf("invalid cooldown: %v", cooldown)
	}
}
// WithSpacing returns an Option that sets the spacing time between
// allowed probes when the circuit breaker is HalfOpen.
// The spacing must be positive. The default is 2s.
func WithSpacing(spacing time.Duration) Option {
	return func(b *Breaker) error {
		if spacing > 0 {
			b.spacing = spacing
			return nil
		}
		return fmt.Errorf("invalid spacing: %v", spacing)
	}
}
// WithJitterFactor returns an Option that sets the random
// jitter factor applied to cooldown and spacing delays.
// The factor must be non-negative; NaN is rejected.
// The default is 25%.
func WithJitterFactor(jitter float64) Option {
	return func(b *Breaker) error {
		// "!(jitter >= 0)" rather than "jitter < 0" so that NaN, which
		// compares false against everything, is rejected too; otherwise it
		// would be fed into every cooldown and spacing delay computation.
		if !(jitter >= 0) {
			return fmt.Errorf("invalid jitter: %v", jitter)
		}
		b.jitter = jitter
		return nil
	}
}
// WithMinSuccess returns an Option that sets the minimum number of consecutive
// successful probes required to switch the circuit breaker from HalfOpen to Closed.
// The minimum must be positive. The default is 3.
func WithMinSuccess(min int) Option {
	return func(b *Breaker) error {
		if min > 0 {
			b.minSuccess = min
			return nil
		}
		return fmt.Errorf("invalid minimum success: %v", min)
	}
}
// WithFilter returns an Option that installs a function which maps a
// non-nil operation error to the error that should actually be counted.
// Returning nil marks an expected error (e.g. Canceled, InvalidArgument,
// NotFound, etc.) that must not count as a failure. A nil filter disables
// filtering.
func WithFilter(filter func(error) error) Option {
	setFilter := func(b *Breaker) error {
		b.filter = filter
		return nil
	}
	return setFilter
}
// A Breaker is a circuit breaker. Construct one with NewBreaker.
type Breaker struct {
	// Configuration, set by NewBreaker and its Options.
	log                       logr.Logger
	window, cooldown, spacing time.Duration
	jitter                    float64
	minResults, minSuccess    int
	failRatio                 float64
	filter                    func(error) error

	// mu serializes state transitions and registration of observers.
	mu sync.Mutex
	// state holds a State; it is read atomically without mu.
	state atomic.Int64
	// deadline is the end of the current window while Closed, the end of
	// the cooldown while Open, and the earliest next probe while HalfOpen.
	deadline         atomic.Pointer[time.Time]
	success, failure atomic.Int64
	observers        []Observer
	// timeNow is the clock; NewBreaker sets it to time.Now.
	timeNow func() time.Time
}
// NewBreaker returns a circuit breaker configured with the Default* values,
// overridden by the given options in the order they are passed. It returns
// the first error reported by an option. The breaker starts Closed, and its
// first window ends one window duration after construction.
func NewBreaker(options ...Option) (*Breaker, error) {
	b := &Breaker{
		log:        logr.Discard(),
		timeNow:    time.Now,
		window:     DefaultWindow,
		cooldown:   DefaultCooldown,
		spacing:    DefaultSpacing,
		jitter:     DefaultJitterFactor,
		minResults: DefaultMinResults,
		minSuccess: DefaultMinSuccess,
		failRatio:  DefaultFailureRatio,
	}
	for i := range options {
		if err := options[i](b); err != nil {
			return nil, err
		}
	}
	firstWindowEnd := b.timeNow().Add(b.window)
	b.deadline.Store(&firstWindowEnd)
	return b, nil
}
// State returns the current state of the circuit breaker. It performs an
// atomic load and does not take the breaker's lock.
func (b *Breaker) State() State {
	current := b.state.Load()
	return State(current)
}
// lockedStoreState publishes a state transition: it logs the new state,
// stores it, then notifies every observer in registration order.
// The caller must hold b.mu.
func (b *Breaker) lockedStoreState(state State) {
	b.log.Info("Circuit breaker state changed", "state", state)
	b.state.Store(int64(state))
	for _, observer := range b.observers {
		observer.ObserveStateChange(state)
	}
}
// AddNotifyFunc registers notify to be called in a critical section, with the
// breaker's lock held, whenever the State of the circuit breaker changes.
// It must not be called from inside an observer of the same breaker, because
// it takes that lock itself.
func (b *Breaker) AddNotifyFunc(notify NotifyFunc) {
	observer := notifyFunc(notify)
	b.mu.Lock()
	b.observers = append(b.observers, observer)
	b.mu.Unlock()
}
// Allow reports whether an operation may proceed. A Closed breaker always
// allows it; otherwise the decision is delegated to shouldProbe, which admits
// at most one caller each time the breaker's deadline passes.
// If the operation is allowed, its result MUST be recorded.
func (b *Breaker) Allow() bool {
	return b.State() == Closed || b.shouldProbe()
}
// shouldProbe reports whether the caller may run a probe while the breaker is
// not Closed. Nothing is allowed until the current deadline has passed; after
// that, exactly one caller wins the CompareAndSwap below and is allowed. The
// winner moves the deadline forward by the spacing delay (or the cooldown
// delay if the breaker is Open), passed through fastrand.Jitter with b.jitter.
// If the breaker was Open, the winner also resets the counters and moves the
// breaker to HalfOpen.
func (b *Breaker) shouldProbe() bool {
	now := b.timeNow()
	deadline := b.deadline.Load()
	// Lock-free fast path: reject while the deadline is now or in the future.
	if !deadline.Before(now) {
		return false
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	delay := b.spacing
	// Re-read the state under the lock; it may have changed since Allow
	// checked it.
	state := b.State()
	if state == Open {
		delay = b.cooldown
	}
	then := now.Add(fastrand.Jitter(delay, b.jitter))
	// The comparison is on the pointer loaded above. It fails if any other
	// goroutine has stored a new deadline since then (a competing probe or a
	// state transition), so only one caller is admitted per deadline.
	if !b.deadline.CompareAndSwap(deadline, &then) {
		return false
	}
	if state == Open {
		// Start HalfOpen with fresh counters so recordHalfOpen counts only
		// successes observed since this transition.
		b.failure.Store(0)
		b.success.Store(0)
		b.lockedStoreState(HalfOpen)
	}
	return true
}
// Record records the error result of an allowed operation.
//
// If a filter was set with WithFilter, a non-nil err is passed through it
// exactly once; a nil result from the filter is counted as a success.
// Results recorded while the breaker is Open are currently dropped.
func (b *Breaker) Record(err error) {
	if err != nil && b.filter != nil {
		err = b.filter(err)
	}
	b.recordFiltered(err)
}

// recordFiltered routes an already-filtered result to the handler for the
// current state. The state handlers re-enter here, rather than Record, when
// they observe a concurrent state change, so the user filter is never applied
// twice to the same result.
func (b *Breaker) recordFiltered(err error) {
	switch b.State() {
	case Closed:
		b.recordClosed(err)
	case HalfOpen:
		b.recordHalfOpen(err)
	case Open:
		// TODO: Record latent success?
	}
}

// recordHalfOpen records a probe result while the breaker is HalfOpen.
//
// A failure reopens the breaker. The new cooldown deadline is the current
// probe deadline plus a jittered cooldown, but never earlier than now. A
// success is counted; once minSuccess successes have been recorded since
// entering HalfOpen, the breaker closes with fresh counters and a new window.
func (b *Breaker) recordHalfOpen(err error) {
	b.mu.Lock()
	if b.State() != HalfOpen {
		// The state changed between the caller's check and acquiring the
		// lock; re-dispatch the (already filtered) result without the lock.
		b.mu.Unlock()
		b.recordFiltered(err)
		return
	}
	defer b.mu.Unlock()

	// Reopen
	if err != nil {
		cooldown := b.deadline.Load().Add(fastrand.Jitter(b.cooldown, b.jitter))
		if now := b.timeNow(); cooldown.Before(now) {
			cooldown = now
		}
		b.deadline.Store(&cooldown)
		b.lockedStoreState(Open)
		return
	}

	// Stay HalfOpen
	if b.success.Add(1) < int64(b.minSuccess) {
		return
	}

	// Close
	window := b.timeNow().Add(b.window)
	b.deadline.Store(&window)
	b.failure.Store(0)
	b.success.Store(0)
	b.lockedStoreState(Closed)
}

// recordClosed records a result while the breaker is Closed.
//
// Once the window deadline has passed, the counters are reset and a new
// window starts, but only if at least minResults results had been counted;
// otherwise the counts carry over. The result is then counted. If there are
// at least minResults results and the failure ratio is at least failRatio,
// the breaker opens with a jittered cooldown starting now.
func (b *Breaker) recordClosed(err error) {
	now := b.timeNow()

	// Rotate window.
	if b.deadline.Load().Before(now) {
		b.mu.Lock()
		if b.State() != Closed {
			// The state changed under us; re-dispatch the (already
			// filtered) result without the lock.
			b.mu.Unlock()
			b.recordFiltered(err)
			return
		}
		total := b.success.Load() + b.failure.Load()
		// Re-check under the lock: another goroutine may have rotated already.
		if b.deadline.Load().Before(now) && total >= int64(b.minResults) {
			b.failure.Store(0)
			b.success.Store(0)
			window := now.Add(b.window)
			b.deadline.Store(&window)
		}
		b.mu.Unlock()
	}

	// Increment counters and check status. The counter just incremented is
	// at least 1, so total is never zero here.
	var failure, success int64
	if err != nil {
		failure = b.failure.Add(1)
		success = b.success.Load()
	} else {
		success = b.success.Add(1)
		failure = b.failure.Load()
	}
	total := failure + success
	ratio := float64(failure) / float64(total)
	if total < int64(b.minResults) || ratio < b.failRatio {
		return
	}

	// Open circuit.
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.State() != Closed {
		return
	}
	cooldown := now.Add(fastrand.Jitter(b.cooldown, b.jitter))
	b.deadline.Store(&cooldown)
	b.lockedStoreState(Open)
}