Skip to content

Commit bc8e921

Browse files
authored
add a library to concurrently executes tasks (#197)
1 parent 86e9195 commit bc8e921

File tree

4 files changed

+161
-0
lines changed

4 files changed

+161
-0
lines changed

ctask/doer.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package ctask
2+
3+
import (
4+
"context"
5+
"runtime"
6+
7+
"golang.org/x/sync/errgroup"
8+
)
9+
10+
type DoOpt func(cfg *DoConfig)
11+
type DoConfig struct {
12+
WorkerNum int
13+
}
14+
15+
// Do execute tasks using the given executor function,
16+
// and return the results in the same order as the given tasks respectively.
17+
// It stops executing remaining tasks after any first error is encountered.
18+
//
19+
// The max number of goroutines can optionally be specified using the option WithWorkerNum.
20+
// By default, it is set to runtime.NumCPU()
21+
func Do[Task any, Result any](
22+
ctx context.Context,
23+
tasks []Task,
24+
executor func(t Task) (Result, error),
25+
opts ...DoOpt,
26+
) ([]Result, error) {
27+
cfg := getConfigWithOptions(opts...)
28+
29+
g, ctx := errgroup.WithContext(ctx)
30+
g.SetLimit(int(cfg.WorkerNum))
31+
results := make([]Result, len(tasks))
32+
for idx, task := range tasks {
33+
idx, task := idx, task // retain current loop values to be used in goroutine
34+
g.Go(func() error {
35+
select {
36+
case <-ctx.Done():
37+
return ctx.Err()
38+
default:
39+
res, err := executor(task)
40+
if err != nil {
41+
return err
42+
}
43+
results[idx] = res
44+
return nil
45+
}
46+
})
47+
}
48+
if err := g.Wait(); err != nil {
49+
return nil, err
50+
}
51+
return results, nil
52+
}
53+
54+
func getConfigWithOptions(opts ...DoOpt) DoConfig {
55+
cfg := DoConfig{
56+
WorkerNum: runtime.NumCPU(),
57+
}
58+
for _, opt := range opts {
59+
opt(&cfg)
60+
}
61+
return cfg
62+
}
63+
64+
func WithWorkerNum(num int) DoOpt {
65+
return func(cfg *DoConfig) {
66+
cfg.WorkerNum = num
67+
}
68+
}

ctask/doer_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package ctask
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestDo(t *testing.T) {
12+
type T = int // task type
13+
type R = int // result type
14+
15+
type args struct {
16+
ctx context.Context
17+
tasks []T
18+
executor func(t T) (R, error)
19+
opts []DoOpt
20+
}
21+
tests := []struct {
22+
name string
23+
args args
24+
want []R
25+
requireErr require.ErrorAssertionFunc
26+
}{
27+
{
28+
name: "happy path",
29+
args: args{
30+
ctx: context.Background(),
31+
tasks: []T{0, 1, 2, 3, 4, 5, 6},
32+
executor: fibonacci,
33+
opts: nil,
34+
},
35+
want: []R{1, 1, 2, 3, 5, 8, 13},
36+
requireErr: require.NoError,
37+
},
38+
{
39+
name: "empty slice",
40+
args: args{
41+
ctx: context.Background(),
42+
tasks: []T{0, 1, 2, 3, 4, 5, 6},
43+
executor: fibonacci,
44+
opts: nil,
45+
},
46+
want: []R{1, 1, 2, 3, 5, 8, 13},
47+
requireErr: require.NoError,
48+
},
49+
{
50+
name: "error path & ensure tasks after error aren't executed (1000th fibonacci is too slow to be computed)",
51+
args: args{
52+
ctx: context.Background(),
53+
tasks: []T{0, 1, 2, 1, -1, 1000},
54+
executor: fibonacci,
55+
opts: []DoOpt{WithWorkerNum(1)},
56+
},
57+
want: nil,
58+
requireErr: func(t require.TestingT, err error, i ...interface{}) {
59+
require.Equal(t, errors.New("negative"), err)
60+
},
61+
},
62+
}
63+
for _, tt := range tests {
64+
t.Run(tt.name, func(t *testing.T) {
65+
got, err := Do(tt.args.ctx, tt.args.tasks, tt.args.executor, tt.args.opts...)
66+
tt.requireErr(t, err)
67+
require.Equal(t, tt.want, got)
68+
})
69+
}
70+
}
71+
72+
func fibonacci(n int) (int, error) {
73+
if n < 0 {
74+
return 0, errors.New("negative")
75+
}
76+
if n < 2 {
77+
return 1, nil
78+
}
79+
r1, err := fibonacci(n - 1)
80+
if err != nil {
81+
return 0, err
82+
}
83+
84+
r2, err := fibonacci(n - 2)
85+
if err != nil {
86+
return 0, err
87+
}
88+
89+
return r1 + r2, nil
90+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/streadway/amqp v1.0.0
1919
github.com/stretchr/testify v1.7.0
2020
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91
21+
golang.org/x/sync v0.1.0
2122
gorm.io/driver/postgres v1.2.3
2223
gorm.io/gorm v1.22.4
2324
gotest.tools v2.2.0+incompatible

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
619619
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
620620
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
621621
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
622+
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
623+
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
622624
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
623625
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
624626
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

0 commit comments

Comments
 (0)