diff --git a/benchmark_test.go b/benchmark_test.go index 9492b30..24c1a90 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -1,7 +1,6 @@ package rill import ( - "sync" "testing" "time" @@ -9,9 +8,26 @@ import ( ) const benchmarkInputSize = 100000 -const benchmarkWorkDuration = 10 * time.Microsecond -func runBenchmark[B any](b *testing.B, name string, body func(in <-chan Try[int]) <-chan B) { +// code called on each benchmark iteration +func benchmarkIteration() { + busySleep(1 * time.Microsecond) + //time.Sleep(1 * time.Microsecond) + //busySleep(10 * time.Microsecond) + //time.Sleep(10 * time.Microsecond) +} + +func busySleep(d time.Duration) { + if d == 0 { + return + } + + start := time.Now() + for time.Since(start) < d { + } +} + +func runBenchmark(b *testing.B, name string, body func(in <-chan Try[int])) { b.Run(name, func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() @@ -21,11 +37,7 @@ func runBenchmark[B any](b *testing.B, name string, body func(in <-chan Try[int] go func() { defer close(done) - out := body(in) - - if out != nil { - Drain(out) - } + body(in) }() // Give body a some time to spawn goroutines @@ -46,136 +58,83 @@ func runBenchmark[B any](b *testing.B, name string, body func(in <-chan Try[int] }) } -func busySleep(d time.Duration) { - if d == 0 { - return - } - - start := time.Now() - for time.Since(start) < d { - } -} - -func BenchmarkBasicForLoop(b *testing.B) { - for i := 0; i < b.N; i++ { - for k := 0; k < benchmarkInputSize; k++ { - busySleep(benchmarkWorkDuration) - } - } -} - -func BenchmarkBasicForLoopWithSleep(b *testing.B) { - for i := 0; i < b.N; i++ { - for k := 0; k < benchmarkInputSize; k++ { - time.Sleep(benchmarkWorkDuration) - } - } -} - -func BenchmarkWaitGroup(b *testing.B) { - for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] { - var wg sync.WaitGroup - wg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer wg.Done() - for range in { - busySleep(benchmarkWorkDuration) - } - }() - } - wg.Wait() - return nil - }) - } -} - -func BenchmarkWaitGroupWithSleep(b *testing.B) { - for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] { - var wg sync.WaitGroup - wg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer wg.Done() - for range in { - time.Sleep(benchmarkWorkDuration) - } - }() - } - wg.Wait() - return nil - }) - } -} +// Benchmarks below are commented out to remove dependency on errgroup + +//// This benchmark uses classic goroutine-per-item + semaphore pattern. +//func BenchmarkErrGroupWithSetLimit(b *testing.B) { +// for _, n := range []int{1, 2, 4, 8} { +// runBenchmark(b, th.Name(n), func(in <-chan Try[int]) { +// var eg errgroup.Group +// eg.SetLimit(n) +// +// for x := range in { +// x := x +// eg.Go(func() error { +// if err := x.Error; err != nil { +// return err +// } +// benchmarkIteration() +// return nil +// }) +// } +// +// _ = eg.Wait() +// }) +// } +//} +// +//// This benchmark uses much less common worker pool pattern. +//func BenchmarkErrGroupWithWorkerPool(b *testing.B) { +// for _, n := range []int{1, 2, 4, 8} { +// runBenchmark(b, th.Name(n), func(in <-chan Try[int]) { +// var eg errgroup.Group +// for i := 0; i < n; i++ { +// eg.Go(func() error { +// for x := range in { +// if err := x.Error; err != nil { +// return err +// } +// benchmarkIteration() +// } +// return nil +// }) +// } +// _ = eg.Wait() +// }) +// } +//} func BenchmarkForEach(b *testing.B) { for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] { - ForEach(in, n, func(x int) error { - busySleep(benchmarkWorkDuration) + runBenchmark(b, th.Name(n), func(in <-chan Try[int]) { + _ = ForEach(in, n, func(x int) error { + benchmarkIteration() return nil }) - return nil }) } } -func BenchmarkForEachWithSleep(b *testing.B) { +func BenchmarkMapAndDrain(b *testing.B) { for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] { - ForEach(in, n, func(x int) error { - time.Sleep(benchmarkWorkDuration) - return nil - }) - return nil - }) - } -} - -func BenchmarkMap(b *testing.B) { - for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] { - return Map(in, n, func(x int) (int, error) { - busySleep(benchmarkWorkDuration) + runBenchmark(b, th.Name(n), func(in <-chan Try[int]) { + out := Map(in, n, func(x int) (int, error) { + benchmarkIteration() return x, nil }) - }) - } -} -func BenchmarkMapWithSleep(b *testing.B) { - for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] { - return Map(in, n, func(x int) (int, error) { - time.Sleep(benchmarkWorkDuration) - return x, nil - }) + Drain(out) }) } } func BenchmarkReduce(b *testing.B) { for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan int { - Reduce(in, n, func(x, y int) (int, error) { - busySleep(benchmarkWorkDuration) - return x, nil - }) - return nil - }) - } -} - -func BenchmarkReduceWithSleep(b *testing.B) { - for _, n := range []int{1, 2, 4, 8} { - runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan int { - Reduce(in, n, func(x, y int) (int, error) { - time.Sleep(benchmarkWorkDuration) + runBenchmark(b, th.Name(n), func(in <-chan Try[int]) { + _, _, _ = Reduce(in, n, func(x, y int) (int, error) { + benchmarkIteration() return x, nil }) - return nil }) } }