From 905ab09254731d1aa70218b3c48f3d45bfe1a03c Mon Sep 17 00:00:00 2001 From: chenyanchen Date: Wed, 25 Oct 2023 18:41:13 +0800 Subject: [PATCH] feat: define Breaker and implement Google SRE circuit breaker. --- .golangci.yaml | 72 ++++++++++++ README.md | 44 +++++++- breaker.go | 9 ++ example/acceptableerror/breaker.go | 54 +++++++++ example/cmd/main.go | 29 +++++ example/fallback/fallbackbreaker.go | 46 ++++++++ example/go.mod | 12 ++ example/go.sum | 20 ++++ example/service.go | 17 +++ example/simple/breaker.go | 31 ++++++ example/telemetrybreaker/telemetrybreaker.go | 60 ++++++++++ go.mod | 3 + googlebreaker.go | 97 +++++++++++++++++ googlebreaker_test.go | 73 +++++++++++++ internal/rand/rand.go | 39 +++++++ internal/rand/rand_test.go | 13 +++ internal/rollingwindow/rollingwindow.go | 109 +++++++++++++++++++ internal/rollingwindow/rollingwindow_test.go | 93 ++++++++++++++++ 18 files changed, 819 insertions(+), 2 deletions(-) create mode 100644 .golangci.yaml create mode 100644 breaker.go create mode 100644 example/acceptableerror/breaker.go create mode 100644 example/cmd/main.go create mode 100644 example/fallback/fallbackbreaker.go create mode 100644 example/go.mod create mode 100644 example/go.sum create mode 100644 example/service.go create mode 100644 example/simple/breaker.go create mode 100644 example/telemetrybreaker/telemetrybreaker.go create mode 100644 go.mod create mode 100644 googlebreaker.go create mode 100644 googlebreaker_test.go create mode 100644 internal/rand/rand.go create mode 100644 internal/rand/rand_test.go create mode 100644 internal/rollingwindow/rollingwindow.go create mode 100644 internal/rollingwindow/rollingwindow_test.go diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..90e5eae --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,72 @@ +# Complete configurations: https://golangci-lint.run/usage/configuration/ + +linters: + enable: + - asasalint + - asciicheck + - bidichk + - bodyclose + - contextcheck + - durationcheck + - errcheck + - errname + - errorlint + - exportloopref + - gochecknoglobals + - gochecknoinits + - gocritic + - godot + - gofmt + - gofumpt + - goimports + - gomnd + - gosec + - gosimple + - govet + - ineffassign + - interfacer + - misspell + - nakedret + - nilerr + - nilnil + - noctx + - nolintlint + - prealloc + - predeclared + - promlinter + - reassign + - revive + - rowserrcheck + - sqlclosecheck + - staticcheck + - stylecheck + - tenv + - testableexamples + - thelper + - tparallel + - unconvert + - unparam + - unused + - usestdlibvars + - wastedassign + +linters-settings: + gosec: + excludes: + - G404 # Use of weak random number generator (math/rand instead of crypto/rand) + - G501 # Blocklisted import crypto/md5: weak cryptographic primitive + - G401 #Use of weak cryptographic primitive + + revive: + rules: + - name: unexported-return + disabled: true + +output: + sort-results: true + +issues: + exclude-rules: + - path: "_test\\.go" + linters: + - gochecknoglobals diff --git a/README.md b/README.md index 1483371..9409680 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,42 @@ -# breaker -Circuit Breaker +# What is this? + +Circuit Breaker in Go. + +# Why use it? + +A grace way to Handling Overload in client-side. + +# How does it work? + +There are only one implementation of Circuit Breaker, it is from [Google SRE](https://sre.google/sre-book/handling-overload). + +# How to use it? + +The abstract of Breaker interface is clear, it only cares about: + +- the dependency is available or not + +Not care about: + +- specific errors +- fallback strategies +- telemetry + +There are some examples to show how to use it: + +- Use Circuit Breaker to protect your service (e.g. [example/simple/breaker.go](example/simple/breaker.go)) +- Handle specific errors (e.g. [example/acceptableerror/breaker.go](example/acceptableerror/breaker.go)) +- Add fallback strategies (e.g. [example/fallback/breaker.go](example/fallback/breaker.go)) +- Add telemetry middleware (e.g. [example/telemetry/breaker.go](example/telemetry/breaker.go)) + +# Benchmark + +```bash +❯ go test -bench=. -benchmem +goos: darwin +goarch: arm64 +pkg: github.com/chenyanchen/breaker +BenchmarkGoogleBreaker_Do-8 5794507 249.1 ns/op 0 B/op 0 allocs/op +PASS +ok github.com/chenyanchen/breaker 1.658s +``` diff --git a/breaker.go b/breaker.go new file mode 100644 index 0000000..bd77ac2 --- /dev/null +++ b/breaker.go @@ -0,0 +1,9 @@ +package breaker + +import "errors" + +type Breaker interface { + Do(func() error) error +} + +var ErrServiceUnavailable = errors.New("circuit breaker is open") diff --git a/example/acceptableerror/breaker.go b/example/acceptableerror/breaker.go new file mode 100644 index 0000000..98c7228 --- /dev/null +++ b/example/acceptableerror/breaker.go @@ -0,0 +1,54 @@ +package acceptableerror + +import ( + "context" + "errors" + + "github.com/chenyanchen/breaker" + "github.com/chenyanchen/breaker/example" +) + +type breakerContentService struct { + breaker breaker.Breaker + + contentService example.ContentService +} + +func NewBreakerContentService(breaker breaker.Breaker, contentService example.ContentService) example.ContentService { + return &breakerContentService{ + breaker: breaker, + contentService: contentService, + } +} + +func (s *breakerContentService) GetContent(ctx context.Context, req *example.GetContentRequest) (*example.GetContentResponse, error) { + var resp *example.GetContentResponse + getContentFn := func() (err error) { + resp, err = s.contentService.GetContent(ctx, req) + return err + } + + // handle acceptable errors + getContentFn = handleAcceptableErrors(getContentFn, example.ErrContentNotFound) + + err := s.breaker.Do(getContentFn) + return resp, err +} + +func handleAcceptableErrors(fn func() error, acceptableErrors ...error) func() error { + return func() error { + err := fn() + if err == nil { + return nil + } + + for _, target := range acceptableErrors { + if errors.Is(err, target) { + // TODO: do something, like log + return nil + } + } + + return err + } +} diff --git a/example/cmd/main.go b/example/cmd/main.go new file mode 100644 index 0000000..5d0e008 --- /dev/null +++ b/example/cmd/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "context" + "fmt" + + "github.com/chenyanchen/breaker" + "github.com/chenyanchen/breaker/example" + "github.com/chenyanchen/breaker/example/simple" +) + +func main() { + contentService := &contentService{} + + breakerContentService := simple.NewBreakerContentService(breaker.NewGoogleBreaker(), contentService) + + response, err := breakerContentService.GetContent(context.Background(), &example.GetContentRequest{}) + if err != nil { + panic(err) + } + + fmt.Println("response:", response) +} + +type contentService struct{} + +func (*contentService) GetContent(context.Context, *example.GetContentRequest) (*example.GetContentResponse, error) { + return &example.GetContentResponse{}, nil +} diff --git a/example/fallback/fallbackbreaker.go b/example/fallback/fallbackbreaker.go new file mode 100644 index 0000000..4d61512 --- /dev/null +++ b/example/fallback/fallbackbreaker.go @@ -0,0 +1,46 @@ +package fallback + +import ( + "context" + + "github.com/chenyanchen/breaker" + "github.com/chenyanchen/breaker/example" +) + +type breakerContentService struct { + breaker breaker.Breaker + + contentService example.ContentService + + defaultContent *example.GetContentResponse +} + +func NewBreakerContentService( + breaker breaker.Breaker, + contentService example.ContentService, + defaultContent *example.GetContentResponse, +) example.ContentService { + return &breakerContentService{ + breaker: breaker, + contentService: contentService, + defaultContent: defaultContent, + } +} + +func (s *breakerContentService) GetContent(ctx context.Context, req *example.GetContentRequest) (*example.GetContentResponse, error) { + var resp *example.GetContentResponse + err := s.breaker.Do(func() (err error) { + resp, err = s.contentService.GetContent(ctx, req) + return err + }) + if err == nil { + return resp, nil + } + + // do fallback strategy + if s.defaultContent != nil { + return s.defaultContent, nil + } + + return resp, err +} diff --git a/example/go.mod b/example/go.mod new file mode 100644 index 0000000..65d1bec --- /dev/null +++ b/example/go.mod @@ -0,0 +1,12 @@ +module github.com/chenyanchen/breaker/example + +go 1.21.3 + +require ( + github.com/chenyanchen/breaker v0.0.1 + go.opentelemetry.io/otel/metric v1.19.0 +) + +require go.opentelemetry.io/otel v1.19.0 // indirect + +replace github.com/chenyanchen/breaker => ../ diff --git a/example/go.sum b/example/go.sum new file mode 100644 index 0000000..b6d436b --- /dev/null +++ b/example/go.sum @@ -0,0 +1,20 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/example/service.go b/example/service.go new file mode 100644 index 0000000..1cf1bb6 --- /dev/null +++ b/example/service.go @@ -0,0 +1,17 @@ +package example + +import ( + "context" + "errors" +) + +type ContentService interface { + GetContent(ctx context.Context, req *GetContentRequest) (*GetContentResponse, error) +} + +type ( + GetContentRequest struct{} + GetContentResponse struct{} +) + +var ErrContentNotFound = errors.New("content not found") diff --git a/example/simple/breaker.go b/example/simple/breaker.go new file mode 100644 index 0000000..7c3c357 --- /dev/null +++ b/example/simple/breaker.go @@ -0,0 +1,31 @@ +package simple + +import ( + "context" + + "github.com/chenyanchen/breaker" + + "github.com/chenyanchen/breaker/example" +) + +type breakerContentService struct { + breaker breaker.Breaker + + contentService example.ContentService +} + +func NewBreakerContentService(breaker breaker.Breaker, contentService example.ContentService) example.ContentService { + return &breakerContentService{ + breaker: breaker, + contentService: contentService, + } +} + +func (s *breakerContentService) GetContent(ctx context.Context, req *example.GetContentRequest) (*example.GetContentResponse, error) { + var resp *example.GetContentResponse + err := s.breaker.Do(func() (err error) { + resp, err = s.contentService.GetContent(ctx, req) + return err + }) + return resp, err +} diff --git a/example/telemetrybreaker/telemetrybreaker.go b/example/telemetrybreaker/telemetrybreaker.go new file mode 100644 index 0000000..45ada60 --- /dev/null +++ b/example/telemetrybreaker/telemetrybreaker.go @@ -0,0 +1,60 @@ +package telemetrybreaker + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel/metric" + + "github.com/chenyanchen/breaker" +) + +type telemetryBreaker struct { + successCounter metric.Int64Counter + dropCounter metric.Int64Counter + failureCounter metric.Int64Counter + + breaker breaker.Breaker +} + +func NewTelemetryBreaker(breaker breaker.Breaker, meter metric.Meter) (breaker.Breaker, error) { + successCounter, err := meter.Int64Counter("breaker.success") + if err != nil { + return nil, fmt.Errorf("failed to create success counter: %w", err) + } + dropCounter, err := meter.Int64Counter("breaker.drop") + if err != nil { + return nil, fmt.Errorf("failed to create drop counter: %w", err) + } + failureCounter, err := meter.Int64Counter("breaker.failure") + if err != nil { + return nil, fmt.Errorf("failed to create failure counter: %w", err) + } + + return &telemetryBreaker{ + successCounter: successCounter, + dropCounter: dropCounter, + failureCounter: failureCounter, + breaker: breaker, + }, nil +} + +func (b *telemetryBreaker) Do(f func() error) error { + err := b.breaker.Do(f) + + ctx := context.Background() + + if err == nil { + b.successCounter.Add(ctx, 1) + return nil + } + + if errors.Is(err, breaker.ErrServiceUnavailable) { + b.dropCounter.Add(ctx, 1) + return err + } + + b.failureCounter.Add(ctx, 1) + return err +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4223737 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/chenyanchen/breaker + +go 1.21 diff --git a/googlebreaker.go b/googlebreaker.go new file mode 100644 index 0000000..446fb01 --- /dev/null +++ b/googlebreaker.go @@ -0,0 +1,97 @@ +package breaker + +import ( + "math/rand" + "time" + + "github.com/chenyanchen/breaker/internal/rollingwindow" +) + +const ( + defaultK = 1.5 + defaultSize = 20 + defaultInterval = time.Millisecond * 500 +) + +type googleBreaker struct { + rand *rand.Rand + + k float64 + + stat *rollingwindow.RollingWindow +} + +type Option func(*googleBreaker) + +func WithK(k float64) Option { + return func(b *googleBreaker) { b.k = k } +} + +func WithWindow(size int, interval time.Duration) Option { + return func(b *googleBreaker) { + b.stat = rollingwindow.NewRollingWindow(size, interval) + } +} + +func NewGoogleBreaker(opts ...Option) *googleBreaker { + b := &googleBreaker{ + k: defaultK, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + stat: rollingwindow.NewRollingWindow(defaultSize, defaultInterval), + } + + for _, opt := range opts { + opt(b) + } + + return b +} + +func (b *googleBreaker) Do(f func() error) error { + if err := b.accept(); err != nil { + return err + } + + defer func() { + if v := recover(); v != nil { + b.markFailure() + panic(v) + } + }() + + err := f() + if err != nil { + b.markFailure() + } else { + b.markSuccess() + } + + return err +} + +func (b *googleBreaker) accept() error { + accepts, requests := b.history() + + // https://sre.google/sre-book/handling-overload/#eq2101 + dropRatio := (requests - b.k*accepts) / (requests + 1) + if dropRatio <= 0 { + return nil + } + + if b.rand.Float64() < dropRatio { + return ErrServiceUnavailable + } + + return nil +} + +func (b *googleBreaker) markSuccess() { b.stat.Add(1) } +func (b *googleBreaker) markFailure() { b.stat.Add(0) } + +func (b *googleBreaker) history() (accepts, requests float64) { + b.stat.Reduce(func(b *rollingwindow.Bucket) { + accepts += b.Value + requests += b.Count + }) + return +} diff --git a/googlebreaker_test.go b/googlebreaker_test.go new file mode 100644 index 0000000..5723e95 --- /dev/null +++ b/googlebreaker_test.go @@ -0,0 +1,73 @@ +package breaker + +import ( + "errors" + "math/rand" + "testing" +) + +var errTest = errors.New("test error") + +func Test_googleBreaker_Do(t *testing.T) { + type args struct { + f func() error + } + tests := []struct { + name string + breakerCreateFn func() *googleBreaker + args args + wantErr error + }{ + { + name: "inner error", + breakerCreateFn: func() *googleBreaker { return NewGoogleBreaker() }, + args: args{func() error { return errTest }}, + wantErr: errTest, + }, { + name: "0% drop ratio", + breakerCreateFn: func() *googleBreaker { + breaker := NewGoogleBreaker() + for i := 0; i < 100; i++ { + _ = breaker.Do(func() error { return nil }) + } + return breaker + }, + args: args{f: func() error { return nil }}, + wantErr: nil, + }, { + // This case is not 100% accurate, but it is enough to prove that the drop ratio is close to 99%. + name: "close 99% drop ratio", + breakerCreateFn: func() *googleBreaker { + breaker := NewGoogleBreaker(WithK(0.5)) + for i := 0; i < 100; i++ { + _ = breaker.Do(func() error { return errTest }) + } + return breaker + }, + args: args{f: func() error { return nil }}, + wantErr: ErrServiceUnavailable, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := tt.breakerCreateFn() + if err := b.Do(tt.args.f); !errors.Is(err, tt.wantErr) { + t.Errorf("Do() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func BenchmarkGoogleBreaker_Do(b *testing.B) { + breaker := NewGoogleBreaker() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = breaker.Do(func() error { + if rand.Float64() > 0.5 { + return errTest + } + return nil + }) + } + }) +} diff --git a/internal/rand/rand.go b/internal/rand/rand.go new file mode 100644 index 0000000..22d30fb --- /dev/null +++ b/internal/rand/rand.go @@ -0,0 +1,39 @@ +package rand + +import ( + "math/rand" + "sync" +) + +// Why not use rand.Int63() and others functions from rand.*? +// Cause the rand.globalRand use a global lock in rand.lockedSource, +// that may have a performance impact in a big project. + +// Why not use rand.NewSource()? +// Cause math.NewSource are not safe for concurrent use by multiple goroutines. +// See: https://github.com/golang/go/blob/master/src/math/rand/rand.go#L47 + +// lockedSource is a rand.Source implementation that is safe for concurrent +// use by multiple goroutines. +// The code is partial copied from rand.lockedSource. +type lockedSource struct { + lk sync.Mutex + s rand.Source +} + +func NewLockSource() *lockedSource { + return &lockedSource{s: rand.NewSource(rand.Int63())} +} + +func (r *lockedSource) Int63() (n int64) { + r.lk.Lock() + n = r.s.Int63() + r.lk.Unlock() + return n +} + +func (r *lockedSource) Seed(seed int64) { + r.lk.Lock() + r.s.Seed(seed) + r.lk.Unlock() +} diff --git a/internal/rand/rand_test.go b/internal/rand/rand_test.go new file mode 100644 index 0000000..45555bf --- /dev/null +++ b/internal/rand/rand_test.go @@ -0,0 +1,13 @@ +package rand + +import "testing" + +func BenchmarkLockedSource_Int63(b *testing.B) { + source := NewLockSource() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // lockedSource is thread-safe, should not panic. + _ = source.Int63() + } + }) +} diff --git a/internal/rollingwindow/rollingwindow.go b/internal/rollingwindow/rollingwindow.go new file mode 100644 index 0000000..ccf75a9 --- /dev/null +++ b/internal/rollingwindow/rollingwindow.go @@ -0,0 +1,109 @@ +package rollingwindow + +import ( + "sync" + "time" +) + +// RollingWindow defines a thread-safe rolling window to calculate +// the events in buckets with time interval. +type RollingWindow struct { + lock sync.RWMutex + + // window size + size int + + // bucket time interval + interval time.Duration + + // current bucket offset + offset int + + buckets []*Bucket + + // last update time + lastTime time.Time +} + +// NewRollingWindow returns a RollingWindow that with size buckets and time interval. +func NewRollingWindow(size int, interval time.Duration) *RollingWindow { + if size < 1 { + panic("size must be greater than 0") + } + + w := &RollingWindow{ + size: size, + interval: interval, + buckets: make([]*Bucket, size), + lastTime: time.Now(), + } + + for i := range w.buckets { + w.buckets[i] = &Bucket{} + } + + return w +} + +func (w *RollingWindow) Add(v float64) { + w.lock.Lock() + defer w.lock.Unlock() + + w.updateOffset() + + // Add value to current Bucket. + w.buckets[w.offset].Value += v + w.buckets[w.offset].Count++ +} + +// updateOffset updates the offset of current bucket. +func (w *RollingWindow) updateOffset() { + // Calculate window span. + span := w.span() + if span <= 0 { + return + } + + if span > w.size { + span = w.size + } + + // Reset expired buckets. + for i := 0; i < span; i++ { + w.buckets[(w.offset+i)%w.size].Reset() + } + + // Move offset. + w.offset = (w.offset + span) % w.size + + // Update last update time. + w.lastTime = time.Now().Truncate(w.interval) +} + +func (w *RollingWindow) Reduce(fn func(bucket *Bucket)) { + w.lock.RLock() + defer w.lock.RUnlock() + + span := w.span() + if span >= w.size { + return + } + + for i := 0; i < w.size-span; i++ { + bucket := w.buckets[(w.offset+span+i)%w.size] + fn(bucket) + } +} + +func (w *RollingWindow) span() int { + return int(time.Since(w.lastTime) / w.interval) +} + +type Bucket struct { + Value float64 + Count float64 +} + +func (b *Bucket) Reset() { + b.Value, b.Count = 0, 0 +} diff --git a/internal/rollingwindow/rollingwindow_test.go b/internal/rollingwindow/rollingwindow_test.go new file mode 100644 index 0000000..ff595c6 --- /dev/null +++ b/internal/rollingwindow/rollingwindow_test.go @@ -0,0 +1,93 @@ +package rollingwindow + +import ( + "testing" + "time" +) + +const span = time.Millisecond * 10 + +func TestRollingWindow_Reduce(t *testing.T) { + tests := []struct { + name string + windowCreateFn func() *RollingWindow + wantCount float64 + wantSum float64 + }{ + { + name: "all buckets are valid", + windowCreateFn: func() *RollingWindow { + rollingWindow := NewRollingWindow(2, span) + rollingWindow.Add(1 << 0) + rollingWindow.Add(1 << 1) + return rollingWindow + }, + wantCount: 2, + wantSum: 3, + }, { + name: "all buckets are invalid", + windowCreateFn: func() *RollingWindow { + rollingWindow := NewRollingWindow(2, span) + rollingWindow.Add(1 << 0) + rollingWindow.Add(1 << 1) + time.Sleep(span) + return rollingWindow + }, + wantCount: 0, + wantSum: 0, + }, { + name: "case 3", + windowCreateFn: func() *RollingWindow { + rollingWindow := NewRollingWindow(2, span) + rollingWindow.Add(1 << 0) + time.Sleep(span) + rollingWindow.Add(1 << 1) + return rollingWindow + }, + wantCount: 1, + wantSum: 2, + }, { + name: "expire all buckets and add new buckets", + windowCreateFn: func() *RollingWindow { + rollingWindow := NewRollingWindow(2, span) + rollingWindow.Add(1 << 0) + time.Sleep(span) + rollingWindow.Add(1 << 1) + time.Sleep(span * 3) + rollingWindow.Add(1 << 2) + return rollingWindow + }, + wantCount: 1, + wantSum: 4, + }, { + name: "reduce all expired buckets", + windowCreateFn: func() *RollingWindow { + rollingWindow := NewRollingWindow(2, span) + rollingWindow.Add(1 << 0) + time.Sleep(span) + rollingWindow.Add(1 << 1) + time.Sleep(span * 3) + return rollingWindow + }, + wantCount: 0, + wantSum: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rollingWindow := tt.windowCreateFn() + var sum, count float64 + rollingWindow.Reduce(func(bucket *Bucket) { + sum += bucket.Value + count += bucket.Count + }) + if count != tt.wantCount { + t.Errorf("Reduce() count = %v, wantCount %v", count, tt.wantCount) + return + } + if sum != tt.wantSum { + t.Errorf("Reduce() sum = %v, wantSum %v", sum, tt.wantSum) + } + }) + } +}