-
Notifications
You must be signed in to change notification settings - Fork 0
/
scatter.go
243 lines (194 loc) · 6.95 KB
/
scatter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package scatter
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
// Error pairs an error with the input that produced it. Inputs are processed by a pool of goroutines, so the order
// in which they complete is not guaranteed; carrying the input alongside the error lets the caller tell which job
// failed.
type Error[I any] struct {
// Input is the value that was being processed when the error was recorded.
Input I
// Error is the failure recorded for Input: the error returned by the caller-supplied function, an error built
// from a recovered panic, or the context's error when the context was already done before the input was run.
Error error
}
// RunIOFn is the signature of the function handed to the pool by the RunIO* entry points. It is invoked once per
// input and returns either the output computed for that input or an error.
type RunIOFn[I any, O any] func(I) (O, error)
// RunIFn is the signature of the function handed to the pool by the RunI* entry points. It is invoked once per
// input and produces no output, only an error (nil on success).
type RunIFn[I any] func(I) error
// RunIO processes a batch of inputs concurrently on a pool of numRoutines goroutines, calling fn once per input.
// Outputs of successful calls are gathered into results; every failure is gathered into errors together with the
// input that caused it. The work runs under context.Background().
func RunIO[I any, O any](numRoutines uint, inputs []I, fn RunIOFn[I, O]) (results []O, errors []Error[I]) {
	background := context.Background()
	results, errors = runIO(background, numRoutines, inputs, fn)
	return results, errors
}
// RunI processes a batch of inputs concurrently on a pool of numRoutines goroutines, calling fn once per input.
// Every failure is gathered into errors together with the input that caused it. The work runs under
// context.Background().
func RunI[I any](numRoutines uint, inputs []I, fn RunIFn[I]) (errors []Error[I]) {
	background := context.Background()
	errors = runI(background, numRoutines, inputs, fn)
	return errors
}
// RunIOCtx behaves like RunIO but runs the batch under the supplied context. Once ctx is done (canceled or past its
// deadline), inputs that have not been started yet are no longer passed to fn; each of them is reported in errors
// with ctx.Err() instead. A call to fn that is already in progress is not interrupted.
func RunIOCtx[I any, O any](ctx context.Context, numRoutines uint, inputs []I, fn RunIOFn[I, O]) (results []O, errors []Error[I]) {
	results, errors = runIO(ctx, numRoutines, inputs, fn)
	return results, errors
}
// RunICtx behaves like RunI but runs the batch under the supplied context. Once ctx is done (canceled or past its
// deadline), inputs that have not been started yet are no longer passed to fn; each of them is reported in errors
// with ctx.Err() instead. A call to fn that is already in progress is not interrupted.
func RunICtx[I any](ctx context.Context, numRoutines uint, inputs []I, fn RunIFn[I]) (errors []Error[I]) {
	errors = runI(ctx, numRoutines, inputs, fn)
	return errors
}
// RunIOWithDeadline is a convenience wrapper that runs the batch under a context derived from context.Background()
// that expires at the given deadline. The derived context is released before the function returns.
func RunIOWithDeadline[I any, O any](deadline time.Time, numRoutines uint, inputs []I, fn RunIOFn[I, O]) (results []O, errors []Error[I]) {
	deadlineCtx, release := context.WithDeadline(context.Background(), deadline)
	defer release()

	results, errors = runIO(deadlineCtx, numRoutines, inputs, fn)
	return results, errors
}
// RunIWithDeadline is a convenience wrapper that runs the batch under a context derived from context.Background()
// that expires at the given deadline. The derived context is released before the function returns.
func RunIWithDeadline[I any](deadline time.Time, numRoutines uint, inputs []I, fn RunIFn[I]) (errors []Error[I]) {
	deadlineCtx, release := context.WithDeadline(context.Background(), deadline)
	defer release()

	errors = runI(deadlineCtx, numRoutines, inputs, fn)
	return errors
}
// RunIOWithTimeout is a convenience wrapper that runs the batch under a context derived from context.Background()
// that expires once the given timeout has elapsed. The derived context is released before the function returns.
func RunIOWithTimeout[I any, O any](timeout time.Duration, numRoutines uint, inputs []I, fn RunIOFn[I, O]) (results []O, errors []Error[I]) {
	timeoutCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	results, errors = runIO(timeoutCtx, numRoutines, inputs, fn)
	return results, errors
}
// RunIWithTimeout is a convenience wrapper that runs the batch under a context derived from context.Background()
// that expires once the given timeout has elapsed. The derived context is released before the function returns.
func RunIWithTimeout[I any](timeout time.Duration, numRoutines uint, inputs []I, fn RunIFn[I]) (errors []Error[I]) {
	timeoutCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	errors = runI(timeoutCtx, numRoutines, inputs, fn)
	return errors
}
// runIO is the shared implementation behind the RunIO* entry points. It feeds every input to a pool of worker
// goroutines, each of which calls fn (guarded against panics by tryIO) and records either the output or the error.
//
// The number of workers is numRoutines clamped to the range [1, len(inputs)]: a request for zero workers still
// makes progress instead of blocking forever, and no more goroutines are started than there are inputs to process.
//
// Before each input is executed the worker checks ctx; once ctx is done, the remaining inputs are not passed to fn
// and are reported in errs with ctx.Err() instead. A call to fn that is already running is not interrupted.
//
// The order of results and errs is not related to the order of inputs. Both are nil when they would be empty.
func runIO[I any, O any](ctx context.Context, numRoutines uint, inputs []I, fn RunIOFn[I, O]) (results []O, errs []Error[I]) {
	if len(inputs) == 0 {
		return nil, nil
	}

	// Clamp the pool size. Comparing in uint space first keeps the int conversion below safe even for values of
	// numRoutines that do not fit into an int.
	workers := 1
	switch {
	case numRoutines >= uint(len(inputs)):
		workers = len(inputs)
	case numRoutines > 0:
		workers = int(numRoutines)
	}

	rfn := tryIO(fn)

	// The output and error channels can hold one entry per input, so workers never block while reporting.
	ic := make(chan I, workers)
	oc := make(chan O, len(inputs))
	ec := make(chan Error[I], len(inputs))

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			// drain inputs until the channel is closed
			for in := range ic {
				// skip execution once the context has been canceled or its deadline reached
				if err := ctx.Err(); err != nil {
					ec <- Error[I]{Input: in, Error: err}
					continue
				}
				out, err := rfn(in)
				if err != nil {
					// keep the input with the error so the caller can identify the failed job
					ec <- Error[I]{Input: in, Error: err}
					continue
				}
				oc <- out
			}
		}()
	}

	// pump all inputs to the workers, then signal that no more are coming
	for _, in := range inputs {
		ic <- in
	}
	close(ic)

	// wait for every job to finish before closing the channels the workers write to
	wg.Wait()
	close(oc)
	close(ec)

	if n := len(oc); n > 0 {
		results = make([]O, 0, n)
		for out := range oc {
			results = append(results, out)
		}
	}
	if n := len(ec); n > 0 {
		errs = make([]Error[I], 0, n)
		for e := range ec {
			errs = append(errs, e)
		}
	}
	return results, errs
}
// runI is the shared implementation behind the RunI* entry points. It feeds every input to a pool of worker
// goroutines, each of which calls fn (guarded against panics by tryI) and records any error it returns.
//
// The number of workers is numRoutines clamped to the range [1, len(inputs)]: a request for zero workers still
// makes progress instead of blocking forever, and no more goroutines are started than there are inputs to process.
//
// Before each input is executed the worker checks ctx; once ctx is done, the remaining inputs are not passed to fn
// and are reported in errs with ctx.Err() instead. A call to fn that is already running is not interrupted.
//
// The order of errs is not related to the order of inputs. errs is nil when no input failed.
func runI[I any](ctx context.Context, numRoutines uint, inputs []I, fn RunIFn[I]) (errs []Error[I]) {
	if len(inputs) == 0 {
		return nil
	}

	// Clamp the pool size. Comparing in uint space first keeps the int conversion below safe even for values of
	// numRoutines that do not fit into an int.
	workers := 1
	switch {
	case numRoutines >= uint(len(inputs)):
		workers = len(inputs)
	case numRoutines > 0:
		workers = int(numRoutines)
	}

	rfn := tryI(fn)

	// The error channel can hold one entry per input, so workers never block while reporting.
	ic := make(chan I, workers)
	ec := make(chan Error[I], len(inputs))

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			// drain inputs until the channel is closed
			for in := range ic {
				// skip execution once the context has been canceled or its deadline reached
				err := ctx.Err()
				if err == nil {
					err = rfn(in)
				}
				if err != nil {
					// keep the input with the error so the caller can identify the failed job
					ec <- Error[I]{Input: in, Error: err}
				}
			}
		}()
	}

	// pump all inputs to the workers, then signal that no more are coming
	for _, in := range inputs {
		ic <- in
	}
	close(ic)

	// wait for every job to finish before closing the channel the workers write to
	wg.Wait()
	close(ec)

	if n := len(ec); n > 0 {
		errs = make([]Error[I], 0, n)
		for e := range ec {
			errs = append(errs, e)
		}
	}
	return errs
}
// tryI wraps fn so that a panic raised while processing an input is recovered and returned as an error instead of
// taking down the worker goroutine. Errors returned normally by fn are passed through untouched.
//
// When the recovered panic value is itself an error, it is wrapped with %w so the message is unchanged while
// errors.Is and errors.As can still reach the original error. Any other panic value is rendered with its default
// formatting and turned into a new error.
func tryI[I any](fn RunIFn[I]) RunIFn[I] {
	return func(i I) (err error) {
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			if cause, ok := r.(error); ok {
				// preserve the error chain of the panic value
				err = fmt.Errorf("%w", cause)
				return
			}
			err = errors.New(fmt.Sprint(r))
		}()
		return fn(i)
	}
}
// tryIO wraps fn so that a panic raised while processing an input is recovered and returned as an error instead of
// taking down the worker goroutine. Values returned normally by fn are passed through untouched; when fn panics the
// output is the zero value of O.
//
// When the recovered panic value is itself an error, it is wrapped with %w so the message is unchanged while
// errors.Is and errors.As can still reach the original error. Any other panic value is rendered with its default
// formatting and turned into a new error.
func tryIO[I any, O any](fn RunIOFn[I, O]) RunIOFn[I, O] {
	return func(i I) (o O, err error) {
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			if cause, ok := r.(error); ok {
				// preserve the error chain of the panic value
				err = fmt.Errorf("%w", cause)
				return
			}
			err = errors.New(fmt.Sprint(r))
		}()
		return fn(i)
	}
}